"""Utilities related to configuration and handling of framework logging.
"""
import abc
import os
import sys
import argparse
import datetime
import io
import subprocess
import signal
import traceback
import typing
import dataclasses as dc
from . import basic, dataclass, exceptions
import logging
import logging.config
import logging.handlers
_log = logging.getLogger(__name__)
class MDTFConsoleHandler(logging.StreamHandler):
"""Dummy class to designate logging to stdout or stderr from the root logger.
"""
pass
class StringIOHandler(logging.StreamHandler):
""":py:class:`~logging.StreamHandler` instance that writes log entries to
an internal :py:class:`~io.StringIO` buffer.
"""
def __init__(self):
self._log_buffer = io.StringIO()
super(StringIOHandler, self).__init__(stream=self._log_buffer)
    def reset_buffer(self):
        """Discard the current buffer contents and begin logging to a new,
        empty buffer."""
        # easier to create a new buffer than to reset the existing one
        new_buffer = io.StringIO()
        old_buffer = self.setStream(new_buffer)
        old_buffer.close()
        self._log_buffer = new_buffer
def close(self):
super(StringIOHandler, self).close()
self._log_buffer.close()
def buffer_contents(self):
"""Return contents of buffer as a string."""
return self._log_buffer.getvalue()
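# Example (illustrative): log records accumulate in the in-memory buffer and
# can be read back as a string.
#
#     h = StringIOHandler()
#     log = logging.getLogger("example")
#     log.addHandler(h)
#     log.warning("recorded in memory")
#     assert "recorded in memory" in h.buffer_contents()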
class MultiFlushMemoryHandler(logging.handlers.MemoryHandler):
"""Subclass :py:class:`~logging.handlers.MemoryHandler` to enable flushing
the contents of its log buffer to multiple targets. We do this to solve the
chicken-and-egg problem of logging any events that happen before the log
outputs are configured: those events are captured by an instance of this
handler and then transfer()'ed to other handlers once they're set up.
See `<https://stackoverflow.com/a/12896092>`__.
"""
def transfer(self, target_handler):
"""Transfer contents of buffer to target_handler.
Args:
target_handler (:py:class:`~logging.Handler`): log handler to transfer
contents of buffer to.
"""
self.acquire()
try:
self.setTarget(target_handler)
if self.target:
for record in self.buffer:
if self.target.level <= record.levelno:
self.target.handle(record)
# self.buffer = [] # don't clear buffer!
finally:
self.release()
def transfer_to_non_console(self, logger):
"""Transfer contents of buffer to all non-console-based handlers attached
to *logger* (handlers that aren't :py:class:`MDTFConsoleHandler`.)
        If no handlers are attached to the logger, a warning is printed and the
        buffer is transferred to the :py:data:`logging.lastResort` handler,
        i.e. printed to stderr.
Args:
logger (:py:class:`~logging.Logger`): logger to transfer
contents of buffer to.
"""
no_transfer_flag = True
for h in logger.handlers:
if not isinstance(h, (MultiFlushMemoryHandler, MDTFConsoleHandler)):
self.transfer(h)
no_transfer_flag = False
if no_transfer_flag:
logger.warning("No non-console loggers configured.")
self.transfer(logging.lastResort)
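# A minimal sketch (assumed usage; cf. initial_log_config() and
# transfer_log_cache() below) of the cache-then-replay lifecycle this handler
# implements:
#
#     cache = MultiFlushMemoryHandler(capacity=8*1024, flushOnClose=False)
#     logging.getLogger().addHandler(cache)       # capture pre-config events
#     logging.getLogger().warning("logged before configuration")
#     file_handler = logging.FileHandler("mdtf.log")   # set up later
#     cache.transfer(file_handler)                # replay cached records into it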
class HeaderFileHandler(logging.FileHandler):
"""Subclass :py:class:`~logging.FileHandler` to print system and git repo
information in a header at the start of a log file without writing it to
other loggers (e.g. the console.)
"""
    def _log_header(self):
        # hook for subclasses to supply the text of the header
        return ""
def _open(self):
"""Write header information right after we open the log file, then
proceed normally.
"""
fp = super(HeaderFileHandler, self)._open()
fp.write(self._log_header())
return fp
class MDTFHeaderFileHandler(HeaderFileHandler):
""":py:class:`~logging.FileHandler` which adds a header to log files with
system information, git repo status etc. provided by :func:`mdtf_log_header`.
"""
def _log_header(self):
return mdtf_log_header("MDTF PACKAGE LOG")
def _hanging_indent(str_, initial_indent, subsequent_indent):
"""Poor man's indenter. Easier than using textwrap for this case.
Args:
str_ (str): String to be indented.
initial_indent (str): string to insert as the indent for the first
line.
subsequent_indent (str): string to insert as the indent for all
subsequent lines.
Returns:
Indented string.
"""
if isinstance(initial_indent, int):
initial_indent = initial_indent * ' '
if isinstance(subsequent_indent, int):
subsequent_indent = subsequent_indent * ' '
lines_ = str_.splitlines()
lines_out = []
if len(lines_) > 0:
lines_out = lines_out + [initial_indent + lines_[0]]
if len(lines_) > 1:
lines_out = lines_out + [(subsequent_indent+l) for l in lines_[1:]]
return '\n'.join(lines_out)
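# e.g. _hanging_indent("a\nb", 0, 4) == "a\n    b"

# HangingIndentFormatter is referenced by CaseLoggerMixin.init_log() and
# initial_log_config() below, but its definition is missing here; the following
# is a minimal implementation consistent with those call sites (the fmt,
# datefmt, tabsize, header and footer keywords).
class HangingIndentFormatter(logging.Formatter):
    """:py:class:`~logging.Formatter` that indents the second and subsequent
    lines of multi-line log messages by *tabsize* spaces, and optionally
    brackets each formatted record with *header* and *footer* strings.
    """
    def __init__(self, fmt=None, datefmt=None, style='%', tabsize=0,
                 header="", footer=""):
        super(HangingIndentFormatter, self).__init__(
            fmt=fmt, datefmt=datefmt, style=style
        )
        self.tabsize = int(tabsize)
        self.header = str(header)
        self.footer = str(footer)

    def format(self, record):
        # format the record normally, then apply the hanging indent
        s = super(HangingIndentFormatter, self).format(record)
        return self.header + _hanging_indent(s, 0, self.tabsize) + self.footer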
class _LevelFilterBase(logging.Filter):
"""Base class to implement common features of level-based filters."""
def __init__(self, name="", level=None):
super(_LevelFilterBase, self).__init__(name=name)
if level is None:
level = logging.NOTSET
if not isinstance(level, int):
if hasattr(logging, str(level)):
level = getattr(logging, str(level))
else:
level = int(level)
self.levelno = level
class GeqLevelFilter(_LevelFilterBase):
""":py:class:`~logging.Filter` to include only log messages with a severity of
*level* or worse. This is normally done by setting the *level* attribute on a
:py:class:`~logging.Handler`, but we need to add a filter when transferring
records from another logger (the :class:`MultiFlushMemoryHandler` cache), as
shown in `<https://stackoverflow.com/a/24324246>`__.
"""
def filter(self, record):
return record.levelno >= self.levelno
class LtLevelFilter(_LevelFilterBase):
""":py:class:`~logging.Filter` to include only log messages with a severity
less than *level*.
"""
def filter(self, record):
return record.levelno < self.levelno
class EqLevelFilter(_LevelFilterBase):
""":py:class:`~logging.Filter` to include only log messages with a severity
equal to *level*.
"""
def filter(self, record):
return record.levelno == self.levelno
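# Example (illustrative): reproduce the package's console routing with stdlib
# handlers: INFO (and only INFO) to stdout, WARNING and above to stderr.
#
#     h_out = logging.StreamHandler(stream=sys.stdout)
#     h_out.addFilter(EqLevelFilter(level=logging.INFO))
#     h_err = logging.StreamHandler(stream=sys.stderr)
#     h_err.addFilter(GeqLevelFilter(level="WARNING"))  # string levels also accepted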
class NameMatchFilter(logging.Filter):
""":py:class:`~logging.Filter` that only accepts log events directed to it
specifically, rejecting all events coming from child loggers.
Intended to be attached to a handler -- the effect of attaching this to a
logger is the same as setting *propagate* = False on it.
"""
def __init__(self, name=""):
super(NameMatchFilter, self).__init__()
self._name = name
def filter(self, record):
return (record.name == self._name)
OBJ_LOG_TAG_ATTR_NAME = 'tags'
class TagMatchFilter(logging.Filter):
""":py:class:`~logging.Filter` which only accepts records having the
designated combination of custom 'tag' attributes. These are assigned by the
methods in :class:`MDTFObjectLogger` or can be passed via the 'extra' kwarg
    on any logger (see the discussion under
    `logging.Logger.debug <https://docs.python.org/3.7/library/logging.html#logging.Logger.debug>`__.)
"""
def __init__(self, name="", tags=None):
super(TagMatchFilter, self).__init__(name=name)
self._tags = basic.to_iter(tags, set)
def filter(self, record):
if not hasattr(record, OBJ_LOG_TAG_ATTR_NAME):
return False
tags = basic.to_iter(record.tags, set)
return self._tags.issubset(tags)
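# Example (illustrative): records pass the filter only if their custom 'tags'
# attribute contains all of the filter's tags.
#
#     h = StringIOHandler()
#     h.addFilter(TagMatchFilter(tags="banner"))
#     log = logging.getLogger("example")
#     log.addHandler(h)
#     log.warning("kept", extra={'tags': "banner"})  # has the tag: passes
#     log.warning("dropped")                         # no 'tags' attribute: filtered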
# ------------------------------------------------------------------------------
# standardize
OBJ_LOG_ROOT = 'MDTF' # "root logger" of the object logger hierarchy
ObjectLogTag = basic.MDTFEnum(
"ObjectLogTag",
"NC_HISTORY BANNER IN_FILE OUT_FILE",
module=__name__
)
ObjectLogTag.__doc__ = """Standardized values that the MDTF-defined *tags*
attribute on :py:class:`~logging.LogRecord` objects can take, and that
:class:`TagMatchFilter` can listen for. These specify different destinations for
the logging events.
"""
class MDTFObjectLogger(logging.Logger):
"""This class wraps functionality for use by :class:`MDTFObjectLoggerMixin`
for log record-keeping by objects in the object hierarchy:
- A :py:class:`~logging.Logger` to record events affecting the parent object
only. This logger does not propagate events up the log hierarchy: the
module-level logger should be used if that functionality is desired.
- A queue (*_exceptions*) for holding :py:class:`Exception` objects received
by the parent object.
"""
def __init__(self, name):
super(MDTFObjectLogger, self).__init__(name)
self._exceptions = []
self._tracebacks = []
self._log_buffer = io.StringIO()
def log(self, level, msg, *args, **kw):
# add "tags" attribute to all emitted LogRecords
if 'extra' not in kw:
kw['extra'] = dict()
kw['extra'][OBJ_LOG_TAG_ATTR_NAME] = basic.to_iter(
kw['extra'].get(OBJ_LOG_TAG_ATTR_NAME, None), set
)
if OBJ_LOG_TAG_ATTR_NAME in kw:
kw['extra'][OBJ_LOG_TAG_ATTR_NAME].update(
basic.to_iter(kw[OBJ_LOG_TAG_ATTR_NAME])
)
del kw[OBJ_LOG_TAG_ATTR_NAME]
super(MDTFObjectLogger, self).log(level, msg, *args, **kw)
# wrap convenience methods
# nb: typo in https://github.com/python/cpython/blob/3.7/Lib/logging/__init__.py line 1407
def debug(self, msg, *args, **kw):
self.log(logging.DEBUG, msg, *args, **kw)
def info(self, msg, *args, **kw):
self.log(logging.INFO, msg, *args, **kw)
def warning(self, msg, *args, **kw):
self.log(logging.WARNING, msg, *args, **kw)
def error(self, msg, *args, **kw):
self.log(logging.ERROR, msg, *args, **kw)
def critical(self, msg, *args, **kw):
self.log(logging.CRITICAL, msg, *args, **kw)
    def exception(self, msg, *args, exc_info=True, **kw):
        if exc_info:
            exc_type, _, _ = sys.exc_info()
            # guard against calls made outside an 'except' block, where
            # sys.exc_info() returns (None, None, None)
            if exc_type is not None and issubclass(exc_type, exceptions.MDTFEvent):
                # an MDTFEvent isn't an exception, just an event, so log with
                # severity info
                self.log(logging.INFO, msg, *args, exc_info=False, **kw)
                return
        self.log(logging.ERROR, msg, *args, exc_info=exc_info, **kw)
# exception object storage
@property
def has_exceptions(self):
"""Return boolean corresponding to whether this object has received any
exceptions (via :meth:`store_exception`.)
"""
return len(self._exceptions) > 0
def store_exception(self, exc):
"""Add an Exception object *exc* to the internal list.
"""
self._exceptions.append(exc)
tb_exc = traceback.TracebackException(*(sys.exc_info()))
self._tracebacks.append(tb_exc)
@classmethod
    def get_logger(cls, log_name) -> logging.Logger:
"""Workaround for setting the logger class, since logger objects have
global state (calling getLogger with the same name returns the same
object, like a Singleton.)
"""
old_log_class = logging.getLoggerClass()
logging.setLoggerClass(cls)
log = logging.getLogger(log_name)
logging.setLoggerClass(old_log_class)
# default settings
log.propagate = True
log.setLevel(logging.NOTSET)
return log
    def close_log_file(self, log=True):
        # close and detach any handlers that were attached to this logger
        for handler in list(self.handlers):
            handler.close()
            self.removeHandler(handler)
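# Example (illustrative): get_logger() temporarily swaps the global logger
# class, so the logger registered under the requested name is an
# MDTFObjectLogger rather than a plain Logger.
#
#     log = MDTFObjectLogger.get_logger("MDTF.example")
#     assert isinstance(log, MDTFObjectLogger)
#     assert logging.getLogger("MDTF.example") is log  # global registry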
class MDTFObjectLoggerMixinBase:
"""Dummy base class acting as a parent for all logging mixin classes for
elements of the object hierarchy.
"""
pass
class MDTFObjectLoggerMixin(MDTFObjectLoggerMixinBase):
"""Base class to implement per-object logging for objects in the object hierarchy.
Based on `<https://stackoverflow.com/q/57813096>`__.
This wraps related functionalities:
- A :py:class:`~logging.Logger` to record events affecting the object
only. Log messages are cached in a :py:class:`~io.StringIO` buffer.
- A method :meth:`~MDTFObjectLogger.format` for formatting the contents of
the above into a string, along with log messages of any child objects.
This is intended for preparing per-POD and per-case log files; logging
intended for the console should use the module loggers.
"""
def init_log(self, fmt=None, log_dir=None):
"""Logger initialization. This is a mixin class, so we don't define a
``__init__`` method for simplicity.
"""
if fmt is None:
fmt = '%(levelname)s: %(message)s'
assert hasattr(self, 'log'), 'class is missing required `log` attribute'
if log_dir is not None:
self._log_handler = MDTFHeaderFileHandler(
filename=os.path.join(log_dir, f"{self.name}.log"),
mode="w", encoding="utf-8"
)
else:
self._log_handler = StringIOHandler()
# don't record events from children in StringIO buffer
self._log_handler.addFilter(NameMatchFilter(self._log_name))
formatter = logging.Formatter(fmt=fmt, datefmt='%H:%M:%S')
self._log_handler.setFormatter(formatter)
self.log.addHandler(self._log_handler)
self.init_extra_log_handlers()
def init_extra_log_handlers(self):
"""Hook used by child classes to add class-specific log handlers.
"""
pass
@property
def last_exception(self):
"""Return most recent Exception received by the object.
"""
if self.log.has_exceptions:
return self.log._exceptions[-1]
else:
return None
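# Minimal sketch (hypothetical host class) of how the mixin is wired up: the
# host must provide `name`, `_log_name` and `log` attributes before calling
# init_log().
#
#     class HostObject(MDTFObjectLoggerMixin):
#         def __init__(self, name):
#             self.name = name
#             self._log_name = name
#             self.log = MDTFObjectLogger.get_logger(self._log_name)
#             self.init_log()  # no log_dir: logs to a StringIO buffer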
class VarlistEntryLoggerMixin(MDTFObjectLoggerMixin):
"""Mixin providing per-object logging for :class:`~diagnostic.VarlistEntry`.
"""
def init_extra_log_handlers(self):
# add extra handler for additions to netCDF history attribute
self._nc_history_log = StringIOHandler()
self._nc_history_log.addFilter(TagMatchFilter(tags=ObjectLogTag.NC_HISTORY))
formatter = logging.Formatter(
fmt='%(asctime)s: MDTF package: %(message)s',
datefmt='%a %b %d %H:%M:%S %Y' # mimic date format used by NCO, CDO
)
self._nc_history_log.setFormatter(formatter)
self.log.addHandler(self._nc_history_log)
class _CaseAndPODHandlerMixin:
"""Common methods for providing per-object logging for
:class:`PODLoggerMixin` and :class:`CaseLoggerMixin`.
"""
def init_extra_log_handlers(self):
# add handler for warning banner
self._banner_log = StringIOHandler()
self._banner_log.addFilter(TagMatchFilter(tags=ObjectLogTag.BANNER))
formatter = logging.Formatter(
fmt='%(levelname)s: %(message)s', datefmt='%H:%M:%S'
)
self._banner_log.setFormatter(formatter)
self.log.addHandler(self._banner_log)
# add handlers for log of data files used
formatter = logging.Formatter(fmt='%(message)s', datefmt='%H:%M:%S')
self._in_file_log = StringIOHandler()
self._in_file_log.addFilter(TagMatchFilter(tags=ObjectLogTag.IN_FILE))
self._in_file_log.setFormatter(formatter)
self.log.addHandler(self._in_file_log)
self._out_file_log = StringIOHandler()
self._out_file_log.addFilter(TagMatchFilter(tags=ObjectLogTag.OUT_FILE))
self._out_file_log.setFormatter(formatter)
self.log.addHandler(self._out_file_log)
class PODLoggerMixin(_CaseAndPODHandlerMixin, MDTFObjectLoggerMixin):
"""Mixin providing per-object logging for :class:`~diagnostic.Diagnostic`
(POD objects.)
"""
class CaseLoggerMixin(_CaseAndPODHandlerMixin, MDTFObjectLoggerMixin):
"""Mixin providing per-object logging for :class:`~data_manager.DataSourceBase`
(case objects, corresponding to experiments.)
"""
def init_log(self, log_dir, fmt=None):
        # Mixin class, so no __init__ for simplicity.
        # NB: no super(); this redefines MDTFObjectLoggerMixin.init_log().
        if fmt is None:
            fmt = ("%(asctime)s %(levelname)s: %(funcName)s (%(filename)s line "
                   "%(lineno)d):\n%(message)s")
        assert hasattr(self, 'log'), 'class is missing required `log` attribute'
        self.log.propagate = True
        self.log.setLevel(logging.DEBUG)
        formatter = HangingIndentFormatter(
            fmt=fmt, datefmt='%H:%M:%S',
            header="", footer="\n"
        )
        # remove any previously-attached handlers; iterate over a copy, since
        # removeHandler() mutates the list being iterated over
        for handler in list(self.log.handlers):
            self.log.removeHandler(handler)
        if log_dir is not None:
            self._log_handler = MDTFHeaderFileHandler(
                filename=os.path.join(log_dir, f"{self.name}.log"),
                mode="w", encoding="utf-8"
            )
        else:
            self._log_handler = StringIOHandler()
        # don't record events from children in this object's own log
        self._log_handler.addFilter(NameMatchFilter(self._log_name))
        self._log_handler.setFormatter(formatter)
        self.log.addHandler(self._log_handler)
self.init_extra_log_handlers()
# transfer stuff from root logger cache
transfer_log_cache(self.log, close=False)
def close_log_file(self, log=True):
self._log_handler.close()
self._log_handler = None
# ------------------------------------------------------------------------------
def git_info():
"""Get the current git branch, hash, and list of uncommitted files, if
available. Based on NumPy's implementation: `<https://stackoverflow.com/a/40170206>`__.
"""
def _minimal_ext_cmd(cmd):
# construct minimal environment
env = os.environ.copy()
env.update({'LANGUAGE':'C', 'LANG':'C', 'LC_ALL':'C'})
try:
out = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, env=env
).communicate()[0]
        except OSError:
            # NB: Popen/communicate never raise CalledProcessError; what can
            # fail here is launching the git executable itself
            out = b''
return out.strip().decode('utf-8')
git_branch = ""
git_hash = ""
git_dirty = ""
try:
git_branch = _minimal_ext_cmd(['git', 'rev-parse', '--abbrev-ref', 'HEAD'])
git_hash = _minimal_ext_cmd(['git', 'rev-parse', 'HEAD'])
git_dirty = _minimal_ext_cmd(['git', 'diff', '--no-ext-diff', '--name-only'])
except OSError:
pass
if git_dirty:
git_dirty = git_dirty.splitlines()
elif git_hash:
git_dirty = "<none>"
else:
git_dirty = "<couldn't get uncomitted files>"
if not git_branch:
git_branch = "<no branch>"
if not git_hash:
git_hash = "<couldn't get git hash>"
return (git_branch, git_hash, git_dirty)
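# mdtf_log_header() is referenced by MDTFHeaderFileHandler above but its
# definition is missing here; the following is a minimal reconstruction. The
# exact layout of the header text is an assumption.
def mdtf_log_header(title):
    """Return a header string of debug information (start time, git repo state
    from :func:`git_info`) for the top of a log file.
    """
    git_branch, git_hash, git_dirty = git_info()
    str_ = (
        f"{title}\n\n"
        f"Started logging at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        f"git branch: {git_branch}\n"
        f"git hash: {git_hash}\n"
        f"uncommitted files: {git_dirty}\n"
    )
    return str_ + (80 * '-') + '\n\n'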
def signal_logger(caller_name: str, signum=None, log=_log):
"""Lookup signal name from number and write to log. Taken from
`<https://stackoverflow.com/a/2549950>`__.
Args:
caller_name (str): Calling function name, only used in log message.
signum: Signal number of the signal we received.
        log (:py:class:`~logging.Logger`): Logger to write the message to.
"""
if signum:
sig_lookup = {
k: v for v, k in reversed(sorted(list(signal.__dict__.items())))
if v.startswith('SIG') and not v.startswith('SIG_')
}
log.info(
"%s caught signal %s (%s)",
caller_name, sig_lookup.get(signum, 'UNKNOWN'), signum
)
else:
log.info("%s caught unknown signal.", caller_name)
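# Illustrative use (assumed): log the signal from inside a handler installed
# with the signal module.
#
#     def _sig_handler(signum, frame):
#         signal_logger("MDTF", signum=signum)
#         sys.exit(1)
#     signal.signal(signal.SIGTERM, _sig_handler)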
def _set_excepthook(root_logger):
"""Ensure all uncaught exceptions, other than user KeyboardInterrupt, are
logged to the root logger.
See `<https://docs.python.org/3/library/sys.html#sys.excepthook>`__.
"""
def uncaught_exception_handler(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
# Skip logging for user interrupt
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
root_logger.critical(
(70*'*') + "\nUncaught exception:\n", # banner so it stands out
exc_info=(exc_type, exc_value, exc_traceback)
)
sys.excepthook = uncaught_exception_handler
def _configure_logging_dict(log_d, log_args):
"""Convert CLI flags (``--verbose``/``--quiet``) into log levels. Configure
log level and filters on console handlers in a logging config dictionary.
"""
# smaller number = more verbose
level = getattr(log_args, 'quiet', 0) - getattr(log_args, 'verbose', 0)
if level <= 1:
stderr_level = logging.WARNING
elif level == 2:
stderr_level = logging.ERROR
else:
stderr_level = logging.CRITICAL
log_d['filters'] = {
"stderr_filter": {"()": GeqLevelFilter, "level": stderr_level},
"stdout_filter": {"()": EqLevelFilter, "level": logging.INFO},
"debug_filter": {"()": LtLevelFilter, "level": logging.INFO}
}
for h in ('stderr', 'stdout', 'debug'):
if h in log_d['handlers']:
log_d['handlers'][h]['filters'] = [h+'_filter']
if 'stderr' in log_d['handlers']:
log_d['handlers']['stderr']['level'] = stderr_level
if level <= -2:
for d in log_d['handlers'].values():
d['formatter'] = 'debug'
if level == 0:
del log_d['handlers']['debug']
log_d['root']['handlers'] = ["stdout", "stderr"]
elif level == 1:
del log_d['handlers']['debug']
del log_d['handlers']['stdout']
log_d['root']['handlers'] = ["stderr"]
return log_d
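# Illustrative examples of the resulting console verbosity (no flags gives
# level = 0; each -v subtracts 1, each -q adds 1):
#
#     (none) level 0:  INFO -> stdout; WARNING and above -> stderr
#     -v     level -1: DEBUG also -> stdout via the 'debug' handler
#     -vv    level -2: as -v, but all console handlers use the 'debug' formatter
#     -q     level 1:  WARNING and above -> stderr only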
def initial_log_config():
"""Configure the root logger for logging to console and to a cache provided
by :class:`MultiFlushMemoryHandler`.
This is temporary logging configuration, used to solve the chicken-and-egg
problem of logging any problems that happen before we've set up logging
according to the user's log config files, which requires doing a full parse
of the user input. Once this is set up, any messages in the cache are sent
to the user-configured logs.
"""
logging.captureWarnings(True)
# log uncaught exceptions
root_logger = logging.getLogger()
_set_excepthook(root_logger)
log_d = {
"version": 1,
"disable_existing_loggers": True,
"root": {
"level": "NOTSET",
"handlers": ["debug", "stdout", "stderr", "cache"]
},
"handlers": {
"debug": {
"()": "src.util.logs.MDTFConsoleHandler",
"formatter": "level",
"level": logging.DEBUG,
"stream": "ext://sys.stdout"
},
"stdout": {
"()": "src.util.logs.MDTFConsoleHandler",
"formatter": "normal",
"level": logging.INFO,
"stream": "ext://sys.stdout"
},
"stderr": {
"()": "src.util.logs.MDTFConsoleHandler",
"formatter": "level",
"level": logging.WARNING,
"stream": "ext://sys.stderr"
},
"cache": {
"()": "src.util.logs.MultiFlushMemoryHandler",
"level": logging.NOTSET,
"capacity": 8*1024,
"flushOnClose": False
}
},
"formatters": {
"normal": {"format": "%(message)s"},
"level": {"format": "%(levelname)s: %(message)s"},
"debug": {
"()": HangingIndentFormatter,
"format": ('%(levelname)s in %(funcName)s() (%(filename)s line '
'%(lineno)d):\n%(message)s'),
"tabsize": 4,
"footer": "\n"
}
}
}
log_parser = argparse.ArgumentParser(add_help=False)
log_parser.add_argument('--verbose', '-v', default=0, action="count")
log_parser.add_argument('--quiet', '-q', default=0, action="count")
log_args, _ = log_parser.parse_known_args()
log_d = _configure_logging_dict(log_d, log_args)
logging.config.dictConfig(log_d)
root_logger.debug('Console loggers configured.')
def transfer_log_cache(target_log=None, close=False):
"""Transfer the contents of the root log cache
(:class:`MultiFlushMemoryHandler`) to logs on newly-configured objects.
"""
root_logger = logging.getLogger()
cache_idx = [i for i, handler in enumerate(root_logger.handlers)
if isinstance(handler, MultiFlushMemoryHandler)]
if len(cache_idx) > 0:
temp_log_cache = root_logger.handlers[cache_idx[0]]
if target_log is not None:
try:
temp_log_cache.transfer_to_non_console(target_log)
except Exception as exc:
                root_logger.debug("Couldn't transfer log to '%s': %r",
                                  target_log.name, exc)
if close:
# delete it
temp_log_cache.close()
root_logger.removeHandler(temp_log_cache)
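# Intended call sequence (sketch; `case` is a hypothetical object with a
# configured MDTFObjectLogger): initial_log_config() installs the cache at
# program start, and once definitive handlers exist the cached records are
# replayed into them before the cache is dropped.
#
#     initial_log_config()
#     _log.info("captured by the console handlers and the cache")
#     # ... parse user input, configure per-case logs ...
#     transfer_log_cache(case.log)    # replay cached records into case's handlers
#     transfer_log_cache(close=True)  # finally, remove the cache from the root logger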
@dataclass.mdtf_dataclass
class MDTFObjectBase(metaclass=basic.MDTFABCMeta):
"""Base class providing shared functionality for the case object hierarchy, which is:
- :class:`~data_sources.DataSourceBase`s belonging to a run of the package;
- :class:`~pod_setup.PODObject`s POD belonging to a
:class:`~data_sources.DataSourceBase`;
- :class:`~varlist_util.VarlistEntry`s (requested model variables) belonging
to a :class:`~pod_setup.PodObject`.
"""
_id: basic.MDTF_ID = None
name: str = dataclass.MANDATORY
_parent: typing.Any = dc.field(default=dataclass.MANDATORY, compare=False)
status: basic.ObjectStatus = dc.field(default=basic.ObjectStatus.NOTSET, compare=False)
def __post_init__(self):
if self._id is None:
            # assign a unique ID number, so we don't need to rely on names being unique
self._id = basic.MDTF_ID()
# init object-level logger
self.log = MDTFObjectLogger.get_logger(self._log_name)
    # the @property decorator attaches getter code to an attribute, so its value
    # is computed on access
@property
def _log_name(self):
if self._parent is None:
return OBJ_LOG_ROOT # framework: root of tree
else:
_log_name = f"{self.name}_{self._id}".replace('.', '_')
return f"{self._parent._log_name}.{_log_name}"
@property
def full_name(self):
return f"<#{self._id}:{self._parent.name}.{self.name}>"
def __hash__(self):
return self._id.__hash__()
@property
def failed(self):
return self.status == basic.ObjectStatus.FAILED
@property
def active(self):
return self.status == basic.ObjectStatus.ACTIVE
@property
@abc.abstractmethod
def _children(self):
"""Iterable of child objects associated with this object."""
pass
    # generator for iterating over this object's children, with optional filtering
def iter_children(self, child_type=None, status=None, status_neq=None):
"""Generator iterating over child objects associated with this object.
        Args:
            child_type (type): Optional. If given, only iterate over child
                objects of that type; otherwise iterate over all children,
                regardless of type.
            status (:class:`ObjectStatus`): Optional. If given, only iterate
                over child objects with that status; otherwise iterate over all
                children, regardless of status.
            status_neq (:class:`ObjectStatus`): Optional. If given, iterate
                over child objects which *don't* have the given status. If
                *status* is also given, this setting is ignored.
        """
iter_ = self._children
if child_type is not None: # return the iter_ elements that match a specified child_type
iter_ = filter((lambda x: isinstance(x, child_type)), iter_)
if status is not None: # return the iter_ elements that match the specified status
iter_ = filter((lambda x: x.status == status), iter_)
elif status_neq is not None: # return the iter elements that do NOT match status_neq
iter_ = filter((lambda x: x.status != status_neq), iter_)
yield from iter_
def child_deactivation_handler(self, child, exc):
# needs to test for child_type
pass
def child_status_update(self):
if next(self.iter_children(), None) is None:
# should never get here (no children of any status), because this
# method should only be called by children
raise ValueError(f"Children misconfigured for {self.full_name}.")
# if all children have failed, deactivate self
if (not self.failed and
next(self.iter_children(status_neq=basic.ObjectStatus.FAILED), None) is None):
self.deactivate(exceptions.ChildFailureEvent(self), level=None)
# level at which to log deactivation events
_deactivation_log_level = logging.ERROR
def deactivate(self, exc, level=None):
# always log exceptions, even if we've already failed
self.log.store_exception(exc)
if not (self.failed or self.status == basic.ObjectStatus.SUCCEEDED):
# only need to log and update on status change for still-active objs
if level is None:
level = self._deactivation_log_level # default level for child class
self.log.log(level, "Deactivated %s due to %r.", self.full_name, exc)
# update status on self
self.status = basic.ObjectStatus.FAILED
if self._parent is not None:
# call handler on parent, which may change parent and/or siblings
self._parent.child_deactivation_handler(self, exc)
self._parent.child_status_update()
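# Minimal sketch (hypothetical subclass): concrete members of the hierarchy
# must implement the abstract `_children` property.
#
#     @dataclass.mdtf_dataclass
#     class LeafNode(MDTFObjectBase):
#         @property
#         def _children(self):
#             return []  # leaf node: no children
#
#     root = LeafNode(name="root", _parent=None)
#     root.log.info("per-object logger was attached by __post_init__")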