# Author: Niels Nuyttens <niels@nannyml.com>
# Jakub Bialek <jakub@nannyml.com>
#
# License: Apache Software License 2.0
"""NannyML module providing intelligent splitting of data into chunks."""
import abc
import logging
import warnings
from datetime import datetime
from typing import List
import numpy as np
import pandas as pd
from dateutil.parser import ParserError # type: ignore
from pandas import Period
from nannyml.exceptions import ChunkerException, InvalidArgumentsException, MissingMetadataException
from nannyml.metadata.base import NML_METADATA_PARTITION_COLUMN_NAME, NML_METADATA_TIMESTAMP_COLUMN_NAME
logger = logging.getLogger(__name__)
[docs]class Chunk:
"""A subset of data that acts as a logical unit during calculations."""
def __init__(
self,
key: str,
data: pd.DataFrame,
start_datetime: datetime = datetime.max,
end_datetime: datetime = datetime.max,
partition: str = None,
):
"""Creates a new chunk.
Parameters
----------
key : str, required.
A value describing what data is wrapped in this chunk.
data : DataFrame, required
The data to be contained within the chunk.
start_datetime: datetime
The starting point in time for this chunk.
end_datetime: datetime
The end point in time for this chunk.
partition : string, optional
The 'partition' this chunk belongs to, for example 'reference' or 'analysis'.
"""
self.key = key
self.data = data
self.partition = partition
self.is_transition: bool = False
self.start_datetime = start_datetime
self.end_datetime = end_datetime
self.start_index: int = 0
self.end_index: int = 0
[docs] def __repr__(self):
"""Returns textual summary of a chunk.
Returns
-------
chunk_str: str
"""
return (
f'Chunk[key={self.key}, data=pd.DataFrame[[{self.data.shape[0]}x{self.data.shape[1]}]], '
f'partition={self.partition}, is_transition={self.is_transition},'
f'start_datetime={self.start_datetime}, end_datetime={self.end_datetime},'
f'start_index={self.start_index}, end_index={self.end_index}]'
)
[docs] def __len__(self):
"""Returns the number of rows held within this chunk.
Returns
-------
length: int
Number of rows in the `data` property of the chunk.
"""
return self.data.shape[0]
def _get_partition(c: Chunk, partition_column_name: str = NML_METADATA_PARTITION_COLUMN_NAME):
if partition_column_name not in c.data.columns:
raise MissingMetadataException(
f"missing partition column '{NML_METADATA_PARTITION_COLUMN_NAME}'." "Please provide valid metadata."
)
if _is_transition(c, partition_column_name):
return None
return c.data[partition_column_name].iloc[0]
def _is_transition(c: Chunk, partition_column_name: str = NML_METADATA_PARTITION_COLUMN_NAME) -> bool:
if c.data.shape[0] > 1:
return c.data[partition_column_name].nunique() > 1
else:
return False
def _get_boundary_indices(c: Chunk):
return c.data.index.min(), c.data.index.max()
[docs]class Chunker(abc.ABC):
"""Base class for Chunker implementations.
Inheriting classes will split a DataFrame into a list of Chunks.
They will do this based on several constraints, e.g. observation timestamps, number of observations per Chunk
or a preferred number of Chunks.
"""
def __init__(self):
"""Creates a new Chunker. Not used directly."""
pass
[docs] def split(self, data: pd.DataFrame, columns=None, minimum_chunk_size: int = None) -> List[Chunk]:
"""Splits a given data frame into a list of chunks.
This method provides a uniform interface across Chunker implementations to keep them interchangeable.
After performing the implementation-specific `_split` method, there are some checks on the resulting chunk list.
If the total number of chunks is low a warning will be written out to the logs.
We dynamically determine the optimal minimum number of observations per chunk and then check if the resulting
chunks contain at least as many. If there are any underpopulated chunks a warning will be written out in
the logs.
Parameters
----------
data: DataFrame
The data to be split into chunks
columns: List[str], default=None
A list of columns to be included in the resulting chunk data. Unlisted columns will be dropped.
minimum_chunk_size: int, default=None
The recommended minimum number of observations a :class:`~nannyml.chunk.Chunk` should hold.
When specified a warning will appear if the split results in underpopulated chunks.
When not specified there will be no checks for underpopulated chunks.
Returns
-------
chunks: List[Chunk]
The list of chunks
"""
if NML_METADATA_TIMESTAMP_COLUMN_NAME not in data.columns:
raise MissingMetadataException(
f"missing timestamp column '{NML_METADATA_TIMESTAMP_COLUMN_NAME}'." "Please provide valid metadata."
)
data = data.sort_values(by=[NML_METADATA_TIMESTAMP_COLUMN_NAME]).reset_index(drop=True)
try:
chunks = self._split(data, minimum_chunk_size)
except Exception as exc:
raise ChunkerException(f"could not split data into chunks: {exc}")
for c in chunks:
if _is_transition(c):
c.is_transition = True
c.partition = _get_partition(c)
c.start_index, c.end_index = _get_boundary_indices(c)
if columns is not None:
c.data = c.data[columns]
if len(chunks) < 6:
# TODO wording
warnings.warn(
'The resulting number of chunks is too low. '
'Please consider splitting your data in a different way or continue at your own risk.'
)
# check if all chunk sizes > minimal chunk size. If not, render a warning message.
if minimum_chunk_size:
underpopulated_chunks = [c for c in chunks if len(c) < minimum_chunk_size]
if len(underpopulated_chunks) > 0:
# TODO wording
warnings.warn(
f'The resulting list of chunks contains {len(underpopulated_chunks)} underpopulated chunks. '
'They contain too few records to be statistically robust and might negatively influence '
'the quality of calculations. '
'Please consider splitting your data in a different way or continue at your own risk.'
)
return chunks
# TODO wording
@abc.abstractmethod
def _split(self, data: pd.DataFrame, minimum_chunk_size: int = None) -> List[Chunk]:
"""Splits the DataFrame into chunks.
Abstract method, to be implemented within inheriting classes.
Parameters
----------
data: pandas.DataFrame
The full dataset that should be split into Chunks
minimum_chunk_size: int, default=None
The recommended minimum number of observations a :class:`~nannyml.chunk.Chunk` should hold.
Returns
-------
chunks: array of Chunks
The array of Chunks after splitting the original DataFrame `data`
See Also
--------
PeriodBasedChunker: Splits data based on the timestamp of observations
SizeBasedChunker: Splits data based on the amount of observations in a Chunk
CountBasedChunker: Splits data based on the resulting number of Chunks
Notes
-----
There is a minimal number of observations that a Chunk should contain in order to retain statistical relevance.
A chunker will log a warning message when your splitting criteria would result in underpopulated chunks.
Note that in this situation calculation results may not be relevant.
"""
pass # pragma: no cover
[docs]class PeriodBasedChunker(Chunker):
"""A Chunker that will split data into Chunks based on a date column in the data.
Examples
--------
Chunk using monthly periods and providing a column name
>>> from nannyml.chunk import PeriodBasedChunker
>>> df = pd.read_parquet('/path/to/my/data.pq')
>>> chunker = PeriodBasedChunker(date_column_name='observation_date', offset='M')
>>> chunks = chunker.split(data=df)
Or chunk using weekly periods
>>> from nannyml.chunk import PeriodBasedChunker
>>> df = pd.read_parquet('/path/to/my/data.pq')
>>> chunker = PeriodBasedChunker(date_column=df['observation_date'], offset='W', minimum_chunk_size=50)
>>> chunks = chunker.split(data=df)
"""
def __init__(
self,
date_column_name: str = NML_METADATA_TIMESTAMP_COLUMN_NAME,
offset: str = 'W',
):
"""Creates a new PeriodBasedChunker.
Parameters
----------
date_column_name: string
The name of the column in the DataFrame that contains the date used for chunking.
Defaults to the metadata timestamp column added by the `ModelMetadata.extract_metadata` function.
offset: a frequency string representing a pandas.tseries.offsets.DateOffset
The offset determines how the time-based grouping will occur. A list of possible values
is to be found at https://pandas.pydata.org/docs/user_guide/timeseries.html#offset-aliases.
Returns
-------
chunker: a PeriodBasedChunker instance used to split data into time-based Chunks.
"""
super().__init__()
self.date_column_name = date_column_name
self.offset = offset
def _split(self, data: pd.DataFrame, minimum_chunk_size: int = None) -> List[Chunk]:
chunks = []
date_column_name = self.date_column_name or self.date_column.name # type: ignore
try:
grouped_data = data.groupby(pd.to_datetime(data[date_column_name]).dt.to_period(self.offset))
k: Period
for k in grouped_data.groups.keys():
chunk = Chunk(
key=str(k), data=grouped_data.get_group(k), start_datetime=k.start_time, end_datetime=k.end_time
)
chunks.append(chunk)
except KeyError:
raise ChunkerException(f"could not find date_column '{date_column_name}' in given data")
except ParserError:
raise ChunkerException(
f"could not parse date_column '{date_column_name}' values as dates."
f"Please verify if you've specified the correct date column."
)
return chunks
[docs]class SizeBasedChunker(Chunker):
"""A Chunker that will split data into Chunks based on the preferred number of observations per Chunk.
Notes
-----
- Chunks are adjacent, not overlapping
- There will be no "incomplete chunks", so the leftover observations that cannot fill an entire chunk will
be dropped by default.
Examples
--------
Chunk using monthly periods and providing a column name
>>> from nannyml.chunk import SizeBasedChunker
>>> df = pd.read_parquet('/path/to/my/data.pq')
>>> chunker = SizeBasedChunker(chunk_size=2000, minimum_chunk_size=50)
>>> chunks = chunker.split(data=df)
"""
def __init__(self, chunk_size: int):
"""Create a new SizeBasedChunker.
Parameters
----------
chunk_size: int
The preferred size of the resulting Chunks, i.e. the number of observations in each Chunk.
Returns
-------
chunker: a size-based instance used to split data into Chunks of a constant size.
"""
super().__init__()
# TODO wording
if not isinstance(chunk_size, (int, np.int64)):
raise InvalidArgumentsException(
f"given chunk_size is of type {type(chunk_size)} but should be an int."
f"Please provide an integer as a chunk size"
)
# TODO wording
if chunk_size <= 0:
raise InvalidArgumentsException(
f"given chunk_size {chunk_size} is less then or equal to zero."
f"The chunk size should always be larger then zero"
)
self.chunk_size = chunk_size
def _split(self, data: pd.DataFrame, minimum_chunk_size: int = None) -> List[Chunk]:
def _create_chunk(index: int, data: pd.DataFrame, chunk_size: int) -> Chunk:
chunk_data = data.loc[index : index + chunk_size - 1, :]
min_date = pd.to_datetime(chunk_data[NML_METADATA_TIMESTAMP_COLUMN_NAME].min())
max_date = pd.to_datetime(chunk_data[NML_METADATA_TIMESTAMP_COLUMN_NAME].max())
return Chunk(
key=f'[{index}:{index + self.chunk_size - 1}]',
data=chunk_data,
start_datetime=min_date,
end_datetime=max_date,
)
data = data.copy().reset_index()
chunks = [
_create_chunk(index=i, data=data, chunk_size=self.chunk_size)
for i in range(0, len(data), self.chunk_size)
if i + self.chunk_size - 1 < len(data)
]
return chunks
[docs]class CountBasedChunker(Chunker):
"""A Chunker that will split data into chunks based on the preferred number of total chunks.
Examples
--------
>>> from nannyml.chunk import CountBasedChunker
>>> df = pd.read_parquet('/path/to/my/data.pq')
>>> chunker = CountBasedChunker(chunk_count=100, minimum_chunk_size=50)
>>> chunks = chunker.split(data=df)
"""
def __init__(self, chunk_count: int):
"""Creates a new CountBasedChunker.
It will calculate the amount of observations per chunk based on the given chunk count.
It then continues to split the data into chunks just like a SizeBasedChunker does.
Parameters
----------
chunk_count: int
The amount of chunks to split the data in.
Returns
-------
chunker: CountBasedChunker
"""
super().__init__()
# TODO wording
if not isinstance(chunk_count, int):
raise InvalidArgumentsException(
f"given chunk_count is of type {type(chunk_count)} but should be an int."
f"Please provide an integer as a chunk count"
)
# TODO wording
if chunk_count <= 0:
raise InvalidArgumentsException(
f"given chunk_count {chunk_count} is less then or equal to zero."
f"The chunk count should always be larger then zero"
)
self.chunk_count = chunk_count
def _split(self, data: pd.DataFrame, minimum_chunk_size: int = None) -> List[Chunk]:
if data.shape[0] == 0:
return []
data = data.copy().reset_index()
chunk_size = data.shape[0] // self.chunk_count
chunks = SizeBasedChunker(chunk_size=chunk_size).split(data=data, minimum_chunk_size=minimum_chunk_size)
return chunks
[docs]class DefaultChunker(Chunker):
"""Splits data into chunks sized 3 times the minimum chunk size.
Examples
--------
>>> from nannyml.chunk import DefaultChunker
>>> df = pd.read_parquet('/path/to/my/data.pq')
>>> chunker = DefaultChunker(minimum_chunk_size=50)
>>> chunks = chunker.split(data=df)
"""
def __init__(self):
"""Creates a new DefaultChunker."""
super(DefaultChunker, self).__init__()
def _split(self, data: pd.DataFrame, minimum_chunk_size: int = None) -> List[Chunk]:
if not minimum_chunk_size:
raise InvalidArgumentsException("could not use DefaultChunker: 'minimum_chunk_size' should be specified")
chunk_size = minimum_chunk_size * 3
chunks = SizeBasedChunker(chunk_size).split(data, minimum_chunk_size=minimum_chunk_size)
return chunks