Source code for nannyml.usage_logging

#  Author:   Niels Nuyttens  <>
#  License: Apache Software License 2.0

import functools
import importlib.util
import inspect
import logging
import os
import platform
import time
import uuid
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, TypeVar

# NOTE(review): the scraped source read `import as segment_analytics` — the
# module name was dropped during extraction; restored to the Segment client.
import segment.analytics as segment_analytics
from dotenv import load_dotenv

from nannyml import __version__
from nannyml._typing import ParamSpec

T = TypeVar('T')
P = ParamSpec('P')

# read any .env files to import environment variables
load_dotenv()
def disable_usage_logging():
    """Disable anonymous usage logging for the current process.

    Sets the ``NML_DISABLE_USAGE_LOGGING`` environment variable, which every
    logger checks before emitting an event.
    """
    os.environ['NML_DISABLE_USAGE_LOGGING'] = '1'
def enable_usage_logging():
    """Re-enable anonymous usage logging for the current process.

    Removes the ``NML_DISABLE_USAGE_LOGGING`` environment variable if it is
    set; a no-op otherwise.
    """
    os.environ.pop('NML_DISABLE_USAGE_LOGGING', None)
class UsageEvent(str, Enum):
    """Logged usage events"""

    # Simple statistics calculators
    STATS_COUNT_FIT = "Simple Stats Count fit"
    STATS_COUNT_RUN = "Simple Stats Count run"
    STATS_COUNT_PLOT = "Simple Stats Count plot"
    STATS_STD_FIT = "Simple Stats Std fit"
    STATS_STD_RUN = "Simple Stats Std run"
    STATS_STD_PLOT = "Simple Stats Std plot"
    STATS_AVG_FIT = "Simple Stats Avg fit"
    STATS_AVG_RUN = "Simple Stats Avg run"
    STATS_AVG_PLOT = "Simple Stats Avg plot"
    STATS_SUM_FIT = "Simple Stats Sum fit"
    STATS_SUM_RUN = "Simple Stats Sum run"
    STATS_SUM_PLOT = "Simple Stats Sum plot"
    STATS_MEDIAN_FIT = "Simple Stats Median fit"
    STATS_MEDIAN_RUN = "Simple Stats Median run"
    STATS_MEDIAN_PLOT = "Simple Stats Median plot"

    # Data quality calculators
    DQ_CALC_MISSING_VALUES_FIT = "Data Quality Calculator Missing Values fit"
    DQ_CALC_MISSING_VALUES_RUN = "Data Quality Calculator Missing Values run"
    DQ_CALC_MISSING_VALUES_PLOT = "Data Quality Calculator Missing Values plot"
    DQ_CALC_UNSEEN_VALUES_FIT = "Data Quality Calculator Unseen Values fit"
    DQ_CALC_UNSEEN_VALUES_RUN = "Data Quality Calculator Unseen Values run"
    DQ_CALC_UNSEEN_VALUES_PLOT = "Data Quality Calculator Unseen Values plot"

    # Drift calculators
    UNIVAR_DRIFT_CALC_FIT = "Univariate drift calculator fit"
    UNIVAR_DRIFT_CALC_RUN = "Univariate drift calculator run"
    UNIVAR_DRIFT_PLOT = "Univariate drift results plot"
    MULTIVAR_DRIFT_CALC_FIT = "Multivariate reconstruction error drift calculator fit"
    MULTIVAR_DRIFT_CALC_RUN = "Multivariate reconstruction error drift calculator run"
    MULTIVAR_DRIFT_PLOT = "Multivariate drift results plot"
    DC_CALC_FIT = "Domain Classifier calculator fit"
    DC_CALC_RUN = "Domain Classifier calculator run"
    DC_RESULTS_PLOT = "Domain Classifier results plot"

    # Realized performance
    PERFORMANCE_CALC_FIT = "Realized performance calculator fit"
    PERFORMANCE_CALC_RUN = "Realized performance calculator run"
    PERFORMANCE_PLOT = "Realized performance calculator plot"

    # Estimators
    CBPE_ESTIMATOR_FIT = "CBPE estimator fit"
    CBPE_ESTIMATOR_RUN = "CBPE estimator run"
    CBPE_PLOT = "CBPE estimator plot"
    DLE_ESTIMATOR_FIT = "DLE estimator fit"
    DLE_ESTIMATOR_RUN = "DLE estimator run"
    DLE_PLOT = "DLE estimator plot"

    # Ranking
    RANKER_ALERT_COUNT_RUN = "Run ranker using alert count"
    RANKER_CORRELATION_FIT = "Fit ranker using correlation with performance"
    RANKER_CORRELATION_RUN = "Run ranker using correlation with performance"

    # CLI
    CLI_RUN = "CLI run"

    # Result writers
    WRITE_RAW = "Exported results with RawFilesWriter"
    WRITE_PICKLE = "Exported results with PickleWriter"
    WRITE_DB = "Exported results with DatabaseWriter"
