logbook_aiopipe module
This package provides a handler and subscriber for multiprocess
logbook
logging that runs on the
asyncio
event loop. It uses
aiopipe
to transfer log messages from the child
process to the parent process.
Example
The following example shows a typical application of multiprocess logging. It results in
two log messages, hello from parent process
and hello from child process
, being
printed in some order.
import asyncio
from contextlib import closing
from multiprocessing import Process

from aiopipe import aiopipe
from logbook import Logger, StderrHandler
from logbook_aiopipe import AioPipeHandler, AioPipeSubscriber


async def mainTask(eventLoop):
    # Nothing special is needed for the parent logger: configure it as usual.
    log = Logger()
    log.handlers.append(StderrHandler())

    rx, tx = aiopipe()
    sub = AioPipeSubscriber(await rx.open(eventLoop), log)

    with closing(sub):
        # Start receiving in the background before the child is spawned.
        subTask = eventLoop.create_task(sub.run())

        with tx.send() as tx:
            proc = Process(target=childProc, args=(tx,))
            proc.start()

        log.info("hello from parent process")

        proc.join()
        await subTask


def childProc(tx):
    # The child drives its own, freshly created event loop.
    eventLoop = asyncio.new_event_loop()
    eventLoop.run_until_complete(childTask(eventLoop, tx))


async def childTask(eventLoop, tx):
    log = Logger()

    # `AioPipeHandler` should be the only handler used in the child process.
    handler = AioPipeHandler(await tx.open(eventLoop))
    log.handlers.append(handler)

    with closing(handler):
        log.info("hello from child process")


eventLoop = asyncio.get_event_loop()
eventLoop.run_until_complete(mainTask(eventLoop))
""" This package provides a handler and subscriber for multiprocess [`logbook`](http://logbook.readthedocs.io) logging that runs on the [`asyncio`](https://docs.python.org/3/library/asyncio.html) event loop. It uses [`aiopipe`](https://github.com/kchmck/aiopipe) to transfer log messages from the child process to the parent process. #### Example The following example shows a typical application of multiprocess logging. It results in two log messages, `hello from parent process` and `hello from child process`, being printed in some order. ```python3 from contextlib import closing from multiprocessing import Process import asyncio from aiopipe import aiopipe from logbook_aiopipe import AioPipeSubscriber, \\ AioPipeHandler from logbook import Logger, StderrHandler async def mainTask(eventLoop): # The parent process logger can be set up as normal. log = Logger() log.handlers.append(StderrHandler()) rx, tx = aiopipe() sub = AioPipeSubscriber(await rx.open(eventLoop), log) with closing(sub): subTask = eventLoop.create_task(sub.run()) with tx.send() as tx: proc = Process(target=childProc, args=(tx,)) proc.start() log.info("hello from parent process") proc.join() await subTask def childProc(tx): eventLoop = asyncio.new_event_loop() eventLoop.run_until_complete(childTask(eventLoop, tx)) async def childTask(eventLoop, tx): log = Logger() # The child process should use only `AioPipeHandler` as # its handler. handler = AioPipeHandler(await tx.open(eventLoop)) log.handlers.append(handler) with closing(handler): log.info("hello from child process") eventLoop = asyncio.get_event_loop() eventLoop.run_until_complete(mainTask(eventLoop)) ``` """ from asyncio import IncompleteReadError import json from logbook import Handler, LogRecord class AioPipeHandler(Handler): """ Forwards log messages in a child process to the parent process. 
This should be pushed on the stack or added to a `Logger` in the [typical manner](https://logbook.readthedocs.io/en/stable/quickstart.html#registering-handlers). """ def __init__(self, tx, *args, **kwargs): """ Create a new `AioPipeHandler` that forwards log messages over the given pipe transmit end. The other arguments are passed to [`logbook.Handler`](https://logbook.readthedocs.io/en/stable/api/handlers.html#logbook.Handler). This object takes ownership of `tx`. This handler should be attached to a `logbook.Logger` instance. """ super().__init__(*args, **kwargs) self._tx = tx def emit(self, record): self._tx.write(json.dumps(record.to_dict(json_safe=True)).encode()) self._tx.write(b"\n") def close(self): self._tx.close() class AioPipeSubscriber: """ Receives log messages in the parent process and emits them to a [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger) instance. """ def __init__(self, rx, logger): """ Create a new `AioPipeSubscriber` to listen for messages on the given pipe receive end and emit them to the given [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger) instance. This object takes ownership of `rx`. """ self._rx = rx self._logger = logger async def run(self): """ Run the subcribing task to continuously receive and emit log messages. This can be ran in the background of the event loop using [`create_task`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task). """ while True: try: msg = await self._rx.readuntil(b"\n") except IncompleteReadError as e: if e.expected is None: break raise self._logger.handle(LogRecord.from_dict(json.loads(msg.decode()))) def close(self): """ Close the subscriber and receiving pipe. """ self._rx._transport.close()
Classes
Forwards log messages in a child process to the parent process.
This should be pushed on the stack or added to a Logger
in the typical
manner.
class AioPipeHandler(Handler):
    """
    Handler for a child process that ships each log record to the parent process.

    Register it like any other handler: push it on the stack or add it to a `Logger`
    in the [typical
    manner](https://logbook.readthedocs.io/en/stable/quickstart.html#registering-handlers).
    """

    def __init__(self, tx, *args, **kwargs):
        """
        Build an `AioPipeHandler` that writes log messages to the pipe transmit end
        `tx`. Every remaining argument is forwarded to
        [`logbook.Handler`](https://logbook.readthedocs.io/en/stable/api/handlers.html#logbook.Handler).
        This object takes ownership of `tx`.

        This handler should be attached to a `logbook.Logger` instance.
        """
        super().__init__(*args, **kwargs)
        self._tx = tx

    def emit(self, record):
        """
        Write `record` to the pipe as one JSON document followed by a newline.
        """
        payload = json.dumps(record.to_dict(json_safe=True))
        self._tx.write(payload.encode())
        # The newline terminates the message on the wire.
        self._tx.write(b"\n")

    def close(self):
        """
        Close the transmit end of the pipe.
        """
        self._tx.close()
