# Author: Niels Nuyttens <niels@nannyml.com>
#
# License: Apache Software License 2.0
"""Module containing metric utilities and implementations."""
import abc
from typing import Type # noqa: TYP001
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
f1_score,
multilabel_confusion_matrix,
precision_score,
recall_score,
roc_auc_score,
)
from nannyml import Chunk, Chunker
from nannyml.exceptions import InvalidArgumentsException
from nannyml.metadata.base import (
NML_METADATA_PARTITION_COLUMN_NAME,
NML_METADATA_REFERENCE_PARTITION_NAME,
NML_METADATA_TARGET_COLUMN_NAME,
ModelMetadata,
)
from nannyml.metadata.binary_classification import (
NML_METADATA_PREDICTED_PROBABILITY_COLUMN_NAME,
NML_METADATA_PREDICTION_COLUMN_NAME,
BinaryClassificationMetadata,
)
from nannyml.metadata.multiclass_classification import MulticlassClassificationMetadata
[docs]class Metric(abc.ABC):
"""Represents a performance metric."""
def __init__(
self,
display_name: str,
column_name: str,
metadata: ModelMetadata,
upper_threshold: float = None,
lower_threshold: float = None,
):
"""Creates a new Metric instance.
Parameters
----------
display_name : str
The name of the metric. Used to display in plots. If not given this name will be derived from the
``calculation_function``.
column_name: str
The name used to indicate the metric in columns of a DataFrame.
metadata: ModelMetadata
Metadata describing the model being monitored.
upper_threshold : float, default=None
An optional upper threshold for the performance metric.
lower_threshold : float, default=None
An optional lower threshold for the performance metric.
"""
self.display_name = display_name
self.column_name = column_name
self.metadata = metadata
self.lower_threshold = lower_threshold
self.upper_threshold = upper_threshold
self._minimum_chunk_size: int = 300
[docs] def fit(self, reference_data: pd.DataFrame, chunker: Chunker):
"""Fits a Metric on reference data.
Parameters
----------
reference_data: pd.DataFrame
The reference data used for fitting. Must have target data available.
chunker: Chunker
The :class:`~nannyml.chunk.Chunker` used to split the reference data into chunks.
This value is provided by the calling
:class:`~nannyml.performance_calculation.calculator.PerformanceCalculator`.
"""
self._fit(reference_data)
# Calculate alert thresholds
if self.upper_threshold is None and self.lower_threshold is None:
reference_chunks = chunker.split(reference_data, minimum_chunk_size=self.minimum_chunk_size())
self.lower_threshold, self.upper_threshold = self._calculate_alert_thresholds(reference_chunks)
return
def _fit(self, reference_data: pd.DataFrame):
raise NotImplementedError
[docs] def calculate(self, data: pd.DataFrame):
"""Calculates performance metrics on data.
Parameters
----------
data: pd.DataFrame
The data to calculate performance metrics on. Requires presence of either the predicted labels or
prediction scores/probabilities (depending on the metric to be calculated), as well as the target data.
"""
if NML_METADATA_TARGET_COLUMN_NAME not in data.columns:
raise RuntimeError('data does not contain target column')
if (
NML_METADATA_PREDICTION_COLUMN_NAME not in data.columns
and NML_METADATA_PREDICTED_PROBABILITY_COLUMN_NAME not in data.columns
):
raise RuntimeError('data does contains neither prediction column or predicted probabilities column')
return self._calculate(data)
def _calculate(self, data: pd.DataFrame):
raise NotImplementedError
[docs] def minimum_chunk_size(self) -> int:
"""Determines the minimum number of observations a chunk should ideally for this metric to be trustworthy."""
try:
return self._minimum_chunk_size
except Exception:
# TODO: log failure
return 300
def _calculate_alert_thresholds(
self, reference_chunks: List[Chunk], std_num: int = 3, lower_limit: int = 0, upper_limit: int = 1
) -> Tuple[float, float]:
chunked_reference_metric = [self.calculate(chunk.data) for chunk in reference_chunks]
deviation = np.std(chunked_reference_metric) * std_num
mean_reference_metric = np.mean(chunked_reference_metric)
lower_threshold = np.maximum(mean_reference_metric - deviation, lower_limit)
upper_threshold = np.minimum(mean_reference_metric + deviation, upper_limit)
return lower_threshold, upper_threshold
[docs] def __eq__(self, other):
"""Establishes equality by comparing all properties."""
return (
self.display_name == other.display_name
and self.column_name == other.column_name
and self.upper_threshold == other.upper_threshold
and self.lower_threshold == other.lower_threshold
)
def _floor_chunk_size(calculated_min_chunk_size: float, lower_limit_on_chunk_size: int = 300) -> int:
return int(np.maximum(calculated_min_chunk_size, lower_limit_on_chunk_size))
def _minimum_chunk_size_roc_auc(
data: pd.DataFrame,
partition_column_name: str = NML_METADATA_PARTITION_COLUMN_NAME,
predicted_probability_column_name: str = NML_METADATA_PREDICTED_PROBABILITY_COLUMN_NAME,
target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME,
required_std: float = 0.02,
) -> int:
"""Estimation of minimum sample size to get required standard deviation of AUROC.
Estimation takes advantage of Standard Error of the Mean formula and expressing AUROC as Mann-Whitney U statistic.
"""
y_true = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, target_column_name]
y_pred_proba = data.loc[
data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, predicted_probability_column_name
]
y_true, y_pred_proba = np.asarray(y_true), np.asarray(y_pred_proba)
if np.mean(y_true) > 0.5:
y_true = abs(np.asarray(y_true) - 1)
y_pred_proba = 1 - y_pred_proba
sorted_idx = np.argsort(y_pred_proba)
y_pred_proba = y_pred_proba[sorted_idx]
y_true = y_true[sorted_idx]
rank_order = np.asarray(range(len(y_pred_proba)))
positive_ranks = y_true * rank_order
indexes = np.unique(positive_ranks)[1:]
ser = []
for i, index in enumerate(indexes):
ser.append(index - i)
n_pos = np.sum(y_true)
n_neg = len(y_true) - n_pos
ser_divided = ser / (n_pos * n_neg)
ser_multi = ser_divided * n_pos
pos_targets = y_true
# neg_targets = abs(y_true - 1)
n_pos_targets = np.sum(pos_targets)
fraction = n_pos_targets / len(y_true)
sample_size = (np.std(ser_multi)) ** 2 / ((required_std**2) * fraction)
sample_size = np.minimum(sample_size, len(y_true))
sample_size = np.round(sample_size, -2)
return _floor_chunk_size(sample_size)
def _minimum_chunk_size_f1(
data: pd.DataFrame,
partition_column_name: str = NML_METADATA_PARTITION_COLUMN_NAME,
prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME,
target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME,
required_std: float = 0.02,
):
"""Estimation of minimum sample size to get required standard deviation of F1.
Estimation takes advantage of Standard Error of the Mean formula.
"""
y_true = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, target_column_name]
y_pred = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, prediction_column_name]
y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
TP = np.where((y_true == y_pred) & (y_pred == 1), 1, np.nan)
FP = np.where((y_true != y_pred) & (y_pred == 1), 0, np.nan)
FN = np.where((y_true != y_pred) & (y_pred == 0), 0, np.nan)
TP = TP[~np.isnan(TP)]
FN = FN[~np.isnan(FN)]
FP = FP[~np.isnan(FP)]
tp_fp_fn = np.concatenate([TP, FN, FP])
correcting_factor = len(tp_fp_fn) / ((len(FN) + len(FP)) * 0.5 + len(TP))
obs_level_f1 = tp_fp_fn * correcting_factor
fraction_of_relevant = len(tp_fp_fn) / len(y_pred)
sample_size = ((np.std(obs_level_f1)) ** 2) / ((required_std**2) * fraction_of_relevant)
sample_size = np.minimum(sample_size, len(y_true))
sample_size = np.round(sample_size, -2)
return _floor_chunk_size(sample_size)
def _minimum_chunk_size_precision(
data: pd.DataFrame,
partition_column_name: str = NML_METADATA_PARTITION_COLUMN_NAME,
prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME,
target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME,
required_std: float = 0.02,
):
"""Estimation of minimum sample size to get required standard deviation of Precision.
Estimation takes advantage of Standard Error of the Mean formula.
"""
y_true = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, target_column_name]
y_pred = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, prediction_column_name]
y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
TP = np.where((y_true == y_pred) & (y_pred == 1), 1, np.nan)
FP = np.where((y_true != y_pred) & (y_pred == 1), 0, np.nan)
TP = TP[~np.isnan(TP)]
FP = FP[~np.isnan(FP)]
obs_level_precision = np.concatenate([TP, FP])
amount_positive_pred = np.sum(y_pred)
fraction_of_pos_pred = amount_positive_pred / len(y_pred)
sample_size = ((np.std(obs_level_precision)) ** 2) / ((required_std**2) * fraction_of_pos_pred)
sample_size = np.minimum(sample_size, len(y_true))
sample_size = np.round(sample_size, -2)
return _floor_chunk_size(sample_size)
def _minimum_chunk_size_recall(
data: pd.DataFrame,
partition_column_name: str = NML_METADATA_PARTITION_COLUMN_NAME,
prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME,
target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME,
required_std: float = 0.02,
):
"""Estimation of minimum sample size to get required standard deviation of Recall.
Estimation takes advantage of Standard Error of the Mean formula.
"""
y_true = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, target_column_name]
y_pred = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, prediction_column_name]
y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
TP = np.where((y_true == y_pred) & (y_pred == 1), 1, np.nan)
FN = np.where((y_true != y_pred) & (y_pred == 0), 0, np.nan)
TP = TP[~np.isnan(TP)]
FN = FN[~np.isnan(FN)]
obs_level_recall = np.concatenate([TP, FN])
fraction_of_relevant = sum(obs_level_recall) / len(y_pred)
sample_size = ((np.std(obs_level_recall)) ** 2) / ((required_std**2) * fraction_of_relevant)
sample_size = np.minimum(sample_size, len(y_true))
sample_size = np.round(sample_size, -2)
return _floor_chunk_size(sample_size)
def _minimum_chunk_size_specificity(
data: pd.DataFrame,
partition_column_name: str = NML_METADATA_PARTITION_COLUMN_NAME,
prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME,
target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME,
required_std: float = 0.02,
):
"""Estimation of minimum sample size to get required standard deviation of Specificity.
Estimation takes advantage of Standard Error of the Mean formula.
"""
y_true = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, target_column_name]
y_pred = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, prediction_column_name]
y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
TN = np.where((y_true == y_pred) & (y_pred == 0), 1, np.nan)
FP = np.where((y_true != y_pred) & (y_pred == 1), 0, np.nan)
TN = TN[~np.isnan(TN)]
FP = FP[~np.isnan(FP)]
obs_level_specificity = np.concatenate([TN, FP])
fraction_of_relevant = len(obs_level_specificity) / len(y_pred)
sample_size = ((np.std(obs_level_specificity)) ** 2) / ((required_std**2) * fraction_of_relevant)
sample_size = np.minimum(sample_size, len(y_true))
sample_size = np.round(sample_size, -2)
return _floor_chunk_size(sample_size)
def _minimum_chunk_size_accuracy(
data: pd.DataFrame,
partition_column_name: str = NML_METADATA_PARTITION_COLUMN_NAME,
prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME,
target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME,
required_std: float = 0.02,
):
"""Estimation of minimum sample size to get required standard deviation of Accuracy.
Estimation takes advantage of Standard Error of the Mean formula.
"""
y_true = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, target_column_name]
y_pred = data.loc[data[partition_column_name] == NML_METADATA_REFERENCE_PARTITION_NAME, prediction_column_name]
y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
y_true = np.asarray(y_true).astype(int)
y_pred = np.asarray(y_pred).astype(int)
correct_table = (y_true == y_pred).astype(int)
sample_size = (np.std(correct_table) ** 2) / (required_std**2)
sample_size = np.minimum(sample_size, len(y_true))
sample_size = np.round(sample_size, -2)
return _floor_chunk_size(sample_size)
[docs]class BinaryClassificationAUROC(Metric):
"""Area under Receiver Operating Curve metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new AUROC instance."""
super().__init__(display_name='ROC AUC', column_name='roc_auc', metadata=metadata)
def _fit(self, reference_data: pd.DataFrame):
self._min_chunk_size = _minimum_chunk_size_roc_auc(reference_data)
def _calculate(self, data: pd.DataFrame):
"""Redefine to handle NaNs and edge cases."""
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTED_PROBABILITY_COLUMN_NAME] # TODO: this should be predicted_probabilities
y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if y_true.nunique() <= 1:
return np.nan
else:
return roc_auc_score(y_true, y_pred)
[docs]class BinaryClassificationF1(Metric):
"""F1 score metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new F1 instance."""
super().__init__(display_name='F1', column_name='f1', metadata=metadata)
def _fit(self, reference_data: pd.DataFrame):
self._min_chunk_size = _minimum_chunk_size_f1(reference_data)
def _calculate(self, data: pd.DataFrame):
"""Redefine to handle NaNs and edge cases."""
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
return f1_score(y_true, y_pred)
[docs]class BinaryClassificationPrecision(Metric):
"""Precision metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new Precision instance."""
super().__init__(display_name='Precision', column_name='precision', metadata=metadata)
def _fit(self, reference_data: pd.DataFrame):
self._min_chunk_size = _minimum_chunk_size_precision(reference_data)
def _calculate(self, data: pd.DataFrame):
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
return precision_score(y_true, y_pred)
[docs]class BinaryClassificationRecall(Metric):
"""Recall metric, also known as 'sensitivity'."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new Recall instance."""
super().__init__(display_name='Recall', column_name='recall', metadata=metadata)
def _fit(self, reference_data: pd.DataFrame):
self._min_chunk_size = _minimum_chunk_size_recall(reference_data)
def _calculate(self, data: pd.DataFrame):
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
return recall_score(y_true, y_pred)
[docs]class BinaryClassificationSpecificity(Metric):
"""Specificity metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new F1 instance."""
super().__init__(display_name='Specificity', column_name='specificity', metadata=metadata)
def _fit(self, reference_data: pd.DataFrame):
self._min_chunk_size = _minimum_chunk_size_specificity(reference_data)
def _calculate(self, data: pd.DataFrame):
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
if y_pred.isna().all():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
return tn / (tn + fp)
[docs]class BinaryClassificationAccuracy(Metric):
"""Accuracy metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new Accuracy instance."""
super().__init__(display_name='Accuracy', column_name='accuracy', metadata=metadata)
def _fit(self, reference_data: pd.DataFrame):
self._min_chunk_size = _minimum_chunk_size_accuracy(reference_data)
def _calculate(self, data: pd.DataFrame):
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
if y_pred.isna().all():
raise InvalidArgumentsException(
f"could not calculate metric '{self.display_name}': " "prediction column contains no data"
)
y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
return (tp + tn) / (tp + tn + fp + fn)
[docs]class MulticlassClassificationAUROC(Metric):
"""Area under Receiver Operating Curve metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new AUROC instance."""
super().__init__(display_name='ROC AUC', column_name='roc_auc', metadata=metadata)
self._min_chunk_size = 300
def _fit(self, reference_data: pd.DataFrame):
pass
def _calculate(self, data: pd.DataFrame):
"""Redefine to handle NaNs and edge cases."""
if not isinstance(self.metadata, MulticlassClassificationMetadata):
raise InvalidArgumentsException('metadata was not an instance of MulticlassClassificationMetadata')
labels, class_probability_columns = [], []
for label in sorted(list(self.metadata.predicted_class_probability_metadata_columns())):
labels.append(label)
class_probability_columns.append(self.metadata.predicted_class_probability_metadata_columns()[label])
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[class_probability_columns]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
# y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if y_true.nunique() <= 1:
return np.nan
else:
return roc_auc_score(y_true, y_pred, multi_class='ovr', average='macro', labels=labels)
[docs]class MulticlassClassificationF1(Metric):
"""F1 score metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new F1 instance."""
super().__init__(display_name='F1', column_name='f1', metadata=metadata)
self._min_chunk_size = 300
def _fit(self, reference_data: pd.DataFrame):
pass
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.metadata, MulticlassClassificationMetadata):
raise InvalidArgumentsException('metadata was not an instance of MulticlassClassificationMetadata')
labels = sorted(list(self.metadata.predicted_class_probability_metadata_columns().keys()))
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
# y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
return f1_score(y_true, y_pred, average='macro', labels=labels)
[docs]class MulticlassClassificationPrecision(Metric):
"""Precision metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new Precision instance."""
super().__init__(display_name='Precision', column_name='precision', metadata=metadata)
self._min_chunk_size = 300
def _fit(self, reference_data: pd.DataFrame):
pass
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.metadata, MulticlassClassificationMetadata):
raise InvalidArgumentsException('metadata was not an instance of MulticlassClassificationMetadata')
labels = sorted(list(self.metadata.predicted_class_probability_metadata_columns().keys()))
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
# y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
return precision_score(y_true, y_pred, average='macro', labels=labels)
[docs]class MulticlassClassificationRecall(Metric):
"""Recall metric, also known as 'sensitivity'."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new Recall instance."""
super().__init__(display_name='Recall', column_name='recall', metadata=metadata)
self._min_chunk_size = 300
def _fit(self, reference_data: pd.DataFrame):
pass
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.metadata, MulticlassClassificationMetadata):
raise InvalidArgumentsException('metadata was not an instance of MulticlassClassificationMetadata')
labels = sorted(list(self.metadata.predicted_class_probability_metadata_columns().keys()))
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
# y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
return recall_score(y_true, y_pred, average='macro', labels=labels)
[docs]class MulticlassClassificationSpecificity(Metric):
"""Specificity metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new Specificity instance."""
super().__init__(display_name='Specificity', column_name='specificity', metadata=metadata)
self._min_chunk_size = 300
def _fit(self, reference_data: pd.DataFrame):
pass
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.metadata, MulticlassClassificationMetadata):
raise InvalidArgumentsException('metadata was not an instance of MulticlassClassificationMetadata')
labels = sorted(list(self.metadata.predicted_class_probability_metadata_columns().keys()))
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric {self.display_name}: " "prediction column contains no data"
)
# y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
MCM = multilabel_confusion_matrix(y_true, y_pred, labels=labels)
tn_sum = MCM[:, 0, 0]
fp_sum = MCM[:, 0, 1]
class_wise_specificity = tn_sum / (tn_sum + fp_sum)
return np.mean(class_wise_specificity)
[docs]class MulticlassClassificationAccuracy(Metric):
"""Accuracy metric."""
def __init__(self, metadata: ModelMetadata):
"""Creates a new Accuracy instance."""
super().__init__(display_name='Accuracy', column_name='accuracy', metadata=metadata)
self._min_chunk_size = 300
def _fit(self, reference_data: pd.DataFrame):
pass
def _calculate(self, data: pd.DataFrame):
if not isinstance(self.metadata, MulticlassClassificationMetadata):
raise InvalidArgumentsException('metadata was not an instance of MulticlassClassificationMetadata')
y_true = data[NML_METADATA_TARGET_COLUMN_NAME]
y_pred = data[NML_METADATA_PREDICTION_COLUMN_NAME]
if y_pred.isna().all().any():
raise InvalidArgumentsException(
f"could not calculate metric '{self.display_name}': " "prediction column contains no data"
)
# y_true, y_pred = _common_data_cleaning(y_true, y_pred)
if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1):
return np.nan
else:
return accuracy_score(y_true, y_pred)
def _common_data_cleaning(y_true, y_pred):
y_true, y_pred = (
pd.Series(y_true).reset_index(drop=True),
pd.Series(y_pred).reset_index(drop=True),
)
y_true = y_true[~y_pred.isna()]
y_pred.dropna(inplace=True)
y_pred = y_pred[~y_true.isna()]
y_true.dropna(inplace=True)
return y_true, y_pred
[docs]class MetricFactory:
"""A factory class that produces Metric instances based on a given magic string or a metric specification."""
_metrics: Dict[str, Dict[str, Type[Metric]]] = {
'roc_auc': {
BinaryClassificationMetadata.__name__: BinaryClassificationAUROC,
MulticlassClassificationMetadata.__name__: MulticlassClassificationAUROC,
},
'f1': {
BinaryClassificationMetadata.__name__: BinaryClassificationF1,
MulticlassClassificationMetadata.__name__: MulticlassClassificationF1,
},
'precision': {
BinaryClassificationMetadata.__name__: BinaryClassificationPrecision,
MulticlassClassificationMetadata.__name__: MulticlassClassificationPrecision,
},
'recall': {
BinaryClassificationMetadata.__name__: BinaryClassificationRecall,
MulticlassClassificationMetadata.__name__: MulticlassClassificationRecall,
},
'specificity': {
BinaryClassificationMetadata.__name__: BinaryClassificationSpecificity,
MulticlassClassificationMetadata.__name__: MulticlassClassificationSpecificity,
},
'accuracy': {
BinaryClassificationMetadata.__name__: BinaryClassificationAccuracy,
MulticlassClassificationMetadata.__name__: MulticlassClassificationAccuracy,
},
}
[docs] @classmethod
def create(cls, key: str, metadata: ModelMetadata) -> Metric:
"""Returns a Metric instance for a given key."""
if not isinstance(key, str):
raise InvalidArgumentsException(
f"cannot create metric given a '{type(key)}'" "Please provide a string, function or Metric"
)
if key not in cls._metrics:
raise InvalidArgumentsException(
f"unknown metric key '{key}' given. "
"Should be one of ['roc_auc', 'f1', 'precision', 'recall', 'specificity', "
"'accuracy']."
)
metadata_class_name = type(metadata).__name__
if metadata_class_name not in cls._metrics[key]:
raise RuntimeError(
f"metric '{key}' is currently not supported for model type {metadata_class_name}. "
"Please specify another metric or use one of these supported model types for this metric: "
f"{[metadata_class_name for md in cls._metrics[key]]}"
)
metric_class = cls._metrics[key][metadata_class_name]
return metric_class(metadata=metadata) # type: ignore