Source code for nannyml.usage_logging

#  Author:   Niels Nuyttens  <niels@nannyml.com>
#
#  License: Apache Software License 2.0

import functools
import importlib.util
import inspect
import logging
import os
import platform
import time
import uuid
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

import analytics as segment_analytics
from dotenv import load_dotenv

from nannyml import __version__

# Load variables from a .env file (when python-dotenv finds one) into the process environment
# at import time, so that settings this module reads from os.environ -- such as
# NML_DISABLE_USAGE_LOGGING -- can be configured there.
load_dotenv()


def disable_usage_logging():
    """Switch usage logging off for the current process.

    Sets the ``NML_DISABLE_USAGE_LOGGING`` environment variable to ``'1'``.
    The variable's mere presence is what disables logging; its value is not inspected.
    """
    os.environ.update(NML_DISABLE_USAGE_LOGGING='1')
def enable_usage_logging():
    """Switch usage logging back on for the current process.

    Removes the ``NML_DISABLE_USAGE_LOGGING`` environment variable when it is set
    and does nothing when it is absent.
    """
    os.environ.pop('NML_DISABLE_USAGE_LOGGING', None)
class UsageEvent(str, Enum):
    """Names of the usage events that can be reported.

    Every member is also a ``str``; its value is the human-readable event name
    that is sent to the usage tracker.
    """

    # --- Calculators: univariate drift
    UNIVAR_DRIFT_CALC_FIT = "Univariate drift calculator fit"
    UNIVAR_DRIFT_CALC_RUN = "Univariate drift calculator run"
    UNIVAR_DRIFT_PLOT = "Univariate drift results plot"

    # --- Calculators: multivariate drift
    MULTIVAR_DRIFT_CALC_FIT = "Multivariate reconstruction error drift calculator fit"
    MULTIVAR_DRIFT_CALC_RUN = "Multivariate reconstruction error drift calculator run"
    MULTIVAR_DRIFT_PLOT = "Multivariate drift results plot"

    # --- Calculators: realized performance
    PERFORMANCE_CALC_FIT = "Realized performance calculator fit"
    PERFORMANCE_CALC_RUN = "Realized performance calculator run"
    PERFORMANCE_PLOT = "Realized performance calculator plot"

    # --- Estimators: CBPE
    CBPE_ESTIMATOR_FIT = "CBPE estimator fit"
    CBPE_ESTIMATOR_RUN = "CBPE estimator run"
    CBPE_PLOT = "CBPE estimator plot"

    # --- Estimators: DLE
    DLE_ESTIMATOR_FIT = "DLE estimator fit"
    DLE_ESTIMATOR_RUN = "DLE estimator run"
    DLE_PLOT = "DLE estimator plot"

    # --- Ranking
    RANKER_ALERT_COUNT_RUN = "Run ranker using alert count"
    RANKER_CORRELATION_FIT = "Fit ranker using correlation with performance"
    RANKER_CORRELATION_RUN = "Run ranker using correlation with performance"

    # --- Command line interface
    CLI_RUN = "CLI run"

    # --- Result writers
    WRITE_RAW = "Exported results with RawFilesWriter"
    WRITE_PICKLE = "Exported results with PickleWriter"
    WRITE_DB = "Exported results with DatabaseWriter"
class UsageLogger(ABC):
    """Abstract base for usage loggers.

    ``log`` handles the opt-out check and debug output; subclasses supply the
    actual transport by implementing ``_log``.
    """

    @property
    def _logger(self):
        """The standard library logger named after this module."""
        return logging.getLogger(__name__)

    def log(self, usage_event: UsageEvent, metadata: Optional[Dict[str, Any]] = None):
        """Report ``usage_event`` unless usage logging has been disabled.

        Parameters
        ----------
        usage_event : UsageEvent
            The event to report.
        metadata : Optional[Dict[str, Any]]
            Extra information to send along with the event. ``None`` is replaced
            by an empty dictionary before it is handed to ``_log``.
        """
        if "NML_DISABLE_USAGE_LOGGING" not in os.environ:
            payload = {} if metadata is None else metadata
            self._log(usage_event, payload)
            self._logger.debug(f"logged usage for event {usage_event} with metadata {payload}")
            return

        # Opt-out: the mere presence of the variable disables logging, whatever its value.
        self._logger.debug(
            f"found NML_DISABLE_USAGE_LOGGING key in environment variables. Usage event {usage_event} not logged."
        )

    @abstractmethod
    def _log(self, usage_event: UsageEvent, metadata: Dict[str, Any]):
        """Send the event to the backing service; must be provided by subclasses."""
        class_name = self.__class__.__name__
        raise NotImplementedError(f"'{class_name}' does not implement '_log' yet.")