Ancestors (in MRO)
- AioPipeHandler
- logbook.handlers.Handler
- logbook.helpers._WithMetaclassBase
- logbook.base.ContextObject
- logbook._fallback.StackedObject
- builtins.object
Class variables
Static methods
def __init__(
self, tx, *args, **kwargs)
Create a new AioPipeHandler
that forwards log messages over the given pipe
transmit end. The other arguments are passed to
logbook.Handler
.
This object takes ownership of tx
.
This handler should be attached to a logbook.Logger
instance.
def __init__(self, tx, *args, **kwargs):
    """
    Build an `AioPipeHandler` that writes log messages to the pipe transmit end
    `tx`. Every remaining argument is forwarded to
    [`logbook.Handler`](https://logbook.readthedocs.io/en/stable/api/handlers.html#logbook.Handler).
    This object takes ownership of `tx`.

    This handler should be attached to a `logbook.Logger` instance.
    """
    # Initialise the base handler first, then remember the pipe end.
    super().__init__(*args, **kwargs)
    self._tx = tx
def applicationbound(
self, _cls=<class 'logbook._fallback._StackBound'>)
Can be used in combination with the with
statement to
execute code while the object is bound to the application.
def applicationbound(self, _cls=_StackBound):
    """Return a `_cls` object built from this object and its application
    push/pop methods, intended for use in a `with` statement so that code
    runs while the object is bound to the application.
    """
    push, pop = self.push_application, self.pop_application
    return _cls(self, push, pop)
def close(
self)
Tidy up any resources used by the handler. This is automatically called by the destructor of the class as well, but explicit calls are encouraged. Make sure that multiple calls to close are possible.
def close(self):
    """Close the transmit end of the pipe held by this handler."""
    tx = self._tx
    tx.close()
