aboutsummaryrefslogtreecommitdiff
path: root/pyee/executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyee/executor.py')
-rw-r--r--pyee/executor.py79
1 files changed, 79 insertions, 0 deletions
diff --git a/pyee/executor.py b/pyee/executor.py
new file mode 100644
index 0000000..25df774
--- /dev/null
+++ b/pyee/executor.py
@@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+
+from concurrent.futures import Executor, Future, ThreadPoolExecutor
+from types import TracebackType
+from typing import Any, Callable, Dict, Optional, Tuple, Type
+
+from pyee.base import EventEmitter
+
+__all__ = ["ExecutorEventEmitter"]
+
+
+class ExecutorEventEmitter(EventEmitter):
+ """An event emitter class which runs handlers in a ``concurrent.futures``
+ executor.
+
+ By default, this class creates a default ``ThreadPoolExecutor``, but
+ a custom executor may also be passed in explicitly to, for instance,
+ use a ``ProcessPoolExecutor`` instead.
+
+ This class runs all emitted events on the configured executor. Errors
+ captured by the resulting Future are automatically emitted on the
+ ``error`` event. This is unlike the EventEmitter, which have no error
+ handling.
+
+ The underlying executor may be shut down by calling the ``shutdown``
+ method. Alternately you can treat the event emitter as a context manager::
+
+ with ExecutorEventEmitter() as ee:
+ # Underlying executor open
+
+ @ee.on('data')
+ def handler(data):
+ print(data)
+
+ ee.emit('event')
+
+ # Underlying executor closed
+
+ Since the function call is scheduled on an executor, emit is always
+ non-blocking.
+
+ No effort is made to ensure thread safety, beyond using an executor.
+ """
+
+ def __init__(self, executor: Executor = None):
+ super(ExecutorEventEmitter, self).__init__()
+ if executor:
+ self._executor: Executor = executor
+ else:
+ self._executor = ThreadPoolExecutor()
+
+ def _emit_run(
+ self,
+ f: Callable,
+ args: Tuple[Any, ...],
+ kwargs: Dict[str, Any],
+ ):
+ future: Future = self._executor.submit(f, *args, **kwargs)
+
+ @future.add_done_callback
+ def _callback(f: Future) -> None:
+ exc: Optional[BaseException] = f.exception()
+ if isinstance(exc, Exception):
+ self.emit("error", exc)
+ elif exc is not None:
+ raise exc
+
+ def shutdown(self, wait: bool = True) -> None:
+ """Call ``shutdown`` on the internal executor."""
+
+ self._executor.shutdown(wait=wait)
+
+ def __enter__(self) -> "ExecutorEventEmitter":
+ return self
+
+ def __exit__(
+ self, type: Type[Exception], value: Exception, traceback: TracebackType
+ ) -> Optional[bool]:
+ self.shutdown()