# # Author: Niels Nuyttens <niels@nannyml.com>
# #
# # License: Apache Software License 2.0
#
# """Used as an access point to start using NannyML in its most simple form."""
# import logging
# import sys
# from typing import List
#
import logging
import sys
from pathlib import Path
from typing import Any, Dict
import pandas as pd
from rich.console import Console
from rich.progress import Progress
from nannyml._typing import ProblemType
from nannyml.chunk import Chunker
from nannyml.drift.multivariate.data_reconstruction import DataReconstructionDriftCalculator
from nannyml.drift.univariate import UnivariateDriftCalculator
from nannyml.io.base import Writer
from nannyml.io.raw_files_writer import RawFilesWriter
from nannyml.performance_calculation import PerformanceCalculator
from nannyml.performance_estimation.confidence_based import CBPE
from nannyml.performance_estimation.direct_loss_estimation import DEFAULT_METRICS, DLE
_logger = logging.getLogger(__name__)
[docs]def run(
reference_data: pd.DataFrame,
analysis_data: pd.DataFrame,
column_mapping: Dict[str, Any],
problem_type: ProblemType,
chunker: Chunker,
writer: Writer,
ignore_errors: bool = True,
run_in_console: bool = False,
):
with Progress() as progress:
_run_statistical_univariate_feature_drift_calculator(
reference_data,
analysis_data,
column_mapping,
problem_type,
chunker,
writer,
ignore_errors,
console=progress.console,
)
_run_data_reconstruction_multivariate_feature_drift_calculator(
reference_data, analysis_data, column_mapping, chunker, writer, ignore_errors, console=progress.console
)
_run_realized_performance_calculator(
reference_data,
analysis_data,
column_mapping,
problem_type,
chunker,
writer,
ignore_errors,
console=progress.console,
)
_run_cbpe_performance_estimation(
reference_data,
analysis_data,
column_mapping,
problem_type,
chunker,
writer,
ignore_errors,
console=progress.console,
)
_run_dee_performance_estimation(
reference_data,
analysis_data,
column_mapping,
problem_type,
chunker,
writer,
ignore_errors,
console=progress.console,
)
progress.console.line(2)
if isinstance(writer, RawFilesWriter):
progress.console.rule()
progress.console.log(f"view results in {Path(writer.filepath)}")
def _run_statistical_univariate_feature_drift_calculator(
reference_data: pd.DataFrame,
analysis_data: pd.DataFrame,
column_mapping: Dict[str, Any],
problem_type: ProblemType,
chunker: Chunker,
writer: Writer,
ignore_errors: bool,
console: Console = None,
):
if console:
console.rule('[cyan]UnivariateStatisticalDriftCalculator[/]')
try:
if console:
console.log('fitting on reference data')
if problem_type == ProblemType.CLASSIFICATION_BINARY:
y_pred_proba_column_names = [column_mapping['y_pred_proba']]
elif problem_type == ProblemType.CLASSIFICATION_MULTICLASS:
y_pred_proba_column_names = list(column_mapping['y_pred_proba'].values())
else:
y_pred_proba_column_names = []
calc = UnivariateDriftCalculator(
column_names=(column_mapping['features'] + [column_mapping['y_pred']] + y_pred_proba_column_names),
timestamp_column_name=column_mapping.get('timestamp', None),
chunker=chunker,
categorical_methods=['chi2', 'jensen_shannon'],
continuous_methods=['kolmogorov_smirnov', 'jensen_shannon'],
).fit(reference_data)
# raise RuntimeError("🔥 something's not right there... 🔥")
if console:
console.log('calculating on analysis data')
results = calc.calculate(analysis_data)
plots = {}
if isinstance(writer, RawFilesWriter):
if console:
console.log('generating result plots')
plots = {
f'{kind}_{column_name}': results.plot(kind=kind, method=method, column_name=column_name)
for column_name in results.continuous_column_names
for method in results.continuous_method_names
for kind in ['drift', 'distribution']
}
plots.update(
{
f'{kind}_{column_name}': results.plot(kind=kind, method=method, column_name=column_name)
for column_name in results.categorical_column_names
for method in results.categorical_method_names
for kind in ['drift', 'distribution']
}
)
except Exception as exc:
msg = f"Failed to run statistical univariate feature drift calculator: {exc}"
if console:
console.log(msg, style='red')
else:
_logger.error(msg)
if ignore_errors:
return
else:
sys.exit(1)
if console:
console.log('writing results')
writer.write(result=results, plots=plots, calculator_name='statistical_univariate_feature_drift')
def _run_data_reconstruction_multivariate_feature_drift_calculator(
reference_data: pd.DataFrame,
analysis_data: pd.DataFrame,
column_mapping: Dict[str, Any],
chunker: Chunker,
writer: Writer,
ignore_errors: bool,
console: Console = None,
):
if console:
console.rule('[cyan]DataReconstructionDriftCalculator[/]')
try:
if console:
console.log('fitting on reference data')
calc = DataReconstructionDriftCalculator(
column_names=column_mapping['features'],
timestamp_column_name=column_mapping.get('timestamp', None),
chunker=chunker,
).fit(reference_data)
if console:
console.log('calculating on analysis data')
results = calc.calculate(analysis_data)
plots = {}
if isinstance(writer, RawFilesWriter):
if console:
console.log('generating result plots')
plots = {f'{kind}': results.plot(kind='drift') for kind in ['drift']}
except Exception as exc:
msg = f"Failed to run data reconstruction multivariate feature drift calculator: {exc}"
if console:
console.log(msg, style='red')
else:
_logger.error(msg)
if ignore_errors:
return
else:
sys.exit(1)
if console:
console.log('writing results')
writer.write(result=results, plots=plots, calculator_name='data_reconstruction_multivariate_feature_drift')
def _run_realized_performance_calculator(
reference_data: pd.DataFrame,
analysis_data: pd.DataFrame,
column_mapping: Dict[str, Any],
problem_type: ProblemType,
chunker: Chunker,
writer: Writer,
ignore_errors: bool,
console: Console = None,
):
if console:
console.rule('[cyan]PerformanceCalculator[/]')
if column_mapping['y_true'] not in analysis_data.columns:
_logger.info(
f"target values column '{column_mapping['y_true']}' not present in analysis data. "
"Skipping realized performance calculation."
)
if console:
console.log(
f"target values column '{column_mapping['y_true']}' not present in analysis data. "
"Skipping realized performance calculation.",
style='yellow',
)
return
metrics = []
if problem_type in [ProblemType.CLASSIFICATION_BINARY, ProblemType.CLASSIFICATION_MULTICLASS]:
metrics = ['roc_auc', 'f1', 'precision', 'recall', 'specificity', 'accuracy']
elif problem_type in [ProblemType.REGRESSION]:
metrics = DEFAULT_METRICS
try:
if console:
console.log('fitting on reference data')
calc = PerformanceCalculator(
y_true=column_mapping['y_true'],
y_pred=column_mapping['y_pred'],
y_pred_proba=column_mapping.get('y_pred_proba', None),
timestamp_column_name=column_mapping.get('timestamp', None),
chunker=chunker,
metrics=metrics,
problem_type=problem_type,
).fit(reference_data)
if console:
console.log('calculating on analysis data')
results = calc.calculate(analysis_data)
plots = {}
if isinstance(writer, RawFilesWriter):
if console:
console.log('generating result plots')
plots = {
f'realized_{metric}': results.plot(kind, metric=metric)
for kind in ['performance']
for metric in metrics
}
except Exception as exc:
msg = f"Failed to run realized performance calculator: {exc}"
if console:
console.log(msg, style='red')
else:
_logger.error(msg)
if ignore_errors:
return
else:
sys.exit(1)
if console:
console.log('writing results')
writer.write(result=results, plots=plots, calculator_name='realized_performance')
def _run_cbpe_performance_estimation(
reference_data: pd.DataFrame,
analysis_data: pd.DataFrame,
column_mapping: Dict[str, Any],
problem_type: ProblemType,
chunker: Chunker,
writer: Writer,
ignore_errors: bool,
console: Console = None,
):
if console:
console.rule('[cyan]Confidence Base Performance Estimator[/]')
if problem_type not in [ProblemType.CLASSIFICATION_BINARY, ProblemType.CLASSIFICATION_MULTICLASS]:
_logger.info(f"CBPE does not support '{problem_type.name}' problems. Skipping CBPE estimation.")
if console:
console.log(
f"CBPE does not support '{problem_type.name}' problems. Skipping CBPE estimation.",
style='yellow',
)
return
metrics = ['roc_auc', 'f1', 'precision', 'recall', 'specificity', 'accuracy']
try:
if console:
console.log('fitting on reference data')
estimator = CBPE( # type: ignore
y_true=column_mapping['y_true'],
y_pred=column_mapping['y_pred'],
y_pred_proba=column_mapping['y_pred_proba'],
timestamp_column_name=column_mapping.get('timestamp', None),
problem_type=problem_type,
chunker=chunker,
metrics=metrics,
).fit(reference_data)
if console:
console.log('estimating on analysis data')
results = estimator.estimate(analysis_data)
plots = {}
if isinstance(writer, RawFilesWriter):
if console:
console.log('generating result plots')
plots = {
f'estimated_{metric}': results.plot(kind, metric=metric)
for kind in ['performance']
for metric in metrics
}
except Exception as exc:
msg = f"Failed to run CBPE performance estimator: {exc}"
if console:
console.log(msg, style='red')
else:
_logger.error(msg)
if ignore_errors:
return
else:
sys.exit(1)
if console:
console.log('writing results')
writer.write(result=results, plots=plots, calculator_name='confidence_based_performance_estimator')
def _run_dee_performance_estimation(
reference_data: pd.DataFrame,
analysis_data: pd.DataFrame,
column_mapping: Dict[str, Any],
problem_type: ProblemType,
chunker: Chunker,
writer: Writer,
ignore_errors: bool,
console: Console = None,
):
if console:
console.rule('[cyan]Direct Loss Estimator[/]')
if problem_type not in [ProblemType.REGRESSION]:
_logger.info(f"DLE does not support '{problem_type.name}' problems. Skipping DLE estimation.")
if console:
console.log(
f"DLE does not support '{problem_type.name}' problems. Skipping DLE estimation.",
style='yellow',
)
return
try:
if console:
console.log('fitting on reference data')
estimator = DLE( # type: ignore
feature_column_names=column_mapping['features'],
y_true=column_mapping['y_true'],
y_pred=column_mapping['y_pred'],
timestamp_column_name=column_mapping.get('timestamp', None),
chunker=chunker,
metrics=DEFAULT_METRICS,
).fit(reference_data)
if console:
console.log('estimating on analysis data')
results = estimator.estimate(analysis_data)
plots = {}
if isinstance(writer, RawFilesWriter):
if console:
console.log('generating result plots')
plots = {
f'estimated_{metric}': results.plot(kind, metric=metric)
for kind in ['performance']
for metric in DEFAULT_METRICS
}
except Exception as exc:
msg = f"Failed to run DLE performance estimator: {exc}"
if console:
console.log(msg, style='red')
else:
_logger.error(msg)
if ignore_errors:
return
else:
sys.exit(1)
if console:
console.log('writing results')
writer.write(result=results, plots=plots, calculator_name='direct_error_estimator')