
"""A module containing the implementations of metrics estimated by CBPE.

The :class:`~nannyml.performance_estimation.confidence_based.cbpe.CBPE` estimator converts a list of metric names into
:class:`~nannyml.performance_estimation.confidence_based.metrics.Metric` instances using the
:class:`~nannyml.performance_estimation.confidence_based.metrics.MetricFactory`.

The :class:`~nannyml.performance_estimation.confidence_based.cbpe.CBPE` estimator will then loop over these
:class:`~nannyml.performance_estimation.confidence_based.metrics.Metric` instances to fit them on reference data
and run the estimation on analysis data.
"""

import abc
import logging
import warnings
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union

import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    auc,
    average_precision_score,
    confusion_matrix,
    f1_score,
    multilabel_confusion_matrix,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.preprocessing import LabelBinarizer, label_binarize

import nannyml.sampling_error.binary_classification as bse
import nannyml.sampling_error.multiclass_classification as mse
from nannyml._typing import ModelOutputsType, ProblemType, class_labels
from nannyml.chunk import Chunk, Chunker
from nannyml.exceptions import CalculatorException, InvalidArgumentsException
from nannyml.performance_estimation.confidence_based import SUPPORTED_METRIC_VALUES
from nannyml.sampling_error import SAMPLING_ERROR_RANGE
from nannyml.thresholds import Threshold, calculate_threshold_values


class Metric(abc.ABC):
    """A base class representing a performance metric to estimate."""

    def __init__(
        self,
        name: str,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        components: List[Tuple[str, str]],
        timestamp_column_name: Optional[str] = None,
        lower_threshold_value_limit: Optional[float] = None,
        upper_threshold_value_limit: Optional[float] = None,
        **kwargs,
    ):
        """Creates a new Metric instance.

        Parameters
        ----------
        name: str
            The name used to indicate the metric in columns of a DataFrame.
        y_pred_proba: Union[str, Dict[str, str]]
            Name(s) of the column(s) containing your model output.

            - For binary classification, pass a single string referring to the model output column.
            - For multiclass classification, pass a dictionary that maps a class string to the column name
              containing model outputs for that class.
        y_pred: str
            The name of the column containing your model predictions.
        y_true: str
            The name of the column containing target values (that are provided in reference data during fitting).
        chunker: Chunker
            The `Chunker` used to split the data sets into lists of chunks.
        threshold: Threshold
            The Threshold instance that determines how the lower and upper threshold values will be calculated.
        components: List[Tuple[str, str]]
            A list of (display_name, column_name) tuples.
        timestamp_column_name: Optional[str], default=None
            The name of the column containing the timestamp of the model prediction.
            If not given, plots will not use a time-based x-axis but will use the index of the chunks instead.
        lower_threshold_value_limit: Optional[float], default=None
            An optional value that serves as a limit for the lower threshold value. Any calculated lower threshold
            values that end up below this limit will be replaced by this limit value. The limit is often a
            theoretical constraint enforced by a specific drift detection method or performance metric.
        upper_threshold_value_limit: Optional[float], default=None
            An optional value that serves as a limit for the upper threshold value. Any calculated upper threshold
            values that end up above this limit will be replaced by this limit value. The limit is often a
            theoretical constraint enforced by a specific drift detection method or performance metric.

        Notes
        -----
        The `components` approach taken here is a quick fix to deal with metrics that return multiple values.
        Look at the `confusion_matrix` for example: a single metric produces 4 different result sets
        (containing values, thresholds, alerts, etc.).
        """
        self.name = name

        self.y_pred_proba = y_pred_proba
        self.y_pred = y_pred
        self.y_true = y_true
        self.timestamp_column_name = timestamp_column_name

        self.chunker = chunker

        self.threshold = threshold
        self.lower_threshold_value: Optional[float] = None
        self.upper_threshold_value: Optional[float] = None
        self.lower_threshold_value_limit: Optional[float] = lower_threshold_value_limit
        self.upper_threshold_value_limit: Optional[float] = upper_threshold_value_limit

        self.uncalibrated_y_pred_proba = f'uncalibrated_{self.y_pred_proba}'

        # A list of (display_name, column_name) tuples
        self.components: List[Tuple[str, str]] = components

    @property
    def _logger(self) -> logging.Logger:
        return logging.getLogger(__name__)

    @property
    def display_name(self) -> str:  # noqa: D102
        return self.name

    @property
    def column_name(self) -> str:  # noqa: D102
        return self.components[0][1]

    @property
    def display_names(self):  # noqa: D102
        return [c[0] for c in self.components]

    @property
    def column_names(self):  # noqa: D102
        return [c[1] for c in self.components]

    def __str__(self):  # noqa: D105
        return self.display_name

    def __repr__(self):  # noqa: D105
        return self.column_name

    def fit(self, reference_data: pd.DataFrame):
        """Fits a Metric on reference data.

        Parameters
        ----------
        reference_data: pd.DataFrame
            The reference data used for fitting. Must have target data available.
        """
        # Delegate to subclass
        self._fit(reference_data)

        reference_chunks = self.chunker.split(reference_data)

        # Calculate alert thresholds
        reference_chunk_results = np.asarray([self._realized_performance(chunk.data) for chunk in reference_chunks])
        self.lower_threshold_value, self.upper_threshold_value = calculate_threshold_values(
            threshold=self.threshold,
            data=reference_chunk_results,
            lower_threshold_value_limit=self.lower_threshold_value_limit,
            upper_threshold_value_limit=self.upper_threshold_value_limit,
            logger=self._logger,
            metric_name=self.display_name,
        )

        return

    @abc.abstractmethod
    def _fit(self, reference_data: pd.DataFrame):
        raise NotImplementedError(
            f"'{self.__class__.__name__}' is a subclass of Metric and it must implement the _fit method"
        )

    @abc.abstractmethod
    def _estimate(self, data: pd.DataFrame):
        raise NotImplementedError(
            f"'{self.__class__.__name__}' is a subclass of Metric and it must implement the _estimate method"
        )

    @abc.abstractmethod
    def _sampling_error(self, data: pd.DataFrame) -> float:
        raise NotImplementedError(
            f"'{self.__class__.__name__}' is a subclass of Metric and it must implement the _sampling_error method"
        )

    @abc.abstractmethod
    def _realized_performance(self, data: pd.DataFrame) -> float:
        raise NotImplementedError(
            f"'{self.__class__.__name__}' is a subclass of Metric and it must implement the _realized_performance method"
        )

    def alert(self, value: float) -> bool:
        """Returns True if an estimated metric value is below the lower threshold or above the upper threshold.

        Parameters
        ----------
        value: float
            Value of an estimated metric.

        Returns
        -------
        alert: bool
            True when the value falls outside the threshold range, False otherwise.
        """
        return (self.lower_threshold_value is not None and value < self.lower_threshold_value) or (
            self.upper_threshold_value is not None and value > self.upper_threshold_value
        )

    def __eq__(self, other):
        """Compares two Metric instances.

        They are considered equal when their components are equal.

        Parameters
        ----------
        other: Metric
            The other Metric instance you're comparing to.

        Returns
        -------
        is_equal: bool
        """
        return self.components == other.components

    # def _common_cleaning(
    #     self, data: pd.DataFrame, y_pred_proba_column_name: Optional[str] = None
    # ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    #     if y_pred_proba_column_name is None:
    #         if not isinstance(self.y_pred_proba, str):
    #             raise InvalidArgumentsException(
    #                 f"'y_pred_proba' is of type '{type(self.y_pred_proba)}'. "
    #                 f"Binary use cases require 'y_pred_proba' to be a string."
    #             )
    #         y_pred_proba_column_name = self.y_pred_proba
    #
    #     data = _remove_nans(data, [self.y_pred, y_pred_proba_column_name])
    #
    #     clean_targets = self.y_true in data.columns and not data[self.y_true].isna().all()
    #     if clean_targets:
    #         data = _remove_nans(data, [self.y_true])
    #
    #     return data[y_pred_proba_column_name], data[self.y_pred], (data[self.y_true] if clean_targets else None)

    def _common_cleaning(self, data: pd.DataFrame, selected_columns: List[str]) -> Tuple[List[pd.Series], bool]:
        """Remove rows containing NaN values in the selected columns.

        Parameters
        ----------
        data: pd.DataFrame
            Pandas dataframe containing the data.
        selected_columns: List[str]
            List containing the names of the columns to clean.

        Returns
        -------
        col_list: List[pd.Series]
            List containing the cleaned columns. The order of columns from selected_columns is preserved.
        empty: bool
            True when no rows remain after cleaning, False otherwise.
        """
        if not set(selected_columns) <= set(data.columns):
            raise InvalidArgumentsException(
                f"Selected columns: {selected_columns} not all present in provided data columns {list(data.columns)}"
            )

        df = data[selected_columns].dropna(axis=0, how='any', inplace=False).reset_index()
        empty: bool = False
        if df.shape[0] == 0:
            empty = True

        results = []
        for el in selected_columns:
            results.append(df[el])
        return (results, empty)

    def get_chunk_record(self, chunk_data: pd.DataFrame) -> Dict:
        """Returns a dictionary containing the performance metrics for a given chunk.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Raises
        ------
        NotImplementedError: occurs when a metric has multiple components

        Returns
        -------
        chunk_record : Dict
            A dictionary of performance metric, value pairs.
        """
        if len(self.components) > 1:
            raise NotImplementedError(
                "cannot use default 'get_chunk_record' implementation when a metric has multiple components."
            )

        column_name = self.components[0][1]

        chunk_record = {}

        try:
            estimated_metric_value = self._estimate(chunk_data)
            metric_estimate_sampling_error = self._sampling_error(chunk_data)
            chunk_record[f'estimated_{column_name}'] = estimated_metric_value
            chunk_record[f'sampling_error_{column_name}'] = metric_estimate_sampling_error
            chunk_record[f'realized_{column_name}'] = self._realized_performance(chunk_data)
            chunk_record[f'upper_confidence_boundary_{column_name}'] = np.minimum(
                np.inf if self.upper_threshold_value_limit is None else self.upper_threshold_value_limit,
                estimated_metric_value + SAMPLING_ERROR_RANGE * metric_estimate_sampling_error,
            )
            chunk_record[f'lower_confidence_boundary_{column_name}'] = np.maximum(
                -np.inf if self.lower_threshold_value_limit is None else self.lower_threshold_value_limit,
                estimated_metric_value - SAMPLING_ERROR_RANGE * metric_estimate_sampling_error,
            )
            chunk_record[f'upper_threshold_{column_name}'] = self.upper_threshold_value
            chunk_record[f'lower_threshold_{column_name}'] = self.lower_threshold_value
            chunk_record[f'alert_{column_name}'] = self.alert(estimated_metric_value)
        except Exception as exc:
            self._logger.error(f"an unexpected error occurred while calculating metric {self.display_name}: {exc}")
            chunk_record[f'estimated_{column_name}'] = np.NaN
            chunk_record[f'sampling_error_{column_name}'] = np.NaN
            chunk_record[f'realized_{column_name}'] = np.NaN
            chunk_record[f'upper_confidence_boundary_{column_name}'] = np.NaN
            chunk_record[f'lower_confidence_boundary_{column_name}'] = np.NaN
            chunk_record[f'upper_threshold_{column_name}'] = np.NaN
            chunk_record[f'lower_threshold_{column_name}'] = np.NaN
            chunk_record[f'alert_{column_name}'] = np.NaN
        finally:
            return chunk_record
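
# Illustrative sketch (not part of the library): the keys produced by the default
# `get_chunk_record` implementation for a single-component metric such as 'roc_auc'.
# The values shown are hypothetical; confidence boundaries are the estimate plus/minus
# SAMPLING_ERROR_RANGE times the sampling error, clipped to the metric's value limits.
#
#   {
#       'estimated_roc_auc': 0.93,
#       'sampling_error_roc_auc': 0.01,
#       'realized_roc_auc': 0.94,
#       'upper_confidence_boundary_roc_auc': 0.96,
#       'lower_confidence_boundary_roc_auc': 0.90,
#       'upper_threshold_roc_auc': 0.97,
#       'lower_threshold_roc_auc': 0.89,
#       'alert_roc_auc': False,
#   }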

class MetricFactory:
    """A factory class that produces Metric instances based on a given magic string or a metric specification."""

    registry: Dict[str, Dict[ProblemType, Type[Metric]]] = {}

    @classmethod
    def _logger(cls) -> logging.Logger:
        return logging.getLogger(__name__)

    @classmethod
    def create(cls, key: str, use_case: ProblemType, **kwargs) -> Metric:
        """Returns a new Metric instance for a given key and use case."""
        if kwargs is None:
            kwargs = {}

        if not isinstance(key, str):
            raise InvalidArgumentsException(
                f"cannot create metric given a '{type(key)}'. " "Please provide a string, function or Metric"
            )

        if key not in cls.registry:
            raise InvalidArgumentsException(
                f"unknown metric key '{key}' given. " f"Should be one of {SUPPORTED_METRIC_VALUES}."
            )

        if use_case not in cls.registry[key]:
            raise RuntimeError(
                f"metric '{key}' is currently not supported for use case {use_case}. "
                "Please specify another metric or use one of these supported model types for this metric: "
                f"{[md for md in cls.registry[key]]}"
            )

        metric_class = cls.registry[key][use_case]
        return metric_class(**kwargs)

    @classmethod
    def register(cls, metric: str, use_case: ProblemType) -> Callable:
        """Register a Metric in the MetricFactory registry."""

        def inner_wrapper(wrapped_class: Type[Metric]) -> Type[Metric]:
            if metric in cls.registry:
                if use_case in cls.registry[metric]:
                    cls._logger().warning(f"re-registering Metric for metric='{metric}' and use_case='{use_case}'")
                cls.registry[metric][use_case] = wrapped_class
            else:
                cls.registry[metric] = {use_case: wrapped_class}
            return wrapped_class

        return inner_wrapper
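
# Illustrative sketch (not part of the library): how the registry is populated and queried.
# The decorator registers a class under a metric key and problem type; `MetricFactory.create`
# later instantiates it from those same identifiers. The metric key and column names below
# are hypothetical.
#
#   @MetricFactory.register('my_metric', ProblemType.CLASSIFICATION_BINARY)
#   class MyMetric(Metric):
#       ...
#
#   metric = MetricFactory.create(
#       key='roc_auc',
#       use_case=ProblemType.CLASSIFICATION_BINARY,
#       y_pred_proba='y_pred_proba', y_pred='y_pred', y_true='y_true',
#       chunker=chunker, threshold=threshold,
#   )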

@MetricFactory.register('roc_auc', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationAUROC(Metric):
    """CBPE binary classification AUROC Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        timestamp_column_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification AUROC Metric Class."""
        super().__init__(
            name='roc_auc',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[('ROC AUC', 'roc_auc')],
            lower_threshold_value_limit=0,
            upper_threshold_value_limit=1,
        )
        # sampling error
        self._sampling_error_components: Tuple = ()

    def _fit(self, reference_data: pd.DataFrame):
        self._sampling_error_components = bse.auroc_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_proba_reference=reference_data[self.y_pred_proba],
        )

    def _estimate(self, data: pd.DataFrame):
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(
                data=data, selected_columns=[self.y_pred_proba, self.uncalibrated_y_pred_proba]
            )
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, uncalibrated_y_pred_proba = _dat
        return estimate_roc_auc(y_pred_proba, uncalibrated_y_pred_proba)

    def _realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(
                data=data, selected_columns=[self.uncalibrated_y_pred_proba, self.y_true]
            )
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_true = _dat
        if y_true.nunique() <= 1:
            warnings.warn(
                f"'{self.y_true}' contains a single class for chunk, " f"cannot compute realized {self.display_name}."
            )
            return np.NaN
        return roc_auc_score(y_true, y_pred_proba)

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return bse.auroc_sampling_error(self._sampling_error_components, data)

def estimate_roc_auc(true_y_pred_proba: pd.Series, model_y_pred_proba: pd.Series) -> float:
    """Estimates the ROC AUC metric.

    Parameters
    ----------
    true_y_pred_proba : pd.Series
        Calibrated score predictions from the model.
    model_y_pred_proba : pd.Series
        Uncalibrated score predictions from the model.

    Returns
    -------
    metric: float
        Estimated ROC AUC score.
    """
    true_y_pred_proba = true_y_pred_proba.to_numpy()
    model_y_pred_proba = model_y_pred_proba.to_numpy()

    sorted_index = np.argsort(model_y_pred_proba)[::-1]
    model_y_pred_proba = model_y_pred_proba[sorted_index]
    true_y_pred_proba = true_y_pred_proba[sorted_index]

    with np.errstate(divide='ignore', invalid='ignore'):
        tps = np.cumsum(true_y_pred_proba)
        fps = 1 + np.arange(len(true_y_pred_proba)) - tps

        tps = np.r_[0, tps]
        fps = np.r_[0, fps]

        tps = np.round(tps, 5)
        fps = np.round(fps, 5)

        tpr = tps / tps[-1]
        fpr = fps / fps[-1]

    metric = auc(fpr, tpr)
    return metric
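
# Illustrative sketch (not part of the library): estimating ROC AUC without targets.
# The uncalibrated scores provide the ranking, while the calibrated scores stand in for
# the expected values of the (unknown) labels. The numbers below are made up.
#
#   >>> import pandas as pd
#   >>> calibrated = pd.Series([0.95, 0.80, 0.30, 0.10])
#   >>> uncalibrated = pd.Series([0.99, 0.70, 0.40, 0.05])
#   >>> estimate_roc_auc(calibrated, uncalibrated)  # expected AUC under the calibrated scores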

@MetricFactory.register('average_precision', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationAP(Metric):
    """CBPE binary classification AP Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        timestamp_column_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification AP Metric Class."""
        super().__init__(
            name='average_precision',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[('Average Precision', 'average_precision')],
            lower_threshold_value_limit=0,
            upper_threshold_value_limit=1,
        )
        # sampling error
        self._sampling_error_components: Tuple = ()

    def _fit(self, reference_data: pd.DataFrame):
        """Metric _fit implementation on reference data."""
        # if requested columns are missing we want to raise an error.
        _dat, _ = self._common_cleaning(
            data=reference_data, selected_columns=[self.y_true, self.y_pred_proba]  # type: ignore
        )
        y_true, y_pred_proba = _dat
        # if empty then positive class won't be part of y_true series
        if 1 not in y_true.unique():
            self._logger.debug(f"Not enough data to compute fit {self.display_name}.")
            warnings.warn(f"Not enough data to compute fit {self.display_name}.")
            self._sampling_error_components = np.NaN, 0
        else:
            self._sampling_error_components = bse.ap_sampling_error_components(
                y_true_reference=y_true,
                y_pred_proba_reference=y_pred_proba,
            )

    def _estimate(self, data: pd.DataFrame):
        calibrated_y_pred_proba = data[self.y_pred_proba].to_numpy()
        uncalibrated_y_pred_proba = data[self.uncalibrated_y_pred_proba].to_numpy()
        return estimate_ap(calibrated_y_pred_proba, uncalibrated_y_pred_proba)

    def _realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _ = self._common_cleaning(data=data, selected_columns=[self.uncalibrated_y_pred_proba, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        uncalibrated_y_pred_proba, y_true = _dat
        # if empty then positive class won't be part of y_true series
        if 1 not in y_true.unique():
            warnings.warn(
                f"'{self.y_true}' does not contain positive class for chunk, cannot calculate {self.display_name}. "
                f"Returning NaN."
            )
            return np.NaN
        else:
            return average_precision_score(y_true, uncalibrated_y_pred_proba)

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return bse.ap_sampling_error(self._sampling_error_components, data)

def estimate_ap(calibrated_y_pred_proba: np.ndarray, uncalibrated_y_pred_proba: np.ndarray) -> float:
    """Estimates the AP metric.

    Parameters
    ----------
    calibrated_y_pred_proba : np.ndarray
        Calibrated probability estimates of the sample for each class in the model.
    uncalibrated_y_pred_proba : np.ndarray
        Raw probability estimates of the sample for each class in the model.

    Returns
    -------
    metric: float
        Estimated AP score.
    """
    descending_order_index = np.argsort(uncalibrated_y_pred_proba)[::-1]
    calibrated_y_pred_proba = calibrated_y_pred_proba[descending_order_index]

    tps = np.cumsum(calibrated_y_pred_proba)
    fps = 1 + np.arange(calibrated_y_pred_proba.shape[0]) - tps
    tps = np.round(tps, 5)
    fps = np.round(fps, 5)
    ps = np.arange(1, tps.shape[0] + 1)
    precision = tps / ps
    # we add an element for (tps, fps) = (0, 0) after the division to avoid a division error
    precision = np.r_[1, precision]
    tps = np.r_[0, tps]
    recall = tps / tps[-1]

    # reverse so (0, 1) is the last element; recall is descending from 1 to 0
    precision = precision[::-1]
    recall = recall[::-1]

    # actual AP calculation
    # https://github.com/scikit-learn/scikit-learn/blob/main/sklearn/metrics/_ranking.py#L236
    # non-unique values will be eliminated because their diff will be 0!
    metric = -np.sum(np.diff(recall) * precision[:-1])

    return metric
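
# Illustrative sketch (not part of the library): `estimate_ap` replaces the 0/1 labels of the
# usual average-precision computation with calibrated probabilities, so `tps` becomes the
# *expected* number of true positives at each cut-off. The numbers below are made up.
#
#   >>> import numpy as np
#   >>> calibrated = np.array([0.9, 0.6, 0.4, 0.1])
#   >>> uncalibrated = np.array([0.95, 0.7, 0.3, 0.05])
#   >>> estimate_ap(calibrated, uncalibrated)  # expected average precision under the calibrated scores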

@MetricFactory.register('f1', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationF1(Metric):
    """CBPE binary classification f1 Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        timestamp_column_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification f1 Metric Class."""
        super().__init__(
            name='f1',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[('F1', 'f1')],
            lower_threshold_value_limit=0,
            upper_threshold_value_limit=1,
        )
        # sampling error
        self._sampling_error_components: Tuple = ()

    def _fit(self, reference_data: pd.DataFrame):
        self._sampling_error_components = bse.f1_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
        )

    def _estimate(self, data: pd.DataFrame):
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat
        return estimate_f1(y_pred, y_pred_proba)

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return bse.f1_sampling_error(self._sampling_error_components, data)

    def _realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        if y_true.nunique() <= 1:
            warnings.warn(
                f"Too few unique values present in '{self.y_true}', "
                f"returning NaN as realized {self.display_name} score."
            )
            return np.NaN
        if y_pred.nunique() <= 1:
            warnings.warn(
                f"Too few unique values present in '{self.y_pred}', "
                f"returning NaN as realized {self.display_name} score."
            )
            return np.NaN
        # TODO: zero_division should be np.nan
        #       update when we update sklearn to 1.3+ and remove unnecessary checks.
        return f1_score(y_true=y_true, y_pred=y_pred, zero_division='warn')

def estimate_f1(y_pred: pd.DataFrame, y_pred_proba: pd.DataFrame) -> float:
    """Estimates the F1 metric.

    Parameters
    ----------
    y_pred: pd.DataFrame
        Predicted class labels of the sample
    y_pred_proba: pd.DataFrame
        Probability estimates of the sample for each class in the model.

    Returns
    -------
    metric: float
        Estimated F1 score.
    """
    tp = np.where(y_pred == 1, y_pred_proba, 0)
    fp = np.where(y_pred == 1, 1 - y_pred_proba, 0)
    fn = np.where(y_pred == 0, y_pred_proba, 0)
    TP, FP, FN = np.sum(tp), np.sum(fp), np.sum(fn)
    metric = TP / (TP + 0.5 * (FP + FN))
    return metric
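
# Illustrative sketch (not part of the library): the estimators above treat the calibrated
# probability of each prediction as its expected contribution to the confusion matrix.
# A prediction of 1 with calibrated probability 0.8 contributes 0.8 expected true positives
# and 0.2 expected false positives; a prediction of 0 with probability 0.8 contributes
# 0.8 expected false negatives. The numbers below are made up.
#
#   >>> import numpy as np
#   >>> y_pred = np.array([1, 1, 0, 0])
#   >>> y_pred_proba = np.array([0.9, 0.6, 0.3, 0.1])
#   >>> estimate_f1(y_pred, y_pred_proba)  # expected TP=1.5, FP=0.5, FN=0.4 -> F1 ~ 0.769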

@MetricFactory.register('precision', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationPrecision(Metric):
    """CBPE binary classification precision Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        timestamp_column_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification precision Metric Class."""
        super().__init__(
            name='precision',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[('Precision', 'precision')],
            lower_threshold_value_limit=0,
            upper_threshold_value_limit=1,
        )
        # sampling error
        self._sampling_error_components: Tuple = ()

    def _fit(self, reference_data: pd.DataFrame):
        self._sampling_error_components = bse.precision_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
        )

    def _estimate(self, data: pd.DataFrame):
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat
        return estimate_precision(y_pred, y_pred_proba)

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return bse.precision_sampling_error(self._sampling_error_components, data)

    def _realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        if y_true.nunique() <= 1:
            warnings.warn(
                f"Too few unique values present in '{self.y_true}', "
                f"returning NaN as realized {self.display_name} score."
            )
            return np.NaN
        if y_pred.nunique() <= 1:
            warnings.warn(
                f"Too few unique values present in '{self.y_pred}', "
                f"returning NaN as realized {self.display_name} score."
            )
            return np.NaN
        # TODO: zero_division should be np.nan
        #       update when we update sklearn to 1.3+ and remove unnecessary checks.
        return precision_score(y_true=y_true, y_pred=y_pred, zero_division='warn')

def estimate_precision(y_pred: pd.DataFrame, y_pred_proba: pd.DataFrame) -> float:
    """Estimates the Precision metric.

    Parameters
    ----------
    y_pred: pd.DataFrame
        Predicted class labels of the sample
    y_pred_proba: pd.DataFrame
        Probability estimates of the sample for each class in the model.

    Returns
    -------
    metric: float
        Estimated Precision score.
    """
    tp = np.where(y_pred == 1, y_pred_proba, 0)
    fp = np.where(y_pred == 1, 1 - y_pred_proba, 0)
    TP, FP = np.sum(tp), np.sum(fp)
    metric = TP / (TP + FP)
    return metric

@MetricFactory.register('recall', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationRecall(Metric):
    """CBPE binary classification recall Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        timestamp_column_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification recall Metric Class."""
        super().__init__(
            name='recall',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[('Recall', 'recall')],
            lower_threshold_value_limit=0,
            upper_threshold_value_limit=1,
        )
        # sampling error
        self._sampling_error_components: Tuple = ()

    def _fit(self, reference_data: pd.DataFrame):
        self._sampling_error_components = bse.recall_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
        )

    def _estimate(self, data: pd.DataFrame):
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat
        return estimate_recall(y_pred, y_pred_proba)

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return bse.recall_sampling_error(self._sampling_error_components, data)

    def _realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        if y_true.nunique() <= 1:
            warnings.warn(
                f"Too few unique values present in '{self.y_true}', "
                f"returning NaN as realized {self.display_name} score."
            )
            return np.NaN
        if y_pred.nunique() <= 1:
            warnings.warn(
                f"Too few unique values present in '{self.y_pred}', "
                f"returning NaN as realized {self.display_name} score."
            )
            return np.NaN
        # TODO: zero_division should be np.nan
        #       update when we update sklearn to 1.3+ and remove unnecessary checks.
        return recall_score(y_true=y_true, y_pred=y_pred, zero_division='warn')

def estimate_recall(y_pred: pd.DataFrame, y_pred_proba: pd.DataFrame) -> float:
    """Estimates the Recall metric.

    Parameters
    ----------
    y_pred: pd.DataFrame
        Predicted class labels of the sample
    y_pred_proba: pd.DataFrame
        Probability estimates of the sample for each class in the model.

    Returns
    -------
    metric: float
        Estimated Recall score.
    """
    tp = np.where(y_pred == 1, y_pred_proba, 0)
    fn = np.where(y_pred == 0, y_pred_proba, 0)
    TP, FN = np.sum(tp), np.sum(fn)
    metric = TP / (TP + FN)
    return metric

@MetricFactory.register('specificity', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationSpecificity(Metric):
    """CBPE binary classification specificity Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        timestamp_column_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification specificity Metric Class."""
        super().__init__(
            name='specificity',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[('Specificity', 'specificity')],
            lower_threshold_value_limit=0,
            upper_threshold_value_limit=1,
        )
        # sampling error
        self._sampling_error_components: Tuple = ()

    def _fit(self, reference_data: pd.DataFrame):
        self._sampling_error_components = bse.specificity_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
        )

    def _estimate(self, data: pd.DataFrame):
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat
        return estimate_specificity(y_pred, y_pred_proba)

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return bse.specificity_sampling_error(self._sampling_error_components, data)

    def _realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
        denominator = tn + fp
        if denominator == 0:
            return np.NaN
        else:
            return tn / denominator

def estimate_specificity(y_pred: pd.DataFrame, y_pred_proba: pd.DataFrame) -> float:
    """Estimates the Specificity metric.

    Parameters
    ----------
    y_pred: pd.DataFrame
        Predicted class labels of the sample
    y_pred_proba: pd.DataFrame
        Probability estimates of the sample for each class in the model.

    Returns
    -------
    metric: float
        Estimated Specificity score.
    """
    tn = np.where(y_pred == 0, 1 - y_pred_proba, 0)
    fp = np.where(y_pred == 1, 1 - y_pred_proba, 0)
    TN, FP = np.sum(tn), np.sum(fp)
    metric = TN / (TN + FP)
    return metric

@MetricFactory.register('accuracy', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationAccuracy(Metric):
    """CBPE binary classification accuracy Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        timestamp_column_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification accuracy Metric Class."""
        super().__init__(
            name='accuracy',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[('Accuracy', 'accuracy')],
            lower_threshold_value_limit=0,
            upper_threshold_value_limit=1,
        )
        # sampling error
        self._sampling_error_components: Tuple = ()

    def _fit(self, reference_data: pd.DataFrame):
        self._sampling_error_components = bse.accuracy_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
        )

    def _estimate(self, data: pd.DataFrame):
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat
        return estimate_accuracy(y_pred, y_pred_proba)

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return bse.accuracy_sampling_error(self._sampling_error_components, data)

    def _realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        return accuracy_score(y_true=y_true, y_pred=y_pred)

def estimate_accuracy(y_pred: pd.Series, y_pred_proba: pd.Series) -> float:
    """Estimates the accuracy metric.

    Parameters
    ----------
    y_pred: pd.Series
        Predicted class labels of the sample
    y_pred_proba: pd.Series
        Probability estimates of the sample for each class in the model.

    Returns
    -------
    metric: float
        Estimated accuracy score.
    """
    tp = np.where(y_pred == 1, y_pred_proba, 0)
    tn = np.where(y_pred == 0, 1 - y_pred_proba, 0)
    TP, TN = np.sum(tp), np.sum(tn)
    metric = (TP + TN) / len(y_pred)
    return metric
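
# Illustrative sketch (not part of the library): expected accuracy is the average calibrated
# probability that each prediction is correct — p for predictions of 1, (1 - p) for predictions
# of 0. The numbers below are made up.
#
#   >>> import numpy as np
#   >>> y_pred = np.array([1, 1, 0, 0])
#   >>> y_pred_proba = np.array([0.9, 0.6, 0.3, 0.1])
#   >>> estimate_accuracy(y_pred, y_pred_proba)  # (0.9 + 0.6 + 0.7 + 0.9) / 4 = 0.775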

@MetricFactory.register('confusion_matrix', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationConfusionMatrix(Metric):
    """CBPE binary classification confusion matrix Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        timestamp_column_name: Optional[str] = None,
        normalize_confusion_matrix: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification confusion matrix Metric Class."""
        super().__init__(
            name='confusion_matrix',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[
                ('True Positive', 'true_positive'),
                ('True Negative', 'true_negative'),
                ('False Positive', 'false_positive'),
                ('False Negative', 'false_negative'),
            ],
            lower_threshold_value_limit=0,
        )
        self.normalize_confusion_matrix: Optional[str] = normalize_confusion_matrix
        if self.normalize_confusion_matrix is not None:
            self.upper_threshold_value_limit = 1
        self.true_positive_lower_threshold: Optional[float] = None
        self.true_positive_upper_threshold: Optional[float] = None
        self.true_negative_lower_threshold: Optional[float] = None
        self.true_negative_upper_threshold: Optional[float] = None
        self.false_positive_lower_threshold: Optional[float] = None
        self.false_positive_upper_threshold: Optional[float] = None
        self.false_negative_lower_threshold: Optional[float] = None
        self.false_negative_upper_threshold: Optional[float] = None

    def fit(self, reference_data: pd.DataFrame):  # override the superclass fit method
        """Fits a Metric on reference data.

        Parameters
        ----------
        reference_data: pd.DataFrame
            The reference data used for fitting. Must have target data available.
        """
        # Calculate alert thresholds
        reference_chunks = self.chunker.split(
            reference_data,
        )

        self.true_positive_lower_threshold, self.true_positive_upper_threshold = self._true_positive_alert_thresholds(
            reference_chunks
        )
        self.true_negative_lower_threshold, self.true_negative_upper_threshold = self._true_negative_alert_thresholds(
            reference_chunks
        )
        (
            self.false_positive_lower_threshold,
            self.false_positive_upper_threshold,
        ) = self._false_positive_alert_thresholds(reference_chunks)
        (
            self.false_negative_lower_threshold,
            self.false_negative_upper_threshold,
        ) = self._false_negative_alert_thresholds(reference_chunks)

        # Delegate to confusion matrix subclass
        self._fit(reference_data)  # could probably put the _fit functionality here since we override the fit method

        return

    def _fit(self, reference_data: pd.DataFrame):
        self._true_positive_sampling_error_components = bse.true_positive_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
            normalize_confusion_matrix=self.normalize_confusion_matrix,
        )
        self._true_negative_sampling_error_components = bse.true_negative_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
            normalize_confusion_matrix=self.normalize_confusion_matrix,
        )
        self._false_positive_sampling_error_components = bse.false_positive_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
            normalize_confusion_matrix=self.normalize_confusion_matrix,
        )
        self._false_negative_sampling_error_components = bse.false_negative_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
            normalize_confusion_matrix=self.normalize_confusion_matrix,
        )

    def _true_positive_alert_thresholds(
        self, reference_chunks: List[Chunk]
    ) -> Tuple[Optional[float], Optional[float]]:
        realized_chunk_performance = np.asarray(
            [self._true_positive_realized_performance(chunk.data) for chunk in reference_chunks]
        )
        lower_threshold_value, upper_threshold_value = calculate_threshold_values(
            threshold=self.threshold,
            data=realized_chunk_performance,
            lower_threshold_value_limit=self.lower_threshold_value_limit,
            upper_threshold_value_limit=self.upper_threshold_value_limit,
            logger=self._logger,
            metric_name=self.display_name,
        )

        return lower_threshold_value, upper_threshold_value

    def _true_negative_alert_thresholds(
        self, reference_chunks: List[Chunk]
    ) -> Tuple[Optional[float], Optional[float]]:
        realized_chunk_performance = np.asarray(
            [self._true_negative_realized_performance(chunk.data) for chunk in reference_chunks]
        )
        lower_threshold_value, upper_threshold_value = calculate_threshold_values(
            threshold=self.threshold,
            data=realized_chunk_performance,
            lower_threshold_value_limit=self.lower_threshold_value_limit,
            upper_threshold_value_limit=self.upper_threshold_value_limit,
            logger=self._logger,
            metric_name=self.display_name,
        )

        return lower_threshold_value, upper_threshold_value

    def _false_positive_alert_thresholds(
        self, reference_chunks: List[Chunk]
    ) -> Tuple[Optional[float], Optional[float]]:
        realized_chunk_performance = np.asarray(
            [self._false_positive_realized_performance(chunk.data) for chunk in reference_chunks]
        )
        lower_threshold_value, upper_threshold_value = calculate_threshold_values(
            threshold=self.threshold,
            data=realized_chunk_performance,
            lower_threshold_value_limit=self.lower_threshold_value_limit,
            upper_threshold_value_limit=self.upper_threshold_value_limit,
            logger=self._logger,
            metric_name=self.display_name,
        )

        return lower_threshold_value, upper_threshold_value

    def _false_negative_alert_thresholds(
        self, reference_chunks: List[Chunk]
    ) -> Tuple[Optional[float], Optional[float]]:
        realized_chunk_performance = np.asarray(
            [self._false_negative_realized_performance(chunk.data) for chunk in reference_chunks]
        )
        lower_threshold_value, upper_threshold_value = calculate_threshold_values(
            threshold=self.threshold,
            data=realized_chunk_performance,
            lower_threshold_value_limit=self.lower_threshold_value_limit,
            upper_threshold_value_limit=self.upper_threshold_value_limit,
            logger=self._logger,
            metric_name=self.display_name,
        )

        return lower_threshold_value, upper_threshold_value

    def _true_positive_realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        _, _, _, tp = confusion_matrix(y_true, y_pred, normalize=self.normalize_confusion_matrix).ravel()
        return tp

    def _true_negative_realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        tn, _, _, _ = confusion_matrix(y_true, y_pred, normalize=self.normalize_confusion_matrix).ravel()
        return tn

    def _false_positive_realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        _, fp, _, _ = confusion_matrix(y_true, y_pred, normalize=self.normalize_confusion_matrix).ravel()
        return fp

    def _false_negative_realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred, y_true = _dat
        _, _, fn, _ = confusion_matrix(y_true, y_pred, normalize=self.normalize_confusion_matrix).ravel()
        return fn

    def get_true_positive_estimate(self, chunk_data: pd.DataFrame) -> float:
        """Estimates the true positive rate for a given chunk of data.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        normalized_est_tp_ratio : float
            Estimated true positive rate.
        """
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=chunk_data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat

        est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0))
        est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0))
        est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0))

        if self.normalize_confusion_matrix is None:
            normalized_est_tp_ratio = est_tp_ratio * len(y_pred)
        elif self.normalize_confusion_matrix == 'all':
            normalized_est_tp_ratio = est_tp_ratio
        elif self.normalize_confusion_matrix == 'true':
            normalizer = 1 / (est_tp_ratio + est_fn_ratio)
            normalized_est_tp_ratio = est_tp_ratio * normalizer
        elif self.normalize_confusion_matrix == 'pred':
            normalizer = 1 / (est_tp_ratio + est_fp_ratio)
            normalized_est_tp_ratio = est_tp_ratio * normalizer
        else:
            raise InvalidArgumentsException(
                f"'normalize_confusion_matrix' should be None, 'true', 'pred' or 'all' "
                f"but got '{self.normalize_confusion_matrix}'"
            )

        return normalized_est_tp_ratio

    def get_true_negative_estimate(self, chunk_data: pd.DataFrame) -> float:
        """Estimates the true negative rate for a given chunk of data.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        normalized_est_tn_ratio : float
            Estimated true negative rate.
        """
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=chunk_data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat

        est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0))
        est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0))
        est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0))

        if self.normalize_confusion_matrix is None:
            normalized_est_tn_ratio = est_tn_ratio * len(y_pred)
        elif self.normalize_confusion_matrix == 'all':
            normalized_est_tn_ratio = est_tn_ratio
        elif self.normalize_confusion_matrix == 'true':
            normalizer = 1 / (est_tn_ratio + est_fp_ratio)
            normalized_est_tn_ratio = est_tn_ratio * normalizer
        elif self.normalize_confusion_matrix == 'pred':
            normalizer = 1 / (est_tn_ratio + est_fn_ratio)
            normalized_est_tn_ratio = est_tn_ratio * normalizer
        else:
            raise InvalidArgumentsException(
                f"'normalize_confusion_matrix' should be None, 'true', 'pred' or 'all' "
                f"but got '{self.normalize_confusion_matrix}'"
            )

        return normalized_est_tn_ratio

    def get_false_positive_estimate(self, chunk_data: pd.DataFrame) -> float:
        """Estimates the false positive rate for a given chunk of data.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        normalized_est_fp_ratio : float
            Estimated false positive rate.
        """
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=chunk_data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat

        est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0))
        est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0))
        est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0))

        if self.normalize_confusion_matrix is None:
            normalized_est_fp_ratio = est_fp_ratio * len(y_pred)
        elif self.normalize_confusion_matrix == 'all':
            normalized_est_fp_ratio = est_fp_ratio
        elif self.normalize_confusion_matrix == 'true':
            normalizer = 1 / (est_tn_ratio + est_fp_ratio)
            normalized_est_fp_ratio = est_fp_ratio * normalizer
        elif self.normalize_confusion_matrix == 'pred':
            normalizer = 1 / (est_tp_ratio + est_fp_ratio)
            normalized_est_fp_ratio = est_fp_ratio * normalizer
        else:
            raise InvalidArgumentsException(
                f"'normalize_confusion_matrix' should be None, 'true', 'pred' or 'all' "
                f"but got '{self.normalize_confusion_matrix}'"
            )

        return normalized_est_fp_ratio

    def get_false_negative_estimate(self, chunk_data: pd.DataFrame) -> float:
        """Estimates the false negative rate for a given chunk of data.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        normalized_est_fn_ratio : float
            Estimated false negative rate.
        """
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=chunk_data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex
        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN
        y_pred_proba, y_pred = _dat

        est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0))
        est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0))
        est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0))

        if self.normalize_confusion_matrix is None:
            normalized_est_fn_ratio = est_fn_ratio * len(y_pred)
        elif self.normalize_confusion_matrix == 'all':
            normalized_est_fn_ratio = est_fn_ratio
        elif self.normalize_confusion_matrix == 'true':
            normalizer = 1 / (est_tp_ratio + est_fn_ratio)
            normalized_est_fn_ratio = est_fn_ratio * normalizer
        elif self.normalize_confusion_matrix == 'pred':
            normalizer = 1 / (est_tn_ratio + est_fn_ratio)
            normalized_est_fn_ratio = est_fn_ratio * normalizer
        else:
            raise InvalidArgumentsException(
                f"'normalize_confusion_matrix' should be None, 'true', 'pred' or 'all' "
                f"but got '{self.normalize_confusion_matrix}'"
            )

        return normalized_est_fn_ratio

    def get_true_pos_info(self, chunk_data: pd.DataFrame) -> Dict:
        """Returns a dictionary containing information about the true positives for a given chunk.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        true_pos_info : Dict
            A dictionary of true positive's information and its value pairs.
        """
        true_pos_info: Dict[str, Any] = {}

        estimated_true_positives = self.get_true_positive_estimate(chunk_data)
        sampling_error_true_positives = bse.true_positive_sampling_error(
            self._true_positive_sampling_error_components, chunk_data
        )

        true_pos_info['estimated_true_positive'] = estimated_true_positives
        true_pos_info['sampling_error_true_positive'] = sampling_error_true_positives
        true_pos_info['realized_true_positive'] = self._true_positive_realized_performance(chunk_data)

        true_pos_info['upper_confidence_boundary_true_positive'] = np.minimum(
            np.inf if self.upper_threshold_value_limit is None else self.upper_threshold_value_limit,
            estimated_true_positives + SAMPLING_ERROR_RANGE * sampling_error_true_positives,
        )
        true_pos_info['lower_confidence_boundary_true_positive'] = np.maximum(
            -np.inf if self.lower_threshold_value_limit is None else self.lower_threshold_value_limit,
            estimated_true_positives - SAMPLING_ERROR_RANGE * sampling_error_true_positives,
        )

        true_pos_info['upper_threshold_true_positive'] = self.true_positive_upper_threshold
        true_pos_info['lower_threshold_true_positive'] = self.true_positive_lower_threshold

        true_pos_info['alert_true_positive'] = (
            self.true_positive_upper_threshold is not None
            and estimated_true_positives > self.true_positive_upper_threshold
        ) or (
            self.true_positive_lower_threshold is not None
            and estimated_true_positives < self.true_positive_lower_threshold
        )

        return true_pos_info

    def get_true_neg_info(self, chunk_data: pd.DataFrame) -> Dict:
        """Returns a dictionary containing information about the true negatives for a given chunk.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        true_neg_info : Dict
            A dictionary of true negative's information and its value pairs.
        """
        true_neg_info: Dict[str, Any] = {}

        estimated_true_negatives = self.get_true_negative_estimate(chunk_data)
        sampling_error_true_negatives = bse.true_negative_sampling_error(
            self._true_negative_sampling_error_components, chunk_data
        )

        true_neg_info['estimated_true_negative'] = estimated_true_negatives
        true_neg_info['sampling_error_true_negative'] = sampling_error_true_negatives
        true_neg_info['realized_true_negative'] = self._true_negative_realized_performance(chunk_data)

        true_neg_info['upper_confidence_boundary_true_negative'] = np.minimum(
            np.inf if self.upper_threshold_value_limit is None else self.upper_threshold_value_limit,
            estimated_true_negatives + SAMPLING_ERROR_RANGE * sampling_error_true_negatives,
        )
        true_neg_info['lower_confidence_boundary_true_negative'] = np.maximum(
            -np.inf if self.lower_threshold_value_limit is None else self.lower_threshold_value_limit,
            estimated_true_negatives - SAMPLING_ERROR_RANGE * sampling_error_true_negatives,
        )

        true_neg_info['upper_threshold_true_negative'] = self.true_negative_upper_threshold
        true_neg_info['lower_threshold_true_negative'] = self.true_negative_lower_threshold

        true_neg_info['alert_true_negative'] = (
            self.true_negative_upper_threshold is not None
            and estimated_true_negatives > self.true_negative_upper_threshold
        ) or (
            self.true_negative_lower_threshold is not None
            and estimated_true_negatives < self.true_negative_lower_threshold
        )

        return true_neg_info

    def get_false_pos_info(self, chunk_data: pd.DataFrame) -> Dict:
        """Returns a dictionary containing information about the false positives for a given chunk.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        false_pos_info : Dict
            A dictionary of false positive's information and its value pairs.
        """
        false_pos_info: Dict[str, Any] = {}

        estimated_false_positives = self.get_false_positive_estimate(chunk_data)
        sampling_error_false_positives = bse.false_positive_sampling_error(
            self._false_positive_sampling_error_components, chunk_data
        )

        false_pos_info['estimated_false_positive'] = estimated_false_positives
        false_pos_info['sampling_error_false_positive'] = sampling_error_false_positives
        false_pos_info['realized_false_positive'] = self._false_positive_realized_performance(chunk_data)

        false_pos_info['upper_confidence_boundary_false_positive'] = np.minimum(
            np.inf if self.upper_threshold_value_limit is None else self.upper_threshold_value_limit,
            estimated_false_positives + SAMPLING_ERROR_RANGE * sampling_error_false_positives,
        )
        false_pos_info['lower_confidence_boundary_false_positive'] = np.maximum(
            -np.inf if self.lower_threshold_value_limit is None else self.lower_threshold_value_limit,
            estimated_false_positives - SAMPLING_ERROR_RANGE * sampling_error_false_positives,
        )

        false_pos_info['upper_threshold_false_positive'] = self.false_positive_upper_threshold
        false_pos_info['lower_threshold_false_positive'] = self.false_positive_lower_threshold

        false_pos_info['alert_false_positive'] = (
            self.false_positive_upper_threshold is not None
            and estimated_false_positives > self.false_positive_upper_threshold
        ) or (
            self.false_positive_lower_threshold is not None
            and estimated_false_positives < self.false_positive_lower_threshold
        )

        return false_pos_info
[docs]    def get_false_neg_info(self, chunk_data: pd.DataFrame) -> Dict:
        """Returns a dictionary containing information about the false negatives for a given chunk.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        false_neg_info : Dict
            A dictionary mapping false negative metric names to their values.
        """
        false_neg_info: Dict[str, Any] = {}

        estimated_false_negatives = self.get_false_negative_estimate(chunk_data)
        sampling_error_false_negatives = bse.false_negative_sampling_error(
            self._false_negative_sampling_error_components, chunk_data
        )

        false_neg_info['estimated_false_negative'] = estimated_false_negatives
        false_neg_info['sampling_error_false_negative'] = sampling_error_false_negatives
        false_neg_info['realized_false_negative'] = self._false_negative_realized_performance(chunk_data)

        false_neg_info['upper_confidence_boundary_false_negative'] = np.minimum(
            np.inf if self.upper_threshold_value_limit is None else self.upper_threshold_value_limit,
            estimated_false_negatives + SAMPLING_ERROR_RANGE * sampling_error_false_negatives,
        )
        false_neg_info['lower_confidence_boundary_false_negative'] = np.maximum(
            -np.inf if self.lower_threshold_value_limit is None else self.lower_threshold_value_limit,
            estimated_false_negatives - SAMPLING_ERROR_RANGE * sampling_error_false_negatives,
        )

        false_neg_info['upper_threshold_false_negative'] = self.false_negative_upper_threshold
        false_neg_info['lower_threshold_false_negative'] = self.false_negative_lower_threshold

        false_neg_info['alert_false_negative'] = (
            self.false_negative_upper_threshold is not None
            and estimated_false_negatives > self.false_negative_upper_threshold
        ) or (
            self.false_negative_lower_threshold is not None
            and estimated_false_negatives < self.false_negative_lower_threshold
        )

        return false_neg_info
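The same pattern repeats for every confusion-matrix component: the confidence band is the estimate plus or minus SAMPLING_ERROR_RANGE times the chunk's sampling error, clipped to the metric's value limits. A minimal standalone sketch of that clipping step; the value 3 used for SAMPLING_ERROR_RANGE and the toy numbers are assumptions for illustration only:

# --- illustrative sketch, not part of the module ---
import numpy as np

SAMPLING_ERROR_RANGE = 3  # assumed band width multiplier for this sketch

def confidence_band(estimate, sampling_error, lower_limit=None, upper_limit=None):
    """Return the (lower, upper) confidence boundary around an estimated component."""
    upper = np.minimum(
        np.inf if upper_limit is None else upper_limit,
        estimate + SAMPLING_ERROR_RANGE * sampling_error,
    )
    lower = np.maximum(
        -np.inf if lower_limit is None else lower_limit,
        estimate - SAMPLING_ERROR_RANGE * sampling_error,
    )
    return lower, upper

# 42 estimated true negatives with a sampling error of 2.5, bounded below by 0
print(confidence_band(42.0, 2.5, lower_limit=0))  # (34.5, 49.5)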
[docs]    def get_chunk_record(self, chunk_data: pd.DataFrame) -> Dict:
        """Returns a dictionary containing the performance metrics for a given chunk.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        chunk_record : Dict
            A dictionary of performance metric, value pairs.
        """
        chunk_record = {}

        true_pos_info = self.get_true_pos_info(chunk_data)
        chunk_record.update(true_pos_info)

        true_neg_info = self.get_true_neg_info(chunk_data)
        chunk_record.update(true_neg_info)

        false_pos_info = self.get_false_pos_info(chunk_data)
        chunk_record.update(false_pos_info)

        false_neg_info = self.get_false_neg_info(chunk_data)
        chunk_record.update(false_neg_info)

        return chunk_record
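For orientation, a hypothetical and abridged view of the flat record such a chunk produces; the key names follow the `<field>_<component>` convention used above, and the numbers are invented:

# --- illustrative sketch, not part of the module ---
chunk_record = {
    'estimated_true_positive': 118.4,
    'sampling_error_true_positive': 3.1,
    'realized_true_positive': 121.0,
    'upper_confidence_boundary_true_positive': 127.7,   # 118.4 + 3 * 3.1
    'lower_confidence_boundary_true_positive': 109.1,   # 118.4 - 3 * 3.1
    'upper_threshold_true_positive': 130.0,
    'lower_threshold_true_positive': 105.0,
    'alert_true_positive': False,
    # ... the same eight keys repeat for true_negative, false_positive and false_negative
}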
    def _estimate(self, data: pd.DataFrame):
        pass

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return 0.0

    def _realized_performance(self, data: pd.DataFrame) -> float:
        return 0.0
[docs]@MetricFactory.register('business_value', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationBusinessValue(Metric):
    """CBPE binary classification business value Metric Class."""

    def __init__(
        self,
        y_pred_proba: ModelOutputsType,
        y_pred: str,
        y_true: str,
        chunker: Chunker,
        threshold: Threshold,
        business_value_matrix: Union[List, np.ndarray],
        normalize_business_value: Optional[str] = None,
        timestamp_column_name: Optional[str] = None,
        **kwargs,
    ):
        """Initialize CBPE binary classification business value Metric Class."""
        super().__init__(
            name='business_value',
            y_pred_proba=y_pred_proba,
            y_pred=y_pred,
            y_true=y_true,
            timestamp_column_name=timestamp_column_name,
            chunker=chunker,
            threshold=threshold,
            components=[('Business Value', 'business_value')],
        )

        if business_value_matrix is None:
            raise ValueError("business_value_matrix must be provided for 'business_value' metric")

        if not (isinstance(business_value_matrix, np.ndarray) or isinstance(business_value_matrix, list)):
            raise ValueError(
                f"business_value_matrix must be a numpy array or a list, but got {type(business_value_matrix)}"
            )

        if isinstance(business_value_matrix, list):
            business_value_matrix = np.array(business_value_matrix)

        if business_value_matrix.shape != (2, 2):
            raise ValueError(
                f"business_value_matrix must have shape (2,2), but got matrix of shape {business_value_matrix.shape}"
            )

        self.business_value_matrix = business_value_matrix
        self.normalize_business_value: Optional[str] = normalize_business_value

        # self.lower_threshold: Optional[float] = 0
        # self.upper_threshold: Optional[float] = 1

    def _fit(self, reference_data: pd.DataFrame):
        self._sampling_error_components = bse.business_value_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
            business_value_matrix=self.business_value_matrix,
            normalize_business_value=self.normalize_business_value,
        )

    def _realized_performance(self, data: pd.DataFrame) -> float:
        try:
            _dat, _empty = self._common_cleaning(data=data, selected_columns=[self.y_pred, self.y_true])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex

        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN

        y_pred, y_true = _dat

        tp_value = self.business_value_matrix[1, 1]
        tn_value = self.business_value_matrix[0, 0]
        fp_value = self.business_value_matrix[0, 1]
        fn_value = self.business_value_matrix[1, 0]
        bv_array = np.array([[tn_value, fp_value], [fn_value, tp_value]])

        cm = confusion_matrix(y_true, y_pred)
        if self.normalize_business_value == 'per_prediction':
            with np.errstate(all="ignore"):
                cm = cm / cm.sum(axis=0, keepdims=True)
            cm = np.nan_to_num(cm)

        return (bv_array * cm).sum()

    def _estimate(self, chunk_data: pd.DataFrame) -> float:
        try:
            assert isinstance(self.y_pred_proba, str)  # because of binary classification
            _dat, _empty = self._common_cleaning(data=chunk_data, selected_columns=[self.y_pred_proba, self.y_pred])
        except InvalidArgumentsException as ex:
            if "not all present in provided data columns" in str(ex):
                self._logger.debug(str(ex))
                return np.NaN
            else:
                raise ex

        if _empty:
            self._logger.debug(f"Not enough data to compute estimated {self.display_name}.")
            warnings.warn(f"Not enough data to compute estimated {self.display_name}.")
            return np.NaN

        y_pred_proba, y_pred = _dat

        business_value_normalization = self.normalize_business_value
        business_value_matrix = self.business_value_matrix

        return estimate_business_value(y_pred, y_pred_proba, business_value_normalization, business_value_matrix)

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return bse.business_value_sampling_error(
            self._sampling_error_components,
            data,
        )
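The realized business value is just the element-wise product of the value matrix and the confusion matrix. A small self-contained sketch of that computation; the value matrix and labels are invented for illustration and are not taken from the module:

# --- illustrative sketch, not part of the module ---
import numpy as np
from sklearn.metrics import confusion_matrix

# value of [[TN, FP], [FN, TP]] outcomes; e.g. a missed positive (FN) costs 25 units
business_value_matrix = np.array([[0, -5], [-25, 100]])

y_true = np.array([0, 1, 1, 0, 1, 0, 1, 1])
y_pred = np.array([0, 1, 0, 0, 1, 1, 1, 1])

cm = confusion_matrix(y_true, y_pred)  # [[TN, FP], [FN, TP]] -> [[2, 1], [1, 4]]
total_value = (business_value_matrix * cm).sum()
print(total_value)  # 2*0 + 1*(-5) + 1*(-25) + 4*100 = 370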
[docs]def estimate_business_value( y_pred: np.ndarray, y_pred_proba: np.ndarray, normalize_business_value: Optional[str], business_value_matrix: np.ndarray, ) -> float: """Estimates the Business Value metric. Parameters ---------- y_pred: np.ndarray Predicted class labels of the sample y_pred_proba: np.ndarray Probability estimates of the sample for each class in the model. normalize_business_value: str, default=None Determines how the business value will be normalized. Allowed values are None and 'per_prediction'. - None - the business value will not be normalized and the value returned will be the total value per chunk. - 'per_prediction' - the value will be normalized by the number of predictions in the chunk. business_value_matrix: np.ndarray A 2x2 matrix that specifies the value of each cell in the confusion matrix. The format of the business value matrix must be specified as [[value_of_TN, value_of_FP], \ [value_of_FN, value_of_TP]]. Returns ------- business_value: float Estimated Business Value score. """ est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0)) est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0)) est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0)) est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0)) cm = np.array([[est_tn_ratio, est_fp_ratio], [est_fn_ratio, est_tp_ratio]]) * len(y_pred) if normalize_business_value == 'per_prediction': with np.errstate(all="ignore"): cm = cm / cm.sum(axis=0, keepdims=True) cm = np.nan_to_num(cm) tp_value = business_value_matrix[1, 1] tn_value = business_value_matrix[0, 0] fp_value = business_value_matrix[0, 1] fn_value = business_value_matrix[1, 0] bv_array = np.array([[tn_value, fp_value], [fn_value, tp_value]]) return (bv_array * cm).sum()
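To make the expectation behind `estimate_business_value` concrete, here is the same cell-ratio construction applied to three toy predictions with calibrated positive-class probabilities; all numbers are invented:

# --- illustrative sketch, not part of the module ---
import numpy as np

y_pred = np.array([1, 0, 1])
y_pred_proba = np.array([0.9, 0.2, 0.6])

est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0))  # (0 + 0.8 + 0) / 3
est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0))      # (0.9 + 0 + 0.6) / 3
est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0))  # (0.1 + 0 + 0.4) / 3
est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0))      # (0 + 0.2 + 0) / 3

est_cm = np.array([[est_tn_ratio, est_fp_ratio],
                   [est_fn_ratio, est_tp_ratio]]) * len(y_pred)
print(est_cm)  # [[0.8, 0.5], [0.2, 1.5]] -- expected TN, FP, FN, TP counts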
def _get_binarized_multiclass_predictions(data: pd.DataFrame, y_pred: str, y_pred_proba: ModelOutputsType): if not isinstance(y_pred_proba, dict): raise CalculatorException( "multiclass model outputs should be of type Dict[str, str].\n" f"'{y_pred_proba}' is of type '{type(y_pred_proba)}'" ) classes = sorted(y_pred_proba.keys()) y_preds = list(label_binarize(data[y_pred], classes=classes).T) y_pred_probas = [data[y_pred_proba[clazz]] for clazz in classes] return y_preds, y_pred_probas, classes def _get_multiclass_uncalibrated_predictions(data: pd.DataFrame, y_pred: str, y_pred_proba: ModelOutputsType): if not isinstance(y_pred_proba, dict): raise CalculatorException( "multiclass model outputs should be of type Dict[str, str].\n" f"'{y_pred_proba}' is of type '{type(y_pred_proba)}'" ) labels, class_probability_columns = [], [] for label in sorted(y_pred_proba.keys()): labels.append(label) class_probability_columns.append(f'uncalibrated_{y_pred_proba[label]}') return data[y_pred], data[class_probability_columns], labels class _MulticlassClassificationMetric(Metric): """Base class for multiclass classification metrics.""" def _ensure_targets(self, data: pd.DataFrame) -> Optional[pd.DataFrame]: """Ensures that the data contains the target column and that it doesn't contain all NaNs. Any rows in the input where the target is NaN are dropped. """ if self.y_true not in data.columns: return None na = data[self.y_true].isna() if na.all(): return None else: return data[~na]
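The binarization helper turns a single multiclass prediction column into one one-vs-rest indicator vector per class, sorted by class label. A toy frame showing the shape of its inputs and outputs; the column names are invented for this sketch:

# --- illustrative sketch, not part of the module ---
import pandas as pd
from sklearn.preprocessing import label_binarize

data = pd.DataFrame({
    'y_pred': ['cat', 'dog', 'fish', 'dog'],
    'proba_cat': [0.7, 0.1, 0.2, 0.2],
    'proba_dog': [0.2, 0.8, 0.1, 0.6],
    'proba_fish': [0.1, 0.1, 0.7, 0.2],
})
y_pred_proba = {'cat': 'proba_cat', 'dog': 'proba_dog', 'fish': 'proba_fish'}

classes = sorted(y_pred_proba.keys())  # ['cat', 'dog', 'fish']
y_preds = list(label_binarize(data['y_pred'], classes=classes).T)
# y_preds[0] is the one-vs-rest indicator for 'cat': array([1, 0, 0, 0])
y_pred_probas = [data[y_pred_proba[clazz]] for clazz in classes]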
[docs]@MetricFactory.register('roc_auc', ProblemType.CLASSIFICATION_MULTICLASS) class MulticlassClassificationAUROC(_MulticlassClassificationMetric): """CBPE multiclass classification AUROC Metric Class.""" def __init__( self, y_pred_proba: ModelOutputsType, y_pred: str, y_true: str, chunker: Chunker, threshold: Threshold, timestamp_column_name: Optional[str] = None, **kwargs, ): """Initialize CBPE multiclass classification AUROC Metric Class.""" super().__init__( name='roc_auc', y_pred_proba=y_pred_proba, y_pred=y_pred, y_true=y_true, timestamp_column_name=timestamp_column_name, chunker=chunker, threshold=threshold, components=[('ROC AUC', 'roc_auc')], ) # FIXME: Should we check the y_pred_proba argument here to ensure it's a dict? self.y_pred_proba: Dict[str, str] # sampling error self._sampling_error_components: List[Tuple] = [] def _fit(self, reference_data: pd.DataFrame): classes = class_labels(self.y_pred_proba) binarized_y_true = list(label_binarize(reference_data[self.y_true], classes=classes).T) y_pred_proba = [reference_data[self.y_pred_proba[clazz]].T for clazz in classes] self._sampling_error_components = mse.auroc_sampling_error_components( y_true_reference=binarized_y_true, y_pred_proba_reference=y_pred_proba ) def _estimate(self, data: pd.DataFrame): _, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba) _, y_pred_probas_uncalibrated, _ = _get_multiclass_uncalibrated_predictions( data, self.y_pred, self.y_pred_proba ) ovr_estimates = [] for el in range(len(y_pred_probas)): ovr_estimates.append( estimate_roc_auc( # sorting according to classes is/should_be the same across # _get_binarized_multiclass_predictions and _get_multiclass_uncalibrated_predictions y_pred_probas[el], y_pred_probas_uncalibrated.iloc[:, el], ) ) multiclass_roc_auc = np.mean(ovr_estimates) return multiclass_roc_auc def _sampling_error(self, data: pd.DataFrame) -> float: return mse.auroc_sampling_error(self._sampling_error_components, data) def _realized_performance(self, data: pd.DataFrame) -> float: data = self._ensure_targets(data) if data is None: warnings.warn("No 'y_true' values given for chunk, returning NaN as realized ROC-AUC.") return np.NaN if data[self.y_true].nunique() <= 1: warnings.warn("Too few unique values present in 'y_true', returning NaN as realized ROC-AUC.") return np.NaN _, y_pred_probas, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba) return roc_auc_score(data[self.y_true], y_pred_probas, multi_class='ovr', average='macro', labels=labels)
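The realized multiclass ROC AUC is sklearn's macro-averaged one-vs-rest score computed on the uncalibrated class probabilities. A minimal sketch of that call on invented data, assuming rows of class probabilities that sum to one:

# --- illustrative sketch, not part of the module ---
import numpy as np
from sklearn.metrics import roc_auc_score

y_true = np.array(['a', 'b', 'c', 'a', 'b', 'c'])
proba = np.array([
    [0.8, 0.1, 0.1],
    [0.2, 0.6, 0.2],
    [0.1, 0.2, 0.7],
    [0.5, 0.3, 0.2],
    [0.3, 0.5, 0.2],
    [0.2, 0.2, 0.6],
])

# macro one-vs-rest ROC AUC, as used for the realized metric above
print(roc_auc_score(y_true, proba, multi_class='ovr', average='macro', labels=['a', 'b', 'c']))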
[docs]@MetricFactory.register('f1', ProblemType.CLASSIFICATION_MULTICLASS) class MulticlassClassificationF1(_MulticlassClassificationMetric): """CBPE multiclass classification f1 Metric Class.""" def __init__( self, y_pred_proba: ModelOutputsType, y_pred: str, y_true: str, chunker: Chunker, threshold: Threshold, timestamp_column_name: Optional[str] = None, **kwargs, ): """Initialize CBPE multiclass classification f1 Metric Class.""" super().__init__( name='f1', y_pred_proba=y_pred_proba, y_pred=y_pred, y_true=y_true, timestamp_column_name=timestamp_column_name, chunker=chunker, threshold=threshold, components=[('F1', 'f1')], ) # sampling error: self._sampling_error_components: List[Tuple] = [] def _fit(self, reference_data: pd.DataFrame): label_binarizer = LabelBinarizer() binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T) binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T) self._sampling_error_components = mse.f1_sampling_error_components( y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred ) def _estimate(self, data: pd.DataFrame): y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba) ovr_estimates = [] for y_pred, y_pred_proba in zip(y_preds, y_pred_probas): ovr_estimates.append(estimate_f1(y_pred, y_pred_proba)) multiclass_metric = np.mean(ovr_estimates) return multiclass_metric def _sampling_error(self, data: pd.DataFrame) -> float: return mse.f1_sampling_error(self._sampling_error_components, data) def _realized_performance(self, data: pd.DataFrame) -> float: data = self._ensure_targets(data) if data is None: warnings.warn("No 'y_true' values given for chunk, returning NaN as realized F1 score.") return np.NaN if data[self.y_true].nunique() <= 1: warnings.warn("Too few unique values present in 'y_true', returning NaN as realized F1 score.") return np.NaN if data[self.y_pred].nunique() <= 1: warnings.warn("Too few unique values present in 'y_pred', returning NaN as realized F1 score.") return np.NaN y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba) return f1_score(y_true=data[self.y_true], y_pred=y_pred, average='macro', labels=labels)
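The estimator averages per-class one-vs-rest `estimate_f1` values; precision, recall and specificity below follow the same pattern with their respective estimators. `estimate_f1` itself is defined elsewhere in this module, so the following is only a rough standalone sketch of the underlying idea (expected confusion-matrix cells under calibrated probabilities), not the exact implementation:

# --- illustrative sketch, not part of the module ---
import numpy as np

def sketch_estimate_f1(y_pred: np.ndarray, y_pred_proba: np.ndarray) -> float:
    """Rough sketch: expected TP/FP/FN under calibrated probabilities, then F1."""
    tp = np.sum(np.where(y_pred == 1, y_pred_proba, 0))
    fp = np.sum(np.where(y_pred == 1, 1 - y_pred_proba, 0))
    fn = np.sum(np.where(y_pred == 0, y_pred_proba, 0))
    return 2 * tp / (2 * tp + fp + fn)

print(sketch_estimate_f1(np.array([1, 0, 1]), np.array([0.9, 0.2, 0.6])))  # ~0.81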
[docs]@MetricFactory.register('precision', ProblemType.CLASSIFICATION_MULTICLASS) class MulticlassClassificationPrecision(_MulticlassClassificationMetric): """CBPE multiclass classification precision Metric Class.""" def __init__( self, y_pred_proba: ModelOutputsType, y_pred: str, y_true: str, chunker: Chunker, threshold: Threshold, timestamp_column_name: Optional[str] = None, **kwargs, ): """Initialize CBPE multiclass classification precision Metric Class.""" super().__init__( name='precision', y_pred_proba=y_pred_proba, y_pred=y_pred, y_true=y_true, timestamp_column_name=timestamp_column_name, chunker=chunker, threshold=threshold, components=[('Precision', 'precision')], ) # sampling error self._sampling_error_components: List[Tuple] = [] def _fit(self, reference_data: pd.DataFrame): label_binarizer = LabelBinarizer() binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T) binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T) self._sampling_error_components = mse.precision_sampling_error_components( y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred ) def _estimate(self, data: pd.DataFrame): y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba) ovr_estimates = [] for y_pred, y_pred_proba in zip(y_preds, y_pred_probas): ovr_estimates.append(estimate_precision(y_pred, y_pred_proba)) multiclass_metric = np.mean(ovr_estimates) return multiclass_metric def _sampling_error(self, data: pd.DataFrame) -> float: return mse.precision_sampling_error(self._sampling_error_components, data) def _realized_performance(self, data: pd.DataFrame) -> float: data = self._ensure_targets(data) if data is None: warnings.warn("No 'y_true' values given for chunk, returning NaN as realized precision.") return np.NaN if data[self.y_true].nunique() <= 1: warnings.warn("Too few unique values present in 'y_true', returning NaN as realized precision.") return np.NaN if data[self.y_pred].nunique() <= 1: warnings.warn("Too few unique values present in 'y_pred', returning NaN as realized precision.") return np.NaN y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba) return precision_score(y_true=data[self.y_true], y_pred=y_pred, average='macro', labels=labels)
[docs]@MetricFactory.register('recall', ProblemType.CLASSIFICATION_MULTICLASS) class MulticlassClassificationRecall(_MulticlassClassificationMetric): """CBPE multiclass classification recall Metric Class.""" def __init__( self, y_pred_proba: ModelOutputsType, y_pred: str, y_true: str, chunker: Chunker, threshold: Threshold, timestamp_column_name: Optional[str] = None, **kwargs, ): """Initialize CBPE multiclass classification recall Metric Class.""" super().__init__( name='recall', y_pred_proba=y_pred_proba, y_pred=y_pred, y_true=y_true, timestamp_column_name=timestamp_column_name, chunker=chunker, threshold=threshold, components=[('Recall', 'recall')], ) # sampling error self._sampling_error_components: List[Tuple] = [] def _fit(self, reference_data: pd.DataFrame): label_binarizer = LabelBinarizer() binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T) binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T) self._sampling_error_components = mse.recall_sampling_error_components( y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred ) def _estimate(self, data: pd.DataFrame): y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba) ovr_estimates = [] for y_pred, y_pred_proba in zip(y_preds, y_pred_probas): ovr_estimates.append(estimate_recall(y_pred, y_pred_proba)) multiclass_metric = np.mean(ovr_estimates) return multiclass_metric def _sampling_error(self, data: pd.DataFrame) -> float: return mse.recall_sampling_error(self._sampling_error_components, data) def _realized_performance(self, data: pd.DataFrame) -> float: data = self._ensure_targets(data) if data is None: warnings.warn("No 'y_true' values given for chunk, returning NaN as realized recall.") return np.NaN if data[self.y_true].nunique() <= 1: warnings.warn("Too few unique values present in 'y_true', returning NaN as realized recall.") return np.NaN if data[self.y_pred].nunique() <= 1: warnings.warn("Too few unique values present in 'y_pred', returning NaN as realized recall.") return np.NaN y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba) return recall_score(y_true=data[self.y_true], y_pred=y_pred, average='macro', labels=labels)
[docs]@MetricFactory.register('specificity', ProblemType.CLASSIFICATION_MULTICLASS) class MulticlassClassificationSpecificity(_MulticlassClassificationMetric): """CBPE multiclass classification specificity Metric Class.""" def __init__( self, y_pred_proba: ModelOutputsType, y_pred: str, y_true: str, chunker: Chunker, threshold: Threshold, timestamp_column_name: Optional[str] = None, **kwargs, ): """Initialize CBPE multiclass classification specificity Metric Class.""" super().__init__( name='specificity', y_pred_proba=y_pred_proba, y_pred=y_pred, y_true=y_true, timestamp_column_name=timestamp_column_name, chunker=chunker, threshold=threshold, components=[('Specificity', 'specificity')], ) # sampling error self._sampling_error_components: List[Tuple] = [] def _fit(self, reference_data: pd.DataFrame): label_binarizer = LabelBinarizer() binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T) binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T) self._sampling_error_components = mse.specificity_sampling_error_components( y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred ) def _estimate(self, data: pd.DataFrame): y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba) ovr_estimates = [] for y_pred, y_pred_proba in zip(y_preds, y_pred_probas): ovr_estimates.append(estimate_specificity(y_pred, y_pred_proba)) multiclass_metric = np.mean(ovr_estimates) return multiclass_metric def _sampling_error(self, data: pd.DataFrame) -> float: return mse.specificity_sampling_error(self._sampling_error_components, data) def _realized_performance(self, data: pd.DataFrame) -> float: data = self._ensure_targets(data) if data is None: warnings.warn("No 'y_true' values given for chunk, returning NaN as realized specificity.") return np.NaN if data[self.y_true].nunique() <= 1: warnings.warn("Too few unique values present in 'y_true', returning NaN as realized specificity.") return np.NaN if data[self.y_pred].nunique() <= 1: warnings.warn("Too few unique values present in 'y_pred', returning NaN as realized specificity.") return np.NaN y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba) mcm = multilabel_confusion_matrix(data[self.y_true], y_pred, labels=labels) tn_sum = mcm[:, 0, 0] fp_sum = mcm[:, 0, 1] class_wise_specificity = tn_sum / (tn_sum + fp_sum) return np.mean(class_wise_specificity)
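The realized specificity above is the macro average of per-class specificities taken from sklearn's multilabel confusion matrix, where each class slice is laid out as [[TN, FP], [FN, TP]]. A small sketch on invented labels:

# --- illustrative sketch, not part of the module ---
import numpy as np
from sklearn.metrics import multilabel_confusion_matrix

y_true = ['a', 'b', 'c', 'a', 'b', 'c']
y_pred = ['a', 'b', 'b', 'a', 'c', 'c']

mcm = multilabel_confusion_matrix(y_true, y_pred, labels=['a', 'b', 'c'])
tn = mcm[:, 0, 0]
fp = mcm[:, 0, 1]
print(np.mean(tn / (tn + fp)))  # macro-averaged specificity, here ~0.833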
[docs]@MetricFactory.register('accuracy', ProblemType.CLASSIFICATION_MULTICLASS) class MulticlassClassificationAccuracy(_MulticlassClassificationMetric): """CBPE multiclass classification accuracy Metric Class.""" def __init__( self, y_pred_proba: ModelOutputsType, y_pred: str, y_true: str, chunker: Chunker, threshold: Threshold, timestamp_column_name: Optional[str] = None, **kwargs, ): """Initialize CBPE multiclass classification accuracy Metric Class.""" super().__init__( name='accuracy', y_pred_proba=y_pred_proba, y_pred=y_pred, y_true=y_true, timestamp_column_name=timestamp_column_name, chunker=chunker, threshold=threshold, components=[('Accuracy', 'accuracy')], ) # sampling error self._sampling_error_components: Tuple = () def _fit(self, reference_data: pd.DataFrame): label_binarizer = LabelBinarizer() binarized_y_true = label_binarizer.fit_transform(reference_data[self.y_true]) binarized_y_pred = label_binarizer.transform(reference_data[self.y_pred]) self._sampling_error_components = mse.accuracy_sampling_error_components( y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred ) def _estimate(self, data: pd.DataFrame): y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba) y_preds_array = np.asarray(y_preds).T y_pred_probas_array = np.asarray(y_pred_probas).T probability_of_predicted = np.max(y_preds_array * y_pred_probas_array, axis=1) return np.mean(probability_of_predicted) def _sampling_error(self, data: pd.DataFrame) -> float: return mse.accuracy_sampling_error(self._sampling_error_components, data) def _realized_performance(self, data: pd.DataFrame) -> float: data = self._ensure_targets(data) if data is None: warnings.warn("No 'y_true' values given for chunk, returning NaN as realized accuracy.") return np.NaN if data[self.y_true].nunique() <= 1: warnings.warn("Too few unique values present in 'y_true', returning NaN as realized accuracy.") return np.NaN if data[self.y_pred].nunique() <= 1: warnings.warn("Too few unique values present in 'y_pred', returning NaN as realized accuracy.") return np.NaN y_pred, _, _ = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba) return accuracy_score(data[self.y_true], y_pred)
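The accuracy estimate is simply the mean probability assigned to the predicted class, since under calibration that probability is the chance the prediction is correct. A tiny numeric sketch with invented values:

# --- illustrative sketch, not part of the module ---
import numpy as np

# one-hot predicted labels (rows) and calibrated class probabilities, toy numbers
y_preds_array = np.array([[1, 0, 0],
                          [0, 1, 0],
                          [0, 0, 1]])
y_pred_probas_array = np.array([[0.7, 0.2, 0.1],
                                [0.3, 0.5, 0.2],
                                [0.1, 0.3, 0.6]])

probability_of_predicted = np.max(y_preds_array * y_pred_probas_array, axis=1)
print(probability_of_predicted.mean())  # (0.7 + 0.5 + 0.6) / 3 = 0.6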
[docs]@MetricFactory.register('confusion_matrix', ProblemType.CLASSIFICATION_MULTICLASS) class MulticlassClassificationConfusionMatrix(Metric): """CBPE multiclass classification confusion matrix Metric Class.""" def __init__( self, y_pred_proba: ModelOutputsType, y_pred: str, y_true: str, chunker: Chunker, threshold: Threshold, timestamp_column_name: Optional[str] = None, normalize_confusion_matrix: Optional[str] = None, **kwargs, ): """Initialize CBPE multiclass classification confusion matrix Metric Class.""" if isinstance(y_pred_proba, str): raise ValueError( "y_pred_proba must be a dictionary with class labels as keys and pred_proba column names as values" ) self.classes = list(y_pred_proba.keys()) super().__init__( name='confusion_matrix', y_pred_proba=y_pred_proba, y_pred=y_pred, y_true=y_true, timestamp_column_name=timestamp_column_name, chunker=chunker, threshold=threshold, components=self._get_components(self.classes), lower_threshold_value_limit=0, ) self.normalize_confusion_matrix: Optional[str] = normalize_confusion_matrix if self.normalize_confusion_matrix is None: # overwrite default upper bound setting. self.upper_threshold_value_limit = None else: self.upper_threshold_value_limit = 1 def _get_components(self, classes: List[str]) -> List[Tuple[str, str]]: components = [] for true_class in classes: for pred_class in classes: components.append( ( f"true class: '{true_class}', predicted class: '{pred_class}'", f'true_{true_class}_pred_{pred_class}', ) ) return components
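For a three-class problem the component list enumerates one (display_name, column_name) pair per confusion-matrix cell, nine in total. An abridged example of what `_get_components(['a', 'b', 'c'])` would return, shown here purely for illustration:

# --- illustrative sketch, not part of the module ---
components = [
    ("true class: 'a', predicted class: 'a'", 'true_a_pred_a'),
    ("true class: 'a', predicted class: 'b'", 'true_a_pred_b'),
    ("true class: 'a', predicted class: 'c'", 'true_a_pred_c'),
    # ... continues likewise for true classes 'b' and 'c'
]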
[docs]    def fit(self, reference_data: pd.DataFrame):  # overrides the superclass fit method
        """Fits a Metric on reference data.

        Parameters
        ----------
        reference_data: pd.DataFrame
            The reference data used for fitting. Must have target data available.
        """
        # Calculate alert thresholds
        reference_chunks = self.chunker.split(
            reference_data,
        )
        self.alert_thresholds = self._multiclass_confusion_matrix_alert_thresholds(reference_chunks)

        # Delegate to confusion matrix subclass
        self._fit(reference_data)  # the _fit functionality could live here, since fit is overridden anyway

        return
    def _fit(self, reference_data: pd.DataFrame):
        self._confusion_matrix_sampling_error_components = mse.multiclass_confusion_matrix_sampling_error_components(
            y_true_reference=reference_data[self.y_true],
            y_pred_reference=reference_data[self.y_pred],
            normalize_confusion_matrix=self.normalize_confusion_matrix,
        )

    def _multiclass_confusion_matrix_alert_thresholds(
        self, reference_chunks: List[Chunk]
    ) -> Dict[str, Tuple[Optional[float], Optional[float]]]:
        realized_chunk_performance = np.asarray(
            [self._multi_class_confusion_matrix_realized_performance(chunk.data) for chunk in reference_chunks]
        )

        alert_thresholds = {}

        num_classes = len(self.classes)

        for i in range(num_classes):
            for j in range(num_classes):
                lower_threshold_value, upper_threshold_value = calculate_threshold_values(
                    threshold=self.threshold,
                    data=realized_chunk_performance[:, i, j],
                    lower_threshold_value_limit=self.lower_threshold_value_limit,
                    upper_threshold_value_limit=self.upper_threshold_value_limit,
                )
                alert_thresholds[f'true_{self.classes[i]}_pred_{self.classes[j]}'] = (
                    lower_threshold_value,
                    upper_threshold_value,
                )

        return alert_thresholds

    def _multi_class_confusion_matrix_realized_performance(self, data: pd.DataFrame) -> Union[np.ndarray, float]:
        if data is None or self.y_true not in data.columns:
            warnings.warn("No 'y_true' values given for chunk, returning NaN as the realized confusion matrix.")
            return np.NaN

        if data[self.y_true].nunique() <= 1:
            warnings.warn("Too few unique values present in 'y_true', returning NaN as the realized confusion matrix.")
            return np.NaN

        if data[self.y_pred].nunique() <= 1:
            warnings.warn("Too few unique values present in 'y_pred', returning NaN as the realized confusion matrix.")
            return np.NaN

        cm = confusion_matrix(
            data[self.y_true], data[self.y_pred], labels=self.classes, normalize=self.normalize_confusion_matrix
        )

        return cm

    def _get_multiclass_confusion_matrix_estimate(self, chunk_data: pd.DataFrame) -> np.ndarray:
        if isinstance(self.y_pred_proba, str):
            raise ValueError(
                "y_pred_proba must be a dictionary with class labels as keys and pred_proba column names as values"
            )

        y_pred_proba = {key: chunk_data[value] for key, value in self.y_pred_proba.items()}
        y_pred = chunk_data[self.y_pred]

        num_classes = len(self.classes)

        est_confusion_matrix = np.zeros((num_classes, num_classes))

        for i in range(num_classes):
            for j in range(num_classes):
                est_confusion_matrix[i, j] = np.mean(
                    np.where(
                        (y_pred == self.classes[j]),
                        y_pred_proba[self.classes[i]],
                        0,
                    )
                )

        if self.normalize_confusion_matrix is None:
            normalized_est_confusion_matrix = est_confusion_matrix * len(y_pred)
        elif self.normalize_confusion_matrix == 'true':
            normalized_est_confusion_matrix = est_confusion_matrix / np.sum(est_confusion_matrix, axis=1)[:, None]
        elif self.normalize_confusion_matrix == 'pred':
            normalized_est_confusion_matrix = est_confusion_matrix / np.sum(est_confusion_matrix, axis=0)[None, :]
        elif self.normalize_confusion_matrix == 'all':
            normalized_est_confusion_matrix = est_confusion_matrix / np.sum(est_confusion_matrix)
        else:
            raise ValueError(
                f'normalize_confusion_matrix should be one of None, "true", '
                f'"pred", or "all", but got {self.normalize_confusion_matrix}'
            )

        return normalized_est_confusion_matrix
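The estimated cell (i, j) is the mean calibrated probability of true class i over rows predicted as class j, scaled back to counts when no normalization is requested. A toy sketch mirroring that loop for two classes with `normalize_confusion_matrix=None`; the frame and column names are invented:

# --- illustrative sketch, not part of the module ---
import numpy as np
import pandas as pd

chunk = pd.DataFrame({
    'y_pred': ['a', 'b', 'a'],
    'proba_a': [0.8, 0.3, 0.6],
    'proba_b': [0.2, 0.7, 0.4],
})
classes = ['a', 'b']
proba_cols = {'a': 'proba_a', 'b': 'proba_b'}

est_cm = np.zeros((2, 2))
for i, true_class in enumerate(classes):      # rows: (estimated) true class
    for j, pred_class in enumerate(classes):  # columns: predicted class
        est_cm[i, j] = np.mean(
            np.where(chunk['y_pred'] == pred_class, chunk[proba_cols[true_class]], 0)
        )
est_cm *= len(chunk)  # no normalization -> expected counts
print(est_cm)  # [[1.4, 0.3], [0.6, 0.7]]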
[docs]    def get_chunk_record(self, chunk_data: pd.DataFrame) -> Dict:
        """Returns a dictionary containing the performance metrics for a given chunk.

        Parameters
        ----------
        chunk_data : pd.DataFrame
            A pandas dataframe containing the data for a given chunk.

        Returns
        -------
        chunk_record : Dict
            A dictionary of performance metric, value pairs.
        """
        chunk_record = {}

        estimated_cm = self._get_multiclass_confusion_matrix_estimate(chunk_data)
        realized_cm = self._multi_class_confusion_matrix_realized_performance(chunk_data)
        sampling_error = mse.multiclass_confusion_matrix_sampling_error(
            self._confusion_matrix_sampling_error_components,
            chunk_data,
        )

        for true_class in self.classes:
            for pred_class in self.classes:
                chunk_record[f'estimated_true_{true_class}_pred_{pred_class}'] = estimated_cm[
                    self.classes.index(true_class), self.classes.index(pred_class)
                ]
                chunk_record[f'sampling_error_true_{true_class}_pred_{pred_class}'] = sampling_error[
                    self.classes.index(true_class), self.classes.index(pred_class)
                ]

                # check if realized_cm is nan
                if isinstance(realized_cm, np.ndarray):
                    chunk_record[f'realized_true_{true_class}_pred_{pred_class}'] = realized_cm[
                        self.classes.index(true_class), self.classes.index(pred_class)
                    ]
                else:
                    chunk_record[f'realized_true_{true_class}_pred_{pred_class}'] = realized_cm

                upper_confidence_boundary = (
                    estimated_cm[self.classes.index(true_class), self.classes.index(pred_class)]
                    + SAMPLING_ERROR_RANGE
                    * sampling_error[self.classes.index(true_class), self.classes.index(pred_class)]
                )
                chunk_record[f'upper_confidence_boundary_true_{true_class}_pred_{pred_class}'] = min(
                    np.inf if self.upper_threshold_value_limit is None else self.upper_threshold_value_limit,
                    upper_confidence_boundary,
                )

                lower_confidence_boundary = (
                    estimated_cm[self.classes.index(true_class), self.classes.index(pred_class)]
                    - SAMPLING_ERROR_RANGE
                    * sampling_error[self.classes.index(true_class), self.classes.index(pred_class)]
                )
                chunk_record[f'lower_confidence_boundary_true_{true_class}_pred_{pred_class}'] = max(
                    -np.inf if self.lower_threshold_value_limit is None else self.lower_threshold_value_limit,
                    lower_confidence_boundary,
                )

                chunk_record[f'upper_threshold_true_{true_class}_pred_{pred_class}'] = self.alert_thresholds[
                    f'true_{true_class}_pred_{pred_class}'
                ][1]
                chunk_record[f'lower_threshold_true_{true_class}_pred_{pred_class}'] = self.alert_thresholds[
                    f'true_{true_class}_pred_{pred_class}'
                ][0]

                # do alerts
                chunk_record[f'alert_true_{true_class}_pred_{pred_class}'] = (
                    self.alert_thresholds is not None
                    and (
                        estimated_cm[self.classes.index(true_class), self.classes.index(pred_class)]
                        > self.alert_thresholds[f'true_{true_class}_pred_{pred_class}'][1]
                    )
                ) or (
                    self.alert_thresholds is not None
                    and (
                        estimated_cm[self.classes.index(true_class), self.classes.index(pred_class)]
                        < self.alert_thresholds[f'true_{true_class}_pred_{pred_class}'][0]
                    )
                )

        return chunk_record
    def _estimate(self, data: pd.DataFrame):
        pass

    def _sampling_error(self, data: pd.DataFrame) -> float:
        return 0.0

    def _realized_performance(self, data: pd.DataFrame) -> float:
        return 0.0