logbook_aiopipe module

This package provides a handler and subscriber for multiprocess [`logbook`](http://logbook.readthedocs.io) logging that runs on the [`asyncio`](https://docs.python.org/3/library/asyncio.html) event loop. It uses [`aiopipe`](https://github.com/kchmck/aiopipe) to transfer log messages from the child process to the parent process.

Example

The following example shows a typical application of multiprocess logging. It results in two log messages, `hello from parent process` and `hello from child process`, being printed in some order.

```python3
from contextlib import closing
from multiprocessing import Process
import asyncio

from aiopipe import aiopipe
from logbook_aiopipe import AioPipeSubscriber, \
        AioPipeHandler
from logbook import Logger, StderrHandler

async def mainTask(eventLoop):
    # The parent process logger can be set up as normal.
    log = Logger()
    log.handlers.append(StderrHandler())

    rx, tx = aiopipe()
    sub = AioPipeSubscriber(await rx.open(eventLoop), log)

    with closing(sub):
        subTask = eventLoop.create_task(sub.run())

        with tx.send() as tx:
            proc = Process(target=childProc, args=(tx,))
            proc.start()

        log.info("hello from parent process")

        proc.join()
        await subTask

def childProc(tx):
    eventLoop = asyncio.new_event_loop()
    eventLoop.run_until_complete(childTask(eventLoop, tx))

async def childTask(eventLoop, tx):
    log = Logger()

    # The child process should use only `AioPipeHandler` as
    # its handler.
    handler = AioPipeHandler(await tx.open(eventLoop))
    log.handlers.append(handler)

    with closing(handler):
        log.info("hello from child process")

eventLoop = asyncio.get_event_loop()
eventLoop.run_until_complete(mainTask(eventLoop))
```
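
On Python 3.7 and later, the same flow can be driven by `asyncio.run`; a minimal sketch of the parent side, assuming the same `aiopipe` API as above:

```python3
async def mainTask():
    # Obtain the loop inside the coroutine instead of passing it in.
    eventLoop = asyncio.get_running_loop()
    ...  # body as in mainTask above

asyncio.run(mainTask())
```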
"""
This package provides a handler and subscriber for multiprocess
[`logbook`](http://logbook.readthedocs.io) logging that runs on the
[`asyncio`](https://docs.python.org/3/library/asyncio.html) event loop. It uses
[`aiopipe`](https://github.com/kchmck/aiopipe) to transfer log messages from the child
process to the parent process.

#### Example

The following example shows a typical application of multiprocess logging. It results in
two log messages, `hello from parent process` and `hello from child process`, being
printed in some order.

```python3
from contextlib import closing
from multiprocessing import Process
import asyncio

from aiopipe import aiopipe
from logbook_aiopipe import AioPipeSubscriber, \\
        AioPipeHandler
from logbook import Logger, StderrHandler

async def mainTask(eventLoop):
    # The parent process logger can be set up as normal.
    log = Logger()
    log.handlers.append(StderrHandler())

    rx, tx = aiopipe()
    sub = AioPipeSubscriber(await rx.open(eventLoop), log)

    with closing(sub):
        subTask = eventLoop.create_task(sub.run())

        with tx.send() as tx:
            proc = Process(target=childProc, args=(tx,))
            proc.start()

        log.info("hello from parent process")

        proc.join()
        await subTask

def childProc(tx):
    eventLoop = asyncio.new_event_loop()
    eventLoop.run_until_complete(childTask(eventLoop, tx))

async def childTask(eventLoop, tx):
    log = Logger()

    # The child process should use only `AioPipeHandler` as
    # its handler.
    handler = AioPipeHandler(await tx.open(eventLoop))
    log.handlers.append(handler)

    with closing(handler):
        log.info("hello from child process")

eventLoop = asyncio.get_event_loop()
eventLoop.run_until_complete(mainTask(eventLoop))
```
"""

from asyncio import IncompleteReadError
import json

from logbook import Handler, LogRecord

class AioPipeHandler(Handler):
    """
    Forwards log messages in a child process to the parent process.

    This should be pushed on the stack or added to a `Logger` in the [typical
    manner](https://logbook.readthedocs.io/en/stable/quickstart.html#registering-handlers).
    """

    def __init__(self, tx, *args, **kwargs):
        """
        Create a new `AioPipeHandler` that forwards log messages over the given pipe
        transmit end. The other arguments are passed to
        [`logbook.Handler`](https://logbook.readthedocs.io/en/stable/api/handlers.html#logbook.Handler).
        This object takes ownership of `tx`.

        This handler should be attached to a `logbook.Logger` instance.
        """

        super().__init__(*args, **kwargs)

        self._tx = tx

    def emit(self, record):
        self._tx.write(json.dumps(record.to_dict(json_safe=True)).encode())
        self._tx.write(b"\n")

    def close(self):
        self._tx.close()

class AioPipeSubscriber:
    """
    Receives log messages in the parent process and emits them to a
    [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger)
    instance.
    """

    def __init__(self, rx, logger):
        """
        Create a new `AioPipeSubscriber` to listen for messages on the given pipe receive
        end and emit them to the given
        [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger)
        instance. This object takes ownership of `rx`.
        """

        self._rx = rx
        self._logger = logger

    async def run(self):
        """
        Run the subcribing task to continuously receive and emit log messages.

        This can be ran in the background of the event loop using
        [`create_task`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task).
        """

        while True:
            try:
                msg = await self._rx.readuntil(b"\n")
            except IncompleteReadError as e:
                if e.expected is None:
                    break

                raise

            self._logger.handle(LogRecord.from_dict(json.loads(msg.decode())))

    def close(self):
        """
        Close the subscriber and receiving pipe.
        """

        self._rx._transport.close()

Classes

class AioPipeHandler

Forwards log messages in a child process to the parent process.

This should be pushed on the stack or added to a Logger in the typical manner.
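
Both registration styles work; a minimal sketch, where `tx_stream` stands in for an opened `aiopipe` transmit end as in the example above:

    from logbook import Logger
    from logbook_aiopipe import AioPipeHandler

    log = Logger()
    handler = AioPipeHandler(tx_stream)

    # Attach to one logger...
    log.handlers.append(handler)
    # ...or push onto the application stack instead:
    # handler.push_application()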

class AioPipeHandler(Handler):
    """
    Forwards log messages in a child process to the parent process.

    This should be pushed on the stack or added to a `Logger` in the [typical
    manner](https://logbook.readthedocs.io/en/stable/quickstart.html#registering-handlers).
    """

    def __init__(self, tx, *args, **kwargs):
        """
        Create a new `AioPipeHandler` that forwards log messages over the given pipe
        transmit end. The other arguments are passed to
        [`logbook.Handler`](https://logbook.readthedocs.io/en/stable/api/handlers.html#logbook.Handler).
        This object takes ownership of `tx`.

        This handler should be attached to a `logbook.Logger` instance.
        """

        super().__init__(*args, **kwargs)

        self._tx = tx

    def emit(self, record):
        self._tx.write(json.dumps(record.to_dict(json_safe=True)).encode())
        self._tx.write(b"\n")

    def close(self):
        self._tx.close()

Ancestors (in MRO)

  • AioPipeHandler
  • logbook.handlers.Handler
  • logbook.helpers._WithMetaclassBase
  • logbook.base.ContextObject
  • logbook._fallback.StackedObject
  • builtins.object

Methods

def __init__(self, tx, *args, **kwargs)

Create a new AioPipeHandler that forwards log messages over the given pipe transmit end. The other arguments are passed to logbook.Handler. This object takes ownership of tx.

This handler should be attached to a logbook.Logger instance.

def __init__(self, tx, *args, **kwargs):
    """
    Create a new `AioPipeHandler` that forwards log messages over the given pipe
    transmit end. The other arguments are passed to
    [`logbook.Handler`](https://logbook.readthedocs.io/en/stable/api/handlers.html#logbook.Handler).
    This object takes ownership of `tx`.
    This handler should be attached to a `logbook.Logger` instance.
    """
    super().__init__(*args, **kwargs)
    self._tx = tx

def applicationbound(self, _cls=_StackBound)

Can be used in combination with the with statement to execute code while the object is bound to the application.

def applicationbound(self, _cls=_StackBound):
    """Can be used in combination with the `with` statement to
    execute code while the object is bound to the application.
    """
    return _cls(self, self.push_application, self.pop_application)
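
For example, a handler can be installed for the duration of a block (a sketch, with `handler` and `log` as above):

    with handler.applicationbound():
        log.info("handled while bound")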

def close(self)

Tidy up any resources used by the handler. This is automatically called by the destructor of the class as well, but explicit calls are encouraged. Make sure that multiple calls to close are possible.

def close(self):
    self._tx.close()

def emit(self, record)

Emit the specified logging record. This should take the record and deliver it to wherever the handler sends formatted log records.

def emit(self, record):
    self._tx.write(json.dumps(record.to_dict(json_safe=True)).encode())
    self._tx.write(b"\n")
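
As the source above shows, each record crosses the pipe as a single line of JSON, which is what lets the subscriber frame messages with `readuntil`. The round trip in isolation (here `record` is any logbook `LogRecord`):

    import json
    from logbook import LogRecord

    # Serialize on the child side (what emit() writes)...
    line = json.dumps(record.to_dict(json_safe=True)).encode() + b"\n"

    # ...and reconstruct on the parent side (what the subscriber does).
    rec = LogRecord.from_dict(json.loads(line.decode()))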

def emit_batch(self, records, reason)

Some handlers may internally queue up records and want to forward them at once to another handler. For example the `FingersCrossedHandler` internally buffers records until a level threshold is reached, in which case the buffer is sent to this method rather than to `emit` for each record.

The default behaviour is to call `emit` for each record in the buffer, but handlers can use this to optimize log handling. For instance the mail handler will try to batch up items into one mail rather than emit a mail for each record in the buffer.

Note that unlike `emit` there is no wrapper method like `handle` that does error handling. The reason is that this is intended to be used by other handlers, which are already protected against internal breakage.

`reason` is a string that specifies the reason why `emit_batch` was called instead of `emit`. The following are valid values:

`'buffer'`: Records were buffered for performance reasons or because the records were sent to another process and buffering was the only possible way. For most handlers this should be equivalent to calling `emit` for each record.

`'escalation'`: Records were buffered in case a threshold was exceeded. In this case, the last record in the iterable is the record that triggered the call.

`'group'`: All the records in the iterable belong to the same logical component and happened in the same process. For example, there was a long-running computation and the handler is invoked with a bunch of records that happened there. This is similar to the escalation reason, except that the first record is the significant one, not the last.

If a subclass overrides this and does not want to handle a specific reason, it must call into the superclass, because more reasons might appear in future releases.

Example implementation:

    def emit_batch(self, records, reason):
        if reason not in ('escalation', 'group'):
            Handler.emit_batch(self, records, reason)
        ...

def emit_batch(self, records, reason):
    """Some handlers may internally queue up records and want to forward
    them at once to another handler.  For example the
    :class:`~logbook.FingersCrossedHandler` internally buffers
    records until a level threshold is reached in which case the buffer
    is sent to this method and not :meth:`emit` for each record.
    The default behaviour is to call :meth:`emit` for each record in
    the buffer, but handlers can use this to optimize log handling.  For
    instance the mail handler will try to batch up items into one mail
    and not to emit mails for each record in the buffer.
    Note that unlike :meth:`emit` there is no wrapper method like
    :meth:`handle` that does error handling.  The reason is that this
    is intended to be used by other handlers which are already protected
    against internal breakage.
    `reason` is a string that specifies the rason why :meth:`emit_batch`
    was called, and not :meth:`emit`.  The following are valid values:
    ``'buffer'``
        Records were buffered for performance reasons or because the
        records were sent to another process and buffering was the only
        possible way.  For most handlers this should be equivalent to
        calling :meth:`emit` for each record.
    ``'escalation'``
        Escalation means that records were buffered in case the threshold
        was exceeded.  In this case, the last record in the iterable is the
        record that triggered the call.
    ``'group'``
        All the records in the iterable belong to the same logical
        component and happened in the same process.  For example there was
        a long running computation and the handler is invoked with a bunch
        of records that happened there.  This is similar to the escalation
        reason, just that the first one is the significant one, not the
        last.
    If a subclass overrides this and does not want to handle a specific
    reason it must call into the superclass because more reasons might
    appear in future releases.
    Example implementation::
        def emit_batch(self, records, reason):
            if reason not in ('escalation', 'group'):
                Handler.emit_batch(self, records, reason)
            ...
    """
    for record in records:
        self.emit(record)

def format(self, record)

Formats a record with the given formatter. If no formatter is set, the record message is returned. Generally speaking the return value is most likely a unicode string, but nothing in the handler interface requires a formatter to return a unicode string.

The combination of a handler and formatter might have the formatter return an XML element tree for example.

def format(self, record):
    """Formats a record with the given formatter.  If no formatter
    is set, the record message is returned.  Generally speaking the
    return value is most likely a unicode string, but nothing in
    the handler interface requires a formatter to return a unicode
    string.
    The combination of a handler and formatter might have the
    formatter return an XML element tree for example.
    """
    if self.formatter is None:
        return record.message
    return self.formatter(record, self)
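
Note that `AioPipeHandler.emit` serializes the raw record rather than calling `format`, so formatting is normally applied by whichever handler emits on the parent side. A sketch using the standard `format_string` parameter of logbook's stream handlers (assuming the parent uses `StderrHandler` as in the example):

    from logbook import StderrHandler

    # The pipe carries unformatted records; format them at the point of output.
    handler = StderrHandler(format_string="{record.level_name}: {record.message}")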

def greenletbound(self, _cls=_StackBound)

Can be used in combination with the with statement to execute code while the object is bound to the greenlet.

def greenletbound(self, _cls=_StackBound):
    """Can be used in combination with the `with` statement to
    execute code while the object is bound to the greenlet.
    """
    return _cls(self, self.push_greenlet, self.pop_greenlet)

def handle(self, record)

Emits the record and falls back. It tries to `emit` the record and if that fails, it will call into `handle_error` with the record and traceback. This function itself will always emit when called, even if the logger level is higher than the record's level.

If this method returns False it signals to the calling function that no recording took place in which case it will automatically bubble. This should not be used to signal error situations. The default implementation always returns True.

def handle(self, record):
    """Emits the record and falls back.  It tries to :meth:`emit` the
    record and if that fails, it will call into :meth:`handle_error` with
    the record and traceback.  This function itself will always emit
    when called, even if the logger level is higher than the record's
    level.
    If this method returns `False` it signals to the calling function that
    no recording took place in which case it will automatically bubble.
    This should not be used to signal error situations.  The default
    implementation always returns `True`.
    """
    try:
        self.emit(record)
    except Exception:
        self.handle_error(record, sys.exc_info())
    return True

def handle_error(self, record, exc_info)

Handle errors which occur during an emit() call. The behaviour of this function depends on the current errors setting.

Check the `Flags` class for more information.

def handle_error(self, record, exc_info):
    """Handle errors which occur during an emit() call.  The behaviour of
    this function depends on the current `errors` setting.
    Check :class:`Flags` for more information.
    """
    try:
        behaviour = Flags.get_flag('errors', 'print')
        if behaviour == 'raise':
            reraise(exc_info[0], exc_info[1], exc_info[2])
        elif behaviour == 'print':
            traceback.print_exception(*(exc_info + (None, sys.stderr)))
            sys.stderr.write('Logged from file %s, line %s\n' % (
                             record.filename, record.lineno))
    except IOError:
        pass
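
The `errors` setting referenced here is controlled through logbook's `Flags` context object; a sketch, escalating the default 'print' behaviour:

    from logbook import Flags

    # Within this block, failures inside emit() raise instead of
    # printing a traceback to stderr.
    with Flags(errors='raise'):
        log.info("risky emit")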

def pop_application(self)

Pops the context object from the stack.

def pop_application(self):
    """Pops the context object from the stack."""
    popped = self.stack_manager.pop_application()
    assert popped is self, 'popped unexpected object'

def pop_greenlet(self)

Pops the context object from the stack.

def pop_greenlet(self):
    """Pops the context object from the stack."""
    popped = self.stack_manager.pop_greenlet()
    assert popped is self, 'popped unexpected object'

def pop_thread(self)

Pops the context object from the stack.

def pop_thread(self):
    """Pops the context object from the stack."""
    popped = self.stack_manager.pop_thread()
    assert popped is self, 'popped unexpected object'

def push_application(self)

Pushes the context object to the application stack.

def push_application(self):
    """Pushes the context object to the application stack."""
    self.stack_manager.push_application(self)

def push_greenlet(self)

Pushes the context object to the greenlet stack.

def push_greenlet(self):
    """Pushes the context object to the greenlet stack."""
    self.stack_manager.push_greenlet(self)

def push_thread(self)

Pushes the context object to the thread stack.

def push_thread(self):
    """Pushes the context object to the thread stack."""
    self.stack_manager.push_thread(self)

def should_handle(self, record)

Returns True if this handler wants to handle the record. The default implementation checks the level.

def should_handle(self, record):
    """Returns `True` if this handler wants to handle the record.  The
    default implementation checks the level.
    """
    return record.level >= self.level
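
Since extra arguments to `AioPipeHandler.__init__` are forwarded to `logbook.Handler`, this level check can keep low-level records from being serialized and sent over the pipe at all; a sketch (`tx_stream` again stands in for an opened transmit end):

    import logbook

    # Only WARNING and above cross the pipe to the parent process.
    handler = AioPipeHandler(tx_stream, level=logbook.WARNING)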

def threadbound(self, _cls=_StackBound)

Can be used in combination with the with statement to execute code while the object is bound to the thread.

def threadbound(self, _cls=_StackBound):
    """Can be used in combination with the `with` statement to
    execute code while the object is bound to the thread.
    """
    return _cls(self, self.push_thread, self.pop_thread)

Instance variables

var level_name

The level as unicode string

class AioPipeSubscriber

Receives log messages in the parent process and emits them to a Logger instance.
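
The parent-side wiring, condensed from the example at the top (`rx_stream` stands in for an opened `aiopipe` receive end):

    sub = AioPipeSubscriber(rx_stream, log)

    with closing(sub):
        subTask = eventLoop.create_task(sub.run())
        ...  # start the child process, log as usual
        await subTask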

class AioPipeSubscriber:
    """
    Receives log messages in the parent process and emits them to a
    [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger)
    instance.
    """

    def __init__(self, rx, logger):
        """
        Create a new `AioPipeSubscriber` to listen for messages on the given pipe receive
        end and emit them to the given
        [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger)
        instance. This object takes ownership of `rx`.
        """

        self._rx = rx
        self._logger = logger

    async def run(self):
        """
        Run the subscribing task to continuously receive and emit log messages.

        This can be run in the background of the event loop using
        [`create_task`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task).
        """

        while True:
            try:
                msg = await self._rx.readuntil(b"\n")
            except IncompleteReadError as e:
                if e.expected is None:
                    break

                raise

            self._logger.handle(LogRecord.from_dict(json.loads(msg.decode())))

    def close(self):
        """
        Close the subscriber and receiving pipe.
        """

        self._rx._transport.close()

Methods

def __init__(self, rx, logger)

Create a new AioPipeSubscriber to listen for messages on the given pipe receive end and emit them to the given Logger instance. This object takes ownership of rx.

def __init__(self, rx, logger):
    """
    Create a new `AioPipeSubscriber` to listen for messages on the given pipe receive
    end and emit them to the given
    [`Logger`](https://logbook.readthedocs.io/en/stable/api/base.html#logbook.Logger)
    instance. This object takes ownership of `rx`.
    """
    self._rx = rx
    self._logger = logger

def close(self)

Close the subscriber and receiving pipe.

def close(self):
    """
    Close the subscriber and receiving pipe.
    """
    self._rx._transport.close()

async def run(self)

Run the subscribing task to continuously receive and emit log messages.

This can be run in the background of the event loop using create_task.

async def run(self):
    """
    Run the subscribing task to continuously receive and emit log messages.
    This can be run in the background of the event loop using
    [`create_task`](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task).
    """
    while True:
        try:
            msg = await self._rx.readuntil(b"\n")
        except IncompleteReadError as e:
            if e.expected is None:
                break
            raise
        self._logger.handle(LogRecord.from_dict(json.loads(msg.decode())))