# Origin: mmi2grpc/a2dp.py (blob 45c62c065725355321b6936e60d8dad77f8c1696)
from typing import Optional

from grpc import Channel

from blueberry.a2dp_grpc import A2DP
from blueberry.host_grpc import Host

from blueberry.a2dp_pb2 import Sink, Source
from blueberry.host_pb2 import Connection

# Module-level state that persists between successive interact() calls.
# Connection obtained from the Host service (Connect / GetConnection);
# reset to None after interact() disconnects.
_connection: Optional[Connection] = None
# Sink obtained from A2DP.OpenSink(); None until one has been opened.
_sink: Optional[Sink] = None
# Source obtained from A2DP.OpenSource(); None until one has been opened.
_source: Optional[Source] = None

def _ensure_connection(host, addr: bytes) -> None:
    """Populate the cached ``_connection`` if none is currently stored.

    Args:
        host: Host service stub; its ``GetConnection`` method is called with
            ``address=addr`` when a lookup is needed.
        addr: Address of the peer whose connection should be looked up.

    The lookup is skipped whenever a connection is already cached, so this
    helper never replaces an existing handle.
    """
    global _connection
    # Compare against None explicitly: only "nothing cached" should trigger a
    # lookup, independent of how the cached object evaluates as a boolean.
    if _connection is None:
        _connection = host.GetConnection(address=addr).connection

def interact(channel: Channel, interaction_id: str, test: str, pts_addr: bytes):
    """Carry out the action tied to a single AVDTP MMI identifier.

    Args:
        channel: gRPC channel used to build the A2DP and Host stubs.
        interaction_id: Identifier of the MMI being handled.
        test: Test name; checked for the substrings "SNK" and "SRC" to decide
            whether a sink or a source is opened.
        pts_addr: Address handed to the Host service when connecting.

    Updates the module-level ``_connection``, ``_sink`` and ``_source``.
    Identifiers with no handler are reported on stdout.
    """
    global _connection, _sink, _source

    # Both stubs are created up front for every interaction.
    a2dp = A2DP(channel)
    host = Host(channel)

    # Identifiers that are recognised but need no action from this side.
    nothing_to_do = (
        "TSC_AVDTP_mmi_iut_accept_discover",
        "TSC_AVDTP_mmi_iut_accept_close_stream",
        "TSC_AVDTP_mmi_iut_accept_get_capabilities",
        "TSC_AVDTP_mmi_iut_accept_set_configuration",
        "TSC_AVDTP_mmi_iut_accept_open_stream",
        "TSC_AVDTP_mmi_iut_accept_start",
        "TSC_AVDTP_mmi_iut_confirm_streaming",
        "TSC_AVDTP_mmi_iut_accept_reconnect",
    )
    if interaction_id in nothing_to_do:
        return

    if interaction_id == "TSC_AVDTP_mmi_iut_accept_connect":
        host.SetConnectable(connectable=True)
        return

    if interaction_id == "TSC_AVDTP_mmi_iut_initiate_start":
        connect_reply = host.Connect(address=pts_addr)
        _connection = connect_reply.connection
        if "SNK" in test:
            sink_reply = a2dp.OpenSink(connection=_connection)
            _sink = sink_reply.sink
        return

    if interaction_id == "TSC_AVDTP_mmi_iut_initiate_out_of_range":
        # Make sure there is a connection handle to pass to Disconnect.
        _ensure_connection(host, pts_addr)
        host.Disconnect(connection=_connection)
        # Drop every cached handle once the link has been torn down.
        _connection = _sink = _source = None
        return

    if interaction_id == "TSC_AVDTP_mmi_iut_initiate_set_configuration":
        connect_reply = host.Connect(address=pts_addr)
        _connection = connect_reply.connection
        if "SRC" in test:
            source_reply = a2dp.OpenSource(connection=_connection)
            _source = source_reply.source
        return

    print(f'MMI NOT IMPLEMENTED: {interaction_id}')