Source code for nannyml.drift.model_inputs.univariate.statistical.calculator

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

"""Statistical drift calculation using `Kolmogorov-Smirnov` and `chi2-contingency` tests."""
from typing import Any, Dict, List, cast

import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, ks_2samp

from nannyml.chunk import Chunker
from nannyml.drift.base import DriftCalculator
from nannyml.drift.model_inputs.univariate.statistical.results import UnivariateDriftResult
from nannyml.exceptions import CalculatorNotFittedException, MissingMetadataException
from nannyml.metadata import BinaryClassificationMetadata, MulticlassClassificationMetadata
from nannyml.metadata.base import NML_METADATA_COLUMNS, NML_METADATA_PARTITION_COLUMN_NAME, ModelMetadata
from nannyml.preprocessing import preprocess

ALERT_THRESHOLD_P_VALUE = 0.05
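
# Illustrative sketch (not part of the calculator) of the two SciPy tests this
# module relies on, assuming two pandas Series ``ref`` and ``ana`` holding the
# reference and analysis values of a single feature:
#
#   d_stat, p_value = ks_2samp(ref, ana)  # continuous features
#   chi2, p_value, _, _ = chi2_contingency(
#       pd.concat([ref.value_counts(), ana.value_counts()], axis=1).fillna(0)
#   )  # categorical features
#
# A feature in an analysis chunk is flagged as drifting when the test's p-value
# drops below ALERT_THRESHOLD_P_VALUE, the conventional 5% significance level.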


class UnivariateStatisticalDriftCalculator(DriftCalculator):
    """A drift calculator that relies on statistics to detect drift."""

    def __init__(
        self,
        model_metadata: ModelMetadata,
        features: List[str] = None,
        chunk_size: int = None,
        chunk_number: int = None,
        chunk_period: str = None,
        chunker: Chunker = None,
    ):
        """Constructs a new UnivariateStatisticalDriftCalculator.

        Parameters
        ----------
        model_metadata: ModelMetadata
            Metadata for the model whose data is to be processed.
        features: List[str], default=None
            An optional list of feature names to use during drift calculation. Defaults to None;
            in that case all features are used during calculation.
        chunk_size: int
            Splits the data into chunks containing `chunk_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker
            The `Chunker` used to split the data sets into lists of chunks.

        Examples
        --------
        >>> import nannyml as nml
        >>> ref_df, ana_df, _ = nml.load_synthetic_binary_classification_dataset()
        >>> metadata = nml.extract_metadata(ref_df, model_type=nml.ModelType.CLASSIFICATION_BINARY)
        >>> # Create a calculator that will chunk by week
        >>> drift_calc = nml.UnivariateStatisticalDriftCalculator(model_metadata=metadata, chunk_period='W')
        """
        super(UnivariateStatisticalDriftCalculator, self).__init__(
            model_metadata, features, chunk_size, chunk_number, chunk_period, chunker
        )

        # add predicted probabilities from metadata to the selected features
        if isinstance(model_metadata, BinaryClassificationMetadata):
            if model_metadata.predicted_probability_column_name is None:
                raise MissingMetadataException(
                    "missing value for 'predicted_probability_column_name'. "
                    "Please update your model metadata accordingly."
                )
            self.__predicted_probabilities_column_names = [
                cast(BinaryClassificationMetadata, self.model_metadata).predicted_probability_column_name
            ]

        elif isinstance(model_metadata, MulticlassClassificationMetadata):
            if model_metadata.predicted_probabilities_column_names is None:
                raise MissingMetadataException(
                    "missing value for 'predicted_probabilities_column_names'. "
                    "Please update your model metadata accordingly."
                )
            md = cast(MulticlassClassificationMetadata, self.model_metadata)
            self.__predicted_probabilities_column_names = list(md.predicted_probabilities_column_names.values())

        self.selected_features += self.__predicted_probabilities_column_names
        self._reference_data = None
    def fit(self, reference_data: pd.DataFrame):
        """Fits the drift calculator using a set of reference data.

        Parameters
        ----------
        reference_data : pd.DataFrame
            A reference data set containing predictions (labels and/or probabilities) and target values.

        Returns
        -------
        calculator: DriftCalculator
            The fitted calculator.

        Examples
        --------
        >>> import nannyml as nml
        >>> ref_df, ana_df, _ = nml.load_synthetic_binary_classification_dataset()
        >>> metadata = nml.extract_metadata(ref_df, model_type=nml.ModelType.CLASSIFICATION_BINARY)
        >>> # Create a calculator and fit it
        >>> drift_calc = nml.UnivariateStatisticalDriftCalculator(model_metadata=metadata, chunk_period='W').fit(ref_df)
        """
        reference_data = preprocess(data=reference_data, metadata=self.model_metadata, reference=True)
        self._reference_data = reference_data.copy(deep=True)

        return self
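
    # Note: ``fit`` only preprocesses and stores the reference data; the actual
    # two-sample comparisons against it happen chunk by chunk in ``calculate``.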
    def calculate(
        self,
        data: pd.DataFrame,
    ) -> UnivariateDriftResult:
        """Calculates the univariate statistical drift for a given data set.

        Parameters
        ----------
        data : pd.DataFrame
            The data set to calculate drift for.

        Returns
        -------
        univariate_drift: UnivariateDriftResult
            A :class:`result<nannyml.drift.model_inputs.univariate.statistical.results.UnivariateDriftResult>`
            object where each row represents a :class:`~nannyml.chunk.Chunk`,
            containing :class:`~nannyml.chunk.Chunk` properties and the drift statistics
            calculated for that :class:`~nannyml.chunk.Chunk`.

        Examples
        --------
        >>> import nannyml as nml
        >>> ref_df, ana_df, _ = nml.load_synthetic_binary_classification_dataset()
        >>> metadata = nml.extract_metadata(ref_df, model_type=nml.ModelType.CLASSIFICATION_BINARY)
        >>> # Create a calculator and fit it
        >>> drift_calc = nml.UnivariateStatisticalDriftCalculator(model_metadata=metadata, chunk_period='W').fit(ref_df)
        >>> drift = drift_calc.calculate(ana_df)
        """
        if self.chunker is None:
            raise CalculatorNotFittedException(
                'chunker has not been set. '
                'Please ensure you run ``calculator.fit()`` '
                'before running ``calculator.calculate()``'
            )

        data = preprocess(data=data, metadata=self.model_metadata)

        # Get lists of categorical and continuous feature column names
        categorical_column_names = [f.column_name for f in self.model_metadata.categorical_features]
        continuous_column_names = [
            f.column_name for f in self.model_metadata.continuous_features
        ] + self.__predicted_probabilities_column_names

        features_and_metadata = NML_METADATA_COLUMNS + self.selected_features
        chunks = self.chunker.split(data, columns=features_and_metadata, minimum_chunk_size=500)

        chunk_drifts = []
        # Calculate chunk-wise drift statistics.
        # Append all into resulting DataFrame indexed by chunk key.
        for chunk in chunks:
            chunk_drift: Dict[str, Any] = {
                'key': chunk.key,
                'start_index': chunk.start_index,
                'end_index': chunk.end_index,
                'start_date': chunk.start_datetime,
                'end_date': chunk.end_datetime,
                'partition': 'analysis' if chunk.is_transition else chunk.partition,
            }

            present_categorical_column_names = list(set(chunk.data.columns) & set(categorical_column_names))
            for column in present_categorical_column_names:
                statistic, p_value, _, _ = chi2_contingency(
                    pd.concat(
                        [
                            self._reference_data[column].value_counts(),  # type: ignore
                            chunk.data[column].value_counts(),
                        ],
                        axis=1,
                    ).fillna(0)
                )
                chunk_drift[f'{column}_chi2'] = statistic
                chunk_drift[f'{column}_p_value'] = np.round(p_value, decimals=3)
                chunk_drift[f'{column}_alert'] = (p_value < ALERT_THRESHOLD_P_VALUE) and (
                    chunk.data[NML_METADATA_PARTITION_COLUMN_NAME] == 'analysis'
                ).all()
                chunk_drift[f'{column}_threshold'] = ALERT_THRESHOLD_P_VALUE

            present_continuous_column_names = list(set(chunk.data.columns) & set(continuous_column_names))
            for column in present_continuous_column_names:
                statistic, p_value = ks_2samp(self._reference_data[column], chunk.data[column])  # type: ignore
                chunk_drift[f'{column}_dstat'] = statistic
                chunk_drift[f'{column}_p_value'] = np.round(p_value, decimals=3)
                chunk_drift[f'{column}_alert'] = (p_value < ALERT_THRESHOLD_P_VALUE) and (
                    chunk.data[NML_METADATA_PARTITION_COLUMN_NAME] == 'analysis'
                ).all()
                chunk_drift[f'{column}_threshold'] = ALERT_THRESHOLD_P_VALUE

            chunk_drifts.append(chunk_drift)

        res = pd.DataFrame.from_records(chunk_drifts)
        res = res.reset_index(drop=True)
        res.attrs['nml_drift_calculator'] = __name__

        return UnivariateDriftResult(analysis_data=chunks, drift_data=res, model_metadata=self.model_metadata)
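
# Minimal end-to-end usage sketch, mirroring the docstring examples above. It
# assumes the synthetic dataset shipped with nannyml; chunking by week is an
# arbitrary choice for illustration.
if __name__ == '__main__':
    import nannyml as nml

    ref_df, ana_df, _ = nml.load_synthetic_binary_classification_dataset()
    metadata = nml.extract_metadata(ref_df, model_type=nml.ModelType.CLASSIFICATION_BINARY)

    # Fit on reference data, then calculate drift on the analysis data.
    calc = nml.UnivariateStatisticalDriftCalculator(model_metadata=metadata, chunk_period='W').fit(ref_df)
    drift = calc.calculate(ana_df)

    # The result's underlying DataFrame (attribute name assumed here) holds one
    # row per chunk with the per-feature statistics, p-values and alert flags.
    print(drift.data)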