Module pedantic.decorators.fn_deco_in_subprocess

Functions

async def calculate_in_subprocess(func: Callable[..., ~T | Awaitable[~T]], *args: Any, **kwargs: Any) -> ~T
async def calculate_in_subprocess(func: Callable[..., Union[T, Awaitable[T]]], *args: Any, **kwargs: Any) -> T:
    """
        Calculates the result of a synchronous function in a subprocess without blocking the current thread.

        Arguments:
            func: The function that will be called in a subprocess.
            args: Positional arguments that will be passed to the function.
            kwargs: Keyword arguments that will be passed to the function.

        Returns:
             The calculated result of the function "func".

        Raises:
            Any Exception that is raised inside [func].

        Further reading: https://medium.com/devopss-hole/python-multiprocessing-pickle-issue-e2d35ccf96a9

        Example:
            >>> import time
            >>> import asyncio
            >>> def f(value: int) -> int:
            ...     time.sleep(0.1)  # a long-running, blocking calculation
            ...     return 2 * value
            >>> asyncio.run(calculate_in_subprocess(func=f, value=42))
            84
    """

    if Pipe is None:
        raise ImportError('You need to install the multiprocess package to use this: pip install multiprocess')

    rx, tx = Pipe(duplex=False)  # receiver & transmitter ; Pipe is one-way only
    process = Process(target=_inner, args=(tx, func, *args), kwargs=kwargs)
    process.start()

    event = asyncio.Event()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd=rx.fileno(), callback=event.set)

    if not rx.poll():  # do not use process.is_alive() as condition here
        await event.wait()

    loop.remove_reader(fd=rx.fileno())
    event.clear()

    result = rx.recv()
    process.join()  # this blocks synchronously! make sure that process is terminated before you call join()
    rx.close()
    tx.close()

    if isinstance(result, SubprocessError):
        raise result.exception

    return result

Calculates the result of a synchronous function in a subprocess without blocking the current thread.

Arguments

func: The function that will be called in a subprocess.
args: Positional arguments that will be passed to the function.
kwargs: Keyword arguments that will be passed to the function.

Returns

The calculated result of the function "func".

Raises

Any Exception that is raised inside [func].

Further reading: https://medium.com/devopss-hole/python-multiprocessing-pickle-issue-e2d35ccf96a9

Example

>>> import time
>>> import asyncio
>>> def f(value: int) -> int:
...     time.sleep(0.1)  # a long-running, blocking calculation
...     return 2 * value
>>> asyncio.run(calculate_in_subprocess(func=f, value=42))
84
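
The waiting strategy in the source above (register the read end of the pipe with the event loop and set an asyncio.Event once it becomes readable) can be tried in isolation. The following is a minimal sketch, not the library's code: it uses os.pipe and a delayed write in place of a real subprocess, and the names wait_until_readable and demo are hypothetical. It assumes a Unix selector event loop, since add_reader is not supported by the default proactor loop on Windows.

```python
import asyncio
import os


async def wait_until_readable(fd: int) -> None:
    """Suspend the current task until `fd` has data, without blocking the loop."""
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    loop.add_reader(fd, event.set)  # event.set is called once fd becomes readable
    try:
        await event.wait()
    finally:
        loop.remove_reader(fd)


async def demo() -> bytes:
    read_fd, write_fd = os.pipe()
    loop = asyncio.get_running_loop()
    # Stand-in for the child process: write the "result" after 50 ms.
    loop.call_later(0.05, os.write, write_fd, b"done")
    try:
        await wait_until_readable(read_fd)
        return os.read(read_fd, 4)
    finally:
        os.close(read_fd)
        os.close(write_fd)


result = asyncio.run(demo())
print(result)  # b'done'
```

While wait_until_readable is suspended, the event loop is free to run other tasks, which is the property the docstring describes as "without blocking the current thread".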
def in_subprocess(func: Callable[..., ~T | Awaitable[~T]]) -> Callable[..., Awaitable[~T]]
def in_subprocess(func: Callable[..., Union[T, Awaitable[T]]]) -> Callable[..., Awaitable[T]]:
    """
        Executes the decorated function in a subprocess and returns its return value.
        Note that the decorated function is replaced with an async function, which returns
        a coroutine that needs to be awaited.
        The purpose of this is to run long-running calculations without synchronously blocking
        the main thread of your application, so that other asyncio.Tasks can keep running
        at the same time.

        Example:
            >>> import time
            >>> import asyncio
            >>> @in_subprocess
            ... def f(value: int) -> int:
            ...     time.sleep(0.1)  # a long-running, blocking calculation
            ...     return 2 * value
            >>> asyncio.run(f(value=42))
            84
    """

    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> T:
        return await calculate_in_subprocess(func, *args, **kwargs)

    return wrapper

Executes the decorated function in a subprocess and returns its return value. Note that the decorated function is replaced with an async function, which returns a coroutine that needs to be awaited. The purpose of this is to run long-running calculations without synchronously blocking the main thread of your application, so that other asyncio.Tasks can keep running at the same time.

Example

>>> import time
>>> import asyncio
>>> @in_subprocess
... def f(value: int) -> int:
...     time.sleep(0.1)  # a long-running, blocking calculation
...     return 2 * value
>>> asyncio.run(f(value=42))
84
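
To make the "other tasks keep running" claim concrete, here is a minimal sketch of the same decorator shape. It is an illustration under stated assumptions, not the library's implementation: the hypothetical in_thread decorator offloads the call to a worker thread through run_in_executor instead of a subprocess, so the sketch runs with the standard library alone. A thread is enough for a sleeping function; CPU-bound work would generally still want a real subprocess.

```python
import asyncio
import time
from functools import partial, wraps
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")


def in_thread(func: Callable[..., T]) -> Callable[..., Awaitable[T]]:
    """Hypothetical stand-in for in_subprocess that uses a worker thread."""

    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> T:
        loop = asyncio.get_running_loop()
        # run_in_executor forwards positional arguments only, hence partial().
        return await loop.run_in_executor(None, partial(func, *args, **kwargs))

    return wrapper


@in_thread
def f(value: int) -> int:
    time.sleep(0.1)  # blocking call; runs outside the event loop thread
    return 2 * value


async def main() -> tuple:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    background = asyncio.create_task(ticker())
    result = await f(value=42)  # f(...) only creates a coroutine; await runs it
    background.cancel()
    return result, ticks


result, ticks = asyncio.run(main())
print(result)  # 84
```

The ticker task advances several times while f is sleeping, which shows that the event loop was not blocked. Forgetting the await would leave result bound to a coroutine object rather than to 84.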

Classes

class SubprocessError (ex: Exception)
class SubprocessError:
    """ Returned by the subprocess if an error occurs inside it; wraps the raised exception. """

    def __init__(self, ex: Exception) -> None:
        self.exception = ex

Returned by the subprocess if an error occurs inside it; wraps the raised exception.
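
A short sketch of how such a wrapper can be used. The child-side helper is not shown on this page, so run_and_capture below is a hypothetical reconstruction, and the pipe transport is left out; only unwrap mirrors the isinstance check visible in calculate_in_subprocess. One plausible benefit of a dedicated wrapper class, shown here, is that the parent can tell an exception that was raised apart from an exception object that the function merely returned.

```python
from typing import Any, Callable


class SubprocessError:
    """Same shape as the class above: a plain container for an exception."""

    def __init__(self, ex: Exception) -> None:
        self.exception = ex


def run_and_capture(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Hypothetical child-side helper: never raises, returns the error wrapped."""
    try:
        return func(*args, **kwargs)
    except Exception as ex:
        return SubprocessError(ex=ex)


def unwrap(result: Any) -> Any:
    """Parent-side step: re-raise a wrapped exception, pass anything else through."""
    if isinstance(result, SubprocessError):
        raise result.exception
    return result


def make_error() -> Exception:
    return ValueError("just a value")  # returned, not raised


def fail() -> int:
    return 1 // 0  # raises ZeroDivisionError


# An exception object used as a normal return value passes through untouched.
returned = unwrap(run_and_capture(make_error))

# An exception raised inside the function is re-raised on the parent side.
try:
    unwrap(run_and_capture(fail))
    raised = None
except ZeroDivisionError as ex:
    raised = ex
```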