aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Wu <joshwu@google.com>2023-09-20 23:38:25 +0800
committerLucas Abel <22837557+uael@users.noreply.github.com>2023-09-20 23:03:23 +0200
commit45f60edbb685054b634b28dbee90ee908bce2e6e (patch)
tree315a98a024325b9bb91ad7a98ec2e0f961d93484
parent393ea6a7bb6a3c824190e474c8cb281e88ce0cd7 (diff)
downloadbumble-45f60edbb685054b634b28dbee90ee908bce2e6e.tar.gz
Pyee watcher context
-rw-r--r--bumble/utils.py76
-rw-r--r--tests/utils_test.py77
2 files changed, 152 insertions, 1 deletions
diff --git a/bumble/utils.py b/bumble/utils.py
index 8a55684..b9f2e59 100644
--- a/bumble/utils.py
+++ b/bumble/utils.py
@@ -15,12 +15,13 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
+from __future__ import annotations
import asyncio
import logging
import traceback
import collections
import sys
-from typing import Awaitable, Set, TypeVar
+from typing import Awaitable, Set, TypeVar, List, Tuple, Callable, Any, Optional, Union
from functools import wraps
from pyee import EventEmitter
@@ -64,6 +65,79 @@ def composite_listener(cls):
return cls
+_Handler = TypeVar('_Handler', bound=Callable)
+
+# -----------------------------------------------------------------------------
+class EventWatcher:
+ '''A wrapper class to control the lifecycle of event handlers better.
+
+ Usage:
+ watcher = EventWatcher()
+
+ def on_foo():
+ ...
+ watcher.on(emitter, 'foo', on_foo)
+
+ @watcher.on(emitter, 'bar')
+ def on_bar():
+ ...
+
+ # Close all event handlers watching through this watcher
+ watcher.close()
+
+ As context:
+ with contextlib.closing(EventWatcher()) as context:
+ @context.on(emitter, 'foo')
+ def on_foo():
+ ...
+ # on_foo() has been removed here!
+ '''
+
+ handlers: List[Tuple[EventEmitter, str, Callable[..., Any]]]
+
+ def __init__(self) -> None:
+ self.handlers = []
+
+ def on(
+ self, emitter: EventEmitter, event: str, handler: Optional[_Handler] = None
+ ) -> Union[_Handler, Callable[[_Handler], _Handler]]:
+ '''Watch a event until the context is destroyed.
+
+ Args:
+ emitter: EventEmitter to watch
+ event: Event string
+ handler: (Optional) Event handler. When nothing passed, this method works as a decorator.
+ '''
+
+ def wrapper(f: _Handler):
+ self.handlers.append((emitter, event, f))
+ emitter.on(event, f)
+
+ return wrapper if handler is None else wrapper(handler)
+
+ def once(
+ self, emitter: EventEmitter, event: str, handler: Optional[_Handler] = None
+ ) -> Union[_Handler, Callable[[_Handler], _Handler]]:
+ '''Watch a event for once.
+
+ Args:
+ emitter: EventEmitter to watch
+ event: Event string
+ handler: (Optional) Event handler. When nothing passed, this method works as a decorator.
+ '''
+
+ def wrapper(f: _Handler):
+ self.handlers.append((emitter, event, f))
+ emitter.once(event, f)
+
+ return wrapper if handler is None else wrapper(handler)
+
+ def close(self) -> None:
+ for emitter, event, handler in self.handlers:
+ if handler in emitter.listeners(event):
+ emitter.remove_listener(event, handler)
+
+
# -----------------------------------------------------------------------------
_T = TypeVar('_T')
diff --git a/tests/utils_test.py b/tests/utils_test.py
new file mode 100644
index 0000000..003d1a6
--- /dev/null
+++ b/tests/utils_test.py
@@ -0,0 +1,77 @@
+# Copyright 2021-2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import logging
+import os
+
+from bumble import utils
+from pyee import EventEmitter
+from unittest.mock import MagicMock
+
+
+def test_on():
+ emitter = EventEmitter()
+ with contextlib.closing(utils.EventWatcher()) as context:
+ mock = MagicMock()
+ context.on(emitter, 'event', mock)
+
+ emitter.emit('event')
+
+ assert not emitter.listeners('event')
+ assert mock.call_count == 1
+
+
+def test_on_decorator():
+ emitter = EventEmitter()
+ with contextlib.closing(utils.EventWatcher()) as context:
+ mock = MagicMock()
+
+ @context.on(emitter, 'event')
+ def on_event(*_):
+ mock()
+
+ emitter.emit('event')
+
+ assert not emitter.listeners('event')
+ assert mock.call_count == 1
+
+
+def test_multiple_handlers():
+ emitter = EventEmitter()
+ with contextlib.closing(utils.EventWatcher()) as context:
+ mock = MagicMock()
+
+ context.once(emitter, 'a', mock)
+ context.once(emitter, 'b', mock)
+
+ emitter.emit('b', 'b')
+
+ assert not emitter.listeners('a')
+ assert not emitter.listeners('b')
+
+ mock.assert_called_once_with('b')
+
+
+# -----------------------------------------------------------------------------
+def run_tests():
+ test_on()
+ test_on_decorator()
+ test_multiple_handlers()
+
+
+# -----------------------------------------------------------------------------
+if __name__ == '__main__':
+ logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
+ run_tests()