Module pedantic.decorators.fn_deco_in_subprocess

Expand source code
import asyncio
import inspect
from functools import wraps
from typing import Callable, TypeVar, Any, Awaitable, Optional, Type, Union

try:
    from multiprocess import Process, Pipe
    from multiprocess.connection import Connection
except ImportError:
    Process: Optional[Type] = None
    Pipe: Optional[Type] = None
    Connection: Optional[Type] = None

T = TypeVar('T')


class SubprocessError:
    """ Is returned by the subprocess if an error occurs in the subprocess. """

    def __init__(self, ex: Exception) -> None:
        self.exception = ex


def in_subprocess(func: Callable[..., Union[T, Awaitable[T]]]) -> Callable[..., Awaitable[T]]:
    """
        Executes the decorated function in a subprocess and returns the return value of it.
        Note that the decorated function will be replaced with an async function which returns
        a coroutine that needs to be awaited.
        This purpose of this is doing long-taking calculations without blocking the main thread
        of your application synchronously. That ensures that other asyncio.Tasks can work without any problem
        at the same time.

        Example:
            >>> import time
            >>> import asyncio
            >>> @in_subprocess
            ... def f(value: int) -> int:
            ...     time.sleep(0.1)  # a long taking synchronous blocking calculation
            ...     return 2 * value
            >>> asyncio.run(f(value=42))
            84
    """

    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> T:
        return await calculate_in_subprocess(func, *args, **kwargs)

    return wrapper


async def calculate_in_subprocess(func: Callable[..., Union[T, Awaitable[T]]], *args: Any, **kwargs: Any) -> T:
    """
        Calculates the result of a synchronous function in subprocess without blocking the current thread.

        Arguments:
            func: The function that will be called in a subprocess.
            args: Positional arguments that will be passed to the function.
            kwargs: Keyword arguments that will be passed to the function.

        Returns:
             The calculated result of the function "func".

        Raises:
            Any Exception that is raised inside [func].

        Further reading: https://medium.com/devopss-hole/python-multiprocessing-pickle-issue-e2d35ccf96a9

        Example:
            >>> import time
            >>> import asyncio
            >>> def f(value: int) -> int:
            ...     time.sleep(0.1)  # a long taking synchronous blocking calculation
            ...     return 2 * value
            >>> asyncio.run(calculate_in_subprocess(func=f, value=42))
            84
    """

    if Pipe is None:
        raise ImportError('You need to install the multiprocess package to use this: pip install multiprocess')

    rx, tx = Pipe(duplex=False)  # receiver & transmitter ; Pipe is one-way only
    process = Process(target=_inner, args=(tx, func, *args), kwargs=kwargs)
    process.start()

    event = asyncio.Event()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd=rx.fileno(), callback=event.set)

    if not rx.poll():  # do not use process.is_alive() as condition here
        await event.wait()

    loop.remove_reader(fd=rx.fileno())
    event.clear()

    result = rx.recv()
    process.join()  # this blocks synchronously! make sure that process is terminated before you call join()
    rx.close()
    tx.close()

    if isinstance(result, SubprocessError):
        raise result.exception

    return result


def _inner(tx: Connection, fun: Callable[..., Union[T, Awaitable[T]]], *a, **kw_args) -> None:
    """ This runs in another process. """

    event_loop = None
    if inspect.iscoroutinefunction(fun):
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)

    try:
        if event_loop is not None:
            res = event_loop.run_until_complete(fun(*a, **kw_args))
        else:
            res = fun(*a, **kw_args)
    except Exception as ex:
        tx.send(SubprocessError(ex=ex))
    else:
        tx.send(res)


if __name__ == '__main__':
    import doctest

    doctest.testmod(verbose=False, optionflags=doctest.ELLIPSIS)

Functions

async def calculate_in_subprocess(func: Callable[..., Union[~T, Awaitable[~T]]], *args: Any, **kwargs: Any) ‑> ~T

Calculates the result of a synchronous function in subprocess without blocking the current thread.

Arguments

