aboutsummaryrefslogtreecommitdiff
path: root/pyee/trio.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyee/trio.py')
-rw-r--r--pyee/trio.py129
1 files changed, 129 insertions, 0 deletions
diff --git a/pyee/trio.py b/pyee/trio.py
new file mode 100644
index 0000000..e79d457
--- /dev/null
+++ b/pyee/trio.py
@@ -0,0 +1,129 @@
+# -*- coding: utf-8 -*-
+
+from contextlib import AbstractAsyncContextManager, asynccontextmanager
+from types import TracebackType
+from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Optional, Tuple, Type
+
+import trio
+
+from pyee.base import EventEmitter, PyeeException
+
+__all__ = ["TrioEventEmitter"]
+
+
+Nursery = trio.Nursery
+
+
+class TrioEventEmitter(EventEmitter):
+ """An event emitter class which can run trio tasks in a trio nursery.
+
+ By default, this class will lazily create both a nursery manager (the
+ object returned from ``trio.open_nursery()`` and a nursery (the object
+ yielded by using the nursery manager as an async context manager). It is
+ also possible to supply an existing nursery manager via the ``manager``
+ argument, or an existing nursery via the ``nursery`` argument.
+
+ Instances of TrioEventEmitter are themselves async context managers, so
+ that they may manage the lifecycle of the underlying trio nursery. For
+ example, typical usage of this library may look something like this::
+
+ async with TrioEventEmitter() as ee:
+ # Underlying nursery is instantiated and ready to go
+ @ee.on('data')
+ async def handler(data):
+ print(data)
+
+ ee.emit('event')
+
+ # Underlying nursery and manager have been cleaned up
+
+ Unlike the case with the EventEmitter, all exceptions raised by event
+ handlers are automatically emitted on the ``error`` event. This is
+ important for trio coroutines specifically but is also handled for
+ synchronous functions for consistency.
+
+ For trio coroutine event handlers, calling emit is non-blocking. In other
+ words, you should not attempt to await emit; the coroutine is scheduled
+ in a fire-and-forget fashion.
+ """
+
+ def __init__(
+ self,
+ nursery: Nursery = None,
+ manager: "AbstractAsyncContextManager[trio.Nursery]" = None,
+ ):
+ super(TrioEventEmitter, self).__init__()
+ self._nursery: Optional[Nursery] = None
+ self._manager: Optional["AbstractAsyncContextManager[trio.Nursery]"] = None
+ if nursery:
+ if manager:
+ raise PyeeException(
+ "You may either pass a nursery or a nursery manager " "but not both"
+ )
+ self._nursery = nursery
+ elif manager:
+ self._manager = manager
+ else:
+ self._manager = trio.open_nursery()
+
+ def _async_runner(
+ self,
+ f: Callable,
+ args: Tuple[Any, ...],
+ kwargs: Dict[str, Any],
+ ) -> Callable[[], Awaitable[None]]:
+ async def runner() -> None:
+ try:
+ await f(*args, **kwargs)
+ except Exception as exc:
+ self.emit("error", exc)
+
+ return runner
+
+ def _emit_run(
+ self,
+ f: Callable,
+ args: Tuple[Any, ...],
+ kwargs: Dict[str, Any],
+ ) -> None:
+ if not self._nursery:
+ raise PyeeException("Uninitialized trio nursery")
+ self._nursery.start_soon(self._async_runner(f, args, kwargs))
+
+ @asynccontextmanager
+ async def context(
+ self,
+ ) -> AsyncGenerator["TrioEventEmitter", None]:
+ """Returns an async contextmanager which manages the underlying
+ nursery to the EventEmitter. The ``TrioEventEmitter``'s
+ async context management methods are implemented using this
+ function, but it may also be used directly for clarity.
+ """
+ if self._nursery is not None:
+ yield self
+ elif self._manager is not None:
+ async with self._manager as nursery:
+ self._nursery = nursery
+ yield self
+ else:
+ raise PyeeException("Uninitialized nursery or nursery manager")
+
+ async def __aenter__(self) -> "TrioEventEmitter":
+ self._context: Optional[
+ AbstractAsyncContextManager["TrioEventEmitter"]
+ ] = self.context()
+ return await self._context.__aenter__()
+
+ async def __aexit__(
+ self,
+ type: Optional[Type[BaseException]],
+ value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> Optional[bool]:
+ if self._context is None:
+ raise PyeeException("Attempting to exit uninitialized context")
+ rv = await self._context.__aexit__(type, value, traceback)
+ self._context = None
+ self._nursery = None
+ self._manager = None
+ return rv