# Source code for the nannyml FileReader module.

#  Author:   Niels Nuyttens  <>
#  License: Apache Software License 2.0

from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Dict, Optional

import fsspec
import pandas as pd

from nannyml.exceptions import InvalidArgumentsException
from nannyml.io.base import Reader, _get_filepath_str, _get_protocol_and_path

class FileReader(Reader):
    """A Reader implementation that retrieves data from a file (either local or cloud based)."""

    def __init__(
        self,
        filepath: str,
        read_args: Optional[Dict[str, Any]] = None,
        credentials: Optional[Dict[str, Any]] = None,
        fs_args: Optional[Dict[str, Any]] = None,
    ):
        """Creates a new FileReader instance.

        Parameters
        ----------
        filepath : str
            The path to read data from. Can be a regular file path or contain a protocol.
        read_args : Dict[str, Any], optional
            Specific arguments passed along to the methods doing the actual reading (mostly Pandas-based).
        credentials : Dict[str, Any], optional
            Used to provide credential information following specific ``fsspec`` implementations.
        fs_args : Dict[str, Any], optional
            Specific arguments passed along to the ``fsspec`` filesystem initializer.

        Examples
        --------
        >>> local_reader = FileReader(
        ...     filepath='/my-data-directory/data.pq'
        ... )
        >>> aws_reader = FileReader(
        ...     filepath='s3://my-data-directory/data.pq',
        ...     credentials={'key': 'my_key', 'secret': 'my_secret'}
        ... )
        >>> aws_reader2 = FileReader(
        ...     filepath='s3://my-data-directory/data.pq',
        ...     credentials={'aws_access_key_id': 'access_key_id', 'aws_secret_access_key': 'secret_access_key'}
        ... )
        >>> gcp_reader = FileReader(
        ...     filepath='gs://my-data-directory/data.pq',
        ...     credentials={'token': 'my_service_account_credential_file.json'}
        ... )
        """
        # Work on copies so the caller's dictionaries are never mutated
        # (``setdefault`` below writes into ``_fs_args``).
        _fs_args = deepcopy(fs_args) or {}
        _credentials = deepcopy(credentials) or {}

        protocol, path = _get_protocol_and_path(filepath)
        if protocol == "file":
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        # On duplicate keys the filesystem arguments win over the credentials.
        self._storage_options = {**_credentials, **_fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        self._filepath = PurePosixPath(path)
        # Shallow copy: later changes to the caller's dict must not silently
        # alter this reader. Values are left uncopied on purpose, as they may
        # be arbitrary objects handed through to pandas.
        self._read_args = dict(read_args) if read_args else {}  # type: Dict[str, Any]

    def _read(self) -> pd.DataFrame:
        """Read the configured file into a DataFrame.

        The reader is selected from the file suffix (matched case-insensitively):
        ``.pq`` and ``.parquet`` use ``pandas.read_parquet``, ``.csv`` uses
        ``pandas.read_csv``. The stored ``read_args`` are forwarded as keyword
        arguments to the selected function.

        Returns
        -------
        pd.DataFrame
            The contents of the file.

        Raises
        ------
        InvalidArgumentsException
            When the file suffix is not one of the supported formats.
        """
        suffix = self._filepath.suffix.lower()

        # Reject unsupported formats before touching the (possibly remote)
        # filesystem, so no file handle is opened just to be discarded.
        if suffix not in ('.pq', '.parquet', '.csv'):
            raise InvalidArgumentsException(f"'{self._filepath.suffix}' files are currently not supported.")

        # NOTE(review): presumably re-attaches the protocol to the path where
        # the filesystem needs it -- confirm against the helper's definition.
        read_path = _get_filepath_str(str(self._filepath), self._protocol)

        with self._fs.open(read_path, mode='rb') as f:
            if suffix == '.csv':
                return pd.read_csv(f, **self._read_args)
            return pd.read_parquet(f, **self._read_args)