# Source code for nannyml.runner

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

"""Used as an access point to start using NannyML in its most simple form."""

import logging
import sys
from pathlib import Path
from typing import Any, Dict, Optional

import pandas as pd
from rich.console import Console
from rich.progress import Progress

from nannyml._typing import ProblemType
from nannyml.chunk import Chunker
from nannyml.drift.multivariate.data_reconstruction import DataReconstructionDriftCalculator
from nannyml.drift.univariate import UnivariateDriftCalculator
from nannyml.io.base import Writer
from nannyml.io.raw_files_writer import RawFilesWriter
from nannyml.io.store import Store
from nannyml.performance_calculation import PerformanceCalculator
from nannyml.performance_estimation.confidence_based import CBPE
from nannyml.performance_estimation.direct_loss_estimation.dle import DEFAULT_METRICS, DLE

_logger = logging.getLogger(__name__)


def run(
    reference_data: pd.DataFrame,
    analysis_data: pd.DataFrame,
    column_mapping: Dict[str, Any],
    problem_type: ProblemType,
    chunker: Chunker,
    writer: Writer,
    store: Optional[Store] = None,
    ignore_errors: bool = True,
    run_in_console: bool = False,
):
    """Run every NannyML calculator and estimator in sequence and write their results.

    The steps are executed in this order, all sharing the console of a single
    ``rich`` progress context:

    1. univariate statistical feature drift
    2. data reconstruction multivariate feature drift
    3. realized performance
    4. CBPE performance estimation
    5. DLE performance estimation

    Parameters
    ----------
    reference_data : pd.DataFrame
        Data passed to each step for fitting.
    analysis_data : pd.DataFrame
        Data passed to each step for calculation / estimation.
    column_mapping : Dict[str, Any]
        Mapping forwarded unchanged to every step, which looks up the column
        names it needs.
    problem_type : ProblemType
        Forwarded to every step except the data reconstruction one.
    chunker : Chunker
        Chunker forwarded to every step.
    writer : Writer
        Receives the results of every step. When it is a ``RawFilesWriter`` the
        location of the results is logged at the end of the run.
    store : Optional[Store], default=None
        Optional store forwarded to every step.
    ignore_errors : bool, default=True
        Forwarded to every step; each step decides how to react to a failure
        based on this flag.
    run_in_console : bool, default=False
        Accepted for interface compatibility; this function does not read it.
    """
    with Progress() as progress:
        console = progress.console

        _run_statistical_univariate_feature_drift_calculator(
            reference_data,
            analysis_data,
            column_mapping,
            problem_type,
            chunker,
            writer,
            store,
            ignore_errors,
            console=console,
        )

        # The data reconstruction step is the only one that takes no problem type.
        _run_data_reconstruction_multivariate_feature_drift_calculator(
            reference_data,
            analysis_data,
            column_mapping,
            chunker,
            writer,
            store,
            ignore_errors,
            console=console,
        )

        _run_realized_performance_calculator(
            reference_data,
            analysis_data,
            column_mapping,
            problem_type,
            chunker,
            writer,
            store,
            ignore_errors,
            console=console,
        )

        _run_cbpe_performance_estimation(
            reference_data,
            analysis_data,
            column_mapping,
            problem_type,
            chunker,
            writer,
            store,
            ignore_errors,
            console=console,
        )

        _run_dle_performance_estimation(
            reference_data,
            analysis_data,
            column_mapping,
            problem_type,
            chunker,
            writer,
            store,
            ignore_errors,
            console=console,
        )

        console.line(2)
        # Only a RawFilesWriter is asked for a ``filepath`` to point the user at.
        if isinstance(writer, RawFilesWriter):
            console.rule()
            console.log(f"view results in {Path(writer.filepath)}")
def _log(console: Optional[Console], message: str, style: Optional[str] = None) -> None:
    """Log ``message`` to ``console`` when one is given; do nothing otherwise."""
    if console:
        console.log(message, style=style)


