# Author: Niels Nuyttens <niels@nannyml.com>
#
# License: Apache Software License 2.0
from collections import defaultdict
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
from pandas import MultiIndex
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OrdinalEncoder
from nannyml._typing import ProblemType, Self
from nannyml.base import AbstractEstimator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunk, Chunker
from nannyml.exceptions import InvalidArgumentsException
from nannyml.performance_estimation.direct_loss_estimation import SUPPORTED_METRIC_VALUES
from nannyml.performance_estimation.direct_loss_estimation.metrics import Metric, MetricFactory
from nannyml.performance_estimation.direct_loss_estimation.result import Result
from nannyml.thresholds import StandardDeviationThreshold, Threshold
from nannyml.usage_logging import UsageEvent, log_usage
DEFAULT_THRESHOLDS: Dict[str, Threshold] = {
'mae': StandardDeviationThreshold(),
'mape': StandardDeviationThreshold(),
'mse': StandardDeviationThreshold(),
'msle': StandardDeviationThreshold(),
'rmse': StandardDeviationThreshold(),
'rmsle': StandardDeviationThreshold(),
}
[docs]class DLE(AbstractEstimator):
"""The Direct Loss Estimator (DLE) estimates the :term:`loss<Loss>` resulting
from the difference between the prediction and the target before the targets become known.
The loss is defined from the regression performance metric
specified. For all metrics used the loss function is positive.
It uses an internal
`LGBMRegressor <https://lightgbm.readthedocs.io/en/latest/pythonapi/lightgbm.LGBMRegressor.html>`_
model per metric to predict the value of the error function
(the function returning the error for a given prediction) of the monitored model.
The error results on the reference data become a target for those internal models.
It is possible to specify a set of hyperparameters to instantiate these internal nanny models with using the
`hyperparameters` parameter. You can also opt to run hyperparameter tuning using FLAML to determine hyperparameters
for you. Tuning hyperparameters takes some time and does not guarantee better results,
hence we don't do it by default.
The estimator manages a list of :class:`~nannyml.performance_estimation.direct_loss_estimation.metrics.Metric`
instances, constructed using the
:class:`~nannyml.performance_estimation.direct_loss_estimation.metrics.MetricFactory`.
The estimator is then responsible for delegating the `fit` and `estimate` method calls to each of the managed
:class:`~nannyml.performance_estimation.direct_loss_estimation.metrics.Metric` instances and building a
:class:`~nannyml.performance_estimation.direct_loss_estimation.results.Result` object.
For more information, check out the `tutorial`_ and the `deep dive`_.
.. _tutorial:
https://nannyml.readthedocs.io/en/stable/tutorials/performance_estimation/regression_performance_estimation.html
.. _deep dive:
https://nannyml.readthedocs.io/en/stable/how_it_works/performance_estimation.html#direct-loss-estimation-dle
"""
def __init__(
self,
feature_column_names: List[str],
y_pred: str,
y_true: str,
timestamp_column_name: Optional[str] = None,
chunk_size: Optional[int] = None,
chunk_number: Optional[int] = None,
chunk_period: Optional[str] = None,
chunker: Optional[Chunker] = None,
metrics: Optional[Union[str, List[str]]] = None,
hyperparameters: Optional[Dict[str, Any]] = None,
tune_hyperparameters: bool = False,
hyperparameter_tuning_config: Optional[Dict[str, Any]] = None,
thresholds: Optional[Dict[str, Threshold]] = None,
):
"""
Creates a new Direct Loss Estimator.
Parameters
----------
feature_column_names : List[str]
A list of column names indicating which columns contain feature values.
y_pred : str
A column name indicating which column contains the model predictions.
y_true : str
A column name indicating which column contains the target values.
timestamp_column_name : str
A column name indicating which column contains the timestamp of the prediction.
chunk_size : int, default=None
Splits the data into chunks containing `chunks_size` observations.
Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
chunk_number : int, default=None
Splits the data into `chunk_number` pieces.
Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
chunk_period : str, default=None
Splits the data according to the given period.
Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
chunker : Chunker, default=None
The `Chunker` used to split the data sets into a lists of chunks.
metrics : Optional[Union[str, List[str]]], default = ['mae', 'mape', 'mse', 'rmse', 'msle', 'rmsle']
A list of metrics to calculate. When not provided it will default to include all currently supported
metrics.
hyperparameters : Dict[str, Any], default = None
A dictionary used to provide your own custom hyperparameters when `tune_hyperparameters` has
been set to `True`.
Check out the available hyperparameter options in the
`LightGBM documentation <https://lightgbm.readthedocs.io/en/latest/pythonapi/lightgbm.LGBMRegressor.html>`_.
tune_hyperparameters : bool, default = False
A boolean controlling whether hypertuning should be performed on the internal regressor models
whilst fitting on reference data.
Tuning hyperparameters takes some time and does not guarantee better results, hence it defaults to `False`.
hyperparameter_tuning_config : Dict[str, Any], default = None
A dictionary that allows you to provide a custom hyperparameter tuning configuration when
`tune_hyperparameters` has been set to `True`.
The following dictionary is the default tuning configuration. It can be used as a template to modify::
{
"time_budget": 15,
"metric": "mse",
"estimator_list": ['lgbm'],
"eval_method": "cv",
"hpo_method": "cfo",
"n_splits": 5,
"task": 'regression',
"seed": 1,
"verbose": 0,
}
For an overview of possible parameters for the tuning process check out the
`FLAML documentation <https://microsoft.github.io/FLAML/docs/reference/automl#automl-objects>`_.
thresholds: dict
The default values are::
{
'mae': StandardDeviationThreshold(),
'mape': StandardDeviationThreshold(),
'mse': StandardDeviationThreshold(),
'msle': StandardDeviationThreshold(),
'rmse': StandardDeviationThreshold(),
'rmsle': StandardDeviationThreshold(),
}
A dictionary allowing users to set a custom threshold for each method. It links a `Threshold` subclass
to a method name. This dictionary is optional.
When a dictionary is given its values will override the default values. If no dictionary is given a default
will be applied.
Returns
-------
estimator: DLE
A new DLE instance to be fitted on reference data.
Examples
--------
Without hyperparameter tuning:
>>> import nannyml as nml
>>> reference_df, analysis_df, _ = nml.load_synthetic_car_price_dataset()
>>> estimator = nml.DLE(
... feature_column_names=['car_age', 'km_driven', 'price_new', 'accident_count',
... 'door_count', 'fuel', 'transmission'],
... y_pred='y_pred',
... y_true='y_true',
... timestamp_column_name='timestamp',
... metrics=['rmse', 'rmsle'],
... chunk_size=6000,
>>> )
>>> estimator.fit(reference_df)
>>> results = estimator.estimate(analysis_df)
>>> metric_fig = results.plot()
>>> metric_fig.show()
With hyperparameter tuning, using a custom hyperparameter tuning configuration:
>>> import nannyml as nml
>>> reference_df, analysis_df, _ = nml.load_synthetic_car_price_dataset()
>>> estimator = nml.DLE(
... feature_column_names=['car_age', 'km_driven', 'price_new', 'accident_count',
... 'door_count', 'fuel', 'transmission'],
... y_pred='y_pred',
... y_true='y_true',
... timestamp_column_name='timestamp',
... metrics=['rmse', 'rmsle'],
... chunk_size=6000,
... tune_hyperparameters=True,
... hyperparameter_tuning_config={
... "time_budget": 60, # run longer
... "metric": "mse",
... "estimator_list": ['lgbm'],
... "eval_method": "cv",
... "hpo_method": "cfo",
... "n_splits": 5,
... "task": 'regression',
... "seed": 1,
... "verbose": 0,
... }
>>> )
>>> estimator.fit(reference_df)
>>> results = estimator.estimate(analysis_df)
>>> metric_fig = results.plot()
>>> metric_fig.show()
"""
super().__init__(chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name)
self.feature_column_names = feature_column_names
self.y_pred = y_pred
self.y_true = y_true
if hyperparameter_tuning_config is None:
hyperparameter_tuning_config = {
"time_budget": 15, # total running time in seconds
"metric": "mse",
"estimator_list": ['lgbm'], # list of ML learners; we tune lightgbm in this example
"eval_method": "cv", # resampling strategy
"hpo_method": "cfo", # hyperparameter optimization method, cfo is default.
"n_splits": 5, # Default Value is 5
"task": 'regression', # task type
"seed": 1, # random seed
"verbose": 0,
}
self.hyperparameter_tuning_config = hyperparameter_tuning_config
self.tune_hyperparameters = tune_hyperparameters
self.hyperparameters = hyperparameters
self.thresholds = DEFAULT_THRESHOLDS
if thresholds:
self.thresholds.update(**thresholds)
if metrics is None:
metrics = SUPPORTED_METRIC_VALUES
elif isinstance(metrics, str):
metrics = [metrics]
self.metrics: List[Metric] = []
for metric in metrics:
if metric not in SUPPORTED_METRIC_VALUES:
raise InvalidArgumentsException(
f"unknown metric key '{metric}' given. " f"Should be one of {SUPPORTED_METRIC_VALUES}."
)
self.metrics.append(
MetricFactory.create(
metric,
ProblemType.REGRESSION,
feature_column_names=self.feature_column_names,
y_true=self.y_true,
y_pred=self.y_pred,
chunker=self.chunker,
tune_hyperparameters=self.tune_hyperparameters,
hyperparameter_tuning_config=self.hyperparameter_tuning_config,
hyperparameters=self.hyperparameters,
threshold=self.thresholds[metric],
)
)
self._categorical_imputer = SimpleImputer(strategy='constant', fill_value='NML_missing_value')
self._categorical_encoders: defaultdict = defaultdict(_default_encoder)
self.result: Optional[Result] = None
def __str__(self):
return (
f"{self.__class__.__name__}[tune_hyperparameters={self.tune_hyperparameters}, "
f"metrics={[str(m) for m in self.metrics]}]"
)
@log_usage(UsageEvent.DLE_ESTIMATOR_FIT, metadata_from_self=['metrics'])
def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> Self:
"""Fits the drift calculator using a set of reference data."""
if reference_data.empty:
raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')
_list_missing([self.y_true, self.y_pred], list(reference_data.columns))
_, categorical_feature_columns = _split_features_by_type(reference_data, self.feature_column_names)
for categorical_feature_column in categorical_feature_columns:
reference_data[categorical_feature_column] = reference_data[categorical_feature_column].astype("object")
reference_data[categorical_feature_column] = self._categorical_imputer.fit_transform(
reference_data[categorical_feature_column].values.reshape(-1, 1)
)
reference_data[categorical_feature_column] = self._categorical_encoders[
categorical_feature_column
].fit_transform(reference_data[categorical_feature_column].values.reshape(-1, 1))
# LGBM treats -1 for categorical features as missing
# https://lightgbm.readthedocs.io/en/latest/Advanced-Topics.html#categorical-feature-support
# Ordinal encoder encodes from 0 to n-1.
# https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OrdinalEncoder.html
# hence we add 1 before we pass the data to lightgbm for fitting so treating unseen values
# is more consistent
reference_data[categorical_feature_column] = reference_data[categorical_feature_column] + 1
for metric in self.metrics:
metric.categorical_column_names = categorical_feature_columns
metric.fit(reference_data)
self.result = self._estimate(reference_data)
self.result.data[('chunk', 'period')] = 'reference'
return self
@log_usage(UsageEvent.DLE_ESTIMATOR_RUN, metadata_from_self=['metrics'])
def _estimate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
if data.empty:
raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')
_list_missing([self.y_pred], list(data.columns))
_, categorical_feature_columns = _split_features_by_type(data, self.feature_column_names)
for categorical_feature_column in categorical_feature_columns:
data[categorical_feature_column] = data[categorical_feature_column].astype("object")
data[categorical_feature_column] = self._categorical_imputer.transform(
data[categorical_feature_column].values.reshape(-1, 1)
)
data[categorical_feature_column] = self._categorical_encoders[categorical_feature_column].transform(
data[categorical_feature_column].values.reshape(-1, 1)
)
# LGBM treats -1 for categorical features as missing
# https://lightgbm.readthedocs.io/en/latest/Advanced-Topics.html#categorical-feature-support
# Ordinal encoder encodes from 0 to n-1.
# https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.OrdinalEncoder.html
# hence we add 1 before we pass the data to lightgbm for fitting so treating unseen values
# is more consistent
data[categorical_feature_column] = data[categorical_feature_column] + 1
chunks = self.chunker.split(data)
res = pd.DataFrame.from_records(
[
{
'key': chunk.key,
'chunk_index': chunk.chunk_index,
'start_index': chunk.start_index,
'end_index': chunk.end_index,
'start_date': chunk.start_datetime,
'end_date': chunk.end_datetime,
'period': 'analysis',
**self._estimate_chunk(chunk),
}
for chunk in chunks
]
)
multilevel_index = _create_multilevel_index([metric.column_name for metric in self.metrics])
res.columns = multilevel_index
res = res.reset_index(drop=True)
if self.result is None:
self.result = Result(
results_data=res,
metrics=self.metrics,
feature_column_names=self.feature_column_names,
y_pred=self.y_pred,
y_true=self.y_true,
timestamp_column_name=self.timestamp_column_name,
chunker=self.chunker,
tune_hyperparameters=self.tune_hyperparameters,
hyperparameter_tuning_config=self.hyperparameter_tuning_config,
hyperparameters=self.hyperparameters,
)
else:
self.result = self.result.filter(period='reference')
self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
return self.result
def _estimate_chunk(self, chunk: Chunk) -> Dict:
estimates: Dict[str, Any] = {}
for metric in self.metrics:
try:
estimated_metric = metric.estimate(chunk.data)
sampling_error = metric.sampling_error(chunk.data)
upper_confidence_boundary = estimated_metric + 3 * sampling_error
if metric.upper_threshold_value_limit is not None:
upper_confidence_boundary = min(metric.upper_threshold_value_limit, upper_confidence_boundary)
lower_confidence_boundary = estimated_metric - 3 * sampling_error
if metric.lower_threshold_value_limit is not None:
lower_confidence_boundary = max(metric.lower_threshold_value_limit, lower_confidence_boundary)
estimates[f'sampling_error_{metric.column_name}'] = sampling_error
estimates[f'realized_{metric.column_name}'] = metric.realized_performance(chunk.data)
estimates[f'estimated_{metric.column_name}'] = estimated_metric
estimates[f'upper_confidence_{metric.column_name}'] = upper_confidence_boundary
estimates[f'lower_confidence_{metric.column_name}'] = lower_confidence_boundary
estimates[f'upper_threshold_{metric.column_name}'] = metric.upper_threshold_value
estimates[f'lower_threshold_{metric.column_name}'] = metric.lower_threshold_value
estimates[f'alert_{metric.column_name}'] = metric.alert(estimated_metric)
except Exception as exc:
self._logger.error(
f"an unexpected error occurred while calculating metric {metric.display_name}: {exc}"
)
estimates[f'sampling_error_{metric.column_name}'] = np.NaN
estimates[f'realized_{metric.column_name}'] = np.NaN
estimates[f'estimated_{metric.column_name}'] = np.NaN
estimates[f'upper_confidence_{metric.column_name}'] = np.NaN
estimates[f'lower_confidence_{metric.column_name}'] = np.NaN
estimates[f'upper_threshold_{metric.column_name}'] = metric.upper_threshold_value
estimates[f'lower_threshold_{metric.column_name}'] = metric.lower_threshold_value
estimates[f'alert_{metric.column_name}'] = np.NaN
return estimates
def _create_multilevel_index(metric_names: List[str]):
chunk_column_names = [
'key',
'chunk_index',
'start_index',
'end_index',
'start_date',
'end_date',
'period',
]
method_column_names = [
'sampling_error',
'realized',
'value',
'upper_confidence_boundary',
'lower_confidence_boundary',
'upper_threshold',
'lower_threshold',
'alert',
]
chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names]
reconstruction_tuples = [
(metric_name, column_name) for metric_name in metric_names for column_name in method_column_names
]
tuples = chunk_tuples + reconstruction_tuples
return MultiIndex.from_tuples(tuples)
def _default_encoder():
return OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)