aboutsummaryrefslogtreecommitdiff
path: root/apps/show.py
blob: 97640a37e94d060fde846f8f083e9ef2a2a5369b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import datetime
import logging
import os
import struct

import click

from bumble.colors import color
from bumble import hci
from bumble.transport.common import PacketReader
from bumble.helpers import PacketTracer


# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class SnoopPacketReader:
    '''
    Reader that reads HCI packets from a "snoop" file (based on RFC 1761, but not
    exactly the same...)

    The file starts with an 8-byte identification pattern followed by two
    big-endian 32-bit integers: the version number and the datalink type.
    Only the H1 (un-encapsulated) and H4 datalink types are supported.

    Attributes:
        source: binary file-like object the packets are read from.
        at_end: set to True once the end of the source has been reached.
        version_number: version number read from the file header.
        data_link_type: datalink type read from the file header.
    '''

    DATALINK_H1 = 1001
    DATALINK_H4 = 1002
    DATALINK_BSCP = 1003
    DATALINK_H5 = 1004

    IDENTIFICATION_PATTERN = b'btsnoop\0'
    # A record timestamp equal to TIMESTAMP_DELTA (in microseconds) maps to
    # TIMESTAMP_ANCHOR.
    TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1)
    TIMESTAMP_DELTA = 0x00E03AB44A676000
    ONE_MICROSECOND = datetime.timedelta(microseconds=1)

    def __init__(self, source):
        '''
        Read and validate the file header from `source`.

        Raises:
            ValueError: if the identification pattern is wrong, the header is
                truncated, or the datalink type is not supported.
        '''
        self.source = source
        self.at_end = False

        # Read the header
        identification_pattern = source.read(8)
        if identification_pattern != self.IDENTIFICATION_PATTERN:
            raise ValueError(
                'not a valid snoop file, unexpected identification pattern'
            )
        version_and_type = source.read(8)
        if len(version_and_type) < 8:
            # Report a short header the same way as any other invalid file,
            # rather than leaking a struct.error from the unpack below.
            raise ValueError('not a valid snoop file, truncated header')
        (self.version_number, self.data_link_type) = struct.unpack(
            '>II', version_and_type
        )
        if self.data_link_type not in (self.DATALINK_H4, self.DATALINK_H1):
            raise ValueError(f'datalink type {self.data_link_type} not supported')

    def next_packet(self):
        '''
        Read the next packet record from the source.

        Returns:
            A `(timestamp, direction, packet)` tuple, where `timestamp` is a
            `datetime.datetime`, `direction` is bit 0 of the record's packet
            flags (1 for Controller -> Host, 0 for Host -> Controller) and
            `packet` is the packet bytes, including the leading packet type
            byte. When the end of the source is reached (in which case
            `at_end` is also set), or when the record is truncated,
            `(None, 0, None)` is returned instead.
        '''
        # Read the record header
        header = self.source.read(24)
        if len(header) < 24:
            self.at_end = True
            return (None, 0, None)

        # Parse the header
        (
            original_length,
            included_length,
            packet_flags,
            _cumulative_drops,
            timestamp,
        ) = struct.unpack('>IIIIQ', header)

        # Skip truncated packets
        if original_length != included_length:
            print(
                color(
                    f"!!! truncated packet ({included_length}/{original_length})", "red"
                )
            )
            self.source.read(included_length)
            return (None, 0, None)

        # Convert the timestamp to a datetime object.
        ts_dt = self.TIMESTAMP_ANCHOR + datetime.timedelta(
            microseconds=timestamp - self.TIMESTAMP_DELTA
        )

        direction = packet_flags & 1
        payload = self.source.read(included_length)

        if self.data_link_type == self.DATALINK_H1:
            # The packet is un-encapsulated, look at the flags to figure out its type
            if packet_flags & 2:
                # Command/Event: the direction tells which one it is
                if direction:
                    # Controller -> Host
                    packet_type = hci.HCI_EVENT_PACKET
                else:
                    # Host -> Controller
                    packet_type = hci.HCI_COMMAND_PACKET
            else:
                packet_type = hci.HCI_ACL_DATA_PACKET

            # Prepend the packet type so that the result has the same layout
            # as an H4 packet, and return the same 3-tuple shape as the H4
            # path (callers unpack three values).
            return (ts_dt, direction, bytes([packet_type]) + payload)

        return (ts_dt, direction, payload)


# -----------------------------------------------------------------------------
class Printer:
    '''
    Writes messages to standard output, each one prefixed with a running
    sequence number.
    '''

    def __init__(self):
        # Number of messages printed so far
        self.index = 0

    def print(self, message: str) -> None:
        '''
        Print `message` preceded by its 1-based sequence number, formatted in
        an 8-character wide field between square brackets.
        '''
        sequence_number = self.index + 1
        self.index = sequence_number
        print('[{:8}]{}'.format(sequence_number, message))


# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
@click.command()
@click.option(
    '--format',
    type=click.Choice(['h4', 'snoop']),
    default='h4',
    help='Format of the input file',
)
@click.option(
    '--vendors',
    type=click.Choice(['android', 'zephyr']),
    multiple=True,
    help='Support vendor-specific commands (list one or more)',
)
@click.argument('filename')
# pylint: disable=redefined-builtin
def main(format, vendors, filename):
    """Print the HCI packets found in FILENAME."""
    # The vendor modules are imported only for their side effects
    # (presumably registering vendor-specific HCI definitions -- confirm
    # against the bumble.vendor packages).
    for vendor in vendors:
        if vendor == 'android':
            import bumble.vendor.android.hci
        elif vendor == 'zephyr':
            import bumble.vendor.zephyr.hci

    # Use a context manager so that the file is closed on every exit path.
    with open(filename, 'rb') as input_file:
        if format == 'h4':
            packet_reader = PacketReader(input_file)

            def read_next_packet():
                # H4 files carry no timestamp or direction information.
                return (None, 0, packet_reader.next_packet())

        else:
            packet_reader = SnoopPacketReader(input_file)
            read_next_packet = packet_reader.next_packet

        printer = Printer()
        tracer = PacketTracer(emit_message=printer.print)

        while not packet_reader.at_end:
            try:
                (timestamp, direction, packet) = read_next_packet()
                if packet:
                    tracer.trace(
                        hci.HCI_Packet.from_bytes(packet), direction, timestamp
                    )
                elif not packet_reader.at_end:
                    # No packet but not at the end of the input: the record
                    # was skipped. (Reaching the end of the input is not a
                    # truncation and must not be reported as one.)
                    printer.print(color("[TRUNCATED]", "red"))
            except Exception as error:
                # Keep going: one malformed packet should not prevent the
                # rest of the file from being shown. `Logger.exception`
                # requires a message argument.
                logger.exception('error while processing packet')
                print(color(f'!!! {error}', 'red'))


# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # The log level is taken from the BUMBLE_LOGLEVEL environment variable
    # (case-insensitive), and defaults to WARNING when the variable is not set.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()
    logging.basicConfig(level=log_level)
    main()  # pylint: disable=no-value-for-parameter