def _report_failure(message: str, console: Optional[Console], ignore_errors: bool) -> None:
    """Report a failed step and exit the process with status 1 unless errors are ignored.

    The message goes to the console (in red) when one is given, otherwise to the
    module logger at error level.
    """
    if console:
        console.log(message, style='red')
    else:
        _logger.error(message)

    if not ignore_errors:
        sys.exit(1)


def _run_statistical_univariate_feature_drift_calculator(
    reference_data: pd.DataFrame,
    analysis_data: pd.DataFrame,
    column_mapping: Dict[str, Any],
    problem_type: ProblemType,
    chunker: Chunker,
    writer: Writer,
    store: Optional[Store],
    ignore_errors: bool,
    console: Optional[Console] = None,
) -> None:
    """Load or fit a ``UnivariateDriftCalculator``, run it on the analysis data and write the results.

    The calculator covers the ``'features'`` columns, the ``'y_pred'`` column and,
    for classification problems, the ``'y_pred_proba'`` column(s) from
    ``column_mapping``. For multiclass problems ``column_mapping['y_pred_proba']``
    must offer ``.values()`` (its values are used as column names).

    When ``store`` is given, a fitted calculator is looked up under
    ``'univariate_drift/calculator.pkl'`` first; a newly fitted one is stored there.
    Plots (``'drift'`` and ``'distribution'``) are only generated for a
    ``RawFilesWriter``.

    Any exception raised while loading, fitting, calculating or plotting is
    reported; the function then returns without writing when ``ignore_errors``
    is true, or exits the process with status 1 otherwise. Exceptions raised by
    ``writer.write`` are not caught.
    """
    if console:
        console.rule('[cyan]UnivariateStatisticalDriftCalculator[/]')

    try:
        calc: Optional[UnivariateDriftCalculator] = None  # calculator to load or create
        calc_path = 'univariate_drift/calculator.pkl'  # location of the calculator inside the store

        if store:
            # A store is available: try to reuse a previously fitted calculator.
            _log(console, 'loading calculator from store')
            calc = store.load(path=calc_path, as_type=UnivariateDriftCalculator)

        if not calc:
            # No store, or the store held no fitted calculator.
            _log(console, 'no fitted calculator found in store')
            _log(console, 'fitting new calculator on reference data')

            if problem_type == ProblemType.CLASSIFICATION_BINARY:
                y_pred_proba_column_names = [column_mapping['y_pred_proba']]
            elif problem_type == ProblemType.CLASSIFICATION_MULTICLASS:
                y_pred_proba_column_names = list(column_mapping['y_pred_proba'].values())
            else:
                y_pred_proba_column_names = []

            calc = UnivariateDriftCalculator(
                column_names=(column_mapping['features'] + [column_mapping['y_pred']] + y_pred_proba_column_names),
                timestamp_column_name=column_mapping.get('timestamp'),
                chunker=chunker,
                categorical_methods=['chi2', 'jensen_shannon', 'l_infinity'],
                continuous_methods=['kolmogorov_smirnov', 'jensen_shannon', 'wasserstein'],
            )
            calc.fit(reference_data)

            if store:
                store.store(calc, path=calc_path)
                _log(console, 'storing fitted calculator to store')

        _log(console, 'calculating on analysis data')
        assert calc is not None  # narrows the Optional for type checkers
        results = calc.calculate(analysis_data)

        plots = {}
        if isinstance(writer, RawFilesWriter):
            _log(console, 'generating result plots')
            plots = {kind: results.plot(kind=kind) for kind in ['drift', 'distribution']}
    except Exception as exc:
        _report_failure(
            f"Failed to run statistical univariate feature drift calculator: {exc}", console, ignore_errors
        )
        return

    _log(console, 'writing results')
    writer.write(result=results, plots=plots, calculator_name='statistical_univariate_feature_drift')