def emit(
self, record)
Emit the specified logging record. This should take the record and deliver it to wherever the handler sends formatted log records.
def emit(self, record):
    """Write `record` to the pipe as one JSON document followed by a newline.

    The record is converted with `record.to_dict(json_safe=True)`, dumped to
    JSON, and encoded to bytes before being written.
    """
    payload = json.dumps(record.to_dict(json_safe=True))
    self._tx.write(payload.encode())
    # The newline terminates the message on the wire.
    self._tx.write(b"\n")
def emit_batch(
self, records, reason)
Some handlers may internally queue up records and want to forward
them at once to another handler. For example the
:class:~logbook.FingersCrossedHandler
internally buffers
records until a level threshold is reached in which case the buffer
is sent to this method and not :meth:emit
for each record.
The default behaviour is to call :meth:emit
for each record in
the buffer, but handlers can use this to optimize log handling. For
instance the mail handler will try to batch up items into one mail
and not to emit mails for each record in the buffer.
Note that unlike :meth:emit
there is no wrapper method like
:meth:handle
that does error handling. The reason is that this
is intended to be used by other handlers which are already protected
against internal breakage.
reason
is a string that specifies the reason why :meth:emit_batch
was called, and not :meth:emit
. The following are valid values:
'buffer'
Records were buffered for performance reasons or because the
records were sent to another process and buffering was the only
possible way. For most handlers this should be equivalent to
calling :meth:emit
for each record.
'escalation'
Escalation means that records were buffered in case the threshold
was exceeded. In this case, the last record in the iterable is the
record that triggered the call.
'group'
All the records in the iterable belong to the same logical
component and happened in the same process. For example there was
a long running computation and the handler is invoked with a bunch
of records that happened there. This is similar to the escalation
reason, just that the first one is the significant one, not the
last.
If a subclass overrides this and does not want to handle a specific reason it must call into the superclass because more reasons might appear in future releases.
Example implementation::
def emit_batch(self, records, reason): if reason not in ('escalation', 'group'): Handler.emit_batch(self, records, reason) ...
def emit_batch(self, records, reason):
    """Emit a batch of records that another handler queued up.

    Some handlers buffer records internally and forward them all at once.
    The :class:`~logbook.FingersCrossedHandler`, for example, holds records
    back until a level threshold is reached and then passes the buffer to
    this method instead of calling :meth:`emit` once per record.

    The default behaviour is to call :meth:`emit` for each record in the
    buffer, but handlers can override this to optimize log handling. For
    instance the mail handler will try to batch up items into one mail
    rather than sending a mail for each record.

    Unlike :meth:`emit`, there is no wrapper method like :meth:`handle`
    that does error handling, because this method is intended to be used
    by other handlers which are already protected against internal
    breakage.

    `reason` is a string that specifies the reason why :meth:`emit_batch`
    was called instead of :meth:`emit`. The following are valid values:

    ``'buffer'``
        Records were buffered for performance reasons or because the
        records were sent to another process and buffering was the only
        possible way. For most handlers this should be equivalent to
        calling :meth:`emit` for each record.

    ``'escalation'``
        Records were buffered in case the threshold was exceeded. In this
        case, the last record in the iterable is the record that
        triggered the call.

    ``'group'``
        All the records in the iterable belong to the same logical
        component and happened in the same process, for example during a
        long running computation. This is similar to the escalation
        reason, except that the first record is the significant one, not
        the last.

    If a subclass overrides this and does not want to handle a specific
    reason it must call into the superclass because more reasons might
    appear in future releases.

    Example implementation::

        def emit_batch(self, records, reason):
            if reason not in ('escalation', 'group'):
                Handler.emit_batch(self, records, reason)
            ...
    """
    # The default implementation ignores `reason` and emits one by one.
    for entry in records:
        self.emit(entry)