class SegmentUsageTracker(UsageLogger):
    """Usage logger that reports events through the ``analytics`` (Segment) client module."""

    # Write key used when the caller does not supply one.
    SEGMENT_WRITE_KEY = 'lIVZJNAdj2ZaMzAHHnFWP76g7CuwmzGz'

    write_key: str

    def __init__(self, write_key: Optional[str] = None, machine_metadata: Optional[Dict[str, Any]] = None):
        """Configure the module-level Segment client.

        Parameters
        ----------
        write_key : Optional[str]
            Segment write key to use; falls back to ``SEGMENT_WRITE_KEY`` when ``None``.
        machine_metadata : Optional[Dict[str, Any]]
            When given, it is passed to ``_identify`` right after configuration.
        """
        self.write_key = self.SEGMENT_WRITE_KEY if write_key is None else write_key

        # These settings live on the analytics module itself, not on this instance.
        segment_analytics.write_key = self.write_key
        segment_analytics.max_retries = 1

        if machine_metadata is None:
            return
        self._identify(machine_metadata)

    def _identify(self, machine_metadata: Dict[str, Any]):
        """Forward ``machine_metadata`` to the Segment ``identify`` call."""
        segment_analytics.identify(machine_metadata)

    def _log(self, usage_event: UsageEvent, metadata: Dict[str, Any]):
        """Track ``usage_event`` with ``metadata`` enriched by system information.

        The user id is a UUID built from ``uuid.getnode()``. ``metadata`` is
        updated in place with the output of ``_get_system_information()``.
        """
        node_id = uuid.getnode()
        user_id = str(uuid.UUID(int=node_id))

        system_information = _get_system_information()
        metadata.update(system_information)

        segment_analytics.track(user_id, usage_event.value, metadata)
# Module-level tracker instance, created at import time with the default Segment write key.
# It is what get_logger() returns and the default `logger` of the log_usage decorator.
DEFAULT_USAGE_LOGGER = SegmentUsageTracker()
def get_logger() -> UsageLogger:
    """Return the module-wide default usage logger (``DEFAULT_USAGE_LOGGER``)."""
    default_logger = DEFAULT_USAGE_LOGGER
    return default_logger
