Source code for nannyml.distribution.continuous.calculator

from functools import partial
from typing import List, Optional, Union

import numpy as np
import pandas as pd
from scipy.integrate import cumulative_trapezoid
from statsmodels import api as sm

from nannyml import Chunker
from nannyml._typing import Self
from nannyml.base import AbstractCalculator, _list_missing
from nannyml.distribution.continuous.result import Result
from nannyml.exceptions import InvalidArgumentsException


class ContinuousDistributionCalculator(AbstractCalculator):
    def __init__(
        self,
        column_names: Union[str, List[str]],
        timestamp_column_name: Optional[str] = None,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
        points_per_joy_plot: Optional[int] = None,
    ):
        super().__init__(
            chunk_size,
            chunk_number,
            chunk_period,
            chunker,
            timestamp_column_name,
        )

        self.column_names = column_names if isinstance(column_names, list) else [column_names]
        self.result: Optional[Result] = None
        self.points_per_joy_plot = points_per_joy_plot

    def _fit(self, reference_data: pd.DataFrame, *args, **kwargs) -> Self:
        self.result = self._calculate(reference_data)
        # Mark all chunks produced during fitting as belonging to the reference period
        self.result.data[('chunk', 'period')] = 'reference'
        return self

    def _calculate(self, data: pd.DataFrame, *args, **kwargs) -> Result:
        if data.empty:
            raise InvalidArgumentsException('data contains no rows. Please provide a valid data set.')

        _list_missing(self.column_names, data)

        result_data = pd.DataFrame(columns=_create_multilevel_index(self.column_names))

        for column in self.column_names:
            column_distributions_per_chunk = calculate_chunk_distributions(
                data[column],
                self.chunker,
                data.get(self.timestamp_column_name, default=None),
                points_per_joy_plot=self.points_per_joy_plot,
            )
            # Chunk metadata is added once for all columns below, so drop the per-column copies
            column_distributions_per_chunk.drop(columns=['key', 'chunk_index'], inplace=True)
            for c in column_distributions_per_chunk.columns:
                result_data.loc[:, (column, c)] = column_distributions_per_chunk[c]

        chunks = self.chunker.split(data)
        result_data[('chunk', 'key')] = [c.key for c in chunks]
        result_data[('chunk', 'chunk_index')] = [c.chunk_index for c in chunks]
        result_data[('chunk', 'start_index')] = [c.start_index for c in chunks]
        result_data[('chunk', 'end_index')] = [c.end_index for c in chunks]
        result_data[('chunk', 'start_date')] = [c.start_datetime for c in chunks]
        result_data[('chunk', 'end_date')] = [c.end_datetime for c in chunks]
        result_data[('chunk', 'period')] = ['analysis' for _ in chunks]

        if self.result is None:
            self.result = Result(result_data, self.column_names, self.timestamp_column_name, self.chunker)
        else:
            # Keep the reference chunks from fitting and append the new analysis chunks
            self.result = self.result.filter(period='reference')
            self.result.data = pd.concat([self.result.data, result_data]).reset_index(drop=True)

        return self.result
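
A minimal usage sketch for the calculator above, assuming the public fit/calculate wrappers provided by AbstractCalculator dispatch to the _fit/_calculate methods shown here. The column name, chunk size, and both data frames are hypothetical and only serve to illustrate the call sequence.

# Hypothetical example: 'feature_1' and the generated frames are illustrative only.
# reference_df = pd.DataFrame({'feature_1': np.random.normal(0.0, 1.0, 10_000)})
# analysis_df = pd.DataFrame({'feature_1': np.random.normal(0.5, 1.0, 10_000)})
#
# calc = ContinuousDistributionCalculator(column_names='feature_1', chunk_size=1_000)
# calc.fit(reference_df)                # chunks are tagged with period 'reference'
# result = calc.calculate(analysis_df)  # analysis chunks are appended to the reference result
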
def _get_kde(array, cut=3, clip=(-np.inf, np.inf)):
    try:  # pragma: no cover
        kde = sm.nonparametric.KDEUnivariate(array)
        kde.fit(cut=cut, clip=clip)

        # The calculation may return duplicate support values in edge cases. Such results are
        # not sensible, so treat them as an error case and return None.
        if len(np.unique(kde.support)) < len(kde.support):
            return None

        return kde
    except Exception:
        return None


def _get_kde_support(kde, points_per_joy_plot: Optional[int] = None):
    if kde is not None:  # pragma: no cover
        # Subsample the support grid down to roughly points_per_joy_plot points (default 50)
        return kde.support[:: (len(kde.support) // (points_per_joy_plot or 50))]
    else:
        return np.array([])


def _get_kde_density(kde, points_per_joy_plot: Optional[int] = None):
    if kde is not None:  # pragma: no cover
        # Subsample the density values with the same stride as the support grid
        return kde.density[:: (len(kde.support) // (points_per_joy_plot or 50))]
    else:
        return np.array([])


def _get_kde_cdf(kde_support, kde_density):
    if len(kde_support) > 0 and len(kde_density) > 0:
        cdf = cumulative_trapezoid(y=kde_density, x=kde_support, initial=0)
        return cdf
    else:
        return np.array([])


def _get_kde_quartiles(cdf, kde_support, kde_density):
    if len(cdf) > 0:
        quartiles = []
        for quartile in [0.25, 0.50, 0.75]:
            quartile_index = np.argmax(cdf >= quartile)
            quartiles.append((kde_support[quartile_index], kde_density[quartile_index], cdf[quartile_index]))
        return quartiles
    else:
        return []
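
A rough sketch of how these private helpers chain together, runnable in the context of this module. The synthetic sample is made up; the helpers are internal and not part of the public API.

# Illustrative only: a synthetic standard-normal sample.
# sample = np.random.normal(loc=0.0, scale=1.0, size=500)
#
# kde = _get_kde(sample)                                     # fitted KDEUnivariate, or None on failure
# support = _get_kde_support(kde, points_per_joy_plot=50)    # subsampled x-axis grid
# density = _get_kde_density(kde, points_per_joy_plot=50)    # density values on that grid
# cdf = _get_kde_cdf(support, density)                       # cumulative trapezoidal integral
# quartiles = _get_kde_quartiles(cdf, support, density)      # (x, density, cdf) at 0.25 / 0.50 / 0.75
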
def calculate_chunk_distributions(
    data: Union[np.ndarray, pd.Series],
    chunker: Chunker,
    timestamps: Optional[Union[np.ndarray, pd.Series]] = None,
    data_periods: Optional[Union[np.ndarray, pd.Series]] = None,
    kde_cut=3,
    kde_clip=(-np.inf, np.inf),
    post_kde_clip=None,
    points_per_joy_plot: Optional[int] = None,
):
    if isinstance(data, np.ndarray):
        data = pd.Series(data, name='data')

    if isinstance(data_periods, np.ndarray):
        data_periods = pd.Series(data_periods, name='period')

    get_kde_partial_application = partial(_get_kde, cut=kde_cut, clip=kde_clip)

    data_with_chunk_keys = pd.concat(
        [
            chunk.data.assign(key=chunk.key, chunk_index=chunk.chunk_index)
            for chunk in chunker.split(pd.concat([data, timestamps], axis=1))
        ]
    )

    group_by_cols = ['chunk_index', 'key']
    if data_periods is not None:
        # Group by period too: the same 'key' value can occur in both reference and analysis data
        data_with_chunk_keys['period'] = data_periods
        group_by_cols += ['period']

    data = (
        data_with_chunk_keys.groupby(group_by_cols)[data.name]
        .apply(get_kde_partial_application)
        .to_frame('kde')
        .reset_index()
    )

    data['kde_support'] = data['kde'].apply(lambda kde: _get_kde_support(kde, points_per_joy_plot))
    data['kde_density'] = data['kde'].apply(lambda kde: _get_kde_density(kde, points_per_joy_plot))
    data['kde_cdf'] = data[['kde_support', 'kde_density']].apply(
        lambda row: _get_kde_cdf(row['kde_support'], row['kde_density'] if len(row['kde_support']) > 0 else []),
        axis=1,
    )

    if post_kde_clip:
        # Clip the kde support to the clip bounds, trimming the density and cdf to the same length.
        # First drop values below the lower bound (trim from the front) ...
        data['kde_support'] = data['kde_support'].apply(lambda x: x[x > post_kde_clip[0]])
        data['kde_support_len'] = data['kde_support'].apply(lambda x: len(x))
        data['kde_density'] = data.apply(lambda row: row['kde_density'][-row['kde_support_len'] :], axis=1)
        data['kde_cdf'] = data.apply(lambda row: row['kde_cdf'][-row['kde_support_len'] :], axis=1)
        # ... then drop values above the upper bound (trim from the back)
        data['kde_support'] = data['kde_support'].apply(lambda x: x[x < post_kde_clip[1]])
        data['kde_support_len'] = data['kde_support'].apply(lambda x: len(x))
        data['kde_density'] = data.apply(lambda row: row['kde_density'][: row['kde_support_len']], axis=1)
        data['kde_cdf'] = data.apply(lambda row: row['kde_cdf'][: row['kde_support_len']], axis=1)

    data['kde_support_len'] = data['kde_support'].apply(lambda x: len(x))

    data['kde_quartiles'] = data[['kde_cdf', 'kde_support', 'kde_density']].apply(
        lambda row: _get_kde_quartiles(
            row['kde_cdf'], row['kde_support'], row['kde_density'] if len(row['kde_support']) > 0 else []
        ),
        axis=1,
    )
    data['kde_density_local_max'] = data['kde_density'].apply(lambda x: max(x) if len(x) > 0 else 0)
    data['kde_density_global_max'] = data.groupby('chunk_index')['kde_density_local_max'].max().max()
    data['kde_density_scaled'] = data[['kde_density', 'kde_density_local_max']].apply(
        lambda row: np.divide(np.array(row['kde_density']), row['kde_density_local_max']), axis=1
    )
    data['kde_quartiles_scaled'] = data[['kde_quartiles', 'kde_density_local_max']].apply(
        lambda row: [(q[0], q[1] / row['kde_density_local_max'], q[2]) for q in row['kde_quartiles']], axis=1
    )

    # TODO: Consider removing redundant columns to reduce fitted calculator memory usage
    # The fitted KDE object causes issues when pickling the calculator. It is no longer needed,
    # so drop it here.
    del data['kde']

    return data
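
A hedged usage sketch for calculate_chunk_distributions. It assumes nannyml exposes a SizeBasedChunker at the package root; the series name, sizes, and generated values are illustrative only.

# Illustrative example; assumes SizeBasedChunker is importable from the package root.
# from nannyml import SizeBasedChunker
#
# values = pd.Series(np.random.lognormal(mean=0.0, sigma=0.5, size=2_000), name='feature_1')
# distributions = calculate_chunk_distributions(
#     data=values,
#     chunker=SizeBasedChunker(chunk_size=500),
#     points_per_joy_plot=60,
# )
# # One row per chunk: subsampled support/density/cdf arrays plus the quartile triplets.
# print(distributions[['key', 'kde_support_len', 'kde_quartiles']].head())
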
def _create_multilevel_index(column_names: List[str]):
    chunk_column_names = ['key', 'chunk_index', 'start_index', 'end_index', 'start_date', 'end_date', 'period']
    distribution_column_names = [
        'kde',
        'kde_support',
        'kde_density',
        'kde_cdf',
        'kde_support_len',
        'kde_quartiles',
        'kde_density_local_max',
        'kde_density_global_max',
        'kde_density_scaled',
        'kde_quartiles_scaled',
    ]
    chunk_tuples = [('chunk', chunk_column_name) for chunk_column_name in chunk_column_names]
    continuous_column_tuples = [
        (column_name, distribution_column_name)
        for column_name in column_names
        for distribution_column_name in distribution_column_names
    ]
    tuples = chunk_tuples + continuous_column_tuples
    return pd.MultiIndex.from_tuples(tuples)
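
A small sketch of the index this helper produces, runnable in the context of this module; the column name is hypothetical.

# idx = _create_multilevel_index(['feature_1'])
# # Leading tuples carry chunk metadata: ('chunk', 'key'), ('chunk', 'chunk_index'), ...
# # They are followed by per-column fields: ('feature_1', 'kde'), ('feature_1', 'kde_support'), ...
# print(list(idx)[:2], list(idx)[7:9])
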