Source code for nannyml.drift.model_inputs.univariate.statistical.calculator

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

"""Calculates drift for individual features using the `Kolmogorov-Smirnov` and `chi2-contingency` statistical tests."""

from __future__ import annotations

from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from scipy.stats import chi2_contingency, ks_2samp

from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker
from nannyml.drift.model_inputs.univariate.statistical.results import UnivariateStatisticalDriftCalculatorResult
from nannyml.exceptions import InvalidArgumentsException

ALERT_THRESHOLD_P_VALUE = 0.05


class UnivariateStatisticalDriftCalculator(AbstractCalculator):
    """Calculates drift for individual features using statistical tests."""

    def __init__(
        self,
        feature_column_names: List[str],
        timestamp_column_name: str,
        chunk_size: int = None,
        chunk_number: int = None,
        chunk_period: str = None,
        chunker: Chunker = None,
    ):
        """Creates a new UnivariateStatisticalDriftCalculator instance.

        Parameters
        ----------
        feature_column_names: List[str]
            A list containing the names of features in the provided data set.
            A drift score will be calculated for each entry in this list.
        timestamp_column_name: str
            The name of the column containing the timestamp of the model prediction.
        chunk_size: int
            Splits the data into chunks containing `chunk_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker
            The `Chunker` used to split the data sets into a list of chunks.

        Examples
        --------
        >>> import nannyml as nml
        >>>
        >>> reference_df, analysis_df, _ = nml.load_synthetic_binary_classification_dataset()
        >>>
        >>> feature_column_names = [col for col in reference_df.columns
        >>>                         if col not in ['y_pred', 'y_pred_proba', 'work_home_actual', 'timestamp']]
        >>> calc = nml.UnivariateStatisticalDriftCalculator(
        >>>     feature_column_names=feature_column_names,
        >>>     timestamp_column_name='timestamp'
        >>> )
        >>> calc.fit(reference_df)
        >>> results = calc.calculate(analysis_df)
        >>> print(results.data)  # check the numbers
                     key  start_index ...  identifier_alert  identifier_threshold
        0       [0:4999]            0 ...              True                  0.05
        1    [5000:9999]         5000 ...              True                  0.05
        2  [10000:14999]        10000 ...              True                  0.05
        3  [15000:19999]        15000 ...              True                  0.05
        4  [20000:24999]        20000 ...              True                  0.05
        5  [25000:29999]        25000 ...              True                  0.05
        6  [30000:34999]        30000 ...              True                  0.05
        7  [35000:39999]        35000 ...              True                  0.05
        8  [40000:44999]        40000 ...              True                  0.05
        9  [45000:49999]        45000 ...              True                  0.05
        >>> fig = results.plot(kind='feature_drift', plot_reference=True, feature_column_name='distance_from_office')
        >>> fig.show()
        """
        super(UnivariateStatisticalDriftCalculator, self).__init__(chunk_size, chunk_number, chunk_period, chunker)

        self.feature_column_names = feature_column_names
        self.continuous_column_names: List[str] = []
        self.categorical_column_names: List[str] = []

        self.timestamp_column_name = timestamp_column_name

        # required for distribution plots
        self.previous_reference_data: Optional[pd.DataFrame] = None
        self.previous_reference_results: Optional[pd.DataFrame] = None
        self.previous_analysis_data: Optional[pd.DataFrame] = None

    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> UnivariateStatisticalDriftCalculator:
        """Fits the drift calculator using a set of reference data."""
        if reference_data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.feature_column_names, reference_data)

        # Reference stability
        self._reference_stability = 0  # TODO: Jakub

        self.previous_reference_data = reference_data.copy()
        self.previous_reference_results = self._calculate(self.previous_reference_data).data

        return self

    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> UnivariateStatisticalDriftCalculatorResult:
        """Calculates univariate statistical drift for all configured features in the given data set."""
        if data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.feature_column_names, data)

        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(
            data, self.feature_column_names
        )

        chunks = self.chunker.split(data, self.timestamp_column_name)

        chunk_drifts = []
        # Calculate chunk-wise drift statistics.
        # Append all into resulting DataFrame indexed by chunk key.
        for chunk in chunks:
            chunk_drift: Dict[str, Any] = {
                'key': chunk.key,
                'start_index': chunk.start_index,
                'end_index': chunk.end_index,
                'start_date': chunk.start_datetime,
                'end_date': chunk.end_datetime,
            }

            for column in self.categorical_column_names:
                statistic, p_value, _, _ = chi2_contingency(
                    pd.concat(
                        [
                            self.previous_reference_data[column].value_counts(),  # type: ignore
                            chunk.data[column].value_counts(),
                        ],
                        axis=1,
                    ).fillna(0)
                )
                chunk_drift[f'{column}_chi2'] = statistic
                chunk_drift[f'{column}_p_value'] = np.round(p_value, decimals=3)
                chunk_drift[f'{column}_alert'] = p_value < ALERT_THRESHOLD_P_VALUE
                chunk_drift[f'{column}_threshold'] = ALERT_THRESHOLD_P_VALUE

            for column in self.continuous_column_names:
                statistic, p_value = ks_2samp(self.previous_reference_data[column], chunk.data[column])  # type: ignore
                chunk_drift[f'{column}_dstat'] = statistic
                chunk_drift[f'{column}_p_value'] = np.round(p_value, decimals=3)
                chunk_drift[f'{column}_alert'] = p_value < ALERT_THRESHOLD_P_VALUE
                chunk_drift[f'{column}_threshold'] = ALERT_THRESHOLD_P_VALUE

            chunk_drifts.append(chunk_drift)

        res = pd.DataFrame.from_records(chunk_drifts)
        res = res.reset_index(drop=True)

        self.previous_analysis_data = data

        from nannyml.drift.model_inputs.univariate.statistical.results import (
            UnivariateStatisticalDriftCalculatorResult,
        )

        return UnivariateStatisticalDriftCalculatorResult(results_data=res, calculator=self)
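For readers who want to see the two tests in isolation, below is a minimal, standalone sketch (not part of the nannyml source) of how ks_2samp and chi2_contingency produce the statistic and p-value that the calculator compares against ALERT_THRESHOLD_P_VALUE. The column data and distribution shifts are synthetic and purely illustrative.

    # Illustrative sketch only: reproduces the per-column test logic on made-up data.
    import numpy as np
    import pandas as pd
    from scipy.stats import chi2_contingency, ks_2samp

    ALERT_THRESHOLD_P_VALUE = 0.05
    rng = np.random.default_rng(42)

    # Continuous feature: reference vs. a shifted analysis chunk -> Kolmogorov-Smirnov test.
    reference_cont = pd.Series(rng.normal(loc=0.0, scale=1.0, size=5_000))
    analysis_cont = pd.Series(rng.normal(loc=0.3, scale=1.0, size=5_000))
    d_stat, p_value = ks_2samp(reference_cont, analysis_cont)
    print(f'KS d-statistic={d_stat:.3f}, p-value={p_value:.3f}, '
          f'alert={p_value < ALERT_THRESHOLD_P_VALUE}')

    # Categorical feature: value counts side by side -> chi2 contingency test,
    # mirroring the pd.concat(...).fillna(0) construction used in _calculate.
    reference_cat = pd.Series(rng.choice(['a', 'b', 'c'], size=5_000, p=[0.6, 0.3, 0.1]))
    analysis_cat = pd.Series(rng.choice(['a', 'b', 'c'], size=5_000, p=[0.4, 0.4, 0.2]))
    contingency = pd.concat(
        [reference_cat.value_counts(), analysis_cat.value_counts()], axis=1
    ).fillna(0)
    chi2, p_value, _, _ = chi2_contingency(contingency)
    print(f'chi2={chi2:.1f}, p-value={p_value:.3f}, alert={p_value < ALERT_THRESHOLD_P_VALUE}')

A p-value below the 0.05 threshold is what flags the corresponding `<column>_alert` entry in the results DataFrame shown in the class docstring above.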