# Source code for nannyml.base

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

"""Module containing base classes for drift calculation."""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import Any, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import plotly.graph_objects

from nannyml.chunk import Chunker, ChunkerFactory
from nannyml.exceptions import (
    CalculatorException,
    CalculatorNotFittedException,
    EstimatorException,
    InvalidArgumentsException,
    InvalidReferenceDataException,
)


class AbstractCalculatorResult(ABC):
    """Contains the results of a calculation and provides plotting functionality.

    The result of the :meth:`~nannyml.base.AbstractCalculator.calculate` method of a
    :class:`~nannyml.base.AbstractCalculator`.

    It is an abstract class containing shared properties and methods across implementations.
    For each :class:`~nannyml.base.AbstractCalculator` class there will be a corresponding
    :class:`~nannyml.base.AbstractCalculatorResult` implementation.
    """

    DEFAULT_COLUMNS = ('key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period')

    def __init__(self, results_data: pd.DataFrame, *args, **kwargs):
        """Creates a new :class:`~nannyml.base.AbstractCalculatorResult` instance.

        Parameters
        ----------
        results_data: pd.DataFrame
            The data returned by the Calculator. A deep copy is stored, so later changes
            to the frame passed in do not affect this result.
        """
        self.data = results_data.copy(deep=True)

    @property
    def _logger(self) -> logging.Logger:
        return logging.getLogger(__name__)

    # TODO: define more specific interface (add common arguments)
    def __len__(self):  # noqa: D105
        return len(self.data)

    @abstractmethod
    def plot(self, *args, **kwargs) -> plotly.graph_objects.Figure:
        """Plots calculation results."""
        raise NotImplementedError

    @staticmethod
    def _flatten_column_label(label: Any) -> Any:
        """Collapse one column label into a single-level name.

        Tuple labels (as produced by multi-level columns) are joined with ``'_'`` and
        repeated ``chunk`` prefixes are reduced to a single ``chunk``. Any other label is
        returned unchanged: joining a plain string would split it into characters.
        """
        if not isinstance(label, tuple):
            return label
        return '_'.join(label).replace('chunk_chunk_chunk', 'chunk').replace('chunk_chunk', 'chunk')

    def to_df(self, multilevel: bool = True) -> pd.DataFrame:
        """Export results to a pandas dataframe.

        Parameters
        ----------
        multilevel: bool, default=True
            When ``True`` the stored result data is returned as is (the same object, not a copy).
            When ``False`` a deep copy is returned whose column labels are flattened to
            single-level names.

        Returns
        -------
        pd.DataFrame
            The result data.
        """
        if multilevel:
            return self.data

        single_level_data = self.data.copy(deep=True)
        single_level_data.columns = [self._flatten_column_label(col) for col in self.data.columns.values]
        return single_level_data

    def filter(
        self, period: str = 'analysis', metrics: Optional[Union[str, List[str]]] = None, *args, **kwargs
    ) -> AbstractCalculatorResult:
        """Returns filtered result metric data.

        Parameters
        ----------
        period: str, default='analysis'
            Forwarded unchanged to the implementation's ``_filter``.
        metrics: Optional[Union[str, List[str]]], default=None
            A single metric name or a list of metric names. A single name is wrapped in a list
            before being forwarded to ``_filter``.

        Returns
        -------
        AbstractCalculatorResult
            Whatever the implementation's ``_filter`` returns.

        Raises
        ------
        InvalidArgumentsException
            When ``metrics`` is truthy but neither a string nor a list.
        CalculatorException
            When ``_filter`` raises any exception; the original is attached as the cause.
        """
        if metrics and not isinstance(metrics, (str, list)):
            raise InvalidArgumentsException("metrics value provided is not a valid metric or list of metrics")
        if isinstance(metrics, str):
            metrics = [metrics]
        try:
            return self._filter(period, metrics, *args, **kwargs)
        except Exception as exc:
            # Chain explicitly so the underlying traceback is reported as the direct cause.
            raise CalculatorException(f"could not read result data: {exc}") from exc

    @abstractmethod
    def _filter(self, period: str, metrics: Optional[List[str]] = None, *args, **kwargs) -> AbstractCalculatorResult:
        raise NotImplementedError
