Source code for pasteur.utils.logging
""" Logging logic that stores loges in mlflow runs. """
from io import StringIO
from logging import Handler, getLevelName
from os import path
from time import time
from weakref import WeakSet
import mlflow
[docs]
class MlflowHandler(Handler):
"""
A handler class based on `StreamHandler` and `MemoryHandler`, which
buffers the records using a StringIO and dumps that StringIO every
N records into an mlflow artifact.
@Warning: currently doesn't have a size limit
"""
terminator = "\n"
_handlers: WeakSet["MlflowHandler"] = WeakSet()
[docs]
@staticmethod
def close_all():
"""Call this function to close the mlflow handlers before ending a run
if you don't plan to reuse them."""
for handler in MlflowHandler._handlers:
handler.close()
[docs]
@staticmethod
def reset_all():
"""Flushes and cleans up all handlers so that they can be used for a new run."""
for handler in MlflowHandler._handlers:
handler.reset()
[docs]
@staticmethod
def flush_all():
for handler in MlflowHandler._handlers:
handler.flush()
def __init__(
self,
name: str = "user",
logdir: str = "_logs",
interval: int | str = 5,
):
Handler.__init__(self)
self._handlers.add(self)
self._mlflow = None
self.name = name
self.logdir = logdir
self.interval = int(interval)
self.last_sent = time()
self.stream = StringIO()
[docs]
def flush(self):
self.acquire()
try:
if mlflow.active_run() is not None:
mlflow.log_text(
self.stream.getvalue(), path.join(self.logdir, f"{self.name}.log")
)
self.last_sent = time()
finally:
self.release()
[docs]
def emit(self, record):
try:
msg = self.format(record)
if ":ephemeral:" in msg:
return
stream = self.stream
# issue 35046: merged two stream.writes into one.
stream.write(msg + self.terminator)
if time() - self.last_sent > self.interval:
self.flush()
except RecursionError: # See issue 36272
raise
except Exception:
self.handleError(record)
[docs]
def close(self) -> None:
try:
self.flush()
finally:
super().close()
[docs]
def reset(self):
"""Flushes and removes the current buffer of the handler, allowing it
to be reused for a new run."""
self.acquire()
try:
self.flush()
self.steam = StringIO()
finally:
self.release()
def __repr__(self):
level = getLevelName(self.level)
return "<Mlflow handler %s(%s)>" % (self.name, level)
__all__ = ["MlflowHandler"]