class UsageLogger(ABC):
    """Abstract base for usage-event loggers.

    Subclasses implement ``_log``; the public ``log`` entry point handles the
    opt-out environment flag and metadata defaulting.
    """

    @property
    def _logger(self):
        # Module-scoped diagnostic logger (not the usage channel itself).
        return logging.getLogger(__name__)

    def log(self, usage_event: UsageEvent, metadata: Optional[Dict[str, Any]] = None):
        """Log a usage event, unless usage logging was disabled via environment variable."""
        # Honor the opt-out flag set by `disable_usage_logging`.
        if "NML_DISABLE_USAGE_LOGGING" in os.environ:
            self._logger.debug(
                "found NML_DISABLE_USAGE_LOGGING key in environment variables. "
                f"Usage event {usage_event} not logged."
            )
            return

        payload = {} if metadata is None else metadata
        self._log(usage_event, payload)
        self._logger.debug(f"logged usage for event {usage_event} with metadata {payload}")

    @abstractmethod
    def _log(self, usage_event: UsageEvent, metadata: Dict[str, Any]):
        # Subclass hook: deliver the event to the actual logging backend.
        raise NotImplementedError(f"'{self.__class__.__name__}' does not implement '_log' yet.")
class SegmentUsageTracker(UsageLogger):
    """UsageLogger implementation that sends usage events to Segment.

    Configures the module-level Segment client with a short timeout and a
    single retry so event delivery never noticeably delays the caller.
    """

    # Default public write key used when the caller doesn't provide one.
    SEGMENT_WRITE_KEY = 'lIVZJNAdj2ZaMzAHHnFWP76g7CuwmzGz'

    write_key: str

    def __init__(self, write_key: Optional[str] = None, machine_metadata: Optional[Dict[str, Any]] = None):
        """Create a tracker.

        Parameters
        ----------
        write_key: Optional[str]
            Segment write key; falls back to ``SEGMENT_WRITE_KEY`` when not given.
        machine_metadata: Optional[Dict[str, Any]]
            When provided, immediately sent via a Segment ``identify`` call.
        """
        if write_key is not None:
            self.write_key = write_key
        else:
            self.write_key = self.SEGMENT_WRITE_KEY
        segment_analytics.write_key = self.write_key

        # Fail fast: one retry, short timeout. (The original assigned
        # `max_retries = 1` twice; the duplicate assignment is removed.)
        segment_analytics.max_retries = 1
        segment_analytics.timeout = 3

        if machine_metadata is not None:
            self._identify(machine_metadata)

    def _identify(self, machine_metadata: Dict[str, Any]):
        # NOTE(review): Segment's identify() conventionally takes
        # (user_id, traits); here the metadata dict is passed as the sole
        # positional argument — confirm against the segment client API.
        segment_analytics.identify(machine_metadata)

    def _log(self, usage_event: UsageEvent, metadata: Dict[str, Any]):
        # Anonymous, machine-stable identifier derived from the MAC address.
        user_id = str(uuid.UUID(int=uuid.getnode()))
        metadata.update(_get_system_information())
        segment_analytics.track(user_id, usage_event.value, metadata)
# Module-level default logger instance, shared by `log_usage` and `get_logger`.
DEFAULT_USAGE_LOGGER = SegmentUsageTracker()


def get_logger() -> UsageLogger:
    """Return the default (Segment-backed) usage logger instance."""
    return DEFAULT_USAGE_LOGGER
