diff options
-rw-r--r-- | mmi2grpc/__init__.py | 34 | ||||
-rw-r--r-- | mmi2grpc/_description.py | 25 | ||||
-rw-r--r-- | mmi2grpc/_proxy.py | 12 | ||||
-rw-r--r-- | mmi2grpc/a2dp.py | 30 | ||||
-rw-r--r-- | requirements.txt | 5 |
5 files changed, 74 insertions, 32 deletions
diff --git a/mmi2grpc/__init__.py b/mmi2grpc/__init__.py index 194676a..953d814 100644 --- a/mmi2grpc/__init__.py +++ b/mmi2grpc/__init__.py @@ -1,21 +1,19 @@ -# Copyright 2022 Google LLC - from typing import List import grpc import time import sys -import textwrap - -from .a2dp import A2DPProxy from blueberry.host_grpc import Host +from .a2dp import A2DPProxy +from ._description import format_proxy + GRPC_PORT = 8999 class IUT: - def __init__(self, test: str, args: List[str], - port: int = GRPC_PORT, **kwargs): + def __init__( + self, test: str, args: List[str], port: int = GRPC_PORT, **kwargs): self.a2dp_ = None self.address_ = None self.port = port @@ -32,11 +30,13 @@ class IUT: def address(self) -> bytes: with grpc.insecure_channel(f'localhost:{self.port}') as channel: try: - return Host(channel).ReadLocalAddress(wait_for_ready=True).address + return Host(channel).ReadLocalAddress( + wait_for_ready=True).address except grpc.RpcError: print('Retry') time.sleep(5) - return Host(channel).ReadLocalAddress(wait_for_ready=True).address + return Host(channel).ReadLocalAddress( + wait_for_ready=True).address def interact(self, pts_address: bytes, @@ -49,5 +49,17 @@ class IUT: print(f'{profile} mmi: {interaction}', file=sys.stderr) if profile in ('A2DP', 'AVDTP'): if not self.a2dp_: - self.a2dp_ = A2DPProxy(grpc.insecure_channel(f'localhost:{self.port}')) - return self.a2dp_.interact(interaction, test, description, pts_address) + self.a2dp_ = A2DPProxy( + grpc.insecure_channel(f'localhost:{self.port}')) + return self.a2dp_.interact( + interaction, test, description, pts_address) + + code = format_proxy(profile, interaction, description) + error_msg = ( + f'Missing {profile} proxy and mmi: {interaction}\n' + f'Create a {profile.lower()}.py in mmi2grpc/:\n\n{code}\n' + f'Then, instantiate the corresponding proxy in __init__.py\n' + f'Finally, create a {profile.lower()}.proto in proto/blueberry/' + f'and generate the corresponding interface.' + ) + assert False, error_msg diff --git a/mmi2grpc/_description.py b/mmi2grpc/_description.py index a6fa3dd..0a3c574 100644 --- a/mmi2grpc/_description.py +++ b/mmi2grpc/_description.py @@ -20,7 +20,8 @@ def assert_description(f): test = unittest.TestCase() test.maxDiff = None test.assertMultiLineEqual( - docstring.strip(), description.strip(), + docstring.strip(), + description.strip(), f'description does not match with function docstring of {f.__name__}') return f(*args, **kwargs) @@ -28,8 +29,8 @@ def assert_description(f): def format_function(id, description): - wrapped = textwrap.fill(description, COMMENT_WIDTH, - replace_whitespace=False) + wrapped = textwrap.fill( + description, COMMENT_WIDTH, replace_whitespace=False) return ( f'@assert_description\n' f'def {id}(self, **kwargs):\n' @@ -39,3 +40,21 @@ def format_function(id, description): f'\n' f' return "OK"\n' ) + + +def format_proxy(profile, id, description): + return ( + f'from ._description import assert_description\n' + f'from ._proxy import ProfileProxy\n' + f'\n' + f'from blueberry.{profile.lower()}_grpc import {profile}\n' + f'\n' + f'\n' + f'class {profile}Proxy(ProfileProxy):\n' + f'\n' + f' def __init__(self, channel):\n' + f' super().__init__()\n' + f' self.{profile.lower()} = {profile}(channel)\n' + f'\n' + f'{textwrap.indent(format_function(id, description), " ")}' + ) diff --git a/mmi2grpc/_proxy.py b/mmi2grpc/_proxy.py new file mode 100644 index 0000000..f5065a3 --- /dev/null +++ b/mmi2grpc/_proxy.py @@ -0,0 +1,12 @@ +from mmi2grpc._description import format_function + + +class ProfileProxy: + + def interact(self, id: str, test: str, description: str, pts_addr: bytes): + try: + return getattr(self, id)( + test=test, description=description, pts_addr=pts_addr) + except AttributeError: + code = format_function(id, description) + assert False, f'Unhandled mmi {id}\n{code}' diff --git a/mmi2grpc/a2dp.py b/mmi2grpc/a2dp.py index 6f3ba7c..34f187c 100644 --- a/mmi2grpc/a2dp.py +++ b/mmi2grpc/a2dp.py @@ -1,28 +1,27 @@ import time -import os -import textwrap from typing import Optional -import grpc - from blueberry.a2dp_grpc import A2DP from blueberry.host_grpc import Host from blueberry.a2dp_pb2 import Sink, Source, PlaybackAudioRequest from blueberry.host_pb2 import Connection -from ._description import assert_description, format_function from ._audio import AudioSignal +from ._description import assert_description +from ._proxy import ProfileProxy AUDIO_AMPLITUDE = 0.8 -class A2DPProxy: +class A2DPProxy(ProfileProxy): connection: Optional[Connection] = None sink: Optional[Sink] = None source: Optional[Source] = None def __init__(self, channel): + super().__init__() + self.host = Host(channel) self.a2dp = A2DP(channel) @@ -34,15 +33,9 @@ class A2DPProxy: 44100 ) - def interact(self, id: str, test: str, description: str, pts_addr: bytes): - try: - return getattr(self, id)(test=test, description=description, pts_addr=pts_addr) - except AttributeError: - code = format_function(id, description) - assert False, f'Unhandled mmi {id}\n{code}' - @assert_description - def TSC_AVDTP_mmi_iut_accept_connect(self, test: str, pts_addr: bytes, **kwargs): + def TSC_AVDTP_mmi_iut_accept_connect( + self, test: str, pts_addr: bytes, **kwargs): """ If necessary, take action to accept the AVDTP Signaling Channel Connection initiated by the tester. @@ -137,7 +130,8 @@ class A2DPProxy: return "OK" @assert_description - def TSC_AVDTP_mmi_iut_initiate_out_of_range(self, pts_addr: bytes, **kwargs): + def TSC_AVDTP_mmi_iut_initiate_out_of_range( + self, pts_addr: bytes, **kwargs): """ Move the IUT out of range to create a link loss scenario. @@ -165,8 +159,10 @@ class A2DPProxy: if test == "AVDTP/SRC/ACP/SIG/SMG/BI-29-C": time.sleep(2) # TODO: Remove, AVRCP SegFault - if test in ("A2DP/SRC/CC/BV-09-I", "A2DP/SRC/SET/BV-04-I", - "AVDTP/SRC/ACP/SIG/SMG/BV-18-C", "AVDTP/SRC/ACP/SIG/SMG/BV-20-C", + if test in ("A2DP/SRC/CC/BV-09-I", + "A2DP/SRC/SET/BV-04-I", + "AVDTP/SRC/ACP/SIG/SMG/BV-18-C", + "AVDTP/SRC/ACP/SIG/SMG/BV-20-C", "AVDTP/SRC/ACP/SIG/SMG/BV-22-C"): time.sleep(1) # TODO: Remove, AVRCP SegFault if test == "A2DP/SRC/SUS/BV-01-I": diff --git a/requirements.txt b/requirements.txt index 31854ce..7b45d0f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,6 @@ -protobuf==3.18.1 grpcio==1.41.0 grpcio-tools==1.41.0 +numpy==1.22.3 +protobuf==3.18.1 +scipy==1.8.0 +six==1.16.0 |