Source code for nannyml.data_quality.unseen.calculator

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#  Author:   Nikolaos Perrakis  <nikos@nannyml.com>
#
#  License: Apache Software License 2.0

"""Drift calculator using Reconstruction Error as a measure of drift."""
import logging
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
from pandas import MultiIndex

from nannyml.base import AbstractCalculator, _list_missing, _split_features_by_type
from nannyml.chunk import Chunker

# from nannyml.data_quality.base import _add_alert_flag
from nannyml.exceptions import InvalidArgumentsException
from nannyml.thresholds import ConstantThreshold, Threshold, calculate_threshold_values
from nannyml.usage_logging import UsageEvent, log_usage

from .result import Result

"""
Unseen Values Data Quality Module.
"""


[docs]class UnseenValuesCalculator(AbstractCalculator): """UnseenValuesCalculator implementation using unseen value rate as a measure of data quality. This only works for categorical features. Seen values are the ones encountered on the reference data.""" def __init__( self, column_names: Union[str, List[str]], normalize: bool = True, y_pred_column_name: Optional[str] = None, y_true_column_name: Optional[str] = None, timestamp_column_name: Optional[str] = None, chunk_size: Optional[int] = None, chunk_number: Optional[int] = None, chunk_period: Optional[str] = None, chunker: Optional[Chunker] = None, threshold: Threshold = ConstantThreshold(lower=None, upper=0), ): """Creates a new MissingValuesCalculator instance. Parameters ---------- column_names: Union[str, List[str]] A string or list containing the names of features in the provided data set. Unseen Values will be calculated for each entry in this list. normalize: bool, default=True Whether to provide the unseen value ratio (True) or the absolute number of unseen values (False). timestamp_column_name: str The name of the column containing the timestamp of the model prediction. chunk_size: int Splits the data into chunks containing `chunks_size` observations. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunk_number: int Splits the data into `chunk_number` pieces. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunk_period: str Splits the data according to the given period. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunker : Chunker The `Chunker` used to split the data sets into a lists of chunks. Examples -------- >>> import nannyml as nml >>> reference, analysis, _ = nml.load_synthetic_car_price_dataset() >>> column_names = [col for col in reference.columns if col not in [ ... 'car_age', 'km_driven', 'price_new', 'accident_count', 'door_count','timestamp', 'y_pred', 'y_true']] >>> calc = nml.UnseenValuesCalculator( ... column_names=column_names, ... timestamp_column_name='timestamp', ... ).fit(reference) >>> res = calc.calculate(analysis) >>> res.filter(period='analysis').plot().show() """ super(UnseenValuesCalculator, self).__init__( chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name ) if isinstance(column_names, str): self.column_names = [column_names] elif isinstance(column_names, list): for el in column_names: if not isinstance(el, str): raise InvalidArgumentsException( f"column_names elements should be either a column name string or a list of strings, found\n{el}" ) self.column_names = column_names else: raise InvalidArgumentsException( "column_names should be either a column name string or a list of columns names strings, " "found\n{column_names}" ) self.y_pred_column_name = y_pred_column_name self.y_true_column_name = y_true_column_name self.result: Optional[Result] = None # Threshold strategy is the same across all columns # By default for unseen values there is no lower threshold or threshold limit. # The value should be 0 and can't go lower. # The upper limit is also 0 because there shouldn't be any. If there is we alert. self.threshold = threshold self._upper_alert_thresholds: Dict[str, Optional[float]] = {column_name: 0 for column_name in self.column_names} self._lower_alert_thresholds: Dict[str, Optional[float]] = {column_name: 0 for column_name in self.column_names} self.lower_threshold_value_limit: float = 0 self.upper_threshold_value_limit: float self.normalize = normalize if self.normalize: self.data_quality_metric = 'unseen_values_rate' self.upper_threshold_value_limit = 1 else: self.data_quality_metric = 'unseen_values_count' self.upper_threshold_value_limit = np.nan self._categorical_seen_values: Dict[str, set] = {column_name: set() for column_name in self.column_names} def _calculate_unseen_value_stats(self, data: pd.Series, seen_set: set): count_tot = data.shape[0] count_uns = count_tot - data.isin(seen_set).sum() if self.normalize: count_uns = count_uns / count_tot return count_uns @log_usage(UsageEvent.DQ_CALC_UNSEEN_VALUES_FIT, metadata_from_self=['normalize']) def _fit(self, reference_data: pd.DataFrame, *args, **kwargs): """Fits the drift calculator to a set of reference data.""" if reference_data.empty: raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.') _list_missing(self.column_names, reference_data) # Included columns of dtype=int should be considered categorical. We'll try converting those explicitly. reference_data = _convert_int_columns_to_categorical(reference_data, self.column_names, self._logger) # y_true and y_pred columns are treated as categorical for the purpose of this calculator if self.y_pred_column_name: reference_data[self.y_pred_column_name] = reference_data[self.y_pred_column_name].astype('category') if self.y_true_column_name: reference_data[self.y_true_column_name] = reference_data[self.y_true_column_name].astype('category') # All provided columns must be categorical continuous_column_names, categorical_column_names = _split_features_by_type(reference_data, self.column_names) if not set(self.column_names) == set(categorical_column_names): raise InvalidArgumentsException( f"Specified columns_names for UnseenValuesCalculator must all be categorical.\n" f"Continuous columns found:\n{continuous_column_names}" ) for col in self.column_names: self._categorical_seen_values[col] = set(reference_data[col].unique()) # By definition everything (sampling error and confidence boundaries) here is 0. # We are not breaking pattern by artificially creating the result object # But maybe we should? to be more efficient?? self.result = self._calculate(data=reference_data) self.result.data[('chunk', 'period')] = 'reference' return self @log_usage(UsageEvent.DQ_CALC_UNSEEN_VALUES_RUN, metadata_from_self=['normalize']) def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: """Calculates methods for both categorical and continuous columns.""" if data.empty: raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.') _list_missing(self.column_names, data) chunks = self.chunker.split(data) rows = [] for chunk in chunks: row = { 'key': chunk.key, 'chunk_index': chunk.chunk_index, 'start_index': chunk.start_index, 'end_index': chunk.end_index, 'start_datetime': chunk.start_datetime, 'end_datetime': chunk.end_datetime, 'period': 'analysis', } for column_name in self.column_names: for k, v in self._calculate_for_column(chunk.data, column_name).items(): row[f'{column_name}_{k}'] = v rows.append(row) result_index = _create_multilevel_index( column_names=self.column_names, ) res = pd.DataFrame(rows) res.columns = result_index res = res.reset_index(drop=True) if self.result is None: self._set_metric_thresholds(res) res = self._populate_alert_thresholds(res) self.result = Result( results_data=res, column_names=self.column_names, data_quality_metric=self.data_quality_metric, timestamp_column_name=self.timestamp_column_name, chunker=self.chunker, ) else: # TODO: review subclassing setup => superclass + '_filter' is screwing up typing. # Dropping the intermediate '_filter' and directly returning the correct 'Result' class works OK # but this causes us to lose the "common behavior" in the top level 'filter' method when overriding. # Applicable here but to many of the base classes as well (e.g. fitting and calculating) res = self._populate_alert_thresholds(res) self.result = self.result.filter(period='reference') self.result.data = pd.concat([self.result.data, res], ignore_index=True) return self.result def _calculate_for_column(self, data: pd.DataFrame, column_name: str) -> Dict[str, Any]: result = {} seen_values = self._categorical_seen_values[column_name] value = self._calculate_unseen_value_stats(data[column_name], seen_values) result['value'] = value return result def _set_metric_thresholds(self, result_data: pd.DataFrame): for column_name in self.column_names: ( self._lower_alert_thresholds[column_name], self._upper_alert_thresholds[column_name], ) = calculate_threshold_values( # noqa: E501 threshold=self.threshold, data=result_data.loc[:, (column_name, 'value')], lower_threshold_value_limit=self.lower_threshold_value_limit, upper_threshold_value_limit=self.upper_threshold_value_limit, logger=self._logger, ) def _populate_alert_thresholds(self, result_data: pd.DataFrame) -> pd.DataFrame: for column_name in self.column_names: result_data[(column_name, 'upper_threshold')] = self._upper_alert_thresholds[column_name] result_data[(column_name, 'lower_threshold')] = self._lower_alert_thresholds[column_name] result_data[(column_name, 'alert')] = result_data.apply( lambda row: True if ( row[(column_name, 'value')] > ( np.inf if row[(column_name, 'upper_threshold')] is None else row[(column_name, 'upper_threshold')] # noqa: E501 ) or row[(column_name, 'value')] < ( -np.inf if row[(column_name, 'lower_threshold')] is None else row[(column_name, 'lower_threshold')] # noqa: E501 ) ) else False, axis=1, ) return result_data
def _convert_int_columns_to_categorical( data: pd.DataFrame, column_names: List[str], logger: Optional[logging.Logger] ) -> pd.DataFrame: res = data.copy() int_cols = list( filter( lambda c: c in column_names and data[c].dtype in ('int_', 'int8', 'int16', 'int32', 'int64', 'uint8', 'uint16', 'uint32', 'uint64'), data.columns, ) ) for col in int_cols: res[col] = res[col].astype('category') if logger: logger.warning(f"converting integer columns to categorical: {list(int_cols)}") return res def _create_multilevel_index( column_names, ): chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period'] chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names] column_tuples = [ (column_name, 'value') for column_name in column_names # for el in ['value', 'upper_threshold', 'lower_threshold', 'alert'] ] tuples = chunk_tuples + column_tuples return MultiIndex.from_tuples(tuples)