def _run_data_reconstruction_multivariate_feature_drift_calculator(
    reference_data: pd.DataFrame,
    analysis_data: pd.DataFrame,
    column_mapping: Dict[str, Any],
    chunker: Chunker,
    writer: Writer,
    store: Optional[Store],
    ignore_errors: bool,
    console: Optional[Console] = None,
) -> None:
    """Load or fit a ``DataReconstructionDriftCalculator``, run it on the analysis data and write the results.

    The calculator is built on the ``'features'`` columns of ``column_mapping``.
    When ``store`` is given, a fitted calculator is looked up under
    ``'data_reconstruction/calculator.pkl'`` first; a newly fitted one is stored
    there. A ``'drift'`` plot is only generated for a ``RawFilesWriter``.

    Any exception raised while loading, fitting, calculating or plotting is
    reported; the function then returns without writing when ``ignore_errors``
    is true, or exits the process with status 1 otherwise. Exceptions raised by
    ``writer.write`` are not caught.
    """
    if console:
        console.rule('[cyan]DataReconstructionDriftCalculator[/]')

    try:
        calc: Optional[DataReconstructionDriftCalculator] = None  # calculator to load or create
        calc_path = 'data_reconstruction/calculator.pkl'  # location of the calculator inside the store

        if store:
            # A store is available: try to reuse a previously fitted calculator.
            _log(console, 'loading calculator from store')
            calc = store.load(path=calc_path, as_type=DataReconstructionDriftCalculator)

        if not calc:
            # No store, or the store held no fitted calculator.
            _log(console, 'no fitted calculator found in store')
            _log(console, 'fitting new calculator on reference data')

            calc = DataReconstructionDriftCalculator(
                column_names=column_mapping['features'],
                timestamp_column_name=column_mapping.get('timestamp'),
                chunker=chunker,
            )
            calc.fit(reference_data)

            if store:
                store.store(calc, path=calc_path)
                _log(console, 'storing fitted calculator to store')

        _log(console, 'calculating on analysis data')
        results = calc.calculate(analysis_data)

        plots = {}
        if isinstance(writer, RawFilesWriter):
            _log(console, 'generating result plots')
            plots = {'drift': results.plot(kind='drift')}
    except Exception as exc:
        _report_failure(
            f"Failed to run data reconstruction multivariate feature drift calculator: {exc}",
            console,
            ignore_errors,
        )
        return

    _log(console, 'writing results')
    writer.write(result=results, plots=plots, calculator_name='data_reconstruction_multivariate_feature_drift')


