Source code for nannyml.drift.univariate.calculator

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

"""Calculates drift for individual features using the `Kolmogorov-Smirnov` and `chi2-contingency` statistical tests."""

from __future__ import annotations

from typing import Any, Dict, List, Optional

import pandas as pd
from pandas import MultiIndex

from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker
from nannyml.drift.univariate.methods import FeatureType, Method, MethodFactory
from nannyml.drift.univariate.result import Result
from nannyml.exceptions import InvalidArgumentsException


[docs]class UnivariateDriftCalculator(AbstractCalculator): """Calculates drift for individual features.""" def __init__( self, column_names: List[str], timestamp_column_name: Optional[str] = None, categorical_methods: List[str] = None, continuous_methods: List[str] = None, chunk_size: int = None, chunk_number: int = None, chunk_period: str = None, chunker: Chunker = None, ): """Creates a new UnivariateDriftCalculator instance. Parameters ---------- column_names: List[str] A list containing the names of features in the provided data set. A drift score will be calculated for each entry in this list. timestamp_column_name: str The name of the column containing the timestamp of the model prediction. categorical_methods: List[str], default=None A list of method names that will be performed on categorical columns. When not given all available methods supporting categorical columns will be used. continuous_methods: List[str], default=None A list of method names that will be performed on continuous columns. When not given all available methods supporting continuous columns will be used. chunk_size: int Splits the data into chunks containing `chunks_size` observations. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunk_number: int Splits the data into `chunk_number` pieces. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunk_period: str Splits the data according to the given period. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunker : Chunker The `Chunker` used to split the data sets into a lists of chunks. Examples -------- >>> import nannyml as nml >>> reference, analysis, _ = nml.load_synthetic_car_price_dataset() >>> column_names = [col for col in reference.columns if col not in ['timestamp', 'y_pred', 'y_true']] >>> calc = nml.UnivariateDriftCalculator( ... column_names=column_names, ... timestamp_column_name='timestamp', ... continuous_methods=['kolmogorov_smirnov', 'jensen_shannon'], ... categorical_methods=['chi2', 'jensen_shannon'], ... ).fit(reference) >>> res = calc.calculate(analysis) >>> res = res.filter(period='analysis') >>> for column_name in res.continuous_column_names: ... for method in res.continuous_method_names: ... res.plot(kind='drift', column_name=column_name, method=method).show() """ super(UnivariateDriftCalculator, self).__init__( chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name ) self.column_names = column_names self.continuous_method_names = continuous_methods or ['kolmogorov_smirnov'] self.categorical_method_names = categorical_methods or ['chi2'] self._column_to_models_mapping: Dict[str, List[Method]] = {column_name: [] for column_name in column_names} # required for distribution plots self.previous_reference_results: Optional[pd.DataFrame] = None self.previous_analysis_data: Optional[pd.DataFrame] = None self.result: Optional[Result] = None def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> UnivariateDriftCalculator: """Fits the drift calculator using a set of reference data.""" if reference_data.empty: raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.') _list_missing(self.column_names, reference_data) self.continuous_column_names, self.categorical_column_names = _split_features_by_type( reference_data, self.column_names ) for column_name in self.continuous_column_names: self._column_to_models_mapping[column_name] += [ MethodFactory.create(key=method, feature_type=FeatureType.CONTINUOUS).fit(reference_data[column_name]) for method in self.continuous_method_names ] for column_name in self.categorical_column_names: self._column_to_models_mapping[column_name] += [ MethodFactory.create(key=method, feature_type=FeatureType.CATEGORICAL).fit(reference_data[column_name]) for method in self.categorical_method_names ] self.result = self._calculate(reference_data) self.result.data['chunk', 'chunk', 'period'] = 'reference' self.result.reference_data = reference_data.copy() return self def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: """Calculates methods for both categorical and continuous columns.""" if data.empty: raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.') _list_missing(self.column_names, data) chunks = self.chunker.split(data) rows = [] for chunk in chunks: row = { 'key': chunk.key, 'chunk_index': chunk.chunk_index, 'start_index': chunk.start_index, 'end_index': chunk.end_index, 'start_datetime': chunk.start_datetime, 'end_datetime': chunk.end_datetime, 'period': 'analysis', } for column_name in self.continuous_column_names: for method in self._column_to_models_mapping[column_name]: for k, v in _calculate_for_column(chunk.data, column_name, method).items(): row[f'{column_name}_{method.column_name}_{k}'] = v for column_name in self.categorical_column_names: for method in self._column_to_models_mapping[column_name]: for k, v in _calculate_for_column(chunk.data, column_name, method).items(): row[f'{column_name}_{method.column_name}_{k}'] = v rows.append(row) result_index = _create_multilevel_index( continuous_column_names=self.continuous_column_names, continuous_method_names=[m for m in self.continuous_method_names], categorical_column_names=self.categorical_column_names, categorical_method_names=[m for m in self.categorical_method_names], ) res = pd.DataFrame(rows) res.columns = result_index res = res.reset_index(drop=True) if self.result is None: self.result = Result( results_data=res, column_names=self.column_names, continuous_column_names=self.continuous_column_names, categorical_column_names=self.categorical_column_names, continuous_method_names=self.continuous_method_names, categorical_method_names=self.categorical_method_names, timestamp_column_name=self.timestamp_column_name, chunker=self.chunker, ) else: # TODO: review subclassing setup => superclass + '_filter' is screwing up typing. # Dropping the intermediate '_filter' and directly returning the correct 'Result' class works OK # but this causes us to lose the "common behavior" in the top level 'filter' method when overriding. # Applicable here but to many of the base classes as well (e.g. fitting and calculating) self.result = self.result.filter(period='reference') # type: ignore self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True) self.result.analysis_data = data.copy() return self.result
def _calculate_for_column(data: pd.DataFrame, column_name: str, method: Method) -> Dict[str, Any]: result = {} value = method.calculate(data[column_name]) result['value'] = value result['upper_threshold'] = method.upper_threshold result['lower_threshold'] = method.lower_threshold result['alert'] = method.alert(data[column_name]) return result def _create_multilevel_index( continuous_column_names: List[str], categorical_column_names: List[str], continuous_method_names: List[str], categorical_method_names: List[str], ): chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period'] method_column_names = ['value', 'upper_threshold', 'lower_threshold', 'alert'] chunk_tuples = [('chunk', 'chunk', chunk_column_name) for chunk_column_name in chunk_column_names] continuous_column_tuples = [ (column_name, method_name, method_column_name) for column_name in continuous_column_names for method_name in continuous_method_names for method_column_name in method_column_names ] categorical_column_tuples = [ (column_name, method_name, method_column_name) for column_name in categorical_column_names for method_name in categorical_method_names for method_column_name in method_column_names ] tuples = chunk_tuples + continuous_column_tuples + categorical_column_tuples return MultiIndex.from_tuples(tuples)