class AbstractCalculator(ABC):
    """Base class for calculators: fit on reference data, then calculate on other data."""

    def __init__(
        self,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
        timestamp_column_name: Optional[str] = None,
    ):
        """Creates a new instance of an abstract calculator.

        Parameters
        ----------
        chunk_size: int
            Splits the data into chunks containing `chunks_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker: Chunker
            The `Chunker` used to split the data sets into a list of chunks.
        timestamp_column_name: str
            The column name of the column containing timestamp information.
        """
        self.chunker = ChunkerFactory.get_chunker(
            chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name
        )
        self.timestamp_column_name = timestamp_column_name

        # Holds the latest result; starts empty.
        self.result: Optional[AbstractCalculatorResult] = None

    @property
    def _logger(self) -> logging.Logger:
        return logging.getLogger(__name__)

    def __str__(self):
        # Same readable identifier as AbstractEstimator, used in log and error messages.
        return f'{self.__module__}.{self.__class__.__name__}'

    def fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> AbstractCalculator:
        """Trains the calculator using reference data.

        Parameters
        ----------
        reference_data: pd.DataFrame
            The reference data. A copy is handed to ``_fit`` so the caller's frame is not modified.

        Returns
        -------
        AbstractCalculator
            Whatever the implementation's ``_fit`` returns.

        Raises
        ------
        InvalidArgumentsException, InvalidReferenceDataException
            Re-raised unchanged when ``_fit`` raises them.
        CalculatorException
            Wraps any other exception; the original is attached as the cause.
        """
        try:
            self._logger.debug(f"fitting {str(self)}")
            # Copy for consistency with `calculate`: implementations may add or alter columns.
            reference_data = reference_data.copy()
            return self._fit(reference_data, *args, **kwargs)
        except InvalidArgumentsException:
            raise
        except InvalidReferenceDataException:
            raise
        except Exception as exc:
            raise CalculatorException(f"failed while fitting {str(self)}.\n{exc}") from exc

    def calculate(self, data: pd.DataFrame, *args, **kwargs) -> Any:
        """Performs a calculation on the provided data.

        Parameters
        ----------
        data: pd.DataFrame
            The data to calculate on. A copy is handed to ``_calculate`` so the caller's frame
            is not modified.

        Returns
        -------
        Any
            Whatever the implementation's ``_calculate`` returns.

        Raises
        ------
        InvalidArgumentsException, CalculatorNotFittedException
            Re-raised unchanged when ``_calculate`` raises them.
        CalculatorException
            Wraps any other exception; the original is attached as the cause.
        """
        try:
            self._logger.debug(f"calculating {str(self)}")
            data = data.copy()
            return self._calculate(data, *args, **kwargs)
        except InvalidArgumentsException:
            raise
        except CalculatorNotFittedException:
            raise
        except Exception as exc:
            raise CalculatorException(f"failed while calculating {str(self)}.\n{exc}") from exc

    @abstractmethod
    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> AbstractCalculator:
        raise NotImplementedError(f"'{self.__class__.__name__}' must implement the '_fit' method")

    @abstractmethod
    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Any:
        raise NotImplementedError(f"'{self.__class__.__name__}' must implement the '_calculate' method")
