Source code for nannyml.runner

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

"""Used as an access point to start using NannyML in its most simple form."""

import logging
import sys
from pathlib import Path
from typing import Any, Dict, Optional

import pandas as pd
from rich.console import Console
from rich.progress import Progress

from nannyml._typing import ProblemType
from nannyml.chunk import Chunker
from nannyml.drift.multivariate.data_reconstruction import DataReconstructionDriftCalculator
from nannyml.drift.univariate import UnivariateDriftCalculator
from nannyml.exceptions import InvalidArgumentsException
from nannyml.io.base import Writer
from nannyml.io.raw_files_writer import RawFilesWriter
from nannyml.io.store import Store
from nannyml.performance_calculation import (
    SUPPORTED_CLASSIFICATION_METRIC_VALUES,
    SUPPORTED_REGRESSION_METRIC_VALUES,
    PerformanceCalculator,
)
from nannyml.performance_estimation.confidence_based import CBPE
from nannyml.performance_estimation.confidence_based import SUPPORTED_METRIC_VALUES as CBPE_SUPPORTED_METRICS
from nannyml.performance_estimation.direct_loss_estimation import DLE
from nannyml.performance_estimation.direct_loss_estimation import SUPPORTED_METRIC_VALUES as DLE_SUPPORTED_METRICS

_logger = logging.getLogger(__name__)


