Source code for nannyml.chunk

# Author:   Niels Nuyttens  <niels@nannyml.com>
#           Jakub Bialek    <jakub@nannyml.com>
#
# License: Apache Software License 2.0

"""NannyML module providing intelligent splitting of data into chunks."""

from __future__ import annotations

import abc
import copy
import logging
import warnings
from datetime import datetime
from typing import List, Optional

import numpy as np
import pandas as pd
from dateutil.parser import ParserError  # type: ignore
from pandas import Period

from nannyml.exceptions import ChunkerException, InvalidArgumentsException

logger = logging.getLogger(__name__)


class Chunk:
    """A subset of data that acts as a logical unit during calculations."""

    def __init__(
        self,
        key: str,
        data: pd.DataFrame,
        start_datetime: Optional[datetime] = None,
        end_datetime: Optional[datetime] = None,
        start_index: int = -1,
        end_index: int = -1,
        period: Optional[str] = None,
    ):
        """Creates a new chunk.

        Parameters
        ----------
        key : str, required
            A value describing what data is wrapped in this chunk.
        data : DataFrame, required
            The data to be contained within the chunk.
        start_datetime: datetime
            The starting point in time for this chunk.
        end_datetime: datetime
            The end point in time for this chunk.
        start_index : int
            The index of the first observation of this chunk within the full dataset.
        end_index : int
            The index of the last observation of this chunk within the full dataset.
        period : str, optional
            The 'period' this chunk belongs to, for example 'reference' or 'analysis'.
        """
        self.key = key
        self.data = data
        self.period = period
        self.is_transition: bool = False

        self.start_datetime = start_datetime
        self.end_datetime = end_datetime
        self.start_index: int = start_index
        self.end_index: int = end_index
        self.chunk_index: int = -1
    def __repr__(self):
        """Returns a textual summary of the chunk.

        Returns
        -------
        chunk_str: str
        """
        return (
            f'Chunk[key={self.key}, data=pd.DataFrame[[{self.data.shape[0]}x{self.data.shape[1]}]], '
            f'period={self.period}, is_transition={self.is_transition}, '
            f'start_datetime={self.start_datetime}, end_datetime={self.end_datetime}, '
            f'start_index={self.start_index}, end_index={self.end_index}]'
        )
    def __len__(self):
        """Returns the number of rows held within this chunk.

        Returns
        -------
        length: int
            Number of rows in the `data` property of the chunk.
        """
        return self.data.shape[0]
    def __lt__(self, other: Chunk):
        if self.start_datetime and self.end_datetime and other.start_datetime and other.end_datetime:
            return self.end_datetime < other.start_datetime
        else:
            return self.end_index < other.start_index
    def merge(self, other: Chunk):
        """Merge two chunks together into a single one."""
        if self < other:
            first, second = self, other
        else:
            first, second = other, self

        result = copy.deepcopy(first)
        result.data = pd.concat([first.data, second.data])
        result.end_datetime = second.end_datetime
        result.key = f'[{first.start_index}:{second.end_index}]'
        return result
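
A minimal sketch of merging two adjacent chunks built from a toy DataFrame (the column name ``f1`` is hypothetical, purely for illustration):

>>> import pandas as pd
>>> from nannyml.chunk import Chunk
>>> a = Chunk(key='[0:4]', data=pd.DataFrame({'f1': range(5)}), start_index=0, end_index=4)
>>> b = Chunk(key='[5:9]', data=pd.DataFrame({'f1': range(5, 10)}), start_index=5, end_index=9)
>>> merged = a.merge(b)  # ordering is resolved via __lt__, here by index boundaries
>>> merged.key, len(merged)
('[0:9]', 10)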
def _get_boundary_indices(c: Chunk):
    return c.data.index.min(), c.data.index.max()
class Chunker(abc.ABC):
    """Base class for Chunker implementations.

    Inheriting classes will split a DataFrame into a list of Chunks. They will do this based on several
    constraints, e.g. observation timestamps, the number of observations per Chunk or a preferred number of Chunks.
    """

    def __init__(self, timestamp_column_name: Optional[str] = None):
        """Creates a new Chunker."""
        self.timestamp_column_name = timestamp_column_name
    def split(
        self,
        data: pd.DataFrame,
        columns=None,
    ) -> List[Chunk]:
        """Splits a given data frame into a list of chunks.

        This method provides a uniform interface across Chunker implementations to keep them interchangeable.

        After performing the implementation-specific `_split` method, the resulting chunk list is checked:
        if the total number of chunks is low, a warning is emitted, since calculations on very few chunks
        may not be statistically relevant.

        Parameters
        ----------
        data: DataFrame
            The data to be split into chunks.
        columns: List[str], default=None
            A list of columns to be included in the resulting chunk data. Unlisted columns will be dropped.

        Returns
        -------
        chunks: List[Chunk]
            The list of chunks.
        """
        if self.timestamp_column_name:
            if self.timestamp_column_name not in data.columns:
                raise InvalidArgumentsException(
                    f"timestamp column '{self.timestamp_column_name}' not in columns: {list(data.columns)}."
                )
            data = data.sort_values(by=[self.timestamp_column_name]).reset_index(drop=True)

        try:
            chunks = self._split(data)
        except Exception as exc:
            raise ChunkerException(f"could not split data into chunks: {exc}") from exc

        for chunk_index, chunk in enumerate(chunks):
            chunk.start_index, chunk.end_index = _get_boundary_indices(chunk)
            chunk.chunk_index = chunk_index

            if columns is not None:
                chunk.data = chunk.data[columns]

        if len(chunks) < 6:
            warnings.warn(
                'The resulting number of chunks is too low. '
                'Please consider splitting your data in a different way or continue at your own risk.'
            )

        return chunks
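
As a quick sketch of this uniform interface (using the SizeBasedChunker defined further below; the column names are hypothetical), the ``columns`` argument trims each chunk's data. Note that a low-chunk-count warning is emitted here, since only two chunks result:

>>> import pandas as pd
>>> from nannyml.chunk import SizeBasedChunker
>>> df = pd.DataFrame({'f1': range(6), 'f2': range(6)})
>>> chunks = SizeBasedChunker(chunk_size=3).split(df, columns=['f1'])
>>> [c.key for c in chunks], list(chunks[0].data.columns)
(['[0:2]', '[3:5]'], ['f1'])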
    @abc.abstractmethod
    def _split(self, data: pd.DataFrame) -> List[Chunk]:
        """Splits the DataFrame into chunks.

        Abstract method, to be implemented within inheriting classes.

        Parameters
        ----------
        data: pandas.DataFrame
            The full dataset that should be split into Chunks.

        Returns
        -------
        chunks: List[Chunk]
            The list of Chunks after splitting the original DataFrame `data`.

        See Also
        --------
        PeriodBasedChunker: Splits data based on the timestamp of observations.
        SizeBasedChunker: Splits data based on the number of observations in a Chunk.
        CountBasedChunker: Splits data based on the resulting number of Chunks.

        Notes
        -----
        There is a minimal number of observations that a Chunk should contain in order to retain statistical
        relevance. A chunker will log a warning message when your splitting criteria would result in
        underpopulated chunks. Note that in this situation calculation results may not be relevant.
        """
        pass  # pragma: no cover
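
To make the contract concrete, here is a minimal sketch of a custom Chunker subclass (the ``HalfChunker`` class and the column name ``f1`` are hypothetical, purely for illustration). Only ``_split`` needs to be implemented; the base class `split` method takes care of index bookkeeping and sanity checks:

>>> import pandas as pd
>>> from nannyml.chunk import Chunk, Chunker
>>> class HalfChunker(Chunker):
...     # hypothetical chunker: splits the data into two equal halves
...     def _split(self, data: pd.DataFrame):
...         mid = len(data) // 2
...         return [Chunk(key='first_half', data=data.iloc[:mid]),
...                 Chunk(key='second_half', data=data.iloc[mid:])]
>>> chunks = HalfChunker().split(pd.DataFrame({'f1': range(10)}))
>>> [(c.key, len(c)) for c in chunks]
[('first_half', 5), ('second_half', 5)]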
class ChunkerFactory:
    """A factory class that produces Chunker instances based on the provided configuration."""
    @classmethod
    def get_chunker(
        cls,
        chunk_size: Optional[int] = None,
        chunk_number: Optional[int] = None,
        chunk_period: Optional[str] = None,
        chunker: Optional[Chunker] = None,
        timestamp_column_name: Optional[str] = None,
    ) -> Chunker:
        """Returns a Chunker instance matching the provided arguments.

        An explicitly given `chunker` takes precedence; otherwise `chunk_size`, `chunk_number` and
        `chunk_period` are checked in that order, falling back to a DefaultChunker when none is given.
        """
        if chunker is not None:
            return chunker
        if chunk_size:
            return SizeBasedChunker(chunk_size=chunk_size, timestamp_column_name=timestamp_column_name)  # type: ignore
        elif chunk_number:
            return CountBasedChunker(
                chunk_number=chunk_number, timestamp_column_name=timestamp_column_name  # type: ignore
            )
        elif chunk_period:
            if timestamp_column_name is None:
                raise InvalidArgumentsException(
                    "you must provide the 'timestamp_column_name' when using period based chunking"
                )
            return PeriodBasedChunker(offset=chunk_period, timestamp_column_name=timestamp_column_name)  # type: ignore
        else:
            return DefaultChunker(timestamp_column_name=timestamp_column_name)  # type: ignore
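
A quick sketch of how the factory resolves its arguments in priority order:

>>> from nannyml.chunk import ChunkerFactory
>>> type(ChunkerFactory.get_chunker(chunk_size=1000)).__name__
'SizeBasedChunker'
>>> type(ChunkerFactory.get_chunker(chunk_number=10)).__name__
'CountBasedChunker'
>>> type(ChunkerFactory.get_chunker()).__name__
'DefaultChunker'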
class PeriodBasedChunker(Chunker):
    """A Chunker that will split data into Chunks based on a date column in the data.

    Examples
    --------
    Chunk using monthly periods and providing a column name

    >>> from nannyml.chunk import PeriodBasedChunker
    >>> df = pd.read_parquet('/path/to/my/data.pq')
    >>> chunker = PeriodBasedChunker(timestamp_column_name='observation_date', offset='M')
    >>> chunks = chunker.split(data=df)

    Or chunk using weekly periods

    >>> from nannyml.chunk import PeriodBasedChunker
    >>> df = pd.read_parquet('/path/to/my/data.pq')
    >>> chunker = PeriodBasedChunker(timestamp_column_name='observation_date', offset='W')
    >>> chunks = chunker.split(data=df)
    """

    def __init__(
        self,
        timestamp_column_name: str,
        offset: str = 'W',
    ):
        """Creates a new PeriodBasedChunker.

        Parameters
        ----------
        timestamp_column_name: str
            The name of the column in the data containing the observation timestamps.
        offset: str, default='W'
            A frequency string representing a pandas.tseries.offsets.DateOffset.
            The offset determines how the time-based grouping will occur. A list of possible values
            can be found at https://pandas.pydata.org/docs/user_guide/timeseries.html#offset-aliases.

        Returns
        -------
        chunker: a PeriodBasedChunker instance used to split data into time-based Chunks.
        """
        super().__init__(timestamp_column_name)
        self.offset = offset

    def _split(self, data: pd.DataFrame) -> List[Chunk]:
        chunks = []
        try:
            grouped_data = data.groupby(pd.to_datetime(data[self.timestamp_column_name]).dt.to_period(self.offset))

            k: Period
            for k in grouped_data.groups.keys():
                chunk = Chunk(
                    key=str(k), data=grouped_data.get_group(k), start_datetime=k.start_time, end_datetime=k.end_time
                )
                chunks.append(chunk)
        except KeyError:
            raise ChunkerException(f"could not find date_column '{self.timestamp_column_name}' in given data")
        except ParserError:
            raise ChunkerException(
                f"could not parse date_column '{self.timestamp_column_name}' values as dates. "
                f"Please verify if you've specified the correct date column."
            )

        return chunks
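
As a concrete sketch on a toy dataset (the column name ``ts`` is hypothetical), monthly chunk keys follow the pandas Period string format, and chunk sizes vary with the calendar (31 rows for January, 28 for February, 31 for March here):

>>> import pandas as pd
>>> from nannyml.chunk import PeriodBasedChunker
>>> df = pd.DataFrame({'ts': pd.date_range('2023-01-01', periods=90, freq='D')})
>>> chunker = PeriodBasedChunker(timestamp_column_name='ts', offset='M')
>>> [c.key for c in chunker.split(df)]
['2023-01', '2023-02', '2023-03']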
class SizeBasedChunker(Chunker):
    """A Chunker that will split data into Chunks based on the preferred number of observations per Chunk.

    Notes
    -----
    - Chunks are adjacent, not overlapping.
    - Leftover observations that cannot fill an entire chunk are handled according to the ``incomplete``
      argument: by default they are appended to the last complete chunk.

    Examples
    --------
    Chunk using a fixed size of 2000 observations, dropping any leftover observations

    >>> from nannyml.chunk import SizeBasedChunker
    >>> df = pd.read_parquet('/path/to/my/data.pq')
    >>> chunker = SizeBasedChunker(chunk_size=2000, incomplete='drop')
    >>> chunks = chunker.split(data=df)
    """

    def __init__(self, chunk_size: int, incomplete: str = 'append', timestamp_column_name: Optional[str] = None):
        """Create a new SizeBasedChunker.

        Parameters
        ----------
        chunk_size: int
            The preferred size of the resulting Chunks, i.e. the number of observations in each Chunk.
        incomplete: str, default='append'
            Choose how to handle any leftover observations that don't make up a full Chunk.
            The following options are available:

            - ``'drop'``: drop the leftover observations
            - ``'keep'``: keep the incomplete Chunk (containing less than ``chunk_size`` observations)
            - ``'append'``: append leftover observations to the last complete Chunk (overfilling it)

            Defaults to ``'append'``.

        Returns
        -------
        chunker: a size-based instance used to split data into Chunks of a constant size.
        """
        super().__init__(timestamp_column_name)

        if not isinstance(chunk_size, (int, np.int64)):
            raise InvalidArgumentsException(
                f"given chunk_size is of type {type(chunk_size)} but should be an int. "
                f"Please provide an integer as a chunk size."
            )

        if chunk_size <= 0:
            raise InvalidArgumentsException(
                f"given chunk_size {chunk_size} is less than or equal to zero. "
                f"The chunk size should always be larger than zero."
            )

        self.chunk_size = chunk_size
        self.incomplete = incomplete

    def _split(self, data: pd.DataFrame) -> List[Chunk]:
        def _create_chunk(index: int, data: pd.DataFrame, chunk_size: int) -> Chunk:
            chunk_data = data.loc[index : index + chunk_size - 1, :]
            chunk = Chunk(
                key=f'[{index}:{index + chunk_size - 1}]',
                data=chunk_data,
                start_index=index,
                end_index=index + chunk_size - 1,
            )
            if self.timestamp_column_name:
                chunk.start_datetime = pd.to_datetime(chunk.data[self.timestamp_column_name].min())
                chunk.end_datetime = pd.to_datetime(chunk.data[self.timestamp_column_name].max())
            return chunk

        data = data.copy().reset_index(drop=True)
        chunks = [
            _create_chunk(index=i, data=data, chunk_size=self.chunk_size)
            for i in range(0, len(data), self.chunk_size)
            if i + self.chunk_size - 1 < len(data)
        ]

        # deal with unassigned observations
        if data.shape[0] % self.chunk_size != 0:
            incomplete_chunk = _create_chunk(
                index=self.chunk_size * (data.shape[0] // self.chunk_size),
                data=data,
                chunk_size=(data.shape[0] % self.chunk_size),
            )
            if self.incomplete == 'keep':
                chunks += [incomplete_chunk]
            elif self.incomplete == 'append':
                chunks[-1] = chunks[-1].merge(incomplete_chunk)
            elif self.incomplete == 'drop':
                pass
            else:
                raise InvalidArgumentsException(
                    f"unknown value '{self.incomplete}' for 'incomplete'. "
                    f"Value should be one of ['drop', 'keep', 'append']."
                )

        return chunks
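
A small sketch of how the ``incomplete`` strategies differ on 10 rows with ``chunk_size=3``, leaving one leftover row (the column name ``f1`` is hypothetical):

>>> import pandas as pd
>>> from nannyml.chunk import SizeBasedChunker
>>> df = pd.DataFrame({'f1': range(10)})
>>> [c.key for c in SizeBasedChunker(chunk_size=3, incomplete='append').split(df)]
['[0:2]', '[3:5]', '[6:9]']
>>> [c.key for c in SizeBasedChunker(chunk_size=3, incomplete='keep').split(df)]
['[0:2]', '[3:5]', '[6:8]', '[9:9]']
>>> [c.key for c in SizeBasedChunker(chunk_size=3, incomplete='drop').split(df)]
['[0:2]', '[3:5]', '[6:8]']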
class CountBasedChunker(Chunker):
    """A Chunker that will split data into chunks based on the preferred number of total chunks.

    Examples
    --------
    >>> from nannyml.chunk import CountBasedChunker
    >>> df = pd.read_parquet('/path/to/my/data.pq')
    >>> chunker = CountBasedChunker(chunk_number=100)
    >>> chunks = chunker.split(data=df)
    """

    def __init__(self, chunk_number: int, incomplete: str = 'append', timestamp_column_name: Optional[str] = None):
        """Creates a new CountBasedChunker.

        It will calculate the number of observations per chunk based on the given chunk count.
        It then continues to split the data into chunks just like a SizeBasedChunker does.

        Parameters
        ----------
        chunk_number: int
            The number of chunks to split the data into.
        incomplete: str, default='append'
            Choose how to handle any leftover observations that don't make up a full Chunk.
            The following options are available:

            - ``'drop'``: drop the leftover observations
            - ``'keep'``: keep the incomplete Chunk (containing less than the computed chunk size)
            - ``'append'``: append leftover observations to the last complete Chunk (overfilling it)

            Defaults to ``'append'``.

        Returns
        -------
        chunker: CountBasedChunker
        """
        super().__init__(timestamp_column_name)
        self.incomplete = incomplete

        if not isinstance(chunk_number, int):
            raise InvalidArgumentsException(
                f"given chunk_number is of type {type(chunk_number)} but should be an int. "
                f"Please provide an integer as a chunk count."
            )

        if chunk_number <= 0:
            raise InvalidArgumentsException(
                f"given chunk_number {chunk_number} is less than or equal to zero. "
                f"The chunk number should always be larger than zero."
            )

        self.chunk_number = chunk_number

    def _split(self, data: pd.DataFrame) -> List[Chunk]:
        if data.shape[0] == 0:
            return []

        data = data.copy().reset_index(drop=True)
        chunk_size = data.shape[0] // self.chunk_number
        chunks = SizeBasedChunker(
            chunk_size=chunk_size, incomplete=self.incomplete, timestamp_column_name=self.timestamp_column_name
        ).split(data=data)
        return chunks
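
For illustration, the chunk size derives from integer division, so on 100 rows a ``chunk_number`` of 5 yields chunks of 20 observations; when the division isn't exact, leftovers follow the ``incomplete`` strategy (the column name ``f1`` is hypothetical):

>>> import pandas as pd
>>> from nannyml.chunk import CountBasedChunker
>>> df = pd.DataFrame({'f1': range(100)})
>>> chunks = CountBasedChunker(chunk_number=5).split(df)
>>> len(chunks), len(chunks[0])
(5, 20)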
class DefaultChunker(Chunker):
    """Splits data into about 10 chunks.

    Examples
    --------
    >>> from nannyml.chunk import DefaultChunker
    >>> df = pd.read_parquet('/path/to/my/data.pq')
    >>> chunker = DefaultChunker()
    >>> chunks = chunker.split(data=df)
    """

    DEFAULT_CHUNK_COUNT = 10

    def __init__(self, timestamp_column_name: Optional[str] = None):
        """Creates a new DefaultChunker."""
        super().__init__(timestamp_column_name)

    def _split(self, data: pd.DataFrame) -> List[Chunk]:
        if data.shape[0] == 0:
            return []

        data = data.copy().reset_index(drop=True)
        chunk_size = data.shape[0] // self.DEFAULT_CHUNK_COUNT
        chunks = SizeBasedChunker(chunk_size=chunk_size, timestamp_column_name=self.timestamp_column_name).split(
            data=data
        )
        return chunks
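
A sketch of the default behaviour: roughly one tenth of the data per chunk, so 100 rows become 10 chunks of 10 observations each (the column name ``f1`` is hypothetical):

>>> import pandas as pd
>>> from nannyml.chunk import DefaultChunker
>>> df = pd.DataFrame({'f1': range(100)})
>>> chunks = DefaultChunker().split(df)
>>> len(chunks), len(chunks[0])
(10, 10)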