Module pedantic.decorators.fn_deco_in_subprocess
Functions
- async def calculate_in_subprocess(func: Callable[..., ~T | Awaitable[~T]], *args: Any, **kwargs: Any) ‑> ~T
- 
Source code:

    async def calculate_in_subprocess(func: Callable[..., Union[T, Awaitable[T]]], *args: Any, **kwargs: Any) -> T:
        """
            Calculates the result of a synchronous function in a subprocess without blocking the current thread.

            Arguments:
                func: The function that will be called in a subprocess.
                args: Positional arguments that will be passed to the function.
                kwargs: Keyword arguments that will be passed to the function.

            Returns:
                The calculated result of the function "func".

            Raises:
                Any Exception that is raised inside [func].

            Further reading: https://medium.com/devopss-hole/python-multiprocessing-pickle-issue-e2d35ccf96a9

            Example:
                >>> import time
                >>> import asyncio
                >>> def f(value: int) -> int:
                ...     time.sleep(0.1)  # a long-running synchronous blocking calculation
                ...     return 2 * value
                >>> asyncio.run(calculate_in_subprocess(func=f, value=42))
                84
        """

        if Pipe is None:
            raise ImportError('You need to install the multiprocess package to use this: pip install multiprocess')

        rx, tx = Pipe(duplex=False)  # receiver & transmitter ; Pipe is one-way only
        process = Process(target=_inner, args=(tx, func, *args), kwargs=kwargs)
        process.start()

        event = asyncio.Event()
        loop = asyncio.get_event_loop()
        loop.add_reader(fd=rx.fileno(), callback=event.set)

        if not rx.poll():  # do not use process.is_alive() as condition here
            await event.wait()

        loop.remove_reader(fd=rx.fileno())
        event.clear()

        result = rx.recv()
        process.join()  # this blocks synchronously! make sure that process is terminated before you call join()
        rx.close()
        tx.close()

        if isinstance(result, SubprocessError):
            raise result.exception

        return result

Calculates the result of a synchronous function in a subprocess without blocking the current thread.

Arguments
func: The function that will be called in a subprocess.
args: Positional arguments that will be passed to the function.
kwargs: Keyword arguments that will be passed to the function.

Returns
The calculated result of the function "func".

Raises
Any exception that is raised inside func.

Further reading: https://medium.com/devopss-hole/python-multiprocessing-pickle-issue-e2d35ccf96a9

Example

    >>> import time
    >>> import asyncio
    >>> def f(value: int) -> int:
    ...     time.sleep(0.1)  # a long-running synchronous blocking calculation
    ...     return 2 * value
    >>> asyncio.run(calculate_in_subprocess(func=f, value=42))
    84
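The waiting step in the source above (a one-way pipe, `loop.add_reader`, and an `asyncio.Event`) can be tried in isolation. The following is a minimal sketch, not the library's code: it uses the standard library's `multiprocessing.Pipe` instead of the `multiprocess` package, the name `wait_for_pipe` is hypothetical, and a timer callback stands in for the child process writing its result. It assumes a Unix-like system, because `add_reader` requires a selector-based event loop.

```python
import asyncio
from multiprocessing import Pipe
from typing import Any


async def wait_for_pipe(value: Any) -> Any:
    """Hypothetical helper: wait for data on a pipe without blocking the loop."""
    rx, tx = Pipe(duplex=False)  # rx can only receive, tx can only send

    loop = asyncio.get_running_loop()
    ready = asyncio.Event()
    # Ask the event loop to set the event once the read end becomes readable.
    loop.add_reader(rx.fileno(), ready.set)

    # Assumption for this sketch: a timer stands in for the child process
    # that would normally send its result through tx.
    loop.call_later(0.05, tx.send, value)

    if not rx.poll():  # nothing has arrived yet, so yield to other tasks
        await ready.wait()

    loop.remove_reader(rx.fileno())
    result = rx.recv()  # returns immediately because data is available
    rx.close()
    tx.close()
    return result


if __name__ == "__main__":
    print(asyncio.run(wait_for_pipe(84)))
```

Because the coroutine suspends at `await ready.wait()`, the event loop is free to run other tasks until the pipe has data, which is the property the docstring describes as "without blocking the current thread".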
- def in_subprocess(func: Callable[..., ~T | Awaitable[~T]]) ‑> Callable[..., Awaitable[~T]]
- 
Source code:

    def in_subprocess(func: Callable[..., Union[T, Awaitable[T]]]) -> Callable[..., Awaitable[T]]:
        """
            Executes the decorated function in a subprocess and returns the return value of it.
            Note that the decorated function will be replaced with an async function which returns
            a coroutine that needs to be awaited.
            The purpose of this is doing long-running calculations without blocking the main thread
            of your application synchronously. That ensures that other asyncio.Tasks can work without any problem
            at the same time.

            Example:
                >>> import time
                >>> import asyncio
                >>> @in_subprocess
                ... def f(value: int) -> int:
                ...     time.sleep(0.1)  # a long-running synchronous blocking calculation
                ...     return 2 * value
                >>> asyncio.run(f(value=42))
                84
        """

        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            return await calculate_in_subprocess(func, *args, **kwargs)

        return wrapper

Executes the decorated function in a subprocess and returns its return value. Note that the decorated function is replaced with an async function, which returns a coroutine that needs to be awaited. The purpose of this is to run long calculations without synchronously blocking the main thread of your application, so that other asyncio.Tasks can keep working at the same time.

Example

    >>> import time
    >>> import asyncio
    >>> @in_subprocess
    ... def f(value: int) -> int:
    ...     time.sleep(0.1)  # a long-running synchronous blocking calculation
    ...     return 2 * value
    >>> asyncio.run(f(value=42))
    84
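To illustrate the claim that other tasks keep running while the decorated function computes, here is a self-contained sketch of the same decorator shape. It is an approximation under stated assumptions, not the library's implementation: `in_child_process`, `_send_result`, `slow_double`, and `ticker` are hypothetical names, the standard library's `multiprocessing` with the `fork` start method replaces the `multiprocess` package (so it assumes a Unix-like system), and error handling is left out.

```python
import asyncio
import multiprocessing as mp
import time
from functools import wraps
from typing import Any, Awaitable, Callable, List, Tuple, TypeVar

T = TypeVar("T")

# Assumption: "fork" is available; the child inherits func without pickling it.
_ctx = mp.get_context("fork")


def _send_result(tx: Any, func: Callable[..., Any], args: tuple, kwargs: dict) -> None:
    """Runs in the child process and sends the return value to the parent."""
    tx.send(func(*args, **kwargs))
    tx.close()


def in_child_process(func: Callable[..., T]) -> Callable[..., Awaitable[T]]:
    """Hypothetical stand-in for in_subprocess (no error propagation)."""

    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> T:
        rx, tx = _ctx.Pipe(duplex=False)
        process = _ctx.Process(target=_send_result, args=(tx, func, args, kwargs))
        process.start()
        tx.close()  # the parent only needs the read end

        loop = asyncio.get_running_loop()
        ready = asyncio.Event()
        loop.add_reader(rx.fileno(), ready.set)
        await ready.wait()  # the event loop serves other tasks meanwhile
        loop.remove_reader(rx.fileno())

        result = rx.recv()
        process.join()
        rx.close()
        return result

    return wrapper


@in_child_process
def slow_double(value: int) -> int:
    time.sleep(0.2)  # stands in for a long blocking calculation
    return 2 * value


async def ticker(ticks: List[int]) -> None:
    """Runs on the event loop while the child process is busy."""
    for i in range(3):
        ticks.append(i)
        await asyncio.sleep(0.05)


async def main() -> Tuple[int, List[int]]:
    ticks: List[int] = []
    result, _ = await asyncio.gather(slow_double(value=21), ticker(ticks))
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that `slow_double` is now a coroutine function: calling it without `await` (or without passing it to something like `asyncio.gather`) only creates a coroutine object and does not start the calculation.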
Classes
- class SubprocessError (ex: Exception)
- 
Source code:

    class SubprocessError:
        """ Is returned by the subprocess if an error occurs in the subprocess. """

        def __init__(self, ex: Exception) -> None:
            self.exception = ex

Returned by the subprocess if an error occurs in the subprocess. The original exception is stored in the exception attribute.
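The wrapper's role can be sketched without starting a process. The private helper that runs inside the subprocess is not shown on this page, so `run_and_wrap` below is a hypothetical guess at its shape, and `unwrap` mirrors the `isinstance` check visible in the source of `calculate_in_subprocess`. One effect of the wrapper, under these assumptions, is that an exception object returned as an ordinary value stays distinguishable from an exception that was raised.

```python
from typing import Any, Callable


class SubprocessError:
    """Local copy of the wrapper class so that this sketch is self-contained."""

    def __init__(self, ex: Exception) -> None:
        self.exception = ex


def run_and_wrap(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Hypothetical child-side helper: return the result or a wrapped exception."""
    try:
        return func(*args, **kwargs)
    except Exception as ex:
        return SubprocessError(ex=ex)


def unwrap(result: Any) -> Any:
    """Parent-side step: re-raise a wrapped exception, pass everything else on."""
    if isinstance(result, SubprocessError):
        raise result.exception
    return result


def fail() -> None:
    raise ValueError("bad input")


def returns_exception() -> Exception:
    return ValueError("just a value")  # returned, not raised


if __name__ == "__main__":
    print(unwrap(run_and_wrap(returns_exception)))  # an ordinary return value
    try:
        unwrap(run_and_wrap(fail))
    except ValueError as ex:
        print("re-raised in the parent:", ex)
```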