diff options
Diffstat (limited to 'pyee/trio.py')
-rw-r--r-- | pyee/trio.py | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/pyee/trio.py b/pyee/trio.py new file mode 100644 index 0000000..e79d457 --- /dev/null +++ b/pyee/trio.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- + +from contextlib import AbstractAsyncContextManager, asynccontextmanager +from types import TracebackType +from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Optional, Tuple, Type + +import trio + +from pyee.base import EventEmitter, PyeeException + +__all__ = ["TrioEventEmitter"] + + +Nursery = trio.Nursery + + +class TrioEventEmitter(EventEmitter): + """An event emitter class which can run trio tasks in a trio nursery. + + By default, this class will lazily create both a nursery manager (the + object returned from ``trio.open_nursery()`` and a nursery (the object + yielded by using the nursery manager as an async context manager). It is + also possible to supply an existing nursery manager via the ``manager`` + argument, or an existing nursery via the ``nursery`` argument. + + Instances of TrioEventEmitter are themselves async context managers, so + that they may manage the lifecycle of the underlying trio nursery. For + example, typical usage of this library may look something like this:: + + async with TrioEventEmitter() as ee: + # Underlying nursery is instantiated and ready to go + @ee.on('data') + async def handler(data): + print(data) + + ee.emit('event') + + # Underlying nursery and manager have been cleaned up + + Unlike the case with the EventEmitter, all exceptions raised by event + handlers are automatically emitted on the ``error`` event. This is + important for trio coroutines specifically but is also handled for + synchronous functions for consistency. + + For trio coroutine event handlers, calling emit is non-blocking. In other + words, you should not attempt to await emit; the coroutine is scheduled + in a fire-and-forget fashion. + """ + + def __init__( + self, + nursery: Nursery = None, + manager: "AbstractAsyncContextManager[trio.Nursery]" = None, + ): + super(TrioEventEmitter, self).__init__() + self._nursery: Optional[Nursery] = None + self._manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None + if nursery: + if manager: + raise PyeeException( + "You may either pass a nursery or a nursery manager " "but not both" + ) + self._nursery = nursery + elif manager: + self._manager = manager + else: + self._manager = trio.open_nursery() + + def _async_runner( + self, + f: Callable, + args: Tuple[Any, ...], + kwargs: Dict[str, Any], + ) -> Callable[[], Awaitable[None]]: + async def runner() -> None: + try: + await f(*args, **kwargs) + except Exception as exc: + self.emit("error", exc) + + return runner + + def _emit_run( + self, + f: Callable, + args: Tuple[Any, ...], + kwargs: Dict[str, Any], + ) -> None: + if not self._nursery: + raise PyeeException("Uninitialized trio nursery") + self._nursery.start_soon(self._async_runner(f, args, kwargs)) + + @asynccontextmanager + async def context( + self, + ) -> AsyncGenerator["TrioEventEmitter", None]: + """Returns an async contextmanager which manages the underlying + nursery to the EventEmitter. The ``TrioEventEmitter``'s + async context management methods are implemented using this + function, but it may also be used directly for clarity. + """ + if self._nursery is not None: + yield self + elif self._manager is not None: + async with self._manager as nursery: + self._nursery = nursery + yield self + else: + raise PyeeException("Uninitialized nursery or nursery manager") + + async def __aenter__(self) -> "TrioEventEmitter": + self._context: Optional[ + AbstractAsyncContextManager["TrioEventEmitter"] + ] = self.context() + return await self._context.__aenter__() + + async def __aexit__( + self, + type: Optional[Type[BaseException]], + value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> Optional[bool]: + if self._context is None: + raise PyeeException("Attempting to exit uninitialized context") + rv = await self._context.__aexit__(type, value, traceback) + self._context = None + self._nursery = None + self._manager = None + return rv |