import abc
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import numpy as np
import pandas as pd
from sklearn.metrics import (
accuracy_score,
auc,
confusion_matrix,
f1_score,
multilabel_confusion_matrix,
precision_score,
recall_score,
roc_auc_score,
)
from sklearn.preprocessing import LabelBinarizer, label_binarize
import nannyml.sampling_error.binary_classification as bse
import nannyml.sampling_error.multiclass_classification as mse
from nannyml._typing import ModelOutputsType, ProblemType, class_labels
from nannyml.chunk import Chunk, Chunker
from nannyml.exceptions import CalculatorException, InvalidArgumentsException
from nannyml.performance_estimation.confidence_based import SUPPORTED_METRIC_VALUES
from nannyml.sampling_error import SAMPLING_ERROR_RANGE
from nannyml.thresholds import Threshold, calculate_threshold_values
class Metric(abc.ABC):
"""A performance metric used to calculate realized model performance."""
def __init__(
self,
name: str,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
components: List[Tuple[str, str]],
timestamp_column_name: Optional[str] = None,
lower_threshold_value_limit: Optional[float] = None,
upper_threshold_value_limit: Optional[float] = None,
**kwargs,
):
"""Creates a new Metric instance.
Parameters
----------
name: str
The name used to indicate the metric in columns of a DataFrame.
"""
self.name = name
self.y_pred_proba = y_pred_proba
self.y_pred = y_pred
self.y_true = y_true
self.timestamp_column_name = timestamp_column_name
self.chunker = chunker
self.threshold = threshold
self.lower_threshold_value: Optional[float] = None
self.upper_threshold_value: Optional[float] = None
self.lower_threshold_value_limit: Optional[float] = lower_threshold_value_limit
self.upper_threshold_value_limit: Optional[float] = upper_threshold_value_limit
self.confidence_deviation: Optional[float] = None
self.uncalibrated_y_pred_proba = f'uncalibrated_{self.y_pred_proba}'
self.confidence_upper_bound: Optional[float] = 1.0
self.confidence_lower_bound: Optional[float] = 0.0
# A list of (display_name, column_name) tuples
self.components: List[Tuple[str, str]] = components
@property
def _logger(self) -> logging.Logger:
return logging.getLogger(__name__)
@property
def display_name(self) -> str:
return self.name
@property
def column_name(self) -> str:
        return self.components[0][1]
@property
def display_names(self):
return [c[0] for c in self.components]
@property
def column_names(self):
return [c[1] for c in self.components]
def __str__(self):
return self.display_name
def __repr__(self):
return self.column_name
    def fit(self, reference_data: pd.DataFrame):
"""Fits a Metric on reference data.
Parameters
----------
reference_data: pd.DataFrame
The reference data used for fitting. Must have target data available.
"""
# Delegate to subclass
self._fit(reference_data)
reference_chunks = self.chunker.split(reference_data)
# Calculate confidence bands
self.confidence_deviation = self._confidence_deviation(reference_chunks)
# Calculate alert thresholds
reference_chunk_results = np.asarray([self._realized_performance(chunk.data) for chunk in reference_chunks])
self.lower_threshold_value, self.upper_threshold_value = calculate_threshold_values(
threshold=self.threshold,
data=reference_chunk_results,
lower_threshold_value_limit=self.lower_threshold_value_limit,
upper_threshold_value_limit=self.upper_threshold_value_limit,
logger=self._logger,
metric_name=self.display_name,
)
return
@abc.abstractmethod
def _fit(self, reference_data: pd.DataFrame):
raise NotImplementedError(
f"'{self.__class__.__name__}' is a subclass of Metric and it must implement the _fit method"
)
@abc.abstractmethod
def _estimate(self, data: pd.DataFrame):
raise NotImplementedError(
f"'{self.__class__.__name__}' is a subclass of Metric and it must implement the _estimate method"
)
@abc.abstractmethod
def _sampling_error(self, data: pd.DataFrame) -> float:
raise NotImplementedError(
f"'{self.__class__.__name__}' is a subclass of Metric and it must implement the _sampling_error method"
)
def _confidence_deviation(self, reference_chunks: List[Chunk]):
return np.std([self._estimate(chunk.data) for chunk in reference_chunks])
@abc.abstractmethod
def _realized_performance(self, data: pd.DataFrame) -> float:
raise NotImplementedError(
f"'{self.__class__.__name__}' is a subclass of Metric and it must implement the realized_performance method"
)
    def alert(self, value: float) -> bool:
return (self.lower_threshold_value is not None and value < self.lower_threshold_value) or (
self.upper_threshold_value is not None and value > self.upper_threshold_value
)
def __eq__(self, other):
return self.components == other.components
    def _common_cleaning(
        self, data: pd.DataFrame, y_pred_proba_column_name: Optional[str] = None
    ) -> Tuple[pd.Series, pd.Series, Optional[pd.Series]]:
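        """Returns cleaned-up predicted probabilities, predictions and targets for a chunk.

        Rows with missing predicted probabilities are dropped and, when target values are
        available, rows with missing targets as well. The returned targets are ``None`` when
        the target column is absent or contains no values.
        """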
if y_pred_proba_column_name is None:
if not isinstance(self.y_pred_proba, str):
raise InvalidArgumentsException(
f"'y_pred_proba' is of type '{type(self.y_pred_proba)}'. "
f"Binary use cases require 'y_pred_proba' to be a string."
)
y_pred_proba_column_name = self.y_pred_proba
clean_targets = self.y_true in data.columns and not data[self.y_true].isna().all()
y_pred_proba = data[y_pred_proba_column_name]
y_pred = data[self.y_pred]
y_pred_proba.dropna(inplace=True)
if clean_targets:
y_true = data[self.y_true]
y_true = y_true[~y_pred_proba.isna()]
y_pred_proba = y_pred_proba[~y_true.isna()]
y_pred = y_pred[~y_true.isna()]
y_true.dropna(inplace=True)
else:
y_true = None
return y_pred_proba, y_pred, y_true
    def get_chunk_record(self, chunk_data: pd.DataFrame) -> Dict:
if len(self.components) > 1:
raise NotImplementedError(
"cannot use default 'get_chunk_record' implementation when a metric has multiple components."
)
column_name = self.components[0][1]
chunk_record = {}
estimated_metric_value = self._estimate(chunk_data)
metric_estimate_sampling_error = self._sampling_error(chunk_data)
chunk_record[f'estimated_{column_name}'] = estimated_metric_value
chunk_record[f'sampling_error_{column_name}'] = metric_estimate_sampling_error
chunk_record[f'realized_{column_name}'] = self._realized_performance(chunk_data)
chunk_record[f'upper_confidence_boundary_{column_name}'] = np.minimum(
self.confidence_upper_bound or np.inf,
estimated_metric_value + SAMPLING_ERROR_RANGE * metric_estimate_sampling_error,
)
chunk_record[f'lower_confidence_boundary_{column_name}'] = np.maximum(
self.confidence_lower_bound or -np.inf,
estimated_metric_value - SAMPLING_ERROR_RANGE * metric_estimate_sampling_error,
)
chunk_record[f'upper_threshold_{column_name}'] = self.upper_threshold_value
chunk_record[f'lower_threshold_{column_name}'] = self.lower_threshold_value
chunk_record[f'alert_{column_name}'] = self.alert(estimated_metric_value)
return chunk_record
class MetricFactory:
"""A factory class that produces Metric instances based on a given magic string or a metric specification."""
registry: Dict[str, Dict[ProblemType, Type[Metric]]] = {}
@classmethod
def _logger(cls) -> logging.Logger:
return logging.getLogger(__name__)
    @classmethod
def create(cls, key: str, use_case: ProblemType, **kwargs) -> Metric:
        """Returns a Metric instance for a given key."""
        if kwargs is None:
            kwargs = {}
if not isinstance(key, str):
raise InvalidArgumentsException(
f"cannot create metric given a '{type(key)}'" "Please provide a string, function or Metric"
)
if key not in cls.registry:
raise InvalidArgumentsException(
f"unknown metric key '{key}' given. " f"Should be one of {SUPPORTED_METRIC_VALUES}."
)
if use_case not in cls.registry[key]:
raise RuntimeError(
f"metric '{key}' is currently not supported for use case {use_case}. "
"Please specify another metric or use one of these supported model types for this metric: "
f"{[md for md in cls.registry[key]]}"
)
metric_class = cls.registry[key][use_case]
return metric_class(**kwargs)
    @classmethod
def register(cls, metric: str, use_case: ProblemType) -> Callable:
def inner_wrapper(wrapped_class: Type[Metric]) -> Type[Metric]:
if metric in cls.registry:
if use_case in cls.registry[metric]:
cls._logger().warning(f"re-registering Metric for metric='{metric}' and use_case='{use_case}'")
cls.registry[metric][use_case] = wrapped_class
else:
cls.registry[metric] = {use_case: wrapped_class}
return wrapped_class
return inner_wrapper
@MetricFactory.register('roc_auc', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationAUROC(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='roc_auc',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('ROC AUC', 'roc_auc')],
lower_threshold_value_limit=0,
)
# sampling error
self._sampling_error_components: Tuple = ()
def _fit(self, reference_data: pd.DataFrame):
self._sampling_error_components = bse.auroc_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_proba_reference=reference_data[self.y_pred_proba],
)
def _estimate(self, data: pd.DataFrame):
y_pred_proba = data[self.y_pred_proba]
return estimate_roc_auc(y_pred_proba)
def _realized_performance(self, data: pd.DataFrame) -> float:
y_pred_proba, _, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
return roc_auc_score(y_true, y_pred_proba)
def _sampling_error(self, data: pd.DataFrame) -> float:
return bse.auroc_sampling_error(self._sampling_error_components, data)
def estimate_roc_auc(y_pred_proba: pd.Series) -> float:
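    """Estimates ROC AUC from the (calibrated) predicted probabilities alone.

    Each sorted score is treated as the expected probability of a positive target, so the cumulative
    sums below yield the expected TP, FP, FN and TN counts at every candidate threshold. The resulting
    TPR/FPR pairs are then passed to ``sklearn.metrics.auc``.
    """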
thresholds = np.sort(y_pred_proba)
one_min_thresholds = 1 - thresholds
TP = np.cumsum(thresholds[::-1])[::-1]
FP = np.cumsum(one_min_thresholds[::-1])[::-1]
thresholds_with_zero = np.insert(thresholds, 0, 0, axis=0)[:-1]
one_min_thresholds_with_zero = np.insert(one_min_thresholds, 0, 0, axis=0)[:-1]
FN = np.cumsum(thresholds_with_zero)
TN = np.cumsum(one_min_thresholds_with_zero)
non_duplicated_thresholds = np.diff(np.insert(thresholds, 0, -1, axis=0)).astype(bool)
TP = TP[non_duplicated_thresholds]
FP = FP[non_duplicated_thresholds]
FN = FN[non_duplicated_thresholds]
TN = TN[non_duplicated_thresholds]
tpr = TP / (TP + FN)
fpr = FP / (FP + TN)
metric = auc(fpr, tpr)
return metric
@MetricFactory.register('f1', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationF1(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='f1',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('F1', 'f1')],
lower_threshold_value_limit=0,
)
# sampling error
self._sampling_error_components: Tuple = ()
def _fit(self, reference_data: pd.DataFrame):
self._sampling_error_components = bse.f1_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
)
def _estimate(self, data: pd.DataFrame):
y_pred_proba = data[self.y_pred_proba]
y_pred = data[self.y_pred]
return estimate_f1(y_pred, y_pred_proba)
def _sampling_error(self, data: pd.DataFrame) -> float:
return bse.f1_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
return f1_score(y_true=y_true, y_pred=y_pred)
def estimate_f1(y_pred: Union[pd.Series, np.ndarray], y_pred_proba: pd.Series) -> float:
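    """Estimates the F1 score without targets, treating each score as the expected probability of a positive target.

    Rows predicted positive contribute ``p`` to the expected TP count and ``1 - p`` to FP;
    rows predicted negative contribute ``p`` to FN. The estimate is ``TP / (TP + 0.5 * (FP + FN))``.
    """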
tp = np.where(y_pred == 1, y_pred_proba, 0)
fp = np.where(y_pred == 1, 1 - y_pred_proba, 0)
fn = np.where(y_pred == 0, y_pred_proba, 0)
TP, FP, FN = np.sum(tp), np.sum(fp), np.sum(fn)
metric = TP / (TP + 0.5 * (FP + FN))
return metric
@MetricFactory.register('precision', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationPrecision(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='precision',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Precision', 'precision')],
lower_threshold_value_limit=0,
)
# sampling error
self._sampling_error_components: Tuple = ()
def _fit(self, reference_data: pd.DataFrame):
self._sampling_error_components = bse.precision_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
)
def _estimate(self, data: pd.DataFrame):
y_pred_proba = data[self.y_pred_proba]
y_pred = data[self.y_pred]
return estimate_precision(y_pred, y_pred_proba)
def _sampling_error(self, data: pd.DataFrame) -> float:
return bse.precision_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
return precision_score(y_true=y_true, y_pred=y_pred)
def estimate_precision(y_pred: Union[pd.Series, np.ndarray], y_pred_proba: pd.Series) -> float:
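    """Estimates precision as ``TP / (TP + FP)``, with the expected TP and FP counts derived from
    the predicted probabilities of the rows that were predicted positive.
    """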
tp = np.where(y_pred == 1, y_pred_proba, 0)
fp = np.where(y_pred == 1, 1 - y_pred_proba, 0)
TP, FP = np.sum(tp), np.sum(fp)
metric = TP / (TP + FP)
return metric
@MetricFactory.register('recall', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationRecall(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='recall',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Recall', 'recall')],
lower_threshold_value_limit=0,
)
# sampling error
self._sampling_error_components: Tuple = ()
def _fit(self, reference_data: pd.DataFrame):
self._sampling_error_components = bse.recall_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
)
def _estimate(self, data: pd.DataFrame):
y_pred_proba = data[self.y_pred_proba]
y_pred = data[self.y_pred]
return estimate_recall(y_pred, y_pred_proba)
def _sampling_error(self, data: pd.DataFrame) -> float:
return bse.recall_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
return recall_score(y_true=y_true, y_pred=y_pred)
def estimate_recall(y_pred: Union[pd.Series, np.ndarray], y_pred_proba: pd.Series) -> float:
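    """Estimates recall as ``TP / (TP + FN)``: expected true positives come from the probability mass
    of rows predicted positive, expected false negatives from rows predicted negative.
    """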
tp = np.where(y_pred == 1, y_pred_proba, 0)
fn = np.where(y_pred == 0, y_pred_proba, 0)
TP, FN = np.sum(tp), np.sum(fn)
metric = TP / (TP + FN)
return metric
@MetricFactory.register('specificity', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationSpecificity(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='specificity',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Specificity', 'specificity')],
lower_threshold_value_limit=0,
)
# sampling error
self._sampling_error_components: Tuple = ()
def _fit(self, reference_data: pd.DataFrame):
self._sampling_error_components = bse.specificity_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
)
def _estimate(self, data: pd.DataFrame):
y_pred_proba = data[self.y_pred_proba]
y_pred = data[self.y_pred]
return estimate_specificity(y_pred, y_pred_proba)
def _sampling_error(self, data: pd.DataFrame) -> float:
return bse.specificity_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
conf_matrix = confusion_matrix(y_true=y_true, y_pred=y_pred)
        return conf_matrix[0, 0] / (conf_matrix[0, 0] + conf_matrix[0, 1])
def estimate_specificity(y_pred: Union[pd.Series, np.ndarray], y_pred_proba: pd.Series) -> float:
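    """Estimates specificity as ``TN / (TN + FP)``, using ``1 - p`` of rows predicted negative as
    expected true negatives and ``1 - p`` of rows predicted positive as expected false positives.
    """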
tn = np.where(y_pred == 0, 1 - y_pred_proba, 0)
fp = np.where(y_pred == 1, 1 - y_pred_proba, 0)
TN, FP = np.sum(tn), np.sum(fp)
metric = TN / (TN + FP)
return metric
@MetricFactory.register('accuracy', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationAccuracy(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='accuracy',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Accuracy', 'accuracy')],
lower_threshold_value_limit=0,
)
# sampling error
self._sampling_error_components: Tuple = ()
def _fit(self, reference_data: pd.DataFrame):
self._sampling_error_components = bse.accuracy_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
)
def _estimate(self, data: pd.DataFrame):
y_pred_proba = data[self.y_pred_proba]
y_pred = data[self.y_pred]
tp = np.where(y_pred == 1, y_pred_proba, 0)
tn = np.where(y_pred == 0, 1 - y_pred_proba, 0)
TP, TN = np.sum(tp), np.sum(tn)
metric = (TP + TN) / len(y_pred)
return metric
def _sampling_error(self, data: pd.DataFrame) -> float:
return bse.accuracy_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
return accuracy_score(y_true=y_true, y_pred=y_pred)
@MetricFactory.register('confusion_matrix', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationConfusionMatrix(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
normalize_confusion_matrix: Optional[str] = None,
**kwargs,
):
super().__init__(
name='confusion_matrix',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[
('True Positive', 'true_positive'),
('True Negative', 'true_negative'),
('False Positive', 'false_positive'),
('False Negative', 'false_negative'),
],
lower_threshold_value_limit=0,
)
self.normalize_confusion_matrix: Optional[str] = normalize_confusion_matrix
self.true_positive_lower_threshold: Optional[float] = 0
self.true_positive_upper_threshold: Optional[float] = 1
self.true_negative_lower_threshold: Optional[float] = 0
self.true_negative_upper_threshold: Optional[float] = 1
    def fit(self, reference_data: pd.DataFrame):  # override the superclass fit method
"""Fits a Metric on reference data.
Parameters
----------
reference_data: pd.DataFrame
The reference data used for fitting. Must have target data available.
"""
# Calculate alert thresholds
reference_chunks = self.chunker.split(
reference_data,
)
self.true_positive_lower_threshold, self.true_positive_upper_threshold = self._true_positive_alert_thresholds(
reference_chunks
)
self.true_negative_lower_threshold, self.true_negative_upper_threshold = self._true_negative_alert_thresholds(
reference_chunks
)
(
self.false_positive_lower_threshold,
self.false_positive_upper_threshold,
) = self._false_positive_alert_thresholds(reference_chunks)
(
self.false_negative_lower_threshold,
self.false_negative_upper_threshold,
) = self._false_negative_alert_thresholds(reference_chunks)
# Delegate to confusion matrix subclass
        self._fit(reference_data)  # could probably put the _fit functionality here since we override the fit method
return
def _fit(self, reference_data: pd.DataFrame):
self._true_positive_sampling_error_components = bse.true_positive_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
normalize_confusion_matrix=self.normalize_confusion_matrix,
)
self._true_negative_sampling_error_components = bse.true_negative_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
normalize_confusion_matrix=self.normalize_confusion_matrix,
)
self._false_positive_sampling_error_components = bse.false_positive_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
normalize_confusion_matrix=self.normalize_confusion_matrix,
)
self._false_negative_sampling_error_components = bse.false_negative_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
normalize_confusion_matrix=self.normalize_confusion_matrix,
)
def _true_positive_alert_thresholds(self, reference_chunks: List[Chunk]) -> Tuple[Optional[float], Optional[float]]:
realized_chunk_performance = np.asarray(
[self._true_positive_realized_performance(chunk.data) for chunk in reference_chunks]
)
lower_threshold_value, upper_threshold_value = calculate_threshold_values(
threshold=self.threshold,
data=realized_chunk_performance,
lower_threshold_value_limit=self.lower_threshold_value_limit,
upper_threshold_value_limit=self.upper_threshold_value_limit,
logger=self._logger,
metric_name=self.display_name,
)
return lower_threshold_value, upper_threshold_value
def _true_negative_alert_thresholds(self, reference_chunks: List[Chunk]) -> Tuple[Optional[float], Optional[float]]:
realized_chunk_performance = np.asarray(
[self._true_negative_realized_performance(chunk.data) for chunk in reference_chunks]
)
lower_threshold_value, upper_threshold_value = calculate_threshold_values(
threshold=self.threshold,
data=realized_chunk_performance,
lower_threshold_value_limit=self.lower_threshold_value_limit,
upper_threshold_value_limit=self.upper_threshold_value_limit,
logger=self._logger,
metric_name=self.display_name,
)
return lower_threshold_value, upper_threshold_value
def _false_positive_alert_thresholds(
self, reference_chunks: List[Chunk]
) -> Tuple[Optional[float], Optional[float]]:
realized_chunk_performance = np.asarray(
[self._false_positive_realized_performance(chunk.data) for chunk in reference_chunks]
)
lower_threshold_value, upper_threshold_value = calculate_threshold_values(
threshold=self.threshold,
data=realized_chunk_performance,
lower_threshold_value_limit=self.lower_threshold_value_limit,
upper_threshold_value_limit=self.upper_threshold_value_limit,
logger=self._logger,
metric_name=self.display_name,
)
return lower_threshold_value, upper_threshold_value
def _false_negative_alert_thresholds(
self, reference_chunks: List[Chunk]
) -> Tuple[Optional[float], Optional[float]]:
realized_chunk_performance = np.asarray(
[self._false_negative_realized_performance(chunk.data) for chunk in reference_chunks]
)
lower_threshold_value, upper_threshold_value = calculate_threshold_values(
threshold=self.threshold,
data=realized_chunk_performance,
lower_threshold_value_limit=self.lower_threshold_value_limit,
upper_threshold_value_limit=self.upper_threshold_value_limit,
logger=self._logger,
metric_name=self.display_name,
)
return lower_threshold_value, upper_threshold_value
def _true_positive_realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
num_tp = np.sum(np.logical_and(y_pred, y_true))
num_fp = np.sum(np.logical_and(y_pred, np.logical_not(y_true)))
num_fn = np.sum(np.logical_and(np.logical_not(y_pred), y_true))
if self.normalize_confusion_matrix is None:
return num_tp
elif self.normalize_confusion_matrix == 'true':
return num_tp / (num_tp + num_fn)
elif self.normalize_confusion_matrix == 'pred':
return num_tp / (num_tp + num_fp)
else: # normalization is 'all'
return num_tp / len(y_true)
def _true_negative_realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
num_tn = np.sum(np.logical_and(np.logical_not(y_pred), np.logical_not(y_true)))
num_fp = np.sum(np.logical_and(y_pred, np.logical_not(y_true)))
num_fn = np.sum(np.logical_and(np.logical_not(y_pred), y_true))
if self.normalize_confusion_matrix is None:
return num_tn
elif self.normalize_confusion_matrix == 'true':
return num_tn / (num_tn + num_fp)
elif self.normalize_confusion_matrix == 'pred':
return num_tn / (num_tn + num_fn)
else:
return num_tn / len(y_true)
def _false_positive_realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
num_tp = np.sum(np.logical_and(y_pred, y_true))
num_tn = np.sum(np.logical_and(np.logical_not(y_pred), np.logical_not(y_true)))
num_fp = np.sum(np.logical_and(y_pred, np.logical_not(y_true)))
if self.normalize_confusion_matrix is None:
return num_fp
elif self.normalize_confusion_matrix == 'true':
return num_fp / (num_fp + num_tn)
elif self.normalize_confusion_matrix == 'pred':
return num_fp / (num_fp + num_tp)
else:
return num_fp / len(y_true)
def _false_negative_realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
num_tp = np.sum(np.logical_and(y_pred, y_true))
num_tn = np.sum(np.logical_and(np.logical_not(y_pred), np.logical_not(y_true)))
num_fn = np.sum(np.logical_and(np.logical_not(y_pred), y_true))
if self.normalize_confusion_matrix is None:
return num_fn
elif self.normalize_confusion_matrix == 'true':
return num_fn / (num_fn + num_tp)
elif self.normalize_confusion_matrix == 'pred':
return num_fn / (num_fn + num_tn)
else:
return num_fn / len(y_true)
    def get_true_positive_estimate(self, chunk_data: pd.DataFrame) -> float:
y_pred_proba = chunk_data[self.y_pred_proba]
y_pred = chunk_data[self.y_pred]
est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0))
est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0))
est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0))
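        # Scale the expected TP ratio according to the requested normalization:
        # None -> expected count, 'all' -> fraction of all rows,
        # 'true' -> fraction of actual positives, 'pred' -> fraction of predicted positives.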
if self.normalize_confusion_matrix is None:
normalized_est_tp_ratio = est_tp_ratio * len(y_pred)
elif self.normalize_confusion_matrix == 'all':
normalized_est_tp_ratio = est_tp_ratio
elif self.normalize_confusion_matrix == 'true':
normalizer = 1 / (est_tp_ratio + est_fn_ratio)
normalized_est_tp_ratio = est_tp_ratio * normalizer
elif self.normalize_confusion_matrix == 'pred':
normalizer = 1 / (est_tp_ratio + est_fp_ratio)
normalized_est_tp_ratio = est_tp_ratio * normalizer
else:
raise InvalidArgumentsException(
f"'normalize_confusion_matrix' should be None, 'true', 'pred' or 'all' "
f"but got '{self.normalize_confusion_matrix}"
)
return normalized_est_tp_ratio
    def get_true_negative_estimate(self, chunk_data: pd.DataFrame) -> float:
y_pred_proba = chunk_data[self.y_pred_proba]
y_pred = chunk_data[self.y_pred]
est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0))
est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0))
est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0))
if self.normalize_confusion_matrix is None:
normalized_est_tn_ratio = est_tn_ratio * len(y_pred)
elif self.normalize_confusion_matrix == 'all':
normalized_est_tn_ratio = est_tn_ratio
elif self.normalize_confusion_matrix == 'true':
normalizer = 1 / (est_tn_ratio + est_fp_ratio)
normalized_est_tn_ratio = est_tn_ratio * normalizer
elif self.normalize_confusion_matrix == 'pred':
normalizer = 1 / (est_tn_ratio + est_fn_ratio)
normalized_est_tn_ratio = est_tn_ratio * normalizer
else:
raise InvalidArgumentsException(
f"'normalize_confusion_matrix' should be None, 'true', 'pred' or 'all' "
f"but got '{self.normalize_confusion_matrix}"
)
return normalized_est_tn_ratio
    def get_false_positive_estimate(self, chunk_data: pd.DataFrame) -> float:
y_pred_proba = chunk_data[self.y_pred_proba]
y_pred = chunk_data[self.y_pred]
est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0))
est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0))
est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0))
if self.normalize_confusion_matrix is None:
normalized_est_fp_ratio = est_fp_ratio * len(y_pred)
elif self.normalize_confusion_matrix == 'all':
normalized_est_fp_ratio = est_fp_ratio
elif self.normalize_confusion_matrix == 'true':
normalizer = 1 / (est_tn_ratio + est_fp_ratio)
normalized_est_fp_ratio = est_fp_ratio * normalizer
elif self.normalize_confusion_matrix == 'pred':
normalizer = 1 / (est_tp_ratio + est_fp_ratio)
normalized_est_fp_ratio = est_fp_ratio * normalizer
else:
raise InvalidArgumentsException(
f"'normalize_confusion_matrix' should be None, 'true', 'pred' or 'all' "
f"but got '{self.normalize_confusion_matrix}"
)
return normalized_est_fp_ratio
    def get_false_negative_estimate(self, chunk_data: pd.DataFrame) -> float:
y_pred_proba = chunk_data[self.y_pred_proba]
y_pred = chunk_data[self.y_pred]
est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0))
est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0))
est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0))
if self.normalize_confusion_matrix is None:
normalized_est_fn_ratio = est_fn_ratio * len(y_pred)
elif self.normalize_confusion_matrix == 'all':
normalized_est_fn_ratio = est_fn_ratio
elif self.normalize_confusion_matrix == 'true':
normalizer = 1 / (est_tp_ratio + est_fn_ratio)
normalized_est_fn_ratio = est_fn_ratio * normalizer
elif self.normalize_confusion_matrix == 'pred':
normalizer = 1 / (est_tn_ratio + est_fn_ratio)
normalized_est_fn_ratio = est_fn_ratio * normalizer
else:
raise InvalidArgumentsException(
f"'normalize_confusion_matrix' should be None, 'true', 'pred' or 'all' "
f"but got '{self.normalize_confusion_matrix}"
)
return normalized_est_fn_ratio
    def get_true_pos_info(self, chunk_data: pd.DataFrame) -> Dict:
true_pos_info: Dict[str, Any] = {}
estimated_true_positives = self.get_true_positive_estimate(chunk_data)
sampling_error_true_positives = bse.true_positive_sampling_error(
self._true_positive_sampling_error_components, chunk_data
)
true_pos_info['estimated_true_positive'] = estimated_true_positives
true_pos_info['sampling_error_true_positive'] = sampling_error_true_positives
true_pos_info['realized_true_positive'] = self._true_positive_realized_performance(chunk_data)
if self.normalize_confusion_matrix is None:
true_pos_info['upper_confidence_boundary_true_positive'] = (
estimated_true_positives + SAMPLING_ERROR_RANGE * sampling_error_true_positives
)
else:
true_pos_info['upper_confidence_boundary_true_positive'] = np.minimum(
self.confidence_upper_bound,
estimated_true_positives + SAMPLING_ERROR_RANGE * sampling_error_true_positives,
)
true_pos_info['lower_confidence_boundary_true_positive'] = np.maximum(
self.confidence_lower_bound, estimated_true_positives - SAMPLING_ERROR_RANGE * sampling_error_true_positives
)
true_pos_info['upper_threshold_true_positive'] = self.true_positive_upper_threshold
true_pos_info['lower_threshold_true_positive'] = self.true_positive_lower_threshold
true_pos_info['alert_true_positive'] = (
self.true_positive_upper_threshold is not None
and estimated_true_positives > self.true_positive_upper_threshold
) or (
self.true_positive_lower_threshold is not None
and estimated_true_positives < self.true_positive_lower_threshold
)
return true_pos_info
    def get_true_neg_info(self, chunk_data: pd.DataFrame) -> Dict:
true_neg_info: Dict[str, Any] = {}
estimated_true_negatives = self.get_true_negative_estimate(chunk_data)
sampling_error_true_negatives = bse.true_negative_sampling_error(
self._true_negative_sampling_error_components, chunk_data
)
true_neg_info['estimated_true_negative'] = estimated_true_negatives
true_neg_info['sampling_error_true_negative'] = sampling_error_true_negatives
true_neg_info['realized_true_negative'] = self._true_negative_realized_performance(chunk_data)
if self.normalize_confusion_matrix is None:
true_neg_info['upper_confidence_boundary_true_negative'] = (
estimated_true_negatives + SAMPLING_ERROR_RANGE * sampling_error_true_negatives
)
else:
true_neg_info['upper_confidence_boundary_true_negative'] = np.minimum(
self.confidence_upper_bound,
estimated_true_negatives + SAMPLING_ERROR_RANGE * sampling_error_true_negatives,
)
true_neg_info['lower_confidence_boundary_true_negative'] = np.maximum(
self.confidence_lower_bound, estimated_true_negatives - SAMPLING_ERROR_RANGE * sampling_error_true_negatives
)
true_neg_info['upper_threshold_true_negative'] = self.true_negative_upper_threshold
true_neg_info['lower_threshold_true_negative'] = self.true_negative_lower_threshold
true_neg_info['alert_true_negative'] = (
self.true_negative_upper_threshold is not None
and estimated_true_negatives > self.true_negative_upper_threshold
) or (
self.true_negative_lower_threshold is not None
and estimated_true_negatives < self.true_negative_lower_threshold
)
return true_neg_info
    def get_false_pos_info(self, chunk_data: pd.DataFrame) -> Dict:
false_pos_info: Dict[str, Any] = {}
estimated_false_positives = self.get_false_positive_estimate(chunk_data)
sampling_error_false_positives = bse.false_positive_sampling_error(
self._false_positive_sampling_error_components, chunk_data
)
false_pos_info['estimated_false_positive'] = estimated_false_positives
false_pos_info['sampling_error_false_positive'] = sampling_error_false_positives
false_pos_info['realized_false_positive'] = self._false_positive_realized_performance(chunk_data)
if self.normalize_confusion_matrix is None:
false_pos_info['upper_confidence_boundary_false_positive'] = (
estimated_false_positives + SAMPLING_ERROR_RANGE * sampling_error_false_positives
)
else:
false_pos_info['upper_confidence_boundary_false_positive'] = np.minimum(
self.confidence_upper_bound,
estimated_false_positives + SAMPLING_ERROR_RANGE * sampling_error_false_positives,
)
false_pos_info['lower_confidence_boundary_false_positive'] = np.maximum(
self.confidence_lower_bound,
estimated_false_positives - SAMPLING_ERROR_RANGE * sampling_error_false_positives,
)
false_pos_info['upper_threshold_false_positive'] = self.false_positive_upper_threshold
false_pos_info['lower_threshold_false_positive'] = self.false_positive_lower_threshold
false_pos_info['alert_false_positive'] = (
self.false_positive_upper_threshold is not None
and estimated_false_positives > self.false_positive_upper_threshold
) or (
self.false_positive_lower_threshold is not None
and estimated_false_positives < self.false_positive_lower_threshold
)
return false_pos_info
    def get_false_neg_info(self, chunk_data: pd.DataFrame) -> Dict:
false_neg_info: Dict[str, Any] = {}
estimated_false_negatives = self.get_false_negative_estimate(chunk_data)
sampling_error_false_negatives = bse.false_negative_sampling_error(
self._false_negative_sampling_error_components, chunk_data
)
false_neg_info['estimated_false_negative'] = estimated_false_negatives
false_neg_info['sampling_error_false_negative'] = sampling_error_false_negatives
false_neg_info['realized_false_negative'] = self._false_negative_realized_performance(chunk_data)
if self.normalize_confusion_matrix is None:
false_neg_info['upper_confidence_boundary_false_negative'] = (
estimated_false_negatives + SAMPLING_ERROR_RANGE * sampling_error_false_negatives
)
else:
false_neg_info['upper_confidence_boundary_false_negative'] = np.minimum(
self.confidence_upper_bound,
estimated_false_negatives + SAMPLING_ERROR_RANGE * sampling_error_false_negatives,
)
false_neg_info['lower_confidence_boundary_false_negative'] = np.maximum(
self.confidence_lower_bound,
estimated_false_negatives - SAMPLING_ERROR_RANGE * sampling_error_false_negatives,
)
false_neg_info['upper_threshold_false_negative'] = self.false_negative_upper_threshold
false_neg_info['lower_threshold_false_negative'] = self.false_negative_lower_threshold
false_neg_info['alert_false_negative'] = (
self.false_negative_upper_threshold is not None
and estimated_false_negatives > self.false_negative_upper_threshold
) or (
self.false_negative_lower_threshold is not None
and estimated_false_negatives < self.false_negative_lower_threshold
)
return false_neg_info
    def get_chunk_record(self, chunk_data: pd.DataFrame) -> Dict:
chunk_record = {}
true_pos_info = self.get_true_pos_info(chunk_data)
chunk_record.update(true_pos_info)
true_neg_info = self.get_true_neg_info(chunk_data)
chunk_record.update(true_neg_info)
false_pos_info = self.get_false_pos_info(chunk_data)
chunk_record.update(false_pos_info)
false_neg_info = self.get_false_neg_info(chunk_data)
chunk_record.update(false_neg_info)
return chunk_record
def _estimate(self, data: pd.DataFrame):
pass
def _sampling_error(self, data: pd.DataFrame) -> float:
return 0.0
def _realized_performance(self, data: pd.DataFrame) -> float:
return 0.0
@MetricFactory.register('business_value', ProblemType.CLASSIFICATION_BINARY)
class BinaryClassificationBusinessValue(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
business_value_matrix: Union[List, np.ndarray],
normalize_business_value: Optional[str] = None,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='business_value',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Business Value', 'business_value')],
)
if business_value_matrix is None:
raise ValueError("business_value_matrix must be provided for 'business_value' metric")
if not (isinstance(business_value_matrix, np.ndarray) or isinstance(business_value_matrix, list)):
raise ValueError(
f"business_value_matrix must be a numpy array or a list, but got {type(business_value_matrix)}"
)
if isinstance(business_value_matrix, list):
business_value_matrix = np.array(business_value_matrix)
if business_value_matrix.shape != (2, 2):
raise ValueError(
f"business_value_matrix must have shape (2,2), but got matrix of shape {business_value_matrix.shape}"
)
self.business_value_matrix = business_value_matrix
self.normalize_business_value: Optional[str] = normalize_business_value
self.lower_threshold: Optional[float] = 0
self.upper_threshold: Optional[float] = 1
self.confidence_upper_bound: Optional[float] = None
self.confidence_lower_bound: Optional[float] = None
def _fit(self, reference_data: pd.DataFrame):
self._sampling_error_components = bse.business_value_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
business_value_matrix=self.business_value_matrix,
normalize_business_value=self.normalize_business_value,
)
def _realized_performance(self, data: pd.DataFrame) -> float:
_, y_pred, y_true = self._common_cleaning(data, y_pred_proba_column_name=self.uncalibrated_y_pred_proba)
if y_true is None:
return np.NaN
tp_value = self.business_value_matrix[1, 1]
tn_value = self.business_value_matrix[0, 0]
fp_value = self.business_value_matrix[0, 1]
fn_value = self.business_value_matrix[1, 0]
num_tp = np.sum(np.logical_and(y_pred, y_true))
num_tn = np.sum(np.logical_and(np.logical_not(y_pred), np.logical_not(y_true)))
num_fp = np.sum(np.logical_and(y_pred, np.logical_not(y_true)))
num_fn = np.sum(np.logical_and(np.logical_not(y_pred), y_true))
business_value = num_tp * tp_value + num_tn * tn_value + num_fp * fp_value + num_fn * fn_value
if self.normalize_business_value is None:
return business_value
else: # normalize must be 'per_prediction'
return business_value / len(y_true)
def _estimate(self, chunk_data: pd.DataFrame) -> float:
y_pred_proba = chunk_data[self.y_pred_proba]
y_pred = chunk_data[self.y_pred]
business_value_normalization = self.normalize_business_value
business_value_matrix = self.business_value_matrix
return estimate_business_value(y_pred, y_pred_proba, business_value_normalization, business_value_matrix)
def _sampling_error(self, data: pd.DataFrame) -> float:
return bse.business_value_sampling_error(
self._sampling_error_components,
data,
)
def estimate_business_value(
y_pred: np.ndarray,
y_pred_proba: np.ndarray,
normalize_business_value: Optional[str],
business_value_matrix: np.ndarray,
):
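    """Estimates the total (or per-prediction) business value of a chunk.

    Expected confusion-matrix rates are derived from the predicted probabilities and weighted by the
    corresponding entries of the 2x2 ``business_value_matrix``, laid out as
    ``[[tn_value, fp_value], [fn_value, tp_value]]``.
    """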
est_tn_ratio = np.mean(np.where(y_pred == 0, 1 - y_pred_proba, 0))
est_tp_ratio = np.mean(np.where(y_pred == 1, y_pred_proba, 0))
est_fp_ratio = np.mean(np.where(y_pred == 1, 1 - y_pred_proba, 0))
est_fn_ratio = np.mean(np.where(y_pred == 0, y_pred_proba, 0))
tp_value = business_value_matrix[1, 1]
tn_value = business_value_matrix[0, 0]
fp_value = business_value_matrix[0, 1]
fn_value = business_value_matrix[1, 0]
business_value = (
est_tn_ratio * tn_value + est_tp_ratio * tp_value + est_fp_ratio * fp_value + est_fn_ratio * fn_value
)
if normalize_business_value is None:
return business_value * len(y_pred)
    else:  # normalize must be 'per_prediction'
return business_value
def _get_binarized_multiclass_predictions(data: pd.DataFrame, y_pred: str, y_pred_proba: ModelOutputsType):
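    """Returns per-class one-vs-rest binarized predictions, the matching probability columns and the sorted class labels."""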
if not isinstance(y_pred_proba, dict):
raise CalculatorException(
"multiclass model outputs should be of type Dict[str, str].\n"
f"'{y_pred_proba}' is of type '{type(y_pred_proba)}'"
)
classes = sorted(y_pred_proba.keys())
y_preds = list(label_binarize(data[y_pred], classes=classes).T)
y_pred_probas = [data[y_pred_proba[clazz]] for clazz in classes]
return y_preds, y_pred_probas, classes
def _get_multiclass_uncalibrated_predictions(data: pd.DataFrame, y_pred: str, y_pred_proba: ModelOutputsType):
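    """Returns the predicted labels, the uncalibrated per-class probability columns and the sorted class labels."""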
if not isinstance(y_pred_proba, dict):
raise CalculatorException(
"multiclass model outputs should be of type Dict[str, str].\n"
f"'{y_pred_proba}' is of type '{type(y_pred_proba)}'"
)
labels, class_probability_columns = [], []
for label in sorted(y_pred_proba.keys()):
labels.append(label)
class_probability_columns.append(f'uncalibrated_{y_pred_proba[label]}')
return data[y_pred], data[class_probability_columns], labels
@MetricFactory.register('roc_auc', ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationAUROC(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='roc_auc',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('ROC AUC', 'roc_auc')],
)
# FIXME: Should we check the y_pred_proba argument here to ensure it's a dict?
self.y_pred_proba: Dict[str, str]
# sampling error
self._sampling_error_components: List[Tuple] = []
def _fit(self, reference_data: pd.DataFrame):
classes = class_labels(self.y_pred_proba)
binarized_y_true = list(label_binarize(reference_data[self.y_true], classes=classes).T)
y_pred_proba = [reference_data[self.y_pred_proba[clazz]].T for clazz in classes]
self._sampling_error_components = mse.auroc_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_proba_reference=y_pred_proba
)
def _estimate(self, data: pd.DataFrame):
_, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba)
ovr_estimates = []
for y_pred_proba_class in y_pred_probas:
ovr_estimates.append(estimate_roc_auc(y_pred_proba_class))
multiclass_roc_auc = np.mean(ovr_estimates)
return multiclass_roc_auc
def _sampling_error(self, data: pd.DataFrame) -> float:
return mse.auroc_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
if self.y_true not in data.columns or data[self.y_true].isna().all():
return np.NaN
y_true = data[self.y_true]
_, y_pred_probas, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba)
return roc_auc_score(y_true, y_pred_probas, multi_class='ovr', average='macro', labels=labels)
@MetricFactory.register('f1', ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationF1(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='f1',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('F1', 'f1')],
)
# sampling error:
self._sampling_error_components: List[Tuple] = []
def _fit(self, reference_data: pd.DataFrame):
label_binarizer = LabelBinarizer()
binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T)
binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T)
self._sampling_error_components = mse.f1_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _estimate(self, data: pd.DataFrame):
y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba)
ovr_estimates = []
for y_pred, y_pred_proba in zip(y_preds, y_pred_probas):
ovr_estimates.append(estimate_f1(y_pred, y_pred_proba))
multiclass_metric = np.mean(ovr_estimates)
return multiclass_metric
def _sampling_error(self, data: pd.DataFrame) -> float:
return mse.f1_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
if self.y_true not in data.columns or data[self.y_true].isna().all():
return np.NaN
y_true = data[self.y_true]
y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba)
return f1_score(y_true=y_true, y_pred=y_pred, average='macro', labels=labels)
@MetricFactory.register('precision', ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationPrecision(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='precision',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Precision', 'precision')],
)
# sampling error
self._sampling_error_components: List[Tuple] = []
def _fit(self, reference_data: pd.DataFrame):
label_binarizer = LabelBinarizer()
binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T)
binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T)
self._sampling_error_components = mse.precision_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _estimate(self, data: pd.DataFrame):
y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba)
ovr_estimates = []
for y_pred, y_pred_proba in zip(y_preds, y_pred_probas):
ovr_estimates.append(estimate_precision(y_pred, y_pred_proba))
multiclass_metric = np.mean(ovr_estimates)
return multiclass_metric
def _sampling_error(self, data: pd.DataFrame) -> float:
return mse.precision_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
if self.y_true not in data.columns or data[self.y_true].isna().all():
return np.NaN
y_true = data[self.y_true]
y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba)
return precision_score(y_true=y_true, y_pred=y_pred, average='macro', labels=labels)
@MetricFactory.register('recall', ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationRecall(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='recall',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Recall', 'recall')],
)
# sampling error
self._sampling_error_components: List[Tuple] = []
def _fit(self, reference_data: pd.DataFrame):
label_binarizer = LabelBinarizer()
binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T)
binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T)
self._sampling_error_components = mse.recall_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _estimate(self, data: pd.DataFrame):
y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba)
ovr_estimates = []
for y_pred, y_pred_proba in zip(y_preds, y_pred_probas):
ovr_estimates.append(estimate_recall(y_pred, y_pred_proba))
multiclass_metric = np.mean(ovr_estimates)
return multiclass_metric
def _sampling_error(self, data: pd.DataFrame) -> float:
return mse.recall_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
if self.y_true not in data.columns or data[self.y_true].isna().all():
return np.NaN
y_true = data[self.y_true]
y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba)
return recall_score(y_true=y_true, y_pred=y_pred, average='macro', labels=labels)
@MetricFactory.register('specificity', ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationSpecificity(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='specificity',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Specificity', 'specificity')],
)
# sampling error
self._sampling_error_components: List[Tuple] = []
def _fit(self, reference_data: pd.DataFrame):
label_binarizer = LabelBinarizer()
binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T)
binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T)
self._sampling_error_components = mse.specificity_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _estimate(self, data: pd.DataFrame):
y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba)
ovr_estimates = []
for y_pred, y_pred_proba in zip(y_preds, y_pred_probas):
ovr_estimates.append(estimate_specificity(y_pred, y_pred_proba))
multiclass_metric = np.mean(ovr_estimates)
return multiclass_metric
def _sampling_error(self, data: pd.DataFrame) -> float:
return mse.specificity_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
if self.y_true not in data.columns or data[self.y_true].isna().all():
return np.NaN
y_true = data[self.y_true]
y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba)
mcm = multilabel_confusion_matrix(y_true, y_pred, labels=labels)
tn_sum = mcm[:, 0, 0]
fp_sum = mcm[:, 0, 1]
class_wise_specificity = tn_sum / (tn_sum + fp_sum)
return np.mean(class_wise_specificity)
@MetricFactory.register('accuracy', ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationAccuracy(Metric):
def __init__(
self,
y_pred_proba: ModelOutputsType,
y_pred: str,
y_true: str,
chunker: Chunker,
threshold: Threshold,
timestamp_column_name: Optional[str] = None,
**kwargs,
):
super().__init__(
name='accuracy',
y_pred_proba=y_pred_proba,
y_pred=y_pred,
y_true=y_true,
timestamp_column_name=timestamp_column_name,
chunker=chunker,
threshold=threshold,
components=[('Accuracy', 'accuracy')],
)
# sampling error
self._sampling_error_components: Tuple = ()
def _fit(self, reference_data: pd.DataFrame):
label_binarizer = LabelBinarizer()
binarized_y_true = label_binarizer.fit_transform(reference_data[self.y_true])
binarized_y_pred = label_binarizer.transform(reference_data[self.y_pred])
self._sampling_error_components = mse.accuracy_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _estimate(self, data: pd.DataFrame):
y_preds, y_pred_probas, _ = _get_binarized_multiclass_predictions(data, self.y_pred, self.y_pred_proba)
y_preds_array = np.asarray(y_preds).T
y_pred_probas_array = np.asarray(y_pred_probas).T
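        # Estimated accuracy is the average calibrated probability assigned to the predicted class
        # (masking with the one-hot predictions leaves only that probability in each row).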
probability_of_predicted = np.max(y_preds_array * y_pred_probas_array, axis=1)
return np.mean(probability_of_predicted)
def _sampling_error(self, data: pd.DataFrame) -> float:
return mse.accuracy_sampling_error(self._sampling_error_components, data)
def _realized_performance(self, data: pd.DataFrame) -> float:
if self.y_true not in data.columns or data[self.y_true].isna().all():
return np.NaN
y_true = data[self.y_true]
y_pred, _, labels = _get_multiclass_uncalibrated_predictions(data, self.y_pred, self.y_pred_proba)
return accuracy_score(y_true, y_pred)