#  Author:   Niels Nuyttens  <>
#  License: Apache Software License 2.0
import abc
import logging
from copy import deepcopy
from pathlib import Path
from typing import Any, Dict

import fsspec

from nannyml._typing import Result
from import Writer, _get_protocol_and_path

class FileWriter(Writer, abc.ABC):
    """An abstract Writer implementation that writes results out to a filesystem (either local or remote / cloud).

    Concrete subclasses must override ``_write``. The ``_write_bytes_to_filesystem``
    helper function can be used by those implementations to push raw bytes to
    the filesystem held in ``self._fs``.
    """

    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        path: str,
        write_args: Dict[str, Any] = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ):
        """Creates a new FileWriter.

        Parameters
        ----------
        path : str
            The path to write data to. Can be a regular file path or contain a protocol.
        write_args : Dict[str, Any]
            Specific arguments passed along the method performing the actual writing.
        credentials : Dict[str, Any]
            Used to provide credential information following specific ``fsspec`` implementations.
        fs_args : Dict[str, Any]
            Specific arguments passed along to the ``fsspec`` filesystem initializer.
        """
        super().__init__()

        # Work on copies so the caller's dictionaries are never mutated
        # (``setdefault`` below would otherwise leak into ``fs_args``).
        _fs_args = deepcopy(fs_args) or {}
        _credentials = deepcopy(credentials) or {}

        protocol, path = _get_protocol_and_path(path)
        if protocol == "file":
            # Default ``auto_mkdir`` to True for the "file" protocol, unless the
            # caller already provided a value for it.
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        self.filepath = path

        # On duplicate keys the values from ``fs_args`` win over ``credentials``.
        self._storage_options = {**_credentials, **_fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        self._write_args = write_args or {}  # type: Dict[str, Any]

    def _write(self, result: Result, **kwargs):
        """Write ``result`` to the configured filesystem; must be overridden by subclasses.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError(
            f"'{self.__class__.__name__}' is a subclass of Writer and it must implement the _write method"
        )
def _write_bytes_to_filesystem(bytez, save_path: Path, fs: fsspec.spec.AbstractFileSystem):
    """Write raw bytes to ``save_path`` on the given filesystem.

    Parameters
    ----------
    bytez :
        The binary content to write.
    save_path : Path
        Destination path; converted to ``str`` before being handed to ``fs.open``.
    fs : fsspec.spec.AbstractFileSystem
        Filesystem object used to open the destination in binary write mode.
    """
    # The context manager guarantees the handle is closed even if the write fails.
    with fs.open(str(save_path), mode="wb") as fs_file:
        fs_file.write(bytez)