Source code for nannyml.plots.components.joy_plot

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0
from functools import partial
from typing import Any, Dict, Optional, Union

import matplotlib
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from scipy.integrate import cumulative_trapezoid
from statsmodels import api as sm

from nannyml.chunk import Chunker
from nannyml.plots.colors import Colors
from nannyml.plots.components.hover import Hover, render_x_coordinate
from nannyml.plots.util import ensure_numpy, is_time_based_x_axis


def _get_kde(array, cut=3, clip=(-np.inf, np.inf)):
    """Fit a univariate kernel density estimate on ``array``.

    Parameters
    ----------
    array :
        Values handed to ``sm.nonparametric.KDEUnivariate``.
    cut :
        Forwarded to ``KDEUnivariate.fit`` as ``cut``.
    clip :
        Forwarded to ``KDEUnivariate.fit`` as ``clip``.

    Returns
    -------
    The fitted estimator, or ``None`` when fitting raised any exception or the
    fitted support contains duplicate values.
    """
    try:  # pragma: no cover
        estimator = sm.nonparametric.KDEUnivariate(array)
        estimator.fit(cut=cut, clip=clip)

        # In edge cases the fit yields repeated support values. Such a result is not sensible,
        # so it is handled like a failed fit.
        distinct_support = np.unique(estimator.support)
        has_duplicates = len(distinct_support) < len(estimator.support)
        return None if has_duplicates else estimator
    except Exception:
        # Best effort: any failure while fitting simply means "no KDE available".
        return None


def _get_kde_support(kde):
    if kde is not None:  # pragma: no cover
        return kde.support[::5]
    else:
        return np.array([])


def _get_kde_density(kde):
    if kde is not None:  # pragma: no cover
        return kde.density[::5]
    else:
        return np.array([])


def _get_kde_cdf(kde_support, kde_density):
    if len(kde_support) > 0 and len(kde_density) > 0:
        cdf = cumulative_trapezoid(y=kde_density, x=kde_support, initial=0)
        return cdf
    else:
        return np.array([])


def _get_kde_quartiles(cdf, kde_support, kde_density):
    if len(cdf) > 0:
        quartiles = []
        for quartile in [0.25, 0.50, 0.75]:
            quartile_index = np.abs(cdf - quartile).argmin()
            quartiles.append((kde_support[quartile_index], kde_density[quartile_index]))
        return quartiles
    else:
        return []