def _run_realized_performance_calculator(  # noqa: C901
    reference_data: pd.DataFrame,
    analysis_data: pd.DataFrame,
    column_mapping: Dict[str, Any],
    problem_type: ProblemType,
    chunker: Chunker,
    writer: Writer,
    store: Optional[Store],
    ignore_errors: bool,
    console: Optional[Console] = None,
) -> None:
    """Load or fit a ``PerformanceCalculator``, run it on the analysis data and write the results.

    The step is skipped (with an info log and a yellow console message) when the
    ``column_mapping['y_true']`` column is not present in ``analysis_data``.

    Metrics for a newly created calculator depend on ``problem_type``: a fixed
    list of classification metrics for binary and multiclass classification,
    ``DEFAULT_METRICS`` for regression, and an empty list otherwise.

    When ``store`` is given, a fitted calculator is looked up under
    ``'realized_performance/calculator.pkl'`` first; a newly fitted one is stored
    there. A ``'performance'`` plot is only generated for a ``RawFilesWriter``.

    Any exception raised while loading, fitting, calculating or plotting is
    reported; the function then returns without writing when ``ignore_errors``
    is true, or exits the process with status 1 otherwise. Exceptions raised by
    ``writer.write`` are not caught.
    """
    if console:
        console.rule('[cyan]PerformanceCalculator[/]')

    if column_mapping['y_true'] not in analysis_data.columns:
        skip_msg = (
            f"target values column '{column_mapping['y_true']}' not present in analysis data. "
            "Skipping realized performance calculation."
        )
        _logger.info(skip_msg)
        _log(console, skip_msg, style='yellow')
        return

    try:
        calc: Optional[PerformanceCalculator] = None  # calculator to load or create
        calc_path = 'realized_performance/calculator.pkl'  # location of the calculator inside the store

        if store:
            # A store is available: try to reuse a previously fitted calculator.
            _log(console, 'loading calculator from store')
            calc = store.load(path=calc_path, as_type=PerformanceCalculator)

        if not calc:
            # No store, or the store held no fitted calculator.
            metrics = []
            if problem_type in [ProblemType.CLASSIFICATION_BINARY, ProblemType.CLASSIFICATION_MULTICLASS]:
                metrics = ['roc_auc', 'f1', 'precision', 'recall', 'specificity', 'accuracy']
            elif problem_type == ProblemType.REGRESSION:
                metrics = DEFAULT_METRICS

            _log(console, 'no fitted calculator found in store')
            _log(console, 'fitting new calculator on reference data')

            calc = PerformanceCalculator(
                y_true=column_mapping['y_true'],
                y_pred=column_mapping['y_pred'],
                y_pred_proba=column_mapping.get('y_pred_proba'),
                timestamp_column_name=column_mapping.get('timestamp'),
                chunker=chunker,
                metrics=metrics,
                problem_type=problem_type,
            )
            calc.fit(reference_data)

            if store:
                store.store(calc, path=calc_path)
                _log(console, 'storing fitted calculator to store')

        _log(console, 'calculating on analysis data')
        results = calc.calculate(analysis_data)

        plots = {}
        if isinstance(writer, RawFilesWriter):
            _log(console, 'generating result plots')
            plots = {'performance': results.plot('performance')}
    except Exception as exc:
        _report_failure(f"Failed to run realized performance calculator: {exc}", console, ignore_errors)
        return

    _log(console, 'writing results')
    writer.write(result=results, plots=plots, calculator_name='realized_performance')


def _run_cbpe_performance_estimation(
    reference_data: pd.DataFrame,
    analysis_data: pd.DataFrame,
    column_mapping: Dict[str, Any],
    problem_type: ProblemType,
    chunker: Chunker,
    writer: Writer,
    store: Optional[Store],
    ignore_errors: bool,
    console: Optional[Console] = None,
) -> None:
    """Load or fit a ``CBPE`` estimator, run it on the analysis data and write the results.

    The step is skipped (with an info log and a yellow console message) unless
    ``problem_type`` is binary or multiclass classification.

    When ``store`` is given, a fitted estimator is looked up under
    ``'cbpe/estimator.pkl'`` first; a newly fitted one is stored there. A
    ``'performance'`` plot is only generated for a ``RawFilesWriter``.

    Any exception raised while loading, fitting, estimating or plotting is
    reported; the function then returns without writing when ``ignore_errors``
    is true, or exits the process with status 1 otherwise. Exceptions raised by
    ``writer.write`` are not caught.
    """
    if console:
        console.rule('[cyan]Confidence Base Performance Estimator[/]')

    if problem_type not in [ProblemType.CLASSIFICATION_BINARY, ProblemType.CLASSIFICATION_MULTICLASS]:
        skip_msg = f"CBPE does not support '{problem_type.name}' problems. Skipping CBPE estimation."
        _logger.info(skip_msg)
        _log(console, skip_msg, style='yellow')
        return

    try:
        estimator: Optional[CBPE] = None  # estimator to load or create
        estimator_path = 'cbpe/estimator.pkl'  # location of the estimator inside the store

        if store:
            # A store is available: try to reuse a previously fitted estimator.
            _log(console, 'loading estimator from store')
            estimator = store.load(path=estimator_path, as_type=CBPE)

        if not estimator:
            # No store, or the store held no fitted estimator.
            metrics = ['roc_auc', 'f1', 'precision', 'recall', 'specificity', 'accuracy']

            _log(console, 'no fitted estimator found in store')
            _log(console, 'fitting new estimator on reference data')

            estimator = CBPE(  # type: ignore
                y_true=column_mapping['y_true'],
                y_pred=column_mapping['y_pred'],
                y_pred_proba=column_mapping['y_pred_proba'],
                timestamp_column_name=column_mapping.get('timestamp'),
                problem_type=problem_type,
                chunker=chunker,
                metrics=metrics,
            )
            estimator.fit(reference_data)

            if store:
                store.store(estimator, path=estimator_path)
                _log(console, 'storing fitted estimator to store')

        _log(console, 'estimating on analysis data')
        results = estimator.estimate(analysis_data)

        plots = {}
        if isinstance(writer, RawFilesWriter):
            _log(console, 'generating result plots')
            plots = {'performance': results.plot('performance')}
    except Exception as exc:
        _report_failure(f"Failed to run CBPE performance estimator: {exc}", console, ignore_errors)
        return

    _log(console, 'writing results')
    writer.write(result=results, plots=plots, calculator_name='confidence_based_performance_estimator')


