# Source code for nannyml.drift.multivariate.data_reconstruction.calculator

#  Author:   Niels Nuyttens  <>
#            Nikolaos Perrakis  <>
#  License: Apache Software License 2.0

"""Calculates the data reconstruction error on unseen analysis data after fitting on reference data.

This calculator wraps a PCA transformation. It will be fitted on reference data when the `fit` method is called.
On calling the `calculate` method it will perform the inverse transformation on the analysis data and calculate
the Euclidean distance between the analysis data and the reconstructed version of it.

This is the data reconstruction error, and it can be used as a measure of drift between
the reference and analysis data sets.
"""

from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from category_encoders import CountEncoder
from pandas import MultiIndex
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker
from nannyml.drift.multivariate.data_reconstruction.result import Result
from nannyml.exceptions import InvalidArgumentsException
from nannyml.sampling_error import SAMPLING_ERROR_RANGE
from nannyml.thresholds import StandardDeviationThreshold, Threshold
from nannyml.usage_logging import UsageEvent, log_usage

class DataReconstructionDriftCalculator(AbstractCalculator):
    """BaseDriftCalculator implementation using Reconstruction Error as a measure of drift."""

    def __init__(
        self,
        column_names: List[str],
        timestamp_column_name: Optional[str] = None,
        n_components: Union[int, float, str] = 0.65,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
        imputer_categorical: Optional[SimpleImputer] = None,
        imputer_continuous: Optional[SimpleImputer] = None,
        threshold: Threshold = StandardDeviationThreshold(),
    ):
        """Creates a new DataReconstructionDriftCalculator instance.

        Parameters:
            column_names: List[str]
                A list containing the names of features in the provided data set. All of these features will be used by
                the multivariate data reconstruction drift calculator to calculate an aggregate drift score.
            timestamp_column_name: str, default=None
                The name of the column containing the timestamp of the model prediction.
            n_components: Union[int, float, str], default=0.65
                The n_components parameter as passed to the sklearn.decomposition.PCA constructor.
                See https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
            chunk_size: int, default=None
                Splits the data into chunks containing `chunk_size` observations.
                Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
            chunk_number: int, default=None
                Splits the data into `chunk_number` pieces.
                Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
            chunk_period: str, default=None
                Splits the data according to the given period.
                Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
            chunker : Chunker, default=None
                The `Chunker` used to split the data sets into a list of chunks.
            imputer_categorical: SimpleImputer, default=None
                The SimpleImputer used to impute categorical features in the data.
                Defaults to using most_frequent value.
            imputer_continuous: SimpleImputer, default=None
                The SimpleImputer used to impute continuous features in the data. Defaults to using mean value.
            threshold: Threshold, default=StandardDeviationThreshold
                The threshold you wish to evaluate values on. Defaults to a StandardDeviationThreshold with default
                options. The other allowed value is ConstantThreshold.

        Raises:
            TypeError: when a provided imputer is not a SimpleImputer instance.
            ValueError: when `imputer_categorical` uses a strategy other than 'most_frequent' or 'constant'.

        Examples:
            >>> import nannyml as nml
            >>> # Load synthetic data
            >>> reference, analysis, _ = nml.load_synthetic_car_loan_dataset()
            >>> non_feature_columns = ['timestamp', 'y_pred_proba', 'y_pred', 'repaid']
            >>> feature_column_names = [
            ...     col for col in reference.columns
            ...     if col not in non_feature_columns
            ... ]
            >>> calc = nml.DataReconstructionDriftCalculator(
            ...     column_names=feature_column_names,
            ...     timestamp_column_name='timestamp',
            ...     chunk_size=5000
            ... )
            >>> calc.fit(reference)
            >>> results = calc.calculate(analysis)
            >>> figure = results.plot()
        """
        super(DataReconstructionDriftCalculator, self).__init__(
            chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name
        )
        self.column_names = column_names
        # Filled in from the data's column types when fitting / calculating.
        self.continuous_column_names: List[str] = []
        self.categorical_column_names: List[str] = []

        self._n_components = n_components
        self.threshold = threshold

        # Fitted preprocessing / PCA objects; populated by `_fit`.
        self._scaler = None
        self._encoder = None
        self._pca = None

        # Declared here, assigned by `_fit` from the reference data.
        self._upper_alert_threshold: Optional[float]
        self._lower_alert_threshold: Optional[float]

        # Compare against None explicitly so that a falsy non-imputer argument is rejected
        # by the type check below instead of being silently replaced by the default imputer.
        if imputer_categorical is not None:
            if not isinstance(imputer_categorical, SimpleImputer):
                raise TypeError("imputer_categorical needs to be an instantiated SimpleImputer object.")
            if imputer_categorical.strategy not in ["most_frequent", "constant"]:
                raise ValueError("Please use a SimpleImputer strategy appropriate for categorical features.")
        else:
            imputer_categorical = SimpleImputer(missing_values=np.nan, strategy='most_frequent')
        self._imputer_categorical = imputer_categorical

        if imputer_continuous is not None:
            if not isinstance(imputer_continuous, SimpleImputer):
                raise TypeError("imputer_continuous needs to be an instantiated SimpleImputer object.")
        else:
            imputer_continuous = SimpleImputer(missing_values=np.nan, strategy='mean')
        self._imputer_continuous = imputer_continuous

        # sampling error
        self._sampling_error_components: Tuple = ()

        self.previous_reference_results: Optional[pd.DataFrame] = None
        self.result: Optional[Result] = None

    @log_usage(UsageEvent.MULTIVAR_DRIFT_CALC_FIT)
    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs):
        """Fits the drift calculator to a set of reference data.

        Fits the imputers, the count encoder, the scaler and the PCA on the reference data, derives the
        alert thresholds and the sampling error component, and stores the reference results.

        Raises:
            InvalidArgumentsException: when `reference_data` contains no rows.
        """
        if reference_data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.column_names, reference_data)

        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(
            reference_data, self.column_names
        )

        # TODO: We duplicate the reference data 3 times, here. Improve to something more memory efficient?
        imputed_reference_data = reference_data.copy(deep=True)
        if self.categorical_column_names:
            imputed_reference_data[self.categorical_column_names] = self._imputer_categorical.fit_transform(
                imputed_reference_data[self.categorical_column_names]
            )
        if self.continuous_column_names:
            imputed_reference_data[self.continuous_column_names] = self._imputer_continuous.fit_transform(
                imputed_reference_data[self.continuous_column_names]
            )

        encoder = CountEncoder(cols=self.categorical_column_names, normalize=True)
        encoded_reference_data = imputed_reference_data.copy(deep=True)
        encoded_reference_data[self.column_names] = encoder.fit_transform(encoded_reference_data[self.column_names])

        scaler = StandardScaler()
        scaled_reference_data = pd.DataFrame(
            scaler.fit_transform(encoded_reference_data[self.column_names]), columns=self.column_names
        )

        # Fixed random_state keeps the fitted decomposition reproducible between runs.
        pca = PCA(n_components=self._n_components, random_state=16)
        pca.fit(scaled_reference_data[self.column_names])

        self._encoder = encoder
        self._scaler = scaler
        self._pca = pca

        # Calculate thresholds
        self._lower_alert_threshold, self._upper_alert_threshold = self._calculate_alert_thresholds(reference_data)

        # Reference stability: standard deviation of the per-row reconstruction error on the reference data.
        self._sampling_error_components = (
            _calculate_reconstruction_error_for_data(
                column_names=self.column_names,
                categorical_column_names=self.categorical_column_names,
                continuous_column_names=self.continuous_column_names,
                data=reference_data,  # TODO: check with Nikos if this needs to be chunked or not?
                encoder=self._encoder,
                scaler=self._scaler,
                pca=self._pca,
                imputer_categorical=self._imputer_categorical,
                imputer_continuous=self._imputer_continuous,
            ).std(),
        )

        # `_calculate` labels every chunk as 'analysis'; relabel the reference run afterwards.
        self.result = self._calculate(data=reference_data)
        self.result.data[('chunk', 'period')] = 'reference'

        return self

    @log_usage(UsageEvent.MULTIVAR_DRIFT_CALC_RUN)
    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
        """Calculates the data reconstruction drift for a given data set.

        Raises:
            InvalidArgumentsException: when `data` contains no rows.
        """
        if data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.column_names, data)

        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(data, self.column_names)

        chunks = self.chunker.split(data, columns=self.column_names)

        res = pd.DataFrame.from_records(
            [
                {
                    'key': chunk.key,
                    'chunk_index': chunk.chunk_index,
                    'start_index': chunk.start_index,
                    'end_index': chunk.end_index,
                    'start_date': chunk.start_datetime,
                    'end_date': chunk.end_datetime,
                    'period': 'analysis',
                    'sampling_error': sampling_error(self._sampling_error_components, chunk.data),
                    'reconstruction_error': _calculate_reconstruction_error_for_data(
                        column_names=self.column_names,
                        categorical_column_names=self.categorical_column_names,
                        continuous_column_names=self.continuous_column_names,
                        data=chunk.data,
                        encoder=self._encoder,
                        scaler=self._scaler,
                        pca=self._pca,
                        imputer_categorical=self._imputer_categorical,
                        imputer_continuous=self._imputer_continuous,
                    ).mean(),
                }
                for chunk in chunks
            ]
        )

        res['upper_confidence_bound'] = res['reconstruction_error'] + SAMPLING_ERROR_RANGE * res['sampling_error']
        res['lower_confidence_bound'] = res['reconstruction_error'] - SAMPLING_ERROR_RANGE * res['sampling_error']
        res['upper_threshold'] = [self._upper_alert_threshold] * len(res)
        res['lower_threshold'] = [self._lower_alert_threshold] * len(res)
        res['alert'] = _add_alert_flag(res, self._upper_alert_threshold, self._lower_alert_threshold)

        # The flat columns are replaced positionally, so the column order built above must
        # match the order produced by `_create_multilevel_index`.
        multilevel_index = _create_multilevel_index()
        res.columns = multilevel_index
        res = res.reset_index(drop=True)

        if self.result is None:
            self.result = Result(
                results_data=res,
                timestamp_column_name=self.timestamp_column_name,
                column_names=self.column_names,
                categorical_column_names=self.categorical_column_names,
                continuous_column_names=self.continuous_column_names,
            )
        else:
            # Keep only the reference rows of the previous result and append the new rows to them.
            self.result = self.result.filter(period='reference')
            self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)

        return self.result

    def _calculate_alert_thresholds(self, reference_data) -> Tuple[Optional[float], Optional[float]]:
        """Derive the alert thresholds from the mean reconstruction error of each reference chunk.

        Returns the value of `self.threshold.thresholds(...)`, which `_fit` unpacks as
        `(lower_threshold, upper_threshold)`.
        """
        reference_chunks = self.chunker.split(reference_data)
        reference_reconstruction_error = np.asarray(
            [
                _calculate_reconstruction_error_for_data(
                    column_names=self.column_names,
                    categorical_column_names=self.categorical_column_names,
                    continuous_column_names=self.continuous_column_names,
                    data=chunk.data,
                    encoder=self._encoder,
                    scaler=self._scaler,
                    pca=self._pca,
                    imputer_categorical=self._imputer_categorical,
                    imputer_continuous=self._imputer_continuous,
                ).mean()
                for chunk in reference_chunks
            ]
        )

        return self.threshold.thresholds(reference_reconstruction_error)