def format(
self, record)
Formats a record with the given formatter. If no formatter is set, the record message is returned. Generally speaking the return value is most likely a unicode string, but nothing in the handler interface requires a formatter to return a unicode string.
The combination of a handler and formatter might have the formatter return an XML element tree for example.
def format(self, record):
    """Format a record with this handler's formatter.

    When a formatter is set, it is called with the record and this handler
    and its result is returned unchanged; nothing in the handler interface
    requires that result to be a unicode string (a formatter might return
    an XML element tree, for example). When no formatter is set, the
    record's message is returned.
    """
    if self.formatter is not None:
        return self.formatter(record, self)
    return record.message
def greenletbound(
self, _cls=<class 'logbook._fallback._StackBound'>)
Can be used in combination with the with
statement to
execute code while the object is bound to the greenlet.
def greenletbound(self, _cls=_StackBound):
    """Return a `_cls` object built from this object and its greenlet
    push/pop methods, intended for use in a `with` statement so that code
    runs while the object is bound to the greenlet.
    """
    push, pop = self.push_greenlet, self.pop_greenlet
    return _cls(self, push, pop)
def handle(
self, record)
Emits the record and falls back. It tries to :meth:emit
the
record and if that fails, it will call into :meth:handle_error
with
the record and traceback. This function itself will always emit
when called, even if the logger level is higher than the record's
level.
If this method returns False
it signals to the calling function that
no recording took place in which case it will automatically bubble.
This should not be used to signal error situations. The default
implementation always returns True
.
def handle(self, record):
    """Emit the record, delegating failures to :meth:`handle_error`.

    The record is passed to :meth:`emit`; if that raises an exception,
    :meth:`handle_error` is called with the record and the current
    exception info. This method does not check levels itself, so it emits
    whenever it is called, even if the logger level is higher than the
    record's level.

    A return value of `False` would signal to the calling function that no
    recording took place, in which case it will automatically bubble. That
    should not be used to signal error situations. This default
    implementation always returns `True`.
    """
    try:
        self.emit(record)
    except Exception:
        failure = sys.exc_info()
        self.handle_error(record, failure)
    return True
def handle_error(
self, record, exc_info)
Handle errors which occur during an emit() call. The behaviour of
this function depends on the current errors
setting.
Check :class:Flags
for more information.
def handle_error(self, record, exc_info):
    """Handle errors which occur during an emit() call.

    The behaviour depends on the current `errors` flag, read with
    ``Flags.get_flag('errors', 'print')``:

    * ``'raise'`` re-raises the exception described by `exc_info`.
    * ``'print'`` prints the traceback to stderr, followed by the file
      name and line number the record was logged from.
    * any other value does nothing.

    Check :class:`Flags` for more information.

    :param record: the record whose emission failed; its `filename` and
                   `lineno` are used in the printed message.
    :param exc_info: the ``(type, value, traceback)`` tuple of the failure.
    """
    behaviour = Flags.get_flag('errors', 'print')
    if behaviour == 'raise':
        # Re-raise outside any ``try``. Previously this ran inside
        # ``try ... except IOError: pass``, so an IOError raised by emit()
        # was swallowed even though raising was requested.
        reraise(exc_info[0], exc_info[1], exc_info[2])
    elif behaviour == 'print':
        try:
            traceback.print_exception(*(exc_info + (None, sys.stderr)))
            sys.stderr.write('Logged from file %s, line %s\n' % (
                record.filename, record.lineno))
        except IOError:
            # Reporting is best-effort: a failing stderr must not turn a
            # logging error into a crash.
            pass
def pop_application(
self)
Pops the context object from the stack.
def pop_application(self):
    """Pop the top object from the application stack.

    Asserts that the object returned by the stack manager is this one.
    """
    removed = self.stack_manager.pop_application()
    assert removed is self, 'popped unexpected object'
def pop_greenlet(
self)
Pops the context object from the stack.
def pop_greenlet(self):
    """Pop the top object from the greenlet stack.

    Asserts that the object returned by the stack manager is this one.
    """
    removed = self.stack_manager.pop_greenlet()
    assert removed is self, 'popped unexpected object'