def calculate_chunk_distributions(
    data: Union[np.ndarray, pd.Series],
    chunker: Chunker,
    timestamps: Optional[Union[np.ndarray, pd.Series]] = None,
    data_periods: Optional[Union[np.ndarray, pd.Series]] = None,
    kde_cut=3,
    kde_clip=(-np.inf, np.inf),
    post_kde_clip=None,
):
    """Estimate a kernel density per chunk (and per period, when given) and derive plotting columns.

    Parameters
    ----------
    data : Union[np.ndarray, pd.Series]
        Values to estimate densities for. An array is wrapped in a Series named ``'data'``.
    chunker : Chunker
        Its ``split`` method receives ``data`` and ``timestamps`` concatenated column-wise and must
        yield chunks exposing ``data``, ``key`` and ``chunk_index``.
    timestamps : Optional[Union[np.ndarray, pd.Series]]
        Concatenated next to ``data`` before chunking.
    data_periods : Optional[Union[np.ndarray, pd.Series]]
        When given, it is stored as a ``'period'`` column (pandas aligns a Series on its index) and
        densities are estimated per chunk *and* period. An array is wrapped in a Series first.
    kde_cut, kde_clip :
        Forwarded to ``_get_kde`` as ``cut`` and ``clip``.
    post_kde_clip :
        Optional ``(lower, upper)`` pair. When truthy, only support points strictly between the two
        bounds are kept, together with the matching density and cdf values.

    Returns
    -------
    pd.DataFrame
        One row per group with the grouping columns (``chunk_index``, ``chunk_key`` and optionally
        ``period``) plus ``kde``, ``kde_support``, ``kde_density``, ``kde_cdf``, ``kde_support_len``,
        ``kde_quartiles``, ``kde_density_local_max``, ``kde_density_global_max``,
        ``kde_density_scaled`` and ``kde_quartiles_scaled``.
    """
    if isinstance(data, np.ndarray):
        data = pd.Series(data, name='data')

    if isinstance(data_periods, np.ndarray):
        data_periods = pd.Series(data_periods, name='period')

    get_kde_partial_application = partial(_get_kde, cut=kde_cut, clip=kde_clip)

    data_with_chunk_keys = pd.concat(
        [
            chunk.data.assign(chunk_key=chunk.key, chunk_index=chunk.chunk_index)
            for chunk in chunker.split(pd.concat([data, timestamps], axis=1))
        ]
    )

    group_by_cols = ['chunk_index', 'chunk_key']
    if data_periods is not None:
        data_with_chunk_keys['period'] = data_periods
        group_by_cols += ['period']

    data = (
        # group by period too, 'key' column can be there for both reference and analysis
        data_with_chunk_keys.groupby(group_by_cols)[data.name]
        .apply(get_kde_partial_application)
        .to_frame('kde')
        .reset_index()
    )

    data['kde_support'] = data['kde'].apply(_get_kde_support)
    data['kde_density'] = data['kde'].apply(_get_kde_density)
    data['kde_cdf'] = data[['kde_support', 'kde_density']].apply(
        lambda row: _get_kde_cdf(row['kde_support'], row['kde_density'] if len(row['kde_support']) > 0 else []),
        axis=1,
    )

    if post_kde_clip:
        # Clip the kde support to the clip values and keep density and cdf aligned with it.
        # One boolean mask per row is applied to all three arrays. Trimming density/cdf by the
        # remaining support length instead breaks when nothing survives the lower bound, because
        # a zero-length tail slice (``arr[-0:]``) returns the whole array rather than an empty one.
        lower_bound, upper_bound = post_kde_clip[0], post_kde_clip[1]
        keep_masks = [
            (np.asarray(support) > lower_bound) & (np.asarray(support) < upper_bound)
            for support in data['kde_support']
        ]
        for column in ('kde_support', 'kde_density', 'kde_cdf'):
            data[column] = pd.Series(
                [np.asarray(values)[mask] for values, mask in zip(data[column], keep_masks)],
                index=data.index,
                dtype=object,
            )

    data['kde_support_len'] = data['kde_support'].apply(len)
    data['kde_quartiles'] = data[['kde_cdf', 'kde_support', 'kde_density']].apply(
        lambda row: _get_kde_quartiles(
            row['kde_cdf'], row['kde_support'], row['kde_density'] if len(row['kde_support']) > 0 else []
        ),
        axis=1,
    )
    # An empty density (no usable KDE) gets a local maximum of 0.
    data['kde_density_local_max'] = data['kde_density'].apply(lambda x: max(x) if len(x) > 0 else 0)
    data['kde_density_global_max'] = data.groupby('chunk_index')['kde_density_local_max'].max().max()
    # Scale every density (and the quartile heights) by its own row maximum.
    data['kde_density_scaled'] = data[['kde_density', 'kde_density_local_max']].apply(
        lambda row: np.divide(np.array(row['kde_density']), row['kde_density_local_max']), axis=1
    )
    data['kde_quartiles_scaled'] = data[['kde_quartiles', 'kde_density_local_max']].apply(
        lambda row: [(q[0], q[1] / row['kde_density_local_max']) for q in row['kde_quartiles']], axis=1
    )

    return data
