# Author: Niels Nuyttens <niels@nannyml.com>
# Nikolaos Perrakis <nikos@nannyml.com>
# License: Apache Software License 2.0
"""Drift calculator using Reconstruction Error as a measure of drift."""
from typing import List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from category_encoders import CountEncoder
from pandas import MultiIndex
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker
from nannyml.drift.multivariate.data_reconstruction.result import Result
from nannyml.exceptions import InvalidArgumentsException
from nannyml.sampling_error import SAMPLING_ERROR_RANGE
from nannyml.usage_logging import UsageEvent, log_usage
class DataReconstructionDriftCalculator(AbstractCalculator):
    """BaseDriftCalculator implementation using Reconstruction Error as a measure of drift."""

    def __init__(
        self,
        column_names: List[str],
        timestamp_column_name: Optional[str] = None,
        n_components: Union[int, float, str] = 0.65,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
        imputer_categorical: Optional[SimpleImputer] = None,
        imputer_continuous: Optional[SimpleImputer] = None,
    ):
        """Creates a new DataReconstructionDriftCalculator instance.

        Parameters
        ----------
        column_names: List[str]
            A list containing the names of features in the provided data set. All of these features will be used by
            the multivariate data reconstruction drift calculator to calculate an aggregate drift score.
        timestamp_column_name: str, default=None
            The name of the column containing the timestamp of the model prediction.
        n_components: Union[int, float, str], default=0.65
            The n_components parameter as passed to the sklearn.decomposition.PCA constructor.
            See https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
        chunk_size: int, default=None
            Splits the data into chunks containing `chunk_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int, default=None
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str, default=None
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker, default=None
            The `Chunker` used to split the data sets into a list of chunks.
        imputer_categorical: SimpleImputer, default=None
            The SimpleImputer used to impute categorical features in the data. Defaults to using most_frequent value.
        imputer_continuous: SimpleImputer, default=None
            The SimpleImputer used to impute continuous features in the data. Defaults to using mean value.

        Raises
        ------
        TypeError
            If a provided imputer is not a SimpleImputer instance.
        ValueError
            If the categorical imputer uses a strategy other than 'most_frequent' or 'constant'.

        Examples
        --------
        >>> import nannyml as nml
        >>> from IPython.display import display
        >>> # Load synthetic data
        >>> reference = nml.load_synthetic_binary_classification_dataset()[0]
        >>> analysis = nml.load_synthetic_binary_classification_dataset()[1]
        >>> display(reference.head())
        >>> # Define feature columns
        >>> column_names = [
        ...     col for col in reference.columns if col not in [
        ...         'timestamp', 'y_pred_proba', 'period', 'y_pred', 'work_home_actual', 'identifier'
        ...     ]]
        >>> calc = nml.DataReconstructionDriftCalculator(
        ...     column_names=column_names,
        ...     timestamp_column_name='timestamp',
        ...     chunk_size=5000
        ... )
        >>> calc.fit(reference)
        >>> results = calc.calculate(analysis)
        >>> display(results.data)
        >>> display(results.calculator.previous_reference_results)
        >>> figure = results.plot(plot_reference=True)
        >>> figure.show()
        """
        super().__init__(chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name)

        self.column_names = column_names
        # Populated from the data by `_split_features_by_type` in `_fit` / `_calculate`.
        self.continuous_column_names: List[str] = []
        self.categorical_column_names: List[str] = []

        self._n_components = n_components

        # Fitted preprocessing / PCA objects; set by `_fit`.
        self._scaler = None
        self._encoder = None
        self._pca = None

        self._upper_alert_threshold: Optional[float] = None
        self._lower_alert_threshold: Optional[float] = None

        # Compare against None explicitly so that an invalid (even falsy) argument is rejected
        # instead of being silently replaced by the default imputer.
        if imputer_categorical is None:
            imputer_categorical = SimpleImputer(missing_values=np.nan, strategy='most_frequent')
        else:
            if not isinstance(imputer_categorical, SimpleImputer):
                raise TypeError("imputer_categorical needs to be an instantiated SimpleImputer object.")
            if imputer_categorical.strategy not in ["most_frequent", "constant"]:
                raise ValueError("Please use a SimpleImputer strategy appropriate for categorical features.")
        self._imputer_categorical = imputer_categorical

        if imputer_continuous is None:
            imputer_continuous = SimpleImputer(missing_values=np.nan, strategy='mean')
        elif not isinstance(imputer_continuous, SimpleImputer):
            raise TypeError("imputer_continuous needs to be an instantiated SimpleImputer object.")
        self._imputer_continuous = imputer_continuous

        # sampling error
        self._sampling_error_components: Tuple = ()

        self.previous_reference_results: Optional[pd.DataFrame] = None
        self.result: Optional[Result] = None

    @log_usage(UsageEvent.MULTIVAR_DRIFT_CALC_FIT)
    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs):
        """Fits the drift calculator to a set of reference data.

        Fits the imputers, the count encoder, the scaler and the PCA on the reference data, derives the
        alert thresholds and sampling error components from it, and stores the reference results with
        their period set to 'reference'.

        Parameters
        ----------
        reference_data : pd.DataFrame
            The reference data set; must contain all of `column_names`.

        Returns
        -------
        self
            The fitted calculator.

        Raises
        ------
        InvalidArgumentsException
            If `reference_data` contains no rows.
        """
        if reference_data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.column_names, reference_data)

        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(
            reference_data, self.column_names
        )

        # TODO: We duplicate the reference data 3 times, here. Improve to something more memory efficient?
        imputed_reference_data = reference_data.copy(deep=True)
        if self.categorical_column_names:
            imputed_reference_data[self.categorical_column_names] = self._imputer_categorical.fit_transform(
                imputed_reference_data[self.categorical_column_names]
            )
        if self.continuous_column_names:
            imputed_reference_data[self.continuous_column_names] = self._imputer_continuous.fit_transform(
                imputed_reference_data[self.continuous_column_names]
            )

        encoder = CountEncoder(cols=self.categorical_column_names, normalize=True)
        encoded_reference_data = imputed_reference_data.copy(deep=True)
        encoded_reference_data[self.column_names] = encoder.fit_transform(encoded_reference_data[self.column_names])

        scaler = StandardScaler()
        scaled_reference_data = pd.DataFrame(
            scaler.fit_transform(encoded_reference_data[self.column_names]), columns=self.column_names
        )

        pca = PCA(n_components=self._n_components, random_state=16)
        pca.fit(scaled_reference_data[self.column_names])

        # Only publish the fitted objects once every fitting step has succeeded.
        self._encoder = encoder
        self._scaler = scaler
        self._pca = pca

        # Calculate thresholds
        self._upper_alert_threshold, self._lower_alert_threshold = self._calculate_alert_thresholds(reference_data)

        # Reference stability: standard deviation of the row-wise reconstruction error on the full
        # (unchunked) reference data.
        # TODO: check with Nikos if this needs to be chunked or not?
        self._sampling_error_components = (self._reconstruction_error(reference_data).std(),)

        self.result = self._calculate(data=reference_data)
        assert self.result is not None  # narrows Optional[Result] for type checkers

        # `_calculate` labels every row as 'analysis'; relabel the reference run.
        self.result.data[('chunk', 'period')] = 'reference'

        return self

    @log_usage(UsageEvent.MULTIVAR_DRIFT_CALC_RUN)
    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
        """Calculates the data reconstruction drift for a given data set.

        The data is split into chunks; for each chunk the mean reconstruction error, its sampling error,
        confidence bounds, thresholds and alert flag are computed. The rows are stored in a new `Result`
        when none exists yet, and appended to the existing result's data otherwise.

        Parameters
        ----------
        data : pd.DataFrame
            The data set to evaluate; must contain all of `column_names`.

        Returns
        -------
        Result
            The result object holding the per-chunk drift values.

        Raises
        ------
        InvalidArgumentsException
            If `data` contains no rows.
        """
        if data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.column_names, data)

        # NOTE(review): the feature type split is re-derived from `data` here, overwriting the split
        # stored by `_fit`, while the imputers were fitted on the reference split — confirm this is intended.
        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(data, self.column_names)

        chunks = self.chunker.split(data, columns=self.column_names)

        res = pd.DataFrame.from_records(
            [
                {
                    'key': chunk.key,
                    'chunk_index': chunk.chunk_index,
                    'start_index': chunk.start_index,
                    'end_index': chunk.end_index,
                    'start_date': chunk.start_datetime,
                    'end_date': chunk.end_datetime,
                    'period': 'analysis',
                    'sampling_error': sampling_error(self._sampling_error_components, chunk.data),
                    'reconstruction_error': self._reconstruction_error(chunk.data).mean(),
                }
                for chunk in chunks
            ]
        )

        res['upper_confidence_bound'] = res['reconstruction_error'] + SAMPLING_ERROR_RANGE * res['sampling_error']
        res['lower_confidence_bound'] = res['reconstruction_error'] - SAMPLING_ERROR_RANGE * res['sampling_error']

        res['upper_threshold'] = [self._upper_alert_threshold] * len(res)
        res['lower_threshold'] = [self._lower_alert_threshold] * len(res)
        res['alert'] = _add_alert_flag(res, self._upper_alert_threshold, self._lower_alert_threshold)  # type: ignore

        # The flat columns are replaced positionally by the two-level index, so the column order
        # built above must match the order produced by `_create_multilevel_index`.
        multilevel_index = _create_multilevel_index()
        res.columns = multilevel_index
        res = res.reset_index(drop=True)

        if self.result is None:
            self.result = Result(
                results_data=res,
                timestamp_column_name=self.timestamp_column_name,
                column_names=self.column_names,
                categorical_column_names=self.categorical_column_names,
                continuous_column_names=self.continuous_column_names,
            )
        else:
            self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)

        return self.result

    def _calculate_alert_thresholds(self, reference_data) -> Tuple[float, float]:
        """Derive the alert thresholds from the chunked reference data.

        Returns
        -------
        Tuple[float, float]
            ``(upper, lower)``: the mean of the per-chunk mean reconstruction errors plus / minus three
            times their standard deviation.
        """
        reference_chunks = self.chunker.split(reference_data)  # type: ignore
        reference_reconstruction_error = pd.Series(
            [self._reconstruction_error(chunk.data).mean() for chunk in reference_chunks]
        )

        center = reference_reconstruction_error.mean()
        spread = reference_reconstruction_error.std()
        return center + 3 * spread, center - 3 * spread

    def _reconstruction_error(self, data: pd.DataFrame) -> pd.Series:
        """Row-wise reconstruction error of `data` using this calculator's fitted preprocessing and PCA."""
        return _calculate_reconstruction_error_for_data(
            column_names=self.column_names,
            categorical_column_names=self.categorical_column_names,
            continuous_column_names=self.continuous_column_names,
            data=data,
            encoder=self._encoder,
            scaler=self._scaler,
            pca=self._pca,
            imputer_categorical=self._imputer_categorical,
            imputer_continuous=self._imputer_continuous,
        )
