# Author: Niels Nuyttens <niels@nannyml.com>
#
# License: Apache Software License 2.0
"""Calculates drift for model predictions and model outputs using statistical tests."""
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, ks_2samp
from nannyml._typing import ModelOutputsType, ProblemType, model_output_column_names
from nannyml.base import AbstractCalculator, _column_is_categorical, _list_missing
from nannyml.chunk import Chunker
from nannyml.drift.model_outputs.univariate.statistical.results import UnivariateDriftResult
from nannyml.exceptions import InvalidArgumentsException
# Significance level for the statistical tests: p-values below this raise a drift alert.
ALERT_THRESHOLD_P_VALUE = 0.05
class StatisticalOutputDriftCalculator(AbstractCalculator):
    """Calculates drift for model predictions and model outputs using statistical tests."""

    def __init__(
        self,
        y_pred: str,
        timestamp_column_name: str,
        problem_type: Union[str, ProblemType],
        y_pred_proba: Optional[ModelOutputsType] = None,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
    ):
        """Creates a new StatisticalOutputDriftCalculator.

        Parameters
        ----------
        y_pred: str
            The name of the column containing your model predictions.
        timestamp_column_name: str
            The name of the column containing the timestamp of the model prediction.
        problem_type: Union[str, ProblemType]
            The kind of problem the monitored model solves. Pass a ``ProblemType`` member or a
            string that ``ProblemType.parse`` understands.
        y_pred_proba: ModelOutputsType, default=None
            Name(s) of the column(s) containing your model output.
            Pass a single string when there is only a single model output column, e.g. in binary classification cases.
            Pass a dictionary when working with multiple output columns, e.g. in multiclass classification cases.
            The dictionary maps a class/label string to the column name containing model outputs for that class/label.
            Required for all problem types except regression.
        chunk_size: int, default=None
            Splits the data into chunks containing `chunks_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int, default=None
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str, default=None
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker, default=None
            The `Chunker` used to split the data sets into a lists of chunks.

        Raises
        ------
        InvalidArgumentsException
            When ``y_pred_proba`` is ``None`` for a non-regression problem type.

        Examples
        --------
        >>> import nannyml as nml
        >>>
        >>> reference_df, analysis_df, _ = nml.load_synthetic_binary_classification_dataset()
        >>>
        >>> calc = nml.StatisticalOutputDriftCalculator(
        >>>     y_pred_proba='y_pred_proba',
        >>>     y_pred='y_pred',
        >>>     timestamp_column_name='timestamp'
        >>> )
        >>> calc.fit(reference_df)
        >>> results = calc.calculate(analysis_df)
        >>>
        >>> print(results.data)  # check the numbers
                     key  start_index ...  y_pred_proba_alert y_pred_proba_threshold
        0       [0:4999]            0 ...                True                   0.05
        1    [5000:9999]         5000 ...               False                   0.05
        >>>
        >>> results.plot(kind='score_drift', metric='p_value', plot_reference=True).show()
        >>> results.plot(kind='score_distribution', plot_reference=True).show()
        >>> results.plot(kind='prediction_drift', plot_reference=True).show()
        >>> results.plot(kind='prediction_distribution', plot_reference=True).show()
        """
        super().__init__(chunk_size, chunk_number, chunk_period, chunker)

        self.y_pred_proba = y_pred_proba
        self.y_pred = y_pred
        self.timestamp_column_name = timestamp_column_name

        if isinstance(problem_type, str):
            problem_type = ProblemType.parse(problem_type)
        self.problem_type: ProblemType = problem_type  # type: ignore

        # Classification problems need predicted probabilities to run the continuous (KS) test on.
        if self.problem_type is not ProblemType.REGRESSION and self.y_pred_proba is None:
            raise InvalidArgumentsException(
                f"'y_pred_proba' can not be 'None' for " f"problem type {self.problem_type.value}"
            )

        self.previous_reference_data: Optional[pd.DataFrame] = None
        self.previous_reference_results: Optional[pd.DataFrame] = None
        self.previous_analysis_data: Optional[pd.DataFrame] = None

    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs):
        """Fits the drift calculator using a set of reference data.

        Stores a snapshot of the reference data (used as the baseline for the statistical tests)
        and pre-computes the reference drift results.
        """
        if reference_data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        if self.y_pred_proba:
            _list_missing([self.y_pred] + model_output_column_names(self.y_pred_proba), reference_data)
        else:
            _list_missing([self.y_pred], reference_data)

        # Snapshot is taken before the dtype coercion below (matches original behavior).
        self.previous_reference_data = reference_data.copy()

        # Work on a copy from here on so the caller's DataFrame is not mutated as a side effect.
        reference_data = reference_data.copy()

        # Force categorical columns to be set to 'category' pandas dtype
        # TODO: we should try to get rid of this
        if _column_is_categorical(reference_data[self.y_pred]):
            reference_data[self.y_pred] = reference_data[self.y_pred].astype('category')

        # Reference stability
        self._reference_stability = 0  # TODO: Jakub

        self.previous_reference_results = self._calculate(reference_data).data

        return self

    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> UnivariateDriftResult:
        """Calculates drift of model outputs against the fitted reference data, per chunk.

        Categorical columns are compared with a chi-squared contingency test, continuous
        columns with a two-sample Kolmogorov-Smirnov test.
        """
        if data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        if self.y_pred_proba:
            _list_missing([self.y_pred] + model_output_column_names(self.y_pred_proba), data)
        else:
            _list_missing([self.y_pred], data)

        # Which columns get which test depends on the problem type:
        # predictions are categorical for classification, continuous for regression;
        # predicted probabilities are always continuous.
        continuous_columns: List[str] = []
        categorical_columns: List[str] = []
        if self.problem_type == ProblemType.CLASSIFICATION_BINARY:
            if isinstance(self.y_pred_proba, str):
                continuous_columns += [self.y_pred_proba]
            categorical_columns += [self.y_pred]
        elif self.problem_type == ProblemType.CLASSIFICATION_MULTICLASS:
            if self.y_pred_proba is not None:
                continuous_columns += model_output_column_names(self.y_pred_proba)
            categorical_columns += [self.y_pred]
        elif self.problem_type == ProblemType.REGRESSION:
            continuous_columns += [self.y_pred]

        chunks = self.chunker.split(
            data, columns=continuous_columns + categorical_columns, timestamp_column_name=self.timestamp_column_name
        )

        chunk_drifts = []
        # Calculate chunk-wise drift statistics.
        # Append all into resulting DataFrame indexed by chunk key.
        for chunk in chunks:
            chunk_drift: Dict[str, Any] = {
                'key': chunk.key,
                'start_index': chunk.start_index,
                'end_index': chunk.end_index,
                'start_date': chunk.start_datetime,
                'end_date': chunk.end_datetime,
            }

            for column in categorical_columns:
                # Build a reference-vs-chunk contingency table of category counts;
                # categories absent from one side get a zero count.
                statistic, p_value, _, _ = chi2_contingency(
                    pd.concat(
                        [
                            self.previous_reference_data[column].value_counts(),  # type: ignore
                            chunk.data[column].value_counts(),
                        ],
                        axis=1,
                    ).fillna(0)
                )
                chunk_drift[f'{column}_chi2'] = statistic
                chunk_drift[f'{column}_p_value'] = np.round(p_value, decimals=3)
                # Alerting uses the unrounded p-value; only the reported value is rounded.
                chunk_drift[f'{column}_alert'] = p_value < ALERT_THRESHOLD_P_VALUE
                chunk_drift[f'{column}_threshold'] = ALERT_THRESHOLD_P_VALUE

            for column in continuous_columns:
                statistic, p_value = ks_2samp(self.previous_reference_data[column], chunk.data[column])  # type: ignore
                chunk_drift[f'{column}_dstat'] = statistic
                chunk_drift[f'{column}_p_value'] = np.round(p_value, decimals=3)
                chunk_drift[f'{column}_alert'] = p_value < ALERT_THRESHOLD_P_VALUE
                chunk_drift[f'{column}_threshold'] = ALERT_THRESHOLD_P_VALUE

            chunk_drifts.append(chunk_drift)

        res = pd.DataFrame.from_records(chunk_drifts)
        res = res.reset_index(drop=True)

        self.previous_analysis_data = data.copy()

        return UnivariateDriftResult(results_data=res, calculator=self)