Source code for nannyml.performance_calculation.calculator

#  Author:   Niels Nuyttens  <>
#  License: Apache Software License 2.0

"""Calculates realized performance metrics when target data is available."""

from __future__ import annotations

from typing import Dict, List, Optional, Union

import numpy as np
import pandas as pd

from nannyml._typing import ModelOutputsType, ProblemType
from nannyml.base import AbstractCalculator
from nannyml.chunk import Chunk, Chunker
from nannyml.exceptions import CalculatorNotFittedException, InvalidArgumentsException
from nannyml.performance_calculation.metrics.base import Metric, MetricFactory
from nannyml.performance_calculation.result import Result


SUPPORTED_METRICS = list(MetricFactory.registry.keys())

[docs]class PerformanceCalculator(AbstractCalculator): """Calculates realized performance metrics when target data is available.""" def __init__( self, metrics: List[str], y_true: str, y_pred: str, problem_type: Union[str, ProblemType], y_pred_proba: ModelOutputsType = None, timestamp_column_name: str = None, chunk_size: int = None, chunk_number: int = None, chunk_period: str = None, chunker: Chunker = None, ): """Creates a new performance calculator. Parameters ---------- y_true: str The name of the column containing target values. y_pred_proba: ModelOutputsType Name(s) of the column(s) containing your model output. Pass a single string when there is only a single model output column, e.g. in binary classification cases. Pass a dictionary when working with multiple output columns, e.g. in multiclass classification cases. The dictionary maps a class/label string to the column name containing model outputs for that class/label. y_pred: str The name of the column containing your model predictions. timestamp_column_name: str, default=None The name of the column containing the timestamp of the model prediction. metrics: List[str] A list of metrics to calculate. chunk_size: int, default=None Splits the data into chunks containing `chunks_size` observations. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunk_number: int, default=None Splits the data into `chunk_number` pieces. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunk_period: str, default=None Splits the data according to the given period. Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given. chunker : Chunker, default=None The `Chunker` used to split the data sets into a lists of chunks. Examples -------- >>> import nannyml as nml >>> from IPython.display import display >>> reference_df = nml.load_synthetic_binary_classification_dataset()[0] >>> analysis_df = nml.load_synthetic_binary_classification_dataset()[1] >>> analysis_target_df = nml.load_synthetic_binary_classification_dataset()[2] >>> analysis_df = analysis_df.merge(analysis_target_df, on='identifier') >>> display(reference_df.head(3)) >>> calc = nml.PerformanceCalculator( ... y_pred_proba='y_pred_proba', ... y_pred='y_pred', ... y_true='work_home_actual', ... timestamp_column_name='timestamp', ... problem_type='classification_binary', ... metrics=['roc_auc', 'f1', 'precision', 'recall', 'specificity', 'accuracy'], ... chunk_size=5000) >>> >>> results = calc.calculate(analysis_df) >>> display( >>> display(results.calculator.previous_reference_results) >>> for metric in calc.metrics: ... figure = results.plot(kind='performance', plot_reference=True, metric=metric) ... """ super().__init__(chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name) self.y_true = y_true self.y_pred = y_pred self.y_pred_proba = y_pred_proba if isinstance(problem_type, str): problem_type = ProblemType.parse(problem_type) self.problem_type = problem_type if self.problem_type is not ProblemType.REGRESSION and y_pred_proba is None: raise InvalidArgumentsException(f"'y_pred_proba' can not be 'None' for problem type {ProblemType.value}") self.metrics: List[Metric] = [ MetricFactory.create(m, problem_type, {'calculator': self}) for m in metrics # type: ignore ] self.previous_reference_data: Optional[pd.DataFrame] = None self.previous_reference_results: Optional[pd.DataFrame] = None def __str__(self): return f"PerformanceCalculator[metrics={str(self.metrics)}]" def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> PerformanceCalculator: """Fits the calculator on the reference data, calibrating it for further use on the full dataset.""" if reference_data.empty: raise InvalidArgumentsException('reference data contains no rows. Provide a valid reference data set.') if self.y_true not in reference_data.columns: raise InvalidArgumentsException( f"target data column '{self.y_true}' not found in data columns: {reference_data.columns}." ) reference_data = reference_data.copy() # data validation is performed during the _fit for each metric for metric in self.metrics:, chunker=self.chunker) self.previous_reference_data = reference_data self.previous_reference_results = self._calculate(reference_data).data return self def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result: """Calculates performance on the analysis data, using the metrics specified on calculator creation.""" if data.empty: raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.') if self.y_true not in data.columns: raise InvalidArgumentsException(f"data does not contain target data column '{self.y_true}'.") data = data.copy() # Setup for target completeness rate data['NML_TARGET_INCOMPLETE'] = data[self.y_true].isna().astype(np.int16) # Generate chunks if self.chunker is None: raise CalculatorNotFittedException( 'chunker has not been set. ' 'Please ensure you run ```` ' 'before running ``calculator.calculate()``' ) chunks = self.chunker.split(data) # Construct result frame res = pd.DataFrame.from_records( [ { 'key': chunk.key, 'chunk_index': chunk.chunk_index, 'start_index': chunk.start_index, 'end_index': chunk.end_index, 'start_date': chunk.start_datetime, 'end_date': chunk.end_datetime, 'period': 'analysis' if chunk.is_transition else chunk.period, 'targets_missing_rate':[TARGET_COMPLETENESS_RATE_COLUMN_NAME].sum() /[TARGET_COMPLETENESS_RATE_COLUMN_NAME].count(), **self._calculate_metrics_for_chunk(chunk), } for chunk in chunks ] ) return Result(results_data=res, calculator=self) def _calculate_metrics_for_chunk(self, chunk: Chunk) -> Dict: metrics_results = {} for metric in self.metrics: chunk_metric = metric.calculate( metrics_results[metric.column_name] = chunk_metric metrics_results[f'{metric.column_name}_lower_threshold'] = metric.lower_threshold metrics_results[f'{metric.column_name}_upper_threshold'] = metric.upper_threshold metrics_results[f'{metric.column_name}_sampling_error'] = metric.sampling_error( metrics_results[f'{metric.column_name}_alert'] = ( metric.lower_threshold > chunk_metric if metric.lower_threshold else False ) or (chunk_metric > metric.upper_threshold if metric.upper_threshold else False) return metrics_results