Source code for nannyml.drift.model_inputs.multivariate.data_reconstruction.calculator

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#            Nikolaos Perrakis  <nikos@nannyml.com>
#  License: Apache Software License 2.0

"""Drift calculator using Reconstruction Error as a measure of drift."""

from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from category_encoders import CountEncoder
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

from nannyml.chunk import Chunker
from nannyml.drift import DriftCalculator
from nannyml.drift.model_inputs.multivariate.data_reconstruction.results import DataReconstructionDriftCalculatorResult
from nannyml.metadata.base import NML_METADATA_COLUMNS, Feature
from nannyml.preprocessing import preprocess


class DataReconstructionDriftCalculator(DriftCalculator):
    """DriftCalculator implementation using Reconstruction Error as a measure of drift."""

    def __init__(
        self,
        model_metadata,
        features: List[str] = None,
        n_components: Union[int, float, str] = 0.65,
        chunk_size: int = None,
        chunk_number: int = None,
        chunk_period: str = None,
        chunker: Chunker = None,
        imputer_categorical: SimpleImputer = None,
        imputer_continuous: SimpleImputer = None,
    ):
        """Creates a new DataReconstructionDriftCalculator instance.

        Parameters
        ----------
        model_metadata: ModelMetadata
            Metadata for the model whose data is to be processed.
        features: List[str], default=None
            An optional list of feature names to use during drift calculation. None by default;
            in this case all features are used during calculation.
        n_components: Union[int, float, str]
            The n_components parameter as passed to the sklearn.decomposition.PCA constructor.
            See https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
        chunk_size: int
            Splits the data into chunks containing `chunk_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker
            The `Chunker` used to split the data sets into lists of chunks.
        imputer_categorical: SimpleImputer
            The SimpleImputer used to impute categorical features in the data.
            Defaults to using the most_frequent value.
        imputer_continuous: SimpleImputer
            The SimpleImputer used to impute continuous features in the data.
            Defaults to using the mean value.

        Examples
        --------
        >>> import nannyml as nml
        >>> ref_df, ana_df, _ = nml.load_synthetic_binary_classification_dataset()
        >>> metadata = nml.extract_metadata(ref_df, model_type=nml.ModelType.CLASSIFICATION_BINARY)
        >>> # Create a calculator that will chunk by week
        >>> drift_calc = nml.DataReconstructionDriftCalculator(model_metadata=metadata, chunk_period='W')
        """
        super(DataReconstructionDriftCalculator, self).__init__(
            model_metadata, features, chunk_size, chunk_number, chunk_period, chunker
        )
        self._n_components = n_components

        self._scaler = None
        self._encoder = None
        self._pca = None

        self._upper_alert_threshold: Optional[float] = None
        self._lower_alert_threshold: Optional[float] = None

        if imputer_categorical:
            if not isinstance(imputer_categorical, SimpleImputer):
                raise TypeError("imputer_categorical needs to be an instantiated SimpleImputer object.")
            if imputer_categorical.strategy not in ["most_frequent", "constant"]:
                raise ValueError("Please use a SimpleImputer strategy appropriate for categorical features.")
        else:
            imputer_categorical = SimpleImputer(missing_values=np.nan, strategy='most_frequent')
        self._imputer_categorical = imputer_categorical

        if imputer_continuous:
            if not isinstance(imputer_continuous, SimpleImputer):
                raise TypeError("imputer_continuous needs to be an instantiated SimpleImputer object.")
        else:
            imputer_continuous = SimpleImputer(missing_values=np.nan, strategy='mean')
        self._imputer_continuous = imputer_continuous
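
    # An illustrative usage sketch, not part of the fitted pipeline above: passing custom
    # SimpleImputer instances to the constructor. `metadata` follows the docstring example
    # above; the 'median' strategy for continuous features is an assumption.
    #
    # >>> import numpy as np
    # >>> from sklearn.impute import SimpleImputer
    # >>> drift_calc = nml.DataReconstructionDriftCalculator(
    # ...     model_metadata=metadata,
    # ...     chunk_period='W',
    # ...     imputer_categorical=SimpleImputer(missing_values=np.nan, strategy='most_frequent'),
    # ...     imputer_continuous=SimpleImputer(missing_values=np.nan, strategy='median'),
    # ... )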

    def fit(self, reference_data: pd.DataFrame):
        """Fits the drift calculator using a set of reference data.

        Parameters
        ----------
        reference_data : pd.DataFrame
            A reference data set containing predictions (labels and/or probabilities) and target values.

        Returns
        -------
        calculator: DriftCalculator
            The fitted calculator.

        Examples
        --------
        >>> import nannyml as nml
        >>> ref_df, ana_df, _ = nml.load_synthetic_binary_classification_dataset()
        >>> metadata = nml.extract_metadata(ref_df, model_type=nml.ModelType.CLASSIFICATION_BINARY)
        >>> # Create a calculator and fit it
        >>> drift_calc = nml.DataReconstructionDriftCalculator(model_metadata=metadata, chunk_period='W').fit(ref_df)
        """
        reference_data = preprocess(reference_data, self.model_metadata, reference=True)

        selected_categorical_column_names = _get_selected_feature_names(
            self.selected_features, self.model_metadata.categorical_features
        )
        selected_continuous_column_names = _get_selected_feature_names(
            self.selected_features, self.model_metadata.continuous_features
        )

        # TODO: We duplicate the reference data 3 times, here. Improve to something more memory efficient?
        imputed_reference_data = reference_data.copy(deep=True)
        if len(selected_categorical_column_names) > 0:
            imputed_reference_data[selected_categorical_column_names] = self._imputer_categorical.fit_transform(
                imputed_reference_data[selected_categorical_column_names]
            )
        if len(selected_continuous_column_names) > 0:
            imputed_reference_data[selected_continuous_column_names] = self._imputer_continuous.fit_transform(
                imputed_reference_data[selected_continuous_column_names]
            )

        encoder = CountEncoder(cols=selected_categorical_column_names, normalize=True)
        encoded_reference_data = imputed_reference_data.copy(deep=True)
        encoded_reference_data[self.selected_features] = encoder.fit_transform(
            encoded_reference_data[self.selected_features]
        )

        scaler = StandardScaler()
        scaled_reference_data = pd.DataFrame(
            scaler.fit_transform(encoded_reference_data[self.selected_features]), columns=self.selected_features
        )

        pca = PCA(n_components=self._n_components, random_state=16)
        pca.fit(scaled_reference_data[self.selected_features])

        self._encoder = encoder
        self._scaler = scaler
        self._pca = pca

        # Calculate thresholds
        self._upper_alert_threshold, self._lower_alert_threshold = self._calculate_alert_thresholds(reference_data)

        return self
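
    # The fit step above is, in essence: impute -> frequency-encode categoricals -> standardize ->
    # PCA on the reference data. A minimal standalone sketch of the same idea on a toy numeric
    # frame (hypothetical data, continuous features only, so imputation and encoding are skipped):
    #
    # >>> import pandas as pd
    # >>> from sklearn.decomposition import PCA
    # >>> from sklearn.preprocessing import StandardScaler
    # >>> X = pd.DataFrame({'f1': [1.0, 2.0, 3.0, 4.0, 5.0], 'f2': [0.9, 2.1, 2.9, 4.2, 5.1]})
    # >>> scaled = StandardScaler().fit_transform(X)
    # >>> pca = PCA(n_components=0.65, random_state=16).fit(scaled)
    # >>> pca.n_components_  # number of components kept to retain ~65% of the variance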

    def calculate(
        self,
        data: pd.DataFrame,
    ) -> DataReconstructionDriftCalculatorResult:
        """Calculates the data reconstruction drift for a given data set.

        Parameters
        ----------
        data : pd.DataFrame
            The dataset to calculate the reconstruction drift for.

        Returns
        -------
        reconstruction_drift: DataReconstructionDriftCalculatorResult
            A :class:`result<nannyml.drift.model_inputs.multivariate.data_reconstruction.results.DataReconstructionDriftCalculatorResult>`
            object where each row represents a :class:`~nannyml.chunk.Chunk`, containing
            :class:`~nannyml.chunk.Chunk` properties and the reconstruction_drift calculated
            for that :class:`~nannyml.chunk.Chunk`.

        Examples
        --------
        >>> import nannyml as nml
        >>> ref_df, ana_df, _ = nml.load_synthetic_binary_classification_dataset()
        >>> metadata = nml.extract_metadata(ref_df, model_type=nml.ModelType.CLASSIFICATION_BINARY)
        >>> # Create a calculator and fit it
        >>> drift_calc = nml.DataReconstructionDriftCalculator(model_metadata=metadata, chunk_period='W').fit(ref_df)
        >>> drift = drift_calc.calculate(data)
        """
        data = preprocess(data, self.model_metadata)

        selected_categorical_column_names = _get_selected_feature_names(
            self.selected_features, self.model_metadata.categorical_features
        )
        selected_continuous_column_names = _get_selected_feature_names(
            self.selected_features, self.model_metadata.continuous_features
        )

        features_and_metadata = NML_METADATA_COLUMNS + self.selected_features
        chunks = self.chunker.split(
            data, columns=features_and_metadata, minimum_chunk_size=_minimum_chunk_size(self.selected_features)
        )

        res = pd.DataFrame.from_records(
            [
                {
                    'key': chunk.key,
                    'start_index': chunk.start_index,
                    'end_index': chunk.end_index,
                    'start_date': chunk.start_datetime,
                    'end_date': chunk.end_datetime,
                    'partition': 'analysis' if chunk.is_transition else chunk.partition,
                    'reconstruction_error': _calculate_reconstruction_error_for_data(
                        selected_features=self.selected_features,
                        selected_categorical_features=selected_categorical_column_names,
                        selected_continuous_features=selected_continuous_column_names,
                        data=chunk.data,
                        encoder=self._encoder,
                        scaler=self._scaler,
                        pca=self._pca,
                        imputer_categorical=self._imputer_categorical,
                        imputer_continuous=self._imputer_continuous,
                    ),
                }
                for chunk in chunks
            ]
        )

        res['lower_threshold'] = [self._lower_alert_threshold] * len(res)
        res['upper_threshold'] = [self._upper_alert_threshold] * len(res)
        res['alert'] = _add_alert_flag(res, self._upper_alert_threshold, self._lower_alert_threshold)  # type: ignore

        res = res.reset_index(drop=True)
        return DataReconstructionDriftCalculatorResult(
            analysis_data=chunks, drift_data=res, model_metadata=self.model_metadata
        )
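
    # Each record assembled above becomes one row per chunk in the result's drift data, with the
    # columns 'key', 'start_index', 'end_index', 'start_date', 'end_date', 'partition' and
    # 'reconstruction_error', plus the 'lower_threshold', 'upper_threshold' and 'alert' columns
    # added afterwards. Only rows from the analysis partition whose reconstruction_error falls
    # outside the thresholds are flagged as alerts (see _add_alert_flag below).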

    def _calculate_alert_thresholds(self, reference_data) -> Tuple[float, float]:
        reference_chunks = self.chunker.split(reference_data)  # type: ignore

        selected_categorical_column_names = _get_selected_feature_names(
            self.selected_features, self.model_metadata.categorical_features
        )
        selected_continuous_column_names = _get_selected_feature_names(
            self.selected_features, self.model_metadata.continuous_features
        )

        reference_reconstruction_error = pd.Series(
            [
                _calculate_reconstruction_error_for_data(
                    selected_features=self.selected_features,
                    selected_categorical_features=selected_categorical_column_names,
                    selected_continuous_features=selected_continuous_column_names,
                    data=chunk.data,
                    encoder=self._encoder,
                    scaler=self._scaler,
                    pca=self._pca,
                    imputer_categorical=self._imputer_categorical,
                    imputer_continuous=self._imputer_continuous,
                )
                for chunk in reference_chunks
            ]
        )

        return (
            reference_reconstruction_error.mean() + 3 * reference_reconstruction_error.std(),
            reference_reconstruction_error.mean() - 3 * reference_reconstruction_error.std(),
        )
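
# The alert thresholds computed above follow a three-sigma rule on the per-chunk reference
# reconstruction errors: upper = mean + 3 * std, lower = mean - 3 * std. A small numeric
# illustration with made-up reference errors:
#
# >>> import pandas as pd
# >>> ref_errors = pd.Series([0.72, 0.75, 0.74, 0.73, 0.76])
# >>> upper = ref_errors.mean() + 3 * ref_errors.std()  # 0.74 + ~0.047 = ~0.787
# >>> lower = ref_errors.mean() - 3 * ref_errors.std()  # 0.74 - ~0.047 = ~0.693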

def _calculate_reconstruction_error_for_data(
    selected_features: List[str],
    selected_categorical_features: List[str],
    selected_continuous_features: List[str],
    data: pd.DataFrame,
    encoder: CountEncoder,
    scaler: StandardScaler,
    pca: PCA,
    imputer_categorical: SimpleImputer,
    imputer_continuous: SimpleImputer,
) -> float:
    """Calculates reconstruction error for a single Chunk.

    Parameters
    ----------
    selected_features : List[str]
        Subset of features to be included in calculation.
    selected_categorical_features : List[str]
        Subset of categorical features to be included in calculation.
    selected_continuous_features : List[str]
        Subset of continuous features to be included in calculation.
    data : pd.DataFrame
        The dataset to calculate reconstruction error on.
    encoder : category_encoders.CountEncoder
        Encoder used to transform categorical features into a numerical representation.
    scaler : sklearn.preprocessing.StandardScaler
        Standardizes features by removing the mean and scaling to unit variance.
    pca : sklearn.decomposition.PCA
        Linear dimensionality reduction using Singular Value Decomposition of the data to project it
        to a lower dimensional space.
    imputer_categorical: SimpleImputer
        The SimpleImputer fitted to impute categorical features in the data.
    imputer_continuous: SimpleImputer
        The SimpleImputer fitted to impute continuous features in the data.

    Returns
    -------
    rce_for_chunk: float
        The mean reconstruction error over the rows of the given Chunk data.
    """
    data = data.reset_index(drop=True)

    # impute missing values
    if len(selected_categorical_features) > 0:
        data[selected_categorical_features] = imputer_categorical.transform(data[selected_categorical_features])
    if len(selected_continuous_features) > 0:
        data[selected_continuous_features] = imputer_continuous.transform(data[selected_continuous_features])

    # encode categorical features
    data[selected_features] = encoder.transform(data[selected_features])

    # scale all features
    data[selected_features] = scaler.transform(data[selected_features])

    # perform dimensionality reduction
    reduced_data = pca.transform(data[selected_features])

    # perform reconstruction
    reconstructed = pca.inverse_transform(reduced_data)
    reconstructed_feature_column_names = [f'rf_{col}' for col in selected_features]
    reconstructed_data = pd.DataFrame(reconstructed, columns=reconstructed_feature_column_names)

    # combine preprocessed rows with reconstructed rows
    data = pd.concat([data, reconstructed_data], axis=1)

    # calculate reconstruction error using euclidean norm (row-wise between preprocessed and reconstructed value)
    data = data.assign(
        rc_error=lambda x: _calculate_distance(data, selected_features, reconstructed_feature_column_names)
    )

    res = data['rc_error'].mean()
    return res


def _get_selected_feature_names(selected_features: List[str], features: List[Feature]) -> List[str]:
    feature_column_names = [f.column_name for f in features]
    # Calculate intersection
    return list(set(selected_features) & set(feature_column_names))


def _calculate_distance(df: pd.DataFrame, features_preprocessed: List[str], features_reconstructed: List[str]):
    """Calculate row-wise euclidean distance between preprocessed and reconstructed feature values."""
    x1 = df[features_preprocessed]
    x2 = df[features_reconstructed]
    x2.columns = x1.columns

    x = x1.subtract(x2)

    x['rc_error'] = x.apply(lambda row: np.linalg.norm(row), axis=1)
    return x['rc_error']


def _add_alert_flag(drift_result: pd.DataFrame, upper_threshold: float, lower_threshold: float) -> pd.Series:
    alert = drift_result.apply(
        lambda row: True
        if (row['reconstruction_error'] > upper_threshold or row['reconstruction_error'] < lower_threshold)
        and row['partition'] == 'analysis'
        else False,
        axis=1,
    )

    return alert


def _minimum_chunk_size(
    features: List[str] = None,
) -> int:
    return int(20 * np.power(len(features), 5 / 6))  # type: ignore
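
# _minimum_chunk_size grows sub-linearly with the number of selected features:
# int(20 * n ** (5 / 6)). For example, with a hypothetical 10 selected features this gives
# int(20 * 10 ** (5 / 6)) = int(136.2...) = 136 observations per chunk at minimum.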