def _create_joy_table(data_distributions: pd.DataFrame, result_data: pd.DataFrame): joy_table = pd.merge(result_data, data_distributions) is_time_based_x_axis = not result_data['chunk_end_date'].isnull().values.any() # Sort to make sure most current chunks are plotted in front of the others if is_time_based_x_axis: joy_table = joy_table.sort_values('chunk_end_date', ascending=True).reset_index(drop=True) else: joy_table = joy_table.sort_values('chunk_index', ascending=True).reset_index(drop=True) return joy_table
def joy(
    fig: go.Figure,
    data_distributions: pd.DataFrame,
    color: str,
    name: str,
    chunk_keys: Optional[Union[np.ndarray, pd.Series]] = None,
    chunk_start_dates: Optional[Union[np.ndarray, pd.Series]] = None,
    chunk_end_dates: Optional[Union[np.ndarray, pd.Series]] = None,
    chunk_indices: Optional[Union[np.ndarray, pd.Series]] = None,
    subplot_args: Optional[Dict[str, Any]] = None,
    alpha=0.2,
    plot_quartiles: bool = True,
    **kwargs,
) -> go.Figure:
    """Add one ridge ("joy") shape per row of ``data_distributions`` to ``fig``.

    Parameters
    ----------
    fig : go.Figure
        Figure the traces are added to; the same object is returned.
    data_distributions : pd.DataFrame
        Rows must provide ``kde_support``, ``kde_density_scaled`` and ``kde_quartiles_scaled``
        (and ``chunk_key`` when ``chunk_keys`` is not given). The row labels are used to index the
        chunk arrays, so they are expected to be positions into those arrays.
    color : str
        Line colour of the outline and quartile lines; also the base of the fill colour.
    name : str
        Trace name of the outline and quartile traces.
    chunk_keys, chunk_start_dates, chunk_end_dates, chunk_indices :
        Per-chunk values, passed through ``ensure_numpy`` before use.
    subplot_args : Optional[Dict[str, Any]]
        When given, must contain ``'row'`` and ``'col'``; the axes of that subplot are restyled and
        the dict is forwarded to every ``fig.add_trace`` call.
    alpha :
        Alpha channel of the fill colour.
    plot_quartiles : bool
        Whether to draw a dotted line with hover information for every quartile.
    **kwargs :
        Forwarded to every ``go.Scatter``.
    """
    chunk_keys, chunk_indices, chunk_start_dates, chunk_end_dates = ensure_numpy(
        chunk_keys, chunk_indices, chunk_start_dates, chunk_end_dates
    )

    overlap_factor = 1

    if subplot_args is not None:
        # Styling shared by both axes of the targeted subplot.
        frame_style = dict(linecolor=Colors.INDIGO_PERSIAN, showgrid=False, mirror=True, zeroline=False)
        fig.update_xaxes(
            matches='x',
            title=fig.layout.xaxis.title,
            row=subplot_args['row'],
            col=subplot_args['col'],
            **frame_style,
        )
        fig.update_yaxes(
            range=fig.layout.yaxis.range,
            title=fig.layout.yaxis.title,
            row=subplot_args['row'],
            col=subplot_args['col'],
            **frame_style,
        )
    else:
        subplot_args = {}

    for label, dist_row in data_distributions.iterrows():
        # The ridge starts at `baseline` on the x axis and extends by density * `extent`.
        time_based = is_time_based_x_axis(chunk_start_dates, chunk_end_dates)
        if time_based:
            baseline = chunk_start_dates[label]
            extent = chunk_start_dates[label] - chunk_end_dates[label]
        else:
            baseline = chunk_indices[label]
            extent = -1

        support = dist_row['kde_support']
        scaled_density = dist_row['kde_density_scaled'] * overlap_factor
        quartile_marks = [(q[0], q[1] * overlap_factor) for q in dist_row['kde_quartiles_scaled']]

        # Outline of the density curve.
        outline = go.Scatter(
            name=name,
            x=baseline + scaled_density * extent,
            y=support,
            mode='lines',
            line=dict(color=color, width=1),
            hoverinfo='skip',
            showlegend=False,
            **kwargs,
        )
        fig.add_trace(outline, **subplot_args)

        # Invisible line along the baseline; 'tonexty' fills the area between it and the outline
        # added just before, so the two traces must stay in this order.
        # NOTE(review): matplotlib returns colour channels as 0-1 floats; confirm the resulting
        # 'rgba(...)' string renders as intended in plotly.
        rgb_channels = matplotlib.colors.to_rgb(color)
        fill_rgba = matplotlib.colors.to_rgba(rgb_channels, alpha)
        filler = go.Scatter(
            x=[baseline] * len(scaled_density),
            y=support,
            line=dict(color='rgba(0,0,0,0)', width=1),
            fill='tonexty',
            fillcolor='rgba{}'.format(fill_rgba),
            hoverinfo='skip',
            showlegend=False,
            **kwargs,
        )
        fig.add_trace(filler, **subplot_args)

        if not plot_quartiles:
            continue

        for quartile_value, quartile_height in quartile_marks:
            hover = Hover(template='Chunk %{chunk_key}: %{x_coordinate}, <b>%{quartile}</b>')
            hover.add(chunk_keys[label] if chunk_keys is not None else dist_row['chunk_key'], name='chunk_key')
            hover.add(
                render_x_coordinate(chunk_indices, chunk_start_dates, chunk_end_dates)[label], name='x_coordinate'
            )
            hover.add(np.round(quartile_value, 3), name='quartile')

            # Dotted line from the baseline to the density curve at the quartile value.
            quartile_line = go.Scatter(
                name=name,
                x=[baseline, baseline + quartile_height * extent],
                y=[quartile_value, quartile_value],
                mode='lines',
                line=dict(color=color, width=1, dash='dot'),
                hovertemplate=hover.get_template(),
                customdata=hover.get_custom_data(),
                hoverlabel=dict(bgcolor=color, font=dict(color='white')),
                showlegend=False,
                **kwargs,
            )
            fig.add_trace(quartile_line, **subplot_args)

    return fig
