diff options
Diffstat (limited to 'pyee/asyncio.py')
-rw-r--r-- | pyee/asyncio.py | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/pyee/asyncio.py b/pyee/asyncio.py new file mode 100644 index 0000000..433001f --- /dev/null +++ b/pyee/asyncio.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- + +from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine +from typing import Any, Callable, cast, Dict, Optional, Tuple + +from pyee.base import EventEmitter + +__all__ = ["AsyncIOEventEmitter"] + + +class AsyncIOEventEmitter(EventEmitter): + """An event emitter class which can run asyncio coroutines in addition to + synchronous blocking functions. For example:: + + @ee.on('event') + async def async_handler(*args, **kwargs): + await returns_a_future() + + On emit, the event emitter will automatically schedule the coroutine using + ``asyncio.ensure_future`` and the configured event loop (defaults to + ``asyncio.get_event_loop()``). + + Unlike the case with the EventEmitter, all exceptions raised by + event handlers are automatically emitted on the ``error`` event. This is + important for asyncio coroutines specifically but is also handled for + synchronous functions for consistency. + + When ``loop`` is specified, the supplied event loop will be used when + scheduling work with ``ensure_future``. Otherwise, the default asyncio + event loop is used. + + For asyncio coroutine event handlers, calling emit is non-blocking. + In other words, you do not have to await any results from emit, and the + coroutine is scheduled in a fire-and-forget fashion. + """ + + def __init__(self, loop: Optional[AbstractEventLoop] = None): + super(AsyncIOEventEmitter, self).__init__() + self._loop: Optional[AbstractEventLoop] = loop + + def _emit_run( + self, + f: Callable, + args: Tuple[Any, ...], + kwargs: Dict[str, Any], + ): + try: + coro: Any = f(*args, **kwargs) + except Exception as exc: + self.emit("error", exc) + else: + if iscoroutine(coro): + if self._loop: + # ensure_future is *extremely* cranky about the types here, + # but this is relatively well-tested and I think the types + # are more strict than they should be + fut: Any = ensure_future(cast(Any, coro), loop=self._loop) + else: + fut = ensure_future(cast(Any, coro)) + elif isinstance(coro, Future): + fut = cast(Any, coro) + else: + return + + def callback(f): + if f.cancelled(): + return + + exc: Exception = f.exception() + if exc: + self.emit("error", exc) + + fut.add_done_callback(callback) |