def _calculate_reconstruction_error_for_data( column_names: List[str], categorical_column_names: List[str], continuous_column_names: List[str], data: pd.DataFrame, encoder: CountEncoder, scaler: StandardScaler, pca: PCA, imputer_categorical: SimpleImputer, imputer_continuous: SimpleImputer, ) -> pd.Series: """Calculates reconstruction error for a single Chunk. Parameters ---------- column_names : List[str] Subset of features to be included in calculation. categorical_column_names : List[str] Subset of categorical features to be included in calculation. continuous_column_names : List[str] Subset of continuous features to be included in calculation. data : pd.DataFrame The dataset to calculate reconstruction error on encoder : category_encoders.CountEncoder Encoder used to transform categorical features into a numerical representation scaler : sklearn.preprocessing.StandardScaler Standardize features by removing the mean and scaling to unit variance pca : sklearn.decomposition.PCA Linear dimensionality reduction using Singular Value Decomposition of the data to project it to a lower dimensional space. imputer_categorical: SimpleImputer The SimpleImputer fitted to impute categorical features in the data. imputer_continuous: SimpleImputer The SimpleImputer fitted to impute continuous features in the data. Returns ------- rce_for_chunk: pd.DataFrame A pandas.DataFrame containing the Chunk key and reconstruction error for the given Chunk data. 
""" # encode categorical features data = data.copy(deep=True).reset_index(drop=True) # Impute missing values if categorical_column_names: data[categorical_column_names] = imputer_categorical.transform(data[categorical_column_names]) if continuous_column_names: data[continuous_column_names] = imputer_continuous.transform(data[continuous_column_names]) data[column_names] = encoder.transform(data[column_names]) # scale all features data[column_names] = scaler.transform(data[column_names]) # perform dimensionality reduction reduced_data = pca.transform(data[column_names]) # perform reconstruction reconstructed = pca.inverse_transform(reduced_data) reconstructed_feature_column_names = [f'rf_{col}' for col in column_names] reconstructed_data = pd.DataFrame(reconstructed, columns=reconstructed_feature_column_names) # combine preprocessed rows with reconstructed rows data = pd.concat([data, reconstructed_data], axis=1) # calculate reconstruction error using euclidian norm (row-wise between preprocessed and reconstructed value) data = data.assign(rc_error=lambda x: _calculate_distance(data, column_names, reconstructed_feature_column_names)) return data['rc_error'] def _calculate_distance(df: pd.DataFrame, features_preprocessed: List[str], features_reconstructed: List[str]): """Calculate row-wise euclidian distance between preprocessed and reconstructed feature values.""" x1 = df[features_preprocessed] x2 = df[features_reconstructed] x2.columns = x1.columns x = x1.subtract(x2) x['rc_error'] = x.apply(lambda row: np.linalg.norm(row), axis=1) return x['rc_error'] def _add_alert_flag( drift_result: pd.DataFrame, upper_threshold: Optional[float], lower_threshold: Optional[float] ) -> pd.Series: alert = drift_result.apply( lambda row: True if ( (upper_threshold is not None and row['reconstruction_error'] > upper_threshold) or (lower_threshold is not None and row['reconstruction_error'] < lower_threshold) ) else False, axis=1, ) return alert
def sampling_error(components: Tuple, data: pd.DataFrame) -> float:
    """Calculates the sampling error with respect to the reference data for a given chunk of data.

    Parameters
    ----------
    components: Tuple
        The sampling error components. Only the first element is used: it is divided by the
        square root of the number of rows in ``data``.
    data: pd.DataFrame
        The data to calculate the sampling error on, with respect to the reference data.

    Returns
    -------
    sampling_error: float
        The expected sampling error, ``components[0] / sqrt(len(data))``.
    """
    return components[0] / np.sqrt(len(data))
def _create_multilevel_index(): chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period'] method_column_names = [ 'sampling_error', 'value', 'upper_confidence_boundary', 'lower_confidence_boundary', 'upper_threshold', 'lower_threshold', 'alert', ] chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names] reconstruction_tuples = [('reconstruction_error', column_name) for column_name in method_column_names] tuples = chunk_tuples + reconstruction_tuples return MultiIndex.from_tuples(tuples)