def log_usage(
    usage_event: UsageEvent,
    metadata: Optional[Dict[str, Any]] = None,
    metadata_from_self: Optional[List[str]] = None,
    metadata_from_kwargs: Optional[List[str]] = None,
    logger: UsageLogger = DEFAULT_USAGE_LOGGER,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Decorator that logs a usage event each time the wrapped callable runs.

    Parameters
    ----------
    usage_event: UsageEvent
        The event to report.
    metadata: Optional[Dict[str, Any]]
        Static metadata included with every report. Copied per call, so the
        caller's dict is never mutated.
    metadata_from_self: Optional[List[str]]
        Attribute names read from the first positional argument (``self`` for
        methods) and added to the metadata as strings.
    metadata_from_kwargs: Optional[List[str]]
        Keyword-argument names whose call values (or declared defaults) are
        added to the metadata.
    logger: UsageLogger
        The logger used to emit the event.

    Returns
    -------
    A decorator preserving the wrapped callable's signature and behavior.
    """

    def logging_decorator(func: Callable[P, T]) -> Callable[P, T]:
        @functools.wraps(func)
        def logging_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            # track start times
            start_time = time.time()
            process_start_time = time.process_time()

            runtime_exception: Optional[BaseException] = None
            try:
                # run original function
                res = func(*args, **kwargs)
            except BaseException as exc:
                runtime_exception = exc

            # get run times
            run_time = time.time() - start_time
            process_run_time = time.process_time() - process_start_time

            try:
                # Copy the static metadata. The original updated the shared
                # decoration-time dict in place, leaking per-call values
                # (timings, exception info, attribute snapshots) across calls
                # and mutating the caller's dict.
                md: Dict[str, Any] = dict(metadata) if metadata else {}
                md.update({'run_time': run_time, 'process_run_time': process_run_time})

                # report if an exception occurred
                md['exception_occurred'] = False
                if runtime_exception is not None:
                    if hasattr(runtime_exception, '__module__'):
                        exception_type = f'{runtime_exception.__module__}.{type(runtime_exception).__name__}'
                    else:
                        exception_type = type(runtime_exception).__name__
                    md.update({'exception_occurred': True, 'exception_type': exception_type})

                # fetch additional information from instance properties
                if metadata_from_self is not None:
                    for attr in metadata_from_self:
                        val = getattr(args[0], attr)
                        if isinstance(val, list):
                            md[attr] = [str(e) for e in val]
                        else:
                            md[attr] = str(val)

                # fetch additional information from function kwargs
                if metadata_from_kwargs is not None:
                    for attr in metadata_from_kwargs:
                        if attr in kwargs:
                            md[attr] = kwargs[attr]
                        else:
                            # fall back to the parameter's declared default, if any
                            param = inspect.signature(func).parameters[attr]
                            if param.default is not inspect.Parameter.empty:
                                md[attr] = param.default

                # log the event
                logger.log(usage_event, md)
            finally:
                # The return/raise inside `finally` deliberately swallows any
                # error raised while assembling or sending the usage data:
                # usage logging must never break the wrapped call.
                if runtime_exception is not None:
                    raise runtime_exception
                else:
                    return res

        return logging_wrapper

    return logging_decorator
def _get_system_information() -> Dict[str, Any]: return { "os_type": platform.system(), "runtime_environment": _get_runtime_environment(), "python_version": platform.python_version(), "nannyml_version": __version__, "nannyml_cloud": _is_nannyml_cloud(), } def _is_nannyml_cloud(): return 'NML_CLOUD' in os.environ def _get_runtime_environment(): if _is_running_in_notebook(): return 'notebook' elif _is_running_in_docker(): return 'docker' elif _is_running_in_kubernetes(): if _is_running_in_aks(): return 'aks' elif _is_running_in_eks(): return 'eks' else: return 'kubernetes' else: return 'native' # Inspired by def _is_running_in_docker(): if Path('/.dockerenv').exists(): return True if any('docker' in line for line in Path('/proc/self/cgroup').open()): return True return False def _is_running_in_kubernetes(): return Path('/var/run/secrets/').exists() def _is_running_in_aks(): import requests try: metadata = requests.get( '', headers={'Metadata': 'true'}, timeout=5 ) return metadata.status_code == 200 except Exception: return False def _is_running_in_eks(): import requests try: token = requests.put( '', headers={'X-aws-ec2-metadata-token-ttl-seconds': 21600}, timeout=5, ).raw() metadata = requests.get('', headers={'X-aws-ec2-metadata-token': token}) return metadata.status_code == 200 except Exception: return False # Inspired by # # and def _is_running_in_notebook(): if importlib.util.find_spec("IPython") is not None: from IPython import get_ipython if get_ipython().__class__.__name__ in [ "ZMQInteractiveShell", ]: return True return False