def pop_thread(
self)
Pops the context object from the stack.
def pop_thread(self):
    """Pop the top object from the thread stack.

    Asserts that the object returned by the stack manager is this one.
    """
    removed = self.stack_manager.pop_thread()
    assert removed is self, 'popped unexpected object'
def push_application(
self)
Pushes the context object to the application stack.
def push_application(self):
    """Push this context object onto the application stack."""
    manager = self.stack_manager
    manager.push_application(self)
def push_greenlet(
self)
Pushes the context object to the greenlet stack.
def push_greenlet(self):
    """Push this context object onto the greenlet stack."""
    manager = self.stack_manager
    manager.push_greenlet(self)
def push_thread(
self)
Pushes the context object to the thread stack.
def push_thread(self):
    """Push this context object onto the thread stack."""
    manager = self.stack_manager
    manager.push_thread(self)
def should_handle(
self, record)
Returns True
if this handler wants to handle the record. The
default implementation checks the level.
def should_handle(self, record):
    """Tell whether this handler wants to handle the record.

    The default implementation only compares levels: the result is `True`
    when the record's level is at least this handler's level.
    """
    meets_level = record.level >= self.level
    return meets_level
def threadbound(
self, _cls=<class 'logbook._fallback._StackBound'>)
Can be used in combination with the with
statement to
execute code while the object is bound to the thread.
def threadbound(self, _cls=_StackBound):
    """Return a `_cls` object built from this object and its thread
    push/pop methods, intended for use in a `with` statement so that code
    runs while the object is bound to the thread.
    """
    push, pop = self.push_thread, self.pop_thread
    return _cls(self, push, pop)
Instance variables
Receives log messages in the parent process and emits them to a
Logger
instance.
class AioPipeSubscriber:
    """
    Parent-process counterpart of the handler: reads log messages from the pipe and
    hands them to a
    [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger)
    instance.
    """

    def __init__(self, rx, logger):
        """
        Build an `AioPipeSubscriber` that listens on the pipe receive end `rx` and
        emits every message to the given
        [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger)
        instance. This object takes ownership of `rx`.
        """
        self._rx, self._logger = rx, logger

    async def run(self):
        """
        Run the subscribing task, which receives and emits log messages until the pipe
        reaches end-of-file.

        This can be run in the background of the event loop using
        [`create_task`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task).
        """
        while True:
            try:
                line = await self._rx.readuntil(b"\n")
            except IncompleteReadError as err:
                if err.expected is not None:
                    raise
                # `expected is None` is treated as the end of the stream.
                return

            record = LogRecord.from_dict(json.loads(line.decode()))
            self._logger.handle(record)

    def close(self):
        """
        Close the subscriber and receiving pipe.
        """
        transport = self._rx._transport
        transport.close()
Ancestors (in MRO)
- AioPipeSubscriber
- builtins.object
Static methods
def __init__(
self, rx, logger)
Create a new AioPipeSubscriber
to listen for messages on the given pipe receive
end and emit them to the given
Logger
instance. This object takes ownership of rx
.
def __init__(self, rx, logger):
    """
    Build an `AioPipeSubscriber` that listens on the pipe receive end `rx` and
    emits every message to the given
    [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger)
    instance. This object takes ownership of `rx`.
    """
    self._rx, self._logger = rx, logger
def close(
self)
Close the subscriber and receiving pipe.
def close(self):
    """
    Close the subscriber and receiving pipe.
    """
    transport = self._rx._transport
    transport.close()
async def run(
self)
Run the subscribing task to continuously receive and emit log messages.
This can be run in the background of the event loop using
create_task
.
async def run(self):
    """
    Run the subscribing task, which receives and emits log messages until the pipe
    reaches end-of-file.

    This can be run in the background of the event loop using
    [`create_task`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task).
    """
    while True:
        try:
            line = await self._rx.readuntil(b"\n")
        except IncompleteReadError as err:
            if err.expected is not None:
                raise
            # `expected is None` is treated as the end of the stream.
            return

        record = LogRecord.from_dict(json.loads(line.decode()))
        self._logger.handle(record)