diff options
Diffstat (limited to 'mmi2grpc/a2dp.py')
-rw-r--r-- | mmi2grpc/a2dp.py | 649 |
1 files changed, 560 insertions, 89 deletions
diff --git a/mmi2grpc/a2dp.py b/mmi2grpc/a2dp.py index 60a55fd..6f3ba7c 100644 --- a/mmi2grpc/a2dp.py +++ b/mmi2grpc/a2dp.py @@ -1,101 +1,572 @@ +import time +import os +import textwrap from typing import Optional -from grpc import Channel +import grpc from blueberry.a2dp_grpc import A2DP from blueberry.host_grpc import Host -from blueberry.a2dp_pb2 import Sink, Source +from blueberry.a2dp_pb2 import Sink, Source, PlaybackAudioRequest from blueberry.host_pb2 import Connection -_connection: Optional[Connection] = None -_sink: Optional[Sink] = None -_source: Optional[Source] = None - -def _ensure_connection(host, addr): - global _connection - if not _connection: - _connection = host.GetConnection(address=addr).connection - -def _ensure_sink_open(host, a2dp, addr): - global _connection, _sink, _source - _ensure_connection(host, addr) - if not _sink: - _sink = a2dp.OpenSink(connection=_connection).sink - -def _ensure_source_open(host, a2dp, addr): - global _connection, _source - _ensure_connection(host, addr) - if not _source: - _source = a2dp.OpenSource(connection=_connection).source - -def interact(channel: Channel, interaction_id: str, test: str, pts_addr: bytes): - global _connection, _sink, _source - a2dp = A2DP(channel) - host = Host(channel) - if interaction_id == "TSC_AVDTP_mmi_iut_accept_connect": - host.SetConnectable(connectable=True) - elif interaction_id == "TSC_AVDTP_mmi_iut_initiate_start": - _ensure_connection(host, pts_addr) - if "SNK" in test: - _ensure_sink_open(host, a2dp, pts_addr) - a2dp.Start(sink=_sink) - if "SRC" in test: - _ensure_source_open(host, a2dp, pts_addr) - a2dp.Close(source=_source) - elif interaction_id == "TSC_AVDTP_mmi_iut_initiate_out_of_range": - _ensure_connection(host, pts_addr) - host.Disconnect(connection=_connection) - _connection = None - _sink = None - _source = None - elif interaction_id == "TSC_AVDTP_mmi_iut_accept_discover": - pass - elif interaction_id == "TSC_AVDTP_mmi_iut_initiate_set_configuration": - _connection = host.Connect(address=pts_addr).connection +from ._description import assert_description, format_function +from ._audio import AudioSignal + +AUDIO_AMPLITUDE = 0.8 + + +class A2DPProxy: + connection: Optional[Connection] = None + sink: Optional[Sink] = None + source: Optional[Source] = None + + def __init__(self, channel): + self.host = Host(channel) + self.a2dp = A2DP(channel) + + def convert_frame(data): return PlaybackAudioRequest( + data=data, source=self.source) + self.audio = AudioSignal( + lambda frames: self.a2dp.PlaybackAudio(map(convert_frame, frames)), + AUDIO_AMPLITUDE, + 44100 + ) + + def interact(self, id: str, test: str, description: str, pts_addr: bytes): + try: + return getattr(self, id)(test=test, description=description, pts_addr=pts_addr) + except AttributeError: + code = format_function(id, description) + assert False, f'Unhandled mmi {id}\n{code}' + + @assert_description + def TSC_AVDTP_mmi_iut_accept_connect(self, test: str, pts_addr: bytes, **kwargs): + """ + If necessary, take action to accept the AVDTP Signaling Channel + Connection initiated by the tester. + + Description: Make sure the IUT + (Implementation Under Test) is in a state to accept incoming Bluetooth + connections. Some devices may need to be on a specific screen, like a + Bluetooth settings screen, in order to pair with PTS. If the IUT is + still having problems pairing with PTS, try running a test case where + the IUT connects to PTS to establish pairing. + """ + if "SRC" in test: - _source = a2dp.OpenSource(connection=_connection).source - if "SNK" in test: - _sink = a2dp.OpenSink(connection=_connection).sink - elif interaction_id == "TSC_AVDTP_mmi_iut_initiate_open_stream": - _ensure_connection(host, pts_addr) - if "SNK" in test: - _sink = a2dp.OpenSink(connection=_connection).sink + self.connection = self.host.WaitConnection( + address=pts_addr).connection + try: + if "INT" in test: + self.source = self.a2dp.OpenSource( + connection=self.connection).source + else: + self.source = self.a2dp.WaitSource( + connection=self.connection).source + except: + pass + else: + self.connection = self.host.WaitConnection( + address=pts_addr).connection + try: + self.sink = self.a2dp.WaitSink( + connection=self.connection).sink + except: + pass + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_discover(self, **kwargs): + """ + Send a discover command to PTS. + + Action: If the IUT (Implementation + Under Test) is already connected to PTS, attempting to send or receive + streaming media should trigger this action. If the IUT is not connected + to PTS, attempting to connect may trigger this action. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_start(self, test: str, **kwargs): + """ + Send a start command to PTS. + + Action: If the IUT (Implementation Under + Test) is already connected to PTS, attempting to send or receive + streaming media should trigger this action. If the IUT is not connected + to PTS, attempting to connect may trigger this action. + """ + if "SRC" in test: - _source = a2dp.OpenSource(connection=_connection).source - elif interaction_id == "TSC_AVDTP_mmi_iut_initiate_close_stream": - _ensure_connection(host, pts_addr) - if "SNK" in test: - _ensure_sink_open(host, a2dp, pts_addr) - a2dp.Close(sink=_sink) - _sink = None + self.a2dp.Start(source=self.source) + else: + self.a2dp.Start(sink=self.sink) + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_suspend(self, test: str, **kwargs): + """ + Suspend the streaming channel. + """ + if "SRC" in test: - _ensure_source_open(host, a2dp, pts_addr) - a2dp.Close(source=_source) - _source = None - elif interaction_id == "TSC_AVDTP_mmi_iut_initiate_suspend": - _ensure_connection(host, pts_addr) - if "SNK" in test: - _ensure_sink_open(host, a2dp, pts_addr) - a2dp.Suspend(sink=_sink) + self.a2dp.Suspend(source=self.source) + else: + assert False + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_close_stream(self, test: str, **kwargs): + """ + Close the streaming channel. + + Action: Disconnect the streaming channel, + or close the Bluetooth connection to the PTS. + """ + if "SRC" in test: - _ensure_source_open(host, a2dp, pts_addr) - a2dp.suspend(source=_source) - elif interaction_id == "TSC_AVDTP_mmi_iut_accept_close_stream": - pass - elif interaction_id == "TSC_AVDTP_mmi_iut_accept_get_capabilities": - pass - elif interaction_id == "TSC_AVDTP_mmi_iut_accept_set_configuration": - pass - elif interaction_id == "TSC_AVDTP_mmi_iut_accept_open_stream": - pass - elif interaction_id == "TSC_AVDTP_mmi_iut_accept_start": - pass - elif interaction_id == "TSC_AVDTP_mmi_iut_confirm_streaming": - pass - elif interaction_id == "TSC_AVDTP_mmi_iut_accept_reconnect": - pass - elif interaction_id == "TSC_AVDTP_mmi_iut_accept_suspend": - pass - else: - print(f'MMI NOT IMPLEMENTED: {interaction_id}') + self.a2dp.Close(source=self.source) + self.source = None + else: + self.a2dp.Close(sink=self.sink) + self.sink = None + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_out_of_range(self, pts_addr: bytes, **kwargs): + """ + Move the IUT out of range to create a link loss scenario. + + Action: This + can be also be done by placing the IUT or PTS in an RF shielded box. + """ + + if self.connection is None: + self.connection = self.host.GetConnection( + address=pts_addr).connection + self.host.Disconnect(connection=self.connection) + self.connection = None + self.sink = None + self.source = None + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_begin_streaming(self, test: str, **kwargs): + """ + Begin streaming media ... + + Note: If the IUT has suspended the stream + please restart the stream to begin streaming media. + """ + + if test == "AVDTP/SRC/ACP/SIG/SMG/BI-29-C": + time.sleep(2) # TODO: Remove, AVRCP SegFault + if test in ("A2DP/SRC/CC/BV-09-I", "A2DP/SRC/SET/BV-04-I", + "AVDTP/SRC/ACP/SIG/SMG/BV-18-C", "AVDTP/SRC/ACP/SIG/SMG/BV-20-C", + "AVDTP/SRC/ACP/SIG/SMG/BV-22-C"): + time.sleep(1) # TODO: Remove, AVRCP SegFault + if test == "A2DP/SRC/SUS/BV-01-I": + # Stream is not suspended when we receive the interaction + time.sleep(1) + + self.a2dp.Start(source=self.source) + self.audio.start() + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_media(self, **kwargs): + """ + Take action if necessary to start streaming media to the tester. + """ + + self.audio.start() + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_stream_media(self, **kwargs): + """ + Stream media to PTS. If the IUT is a SNK, wait for PTS to start + streaming media. + + Action: If the IUT (Implementation Under Test) is + already connected to PTS, attempting to send or receive streaming media + should trigger this action. If the IUT is not connected to PTS, + attempting to connect may trigger this action. + """ + + self.audio.start() + return "OK" + + @assert_description + def TSC_AVDTP_mmi_user_verify_media_playback(self, **kwargs): + """ + Is the test system properly playing back the media being sent by the + IUT? + """ + + result = self.audio.verify() + assert result + + return "Yes" if result else "No" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_get_capabilities(self, **kwargs): + """ + Send a get capabilities command to PTS. + + Action: If the IUT + (Implementation Under Test) is already connected to PTS, attempting to + send or receive streaming media should trigger this action. If the IUT + is not connected to PTS, attempting to connect may trigger this action. + """ + + # This will be done as part as the a2dp.OpenSource or a2dp.WaitSource + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_discover(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Discover operation + initiated by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_set_configuration(self, **kwargs): + """ + Send a set configuration command to PTS. + + Action: If the IUT + (Implementation Under Test) is already connected to PTS, attempting to + send or receive streaming media should trigger this action. If the IUT + is not connected to PTS, attempting to connect may trigger this action. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_close_stream(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Close operation initiated + by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_abort(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Abort operation initiated + by the tester.. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_get_all_capabilities(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Get All Capabilities + operation initiated by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_get_capabilities(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Get Capabilities operation + initiated by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_set_configuration(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Set Configuration + operation initiated by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_get_configuration(self, **kwargs): + """ + Take action to accept the AVDTP Get Configuration command from the + tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_open_stream(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Open operation initiated + by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_start(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Start operation initiated + by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_suspend(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Suspend operation + initiated by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_reconfigure(self, **kwargs): + """ + If necessary, take action to accept the AVDTP Reconfigure operation + initiated by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_media_transports(self, **kwargs): + """ + Take action to accept transport channels for the recently configured + media stream. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_confirm_streaming(self, **kwargs): + """ + Is the IUT (Implementation Under Test) receiving streaming media from + PTS? + + Action: Press 'Yes' if the IUT is receiving streaming data from + the PTS (in some cases the sound may not be clear, this is normal). + """ + + # TODO: verify + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_open_stream(self, **kwargs): + """ + Open a streaming media channel. + + Action: If the IUT (Implementation + Under Test) is already connected to PTS, attempting to send or receive + streaming media should trigger this action. If the IUT is not connected + to PTS, attempting to connect may trigger this action. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_accept_reconnect(self, pts_addr: bytes, **kwargs): + """ + Press OK when the IUT (Implementation Under Test) is ready to allow the + PTS to reconnect the AVDTP signaling channel. + + Action: Press OK when the + IUT is ready to accept Bluetooth connections again. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_iut_initiate_get_all_capabilities(self, **kwargs): + """ + Send a GET ALL CAPABILITIES command to PTS. + + Action: If the IUT + (Implementation Under Test) is already connected to PTS, attempting to + send or receive streaming media should trigger this action. If the IUT + is not connected to PTS, attempting to connect may trigger this action. + """ + + return "OK" + + @assert_description + def TSC_AVDTP_mmi_tester_verifying_suspend(self, **kwargs): + """ + Please wait while the tester verifies the IUT does not send media during + suspend ... + """ + + return "Yes" + + @assert_description + def TSC_A2DP_mmi_user_confirm_optional_data_attribute(self, **kwargs): + """ + Tester found the optional SDP attribute named 'Supported Features'. + Press 'Yes' if the data displayed below is correct. + + Value: 0x0001 + """ + + # TODO: Extract and verify attribute name and value from description + return "OK" + + @assert_description + def TSC_A2DP_mmi_user_confirm_optional_string_attribute(self, **kwargs): + """ + Tester found the optional SDP attribute named 'Service Name'. Press + 'Yes' if the string displayed below is correct. + + Value: Advanced Audio + Source + """ + + # TODO: Extract and verify attribute name and value from description + return "OK" + + @assert_description + def TSC_A2DP_mmi_user_confirm_no_optional_attribute_support(self, **kwargs): + """ + Tester could not find the optional SDP attribute named 'Provider Name'. + Is this correct? + """ + + # TODO: Extract and verify attribute name from description + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_accept_delayreport(self, **kwargs): + """ + Take action if necessary to accept the Delay Reportl command from the + tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_initiate_media_transport_connect(self, **kwargs): + """ + Take action to initiate an AVDTP media transport. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_user_confirm_SIG_SMG_BV_28_C(self, **kwargs): + """ + Were all the service capabilities reported to the upper tester valid? + """ + + # TODO: verify + return "Yes" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_invalid_command(self, **kwargs): + """ + Take action to reject the invalid command sent by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_open(self, **kwargs): + """ + Take action to reject the invalid OPEN command sent by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_start(self, **kwargs): + """ + Take action to reject the invalid START command sent by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_suspend(self, **kwargs): + """ + Take action to reject the invalid SUSPEND command sent by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_reconfigure(self, **kwargs): + """ + Take action to reject the invalid or incompatible RECONFIGURE command + sent by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_get_all_capabilities(self, **kwargs): + """ + Take action to reject the invalid GET ALL CAPABILITIES command with the + error code BAD_LENGTH. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_get_capabilities(self, **kwargs): + """ + Take action to reject the invalid GET CAPABILITIES command with the + error code BAD_LENGTH. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_set_configuration(self, **kwargs): + """ + Take action to reject the SET CONFIGURATION sent by the tester. The IUT + is expected to respond with SEP_IN_USE because the SEP requested was + previously configured. + """ + + return "OK" + + def TSC_AVDTPEX_mmi_iut_reject_get_configuration(self, **kwargs): + """ + Take action to reject the GET CONFIGURATION sent by the tester. The IUT + is expected to respond with BAD_ACP_SEID because the SEID requested was + not previously configured. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_iut_reject_close(self, **kwargs): + """ + Take action to reject the invalid CLOSE command sent by the tester. + """ + + return "OK" + + @assert_description + def TSC_AVDTPEX_mmi_user_confirm_SIG_SMG_BV_18_C(self, **kwargs): + """ + Did the IUT receive media with the following information? + + - V = RTP_Ver + - P = 0 (no padding bits) + - X = 0 (no extension) + - CC = 0 (no + contributing source) + - M = 0 + """ + + # TODO: verify + return "OK" |