# Author: Niels Nuyttens <niels@nannyml.com>
# #
# License: Apache Software License 2.0
# Author: Niels Nuyttens <niels@nannyml.com>
#
# License: Apache Software License 2.0
"""Module containing metric utilities and implementations."""
import warnings
from typing import Dict, List, Optional, Tuple, Union # noqa: TYP001
import numpy as np
import pandas as pd
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
f1_score,
multilabel_confusion_matrix,
precision_score,
recall_score,
roc_auc_score,
)
from sklearn.preprocessing import LabelBinarizer, label_binarize
from nannyml._typing import ProblemType, class_labels, model_output_column_names
from nannyml.base import _list_missing, _remove_nans
from nannyml.chunk import Chunker
from nannyml.exceptions import InvalidArgumentsException
from nannyml.performance_calculation.metrics.base import Metric, MetricFactory
from nannyml.sampling_error.multiclass_classification import (
accuracy_sampling_error,
accuracy_sampling_error_components,
auroc_sampling_error,
auroc_sampling_error_components,
f1_sampling_error,
f1_sampling_error_components,
multiclass_confusion_matrix_sampling_error,
multiclass_confusion_matrix_sampling_error_components,
precision_sampling_error,
precision_sampling_error_components,
recall_sampling_error,
recall_sampling_error_components,
specificity_sampling_error,
specificity_sampling_error_components,
)
from nannyml.thresholds import Threshold, calculate_threshold_values
[docs]@MetricFactory.register(metric='roc_auc', use_case=ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationAUROC(Metric):
"""Area under Receiver Operating Curve metric."""
def __init__(
self,
y_true: str,
y_pred: str,
threshold: Threshold,
y_pred_proba: Optional[Union[str, Dict[str, str]]] = None,
**kwargs,
):
"""Creates a new AUROC instance.
Parameters
----------
y_true: str
The name of the column containing target values.
y_pred: str
The name of the column containing your model predictions.
threshold: Threshold
The Threshold instance that determines how the lower and upper threshold values will be calculated.
y_pred_proba: Union[str, Dict[str, str]]
Name(s) of the column(s) containing your model output.
- For binary classification, pass a single string refering to the model output column.
- For multiclass classification, pass a dictionary that maps a class string to the column name \
containing model outputs for that class.
"""
super().__init__(
name='roc_auc',
y_true=y_true,
y_pred=y_pred,
threshold=threshold,
y_pred_proba=y_pred_proba,
lower_threshold_limit=0,
upper_threshold_limit=1,
components=[("ROC AUC", "roc_auc")],
)
# FIXME: Should we check the y_pred_proba argument here to ensure it's a dict?
self.y_pred_proba: Dict[str, str]
# sampling error
self._sampling_error_components: List[Tuple] = []
def __str__(self):
return "roc_auc"
def _fit(self, reference_data: pd.DataFrame):
_list_missing([self.y_true, self.y_pred], list(reference_data.columns))
# sampling error
classes = class_labels(self.y_pred_proba)
binarized_y_true = list(label_binarize(reference_data[self.y_true], classes=classes).T)
y_pred_proba = [reference_data[self.y_pred_proba[clazz]].T for clazz in classes]
self._sampling_error_components = auroc_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_proba_reference=y_pred_proba
)
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.y_pred_proba, Dict):
raise InvalidArgumentsException(
f"'y_pred_proba' is of type {type(self.y_pred_proba)}\n"
f"multiclass use cases require 'y_pred_proba' to "
"be a dictionary mapping classes to columns."
)
_list_missing([self.y_true] + model_output_column_names(self.y_pred_proba), data)
data = _remove_nans(data, (self.y_true, self.y_pred_proba.values()))
labels, class_probability_columns = [], []
for label in sorted(list(self.y_pred_proba.keys())):
labels.append(label)
class_probability_columns.append(self.y_pred_proba[label])
y_true = data[self.y_true]
y_pred_proba = data[class_probability_columns]
if y_pred_proba.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
if y_true.nunique() <= 1:
warnings.warn(
f"'{self.y_true}' only contains a single class for chunk, cannot calculate {self.display_name}. "
"Returning NaN."
)
return np.NaN
else:
return roc_auc_score(y_true, y_pred_proba, multi_class='ovr', average='macro', labels=labels)
def _sampling_error(self, data: pd.DataFrame) -> float:
return auroc_sampling_error(self._sampling_error_components, data)
[docs]@MetricFactory.register(metric='f1', use_case=ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationF1(Metric):
"""F1 score metric."""
def __init__(
self,
y_true: str,
y_pred: str,
threshold: Threshold,
y_pred_proba: Optional[Union[str, Dict[str, str]]] = None,
**kwargs,
):
"""Creates a new F1 instance.
Parameters
----------
y_true: str
The name of the column containing target values.
y_pred: str
The name of the column containing your model predictions.
threshold: Threshold
The Threshold instance that determines how the lower and upper threshold values will be calculated.
y_pred_proba: Union[str, Dict[str, str]]
Name(s) of the column(s) containing your model output.
- For binary classification, pass a single string refering to the model output column.
- For multiclass classification, pass a dictionary that maps a class string to the column name \
containing model outputs for that class.
"""
super().__init__(
name='f1',
y_true=y_true,
y_pred=y_pred,
threshold=threshold,
y_pred_proba=y_pred_proba,
lower_threshold_limit=0,
upper_threshold_limit=1,
components=[("F1", "f1")],
)
# sampling error
self._sampling_error_components: List[Tuple] = []
def __str__(self):
return "f1"
def _fit(self, reference_data: pd.DataFrame):
_list_missing([self.y_true, self.y_pred], reference_data)
# sampling error
label_binarizer = LabelBinarizer()
binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T)
binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T)
self._sampling_error_components = f1_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.y_pred_proba, Dict):
raise InvalidArgumentsException(
f"'y_pred_proba' is of type {type(self.y_pred_proba)}\n"
f"multiclass use cases require 'y_pred_proba' to "
"be a dictionary mapping classes to columns."
)
_list_missing([self.y_true, self.y_pred], data)
data = _remove_nans(data, (self.y_true, self.y_pred))
labels = sorted(list(self.y_pred_proba.keys()))
y_true = data[self.y_true]
y_pred = data[self.y_pred]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
if y_true.nunique() <= 1:
warnings.warn(
f"'{self.y_true}' only contains a single class, cannot calculate {self.display_name}. Returning NaN."
)
return np.NaN
elif y_pred.nunique() <= 1:
warnings.warn(
f"'{self.y_pred}' only contains a single class, cannot calculate {self.display_name}. Returning NaN."
)
return np.NaN
else:
return f1_score(y_true, y_pred, average='macro', labels=labels)
def _sampling_error(self, data: pd.DataFrame) -> float:
return f1_sampling_error(self._sampling_error_components, data)
[docs]@MetricFactory.register(metric='precision', use_case=ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationPrecision(Metric):
"""Precision metric."""
def __init__(
self,
y_true: str,
y_pred: str,
threshold: Threshold,
y_pred_proba: Optional[Union[str, Dict[str, str]]] = None,
**kwargs,
):
"""Creates a new Precision instance.
Parameters
----------
y_true: str
The name of the column containing target values.
y_pred: str
The name of the column containing your model predictions.
threshold: Threshold
The Threshold instance that determines how the lower and upper threshold values will be calculated.
y_pred_proba: Union[str, Dict[str, str]]
Name(s) of the column(s) containing your model output.
- For binary classification, pass a single string refering to the model output column.
- For multiclass classification, pass a dictionary that maps a class string to the column name \
containing model outputs for that class.
"""
super().__init__(
name='precision',
y_true=y_true,
y_pred=y_pred,
threshold=threshold,
y_pred_proba=y_pred_proba,
lower_threshold_limit=0,
upper_threshold_limit=1,
components=[("Precision", "precision")],
)
# sampling error
self._sampling_error_components: List[Tuple] = []
def __str__(self):
return "precision"
def _fit(self, reference_data: pd.DataFrame):
_list_missing([self.y_true, self.y_pred], reference_data)
# sampling error
label_binarizer = LabelBinarizer()
binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T)
binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T)
self._sampling_error_components = precision_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.y_pred_proba, Dict):
raise InvalidArgumentsException(
f"'y_pred_proba' is of type {type(self.y_pred_proba)}\n"
f"multiclass use cases require 'y_pred_proba' to "
"be a dictionary mapping classes to columns."
)
_list_missing([self.y_true, self.y_pred], data)
data = _remove_nans(data, (self.y_true, self.y_pred))
labels = sorted(list(self.y_pred_proba.keys()))
y_true = data[self.y_true]
y_pred = data[self.y_pred]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
if y_true.nunique() <= 1:
warnings.warn(
f"'{self.y_true}' only contains a single class, cannot calculate {self.display_name}. Returning NaN."
)
return np.NaN
elif y_pred.nunique() <= 1:
warnings.warn(
f"'{self.y_pred}' only contains a single class, cannot calculate {self.display_name}. Returning NaN."
)
return np.NaN
else:
return precision_score(y_true, y_pred, average='macro', labels=labels)
def _sampling_error(self, data: pd.DataFrame) -> float:
return precision_sampling_error(self._sampling_error_components, data)
[docs]@MetricFactory.register(metric='recall', use_case=ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationRecall(Metric):
"""Recall metric, also known as 'sensitivity'."""
def __init__(
self,
y_true: str,
y_pred: str,
threshold: Threshold,
y_pred_proba: Optional[Union[str, Dict[str, str]]] = None,
**kwargs,
):
"""Creates a new Recall instance.
Parameters
----------
y_true: str
The name of the column containing target values.
y_pred: str
The name of the column containing your model predictions.
threshold: Threshold
The Threshold instance that determines how the lower and upper threshold values will be calculated.
y_pred_proba: Union[str, Dict[str, str]]
Name(s) of the column(s) containing your model output.
- For binary classification, pass a single string refering to the model output column.
- For multiclass classification, pass a dictionary that maps a class string to the column name \
containing model outputs for that class.
"""
super().__init__(
name='recall',
y_true=y_true,
y_pred=y_pred,
threshold=threshold,
y_pred_proba=y_pred_proba,
lower_threshold_limit=0,
upper_threshold_limit=1,
components=[("Recall", "recall")],
)
# sampling error
self._sampling_error_components: List[Tuple] = []
def __str__(self):
return "recall"
def _fit(self, reference_data: pd.DataFrame):
_list_missing([self.y_true, self.y_pred], reference_data)
# sampling error
label_binarizer = LabelBinarizer()
binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T)
binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T)
self._sampling_error_components = recall_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.y_pred_proba, Dict):
raise InvalidArgumentsException(
f"'y_pred_proba' is of type {type(self.y_pred_proba)}\n"
f"multiclass use cases require 'y_pred_proba' to "
"be a dictionary mapping classes to columns."
)
_list_missing([self.y_true, self.y_pred], data)
data = _remove_nans(data, (self.y_true, self.y_pred))
labels = sorted(list(self.y_pred_proba.keys()))
y_true = data[self.y_true]
y_pred = data[self.y_pred]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
if y_true.nunique() <= 1:
warnings.warn(
f"'{self.y_true}' only contains a single class, cannot calculate {self.display_name}. Returning NaN."
)
return np.NaN
elif y_pred.nunique() <= 1:
warnings.warn(
f"'{self.y_pred}' only contains a single class, cannot calculate {self.display_name}. Returning NaN."
)
return np.NaN
else:
return recall_score(y_true, y_pred, average='macro', labels=labels)
def _sampling_error(self, data: pd.DataFrame) -> float:
return recall_sampling_error(self._sampling_error_components, data)
[docs]@MetricFactory.register(metric='specificity', use_case=ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationSpecificity(Metric):
"""Specificity metric."""
def __init__(
self,
y_true: str,
y_pred: str,
threshold: Threshold,
y_pred_proba: Optional[Union[str, Dict[str, str]]] = None,
**kwargs,
):
"""Creates a new Specificity instance.
Parameters
----------
y_true: str
The name of the column containing target values.
y_pred: str
The name of the column containing your model predictions.
threshold: Threshold
The Threshold instance that determines how the lower and upper threshold values will be calculated.
y_pred_proba: Union[str, Dict[str, str]]
Name(s) of the column(s) containing your model output.
- For binary classification, pass a single string refering to the model output column.
- For multiclass classification, pass a dictionary that maps a class string to the column name \
containing model outputs for that class.
"""
super().__init__(
name='specificity',
y_true=y_true,
y_pred=y_pred,
threshold=threshold,
y_pred_proba=y_pred_proba,
lower_threshold_limit=0,
upper_threshold_limit=1,
components=[("Specificity", "specificity")],
)
# sampling error
self._sampling_error_components: List[Tuple] = []
def __str__(self):
return "specificity"
def _fit(self, reference_data: pd.DataFrame):
_list_missing([self.y_true, self.y_pred], reference_data)
# sampling error
label_binarizer = LabelBinarizer()
binarized_y_true = list(label_binarizer.fit_transform(reference_data[self.y_true]).T)
binarized_y_pred = list(label_binarizer.transform(reference_data[self.y_pred]).T)
self._sampling_error_components = specificity_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.y_pred_proba, Dict):
raise InvalidArgumentsException(
f"'y_pred_proba' is of type {type(self.y_pred_proba)}\n"
f"multiclass use cases require 'y_pred_proba' to "
"be a dictionary mapping classes to columns."
)
_list_missing([self.y_true, self.y_pred], data)
data = _remove_nans(data, (self.y_true, self.y_pred))
labels = sorted(list(self.y_pred_proba.keys()))
y_true = data[self.y_true]
y_pred = data[self.y_pred]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: prediction column contains no data"
)
if y_true.nunique() <= 1:
warnings.warn(
f"'{self.y_true}' only contains a single class, cannot calculate {self.display_name}. Returning NaN."
)
return np.NaN
elif y_pred.nunique() <= 1:
warnings.warn(
f"'{self.y_pred}' only contains a single class, cannot calculate {self.display_name}. Returning NaN."
)
return np.NaN
else:
MCM = multilabel_confusion_matrix(y_true, y_pred, labels=labels)
tn_sum = MCM[:, 0, 0]
fp_sum = MCM[:, 0, 1]
class_wise_specificity = tn_sum / (tn_sum + fp_sum)
return np.mean(class_wise_specificity)
def _sampling_error(self, data: pd.DataFrame) -> float:
return specificity_sampling_error(self._sampling_error_components, data)
[docs]@MetricFactory.register(metric='accuracy', use_case=ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationAccuracy(Metric):
"""Accuracy metric."""
def __init__(
self,
y_true: str,
y_pred: str,
threshold: Threshold,
y_pred_proba: Optional[Union[str, Dict[str, str]]] = None,
**kwargs,
):
"""Creates a new Accuracy instance.
Parameters
----------
y_true: str
The name of the column containing target values.
y_pred: str
The name of the column containing your model predictions.
threshold: Threshold
The Threshold instance that determines how the lower and upper threshold values will be calculated.
y_pred_proba: Union[str, Dict[str, str]]
Name(s) of the column(s) containing your model output.
- For binary classification, pass a single string refering to the model output column.
- For multiclass classification, pass a dictionary that maps a class string to the column name \
containing model outputs for that class.
"""
super().__init__(
name='accuracy',
y_true=y_true,
y_pred=y_pred,
threshold=threshold,
y_pred_proba=y_pred_proba,
lower_threshold_limit=0,
upper_threshold_limit=1,
components=[("Accuracy", "accuracy")],
)
# sampling error
self._sampling_error_components: Tuple = ()
def __str__(self):
return "accuracy"
def _fit(self, reference_data: pd.DataFrame):
_list_missing([self.y_true, self.y_pred], reference_data)
# sampling error
label_binarizer = LabelBinarizer()
binarized_y_true = label_binarizer.fit_transform(reference_data[self.y_true])
binarized_y_pred = label_binarizer.transform(reference_data[self.y_pred])
self._sampling_error_components = accuracy_sampling_error_components(
y_true_reference=binarized_y_true, y_pred_reference=binarized_y_pred
)
def _calculate(self, data: pd.DataFrame):
_list_missing([self.y_true, self.y_pred], data)
data = _remove_nans(data, (self.y_true, self.y_pred))
y_true = data[self.y_true]
y_pred = data[self.y_pred]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric '{self.display_name}': " "prediction column contains no data"
)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
warnings.warn("Calculated Accuracy score contains NaN values.")
return np.nan
else:
return accuracy_score(y_true, y_pred)
def _sampling_error(self, data: pd.DataFrame) -> float:
return accuracy_sampling_error(self._sampling_error_components, data)
[docs]@MetricFactory.register('confusion_matrix', ProblemType.CLASSIFICATION_MULTICLASS)
class MulticlassClassificationConfusionMatrix(Metric):
def __init__(
self,
y_true: str,
y_pred: str,
threshold: Threshold,
y_pred_proba: Optional[Union[str, Dict[str, str]]] = None,
normalize_confusion_matrix: Optional[str] = None,
**kwargs,
):
"""Creates a new confusion matrix instance."""
super().__init__(
name='confusion_matrix',
y_true=y_true,
y_pred=y_pred,
threshold=threshold,
y_pred_proba=y_pred_proba,
components=[("None", "none")],
lower_threshold_limit=0,
)
self.normalize_confusion_matrix: Optional[str] = normalize_confusion_matrix
self.upper_threshold_value_limit: Optional[float] = 1.0 if normalize_confusion_matrix else None
self.classes: Optional[List[str]] = None
def __str__(self):
return "confusion_matrix"
[docs] def fit(self, reference_data: pd.DataFrame, chunker: Chunker):
# _fit
# realized perf on chunks
# set thresholds
self._fit(reference_data)
reference_chunks = chunker.split(reference_data)
reference_chunk_results = np.asarray([self._calculate(chunk.data) for chunk in reference_chunks])
self.alert_thresholds = self._multiclass_confusion_matrix_alert_thresholds(
reference_chunk_results=reference_chunk_results,
)
def _multiclass_confusion_matrix_alert_thresholds(
self,
reference_chunk_results: np.ndarray,
) -> Dict[str, Tuple[Optional[float], Optional[float]]]:
"""Calculate the alert thresholds for the confusion matrix.
Args:
reference_chunk_results: The confusion matrix for each chunk of the reference data.
Returns:
The alert thresholds for the confusion matrix.
"""
alert_thresholds = {}
if self.classes is None:
raise ValueError("classes must be set before calling this method")
num_classes = len(self.classes)
for i in range(num_classes):
for j in range(num_classes):
lower_threshold_value, upper_threshold_value = calculate_threshold_values(
threshold=self.threshold,
data=reference_chunk_results[:, i, j],
lower_threshold_value_limit=self.lower_threshold_value_limit,
upper_threshold_value_limit=self.upper_threshold_value_limit,
)
alert_thresholds[f'true_{self.classes[i]}_pred_{self.classes[j]}'] = (
lower_threshold_value,
upper_threshold_value,
)
return alert_thresholds
def _fit(self, reference_data: pd.DataFrame):
_list_missing([self.y_true, self.y_pred], reference_data)
self.sampling_error_components = multiclass_confusion_matrix_sampling_error_components(
y_true_reference=reference_data[self.y_true],
y_pred_reference=reference_data[self.y_pred],
normalize_confusion_matrix=self.normalize_confusion_matrix,
)
self.classes = sorted(reference_data[self.y_true].unique())
self.components = self._get_components(self.classes)
def _get_components(self, classes: List[str]) -> List[Tuple[str, str]]:
components = []
for true_class in classes:
for pred_class in classes:
components.append(
(
f"true class: '{true_class}', predicted class: '{pred_class}'",
f'true_{true_class}_pred_{pred_class}',
)
)
return components
def _calculate(self, data: pd.DataFrame) -> Union[np.ndarray, float]:
_list_missing([self.y_true, self.y_pred], data)
y_true = data[self.y_true]
y_pred = data[self.y_pred]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: prediction column contains no data"
)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
cm = confusion_matrix(y_true, y_pred, labels=self.classes, normalize=self.normalize_confusion_matrix)
return cm
[docs] def get_chunk_record(self, chunk_data: pd.DataFrame) -> Dict[str, Union[float, bool]]:
if self.classes is None:
raise ValueError("classes must be set before calling this method")
sampling_errors = multiclass_confusion_matrix_sampling_error(self.sampling_error_components, chunk_data)
realized_cm = self._calculate(chunk_data)
if isinstance(realized_cm, float):
realized_cm = np.full((len(self.classes), len(self.classes)), np.nan)
chunk_record = {}
for true_class in self.classes:
for pred_class in self.classes:
column_name = f'true_{true_class}_pred_{pred_class}'
chunk_record[f"{column_name}_sampling_error"] = sampling_errors[
self.classes.index(true_class), self.classes.index(pred_class)
]
chunk_record[f"{column_name}"] = realized_cm[
self.classes.index(true_class), self.classes.index(pred_class)
]
lower_threshold, upper_threshold = self.alert_thresholds[f"true_{true_class}_pred_{pred_class}"]
chunk_record[f"{column_name}_upper_threshold"] = upper_threshold
chunk_record[f"{column_name}_lower_threshold"] = lower_threshold
chunk_record[f"{column_name}_alert"] = (
self.alert_thresholds is not None and (chunk_record[f"{column_name}"] < lower_threshold)
) or (self.alert_thresholds is not None and (chunk_record[f"{column_name}"] > upper_threshold))
return chunk_record