class AbstractEstimatorResult(ABC):
    """Contains the results of an estimation and provides additional functionality such as plotting.

    It is an abstract class containing shared properties and methods across implementations.
    The ``result`` attribute of :class:`~nannyml.base.AbstractEstimator` is annotated with this type.
    """

    DEFAULT_COLUMNS = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period']

    def __init__(self, results_data: pd.DataFrame):
        """Creates a new :class:`~nannyml.base.AbstractEstimatorResult` instance.

        Parameters
        ----------
        results_data: pd.DataFrame
            The result data of the performed calculation. A deep copy is stored, so later
            changes to the frame passed in do not affect this result.
        """
        self.data = results_data.copy(deep=True)

    @property
    def _logger(self) -> logging.Logger:
        return logging.getLogger(__name__)

    @staticmethod
    def _flatten_column_label(label: Any) -> Any:
        """Collapse one column label into a single-level name.

        Tuple labels (as produced by multi-level columns) are joined with ``'_'`` and
        repeated ``chunk`` prefixes are reduced to a single ``chunk``. Any other label is
        returned unchanged: joining a plain string would split it into characters.
        """
        if not isinstance(label, tuple):
            return label
        return '_'.join(label).replace('chunk_chunk_chunk', 'chunk').replace('chunk_chunk', 'chunk')

    def to_df(self, multilevel: bool = True) -> pd.DataFrame:
        """Export results to a pandas dataframe.

        Parameters
        ----------
        multilevel: bool, default=True
            When ``True`` the stored result data is returned as is (the same object, not a copy).
            When ``False`` a deep copy is returned whose column labels are flattened to
            single-level names.

        Returns
        -------
        pd.DataFrame
            The result data.
        """
        if multilevel:
            return self.data

        single_level_data = self.data.copy(deep=True)
        single_level_data.columns = [self._flatten_column_label(col) for col in self.data.columns.values]
        return single_level_data

    def filter(
        self, period: str = 'analysis', metrics: Optional[Union[str, List[str]]] = None, *args, **kwargs
    ) -> AbstractEstimatorResult:
        """Returns result metric data.

        Parameters
        ----------
        period: str, default='analysis'
            Forwarded unchanged to the implementation's ``_filter``.
        metrics: Optional[Union[str, List[str]]], default=None
            A single metric name or a list of metric names. A single name is wrapped in a list
            before being forwarded to ``_filter``.

        Returns
        -------
        AbstractEstimatorResult
            Whatever the implementation's ``_filter`` returns.

        Raises
        ------
        InvalidArgumentsException
            When ``metrics`` is truthy but neither a string nor a list.
        EstimatorException
            When ``_filter`` raises any exception; the original is attached as the cause.
        """
        if metrics and not isinstance(metrics, (str, list)):
            raise InvalidArgumentsException("metrics value provided is not a valid metric or list of metrics")
        if isinstance(metrics, str):
            metrics = [metrics]
        try:
            return self._filter(period, metrics, *args, **kwargs)
        except Exception as exc:
            # Chain explicitly so the underlying traceback is reported as the direct cause.
            raise EstimatorException(f"could not read result data: {exc}") from exc

    @abstractmethod
    def _filter(self, period: str, metrics: Optional[List[str]] = None, *args, **kwargs) -> AbstractEstimatorResult:
        raise NotImplementedError

    def plot(self, *args, **kwargs) -> plotly.graph_objects.Figure:
        """Plot estimation results.

        Not abstract: the base implementation raises ``NotImplementedError`` when called.
        """
        raise NotImplementedError
class AbstractEstimator(ABC):
    """Base class for estimators: fit on reference data, then estimate on other data."""

    def __init__(
        self,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
        timestamp_column_name: Optional[str] = None,
    ):
        """Creates a new instance of an abstract estimator.

        Parameters
        ----------
        chunk_size: int
            Splits the data into chunks containing `chunks_size` observations.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_number: int
            Splits the data into `chunk_number` pieces.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunk_period: str
            Splits the data according to the given period.
            Only one of `chunk_size`, `chunk_number` or `chunk_period` should be given.
        chunker : Chunker
            The `Chunker` used to split the data sets into a list of chunks.
        timestamp_column_name: str
            The column name of the column containing timestamp information.
        """
        self.chunker = ChunkerFactory.get_chunker(
            chunk_size, chunk_number, chunk_period, chunker, timestamp_column_name
        )
        self.timestamp_column_name = timestamp_column_name

        # Holds the latest result; starts empty.
        self.result: Optional[AbstractEstimatorResult] = None

    @property
    def _logger(self) -> logging.Logger:
        return logging.getLogger(__name__)

    def __str__(self):
        return f'{self.__module__}.{self.__class__.__name__}'

    def fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> AbstractEstimator:
        """Trains the estimator using reference data.

        Parameters
        ----------
        reference_data: pd.DataFrame
            The reference data. A copy is handed to ``_fit`` so the caller's frame is not modified.

        Returns
        -------
        AbstractEstimator
            Whatever the implementation's ``_fit`` returns.

        Raises
        ------
        InvalidArgumentsException, InvalidReferenceDataException
            Re-raised unchanged when ``_fit`` raises them.
        CalculatorException
            Wraps any other exception; the original is attached as the cause.
        """
        try:
            self._logger.info(f"fitting {str(self)}")
            reference_data = reference_data.copy()
            return self._fit(reference_data, *args, **kwargs)
        except InvalidArgumentsException:
            raise
        except InvalidReferenceDataException:
            raise
        except Exception as exc:
            # NOTE: CalculatorException (not EstimatorException) is kept so existing
            # `except CalculatorException` handlers in calling code keep working.
            raise CalculatorException(f"failed while fitting {str(self)}.\n{exc}") from exc

    def estimate(self, data: pd.DataFrame, *args, **kwargs) -> Any:
        """Performs an estimation on the provided data.

        Parameters
        ----------
        data: pd.DataFrame
            The data to estimate on. A copy is handed to ``_estimate`` so the caller's frame
            is not modified.

        Returns
        -------
        Any
            Whatever the implementation's ``_estimate`` returns.

        Raises
        ------
        InvalidArgumentsException, CalculatorNotFittedException
            Re-raised unchanged when ``_estimate`` raises them.
        CalculatorException
            Wraps any other exception; the original is attached as the cause.
        """
        try:
            self._logger.info(f"estimating {str(self)}")
            data = data.copy()
            return self._estimate(data, *args, **kwargs)
        except InvalidArgumentsException:
            raise
        except CalculatorNotFittedException:
            raise
        except Exception as exc:
            # Exception type kept for backward compatibility; only the wording was corrected.
            raise CalculatorException(f"failed while estimating {str(self)}.\n{exc}") from exc

    @abstractmethod
    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> AbstractEstimator:
        raise NotImplementedError(f"'{self.__class__.__name__}' must implement the '_fit' method")

    @abstractmethod
    def _estimate(self, data: pd.DataFrame, *args, **kwargs) -> Any:
        raise NotImplementedError(f"'{self.__class__.__name__}' must implement the '_estimate' method")
