Source code for nannyml.stats.sum.calculator

#  Author:   Nikolaos Perrakis  <nikos@nannyml.com>
#
#  License: Apache Software License 2.0

"""Simple Statistics Sum Calculator"""

from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
from pandas import MultiIndex

from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker
from nannyml.exceptions import InvalidArgumentsException
from nannyml.sampling_error import SAMPLING_ERROR_RANGE
from nannyml.stats.base import _add_alert_flag
from nannyml.stats.sum.result import Result
from nannyml.thresholds import StandardDeviationThreshold, Threshold, calculate_threshold_values
from nannyml.usage_logging import UsageEvent, log_usage


class SummaryStatsSumCalculator(AbstractCalculator):
    """SummaryStatsSumCalculator implementation."""

    def __init__(
        self,
        column_names: Union[str, List[str]],
        timestamp_column_name: Optional[str] = None,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
        threshold: Threshold = StandardDeviationThreshold(),
    ):
        """Creates a new SummaryStatsSumCalculator instance.

        Parameters
        ----------
        column_names: Union[str, List[str]]
            A string or list containing the names of features in the provided data set.
            The sum of values will be calculated for each entry in this list.
        timestamp_column_name: str
            The name of the column containing the timestamp of the model prediction.
        chunk_size: int
            Splits the data into chunks containing `chunk_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker
            The `Chunker` used to split the data sets into lists of chunks.
        threshold: Threshold
            Appropriate `Threshold` subclass. Defines the alert threshold strategy.
            Defaults to `StandardDeviationThreshold()`.

        Examples
        --------
        >>> import nannyml as nml
        >>> reference, analysis, _ = nml.load_synthetic_car_price_dataset()
        >>> column_names = ['car_value', 'debt_to_income_ratio', 'driver_tenure']
        >>> calc = nml.SummaryStatsSumCalculator(
        ...     column_names=column_names,
        ...     timestamp_column_name='timestamp',
        ... ).fit(reference)
        >>> res = calc.calculate(analysis)
        >>> for column_name in res.column_names:
        ...     res = res.filter(period='analysis', column_name=column_name).plot().show()
        """
        super(SummaryStatsSumCalculator, self).__init__(
            chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name
        )

        if isinstance(column_names, str):
            self.column_names = [column_names]
        elif isinstance(column_names, list):
            for el in column_names:
                if not isinstance(el, str):
                    raise InvalidArgumentsException(
                        f"column_names elements should be either a column name string or a list of strings, "
                        f"found\n{el}"
                    )
            self.column_names = column_names
        else:
            raise InvalidArgumentsException(
                "column_names should be either a column name string or a list of column name strings, "
                f"found {column_names}"
            )

        self.result: Optional[Result] = None
        self._sampling_error_components: Dict[str, float] = {column_name: 0 for column_name in self.column_names}
        # threshold strategy is the same across all columns
        self.threshold = threshold
        self._upper_alert_thresholds: Dict[str, Optional[float]] = {
            column_name: 0 for column_name in self.column_names
        }
        self._lower_alert_thresholds: Dict[str, Optional[float]] = {
            column_name: 0 for column_name in self.column_names
        }

        self.lower_threshold_value_limit: float = np.nan
        self.upper_threshold_value_limit: float = np.nan
        self.simple_stats_metric = 'values_sum'

    @log_usage(UsageEvent.STATS_SUM_FIT)
    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs):
        """Fits the drift calculator to a set of reference data."""
        if reference_data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')
        _list_missing(self.column_names, reference_data)

        continuous_column_names, categorical_column_names = _split_features_by_type(
            reference_data, self.column_names
        )
        if len(categorical_column_names) >= 1:
            raise InvalidArgumentsException(
                f"Cannot calculate sum for categorical columns:\n {categorical_column_names}"
            )

        for col in self.column_names:
            self._sampling_error_components[col] = reference_data[col].std()

        for column in self.column_names:
            reference_chunk_results = np.asarray(
                [_calculate_sum_value_stats(chunk.data[column]) for chunk in self.chunker.split(reference_data)]
            )
            self._lower_alert_thresholds[column], self._upper_alert_thresholds[column] = calculate_threshold_values(
                threshold=self.threshold,
                data=reference_chunk_results,
                lower_threshold_value_limit=self.lower_threshold_value_limit,
                upper_threshold_value_limit=self.upper_threshold_value_limit,
                logger=self._logger,
                metric_name=self.simple_stats_metric,
                override_using_none=True,
            )

        self.result = self._calculate(data=reference_data)
        self.result.data[('chunk', 'period')] = 'reference'

        return self

    @log_usage(UsageEvent.STATS_SUM_RUN)
    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
        """Calculates the sum statistic for all configured columns on the provided data."""
        if data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.column_names, data)

        chunks = self.chunker.split(data)

        rows = []
        for chunk in chunks:
            row = {
                'key': chunk.key,
                'chunk_index': chunk.chunk_index,
                'start_index': chunk.start_index,
                'end_index': chunk.end_index,
                'start_datetime': chunk.start_datetime,
                'end_datetime': chunk.end_datetime,
                'period': 'analysis',
            }
            for column_name in self.column_names:
                for k, v in self._calculate_for_column(chunk.data, column_name).items():
                    row[f'{column_name}_{k}'] = v
            rows.append(row)

        result_index = _create_multilevel_index(column_names=self.column_names)
        res = pd.DataFrame(rows)
        res.columns = result_index
        res = res.reset_index(drop=True)

        if self.result is None:
            self.result = Result(
                results_data=res,
                column_names=self.column_names,
                simple_stats_metric=self.simple_stats_metric,
                timestamp_column_name=self.timestamp_column_name,
                chunker=self.chunker,
            )
        else:
            # TODO: review subclassing setup => superclass + '_filter' is screwing up typing.
            # Dropping the intermediate '_filter' and directly returning the correct 'Result' class works OK
            # but this causes us to lose the "common behavior" in the top level 'filter' method when overriding.
            # Applicable here but to many of the base classes as well (e.g. fitting and calculating)
            self.result = self.result.filter(period='reference')
            self.result.data = pd.concat([self.result.data, res]).reset_index(drop=True)

        return self.result

    def _calculate_for_column(self, data: pd.DataFrame, column_name: str) -> Dict[str, Any]:
        result = {}
        try:
            value = _calculate_sum_value_stats(data[column_name])
            result['value'] = value
            # The standard error of a sum of n observations scales with sqrt(n),
            # using the per-column standard deviation estimated on the reference data.
            result['sampling_error'] = self._sampling_error_components[column_name] * np.sqrt(data.shape[0])
            result['upper_confidence_boundary'] = result['value'] + SAMPLING_ERROR_RANGE * result['sampling_error']
            result['lower_confidence_boundary'] = result['value'] - SAMPLING_ERROR_RANGE * result['sampling_error']
            result['upper_threshold'] = self._upper_alert_thresholds[column_name]
            result['lower_threshold'] = self._lower_alert_thresholds[column_name]
            result['alert'] = _add_alert_flag(result)
        except Exception as exc:
            if self._logger:
                self._logger.error(
                    f"an unexpected exception occurred during calculation of column '{column_name}': {exc}"
                )
            result['value'] = np.nan
            result['sampling_error'] = np.nan
            result['upper_confidence_boundary'] = np.nan
            result['lower_confidence_boundary'] = np.nan
            result['upper_threshold'] = self._upper_alert_thresholds[column_name]
            result['lower_threshold'] = self._lower_alert_thresholds[column_name]
            result['alert'] = np.nan
        finally:
            return result
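
A note on the sampling-error estimate used in `_calculate_for_column`: for a chunk of n values drawn from a population with standard deviation sigma, the standard error of the chunk sum is sigma * sqrt(n). The following is a minimal sketch (not part of this module; names like `population_std` and `chunk_size` are illustrative) that checks this scaling empirically with numpy:

# Minimal sketch (illustrative, not part of this module): empirically verifying
# that the standard error of a chunk sum scales as std * sqrt(chunk_size),
# which is the formula used in `_calculate_for_column` above.
import numpy as np

rng = np.random.default_rng(42)
population_std = 2.5
chunk_size = 1_000

# Simulate many chunks and measure the spread of their sums.
chunk_sums = rng.normal(loc=0.0, scale=population_std, size=(10_000, chunk_size)).sum(axis=1)

empirical = chunk_sums.std()
predicted = population_std * np.sqrt(chunk_size)
print(f"empirical std of sums: {empirical:.1f}, predicted: {predicted:.1f}")
# The two values should roughly agree (approx. 79.1 here).
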
def _create_multilevel_index(column_names):
    chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period']
    chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names]
    column_tuples = [
        (column_name, el)
        for column_name in column_names
        for el in [
            'value',
            'sampling_error',
            'upper_confidence_boundary',
            'lower_confidence_boundary',
            'upper_threshold',
            'lower_threshold',
            'alert',
        ]
    ]
    tuples = chunk_tuples + column_tuples
    return MultiIndex.from_tuples(tuples)


def _calculate_sum_value_stats(data: pd.Series):
    return data.sum()
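
For reference, a quick look at the column layout `_create_multilevel_index` produces. This is a hypothetical snippet importing a private helper purely for illustration:

# Minimal sketch: inspecting the MultiIndex produced for a single column.
from nannyml.stats.sum.calculator import _create_multilevel_index

index = _create_multilevel_index(column_names=['car_value'])
print(index.to_list())
# [('chunk', 'key'), ('chunk', 'chunk_index'), ('chunk', 'start_index'),
#  ('chunk', 'end_index'), ('chunk', 'start_date'), ('chunk', 'end_date'),
#  ('chunk', 'period'), ('car_value', 'value'), ('car_value', 'sampling_error'),
#  ('car_value', 'upper_confidence_boundary'), ('car_value', 'lower_confidence_boundary'),
#  ('car_value', 'upper_threshold'), ('car_value', 'lower_threshold'), ('car_value', 'alert')]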