func: The function that will be called in a subprocess. args: Positional arguments that will be passed to the function. kwargs: Keyword arguments that will be passed to the function.

Returns

The calculated result of the function "func".

Raises

Any Exception that is raised inside [func]. Further reading: https://medium.com/devopss-hole/python-multiprocessing-pickle-issue-e2d35ccf96a9

Example

>>> import time
>>> import asyncio
>>> def f(value: int) -> int:
...     time.sleep(0.1)  # a long taking synchronous blocking calculation
...     return 2 * value
>>> asyncio.run(calculate_in_subprocess(func=f, value=42))
84
Expand source code
async def calculate_in_subprocess(func: Callable[..., Union[T, Awaitable[T]]], *args: Any, **kwargs: Any) -> T:
    """
        Calculates the result of a synchronous function in subprocess without blocking the current thread.

        Arguments:
            func: The function that will be called in a subprocess.
            args: Positional arguments that will be passed to the function.
            kwargs: Keyword arguments that will be passed to the function.

        Returns:
             The calculated result of the function "func".

        Raises:
            Any Exception that is raised inside [func].

        Further reading: https://medium.com/devopss-hole/python-multiprocessing-pickle-issue-e2d35ccf96a9

        Example:
            >>> import time
            >>> import asyncio
            >>> def f(value: int) -> int:
            ...     time.sleep(0.1)  # a long taking synchronous blocking calculation
            ...     return 2 * value
            >>> asyncio.run(calculate_in_subprocess(func=f, value=42))
            84
    """

    if Pipe is None:
        raise ImportError('You need to install the multiprocess package to use this: pip install multiprocess')

    rx, tx = Pipe(duplex=False)  # receiver & transmitter ; Pipe is one-way only
    process = Process(target=_inner, args=(tx, func, *args), kwargs=kwargs)
    process.start()

    event = asyncio.Event()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd=rx.fileno(), callback=event.set)

    if not rx.poll():  # do not use process.is_alive() as condition here
        await event.wait()

    loop.remove_reader(fd=rx.fileno())
    event.clear()

    result = rx.recv()
    process.join()  # this blocks synchronously! make sure that process is terminated before you call join()
    rx.close()
    tx.close()

    if isinstance(result, SubprocessError):
        raise result.exception

    return result
def in_subprocess(func: Callable[..., Union[~T, Awaitable[~T]]]) ‑> Callable[..., Awaitable[~T]]

Executes the decorated function in a subprocess and returns the return value of it. Note that the decorated function will be replaced with an async function which returns a coroutine that needs to be awaited. This purpose of this is doing long-taking calculations without blocking the main thread of your application synchronously. That ensures that other asyncio.Tasks can work without any problem at the same time.

Example

>>> import time
>>> import asyncio
>>> @in_subprocess
... def f(value: int) -> int:
...     time.sleep(0.1)  # a long taking synchronous blocking calculation
...     return 2 * value
>>> asyncio.run(f(value=42))
84
Expand source code
def in_subprocess(func: Callable[..., Union[T, Awaitable[T]]]) -> Callable[..., Awaitable[T]]:
    """
        Executes the decorated function in a subprocess and returns the return value of it.
        Note that the decorated function will be replaced with an async function which returns
        a coroutine that needs to be awaited.
        This purpose of this is doing long-taking calculations without blocking the main thread
        of your application synchronously. That ensures that other asyncio.Tasks can work without any problem
        at the same time.

        Example:
            >>> import time
            >>> import asyncio
            >>> @in_subprocess
            ... def f(value: int) -> int:
            ...     time.sleep(0.1)  # a long taking synchronous blocking calculation
            ...     return 2 * value
            >>> asyncio.run(f(value=42))
            84
    """

    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> T:
        return await calculate_in_subprocess(func, *args, **kwargs)

    return wrapper

Classes

class SubprocessError (ex: Exception)

Is returned by the subprocess if an error occurs in the subprocess.

Expand source code
class SubprocessError:
    """ Is returned by the subprocess if an error occurs in the subprocess. """

    def __init__(self, ex: Exception) -> None:
        self.exception = ex