def log_usage(
    usage_event: UsageEvent,
    metadata: Optional[Dict[str, Any]] = None,
    metadata_from_self: Optional[List[str]] = None,
    metadata_from_kwargs: Optional[List[str]] = None,
    logger: UsageLogger = DEFAULT_USAGE_LOGGER,
):
    """Decorator factory that reports a usage event each time the decorated callable runs.

    The wrapped callable is always executed first. Its result is returned, or its
    exception re-raised, regardless of whether reporting the usage event succeeds.

    Parameters
    ----------
    usage_event : UsageEvent
        The event to report.
    metadata : Optional[Dict[str, Any]]
        Static metadata sent with every event. It is copied on each call and never modified.
    metadata_from_self : Optional[List[str]]
        Attribute names read from the first positional argument (``self`` for methods).
        List values are sent as a list of strings, anything else as ``str(value)``.
    metadata_from_kwargs : Optional[List[str]]
        Parameter names looked up in the keyword arguments of the call. When a name was
        not passed as a keyword, the parameter's default value (if it has one) is sent.
        Values passed positionally are not inspected.
    logger : UsageLogger
        The usage logger that receives the event.

    Returns
    -------
    Callable
        A decorator wrapping a callable with usage reporting.
    """

    def logging_decorator(func):
        @functools.wraps(func)
        def logging_wrapper(*args, **kwargs):
            # track start times
            start_time = time.time()
            process_start_time = time.process_time()

            runtime_exception, res = None, None
            try:
                # run original function
                res = func(*args, **kwargs)
            except BaseException as exc:
                # keep hold of it: the failure is reported first and re-raised afterwards
                runtime_exception = exc

            # get run times
            run_time = time.time() - start_time
            process_run_time = time.process_time() - process_start_time

            try:
                # Work on a fresh copy for every call. Updating the dict given to the decorator
                # would leak keys (e.g. 'exception_type') from one call into the next and
                # modify an object owned by the caller.
                md: Dict[str, Any] = dict(metadata) if metadata else {}

                # include run times in metadata and report if an exception occurred
                md.update(
                    {
                        'run_time': run_time,
                        'process_run_time': process_run_time,
                        'exception_occurred': runtime_exception is not None,
                    }
                )
                if runtime_exception is not None:
                    exception_name = type(runtime_exception).__name__
                    if hasattr(runtime_exception, '__module__'):
                        exception_type = f'{runtime_exception.__module__}.{exception_name}'
                    else:
                        exception_type = exception_name
                    md['exception_type'] = exception_type

                # fetch additional information from instance properties
                if metadata_from_self is not None:
                    for attr in metadata_from_self:
                        val = getattr(args[0], attr)
                        md[attr] = [str(e) for e in val] if isinstance(val, list) else str(val)

                # fetch additional information from function kwargs
                if metadata_from_kwargs is not None:
                    for attr in metadata_from_kwargs:
                        if attr in kwargs:
                            md[attr] = kwargs[attr]
                        else:
                            # check if the requested parameter has a default value set
                            param = inspect.signature(func).parameters[attr]
                            if param.default is not inspect.Parameter.empty:
                                md[attr] = param.default

                # log the event
                logger.log(usage_event, md)
            except Exception as logging_error:
                # Usage logging is best effort and must never change the outcome of the wrapped
                # call. Unlike the previous `return` inside `finally`, this no longer swallows
                # KeyboardInterrupt / SystemExit raised while reporting.
                logging.getLogger(__name__).debug(
                    f"could not log usage event {usage_event}: {logging_error!r}"
                )

            if runtime_exception is not None:
                raise runtime_exception
            return res

        return logging_wrapper

    return logging_decorator
def _get_system_information() -> Dict[str, Any]:
    """Collect the environment details that are attached to every usage event."""
    return {
        "os_type": platform.system(),
        "runtime_environment": _get_runtime_environment(),
        "python_version": platform.python_version(),
        "nannyml_version": __version__,
    }


def _get_runtime_environment() -> str:
    """Return ``'docker'``, ``'notebook'`` or ``'native'``; the Docker check takes precedence."""
    if _is_running_in_docker():
        return 'docker'
    if _is_running_in_notebook():
        return 'notebook'
    return 'native'


# Inspired by https://github.com/jaraco/jaraco.docker/blob/main/jaraco/docker.py
def _is_running_in_docker() -> bool:
    """Tell whether the current process appears to run inside a Docker container.

    Checks for the ``/.dockerenv`` marker file first, then for the word ``docker``
    in any line of ``/proc/self/cgroup``.
    """
    if Path('/.dockerenv').exists():
        return True

    try:
        # `with` makes sure the file handle is closed again once the scan is done.
        with Path('/proc/self/cgroup').open() as cgroup_file:
            return any('docker' in line for line in cgroup_file)
    except OSError:
        # The file is missing (e.g. macOS, Windows) or unreadable: treat that as
        # "not in Docker" instead of letting the error abort usage logging.
        return False


# Inspired by
# https://github.com/zenml-io/zenml/blob/275109da08b783d5d2cd508b5f703aed0c66e485/src/zenml/environment.py#L182
# and https://stackoverflow.com/a/39662359
def _is_running_in_notebook() -> bool:
    """Tell whether the code runs in an IPython shell of class ``ZMQInteractiveShell``."""
    if importlib.util.find_spec("IPython") is None:
        return False

    # Imported lazily: IPython is only used when it is installed.
    from IPython import get_ipython

    # get_ipython() returns None outside of IPython; its class name then simply does not match.
    return get_ipython().__class__.__name__ in ("ZMQInteractiveShell",)