Source code for nannyml.io.store.file_store

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict, Optional

import fsspec

from nannyml.io.base import _get_protocol_and_path, get_filepath_str
from nannyml.io.store.base import Store
from nannyml.io.store.serializers import JoblibPickleSerializer, Serializer


class FilesystemStore(Store):
    """A Store implementation that uses a local or remote file system for persistence.

    Any object is first serialized using an instance of the `Serializer` class. The resulting bytes are then
    written onto a file system.

    The `FilesystemStore` uses `fsspec` under the covers, allowing it to support a wide range of local and
    remote filesystems. These include (but are not limited to) S3, Google Cloud Storage and Azure Blob Storage.
    In order to use these remote filesystems, additional credentials can be passed along.

    Examples
    --------
    Using S3 as a backing filesystem. See
    https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html to learn more about the
    required access key id and secret access key credentials.

    >>> store = FilesystemStore(
    ...     root_path='s3://my-bucket-name/some/path',
    ...     credentials={
    ...         'client_kwargs': {
    ...             'aws_access_key_id': '<ACCESS_KEY_ID>',
    ...             'aws_secret_access_key': '<SECRET_ACCESS_KEY>'
    ...         }
    ...     }
    ... )

    Using Google Cloud Storage (GCS) as a backing filesystem. See
    https://cloud.google.com/iam/docs/creating-managing-service-account-keys to learn more about the required
    service account key credentials.

    >>> store = FilesystemStore(
    ...     root_path='gs://my-bucket-name/some/path',
    ...     credentials={'token': 'service-account-access-key.json'}
    ... )

    Using Azure Blob Storage as a backing filesystem. See https://github.com/fsspec/adlfs#setting-credentials
    to learn more about the required credentials.

    >>> store = FilesystemStore(
    ...     root_path='abfs://my-container-name/some/path',
    ...     credentials={'account_name': '<ACCOUNT_NAME>', 'account_key': '<ACCOUNT_KEY>'}
    ... )

    Performing basic operations. An optional path parameter can be set to control what subdirectories and
    file name should be used when storing. When none is given, the object will be stored in the configured
    store root path using an automatically generated name.

    >>> store = FilesystemStore(root_path='/tmp/nml-cache')  # creating the store
    >>> store.store(calc, path='example/calc.pkl')  # storing the object
    >>> store.load(path='example/calc.pkl')  # returns the object without any checks
    >>> # returns the object if it is a UnivariateDriftCalculator, raises a StoreException otherwise
    >>> store.load(path='example/calc.pkl', as_type='UnivariateDriftCalculator')
    >>> store.load(path='i_dont_exist.pkl')  # returns None
    """

    def __init__(
        self,
        root_path: str,
        credentials: Optional[Dict[str, Any]] = None,
        fs_args: Optional[Dict[str, Any]] = None,
        serializer: Serializer = JoblibPickleSerializer(),
    ):
        """Creates a new FilesystemStore instance.

        Parameters
        ----------
        root_path : str
            The root directory where all storage operations will originate.
        credentials : Optional[Dict[str, Any]], default=None
            Optional dictionary of credential information passed along to `fsspec`. Exact contents depend on
            the type of backing filesystem used.
        fs_args : Optional[Dict[str, Any]], default=None
            Optional dictionary of initialization parameters passed along when creating an internal
            `fsspec.filesystem` instance.
        serializer : Serializer, default=JoblibPickleSerializer()
            An optional `Serializer` instance that will be used to convert an object into a byte
            representation and the other way around. The default uses the `JoblibPickleSerializer`, which
            internally relies on `joblib` and its pickling functionality.
""" super().__init__() _fs_args = deepcopy(fs_args) or {} _credentials = deepcopy(credentials) or {} protocol, path = _get_protocol_and_path(root_path) if protocol == "file": _fs_args.setdefault("auto_mkdir", True) self._protocol = protocol.lower() self.root_path = path self._storage_options = {**_credentials, **_fs_args} self._fs = fsspec.filesystem(self._protocol, **self._storage_options) self._serializer = serializer def _store(self, obj, path: Optional[str] = None, **store_args): if not path: path = f'{obj.__module__}.{obj.__class__.__name__}.pkl' write_path = Path(get_filepath_str(self.root_path, self._protocol)) / path with self._fs.open(str(write_path), mode="wb") as fs_file: bytez = self._serializer.serialize(obj) fs_file.write(bytez) def _load(self, path: str, **load_args): try: load_path = Path(get_filepath_str(self.root_path, self._protocol)) / path with self._fs.open(str(load_path), mode="rb") as fs_file: bytez = fs_file.read() calc = self._serializer.deserialize(bytez) return calc except FileNotFoundError: p = f'{self._protocol}://{self.root_path}/{path}' self._logger.info(f'could not find file in store location "{p}", returning "None"') return None