aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTreehugger Robot <android-test-infra-autosubmit@system.gserviceaccount.com>2023-08-02 22:21:51 +0000
committerAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>2023-08-02 22:21:51 +0000
commit442e52acb0685025cb3c9121a614700f893b33a7 (patch)
tree07476617dad41b7c2d29b92029378c8930eac433
parentc3867816722041b0952dc34a0521394c5e7a1199 (diff)
parente50459adf1e5e38c3d1fc66479ef332279a8243b (diff)
downloadavatar-442e52acb0685025cb3c9121a614700f893b33a7.tar.gz
Merge changes Ied4e7d24,Ib0926538,I842c1c27,Ie091ce2d into main am: e50459adf1
Original change: https://android-review.googlesource.com/c/platform/external/pandora/avatar/+/2679527 Change-Id: I0e1d468510984e105b263a2a9b421475ee1b6f19 Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r--Android.bp1
-rw-r--r--README.md49
-rw-r--r--avatar/__init__.py2
-rw-r--r--avatar/metrics/README.md17
-rw-r--r--avatar/metrics/__init__.py15
-rw-r--r--avatar/metrics/interceptors.py289
-rw-r--r--avatar/metrics/trace.proto83
-rw-r--r--avatar/metrics/trace.py284
-rw-r--r--avatar/metrics/trace_pb2.py43
-rw-r--r--avatar/metrics/trace_pb2.pyi147
-rw-r--r--avatar/pandora_client.py9
-rw-r--r--avatar/pandora_server.py34
-rw-r--r--cases/config.yml6
-rw-r--r--cases/le_security_test.py56
-rw-r--r--cases/security_test.py6
-rw-r--r--pyproject.toml7
16 files changed, 965 insertions, 83 deletions
diff --git a/Android.bp b/Android.bp
index 2d859db..a984e60 100644
--- a/Android.bp
+++ b/Android.bp
@@ -20,6 +20,7 @@ python_library_host {
srcs: [
"avatar/*.py",
"avatar/controllers/*.py",
+ "avatar/metrics/*.py",
],
libs: [
"pandora-python",
diff --git a/README.md b/README.md
index 3dcfc2d..62c7410 100644
--- a/README.md
+++ b/README.md
@@ -1,48 +1 @@
-# Avatar
-
-Avatar aims to provide a scalable multi-platform Bluetooth testing tool capable
-of running any Bluetooth test cases virtually and physically. It aims to
-complete PTS-bot in the Pandora testing suite.
-
-## Install
-
-```bash
-python -m venv venv
-source venv/bin/activate.fish # or any other shell
-pip install [-e] .
-```
-
-## Usage
-
-```bash
-python cases/host_test.py -c cases/config.yml --verbose
-```
-
-## Specify a test bed
-```bash
-python cases/host_test.py -c cases/config.yml --test_bed bumble.bumbles --verbose
-```
-
-## Development
-
-1. Make sure to have a `root-canal` instance running somewhere.
-```bash
-root-canal
-```
-
-1. Run the example using Bumble vs Bumble config file. The default `6402` HCI port of `root-canal` may be changed in this config file.
-```
-python cases/host_test.py -c cases/config.yml --verbose
-```
-
-3. Lint with `pyright` and `mypy`
-```
-pyright
-mypy
-```
-
-3. Format & imports style
-```
-black avatar/ cases/
-isort avatar/ cases/
-```
+See go/avatar-android
diff --git a/avatar/__init__.py b/avatar/__init__.py
index b679c27..e5a1cf2 100644
--- a/avatar/__init__.py
+++ b/avatar/__init__.py
@@ -28,6 +28,7 @@ import logging
from avatar import pandora_server
from avatar.aio import asynchronous
+from avatar.metrics import trace
from avatar.pandora_client import BumblePandoraClient as BumblePandoraDevice, PandoraClient as PandoraDevice
from avatar.pandora_server import PandoraServer
from mobly import base_test
@@ -79,6 +80,7 @@ class PandoraDevices(Sized, Iterable[PandoraDevice]):
self._clients = []
self._servers = []
+ trace.hook_test(test, self)
user_params: Dict[str, Any] = test.user_params # type: ignore
controller_configs: Dict[str, Any] = test.controller_configs.copy() # type: ignore
sorted_controllers = sorted(
diff --git a/avatar/metrics/README.md b/avatar/metrics/README.md
new file mode 100644
index 0000000..0f4d915
--- /dev/null
+++ b/avatar/metrics/README.md
@@ -0,0 +1,17 @@
+# Metrics
+
+Avatar metrics use `perfetto` traces.
+
+## Perfetto traces
+
+For convenience, `trace_pb2.py` and `trace_pb2.pyi` are pre-generated.
+
+To regenerate them run the following:
+
+```
+pip install protoc-exe
+protoc trace.proto --pyi_out=./ --python_out=./
+```
+
+To ensure compliance with the linter, you must modify the generated
+`.pyi` file by replacing `Union[T, _Mapping]` to `T`.
diff --git a/avatar/metrics/__init__.py b/avatar/metrics/__init__.py
new file mode 100644
index 0000000..3177ed9
--- /dev/null
+++ b/avatar/metrics/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Avatar metrics."""
diff --git a/avatar/metrics/interceptors.py b/avatar/metrics/interceptors.py
new file mode 100644
index 0000000..e853395
--- /dev/null
+++ b/avatar/metrics/interceptors.py
@@ -0,0 +1,289 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Avatar metrics interceptors."""
+
+import grpc
+import time
+
+from avatar.metrics.trace import Callsite
+from grpc.aio import ClientCallDetails
+from pandora import _utils as utils
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ AsyncIterator,
+ Awaitable,
+ Callable,
+ Generic,
+ Iterator,
+ Protocol,
+ Sequence,
+ TypeVar,
+ Union,
+)
+
+if TYPE_CHECKING:
+ from avatar.pandora_client import PandoraClient
+else:
+ PandoraClient = object
+
+
+_T = TypeVar('_T')
+_U = TypeVar('_U')
+_T_co = TypeVar('_T_co', covariant=True)
+
+
+ClientInterceptor = Union[
+ grpc.UnaryUnaryClientInterceptor,
+ grpc.UnaryStreamClientInterceptor,
+ grpc.StreamStreamClientInterceptor,
+]
+
+
+def interceptors(device: PandoraClient) -> Sequence[ClientInterceptor]:
+ return [UnaryUnaryInterceptor(device), UnaryStreamInterceptor(device), StreamStreamInterceptor(device)]
+
+
+def aio_interceptors(device: PandoraClient) -> Sequence[grpc.aio.ClientInterceptor]:
+ return [AioUnaryUnaryInterceptor(device), AioUnaryStreamInterceptor(device), AioStreamStreamInterceptor(device)]
+
+
+class UnaryOutcome(Protocol, Generic[_T_co]):
+ def result(self) -> _T_co:
+ ...
+
+
+class UnaryUnaryInterceptor(grpc.UnaryUnaryClientInterceptor): # type: ignore[misc]
+ def __init__(self, device: PandoraClient) -> None:
+ self.device = device
+
+ def intercept_unary_unary(
+ self,
+ continuation: Callable[[ClientCallDetails, _T], UnaryOutcome[_U]],
+ client_call_details: ClientCallDetails,
+ request: _T,
+ ) -> UnaryOutcome[_U]:
+ callsite = Callsite(self.device, client_call_details.method, request)
+ response = continuation(client_call_details, request)
+ callsite.end(response.result())
+ return response
+
+
+class UnaryStreamInterceptor(grpc.UnaryStreamClientInterceptor): # type: ignore[misc]
+ def __init__(self, device: PandoraClient) -> None:
+ self.device = device
+
+ def intercept_unary_stream( # type: ignore
+ self,
+ continuation: Callable[[ClientCallDetails, _T], utils.Stream[_U]],
+ client_call_details: ClientCallDetails,
+ request: _T,
+ ) -> utils.Stream[_U]:
+ callsite = Callsite(self.device, client_call_details.method, request)
+ call = continuation(client_call_details, request)
+ call.add_callback(lambda: callsite.end(None)) # type: ignore
+
+ class Proxy:
+ def __iter__(self) -> Iterator[_U]:
+ return self
+
+ def __next__(self) -> _U:
+ res = next(call)
+ callsite.input(res)
+ return res
+
+ def is_active(self) -> bool:
+ return call.is_active() # type: ignore
+
+ def time_remaining(self) -> float:
+ return call.time_remaining() # type: ignore
+
+ def cancel(self) -> None:
+ return call.cancel() # type: ignore
+
+ def add_callback(self, callback: Any) -> None:
+ return call.add_callback(callback) # type: ignore
+
+ return Proxy() # type: ignore
+
+
+class StreamStreamInterceptor(grpc.StreamStreamClientInterceptor): # type: ignore[misc]
+ def __init__(self, device: PandoraClient) -> None:
+ self.device = device
+
+ def intercept_stream_stream( # type: ignore
+ self,
+ continuation: Callable[[ClientCallDetails, utils.Sender[_T]], utils.StreamStream[_T, _U]],
+ client_call_details: ClientCallDetails,
+ request: utils.Sender[_T],
+ ) -> utils.StreamStream[_T, _U]:
+ callsite = Callsite(self.device, client_call_details.method, None)
+
+ class RequestProxy:
+ def __iter__(self) -> Iterator[_T]:
+ return self
+
+ def __next__(self) -> _T:
+ req = next(request)
+ callsite.output(req)
+ return req
+
+ call = continuation(client_call_details, RequestProxy()) # type: ignore
+ call.add_callback(lambda: callsite.end(None)) # type: ignore
+
+ class Proxy:
+ def __iter__(self) -> Iterator[_U]:
+ return self
+
+ def __next__(self) -> _U:
+ res = next(call)
+ callsite.input(res)
+ return res
+
+ def is_active(self) -> bool:
+ return call.is_active() # type: ignore
+
+ def time_remaining(self) -> float:
+ return call.time_remaining() # type: ignore
+
+ def cancel(self) -> None:
+ return call.cancel() # type: ignore
+
+ def add_callback(self, callback: Any) -> None:
+ return call.add_callback(callback) # type: ignore
+
+ return Proxy() # type: ignore
+
+
+class AioUnaryUnaryInterceptor(grpc.aio.UnaryUnaryClientInterceptor): # type: ignore[misc]
+ def __init__(self, device: PandoraClient) -> None:
+ self.device = device
+
+ async def intercept_unary_unary( # type: ignore
+ self,
+ continuation: Callable[[ClientCallDetails, _T], Awaitable[Awaitable[_U]]],
+ client_call_details: ClientCallDetails,
+ request: _T,
+ ) -> _U:
+ callsite = Callsite(self.device, client_call_details.method, request)
+ response = await (await continuation(client_call_details, request))
+ callsite.end(response)
+ return response
+
+
+class AioUnaryStreamInterceptor(grpc.aio.UnaryStreamClientInterceptor): # type: ignore[misc]
+ def __init__(self, device: PandoraClient) -> None:
+ self.device = device
+
+ async def intercept_unary_stream( # type: ignore
+ self,
+ continuation: Callable[[ClientCallDetails, _T], Awaitable[utils.AioStream[_U]]],
+ client_call_details: ClientCallDetails,
+ request: _T,
+ ) -> utils.AioStream[_U]:
+
+ # TODO: this is a workaround for https://github.com/grpc/grpc/pull/33951
+ # need to be deleted as soon as `grpcio` contains the fix.
+ now = time.time()
+ if client_call_details.timeout and client_call_details.timeout > now:
+ client_call_details = client_call_details._replace(
+ timeout=client_call_details.timeout - now,
+ )
+
+ callsite = Callsite(self.device, client_call_details.method, request)
+ call = await continuation(client_call_details, request)
+ call.add_done_callback(lambda _: callsite.end(None)) # type: ignore
+ iter = aiter(call)
+
+ class Proxy:
+ def __aiter__(self) -> AsyncIterator[_U]:
+ return self
+
+ async def __anext__(self) -> _U:
+ res = await anext(iter)
+ callsite.input(res)
+ return res
+
+ def is_active(self) -> bool:
+ return call.is_active() # type: ignore
+
+ def time_remaining(self) -> float:
+ return call.time_remaining() # type: ignore
+
+ def cancel(self) -> None:
+ return call.cancel() # type: ignore
+
+ def add_done_callback(self, callback: Any) -> None:
+ return call.add_done_callback(callback) # type: ignore
+
+ return Proxy() # type: ignore
+
+
+class AioStreamStreamInterceptor(grpc.aio.StreamStreamClientInterceptor): # type: ignore[misc]
+ def __init__(self, device: PandoraClient) -> None:
+ self.device = device
+
+ async def intercept_stream_stream( # type: ignore
+ self,
+ continuation: Callable[[ClientCallDetails, utils.AioSender[_T]], Awaitable[utils.AioStreamStream[_T, _U]]],
+ client_call_details: ClientCallDetails,
+ request: utils.AioSender[_T],
+ ) -> utils.AioStreamStream[_T, _U]:
+
+ # TODO: this is a workaround for https://github.com/grpc/grpc/pull/33951
+ # need to be deleted as soon as `grpcio` contains the fix.
+ now = time.time()
+ if client_call_details.timeout and client_call_details.timeout > now:
+ client_call_details = client_call_details._replace(
+ timeout=client_call_details.timeout - now,
+ )
+
+ callsite = Callsite(self.device, client_call_details.method, None)
+
+ class RequestProxy:
+ def __aiter__(self) -> AsyncIterator[_T]:
+ return self
+
+ async def __anext__(self) -> _T:
+ req = await anext(request)
+ callsite.output(req)
+ return req
+
+ call = await continuation(client_call_details, RequestProxy()) # type: ignore
+ call.add_done_callback(lambda _: callsite.end(None)) # type: ignore
+ iter = aiter(call)
+
+ class ResponseProxy:
+ def __aiter__(self) -> AsyncIterator[_U]:
+ return self
+
+ async def __anext__(self) -> _U:
+ res = await anext(iter)
+ callsite.input(res)
+ return res
+
+ def is_active(self) -> bool:
+ return call.is_active() # type: ignore
+
+ def time_remaining(self) -> float:
+ return call.time_remaining() # type: ignore
+
+ def cancel(self) -> None:
+ return call.cancel() # type: ignore
+
+ def add_done_callback(self, callback: Any) -> None:
+ return call.add_done_callback(callback) # type: ignore
+
+ return ResponseProxy() # type: ignore
diff --git a/avatar/metrics/trace.proto b/avatar/metrics/trace.proto
new file mode 100644
index 0000000..9ef925d
--- /dev/null
+++ b/avatar/metrics/trace.proto
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package perfetto.protos;
+
+message Trace {
+ repeated TracePacket packet = 1;
+}
+
+message TracePacket {
+ optional uint64 timestamp = 8;
+ oneof data {
+ TrackEvent track_event = 11;
+ TrackDescriptor track_descriptor = 60;
+ }
+ oneof optional_trusted_packet_sequence_id {
+ uint32 trusted_packet_sequence_id = 10;
+ }
+}
+
+message TrackDescriptor {
+ optional uint64 uuid = 1;
+ optional uint64 parent_uuid = 5;
+ optional string name = 2;
+ optional ProcessDescriptor process = 3;
+ optional ThreadDescriptor thread = 4;
+}
+
+message TrackEvent {
+ enum Type {
+ TYPE_UNSPECIFIED = 0;
+ TYPE_SLICE_BEGIN = 1;
+ TYPE_SLICE_END = 2;
+ TYPE_INSTANT = 3;
+ TYPE_COUNTER = 4;
+ }
+ required string name = 23;
+ optional Type type = 9;
+ optional uint64 track_uuid = 11;
+ repeated DebugAnnotation debug_annotations = 4;
+}
+
+message ProcessDescriptor {
+ optional int32 pid = 1;
+ optional string process_name = 6;
+ repeated string process_labels = 8;
+}
+
+message ThreadDescriptor {
+ optional int32 pid = 1;
+ optional int32 tid = 2;
+ optional string thread_name = 5;
+}
+
+message DebugAnnotation {
+ oneof name_field {
+ string name = 10;
+ }
+ oneof value {
+ bool bool_value = 2;
+ uint64 uint_value = 3;
+ int64 int_value = 4;
+ double double_value = 5;
+ string string_value = 6;
+ }
+ repeated DebugAnnotation dict_entries = 11;
+ repeated DebugAnnotation array_values = 12;
+}
diff --git a/avatar/metrics/trace.py b/avatar/metrics/trace.py
new file mode 100644
index 0000000..409633c
--- /dev/null
+++ b/avatar/metrics/trace.py
@@ -0,0 +1,284 @@
+# Copyright 2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Avatar metrics trace."""
+
+import time
+import types
+
+from avatar.metrics.trace_pb2 import (
+ DebugAnnotation,
+ ProcessDescriptor,
+ ThreadDescriptor,
+ Trace,
+ TracePacket,
+ TrackDescriptor,
+ TrackEvent,
+)
+from google.protobuf import any_pb2, message
+from mobly.base_test import BaseTestClass
+from typing import TYPE_CHECKING, Any, Dict, List, Protocol, Tuple, Union
+
+if TYPE_CHECKING:
+ from avatar import PandoraDevices
+ from avatar.pandora_client import PandoraClient
+else:
+ PandoraClient = object
+ PandoraDevices = object
+
+devices_id: Dict[PandoraClient, int] = {}
+devices_process_id: Dict[PandoraClient, int] = {}
+packets: List[TracePacket] = []
+genesis: int = time.monotonic_ns()
+id: int = 0
+
+
+def next_id() -> int:
+ global id
+ id += 1
+ return id
+
+
+def hook_test(test: BaseTestClass, devices: PandoraDevices) -> None:
+ global packets
+ original_teardown_class = test.teardown_class
+ original_setup_test = test.setup_test
+
+ def teardown_class(self: BaseTestClass) -> None:
+ output_path: str = test.current_test_info.output_path # type: ignore
+ trace = Trace(packet=packets)
+ with open(f"{output_path}/avatar.trace", "wb") as f:
+ f.write(trace.SerializeToString())
+ with open(f"{output_path}/packets.log", "a") as f:
+ for packet in packets:
+ f.write(f"{packet}")
+ f.write("----------\n")
+
+ original_teardown_class()
+
+ def setup_test(self: BaseTestClass) -> None:
+ global genesis
+ genesis = time.monotonic_ns()
+ process_id = next_id()
+ packets.append(
+ TracePacket(
+ track_descriptor=TrackDescriptor(
+ uuid=process_id,
+ process=ProcessDescriptor(
+ pid=process_id, process_name=f"{test.__class__.__name__}.{test.current_test_info.name}"
+ ),
+ )
+ )
+ )
+
+ for device in devices:
+ devices_process_id[device] = process_id
+ devices_id[device] = next_id()
+ descriptor = TrackDescriptor(
+ uuid=devices_id[device],
+ parent_uuid=process_id,
+ thread=ThreadDescriptor(thread_name=device.name, pid=process_id, tid=devices_id[device]),
+ )
+ packets.append(TracePacket(track_descriptor=descriptor))
+
+ original_setup_test()
+
+ test.teardown_class = types.MethodType(teardown_class, test)
+ test.setup_test = types.MethodType(setup_test, test)
+
+
+class AsTrace(Protocol):
+ def as_trace(self) -> TracePacket:
+ ...
+
+
+class Callsite(AsTrace):
+ id_counter = 0
+
+ @classmethod
+ def next_id(cls) -> int:
+ cls.id_counter += 1
+ return cls.id_counter
+
+ def __init__(self, device: PandoraClient, name: Union[bytes, str], message: Any) -> None:
+ self.at = time.monotonic_ns() - genesis
+ self.name = name if isinstance(name, str) else name.decode('utf-8')
+ self.device = device
+ self.message = message
+ self.events: List[CallEvent] = []
+ self.id = Callsite.next_id()
+
+ device.log.info(f"{self}")
+
+ def pretty(self) -> str:
+ name_pretty = self.name[1:].replace('/', '.')
+ if self.message is None:
+ return f"%{self.id} {name_pretty}"
+ message_pretty, _ = debug_message(self.message)
+ return f"{name_pretty}({message_pretty})"
+
+ def __str__(self) -> str:
+ return f"{str2color('╭──', self.id)} {self.pretty()}"
+
+ def output(self, message: Any) -> None:
+ self.events.append(CallOutput(self, message))
+
+ def input(self, message: Any) -> None:
+ self.events.append(CallInput(self, message))
+
+ def end(self, message: Any) -> None:
+ global packets
+ if self.device not in devices_id:
+ return
+ self.events.append(CallEnd(self, message))
+ packets.append(self.as_trace())
+ for event in self.events:
+ packets.append(event.as_trace())
+
+ def as_trace(self) -> TracePacket:
+ return TracePacket(
+ timestamp=self.at,
+ track_event=TrackEvent(
+ name=self.name,
+ type=TrackEvent.Type.TYPE_SLICE_BEGIN,
+ track_uuid=devices_id[self.device],
+ debug_annotations=None
+ if self.message is None
+ else [
+ DebugAnnotation(name=self.message.__class__.__name__, dict_entries=debug_message(self.message)[1])
+ ],
+ ),
+ trusted_packet_sequence_id=devices_process_id[self.device],
+ )
+
+
+class CallEvent(AsTrace):
+ def __init__(self, callsite: Callsite, message: Any) -> None:
+ self.at = time.monotonic_ns() - genesis
+ self.callsite = callsite
+ self.message = message
+
+ callsite.device.log.info(f"{self}")
+
+ def __str__(self) -> str:
+ return f"{str2color('╰──', self.callsite.id)} {self.stringify('──→')}"
+
+ def as_trace(self) -> TracePacket:
+ return TracePacket(
+ timestamp=self.at,
+ track_event=TrackEvent(
+ name=self.callsite.name,
+ type=TrackEvent.Type.TYPE_INSTANT,
+ track_uuid=devices_id[self.callsite.device],
+ debug_annotations=None
+ if self.message is None
+ else [
+ DebugAnnotation(name=self.message.__class__.__name__, dict_entries=debug_message(self.message)[1])
+ ],
+ ),
+ trusted_packet_sequence_id=devices_process_id[self.callsite.device],
+ )
+
+ def stringify(self, direction: str) -> str:
+ message_pretty = "" if self.message is None else debug_message(self.message)[0]
+ return (
+ str2color(f"[{(self.at - self.callsite.at) / 1000000000:.3f}s]", self.callsite.id)
+ + f" {self.callsite.pretty()} {str2color(direction, self.callsite.id)} ({message_pretty})"
+ )
+
+
+class CallOutput(CallEvent):
+ def __str__(self) -> str:
+ return f"{str2color('├──', self.callsite.id)} {self.stringify('──→')}"
+
+ def as_trace(self) -> TracePacket:
+ return super().as_trace()
+
+
+class CallInput(CallEvent):
+ def __str__(self) -> str:
+ return f"{str2color('├──', self.callsite.id)} {self.stringify('←──')}"
+
+ def as_trace(self) -> TracePacket:
+ return super().as_trace()
+
+
+class CallEnd(CallEvent):
+ def __str__(self) -> str:
+ return f"{str2color('╰──', self.callsite.id)} {self.stringify('──→')}"
+
+ def as_trace(self) -> TracePacket:
+ return TracePacket(
+ timestamp=self.at,
+ track_event=TrackEvent(
+ name=self.callsite.name,
+ type=TrackEvent.Type.TYPE_SLICE_END,
+ track_uuid=devices_id[self.callsite.device],
+ debug_annotations=None
+ if self.message is None
+ else [
+ DebugAnnotation(name=self.message.__class__.__name__, dict_entries=debug_message(self.message)[1])
+ ],
+ ),
+ trusted_packet_sequence_id=devices_process_id[self.callsite.device],
+ )
+
+
+def debug_value(v: Any) -> Tuple[Any, Dict[str, Any]]:
+ if isinstance(v, any_pb2.Any):
+ return '...', {'string_value': f'{v}'}
+ elif isinstance(v, message.Message):
+ json, entries = debug_message(v)
+ return json, {'dict_entries': entries}
+ elif isinstance(v, bytes):
+ return (v if len(v) < 16 else '...'), {'string_value': f'{v!r}'}
+ elif isinstance(v, bool):
+ return v, {'bool_value': v}
+ elif isinstance(v, int):
+ return v, {'int_value': v}
+ elif isinstance(v, float):
+ return v, {'double_value': v}
+ elif isinstance(v, str):
+ return v, {'string_value': v}
+ try:
+ return v, {'array_values': [DebugAnnotation(**debug_value(x)[1]) for x in v]} # type: ignore
+ except:
+ return v, {'string_value': f'{v}'}
+
+
+def debug_message(msg: message.Message) -> Tuple[Dict[str, Any], List[DebugAnnotation]]:
+ json: Dict[str, Any] = {}
+ dbga: List[DebugAnnotation] = []
+ for (f, v) in msg.ListFields():
+ if (
+ isinstance(v, bytes)
+ and len(v) == 6
+ and ('address' in f.name or (f.containing_oneof and 'address' in f.containing_oneof.name))
+ ):
+ addr = ':'.join([f'{x:02X}' for x in v])
+ json[f.name] = addr
+ dbga.append(DebugAnnotation(name=f.name, string_value=addr))
+ else:
+ json_entry, dbga_entry = debug_value(v)
+ json[f.name] = json_entry
+ dbga.append(DebugAnnotation(name=f.name, **dbga_entry))
+ return json, dbga
+
+
+def str2color(s: str, id: int) -> str:
+ CSI = "\x1b["
+ CSI_RESET = CSI + "0m"
+ CSI_BOLD = CSI + "1m"
+ color = ((id * 10) % (230 - 17)) + 17
+ return CSI + ("1;38;5;%dm" % color) + CSI_BOLD + s + CSI_RESET
diff --git a/avatar/metrics/trace_pb2.py b/avatar/metrics/trace_pb2.py
new file mode 100644
index 0000000..095c6e4
--- /dev/null
+++ b/avatar/metrics/trace_pb2.py
@@ -0,0 +1,43 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: trace.proto
+"""Generated protocol buffer code."""
+from google.protobuf import (
+ descriptor as _descriptor,
+ descriptor_pool as _descriptor_pool,
+ symbol_database as _symbol_database,
+)
+from google.protobuf.internal import builder as _builder
+
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
+ b'\n\x0btrace.proto\x12\x0fperfetto.protos\"5\n\x05Trace\x12,\n\x06packet\x18\x01 \x03(\x0b\x32\x1c.perfetto.protos.TracePacket\"\xe7\x01\n\x0bTracePacket\x12\x11\n\ttimestamp\x18\x08 \x01(\x04\x12\x32\n\x0btrack_event\x18\x0b \x01(\x0b\x32\x1b.perfetto.protos.TrackEventH\x00\x12<\n\x10track_descriptor\x18< \x01(\x0b\x32 .perfetto.protos.TrackDescriptorH\x00\x12$\n\x1atrusted_packet_sequence_id\x18\n \x01(\rH\x01\x42\x06\n\x04\x64\x61taB%\n#optional_trusted_packet_sequence_id\"\xaa\x01\n\x0fTrackDescriptor\x12\x0c\n\x04uuid\x18\x01 \x01(\x04\x12\x13\n\x0bparent_uuid\x18\x05 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x33\n\x07process\x18\x03 \x01(\x0b\x32\".perfetto.protos.ProcessDescriptor\x12\x31\n\x06thread\x18\x04 \x01(\x0b\x32!.perfetto.protos.ThreadDescriptor\"\x87\x02\n\nTrackEvent\x12\x0c\n\x04name\x18\x17 \x02(\t\x12.\n\x04type\x18\t \x01(\x0e\x32 .perfetto.protos.TrackEvent.Type\x12\x12\n\ntrack_uuid\x18\x0b \x01(\x04\x12;\n\x11\x64\x65\x62ug_annotations\x18\x04 \x03(\x0b\x32 .perfetto.protos.DebugAnnotation\"j\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x14\n\x10TYPE_SLICE_BEGIN\x10\x01\x12\x12\n\x0eTYPE_SLICE_END\x10\x02\x12\x10\n\x0cTYPE_INSTANT\x10\x03\x12\x10\n\x0cTYPE_COUNTER\x10\x04\"N\n\x11ProcessDescriptor\x12\x0b\n\x03pid\x18\x01 \x01(\x05\x12\x14\n\x0cprocess_name\x18\x06 \x01(\t\x12\x16\n\x0eprocess_labels\x18\x08 \x03(\t\"A\n\x10ThreadDescriptor\x12\x0b\n\x03pid\x18\x01 \x01(\x05\x12\x0b\n\x03tid\x18\x02 \x01(\x05\x12\x13\n\x0bthread_name\x18\x05 \x01(\t\"\x99\x02\n\x0f\x44\x65\x62ugAnnotation\x12\x0e\n\x04name\x18\n \x01(\tH\x00\x12\x14\n\nbool_value\x18\x02 \x01(\x08H\x01\x12\x14\n\nuint_value\x18\x03 \x01(\x04H\x01\x12\x13\n\tint_value\x18\x04 \x01(\x03H\x01\x12\x16\n\x0c\x64ouble_value\x18\x05 \x01(\x01H\x01\x12\x16\n\x0cstring_value\x18\x06 \x01(\tH\x01\x12\x36\n\x0c\x64ict_entries\x18\x0b \x03(\x0b\x32 .perfetto.protos.DebugAnnotation\x12\x36\n\x0c\x61rray_values\x18\x0c \x03(\x0b\x32 .perfetto.protos.DebugAnnotationB\x0c\n\nname_fieldB\x07\n\x05value'
+)
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'trace_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+
+ DESCRIPTOR._options = None
+ _globals['_TRACE']._serialized_start = 32
+ _globals['_TRACE']._serialized_end = 85
+ _globals['_TRACEPACKET']._serialized_start = 88
+ _globals['_TRACEPACKET']._serialized_end = 319
+ _globals['_TRACKDESCRIPTOR']._serialized_start = 322
+ _globals['_TRACKDESCRIPTOR']._serialized_end = 492
+ _globals['_TRACKEVENT']._serialized_start = 495
+ _globals['_TRACKEVENT']._serialized_end = 758
+ _globals['_TRACKEVENT_TYPE']._serialized_start = 652
+ _globals['_TRACKEVENT_TYPE']._serialized_end = 758
+ _globals['_PROCESSDESCRIPTOR']._serialized_start = 760
+ _globals['_PROCESSDESCRIPTOR']._serialized_end = 838
+ _globals['_THREADDESCRIPTOR']._serialized_start = 840
+ _globals['_THREADDESCRIPTOR']._serialized_end = 905
+ _globals['_DEBUGANNOTATION']._serialized_start = 908
+ _globals['_DEBUGANNOTATION']._serialized_end = 1189
+# @@protoc_insertion_point(module_scope)
diff --git a/avatar/metrics/trace_pb2.pyi b/avatar/metrics/trace_pb2.pyi
new file mode 100644
index 0000000..e8aff00
--- /dev/null
+++ b/avatar/metrics/trace_pb2.pyi
@@ -0,0 +1,147 @@
+from google.protobuf import descriptor as _descriptor, message as _message
+from google.protobuf.internal import containers as _containers, enum_type_wrapper as _enum_type_wrapper
+from typing import ClassVar as _ClassVar, Iterable as _Iterable, Optional as _Optional, Union as _Union
+
+DESCRIPTOR: _descriptor.FileDescriptor
+
+class Trace(_message.Message):
+ __slots__ = ["packet"]
+ PACKET_FIELD_NUMBER: _ClassVar[int]
+ packet: _containers.RepeatedCompositeFieldContainer[TracePacket]
+ def __init__(self, packet: _Optional[_Iterable[TracePacket]] = ...) -> None: ...
+
+class TracePacket(_message.Message):
+ __slots__ = ["timestamp", "track_event", "track_descriptor", "trusted_packet_sequence_id"]
+ TIMESTAMP_FIELD_NUMBER: _ClassVar[int]
+ TRACK_EVENT_FIELD_NUMBER: _ClassVar[int]
+ TRACK_DESCRIPTOR_FIELD_NUMBER: _ClassVar[int]
+ TRUSTED_PACKET_SEQUENCE_ID_FIELD_NUMBER: _ClassVar[int]
+ timestamp: int
+ track_event: TrackEvent
+ track_descriptor: TrackDescriptor
+ trusted_packet_sequence_id: int
+ def __init__(
+ self,
+ timestamp: _Optional[int] = ...,
+ track_event: _Optional[TrackEvent] = ...,
+ track_descriptor: _Optional[TrackDescriptor] = ...,
+ trusted_packet_sequence_id: _Optional[int] = ...,
+ ) -> None: ...
+
+class TrackDescriptor(_message.Message):
+ __slots__ = ["uuid", "parent_uuid", "name", "process", "thread"]
+ UUID_FIELD_NUMBER: _ClassVar[int]
+ PARENT_UUID_FIELD_NUMBER: _ClassVar[int]
+ NAME_FIELD_NUMBER: _ClassVar[int]
+ PROCESS_FIELD_NUMBER: _ClassVar[int]
+ THREAD_FIELD_NUMBER: _ClassVar[int]
+ uuid: int
+ parent_uuid: int
+ name: str
+ process: ProcessDescriptor
+ thread: ThreadDescriptor
+ def __init__(
+ self,
+ uuid: _Optional[int] = ...,
+ parent_uuid: _Optional[int] = ...,
+ name: _Optional[str] = ...,
+ process: _Optional[ProcessDescriptor] = ...,
+ thread: _Optional[ThreadDescriptor] = ...,
+ ) -> None: ...
+
+class TrackEvent(_message.Message):
+ __slots__ = ["name", "type", "track_uuid", "debug_annotations"]
+
+ class Type(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): # type: ignore
+ __slots__ = [] # type: ignore
+ TYPE_UNSPECIFIED: _ClassVar[TrackEvent.Type]
+ TYPE_SLICE_BEGIN: _ClassVar[TrackEvent.Type]
+ TYPE_SLICE_END: _ClassVar[TrackEvent.Type]
+ TYPE_INSTANT: _ClassVar[TrackEvent.Type]
+ TYPE_COUNTER: _ClassVar[TrackEvent.Type]
+ TYPE_UNSPECIFIED: TrackEvent.Type
+ TYPE_SLICE_BEGIN: TrackEvent.Type
+ TYPE_SLICE_END: TrackEvent.Type
+ TYPE_INSTANT: TrackEvent.Type
+ TYPE_COUNTER: TrackEvent.Type
+ NAME_FIELD_NUMBER: _ClassVar[int]
+ TYPE_FIELD_NUMBER: _ClassVar[int]
+ TRACK_UUID_FIELD_NUMBER: _ClassVar[int]
+ DEBUG_ANNOTATIONS_FIELD_NUMBER: _ClassVar[int]
+ name: str
+ type: TrackEvent.Type
+ track_uuid: int
+ debug_annotations: _containers.RepeatedCompositeFieldContainer[DebugAnnotation]
+ def __init__(
+ self,
+ name: _Optional[str] = ...,
+ type: _Optional[_Union[TrackEvent.Type, str]] = ...,
+ track_uuid: _Optional[int] = ...,
+ debug_annotations: _Optional[_Iterable[DebugAnnotation]] = ...,
+ ) -> None: ...
+
+class ProcessDescriptor(_message.Message):
+ __slots__ = ["pid", "process_name", "process_labels"]
+ PID_FIELD_NUMBER: _ClassVar[int]
+ PROCESS_NAME_FIELD_NUMBER: _ClassVar[int]
+ PROCESS_LABELS_FIELD_NUMBER: _ClassVar[int]
+ pid: int
+ process_name: str
+ process_labels: _containers.RepeatedScalarFieldContainer[str]
+ def __init__(
+ self,
+ pid: _Optional[int] = ...,
+ process_name: _Optional[str] = ...,
+ process_labels: _Optional[_Iterable[str]] = ...,
+ ) -> None: ...
+
+class ThreadDescriptor(_message.Message):
+ __slots__ = ["pid", "tid", "thread_name"]
+ PID_FIELD_NUMBER: _ClassVar[int]
+ TID_FIELD_NUMBER: _ClassVar[int]
+ THREAD_NAME_FIELD_NUMBER: _ClassVar[int]
+ pid: int
+ tid: int
+ thread_name: str
+ def __init__(
+ self, pid: _Optional[int] = ..., tid: _Optional[int] = ..., thread_name: _Optional[str] = ...
+ ) -> None: ...
+
+class DebugAnnotation(_message.Message):
+ __slots__ = [
+ "name",
+ "bool_value",
+ "uint_value",
+ "int_value",
+ "double_value",
+ "string_value",
+ "dict_entries",
+ "array_values",
+ ]
+ NAME_FIELD_NUMBER: _ClassVar[int]
+ BOOL_VALUE_FIELD_NUMBER: _ClassVar[int]
+ UINT_VALUE_FIELD_NUMBER: _ClassVar[int]
+ INT_VALUE_FIELD_NUMBER: _ClassVar[int]
+ DOUBLE_VALUE_FIELD_NUMBER: _ClassVar[int]
+ STRING_VALUE_FIELD_NUMBER: _ClassVar[int]
+ DICT_ENTRIES_FIELD_NUMBER: _ClassVar[int]
+ ARRAY_VALUES_FIELD_NUMBER: _ClassVar[int]
+ name: str
+ bool_value: bool
+ uint_value: int
+ int_value: int
+ double_value: float
+ string_value: str
+ dict_entries: _containers.RepeatedCompositeFieldContainer[DebugAnnotation]
+ array_values: _containers.RepeatedCompositeFieldContainer[DebugAnnotation]
+ def __init__(
+ self,
+ name: _Optional[str] = ...,
+ bool_value: bool = ...,
+ uint_value: _Optional[int] = ...,
+ int_value: _Optional[int] = ...,
+ double_value: _Optional[float] = ...,
+ string_value: _Optional[str] = ...,
+ dict_entries: _Optional[_Iterable[DebugAnnotation]] = ...,
+ array_values: _Optional[_Iterable[DebugAnnotation]] = ...,
+ ) -> None: ...
diff --git a/avatar/pandora_client.py b/avatar/pandora_client.py
index dab098c..60689aa 100644
--- a/avatar/pandora_client.py
+++ b/avatar/pandora_client.py
@@ -23,6 +23,7 @@ import grpc
import grpc.aio
import logging
+from avatar.metrics.interceptors import aio_interceptors, interceptors
from bumble import pandora as bumble_server
from bumble.hci import Address as BumbleAddress
from bumble.pandora.device import PandoraDevice as BumblePandoraDevice
@@ -75,7 +76,7 @@ class PandoraClient:
self.name = name
self.grpc_target = grpc_target
self.log = PandoraClientLoggerAdapter(logging.getLogger(), {'client': self})
- self._channel = grpc.insecure_channel(grpc_target) # type: ignore
+ self._channel = grpc.intercept_channel(grpc.insecure_channel(grpc_target), *interceptors(self)) # type: ignore
self._address = Address(b'\x00\x00\x00\x00\x00\x00')
self._aio = None
@@ -169,7 +170,9 @@ class PandoraClient:
@property
def aio(self) -> 'PandoraClient.Aio':
if not self._aio:
- self._aio = PandoraClient.Aio(grpc.aio.insecure_channel(self.grpc_target))
+ self._aio = PandoraClient.Aio(
+ grpc.aio.insecure_channel(self.grpc_target, interceptors=aio_interceptors(self))
+ )
return self._aio
@@ -181,7 +184,7 @@ class PandoraClientLoggerAdapter(logging.LoggerAdapter): # type: ignore
client = self.extra['client']
assert isinstance(client, PandoraClient)
addr = ':'.join([f'{x:02X}' for x in client.address[4:]])
- return (f'[{client.name}:{addr}] {msg}', kwargs)
+ return (f'[{client.name:<8}:{addr}] {msg}', kwargs)
class BumblePandoraClient(PandoraClient):
diff --git a/avatar/pandora_server.py b/avatar/pandora_server.py
index 4fd56fb..be5e8b2 100644
--- a/avatar/pandora_server.py
+++ b/avatar/pandora_server.py
@@ -19,7 +19,11 @@ import asyncio
import avatar.aio
import grpc
import grpc.aio
+import logging
+import os
import portpicker
+import re
+import shlex
import threading
import types
@@ -110,6 +114,8 @@ class AndroidPandoraServer(PandoraServer[AndroidDevice]):
_instrumentation: Optional[threading.Thread] = None
_port: int
+ _logger: logging.Logger
+ _handler: logging.Handler
def start(self) -> PandoraClient:
"""Sets up and starts the Pandora server on the Android device."""
@@ -130,6 +136,31 @@ class AndroidPandoraServer(PandoraServer[AndroidDevice]):
self._instrumentation.start()
self.device.adb.forward([f'tcp:{self._port}', f'tcp:{ANDROID_SERVER_GRPC_PORT}']) # type: ignore
+ # Forward all logging to ADB logs
+ adb = self.device.adb
+
+ class AdbLoggingHandler(logging.Handler):
+ def emit(self, record: logging.LogRecord) -> None:
+ if record.levelno <= logging.DEBUG:
+ return
+ ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+ msg = self.format(record)
+ msg = ansi_escape.sub('', msg)
+ level = {
+ logging.FATAL: 'f',
+ logging.ERROR: 'e',
+ logging.WARN: 'w',
+ logging.INFO: 'i',
+ logging.DEBUG: 'd',
+ logging.NOTSET: 'd',
+ }[record.levelno]
+ for msg in msg.splitlines():
+ os.system(f'adb -s {adb.serial} shell "log -t Avatar -p {level} {shlex.quote(msg)}"')
+
+ self._logger = logging.getLogger()
+ self._handler = AdbLoggingHandler()
+ self._logger.addHandler(self._handler)
+
return PandoraClient(f'localhost:{self._port}', 'android')
def stop(self) -> None:
@@ -141,6 +172,9 @@ class AndroidPandoraServer(PandoraServer[AndroidDevice]):
'shell', f'am force-stop {ANDROID_SERVER_PACKAGE}', shell=False, timeout=None, stderr=None
)
+ # Remove ADB logging handler
+ self._logger.removeHandler(self._handler)
+
self.device.adb.forward(['--remove', f'tcp:{self._port}']) # type: ignore
self._instrumentation.join()
self._instrumentation = None
diff --git a/cases/config.yml b/cases/config.yml
index 3682981..a5451cd 100644
--- a/cases/config.yml
+++ b/cases/config.yml
@@ -10,6 +10,6 @@ TestBeds:
- Name: bumble.bumbles
Controllers:
BumbleDevice:
- - transport: 'tcp-client:127.0.0.1:7300'
- - transport: 'tcp-client:127.0.0.1:7300'
- - transport: 'tcp-client:127.0.0.1:7300'
+ - transport: 'tcp-client:127.0.0.1:6402'
+ - transport: 'tcp-client:127.0.0.1:6402'
+ - transport: 'tcp-client:127.0.0.1:6402'
diff --git a/cases/le_security_test.py b/cases/le_security_test.py
index 9305c9f..9c836b2 100644
--- a/cases/le_security_test.py
+++ b/cases/le_security_test.py
@@ -24,7 +24,7 @@ from mobly.asserts import assert_equal # type: ignore
from mobly.asserts import assert_in # type: ignore
from mobly.asserts import assert_is_not_none # type: ignore
from mobly.asserts import fail # type: ignore
-from pandora.host_pb2 import PUBLIC, RANDOM, DataTypes, OwnAddressType, Connection
+from pandora.host_pb2 import PUBLIC, RANDOM, Connection, DataTypes, OwnAddressType
from pandora.security_pb2 import LE_LEVEL3, PairingEventAnswer, SecureResponse, WaitSecurityResponse
from typing import Any, Literal, Optional, Tuple, Union
@@ -76,13 +76,12 @@ class LeSecurityTest(base_test.BaseTestClass): # type: ignore[misc]
),
)
) # type: ignore[misc]
-
@avatar.asynchronous
async def test_le_pairing(
self,
connect: Union[Literal['outgoing_connection'], Literal['incoming_connection']],
pair: Union[Literal['outgoing_pairing'], Literal['incoming_pairing']],
- ref_address_type: Union[Literal['against_random'], Literal['against_public']],
+ ref_address_type_name: Union[Literal['against_random'], Literal['against_public']],
variant: Union[
Literal['accept'],
Literal['reject'],
@@ -102,15 +101,11 @@ class LeSecurityTest(base_test.BaseTestClass): # type: ignore[misc]
if self.dut.name == 'android' and connect == 'outgoing_connection' and pair == 'incoming_pairing':
# TODO: do not skip when doing physical tests.
- raise signals.TestSkip(
- 'TODO: Yet to implement the test cases:\n'
- )
+ raise signals.TestSkip('TODO: Yet to implement the test cases:\n')
if self.dut.name == 'android' and connect == 'incoming_connection' and pair == 'outgoing_pairing':
# TODO: do not skip when doing physical tests.
- raise signals.TestSkip(
- 'TODO: Yet to implement the test cases:\n'
- )
+ raise signals.TestSkip('TODO: Yet to implement the test cases:\n')
if self.dut.name == 'android' and 'disconnect' in variant:
raise signals.TestSkip(
@@ -119,10 +114,11 @@ class LeSecurityTest(base_test.BaseTestClass): # type: ignore[misc]
+ '- When disconnected the `Secure/WaitSecurity` never returns.'
)
+ if self.ref.name == 'android' and ref_address_type_name == 'against_public':
+ raise signals.TestSkip('Android does not support PUBLIC address type.')
+
if 'reject' in variant or 'rejected' in variant:
- raise signals.TestSkip(
- 'TODO: Currently these scnearios are not working. Working on them.'
- )
+ raise signals.TestSkip('TODO: Currently these scnearios are not working. Working on them.')
if isinstance(self.ref, BumblePandoraDevice) and ref_io_capability == 'against_default_io_cap':
raise signals.TestSkip('Skip default IO cap for Bumble REF.')
@@ -146,9 +142,9 @@ class LeSecurityTest(base_test.BaseTestClass): # type: ignore[misc]
dut_address_type = RANDOM
ref_address_type = {
- 'against_random' : RANDOM,
- 'against_public' : PUBLIC,
- }[ref_address_type]
+ 'against_random': RANDOM,
+ 'against_public': PUBLIC,
+ }[ref_address_type_name]
# Pandora connection tokens
ref_dut, dut_ref = None, None
@@ -160,28 +156,32 @@ class LeSecurityTest(base_test.BaseTestClass): # type: ignore[misc]
# Make LE connection task.
async def connect_le(
- initiator: PandoraDevice, acceptor: PandoraDevice,
- initiator_addr_type: OwnAddressType, acceptor_addr_type: OwnAddressType
+ initiator: PandoraDevice,
+ acceptor: PandoraDevice,
+ initiator_addr_type: OwnAddressType,
+ acceptor_addr_type: OwnAddressType,
) -> Tuple[Connection, Connection]:
- #Acceptor - Advertise
+ # Acceptor - Advertise
advertisement = acceptor.aio.host.Advertise(
- legacy=True,
- connectable=True,
- own_address_type=acceptor_addr_type,
- data=DataTypes(manufacturer_specific_data=b'pause cafe'),
+ legacy=True,
+ connectable=True,
+ own_address_type=acceptor_addr_type,
+ data=DataTypes(manufacturer_specific_data=b'pause cafe'),
)
- #Initiator - Scan and fetch the address
+ # Initiator - Scan and fetch the address
scan = initiator.aio.host.Scan(own_address_type=initiator_addr_type)
acceptor_addr = await anext(
(x async for x in scan if b'pause cafe' in x.data.manufacturer_specific_data)
) # pytype: disable=name-error
scan.cancel()
- #Initiator - LE connect
+ # Initiator - LE connect
init_res, wait_res = await asyncio.gather(
- initiator.aio.host.ConnectLE(own_address_type=initiator_addr_type, **acceptor_addr.address_asdict()),
+ initiator.aio.host.ConnectLE(
+ own_address_type=initiator_addr_type, **acceptor_addr.address_asdict()
+ ),
anext(aiter(advertisement)), # pytype: disable=name-error
)
@@ -193,10 +193,10 @@ class LeSecurityTest(base_test.BaseTestClass): # type: ignore[misc]
# Make LE connection.
if connect == 'incoming_connection':
- #DUT is acceptor
+ # DUT is acceptor
ref_dut, dut_ref = await connect_le(self.ref, self.dut, ref_address_type, dut_address_type)
else:
- #DUT is initiator
+ # DUT is initiator
dut_ref, ref_dut = await connect_le(self.dut, self.ref, dut_address_type, ref_address_type)
# Pairing.
@@ -206,7 +206,7 @@ class LeSecurityTest(base_test.BaseTestClass): # type: ignore[misc]
self.ref.aio.security.Secure(connection=ref_dut, le=LE_LEVEL3),
self.dut.aio.security.WaitSecurity(connection=dut_ref, le=LE_LEVEL3),
)
- #Outgoing pairing
+ # Outgoing pairing
return await asyncio.gather(
self.dut.aio.security.Secure(connection=dut_ref, le=LE_LEVEL3),
self.ref.aio.security.WaitSecurity(connection=ref_dut, le=LE_LEVEL3),
diff --git a/cases/security_test.py b/cases/security_test.py
index c52d785..4d23b0c 100644
--- a/cases/security_test.py
+++ b/cases/security_test.py
@@ -130,6 +130,12 @@ class SecurityTest(base_test.BaseTestClass): # type: ignore[misc]
+ 'report the encryption state the with the bonding state'
)
+ if self.ref.name == 'android':
+ raise signals.TestSkip(
+ 'TODO: (add bug number) Fix core stack:\n'
+ + 'BOND_BONDED event is triggered before the encryption changed'
+ )
+
if isinstance(self.ref, BumblePandoraDevice) and ref_io_capability == 'against_default_io_cap':
raise signals.TestSkip('Skip default IO cap for Bumble REF.')
diff --git a/pyproject.toml b/pyproject.toml
index 789f629..cba7d27 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -11,6 +11,7 @@ classifiers = [
dependencies = [
"bt-test-interfaces",
"bumble==0.0.154",
+ "protobuf>=4.22.0",
"grpcio>=1.51.1",
"mobly>=1.12",
"portpicker>=1.5.2",
@@ -56,6 +57,10 @@ module = "grpc.*"
ignore_missing_imports = true
[[tool.mypy.overrides]]
+module = "google.protobuf.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
module = "mobly.*"
ignore_missing_imports = true
@@ -65,7 +70,7 @@ ignore_missing_imports = true
[tool.pyright]
include = ["avatar", "cases"]
-exclude = ["**/__pycache__"]
+exclude = ["**/__pycache__", "**/*_pb2.py"]
typeCheckingMode = "strict"
useLibraryCodeForTypes = true
verboseOutput = false