def _calculate_reconstruction_error_for_data(
    column_names: List[str],
    categorical_column_names: List[str],
    continuous_column_names: List[str],
    data: pd.DataFrame,
    encoder: CountEncoder,
    scaler: StandardScaler,
    pca: PCA,
    imputer_categorical: SimpleImputer,
    imputer_continuous: SimpleImputer,
) -> pd.Series:
    """Calculates the row-wise reconstruction error for a data set (e.g. the data of a single Chunk).

    The features are imputed, encoded and scaled, projected with the PCA and mapped back again; the
    error of a row is the euclidean distance between its preprocessed and its reconstructed values.

    Parameters
    ----------
    column_names : List[str]
        Subset of features to be included in calculation.
    categorical_column_names : List[str]
        Subset of categorical features to be included in calculation.
    continuous_column_names : List[str]
        Subset of continuous features to be included in calculation.
    data : pd.DataFrame
        The dataset to calculate reconstruction error on. It is not modified.
    encoder : category_encoders.CountEncoder
        Encoder used to transform categorical features into a numerical representation
    scaler : sklearn.preprocessing.StandardScaler
        Standardize features by removing the mean and scaling to unit variance
    pca : sklearn.decomposition.PCA
        Linear dimensionality reduction using Singular Value Decomposition of the
        data to project it to a lower dimensional space.
    imputer_categorical: SimpleImputer
        The SimpleImputer fitted to impute categorical features in the data.
    imputer_continuous: SimpleImputer
        The SimpleImputer fitted to impute continuous features in the data.

    Returns
    -------
    rc_error: pd.Series
        A pandas.Series named ``rc_error`` holding one reconstruction error per row of `data`,
        indexed 0..n-1 (the original index of `data` is dropped).
    """
    # Work on a copy with a fresh index: the caller's frame stays untouched and the rows line up
    # with the reconstructed frame below, which gets a default index.
    data = data.copy(deep=True).reset_index(drop=True)

    # Impute missing values
    if categorical_column_names:
        data[categorical_column_names] = imputer_categorical.transform(data[categorical_column_names])
    if continuous_column_names:
        data[continuous_column_names] = imputer_continuous.transform(data[continuous_column_names])

    # encode categorical features
    data[column_names] = encoder.transform(data[column_names])

    # scale all features
    data[column_names] = scaler.transform(data[column_names])

    # perform dimensionality reduction
    reduced_data = pca.transform(data[column_names])

    # perform reconstruction
    reconstructed = pca.inverse_transform(reduced_data)
    reconstructed_feature_column_names = [f'rf_{col}' for col in column_names]
    reconstructed_data = pd.DataFrame(reconstructed, columns=reconstructed_feature_column_names)

    # Combine only the preprocessed feature columns with the reconstructed ones, so that an unrelated
    # input column that happens to be called 'rf_<feature>' cannot collide with a reconstructed column.
    combined = pd.concat([data[column_names], reconstructed_data], axis=1)

    # calculate reconstruction error using euclidian norm (row-wise between preprocessed and reconstructed value)
    return _calculate_distance(combined, column_names, reconstructed_feature_column_names)
