# Source code for nannyml.drift.univariate.calculator

```#  Author:   Niels Nuyttens  <niels@nannyml.com>
#

"""Calculates drift for individual columns.

Supported drift detection methods are:

- Kolmogorov-Smirnov statistic (continuous)
- Wasserstein distance (continuous)
- Chi-squared statistic (categorical)
- L-infinity distance (categorical)
- Jensen-Shannon distance (continuous and categorical)
- Hellinger distance (continuous and categorical)

For help selecting the correct univariate drift detection method for your use case, check the `method selection guide`_.

.. _tutorial:

.. _deep dive:

.. _method selection guide:
"""

from __future__ import annotations

import warnings
from typing import Any, Dict, List, Optional, Union

import pandas as pd
from pandas import MultiIndex

from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker
from nannyml.drift.univariate.methods import FeatureType, Method, MethodFactory
from nannyml.drift.univariate.result import Result
from nannyml.exceptions import InvalidArgumentsException
from nannyml.thresholds import ConstantThreshold, StandardDeviationThreshold, Threshold
from nannyml.usage_logging import UsageEvent, log_usage

# Threshold applied to each drift method when the caller does not supply one.
# Keys are the method names accepted by `continuous_methods` / `categorical_methods`.
DEFAULT_THRESHOLDS: Dict[str, Threshold] = {
    # Standard-deviation based thresholds with the lower bound disabled.
    'kolmogorov_smirnov': StandardDeviationThreshold(std_lower_multiplier=None),
    'chi2': StandardDeviationThreshold(),  # currently ignored
    # Fixed upper bound of 0.1, no lower bound.
    'jensen_shannon': ConstantThreshold(lower=None, upper=0.1),
    'wasserstein': StandardDeviationThreshold(std_lower_multiplier=None),
    'hellinger': ConstantThreshold(lower=None, upper=0.1),
    'l_infinity': ConstantThreshold(lower=None, upper=0.1),
}

class UnivariateDriftCalculator(AbstractCalculator):
    """Calculates drift for individual features.

    For every configured column the calculator fits one drift method per requested method name
    on the reference data and then evaluates those fitted methods on each chunk of the data
    passed to the calculation step.
    """

    def __init__(
        self,
        column_names: Union[str, List[str]],
        treat_as_categorical: Optional[Union[str, List[str]]] = None,
        timestamp_column_name: Optional[str] = None,
        categorical_methods: Optional[Union[str, List[str]]] = None,
        continuous_methods: Optional[Union[str, List[str]]] = None,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
        thresholds: Optional[Dict[str, Threshold]] = None,
        computation_params: Optional[dict[str, Any]] = None,
    ):
        """Creates a new UnivariateDriftCalculator instance.

        Parameters
        ----------
        column_names: Union[str, List[str]]
            A string or list containing the names of features in the provided data set.
            A drift score will be calculated for each entry in this list.
        treat_as_categorical: Union[str, List[str]]
            A single column name or list of column names to be treated as categorical by the calculator.
        timestamp_column_name: str
            The name of the column containing the timestamp of the model prediction.
        categorical_methods: Union[str, List[str]], default=['jensen_shannon']
            A method name or list of method names that will be performed on categorical columns.
            Supported methods for categorical variables:

            - `jensen_shannon`
            - `chi2`
            - `hellinger`
            - `l_infinity`
        continuous_methods: Union[str, List[str]], default=['jensen_shannon']
            A method name or list of method names that will be performed on continuous columns.
            Supported methods for continuous variables:

            - `jensen_shannon`
            - `kolmogorov_smirnov`
            - `hellinger`
            - `wasserstein`
        chunk_size: int
            Splits the data into chunks containing `chunk_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker
            The `Chunker` used to split the data sets into a list of chunks.
        thresholds: dict

            Defaults to::

                {
                    'kolmogorov_smirnov': StandardDeviationThreshold(std_lower_multiplier=None),
                    'jensen_shannon': ConstantThreshold(upper=0.1),
                    'wasserstein': StandardDeviationThreshold(std_lower_multiplier=None),
                    'hellinger': ConstantThreshold(upper=0.1),
                    'l_infinity': ConstantThreshold(upper=0.1)
                }

            A dictionary allowing users to set a custom threshold for each method. It links a `Threshold` subclass
            to a method name. This dictionary is optional.
            When a dictionary is given its values will override the default values. If no dictionary is given a
            default will be applied. The overrides are applied to a per-instance copy of the defaults, so they
            never affect other calculator instances.

            The `chi2` method does not support custom thresholds for now; a warning is emitted when a `chi2`
            entry is supplied. Additional research is required to determine how to transition from its current
            p-value based implementation.

        computation_params : dict

            Defaults to::

                {
                    'kolmogorov_smirnov': {
                        'calculation_method': 'auto',
                        'n_bins': 10_000
                    },
                    'wasserstein': {
                        'calculation_method': 'auto',
                        'n_bins': 10_000
                    }
                }

            A dictionary which allows users to specify whether they want drift calculated on
            the exact reference data or an estimated distribution of the reference data obtained
            using binning techniques. Applicable only to Kolmogorov-Smirnov and Wasserstein.

            `calculation_method`: Specify whether the entire or the binned reference data will be stored.

                The default value is `auto`.

                - `auto` : Use `exact` for reference data smaller than 10 000 rows, `estimated` for larger.
                - `exact` : Store the whole reference data.

                    When calculating on chunk ``scipy.stats.ks_2samp(reference, chunk, method='exact')``
                    is called and whole reference and chunk vectors are passed.
                - `estimated` : Store reference data binned into `n_bins` (default=10 000).

                    The D-statistic will be calculated based on binned eCDF.
                    Bins are quantile-based for Kolmogorov-Smirnov and equal-width based for Wasserstein.
                    Notice that for the reference data of 10 000 rows the resulting D-statistic for exact and
                    estimated methods should be the same. The pvalue in that method is calculated using asymptotic
                    distribution of test statistic (as it is in ``scipy.stats.ks_2samp`` with ``method='asymp'``).

            `n_bins` : Number of bins used to bin data when calculation_method = `estimated`.

                The default value is 10 000. The larger the value the more precise the calculation
                (closer to calculation_method = `exact`) but more data will be stored in the fitted calculator.

        Examples
        --------
        >>> import nannyml as nml
        >>> reference, analysis, _ = nml.load_synthetic_car_price_dataset()
        >>> column_names = [col for col in reference.columns if col not in ['timestamp', 'y_pred', 'y_true']]
        >>> calc = nml.UnivariateDriftCalculator(
        ...   column_names=column_names,
        ...   timestamp_column_name='timestamp',
        ...   continuous_methods=['kolmogorov_smirnov', 'jensen_shannon', 'wasserstein'],
        ...   categorical_methods=['chi2', 'jensen_shannon', 'l_infinity'],
        ... ).fit(reference)
        >>> res = calc.calculate(analysis)
        >>> res = res.filter(period='analysis')
        >>> for column_name in res.continuous_column_names:
        ...  for method in res.continuous_method_names:
        ...    res.plot(kind='drift', column_name=column_name, method=method).show()
        """
        super().__init__(
            chunk_size,
            chunk_number,
            chunk_period,
            chunker,
            timestamp_column_name,
        )

        # Every "str or list of str" argument is normalised to a list below.
        if isinstance(column_names, str):
            column_names = [column_names]
        self.column_names = column_names

        if not treat_as_categorical:
            treat_as_categorical = []
        if isinstance(treat_as_categorical, str):
            treat_as_categorical = [treat_as_categorical]
        self.treat_as_categorical = treat_as_categorical

        if not continuous_methods:
            continuous_methods = ['jensen_shannon']
        elif isinstance(continuous_methods, str):
            continuous_methods = [continuous_methods]
        self.continuous_method_names = continuous_methods

        if not categorical_methods:
            categorical_methods = ['jensen_shannon']
        elif isinstance(categorical_methods, str):
            categorical_methods = [categorical_methods]
        self.categorical_method_names: List[str] = categorical_methods

        # Stored as given (possibly None). Original note: "set to default values within the
        # method function in methods.py".
        self.computation_params: Optional[Dict[str, Any]] = computation_params

        # Setting thresholds: update default values with custom values if given.
        # Work on a copy: updating DEFAULT_THRESHOLDS itself would leak one instance's custom
        # thresholds into the module-level defaults and thus into every later instance.
        self.thresholds: Dict[str, Threshold] = dict(DEFAULT_THRESHOLDS)
        if thresholds is not None:
            if 'chi2' in thresholds:
                msg = "ignoring custom threshold for 'chi2' as it does not support custom thresholds for now."
                self._logger.warning(msg)
                warnings.warn(msg)

            # A supplied 'chi2' entry is still stored here; only the warning above is issued.
            self.thresholds.update(thresholds)

        # Fitted method instances per column; filled in by `_fit`.
        self._column_to_models_mapping: Dict[str, List[Method]] = {column_name: [] for column_name in column_names}

        # required for distribution plots
        self.previous_reference_results: Optional[pd.DataFrame] = None
        self.previous_analysis_data: Optional[pd.DataFrame] = None

        self.result: Optional[Result] = None

    # NOTE(review): `log_usage` is called without arguments here while `UsageEvent` is imported
    # at module level but never used - the decorator arguments may have been lost; confirm
    # against the upstream source.
    @log_usage()
    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> UnivariateDriftCalculator:
        """Fits the drift calculator using a set of reference data.

        Splits the configured columns into continuous and categorical ones, applies the
        `treat_as_categorical` overrides, fits one method instance per (column, method name)
        pair and finally computes the results on the reference data itself.

        Parameters
        ----------
        reference_data: pd.DataFrame
            The reference data set. Must contain at least one row.

        Returns
        -------
        UnivariateDriftCalculator
            This calculator, with `result` holding the reference-period results.

        Raises
        ------
        InvalidArgumentsException
            When `reference_data` contains no rows.
        """
        if reference_data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        # presumably raises when a configured column is absent from the data - verify in nannyml.base
        _list_missing(self.column_names, reference_data)

        self.continuous_column_names, self.categorical_column_names = _split_features_by_type(
            reference_data, self.column_names
        )

        for column_name in self.treat_as_categorical:
            if column_name not in self.column_names:
                self._logger.info(
                    f"ignoring 'treat_as_categorical' value '{column_name}' because it was not in "
                    f"listed column names"
                )
                # Skip only this entry; the remaining overrides must still be applied.
                continue
            if column_name in self.continuous_column_names:
                self.continuous_column_names.remove(column_name)
            if column_name not in self.categorical_column_names:
                self.categorical_column_names.append(column_name)

        # Timestamps are only handed to the methods when a timestamp column was configured.
        timestamps = reference_data[self.timestamp_column_name] if self.timestamp_column_name else None

        for column_name in self.continuous_column_names:
            self._column_to_models_mapping[column_name] += [
                MethodFactory.create(
                    key=method,
                    feature_type=FeatureType.CONTINUOUS,
                    chunker=self.chunker,
                    computation_params=self.computation_params or {},
                    threshold=self.thresholds[method],
                ).fit(
                    reference_data=reference_data[column_name],
                    timestamps=timestamps,
                )
                for method in self.continuous_method_names
            ]

        # Categorical methods are created without `computation_params`.
        for column_name in self.categorical_column_names:
            self._column_to_models_mapping[column_name] += [
                MethodFactory.create(
                    key=method,
                    feature_type=FeatureType.CATEGORICAL,
                    chunker=self.chunker,
                    threshold=self.thresholds[method],
                ).fit(
                    reference_data=reference_data[column_name],
                    timestamps=timestamps,
                )
                for method in self.categorical_method_names
            ]

        # `_calculate` labels every row as 'analysis'; relabel since this run was on reference data.
        self.result = self._calculate(reference_data)
        self.result.data['chunk', 'chunk', 'period'] = 'reference'
        self.result.reference_data = reference_data.copy()

        return self

    # NOTE(review): see the note on `_fit` - `log_usage` is called without arguments; confirm.
    @log_usage()
    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
        """Calculates methods for both categorical and continuous columns.

        The data is split into chunks and every fitted method of every column is evaluated on
        each chunk, yielding one result row per chunk.

        Parameters
        ----------
        data: pd.DataFrame
            The data set to evaluate. Must contain at least one row.

        Returns
        -------
        Result
            The calculator's result. On the first call a new `Result` is created; on later calls
            the stored result is reduced to its reference period and the new rows are appended.

        Raises
        ------
        InvalidArgumentsException
            When `data` contains no rows.
        """
        if data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.column_names, data)

        chunks = self.chunker.split(data)

        rows = []
        for chunk in chunks:
            row = {
                'key': chunk.key,
                'chunk_index': chunk.chunk_index,
                'start_index': chunk.start_index,
                'end_index': chunk.end_index,
                'start_datetime': chunk.start_datetime,
                'end_datetime': chunk.end_datetime,
                'period': 'analysis',
            }

            # Continuous columns first, then categorical ones: the insertion order of the row
            # keys must match the column order produced by `_create_multilevel_index`, because
            # the index is assigned positionally below.
            for column_group in (self.continuous_column_names, self.categorical_column_names):
                for column_name in column_group:
                    for method in self._column_to_models_mapping[column_name]:
                        for k, v in _calculate_for_column(chunk.data, column_name, method).items():
                            row[f'{column_name}_{method.column_name}_{k}'] = v

            rows.append(row)

        result_index = _create_multilevel_index(
            continuous_column_names=self.continuous_column_names,
            continuous_method_names=list(self.continuous_method_names),
            categorical_column_names=self.categorical_column_names,
            categorical_method_names=list(self.categorical_method_names),
        )
        res = pd.DataFrame(rows)
        res.columns = result_index
        res = res.reset_index(drop=True)

        if self.result is None:
            self.result = Result(
                results_data=res,
                column_names=self.column_names,
                continuous_column_names=self.continuous_column_names,
                categorical_column_names=self.categorical_column_names,
                continuous_method_names=self.continuous_method_names,
                categorical_method_names=self.categorical_method_names,
                timestamp_column_name=self.timestamp_column_name,
                chunker=self.chunker,
            )
        else:
            # TODO: review subclassing setup => superclass + '_filter' is screwing up typing.
            #       Dropping the intermediate '_filter' and directly returning the correct 'Result' class works OK
            #       but this causes us to lose the "common behavior" in the top level 'filter' method when overriding.
            #       Applicable here but to many of the base classes as well (e.g. fitting and calculating)
            self.result = self.result.filter(period='reference')
            self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)
            self.result.analysis_data = data.copy()

        return self.result

def _calculate_for_column(data: pd.DataFrame, column_name: str, method: Method) -> Dict[str, Any]:
    """Evaluate one fitted drift method on one column of a chunk.

    Parameters
    ----------
    data: pd.DataFrame
        The chunk data; only `data[column_name]` is passed to the method.
    column_name: str
        Name of the column to evaluate.
    method: Method
        The fitted method to run.

    Returns
    -------
    Dict[str, Any]
        The keys `value`, `upper_threshold`, `lower_threshold` and `alert`, in that order.
        Both the key set and the order have to match the per-method columns laid out by
        `_create_multilevel_index`, since the result columns are assigned by position.
    """
    value = method.calculate(data[column_name])
    return {
        'value': value,
        'upper_threshold': method.upper_threshold_value,
        'lower_threshold': method.lower_threshold_value,
        # The multilevel index reserves an 'alert' column for every method; without this entry
        # the result frame has fewer columns than the index and the assignment fails.
        'alert': method.alert(value),
    }

def _create_multilevel_index(
    continuous_column_names: List[str],
    categorical_column_names: List[str],
    continuous_method_names: List[str],
    categorical_method_names: List[str],
):
    """Build the three-level column index of the drift result frame.

    The index starts with the chunk metadata entries, labelled ``('chunk', 'chunk', <field>)``,
    followed by ``(column, method, <field>)`` entries for every continuous column and then for
    every categorical column. Within a column the methods keep the order in which they were
    given, and each method contributes `value`, `upper_threshold`, `lower_threshold` and `alert`.
    """
    per_method_fields = ('value', 'upper_threshold', 'lower_threshold', 'alert')

    def _expand(columns: List[str], methods: List[str]) -> list:
        # One (column, method, field) entry per combination, column-major.
        entries = []
        for column in columns:
            for method_name in methods:
                entries.extend((column, method_name, field) for field in per_method_fields)
        return entries

    labels = []
    for chunk_field in ('key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period'):
        labels.append(('chunk', 'chunk', chunk_field))
    labels.extend(_expand(continuous_column_names, continuous_method_names))
    labels.extend(_expand(categorical_column_names, categorical_method_names))

    return MultiIndex.from_tuples(labels)
```