def alert(
    fig: go.Figure,
    data_distributions: pd.DataFrame,
    color: str,
    name: str,
    alerts: Union[np.ndarray, pd.Series],
    chunk_keys: Optional[Union[np.ndarray, pd.Series]] = None,
    chunk_start_dates: Optional[Union[np.ndarray, pd.Series]] = None,
    chunk_end_dates: Optional[Union[np.ndarray, pd.Series]] = None,
    chunk_indices: Optional[Union[np.ndarray, pd.Series]] = None,
    subplot_args: Optional[Dict[str, Any]] = None,
    alpha=0.3,
    plot_quartiles: bool = True,
    **kwargs,
) -> go.Figure:
    """Draw joy shapes only for the chunks flagged in ``alerts``.

    The chunk values and ``alerts`` are put side by side with ``data_distributions``, the rows
    where ``alerts`` is true are selected and re-indexed from 0, and the selection is handed to
    ``joy`` together with the remaining arguments. Returns the figure returned by ``joy``.
    """
    chunk_frame = pd.DataFrame(
        {
            'chunk_keys': chunk_keys,
            'chunk_indices': chunk_indices,
            'chunk_start_dates': chunk_start_dates,
            'chunk_end_dates': chunk_end_dates,
            'alerts': alerts,
        }
    )
    combined = pd.concat([chunk_frame, data_distributions], axis=1)

    # Keep alerting rows only; the index is reset so that row labels are positions again.
    flagged = combined.loc[combined['alerts']].reset_index(drop=True)

    return joy(
        fig=fig,
        data_distributions=flagged[data_distributions.columns],
        color=color,
        name=name,
        chunk_keys=flagged['chunk_keys'],
        chunk_start_dates=flagged['chunk_start_dates'],
        chunk_end_dates=flagged['chunk_end_dates'],
        chunk_indices=flagged['chunk_indices'],
        subplot_args=subplot_args,
        alpha=alpha,
        plot_quartiles=plot_quartiles,
        **kwargs,
    )