Source code for nannyml.performance_calculation.metrics

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

"""Module containing metric utilities and implementations."""
import abc
import logging
from logging import Logger
from typing import Any, Callable, Dict, List, Tuple  # noqa: TYP001

import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    multilabel_confusion_matrix,
    precision_score,
    recall_score,
    roc_auc_score,
)

from nannyml._typing import UseCase, model_output_column_names
from nannyml.base import AbstractCalculator, _list_missing
from nannyml.chunk import Chunk, Chunker
from nannyml.exceptions import InvalidArgumentsException


[docs]class Metric(abc.ABC): """A performance metric used to calculate realized model performance.""" def __init__( self, display_name: str, column_name: str, calculator: AbstractCalculator, upper_threshold: float = None, lower_threshold: float = None, ): """Creates a new Metric instance. Parameters ---------- display_name : str The name of the metric. Used to display in plots. If not given this name will be derived from the ``calculation_function``. column_name: str The name used to indicate the metric in columns of a DataFrame. calculator: AbstractCalculator The calculator using the Metric instance. upper_threshold : float, default=None An optional upper threshold for the performance metric. lower_threshold : float, default=None An optional lower threshold for the performance metric. """ self.display_name = display_name self.column_name = column_name from .calculator import PerformanceCalculator if not isinstance(calculator, PerformanceCalculator): raise RuntimeError(f"{calculator.__class__.__name__} is not an instance of type " f"PerformanceCalculator") self.calculator = calculator self.lower_threshold = lower_threshold self.upper_threshold = upper_threshold self._minimum_chunk_size: int = 300
[docs] def fit(self, reference_data: pd.DataFrame, chunker: Chunker): """Fits a Metric on reference data. Parameters ---------- reference_data: pd.DataFrame The reference data used for fitting. Must have target data available. chunker: Chunker The :class:`~nannyml.chunk.Chunker` used to split the reference data into chunks. This value is provided by the calling :class:`~nannyml.performance_calculation.calculator.PerformanceCalculator`. """ self._fit(reference_data) # Calculate alert thresholds if self.upper_threshold is None and self.lower_threshold is None: reference_chunks = chunker.split( reference_data, minimum_chunk_size=self.minimum_chunk_size(), timestamp_column_name=self.calculator.timestamp_column_name, ) self.lower_threshold, self.upper_threshold = self._calculate_alert_thresholds(reference_chunks) return
def _fit(self, reference_data: pd.DataFrame): raise NotImplementedError
[docs] def calculate(self, data: pd.DataFrame): """Calculates performance metrics on data. Parameters ---------- data: pd.DataFrame The data to calculate performance metrics on. Requires presence of either the predicted labels or prediction scores/probabilities (depending on the metric to be calculated), as well as the target data. """ return self._calculate(data)
def _calculate(self, data: pd.DataFrame): raise NotImplementedError
[docs] def minimum_chunk_size(self) -> int: """Determines the minimum number of observations a chunk should ideally for this metric to be trustworthy.""" try: return self._minimum_chunk_size except Exception: # TODO: log failure return 300
def _calculate_alert_thresholds( self, reference_chunks: List[Chunk], std_num: int = 3, lower_limit: int = 0, upper_limit: int = 1 ) -> Tuple[float, float]: chunked_reference_metric = [self.calculate(chunk.data) for chunk in reference_chunks] deviation = np.std(chunked_reference_metric) * std_num mean_reference_metric = np.mean(chunked_reference_metric) lower_threshold = np.maximum(mean_reference_metric - deviation, lower_limit) upper_threshold = np.minimum(mean_reference_metric + deviation, upper_limit) return lower_threshold, upper_threshold
[docs] def __eq__(self, other): """Establishes equality by comparing all properties.""" return ( self.display_name == other.display_name and self.column_name == other.column_name and self.upper_threshold == other.upper_threshold and self.lower_threshold == other.lower_threshold )
[docs]class MetricFactory: """A factory class that produces Metric instances based on a given magic string or a metric specification.""" registry: Dict[str, Dict[UseCase, Metric]] = {} @classmethod def _logger(cls) -> Logger: return logging.getLogger(__name__)
[docs] @classmethod def create(cls, key: str, use_case: UseCase, kwargs: Dict[str, Any] = {}) -> Metric: """Returns a Metric instance for a given key.""" if not isinstance(key, str): raise InvalidArgumentsException( f"cannot create metric given a '{type(key)}'" "Please provide a string, function or Metric" ) if key not in cls.registry: raise InvalidArgumentsException( f"unknown metric key '{key}' given. " "Should be one of ['roc_auc', 'f1', 'precision', 'recall', 'specificity', " "'accuracy']." ) if use_case not in cls.registry[key]: raise RuntimeError( f"metric '{key}' is currently not supported for use case {use_case}. " "Please specify another metric or use one of these supported model types for this metric: " f"{[md for md in cls.registry[key]]}" ) metric_class = cls.registry[key][use_case] return metric_class(**kwargs) # type: ignore
[docs] @classmethod def register(cls, metric: str, use_case: UseCase) -> Callable: def inner_wrapper(wrapped_class: Metric) -> Metric: if metric in cls.registry: if use_case in cls.registry[metric]: cls._logger().warning(f"re-registering Metric for metric='{metric}' and use_case='{use_case}'") cls.registry[metric][use_case] = wrapped_class else: cls.registry[metric] = {use_case: wrapped_class} return wrapped_class return inner_wrapper
# def _floor_chunk_size(calculated_min_chunk_size: float, lower_limit_on_chunk_size: int = 300) -> int: # return int(np.maximum(calculated_min_chunk_size, lower_limit_on_chunk_size)) # # # def _minimum_chunk_size_roc_auc( # data: pd.DataFrame, # period_column_name: str = NML_METADATA_PERIOD_COLUMN_NAME, # predicted_probability_column_name: str = NML_METADATA_PREDICTED_PROBABILITY_COLUMN_NAME, # target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME, # required_std: float = 0.02, # ) -> int: # """Estimation of minimum sample size to get required standard deviation of AUROC. # # Estimation takes advantage of Standard Error of the Mean formula and expressing AUROC as Mann-Whitney U statistic. # """ # y_true = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, target_column_name] # y_pred_proba = data.loc[ # data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, predicted_probability_column_name # ] # # y_true, y_pred_proba = np.asarray(y_true), np.asarray(y_pred_proba) # if np.mean(y_true) > 0.5: # y_true = abs(np.asarray(y_true) - 1) # y_pred_proba = 1 - y_pred_proba # # sorted_idx = np.argsort(y_pred_proba) # y_pred_proba = y_pred_proba[sorted_idx] # y_true = y_true[sorted_idx] # rank_order = np.asarray(range(len(y_pred_proba))) # positive_ranks = y_true * rank_order # indexes = np.unique(positive_ranks)[1:] # ser = [] # # for i, index in enumerate(indexes): # ser.append(index - i) # # n_pos = np.sum(y_true) # n_neg = len(y_true) - n_pos # ser_divided = ser / (n_pos * n_neg) # ser_multi = ser_divided * n_pos # # pos_targets = y_true # # neg_targets = abs(y_true - 1) # # n_pos_targets = np.sum(pos_targets) # # fraction = n_pos_targets / len(y_true) # sample_size = (np.std(ser_multi)) ** 2 / ((required_std**2) * fraction) # sample_size = np.minimum(sample_size, len(y_true)) # sample_size = np.round(sample_size, -2) # # return _floor_chunk_size(sample_size) # # # def _minimum_chunk_size_f1( # data: pd.DataFrame, # period_column_name: str = NML_METADATA_PERIOD_COLUMN_NAME, # prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME, # target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME, # required_std: float = 0.02, # ): # """Estimation of minimum sample size to get required standard deviation of F1. # # Estimation takes advantage of Standard Error of the Mean formula. # """ # y_true = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, target_column_name] # y_pred = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, prediction_column_name] # # y_true, y_pred = np.asarray(y_true), np.asarray(y_pred) # # TP = np.where((y_true == y_pred) & (y_pred == 1), 1, np.nan) # FP = np.where((y_true != y_pred) & (y_pred == 1), 0, np.nan) # FN = np.where((y_true != y_pred) & (y_pred == 0), 0, np.nan) # # TP = TP[~np.isnan(TP)] # FN = FN[~np.isnan(FN)] # FP = FP[~np.isnan(FP)] # # tp_fp_fn = np.concatenate([TP, FN, FP]) # # correcting_factor = len(tp_fp_fn) / ((len(FN) + len(FP)) * 0.5 + len(TP)) # obs_level_f1 = tp_fp_fn * correcting_factor # fraction_of_relevant = len(tp_fp_fn) / len(y_pred) # sample_size = ((np.std(obs_level_f1)) ** 2) / ((required_std**2) * fraction_of_relevant) # sample_size = np.minimum(sample_size, len(y_true)) # sample_size = np.round(sample_size, -2) # # return _floor_chunk_size(sample_size) # # # def _minimum_chunk_size_precision( # data: pd.DataFrame, # period_column_name: str = NML_METADATA_PERIOD_COLUMN_NAME, # prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME, # target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME, # required_std: float = 0.02, # ): # """Estimation of minimum sample size to get required standard deviation of Precision. # # Estimation takes advantage of Standard Error of the Mean formula. # """ # y_true = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, target_column_name] # y_pred = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, prediction_column_name] # # y_true, y_pred = np.asarray(y_true), np.asarray(y_pred) # # TP = np.where((y_true == y_pred) & (y_pred == 1), 1, np.nan) # FP = np.where((y_true != y_pred) & (y_pred == 1), 0, np.nan) # # TP = TP[~np.isnan(TP)] # FP = FP[~np.isnan(FP)] # obs_level_precision = np.concatenate([TP, FP]) # amount_positive_pred = np.sum(y_pred) # fraction_of_pos_pred = amount_positive_pred / len(y_pred) # sample_size = ((np.std(obs_level_precision)) ** 2) / ((required_std**2) * fraction_of_pos_pred) # sample_size = np.minimum(sample_size, len(y_true)) # sample_size = np.round(sample_size, -2) # # return _floor_chunk_size(sample_size) # # # def _minimum_chunk_size_recall( # data: pd.DataFrame, # period_column_name: str = NML_METADATA_PERIOD_COLUMN_NAME, # prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME, # target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME, # required_std: float = 0.02, # ): # """Estimation of minimum sample size to get required standard deviation of Recall. # # Estimation takes advantage of Standard Error of the Mean formula. # """ # y_true = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, target_column_name] # y_pred = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, prediction_column_name] # # y_true, y_pred = np.asarray(y_true), np.asarray(y_pred) # # TP = np.where((y_true == y_pred) & (y_pred == 1), 1, np.nan) # FN = np.where((y_true != y_pred) & (y_pred == 0), 0, np.nan) # TP = TP[~np.isnan(TP)] # FN = FN[~np.isnan(FN)] # # obs_level_recall = np.concatenate([TP, FN]) # fraction_of_relevant = sum(obs_level_recall) / len(y_pred) # # sample_size = ((np.std(obs_level_recall)) ** 2) / ((required_std**2) * fraction_of_relevant) # sample_size = np.minimum(sample_size, len(y_true)) # sample_size = np.round(sample_size, -2) # # return _floor_chunk_size(sample_size) # # # def _minimum_chunk_size_specificity( # data: pd.DataFrame, # period_column_name: str = NML_METADATA_PERIOD_COLUMN_NAME, # prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME, # target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME, # required_std: float = 0.02, # ): # """Estimation of minimum sample size to get required standard deviation of Specificity. # # Estimation takes advantage of Standard Error of the Mean formula. # """ # y_true = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, target_column_name] # y_pred = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, prediction_column_name] # # y_true, y_pred = np.asarray(y_true), np.asarray(y_pred) # TN = np.where((y_true == y_pred) & (y_pred == 0), 1, np.nan) # FP = np.where((y_true != y_pred) & (y_pred == 1), 0, np.nan) # TN = TN[~np.isnan(TN)] # FP = FP[~np.isnan(FP)] # # obs_level_specificity = np.concatenate([TN, FP]) # fraction_of_relevant = len(obs_level_specificity) / len(y_pred) # sample_size = ((np.std(obs_level_specificity)) ** 2) / ((required_std**2) * fraction_of_relevant) # sample_size = np.minimum(sample_size, len(y_true)) # sample_size = np.round(sample_size, -2) # # return _floor_chunk_size(sample_size) # # # def _minimum_chunk_size_accuracy( # data: pd.DataFrame, # period_column_name: str = NML_METADATA_PERIOD_COLUMN_NAME, # prediction_column_name: str = NML_METADATA_PREDICTION_COLUMN_NAME, # target_column_name: str = NML_METADATA_TARGET_COLUMN_NAME, # required_std: float = 0.02, # ): # """Estimation of minimum sample size to get required standard deviation of Accuracy. # # Estimation takes advantage of Standard Error of the Mean formula. # """ # y_true = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, target_column_name] # y_pred = data.loc[data[period_column_name] == NML_METADATA_REFERENCE_period_NAME, prediction_column_name] # # y_true, y_pred = np.asarray(y_true), np.asarray(y_pred) # y_true = np.asarray(y_true).astype(int) # # y_pred = np.asarray(y_pred).astype(int) # correct_table = (y_true == y_pred).astype(int) # sample_size = (np.std(correct_table) ** 2) / (required_std**2) # sample_size = np.minimum(sample_size, len(y_true)) # sample_size = np.round(sample_size, -2) # # return _floor_chunk_size(sample_size)
[docs]@MetricFactory.register(metric='roc_auc', use_case=UseCase.CLASSIFICATION_BINARY) class BinaryClassificationAUROC(Metric): """Area under Receiver Operating Curve metric.""" def __init__(self, calculator): """Creates a new AUROC instance.""" super().__init__(display_name='ROC AUC', column_name='roc_auc', calculator=calculator) def __str__(self): return "roc_auc" def _fit(self, reference_data: pd.DataFrame): # self._min_chunk_size = _minimum_chunk_size_roc_auc(reference_data) _list_missing([self.calculator.y_true, self.calculator.y_pred_proba], list(reference_data.columns)) def _calculate(self, data: pd.DataFrame): """Redefine to handle NaNs and edge cases.""" _list_missing([self.calculator.y_true, self.calculator.y_pred_proba], list(data.columns)) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred_proba] y_true, y_pred = _common_data_cleaning(y_true, y_pred) if y_true.nunique() <= 1: return np.nan else: return roc_auc_score(y_true, y_pred)
[docs]@MetricFactory.register(metric='f1', use_case=UseCase.CLASSIFICATION_BINARY) class BinaryClassificationF1(Metric): """F1 score metric.""" def __init__(self, calculator): """Creates a new F1 instance.""" super().__init__(display_name='F1', column_name='f1', calculator=calculator) def __str__(self): return "f1" def _fit(self, reference_data: pd.DataFrame): # self._min_chunk_size = _minimum_chunk_size_f1(reference_data) _list_missing([self.calculator.y_true, self.calculator.y_pred], list(reference_data.columns)) def _calculate(self, data: pd.DataFrame): """Redefine to handle NaNs and edge cases.""" _list_missing([self.calculator.y_true, self.calculator.y_pred], list(data.columns)) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] y_true, y_pred = _common_data_cleaning(y_true, y_pred) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: return f1_score(y_true, y_pred)
[docs]@MetricFactory.register(metric='precision', use_case=UseCase.CLASSIFICATION_BINARY) class BinaryClassificationPrecision(Metric): """Precision metric.""" def __init__(self, calculator): """Creates a new Precision instance.""" super().__init__(display_name='Precision', column_name='precision', calculator=calculator) def __str__(self): return "precision" def _fit(self, reference_data: pd.DataFrame): # self._min_chunk_size = _minimum_chunk_size_precision(reference_data) _list_missing([self.calculator.y_true, self.calculator.y_pred], list(reference_data.columns)) def _calculate(self, data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], list(data.columns)) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] y_true, y_pred = _common_data_cleaning(y_true, y_pred) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: return precision_score(y_true, y_pred)
[docs]@MetricFactory.register(metric='recall', use_case=UseCase.CLASSIFICATION_BINARY) class BinaryClassificationRecall(Metric): """Recall metric, also known as 'sensitivity'.""" def __init__(self, calculator): """Creates a new Recall instance.""" super().__init__(display_name='Recall', column_name='recall', calculator=calculator) def __str__(self): return "recall" def _fit(self, reference_data: pd.DataFrame): # self._min_chunk_size = _minimum_chunk_size_recall(reference_data) _list_missing([self.calculator.y_true, self.calculator.y_pred], list(reference_data.columns)) def _calculate(self, data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], list(data.columns)) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] y_true, y_pred = _common_data_cleaning(y_true, y_pred) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: return recall_score(y_true, y_pred)
[docs]@MetricFactory.register(metric='specificity', use_case=UseCase.CLASSIFICATION_BINARY) class BinaryClassificationSpecificity(Metric): """Specificity metric.""" def __init__(self, calculator): """Creates a new F1 instance.""" super().__init__(display_name='Specificity', column_name='specificity', calculator=calculator) def __str__(self): return "specificity" def _fit(self, reference_data: pd.DataFrame): # self._min_chunk_size = _minimum_chunk_size_specificity(reference_data) _list_missing([self.calculator.y_true, self.calculator.y_pred], list(reference_data.columns)) def _calculate(self, data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], list(data.columns)) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] if y_pred.isna().all(): raise InvalidArgumentsException( f"could not calculate metric {self.display_name}: " "prediction column contains no data" ) y_true, y_pred = _common_data_cleaning(y_true, y_pred) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel() return tn / (tn + fp)
[docs]@MetricFactory.register(metric='accuracy', use_case=UseCase.CLASSIFICATION_BINARY) class BinaryClassificationAccuracy(Metric): """Accuracy metric.""" def __init__(self, calculator): """Creates a new Accuracy instance.""" super().__init__(display_name='Accuracy', column_name='accuracy', calculator=calculator) def __str__(self): return "accuracy" def _fit(self, reference_data: pd.DataFrame): # self._min_chunk_size = _minimum_chunk_size_accuracy(reference_data) _list_missing([self.calculator.y_true, self.calculator.y_pred], list(reference_data.columns)) def _calculate(self, data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], list(data.columns)) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] if y_pred.isna().all(): raise InvalidArgumentsException( f"could not calculate metric '{self.display_name}': " "prediction column contains no data" ) y_true, y_pred = _common_data_cleaning(y_true, y_pred) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel() return (tp + tn) / (tp + tn + fp + fn)
[docs]@MetricFactory.register(metric='roc_auc', use_case=UseCase.CLASSIFICATION_MULTICLASS) class MulticlassClassificationAUROC(Metric): """Area under Receiver Operating Curve metric.""" def __init__(self, calculator): """Creates a new AUROC instance.""" super().__init__(display_name='ROC AUC', column_name='roc_auc', calculator=calculator) self._min_chunk_size = 300 def __str__(self): return "roc_auc" def _fit(self, reference_data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], list(reference_data.columns)) def _calculate(self, data: pd.DataFrame): if not isinstance(self.calculator.y_pred_proba, Dict): raise InvalidArgumentsException( f"'y_pred_proba' is of type {type(self.calculator.y_pred_proba)}\n" f"multiclass use cases require 'y_pred_proba' to " "be a dictionary mapping classes to columns." ) _list_missing([self.calculator.y_true] + model_output_column_names(self.calculator.y_pred_proba), data) labels, class_probability_columns = [], [] for label in sorted(list(self.calculator.y_pred_proba.keys())): labels.append(label) class_probability_columns.append(self.calculator.y_pred_proba[label]) y_true = data[self.calculator.y_true] y_pred = data[class_probability_columns] if y_pred.isna().all().any(): raise InvalidArgumentsException( f"could not calculate metric {self.display_name}: " "prediction column contains no data" ) if y_true.nunique() <= 1: return np.nan else: return roc_auc_score(y_true, y_pred, multi_class='ovr', average='macro', labels=labels)
[docs]@MetricFactory.register(metric='f1', use_case=UseCase.CLASSIFICATION_MULTICLASS) class MulticlassClassificationF1(Metric): """F1 score metric.""" def __init__(self, calculator): """Creates a new F1 instance.""" super().__init__(display_name='F1', column_name='f1', calculator=calculator) self._min_chunk_size = 300 def __str__(self): return "f1" def _fit(self, reference_data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], reference_data) def _calculate(self, data: pd.DataFrame): if not isinstance(self.calculator.y_pred_proba, Dict): raise InvalidArgumentsException( f"'y_pred_proba' is of type {type(self.calculator.y_pred_proba)}\n" f"multiclass use cases require 'y_pred_proba' to " "be a dictionary mapping classes to columns." ) _list_missing([self.calculator.y_true, self.calculator.y_pred], data) labels = sorted(list(self.calculator.y_pred_proba.keys())) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] if y_pred.isna().all().any(): raise InvalidArgumentsException( f"could not calculate metric {self.display_name}: " "prediction column contains no data" ) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: return f1_score(y_true, y_pred, average='macro', labels=labels)
[docs]@MetricFactory.register(metric='precision', use_case=UseCase.CLASSIFICATION_MULTICLASS) class MulticlassClassificationPrecision(Metric): """Precision metric.""" def __init__(self, calculator): """Creates a new Precision instance.""" super().__init__(display_name='Precision', column_name='precision', calculator=calculator) self._min_chunk_size = 300 def __str__(self): return "precision" def _fit(self, reference_data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], reference_data) def _calculate(self, data: pd.DataFrame): if not isinstance(self.calculator.y_pred_proba, Dict): raise InvalidArgumentsException( f"'y_pred_proba' is of type {type(self.calculator.y_pred_proba)}\n" f"multiclass use cases require 'y_pred_proba' to " "be a dictionary mapping classes to columns." ) _list_missing([self.calculator.y_true, self.calculator.y_pred], data) labels = sorted(list(self.calculator.y_pred_proba.keys())) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] if y_pred.isna().all().any(): raise InvalidArgumentsException( f"could not calculate metric {self.display_name}: " "prediction column contains no data" ) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: return precision_score(y_true, y_pred, average='macro', labels=labels)
[docs]@MetricFactory.register(metric='recall', use_case=UseCase.CLASSIFICATION_MULTICLASS) class MulticlassClassificationRecall(Metric): """Recall metric, also known as 'sensitivity'.""" def __init__(self, calculator): """Creates a new Recall instance.""" super().__init__(display_name='Recall', column_name='recall', calculator=calculator) self._min_chunk_size = 300 def __str__(self): return "recall" def _fit(self, reference_data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], reference_data) def _calculate(self, data: pd.DataFrame): if not isinstance(self.calculator.y_pred_proba, Dict): raise InvalidArgumentsException( f"'y_pred_proba' is of type {type(self.calculator.y_pred_proba)}\n" f"multiclass use cases require 'y_pred_proba' to " "be a dictionary mapping classes to columns." ) _list_missing([self.calculator.y_true, self.calculator.y_pred], data) labels = sorted(list(self.calculator.y_pred_proba.keys())) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] if y_pred.isna().all().any(): raise InvalidArgumentsException( f"could not calculate metric {self.display_name}: " "prediction column contains no data" ) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: return recall_score(y_true, y_pred, average='macro', labels=labels)
[docs]@MetricFactory.register(metric='specificity', use_case=UseCase.CLASSIFICATION_MULTICLASS) class MulticlassClassificationSpecificity(Metric): """Specificity metric.""" def __init__(self, calculator): """Creates a new Specificity instance.""" super().__init__(display_name='Specificity', column_name='specificity', calculator=calculator) self._min_chunk_size = 300 def __str__(self): return "specificity" def _fit(self, reference_data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], reference_data) def _calculate(self, data: pd.DataFrame): if not isinstance(self.calculator.y_pred_proba, Dict): raise InvalidArgumentsException( f"'y_pred_proba' is of type {type(self.calculator.y_pred_proba)}\n" f"multiclass use cases require 'y_pred_proba' to " "be a dictionary mapping classes to columns." ) _list_missing([self.calculator.y_true, self.calculator.y_pred], data) labels = sorted(list(self.calculator.y_pred_proba.keys())) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] if y_pred.isna().all().any(): raise InvalidArgumentsException( f"could not calculate metric {self.display_name}: prediction column contains no data" ) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: MCM = multilabel_confusion_matrix(y_true, y_pred, labels=labels) tn_sum = MCM[:, 0, 0] fp_sum = MCM[:, 0, 1] class_wise_specificity = tn_sum / (tn_sum + fp_sum) return np.mean(class_wise_specificity)
[docs]@MetricFactory.register(metric='accuracy', use_case=UseCase.CLASSIFICATION_MULTICLASS) class MulticlassClassificationAccuracy(Metric): """Accuracy metric.""" def __init__(self, calculator): """Creates a new Accuracy instance.""" super().__init__(display_name='Accuracy', column_name='accuracy', calculator=calculator) self._min_chunk_size = 300 def __str__(self): return "accuracy" def _fit(self, reference_data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], reference_data) def _calculate(self, data: pd.DataFrame): _list_missing([self.calculator.y_true, self.calculator.y_pred], data) y_true = data[self.calculator.y_true] y_pred = data[self.calculator.y_pred] if y_pred.isna().all().any(): raise InvalidArgumentsException( f"could not calculate metric '{self.display_name}': " "prediction column contains no data" ) if (y_true.nunique() <= 1) or (y_pred.nunique() <= 1): return np.nan else: return accuracy_score(y_true, y_pred)
def _common_data_cleaning(y_true, y_pred): y_true, y_pred = ( pd.Series(y_true).reset_index(drop=True), pd.Series(y_pred).reset_index(drop=True), ) y_true = y_true[~y_pred.isna()] y_pred.dropna(inplace=True) y_pred = y_pred[~y_true.isna()] y_true.dropna(inplace=True) return y_true, y_pred