summaryrefslogtreecommitdiff
path: root/cras/src/plc/parse_sco.py
blob: c50df15981047ad558e606fc1e538bb7cd5f9176 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# -*- coding: utf-8 -*-
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
A script to extract raw SCO RX packets from btsnoop.
Use 'btmon -S' to dump SCO traffic from btsnoop file.
Trim the btsnoop output to just the SCO traffic period.
Then execute 'python parse-sco.py <btsnoop-output>'
"""

import atexit
import binascii
import os
import re
import sys


class SCOParser:
  """
  Parser for grepping SCO packets
  """

  def __init__(self):
    # On old releases, +CIEV: 4,1 indicates the start point of call session
    # c 31 0d 0a 9a     ..+CIEV: 4,1..
    self.call_start_re = re.compile(r'.*?\+CIEV:\s4,(\d).*?')

    # > SCO Data RX: Handle 257 flags 0x00 dlen 60           #13826 [hci0] 650.388305
    #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
    #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
    #         00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
    #         00 00 00 00 00 00 00 00 00 00 00 00
    self.sco_rx_re = re.compile(r'.*?SCO\sData\sRX.*?flags\s0x(\d+).*?')
    self.sco_f = None
    self.output_idx = 0
    self.pk_count = 0
    self.pl_count = 0

    atexit.register(self._cleanup)

  def _cleanup(self):
    if self.sco_f is not None:
      print(
          "Current file contains %d packets (%d with erroneous status flag)" %
          (self.pk_count, self.pl_count))
      self.pk_count = 0
      self.pl_count = 0
      self.sco_f.close()

  def _new_session(self):
    if self.sco_f is not None:
      close(self.sco_f)

    new_file = "sco_file_%d" % self.output_idx
    print("Record to %s" % new_file)
    self.sco_f = open(new_file, 'wb')
    self.output_idx += 1

    return self.sco_f

  def parse(self, filename):
    if not os.path.exists(filename):
      print("%s doesn't exist" % filename)
      return

    print("Start parsing %s" % filename)
    parse_rx_data = 0
    with open(filename, "r") as f:
      for line in f.readlines():
        if parse_rx_data > 0:
          self.sco_f.write(binascii.unhexlify(''.join(line[:56].split())))
          parse_rx_data = (parse_rx_data + 1) % 5

        # Start a new session and output following SCO data to a new file
        match = self.call_start_re.search(line)
        if match and (1 == int(match.group(1))):
          self._new_session()
          continue

        match = self.sco_rx_re.search(line)
        if match:
          if self.sco_f is None:
            self._new_session()

          self.pk_count += 1

          status_flag = int(match.group(1))
          hdr = ['01', str(status_flag) + '1', '3c']
          if status_flag != 0:
            self.pl_count += 1

          self.sco_f.write(binascii.unhexlify(''.join(hdr)))
          parse_rx_data = 1


def main(argv):
  if len(argv) < 1:
    print("parse_sco.py [btsnoop.txt]")
    return

  p = SCOParser()
  p.parse(argv[0])


if __name__ == "__main__":
  main(sys.argv[1:])