Module aiopipe

This package wraps the os.pipe simplex communication pipe so it can be used as part of the non-blocking asyncio event loop. A duplex pipe is also provided, which allows reading and writing on both ends.

Simplex example

The following example opens a pipe with the write end in the child process and the read end in the parent process.

>>> from multiprocessing import Process
>>> import asyncio
>>>
>>> from aiopipe import aiopipe
>>>
>>> async def main():
...     rx, tx = aiopipe()
...
...     with tx.detach() as tx:
...         proc = Process(target=childproc, args=(tx,))
...         proc.start()
...
...     # The write end is now available in the child process
...     # and detached from the parent process.
...
...     async with rx.open() as rx:
...         msg = await rx.readline()
...
...     proc.join()
...     return msg
>>>
>>> def childproc(tx):
...     asyncio.run(childtask(tx))
>>>
>>> async def childtask(tx):
...     async with tx.open() as tx:
...         tx.write(b"hi from the child process\n")
>>>
>>> asyncio.run(main())
b'hi from the child process\n'
>>>

Duplex example

The following example shows a parent and child process sharing a duplex pipe to exchange messages.

>>> from multiprocessing import Process
>>> import asyncio
>>>
>>> from aiopipe import aioduplex
>>>
>>> async def main():
...     mainpipe, chpipe = aioduplex()
...
...     with chpipe.detach() as chpipe:
...         proc = Process(target=childproc, args=(chpipe,))
...         proc.start()
...
...     # The second pipe is now available in the child process
...     # and detached from the parent process.
...
...     async with mainpipe.open() as (rx, tx):
...         req = await rx.read(5)
...         tx.write(req + b" world\n")
...         msg = await rx.readline()
...
...     proc.join()
...     return msg
>>>
>>> def childproc(pipe):
...     asyncio.run(childtask(pipe))
>>>
>>> async def childtask(pipe):
...     async with pipe.open() as (rx, tx):
...         tx.write(b"hello")
...         rep = await rx.readline()
...         tx.write(rep.upper())
>>>
>>> asyncio.run(main())
b'HELLO WORLD\n'
>>>
Source

Functions

Create a new duplex multiprocess communication pipe.

Both returned pipes can write to and read from the other.

Source

Create a new simplex multiprocess communication pipe.

Return the read end and write end, respectively.

Source

Classes

class AioDuplex

Represents one end of a duplex pipe.

Source

Ancestors

  • builtins.object

Methods

def detach
(
self)
AbstractContextManager[AioDuplex]

Detach this end of the duplex pipe from the current process in preparation for use in a child process.

This returns a context manager, which must be used as part of a with context. When the context is entered, the pipe is prepared for inheritance by the child process and returned as the context variable. When the context is exited, the stream is closed in the parent process.

Source
def open
(
self)
AbstractAsyncContextManager[Tuple[StreamReader, StreamWriter]]

Open this end of the duplex pipe on the current event loop.

This returns an async context manager, which must be used as part of an async with context. When the context is entered, the pipe is opened and the underlying StreamReader and StreamWriter are returned as the context variable. When the context is exited, the pipe is closed.

Source
class AioPipeReader

The read end of a pipe.

Source

Ancestors

Methods

def detach
(
self)
AbstractContextManager[AioPipeStream]

AioPipeStream.detach

Detach this end of the pipe from the current process in preparation for use in a child process.

This returns a context manager, which must be used as part of a with context. When the context is entered, the stream is prepared for inheritance by the child process and returned as the context variable. When the context is exited, the stream is closed in the parent process.

Source
def open
(
self)

Open the receive end on the current event loop.

This returns an async context manager, which must be used as part of an async with context. When the context is entered, the receive end is opened and an instance of StreamReader is returned as the context variable. When the context is exited, the receive end is closed.

Source
class AioPipeStream

Abstract class for pipe readers and writers.

Source

Ancestors

  • builtins.object

Methods

def detach
(
self)
AbstractContextManager[AioPipeStream]

Detach this end of the pipe from the current process in preparation for use in a child process.

This returns a context manager, which must be used as part of a with context. When the context is entered, the stream is prepared for inheritance by the child process and returned as the context variable. When the context is exited, the stream is closed in the parent process.

Source
class AioPipeWriter

The write end of a pipe.

Source

Ancestors

Methods

def detach
(
self)
AbstractContextManager[AioPipeStream]

AioPipeStream.detach

Detach this end of the pipe from the current process in preparation for use in a child process.

This returns a context manager, which must be used as part of a with context. When the context is entered, the stream is prepared for inheritance by the child process and returned as the context variable. When the context is exited, the stream is closed in the parent process.

Source
def open
(
self)

Open the transmit end on the current event loop.

This returns an async context manager, which must be used as part of an async with context. When the context is entered, the transmit end is opened and an instance of StreamWriter is returned as the context variable. When the context is exited, the transmit end is closed.

Source