def _split_features_by_type(data: pd.DataFrame, feature_column_names: List[str]) -> Tuple[List[str], List[str]]:
    """Split feature column names into continuous and categorical ones.

    Parameters
    ----------
    data: pd.DataFrame
        The data set whose column dtypes are inspected.
    feature_column_names: List[str]
        Names of the columns in ``data`` to classify.

    Returns
    -------
    Tuple[List[str], List[str]]
        The continuous column names followed by the categorical column names, each in the
        order given. A column whose dtype matches neither check appears in neither list.
    """
    continuous_column_names = [col for col in feature_column_names if _column_is_continuous(data[col])]
    categorical_column_names = [col for col in feature_column_names if _column_is_categorical(data[col])]
    return continuous_column_names, categorical_column_names


def _column_is_categorical(column: pd.Series) -> bool:
    """Return whether the column's dtype is one of object, string, category or bool."""
    return column.dtype in ['object', 'string', 'category', 'bool']


def _column_is_continuous(column: pd.Series) -> bool:
    """Return whether the column's dtype is one of the listed integer or float dtypes."""
    return column.dtype in [
        'int_',
        'int8',
        'int16',
        'int32',
        'int64',
        'uint8',
        'uint16',
        'uint32',
        'uint64',
        'float_',
        'float16',
        'float32',
        'float64',
    ]


def _list_missing(columns_to_find: List, dataset_columns: Union[List, pd.DataFrame]):
    """Raise when any of the required columns is absent; return ``None`` otherwise.

    Parameters
    ----------
    columns_to_find: List
        The required column names.
    dataset_columns: Union[List, pd.DataFrame]
        Either the available column names or a data frame, in which case its columns are used.

    Raises
    ------
    nannyml.exceptions.InvalidArgumentsException
        When at least one entry of ``columns_to_find`` is not in ``dataset_columns``.
    """
    if isinstance(dataset_columns, pd.DataFrame):
        dataset_columns = dataset_columns.columns

    missing = [col for col in columns_to_find if col not in dataset_columns]
    if missing:
        raise InvalidArgumentsException(f"missing required columns '{missing}' in data set:\n\t{dataset_columns}")


def _raise_exception_for_negative_values(column: pd.Series):
    """Raises an InvalidArgumentsException if a given column contains negative values.

    Parameters
    ----------
    column: pd.Series
        Column to check for negative values.

    Raises
    ------
    nannyml.exceptions.InvalidArgumentsException
    """
    # Build the mask once and reduce it in NumPy rather than looping over it with builtin `any`.
    negative_mask = column.values < 0
    if negative_mask.any():
        negative_item_indices = np.where(negative_mask)
        raise InvalidArgumentsException(
            f"target values '{column.name}' contain negative values.\n"
            f"\tLog-based metrics are not supported for negative target values.\n"
            f"\tCheck '{column.name}' at rows {str(negative_item_indices)}."
        )