[docs]def run( reference_data: pd.DataFrame, analysis_data: pd.DataFrame, column_mapping: Dict[str, Any], problem_type: ProblemType, chunker: Chunker, writer: Writer, store: Optional[Store] = None, ignore_errors: bool = True, run_in_console: bool = False, ): with Progress() as progress: _run_statistical_univariate_feature_drift_calculator( reference_data, analysis_data, column_mapping, problem_type, chunker, writer, store, ignore_errors, console=progress.console, ) _run_data_reconstruction_multivariate_feature_drift_calculator( reference_data, analysis_data, column_mapping, chunker, writer, store, ignore_errors, console=progress.console, ) _run_realized_performance_calculator( reference_data, analysis_data, column_mapping, problem_type, chunker, writer, store, ignore_errors, console=progress.console, ) _run_cbpe_performance_estimation( reference_data, analysis_data, column_mapping, problem_type, chunker, writer, store, ignore_errors, console=progress.console, ) _run_dle_performance_estimation( reference_data, analysis_data, column_mapping, problem_type, chunker, writer, store, ignore_errors, console=progress.console, ) progress.console.line(2) if isinstance(writer, RawFilesWriter): progress.console.rule() progress.console.log(f"view results in {Path(writer.filepath)}")
def _run_statistical_univariate_feature_drift_calculator( reference_data: pd.DataFrame, analysis_data: pd.DataFrame, column_mapping: Dict[str, Any], problem_type: ProblemType, chunker: Chunker, writer: Writer, store: Optional[Store], ignore_errors: bool, console: Optional[Console] = None, ): if console: console.rule('[cyan]UnivariateStatisticalDriftCalculator[/]') try: calc: Optional[UnivariateDriftCalculator] = None # calculator to load or create calc_path = 'univariate_drift/calculator.pkl' # the path to load or store the calculator in the store if store: # we have a store defined, let's try to load the fitted calculator from there if console: console.log('loading calculator from store') calc = store.load(path=calc_path, as_type=UnivariateDriftCalculator) if not calc: # no store or no fitted calculator was in the store if console: console.log('no fitted calculator found in store') console.log('fitting new calculator on reference data') if problem_type == ProblemType.CLASSIFICATION_BINARY: y_pred_proba_column_names = [column_mapping['y_pred_proba']] elif problem_type == ProblemType.CLASSIFICATION_MULTICLASS: y_pred_proba_column_names = list(column_mapping['y_pred_proba'].values()) else: y_pred_proba_column_names = [] calc = UnivariateDriftCalculator( column_names=(column_mapping['features'] + [column_mapping['y_pred']] + y_pred_proba_column_names), timestamp_column_name=column_mapping.get('timestamp', None), chunker=chunker, categorical_methods=['chi2', 'jensen_shannon', 'l_infinity'], continuous_methods=['kolmogorov_smirnov', 'jensen_shannon', 'wasserstein'], ) calc.fit(reference_data) if store: store.store(calc, path=calc_path) if console: console.log('storing fitted calculator to store') # raise RuntimeError("🔥 something's not right there... 🔥") if console: console.log('calculating on analysis data') results = calc.calculate(analysis_data) plots = {} if isinstance(writer, RawFilesWriter): if console: console.log('generating result plots') plots = {f'{kind}': results.plot(kind=kind) for kind in ['drift', 'distribution']} except Exception as exc: msg = f"Failed to run statistical univariate feature drift calculator: {exc}" if console: console.log(msg, style='red') else: _logger.error(msg) if ignore_errors: return else: sys.exit(1) if console: console.log('writing results') writer.write(result=results, plots=plots, calculator_name='statistical_univariate_feature_drift') def _run_data_reconstruction_multivariate_feature_drift_calculator( reference_data: pd.DataFrame, analysis_data: pd.DataFrame, column_mapping: Dict[str, Any], chunker: Chunker, writer: Writer, store: Optional[Store], ignore_errors: bool, console: Optional[Console] = None, ): if console: console.rule('[cyan]DataReconstructionDriftCalculator[/]') try: calc: Optional[DataReconstructionDriftCalculator] = None # calculator to load or create calc_path = 'data_reconstruction/calculator.pkl' # the path to load or store the calculator in the store if store: # we have a store defined, let's try to load the fitted calculator from there if console: console.log('loading calculator from store') calc = store.load(path=calc_path, as_type=DataReconstructionDriftCalculator) if not calc: # no store or no fitted calculator was in the store if console: console.log('no fitted calculator found in store') console.log('fitting new calculator on reference data') calc = DataReconstructionDriftCalculator( column_names=column_mapping['features'], timestamp_column_name=column_mapping.get('timestamp', None), chunker=chunker, ) calc.fit(reference_data) if store: store.store(calc, path=calc_path) if console: console.log('storing fitted calculator to store') if console: console.log('calculating on analysis data') results = calc.calculate(analysis_data) plots = {} if isinstance(writer, RawFilesWriter): if console: console.log('generating result plots') plots = {f'{kind}': results.plot(kind='drift') for kind in ['drift']} except Exception as exc: msg = f"Failed to run data reconstruction multivariate feature drift calculator: {exc}" if console: console.log(msg, style='red') else: _logger.error(msg) if ignore_errors: return else: sys.exit(1) if console: console.log('writing results') writer.write(result=results, plots=plots, calculator_name='data_reconstruction_multivariate_feature_drift') def _run_realized_performance_calculator( # noqa: C901 reference_data: pd.DataFrame, analysis_data: pd.DataFrame, column_mapping: Dict[str, Any], problem_type: ProblemType, chunker: Chunker, writer: Writer, store: Optional[Store], ignore_errors: bool, console: Optional[Console] = None, ): if console: console.rule('[cyan]PerformanceCalculator[/]') if column_mapping['y_true'] not in analysis_data.columns: _logger.info( f"target values column '{column_mapping['y_true']}' not present in analysis data. " "Skipping realized performance calculation." ) if console: console.log( f"target values column '{column_mapping['y_true']}' not present in analysis data. " "Skipping realized performance calculation.", style='yellow', ) return try: calc: Optional[PerformanceCalculator] = None # calculator to load or create calc_path = 'realized_performance/calculator.pkl' # the path to load or store the calculator in the store if store: # we have a store defined, let's try to load the fitted calculator from there if console: console.log('loading calculator from store') calc = store.load(path=calc_path, as_type=PerformanceCalculator) if not calc: # no store or no fitted calculator was in the store if problem_type in [ProblemType.CLASSIFICATION_BINARY]: # requires a non-default parameter 'business_value_matrix' metrics = [ metric for metric in SUPPORTED_CLASSIFICATION_METRIC_VALUES if metric not in ['business_value'] ] elif problem_type in [ProblemType.CLASSIFICATION_MULTICLASS]: metrics = [ metric for metric in SUPPORTED_CLASSIFICATION_METRIC_VALUES if metric not in ['business_value', 'confusion_matrix'] ] elif problem_type in [ProblemType.REGRESSION]: metrics = SUPPORTED_REGRESSION_METRIC_VALUES else: raise InvalidArgumentsException(f"unsupported problem type '{problem_type}'") if console: console.log('no fitted calculator found in store') console.log('fitting new calculator on reference data') calc = PerformanceCalculator( y_true=column_mapping['y_true'], y_pred=column_mapping['y_pred'], y_pred_proba=column_mapping.get('y_pred_proba', None), timestamp_column_name=column_mapping.get('timestamp', None), chunker=chunker, metrics=metrics, problem_type=problem_type, ) calc.fit(reference_data) if store: store.store(calc, path=calc_path) if console: console.log('storing fitted calculator to store') if console: console.log('calculating on analysis data') results = calc.calculate(analysis_data) plots = {} if isinstance(writer, RawFilesWriter): if console: console.log('generating result plots') plots = {f'{kind}': results.plot(kind) for kind in ['performance']} except Exception as exc: msg = f"Failed to run realized performance calculator: {exc}" if console: console.log(msg, style='red') else: _logger.error(msg) if ignore_errors: return else: sys.exit(1) if console: console.log('writing results') writer.write(result=results, plots=plots, calculator_name='realized_performance') def _run_cbpe_performance_estimation( # noqa: C901 reference_data: pd.DataFrame, analysis_data: pd.DataFrame, column_mapping: Dict[str, Any], problem_type: ProblemType, chunker: Chunker, writer: Writer, store: Optional[Store], ignore_errors: bool, console: Optional[Console] = None, ): if console: console.rule('[cyan]Confidence Base Performance Estimator[/]') if problem_type not in [ProblemType.CLASSIFICATION_BINARY, ProblemType.CLASSIFICATION_MULTICLASS]: _logger.info(f"CBPE does not support '{problem_type.name}' problems. Skipping CBPE estimation.") if console: console.log( f"CBPE does not support '{problem_type.name}' problems. Skipping CBPE estimation.", style='yellow', ) return try: estimator: Optional[CBPE] = None # estimator to load or create estimator_path = 'cbpe/estimator.pkl' # the path to load or store the estimator in the store if store: # we have a store defined, let's try to load the fitted calculator from there if console: console.log('loading estimator from store') estimator = store.load(path=estimator_path, as_type=CBPE) if not estimator: # no store or no fitted calculator was in the store if problem_type in [ProblemType.CLASSIFICATION_BINARY]: # requires a non-default parameter 'business_value_matrix' metrics = [metric for metric in CBPE_SUPPORTED_METRICS if metric not in ['business_value']] elif problem_type in [ProblemType.CLASSIFICATION_MULTICLASS]: metrics = [ metric for metric in CBPE_SUPPORTED_METRICS if metric not in ['business_value', 'confusion_matrix'] ] else: raise InvalidArgumentsException(f"unsupported problem type '{problem_type}'") if console: console.log('no fitted estimator found in store') console.log('fitting new estimator on reference data') estimator = CBPE( y_true=column_mapping['y_true'], y_pred=column_mapping['y_pred'], y_pred_proba=column_mapping['y_pred_proba'], timestamp_column_name=column_mapping.get('timestamp', None), problem_type=problem_type, chunker=chunker, metrics=metrics, ) estimator.fit(reference_data) if store: store.store(estimator, path=estimator_path) if console: console.log('storing fitted estimator to store') if console: console.log('estimating on analysis data') results = estimator.estimate(analysis_data) plots = {} if isinstance(writer, RawFilesWriter): if console: console.log('generating result plots') plots = {f'{kind}': results.plot(kind) for kind in ['performance']} except Exception as exc: msg = f"Failed to run CBPE performance estimator: {exc}" if console: console.log(msg, style='red') else: _logger.error(msg) if ignore_errors: return else: sys.exit(1) if console: console.log('writing results') writer.write(result=results, plots=plots, calculator_name='confidence_based_performance_estimator') def _run_dle_performance_estimation( reference_data: pd.DataFrame, analysis_data: pd.DataFrame, column_mapping: Dict[str, Any], problem_type: ProblemType, chunker: Chunker, writer: Writer, store: Optional[Store], ignore_errors: bool, console: Optional[Console] = None, ): if console: console.rule('[cyan]Direct Loss Estimator[/]') if problem_type not in [ProblemType.REGRESSION]: _logger.info(f"DLE does not support '{problem_type.name}' problems. Skipping DLE estimation.") if console: console.log( f"DLE does not support '{problem_type.name}' problems. Skipping DLE estimation.", style='yellow', ) return try: estimator: Optional[DLE] = None # estimator to load or create estimator_path = 'cbpe/estimator.pkl' # the path to load or store the estimator in the store if store: # we have a store defined, let's try to load the fitted calculator from there if console: console.log('loading estimator from store') estimator = store.load(path=estimator_path, as_type=DLE) if not estimator: # no store or no fitted calculator was in the store if console: console.log('no fitted estimator found in store') console.log('fitting new estimator on reference data') estimator = DLE( feature_column_names=column_mapping['features'], y_true=column_mapping['y_true'], y_pred=column_mapping['y_pred'], timestamp_column_name=column_mapping.get('timestamp', None), chunker=chunker, metrics=DLE_SUPPORTED_METRICS, ) estimator.fit(reference_data) if store: store.store(estimator, path=estimator_path) if console: console.log('storing fitted estimator to store') if console: console.log('estimating on analysis data') results = estimator.estimate(analysis_data) plots = {} if isinstance(writer, RawFilesWriter): if console: console.log('generating result plots') plots = {f'{kind}': results.plot(kind) for kind in ['performance']} except Exception as exc: msg = f"Failed to run DLE performance estimator: {exc}" if console: console.log(msg, style='red') else: _logger.error(msg) if ignore_errors: return else: sys.exit(1) if console: console.log('writing results') writer.write(result=results, plots=plots, calculator_name='direct_loss_estimator')