aboutsummaryrefslogtreecommitdiff
path: root/pyee/asyncio.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyee/asyncio.py')
-rw-r--r--pyee/asyncio.py73
1 files changed, 73 insertions, 0 deletions
diff --git a/pyee/asyncio.py b/pyee/asyncio.py
new file mode 100644
index 0000000..433001f
--- /dev/null
+++ b/pyee/asyncio.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+
+from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine
+from typing import Any, Callable, cast, Dict, Optional, Tuple
+
+from pyee.base import EventEmitter
+
+__all__ = ["AsyncIOEventEmitter"]
+
+
+class AsyncIOEventEmitter(EventEmitter):
+ """An event emitter class which can run asyncio coroutines in addition to
+ synchronous blocking functions. For example::
+
+ @ee.on('event')
+ async def async_handler(*args, **kwargs):
+ await returns_a_future()
+
+ On emit, the event emitter will automatically schedule the coroutine using
+ ``asyncio.ensure_future`` and the configured event loop (defaults to
+ ``asyncio.get_event_loop()``).
+
+ Unlike the case with the EventEmitter, all exceptions raised by
+ event handlers are automatically emitted on the ``error`` event. This is
+ important for asyncio coroutines specifically but is also handled for
+ synchronous functions for consistency.
+
+ When ``loop`` is specified, the supplied event loop will be used when
+ scheduling work with ``ensure_future``. Otherwise, the default asyncio
+ event loop is used.
+
+ For asyncio coroutine event handlers, calling emit is non-blocking.
+ In other words, you do not have to await any results from emit, and the
+ coroutine is scheduled in a fire-and-forget fashion.
+ """
+
+ def __init__(self, loop: Optional[AbstractEventLoop] = None):
+ super(AsyncIOEventEmitter, self).__init__()
+ self._loop: Optional[AbstractEventLoop] = loop
+
+ def _emit_run(
+ self,
+ f: Callable,
+ args: Tuple[Any, ...],
+ kwargs: Dict[str, Any],
+ ):
+ try:
+ coro: Any = f(*args, **kwargs)
+ except Exception as exc:
+ self.emit("error", exc)
+ else:
+ if iscoroutine(coro):
+ if self._loop:
+ # ensure_future is *extremely* cranky about the types here,
+ # but this is relatively well-tested and I think the types
+ # are more strict than they should be
+ fut: Any = ensure_future(cast(Any, coro), loop=self._loop)
+ else:
+ fut = ensure_future(cast(Any, coro))
+ elif isinstance(coro, Future):
+ fut = cast(Any, coro)
+ else:
+ return
+
+ def callback(f):
+ if f.cancelled():
+ return
+
+ exc: Exception = f.exception()
+ if exc:
+ self.emit("error", exc)
+
+ fut.add_done_callback(callback)