def _run_dle_performance_estimation(
    reference_data: pd.DataFrame,
    analysis_data: pd.DataFrame,
    column_mapping: Dict[str, Any],
    problem_type: ProblemType,
    chunker: Chunker,
    writer: Writer,
    store: Optional[Store],
    ignore_errors: bool,
    console: Optional[Console] = None,
) -> None:
    """Load or fit a ``DLE`` estimator, run it on the analysis data and write the results.

    The step is skipped (with an info log and a yellow console message) unless
    ``problem_type`` is regression.

    When ``store`` is given, a fitted estimator is looked up under
    ``'dle/estimator.pkl'`` first; a newly fitted one is stored there. This path
    used to be ``'cbpe/estimator.pkl'``, which collided with the CBPE estimator;
    a DLE estimator saved under the old path is not picked up and a new one is
    fitted instead. A ``'performance'`` plot is only generated for a
    ``RawFilesWriter``.

    Any exception raised while loading, fitting, estimating or plotting is
    reported; the function then returns without writing when ``ignore_errors``
    is true, or exits the process with status 1 otherwise. Exceptions raised by
    ``writer.write`` are not caught.
    """
    if console:
        console.rule('[cyan]Direct Loss Estimator[/]')

    if problem_type != ProblemType.REGRESSION:
        skip_msg = f"DLE does not support '{problem_type.name}' problems. Skipping DLE estimation."
        _logger.info(skip_msg)
        _log(console, skip_msg, style='yellow')
        return

    try:
        estimator: Optional[DLE] = None  # estimator to load or create
        # DLE gets its own location so it never clashes with the CBPE estimator.
        estimator_path = 'dle/estimator.pkl'

        if store:
            # A store is available: try to reuse a previously fitted estimator.
            _log(console, 'loading estimator from store')
            estimator = store.load(path=estimator_path, as_type=DLE)

        if not estimator:
            # No store, or the store held no fitted estimator.
            _log(console, 'no fitted estimator found in store')
            _log(console, 'fitting new estimator on reference data')

            estimator = DLE(  # type: ignore
                feature_column_names=column_mapping['features'],
                y_true=column_mapping['y_true'],
                y_pred=column_mapping['y_pred'],
                timestamp_column_name=column_mapping.get('timestamp'),
                chunker=chunker,
                metrics=DEFAULT_METRICS,
            )
            estimator.fit(reference_data)

            if store:
                store.store(estimator, path=estimator_path)
                _log(console, 'storing fitted estimator to store')

        _log(console, 'estimating on analysis data')
        results = estimator.estimate(analysis_data)

        plots = {}
        if isinstance(writer, RawFilesWriter):
            _log(console, 'generating result plots')
            plots = {'performance': results.plot('performance')}
    except Exception as exc:
        _report_failure(f"Failed to run DLE performance estimator: {exc}", console, ignore_errors)
        return

    _log(console, 'writing results')
    writer.write(result=results, plots=plots, calculator_name='direct_loss_estimator')