Source code for nannyml.drift.multivariate.data_reconstruction.calculator

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#            Nikolaos Perrakis  <nikos@nannyml.com>
#  License: Apache Software License 2.0

"""Drift calculator using Reconstruction Error as a measure of drift."""

from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from category_encoders import CountEncoder
from pandas import MultiIndex
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker
from nannyml.drift.multivariate.data_reconstruction.result import Result
from nannyml.exceptions import InvalidArgumentsException
from nannyml.sampling_error import SAMPLING_ERROR_RANGE


class DataReconstructionDriftCalculator(AbstractCalculator):
    """BaseDriftCalculator implementation using Reconstruction Error as a measure of drift."""

    def __init__(
        self,
        column_names: List[str],
        timestamp_column_name: str = None,
        n_components: Union[int, float, str] = 0.65,
        chunk_size: int = None,
        chunk_number: int = None,
        chunk_period: str = None,
        chunker: Chunker = None,
        imputer_categorical: SimpleImputer = None,
        imputer_continuous: SimpleImputer = None,
    ):
        """Creates a new DataReconstructionDriftCalculator instance.

        Parameters
        ----------
        column_names: List[str]
            A list containing the names of features in the provided data set. All of these features will be used by
            the multivariate data reconstruction drift calculator to calculate an aggregate drift score.
        timestamp_column_name: str, default=None
            The name of the column containing the timestamp of the model prediction.
        n_components: Union[int, float, str], default=0.65
            The n_components parameter as passed to the sklearn.decomposition.PCA constructor. With the default
            float value, PCA keeps the smallest number of components that explains at least 65% of the variance.
            See https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
        chunk_size: int, default=None
            Splits the data into chunks containing `chunk_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int, default=None
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str, default=None
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker, default=None
            The `Chunker` used to split the data sets into lists of chunks.
        imputer_categorical: SimpleImputer, default=None
            The SimpleImputer used to impute categorical features in the data.
            Defaults to using the most_frequent value.
        imputer_continuous: SimpleImputer, default=None
            The SimpleImputer used to impute continuous features in the data.
            Defaults to using the mean value.

        Examples
        --------
        >>> import nannyml as nml
        >>> from IPython.display import display
        >>> # Load synthetic data
        >>> reference = nml.load_synthetic_binary_classification_dataset()[0]
        >>> analysis = nml.load_synthetic_binary_classification_dataset()[1]
        >>> display(reference.head())
        >>> # Define feature columns
        >>> column_names = [
        ...     col for col in reference.columns if col not in [
        ...         'timestamp', 'y_pred_proba', 'period', 'y_pred', 'work_home_actual', 'identifier'
        ... ]]
        >>> calc = nml.DataReconstructionDriftCalculator(
        ...     column_names=column_names,
        ...     timestamp_column_name='timestamp',
        ...     chunk_size=5000
        ... )
        >>> calc.fit(reference)
        >>> results = calc.calculate(analysis)
        >>> display(results.data)
        >>> display(results.calculator.previous_reference_results)
        >>> figure = results.plot(plot_reference=True)
        >>> figure.show()
        """
        super(DataReconstructionDriftCalculator, self).__init__(
            chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name
        )
        self.column_names = column_names
        self.continuous_column_names: List[str] = []
        self.categorical_column_names: List[str] = []

        self._n_components = n_components

        self._scaler = None
        self._encoder = None
        self._pca = None

        self._upper_alert_threshold: Optional[float] = None
        self._lower_alert_threshold: Optional[float] = None

        if imputer_categorical:
            if not isinstance(imputer_categorical, SimpleImputer):
                raise TypeError("imputer_categorical needs to be an instantiated SimpleImputer object.")
            if imputer_categorical.strategy not in ["most_frequent", "constant"]:
                raise ValueError("Please use a SimpleImputer strategy appropriate for categorical features.")
        else:
            imputer_categorical = SimpleImputer(missing_values=np.nan, strategy='most_frequent')
        self._imputer_categorical = imputer_categorical

        if imputer_continuous:
            if not isinstance(imputer_continuous, SimpleImputer):
                raise TypeError("imputer_continuous needs to be an instantiated SimpleImputer object.")
        else:
            imputer_continuous = SimpleImputer(missing_values=np.nan, strategy='mean')
        self._imputer_continuous = imputer_continuous

        # sampling error
        self._sampling_error_components: Tuple = ()

        self.previous_reference_results: Optional[pd.DataFrame] = None
        self.result: Optional[Result] = None

    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs):
        """Fits the drift calculator to a set of reference data."""
        if reference_data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.column_names, reference_data)

        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(
            reference_data, self.column_names
        )

        # TODO: We duplicate the reference data 3 times, here. Improve to something more memory efficient?
        imputed_reference_data = reference_data.copy(deep=True)
        if len(self.categorical_column_names) > 0:
            imputed_reference_data[self.categorical_column_names] = self._imputer_categorical.fit_transform(
                imputed_reference_data[self.categorical_column_names]
            )
        if len(self.continuous_column_names) > 0:
            imputed_reference_data[self.continuous_column_names] = self._imputer_continuous.fit_transform(
                imputed_reference_data[self.continuous_column_names]
            )

        encoder = CountEncoder(cols=self.categorical_column_names, normalize=True)
        encoded_reference_data = imputed_reference_data.copy(deep=True)
        encoded_reference_data[self.column_names] = encoder.fit_transform(encoded_reference_data[self.column_names])

        scaler = StandardScaler()
        scaled_reference_data = pd.DataFrame(
            scaler.fit_transform(encoded_reference_data[self.column_names]), columns=self.column_names
        )

        pca = PCA(n_components=self._n_components, random_state=16)
        pca.fit(scaled_reference_data[self.column_names])

        self._encoder = encoder
        self._scaler = scaler
        self._pca = pca

        # Calculate thresholds
        self._upper_alert_threshold, self._lower_alert_threshold = self._calculate_alert_thresholds(reference_data)

        # Reference stability
        self._sampling_error_components = (
            _calculate_reconstruction_error_for_data(
                column_names=self.column_names,
                categorical_column_names=self.categorical_column_names,
                continuous_column_names=self.continuous_column_names,
                data=reference_data,  # TODO: check with Nikos if this needs to be chunked or not?
                encoder=self._encoder,
                scaler=self._scaler,
                pca=self._pca,
                imputer_categorical=self._imputer_categorical,
                imputer_continuous=self._imputer_continuous,
            ).std(),
        )

        self.result = self._calculate(data=reference_data)
        self.result.data[('chunk', 'period')] = 'reference'

        return self

    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
        """Calculates the data reconstruction drift for a given data set."""
        if data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')
        _list_missing(self.column_names, data)

        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(data, self.column_names)

        chunks = self.chunker.split(data, columns=self.column_names)

        res = pd.DataFrame.from_records(
            [
                {
                    'key': chunk.key,
                    'chunk_index': chunk.chunk_index,
                    'start_index': chunk.start_index,
                    'end_index': chunk.end_index,
                    'start_date': chunk.start_datetime,
                    'end_date': chunk.end_datetime,
                    'period': 'analysis',
                    'sampling_error': sampling_error(self._sampling_error_components, chunk.data),
                    'reconstruction_error': _calculate_reconstruction_error_for_data(
                        column_names=self.column_names,
                        categorical_column_names=self.categorical_column_names,
                        continuous_column_names=self.continuous_column_names,
                        data=chunk.data,
                        encoder=self._encoder,
                        scaler=self._scaler,
                        pca=self._pca,
                        imputer_categorical=self._imputer_categorical,
                        imputer_continuous=self._imputer_continuous,
                    ).mean(),
                }
                for chunk in chunks
            ]
        )

        res['upper_confidence_bound'] = res['reconstruction_error'] + SAMPLING_ERROR_RANGE * res['sampling_error']
        res['lower_confidence_bound'] = res['reconstruction_error'] - SAMPLING_ERROR_RANGE * res['sampling_error']
        # Assign thresholds in upper/lower order so that the positional rename to the multilevel index
        # below labels each column correctly.
        res['upper_threshold'] = [self._upper_alert_threshold] * len(res)
        res['lower_threshold'] = [self._lower_alert_threshold] * len(res)
        res['alert'] = _add_alert_flag(res, self._upper_alert_threshold, self._lower_alert_threshold)  # type: ignore

        multilevel_index = _create_multilevel_index()
        res.columns = multilevel_index
        res = res.reset_index(drop=True)

        if self.result is None:
            self.result = Result(
                results_data=res,
                timestamp_column_name=self.timestamp_column_name,
                column_names=self.column_names,
                categorical_column_names=self.categorical_column_names,
                continuous_column_names=self.continuous_column_names,
            )
        else:
            self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)

        return self.result

    def _calculate_alert_thresholds(self, reference_data) -> Tuple[float, float]:
        reference_chunks = self.chunker.split(reference_data)  # type: ignore
        reference_reconstruction_error = pd.Series(
            [
                _calculate_reconstruction_error_for_data(
                    column_names=self.column_names,
                    categorical_column_names=self.categorical_column_names,
                    continuous_column_names=self.continuous_column_names,
                    data=chunk.data,
                    encoder=self._encoder,
                    scaler=self._scaler,
                    pca=self._pca,
                    imputer_categorical=self._imputer_categorical,
                    imputer_continuous=self._imputer_continuous,
                ).mean()
                for chunk in reference_chunks
            ]
        )

        return (
            reference_reconstruction_error.mean() + 3 * reference_reconstruction_error.std(),
            reference_reconstruction_error.mean() - 3 * reference_reconstruction_error.std(),
        )
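
# Note: the alert thresholds computed in `_calculate_alert_thresholds` above follow a three-sigma
# rule over the reference chunks. If RE_1, ..., RE_k are the chunk-level reconstruction errors on
# the reference data, then:
#
#     upper_threshold = mean(RE) + 3 * std(RE)
#     lower_threshold = mean(RE) - 3 * std(RE)
#
# A chunk whose reconstruction error falls outside this band is flagged as an alert
# (see `_add_alert_flag` below).
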

def _calculate_reconstruction_error_for_data(
    column_names: List[str],
    categorical_column_names: List[str],
    continuous_column_names: List[str],
    data: pd.DataFrame,
    encoder: CountEncoder,
    scaler: StandardScaler,
    pca: PCA,
    imputer_categorical: SimpleImputer,
    imputer_continuous: SimpleImputer,
) -> pd.Series:
    """Calculates reconstruction error for a single Chunk.

    Parameters
    ----------
    column_names : List[str]
        Subset of features to be included in calculation.
    categorical_column_names : List[str]
        Subset of categorical features to be included in calculation.
    continuous_column_names : List[str]
        Subset of continuous features to be included in calculation.
    data : pd.DataFrame
        The dataset to calculate reconstruction error on.
    encoder : category_encoders.CountEncoder
        Encoder used to transform categorical features into a numerical representation.
    scaler : sklearn.preprocessing.StandardScaler
        Standardizes features by removing the mean and scaling to unit variance.
    pca : sklearn.decomposition.PCA
        Linear dimensionality reduction using Singular Value Decomposition of the data to project it
        to a lower dimensional space.
    imputer_categorical: SimpleImputer
        The SimpleImputer fitted to impute categorical features in the data.
    imputer_continuous: SimpleImputer
        The SimpleImputer fitted to impute continuous features in the data.

    Returns
    -------
    rc_error: pd.Series
        A pandas Series containing the reconstruction error for each row of the given Chunk data.
    """
    data = data.copy(deep=True).reset_index(drop=True)

    # impute missing values
    if len(categorical_column_names) > 0:
        data[categorical_column_names] = imputer_categorical.transform(data[categorical_column_names])
    if len(continuous_column_names) > 0:
        data[continuous_column_names] = imputer_continuous.transform(data[continuous_column_names])

    # encode categorical features
    data[column_names] = encoder.transform(data[column_names])

    # scale all features
    data[column_names] = scaler.transform(data[column_names])

    # perform dimensionality reduction
    reduced_data = pca.transform(data[column_names])

    # perform reconstruction
    reconstructed = pca.inverse_transform(reduced_data)
    reconstructed_feature_column_names = [f'rf_{col}' for col in column_names]
    reconstructed_data = pd.DataFrame(reconstructed, columns=reconstructed_feature_column_names)

    # combine preprocessed rows with reconstructed rows
    data = pd.concat([data, reconstructed_data], axis=1)

    # calculate reconstruction error using the euclidean norm (row-wise, between preprocessed and reconstructed values)
    data = data.assign(rc_error=lambda x: _calculate_distance(data, column_names, reconstructed_feature_column_names))

    return data['rc_error']


def _calculate_distance(df: pd.DataFrame, features_preprocessed: List[str], features_reconstructed: List[str]):
    """Calculate row-wise euclidean distance between preprocessed and reconstructed feature values."""
    x1 = df[features_preprocessed]
    x2 = df[features_reconstructed]
    x2.columns = x1.columns

    x = x1.subtract(x2)

    x['rc_error'] = x.apply(lambda row: np.linalg.norm(row), axis=1)
    return x['rc_error']


def _add_alert_flag(drift_result: pd.DataFrame, upper_threshold: float, lower_threshold: float) -> pd.Series:
    alert = drift_result.apply(
        lambda row: True
        if (row['reconstruction_error'] > upper_threshold or row['reconstruction_error'] < lower_threshold)
        else False,
        axis=1,
    )

    return alert
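
# A minimal sketch of the per-row error computed above, in plain numpy, assuming `X_scaled`
# is a feature matrix that has already been imputed, encoded and scaled, and `pca` is the
# fitted PCA instance (these names are illustrative only, not part of this module):
#
#     X_hat = pca.inverse_transform(pca.transform(X_scaled))
#     per_row_error = np.linalg.norm(X_scaled - X_hat, axis=1)
#     chunk_reconstruction_error = per_row_error.mean()
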

def sampling_error(components: Tuple, data: pd.DataFrame) -> float:
    return components[0] / np.sqrt(len(data))
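
# `components[0]` holds the standard deviation of the per-row reconstruction errors on the
# reference data (set in `_fit` above), so the value returned is the standard error of a
# chunk-level mean: std_reference / sqrt(n_chunk_rows). As an illustration (numbers are made up),
# a reference standard deviation of 0.8 and a 5000-row chunk give a sampling error of roughly
# 0.8 / sqrt(5000) ≈ 0.0113.
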

def _create_multilevel_index():
    chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period']
    method_column_names = [
        'sampling_error',
        'value',
        'upper_confidence_boundary',
        'lower_confidence_boundary',
        'upper_threshold',
        'lower_threshold',
        'alert',
    ]
    chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names]
    reconstruction_tuples = [('reconstruction_error', column_name) for column_name in method_column_names]

    tuples = chunk_tuples + reconstruction_tuples

    return MultiIndex.from_tuples(tuples)
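
# The results DataFrame therefore carries a two-level column index along the lines of:
#
#     ('chunk', 'key'), ('chunk', 'chunk_index'), ..., ('chunk', 'period'),
#     ('reconstruction_error', 'sampling_error'), ('reconstruction_error', 'value'),
#     ('reconstruction_error', 'upper_confidence_boundary'), ('reconstruction_error', 'lower_confidence_boundary'),
#     ('reconstruction_error', 'upper_threshold'), ('reconstruction_error', 'lower_threshold'),
#     ('reconstruction_error', 'alert')
#
# which is the structure exposed through `Result.data` and updated by `_fit` via the
# ('chunk', 'period') column.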