Source code for nannyml.drift.ranker

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

"""Module containing ways to rank features according to drift.

This module allows you to rank the columns within a
:class:`~nannyml.drift.univariate.calculator.UnivariateDriftCalculator` result according to their degree of drift.

The following rankers are currently available:

- :class:`~nannyml.drift.ranker.AlertCountRanker`: ranks the features according
  to the number of drift detection alerts they cause.
- :class:`~nannyml.drift.ranker.CorrelationRanker`: ranks the features according to their correlation with changes
  in realized or estimated performance.

"""
from __future__ import annotations

from typing import Optional, Union

import numpy as np
import pandas as pd
from scipy.stats import pearsonr

from nannyml._typing import Metric
from nannyml.drift.univariate.result import Result as UnivariateResults
from nannyml.exceptions import InvalidArgumentsException, NotFittedException
from nannyml.performance_calculation.result import Result as PerformanceCalculationResults
from nannyml.performance_estimation.confidence_based.metrics import Metric as CBPEMetric
from nannyml.performance_estimation.confidence_based.results import Result as CBPEResults
from nannyml.performance_estimation.direct_loss_estimation.result import Result as DLEResults
from nannyml.usage_logging import UsageEvent, log_usage


def _validate_drift_result(drift_calculation_result: UnivariateResults):
    if not isinstance(drift_calculation_result, UnivariateResults):
        raise InvalidArgumentsException("Univariate Results object required for drift_calculation_result argument.")

    if drift_calculation_result.data.empty:
        raise InvalidArgumentsException('drift results contain no data to use for ranking')

    if len(drift_calculation_result.categorical_method_names) > 1:
        raise InvalidArgumentsException(
            f"Only one categorical drift method should be present in the univariate results."
            f"\nFound: {drift_calculation_result.categorical_method_names}"
        )

    if len(drift_calculation_result.continuous_method_names) > 1:
        raise InvalidArgumentsException(
            f"Only one continuous drift method should be present in the univariate results."
            f"\nFound: {drift_calculation_result.continuous_method_names}"
        )
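
# Results computed with more than one method per column type will fail the check above.
# A minimal usage sketch of narrowing such a result down first, assuming `results` holds
# multi-method univariate drift results (the `methods` argument of `Result.filter` is an
# assumption here):
#
#     >>> single_method_results = results.filter(methods=['jensen_shannon'])
#     >>> ranked = nml.AlertCountRanker().rank(single_method_results)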


def _validate_performance_result(performance_results: Union[CBPEResults, DLEResults, PerformanceCalculationResults]):
    """Validate Inputs before performing ranking.

    Parameters
    ----------
    performance_results: Performance Estimation or Calculation results. Can be an instance of:
            nml.performance_estimation.confidence_based.results.Result,
            nml.performance_estimation.direct_loss_estimation.result.Result,
            nml.performance_calculation.result.Result
    """

    if not isinstance(performance_results, (CBPEResults, DLEResults, PerformanceCalculationResults)):
        raise InvalidArgumentsException(
            "Estimated or Realized Performance results object required for performance_results argument."
        )

    if len(performance_results.metrics) != 1:
        raise InvalidArgumentsException(
            "Just one metric should be present in performance_results used to rank CorrelationRanker."
        )
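
# Similarly, performance results holding several metrics must be narrowed down to a
# single metric before being handed to the CorrelationRanker. A minimal usage sketch,
# assuming `perf_results` was computed with multiple metrics:
#
#     >>> single_metric_results = perf_results.filter(metrics=['roc_auc'])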


class AlertCountRanker:
    """Ranks the features according to the number of drift detection alerts they cause."""

    @log_usage(UsageEvent.RANKER_ALERT_COUNT_RUN)
    def rank(
        self,
        drift_calculation_result: UnivariateResults,
        only_drifting: bool = False,
    ) -> pd.DataFrame:
        """Ranks the features according to the number of drift detection alerts they cause.

        Parameters
        ----------
        drift_calculation_result : nannyml.drift.univariate.Result
            The result of a univariate drift calculation.
        only_drifting : bool, default=False
            Omits features without alerts from the ranking results.

        Returns
        -------
        ranking: pd.DataFrame
            A DataFrame containing the feature names and their ranks (the highest rank starts at 1,
            second-highest rank is 2, etc.).

            Features with the same number of alerts are ranked alphanumerically on the feature name.

        Examples
        --------
        >>> import nannyml as nml
        >>> from IPython.display import display
        >>>
        >>> reference_df, analysis_df, target_df = nml.load_synthetic_binary_classification_dataset()
        >>>
        >>> display(reference_df.head())
        >>>
        >>> column_names = [
        >>>     col for col in reference_df.columns if col not in ['timestamp', 'y_pred_proba', 'period',
        >>>                                                        'y_pred', 'work_home_actual', 'identifier']]
        >>>
        >>> calc = nml.UnivariateDriftCalculator(column_names=column_names,
        >>>                                      timestamp_column_name='timestamp')
        >>>
        >>> calc.fit(reference_df)
        >>>
        >>> results = calc.calculate(analysis_df.merge(target_df, on='identifier'))
        >>>
        >>> ranker = nml.AlertCountRanker()
        >>> ranked_features = ranker.rank(drift_calculation_result=results, only_drifting=False)
        >>> display(ranked_features)
           number_of_alerts                 column_name  rank
        0                 5            wfh_prev_workday     1
        1                 5                salary_range     2
        2                 5  public_transportation_cost     3
        3                 5        distance_from_office     4
        4                 0                     workday     5
        5                 0            work_home_actual     6
        6                 0                      tenure     7
        7                 0         gas_price_per_litre     8
        """
        _validate_drift_result(drift_calculation_result)

        non_chunk = list(set(drift_calculation_result.data.columns.get_level_values(0)) - {'chunk'})
        ranking = (
            drift_calculation_result.filter(period='analysis')
            .to_df()
            .loc[:, (non_chunk, slice(None), 'alert')]
            .sum()
            .reset_index()[['level_0', 0]]
        )
        ranking = ranking.groupby('level_0').sum()
        ranking.columns = ['number_of_alerts']
        ranking['column_name'] = ranking.index
        ranking = ranking.sort_values(['number_of_alerts', 'column_name'], ascending=False)
        ranking = ranking.reset_index(drop=True)
        ranking['rank'] = ranking.index + 1
        if only_drifting:
            ranking = ranking.loc[ranking['number_of_alerts'] != 0, :]
        return ranking
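
# The ranking above boils down to summing the boolean 'alert' flags per feature and
# sorting. A standalone sketch of that aggregation on toy data (feature names and
# alert values are illustrative, not taken from nannyml):
def _alert_count_ranking_sketch() -> pd.DataFrame:
    # one row per (chunk, feature) combination: True where a drift alert fired
    alerts = pd.DataFrame(
        {
            'column_name': ['f1', 'f1', 'f2', 'f2', 'f3', 'f3'],
            'alert': [True, True, False, True, False, False],
        }
    )
    # count alerts per feature, sort descending, break ties on the feature name
    sketch = alerts.groupby('column_name')['alert'].sum().rename('number_of_alerts').reset_index()
    sketch = sketch.sort_values(['number_of_alerts', 'column_name'], ascending=False).reset_index(drop=True)
    sketch['rank'] = sketch.index + 1
    return sketch
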

class CorrelationRanker:
    """Ranks the features according to their correlation with changes in realized or estimated performance.

    Examples
    --------
    >>> import nannyml as nml
    >>> from IPython.display import display
    >>>
    >>> reference_df, analysis_df, target_df = nml.load_synthetic_binary_classification_dataset()
    >>>
    >>> column_names = [col for col in reference_df.columns
    >>>                 if col not in ['timestamp', 'y_pred_proba', 'period',
    >>>                                'y_pred', 'work_home_actual', 'identifier']]
    >>>
    >>> univ_calc = nml.UnivariateDriftCalculator(column_names=column_names,
    >>>                                           timestamp_column_name='timestamp')
    >>>
    >>> univ_calc.fit(reference_df)
    >>> univariate_results = univ_calc.calculate(analysis_df.merge(target_df, on='identifier'))
    >>>
    >>> realized_calc = nml.PerformanceCalculator(
    >>>     y_pred_proba='y_pred_proba',
    >>>     y_pred='y_pred',
    >>>     y_true='work_home_actual',
    >>>     timestamp_column_name='timestamp',
    >>>     problem_type='classification_binary',
    >>>     metrics=['roc_auc'])
    >>> realized_calc.fit(reference_df)
    >>> realized_perf_results = realized_calc.calculate(analysis_df.merge(target_df, on='identifier'))
    >>>
    >>> ranker = nml.CorrelationRanker()
    >>> # ranker fits on one metric and reference period data only
    >>> ranker.fit(realized_perf_results.filter(period='reference'))
    >>> # ranker ranks on one drift method and one performance metric
    >>> correlation_ranked_features = ranker.rank(
    >>>     univariate_results,
    >>>     realized_perf_results,
    >>>     only_drifting=False)
    >>> display(correlation_ranked_features)
                      column_name  pearsonr_correlation  pearsonr_pvalue  has_drifted  rank
    0            wfh_prev_workday              0.929710     3.076474e-09         True     1
    1  public_transportation_cost              0.925910     4.872173e-09         True     2
    2                salary_range              0.921556     8.014868e-09         True     3
    3        distance_from_office              0.920749     8.762147e-09         True     4
    4         gas_price_per_litre              0.340076     1.423541e-01        False     5
    5                     workday              0.154622     5.151128e-01        False     6
    6            work_home_actual             -0.030899     8.971071e-01        False     7
    7                      tenure             -0.177018     4.553046e-01        False     8
    """

    def __init__(self) -> None:
        """Creates a new CorrelationRanker instance."""
        super().__init__()

        self.metric: Metric
        self.mean_reference_performance: Optional[float] = None
        self.absolute_performance_change: Optional[float] = None

        self._is_fitted: bool = False

    @log_usage(UsageEvent.RANKER_CORRELATION_FIT)
    def fit(
        self,
        reference_performance_calculation_result: Optional[
            Union[CBPEResults, DLEResults, PerformanceCalculationResults]
        ] = None,
    ) -> CorrelationRanker:
        """Calculates the average performance during the reference period.

        This value is saved at the `mean_reference_performance` property of the ranker.

        Parameters
        ----------
        reference_performance_calculation_result : Union[CBPEResults, DLEResults, PerformanceCalculationResults]
            Results from any performance calculator or estimator, e.g.
            :class:`~nannyml.performance_calculation.calculator.PerformanceCalculator`
            :class:`~nannyml.performance_estimation.confidence_based.cbpe.CBPE`
            :class:`~nannyml.performance_estimation.direct_loss_estimation.dle.DLE`

        Returns
        -------
        ranking: CorrelationRanker
        """
        if reference_performance_calculation_result is None:
            raise InvalidArgumentsException("reference performance calculation results can not be None.")
        _validate_performance_result(reference_performance_calculation_result)

        # we're expecting to have filtered inputs, so we should only have a single input.
        self.metric = reference_performance_calculation_result.metrics[0]

        # TODO: this will fail for estimated confusion matrix
        metric_column_name = self.metric.name if isinstance(self.metric, CBPEMetric) else self.metric.column_name

        self.mean_reference_performance = (
            reference_performance_calculation_result.to_df().loc[:, (metric_column_name, 'value')].mean()
        )
        self._is_fitted = True
        return self
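
    # A minimal usage sketch for `fit`: the ranker is fitted on reference-period results
    # narrowed down to a single metric. Assuming `realized_perf_results` holds performance
    # results computed over both periods with multiple metrics:
    #
    #     >>> ranker = CorrelationRanker()
    #     >>> ranker.fit(realized_perf_results.filter(period='reference', metrics=['roc_auc']))
    #     >>> ranker.mean_reference_performance  # average reference performance just stored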

    @log_usage(UsageEvent.RANKER_CORRELATION_RUN)
    def rank(
        self,
        drift_calculation_result: UnivariateResults,
        performance_calculation_result: Optional[Union[CBPEResults, DLEResults, PerformanceCalculationResults]] = None,
        only_drifting: bool = False,
    ) -> pd.DataFrame:
        """Ranks the features according to the correlation between their drift values and the absolute change in performance.

        Parameters
        ----------
        drift_calculation_result: UnivariateResults
            The univariate drift results containing the features we want to rank.
        performance_calculation_result: Union[CBPEResults, DLEResults, PerformanceCalculationResults]
            Results from any performance calculator or estimator, e.g.
            :class:`~nannyml.performance_calculation.calculator.PerformanceCalculator`
            :class:`~nannyml.performance_estimation.confidence_based.cbpe.CBPE`
            :class:`~nannyml.performance_estimation.direct_loss_estimation.dle.DLE`
        only_drifting: bool, default=False
            Omits features without alerts from the ranking results.

        Returns
        -------
        ranking: pd.DataFrame
            A DataFrame containing the feature names and their ranks (the highest rank starts at 1,
            second-highest rank is 2, etc.).

            Features are ranked by the Pearson correlation between their drift values and the
            absolute performance change, the most correlated feature first.
        """
        if not self._is_fitted or self.metric is None:
            raise NotFittedException("trying to call 'rank()' on an unfitted Ranker. Please call 'fit()' first")

        # Perform input validations
        if performance_calculation_result is None:
            raise InvalidArgumentsException("performance calculation results can not be None.")

        _validate_drift_result(drift_calculation_result)
        _validate_performance_result(performance_calculation_result)

        _drift_index = drift_calculation_result.to_df().loc[:, ('chunk', 'chunk', 'start_index')]
        _perf_index = performance_calculation_result.to_df().loc[:, ('chunk', 'start_index')]

        if not _drift_index.equals(_perf_index):
            raise InvalidArgumentsException(
                "Drift and Performance results need to be filtered to the same data period."
            )

        # TODO: this will fail for estimated confusion matrix
        metric_column_name = self.metric.name if isinstance(self.metric, CBPEMetric) else self.metric.column_name

        # Start ranking calculations
        abs_perf_change = np.abs(
            performance_calculation_result.to_df().loc[:, (metric_column_name, 'value')].to_numpy()
            - self.mean_reference_performance
        )

        self.absolute_performance_change = abs_perf_change

        feature_names = []
        pearson_correlations = []
        pearson_pvalues = []
        has_drifted = []
        for ftr in drift_calculation_result.column_names:
            feature_names.append(ftr)
            correlation, pvalue = pearsonr(
                drift_calculation_result.to_df().loc[:, (ftr, slice(None), 'value')].to_numpy().ravel(),
                abs_perf_change,
            )
            pearson_correlations.append(correlation)
            pearson_pvalues.append(pvalue)
            has_drifted.append(
                (drift_calculation_result.to_df().loc[:, (ftr, slice(None), 'alert')] == True).any()[0]  # noqa: E712
            )

        ranked = pd.DataFrame(
            {
                'column_name': feature_names,
                'pearsonr_correlation': pearson_correlations,
                'pearsonr_pvalue': pearson_pvalues,
                'has_drifted': has_drifted,
            }
        )

        # we want 1st row to be most impactful feature
        ranked.sort_values('pearsonr_correlation', ascending=False, inplace=True)
        ranked.reset_index(drop=True, inplace=True)
        ranked['rank'] = ranked.index + 1
        if only_drifting:
            ranked = ranked.loc[ranked.has_drifted == True].reset_index(drop=True)  # noqa: E712
        return ranked
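

# The correlation computed in `rank` reduces to one `pearsonr` call per feature, between
# that feature's per-chunk drift values and the absolute per-chunk performance change
# versus the reference mean. A standalone sketch of that core calculation (all numbers
# are illustrative):
def _correlation_ranking_sketch() -> dict:
    mean_reference_performance = 0.95
    chunk_performance = np.array([0.94, 0.90, 0.85, 0.80])
    abs_perf_change = np.abs(chunk_performance - mean_reference_performance)

    # per-chunk drift values for two hypothetical features
    drift_values = {
        'drifting_feature': np.array([0.05, 0.20, 0.35, 0.50]),
        'stable_feature': np.array([0.10, 0.09, 0.11, 0.10]),
    }
    # a higher correlation means the feature's drift tracks the performance drop more closely
    return {name: pearsonr(values, abs_perf_change) for name, values in drift_values.items()}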