def _calculate_distance(df: pd.DataFrame, features_preprocessed: List[str], features_reconstructed: List[str]):
"""Calculate row-wise euclidian distance between preprocessed and reconstructed feature values."""
x1 = df[features_preprocessed]
x2 = df[features_reconstructed]
x2.columns = x1.columns
x = x1.subtract(x2)
x['rc_error'] = x.apply(lambda row: np.linalg.norm(row), axis=1)
return x['rc_error']
def _add_alert_flag(drift_result: pd.DataFrame, upper_threshold: float, lower_threshold: float) -> pd.Series:
alert = drift_result.apply(
lambda row: True
if (row['reconstruction_error'] > upper_threshold or row['reconstruction_error'] < lower_threshold)
else False,
axis=1,
)
return alert
def sampling_error(components: Tuple, data: pd.DataFrame) -> float:
    """Calculate the sampling error of the mean reconstruction error for a data set.

    Parameters
    ----------
    components : Tuple
        Sampling error components; only the first element is used. The calculator's fit step stores
        the standard deviation of the reference reconstruction error there.
    data : pd.DataFrame
        The data (e.g. a chunk) whose number of rows determines the sampling error.

    Returns
    -------
    float
        The first component divided by the square root of the number of rows in `data`.
    """
    reference_std = components[0]
    return reference_std / np.sqrt(len(data))
def _create_multilevel_index():
chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period']
method_column_names = [
'sampling_error',
'value',
'upper_confidence_boundary',
'lower_confidence_boundary',
'upper_threshold',
'lower_threshold',
'alert',
]
chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names]
reconstruction_tuples = [('reconstruction_error', column_name) for column_name in method_column_names]
tuples = chunk_tuples + reconstruction_tuples
return MultiIndex.from_tuples(tuples)