1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
r"""
Trace module which creates trace files from a list of trace events.
This is a work in progress, future work will look to add:
- Config options to customize output.
- A method of providing custom data formatters.
- Perfetto support.
"""
from enum import Enum
import json
import logging
import struct
from typing import Iterable, NamedTuple
_LOG = logging.getLogger('pw_trace')
_ORDERING_CHARS = ("@", "=", "<", ">", "!")
class TraceType(Enum):
INVALID = 0
INSTANTANEOUS = 1
INSTANTANEOUS_GROUP = 2
ASYNC_START = 3
ASYNC_STEP = 4
ASYNC_END = 5
DURATION_START = 6
DURATION_END = 7
DURATION_GROUP_START = 8
DURATION_GROUP_END = 9
# TODO(hepler): Remove these aliases for the original style-incompliant
# names when users have migrated.
DurationStart = 6 # pylint: disable=invalid-name
DurationEnd = 7 # pylint: disable=invalid-name
class TraceEvent(NamedTuple):
event_type: TraceType
module: str
label: str
timestamp_us: float
group: str = ""
trace_id: int = 0
flags: int = 0
has_data: bool = False
data_fmt: str = ""
data: bytes = b''
def event_has_trace_id(event_type):
return event_type in {
"PW_TRACE_EVENT_TYPE_ASYNC_START",
"PW_TRACE_EVENT_TYPE_ASYNC_STEP",
"PW_TRACE_EVENT_TYPE_ASYNC_END",
}
def decode_struct_fmt_args(event):
"""Decodes the trace's event data for struct-formatted data"""
args = {}
# we assume all data is packed, little-endian ordering if not specified
struct_fmt = event.data_fmt[len("@pw_py_struct_fmt:") :]
if not struct_fmt.startswith(_ORDERING_CHARS):
struct_fmt = "<" + struct_fmt
try:
# needed in case the buffer is larger than expected
assert struct.calcsize(struct_fmt) == len(event.data)
items = struct.unpack_from(struct_fmt, event.data)
for i, item in enumerate(items):
args["data_" + str(i)] = item
except (AssertionError, struct.error):
args["error"] = (
f"Mismatched struct/data format {event.data_fmt} "
f"expected data len {struct.calcsize(struct_fmt)} "
f"data {event.data.hex()} "
f"data len {len(event.data)}"
)
return args
def decode_map_fmt_args(event):
"""Decodes the trace's event data for map-formatted data"""
args = {}
fmt = event.data_fmt[len("@pw_py_map_fmt:") :]
# we assume all data is packed, little-endian ordering if not specified
if not fmt.startswith(_ORDERING_CHARS):
fmt = '<' + fmt
try:
(fmt_bytes, fmt_list) = fmt.split("{")
fmt_list = fmt_list.strip("}").split(",")
names = []
for pair in fmt_list:
(name, fmt_char) = (s.strip() for s in pair.split(":"))
names.append(name)
fmt_bytes += fmt_char
except ValueError:
args["error"] = f"Invalid map format {event.data_fmt}"
else:
try:
# needed in case the buffer is larger than expected
assert struct.calcsize(fmt_bytes) == len(event.data)
items = struct.unpack_from(fmt_bytes, event.data)
for i, item in enumerate(items):
args[names[i]] = item
except (AssertionError, struct.error):
args["error"] = (
f"Mismatched map/data format {event.data_fmt} "
f"expected data len {struct.calcsize(fmt_bytes)} "
f"data {event.data.hex()} "
f"data len {len(event.data)}"
)
return args
def generate_trace_json(events: Iterable[TraceEvent]):
"""Generates a list of JSON lines from provided trace events."""
json_lines = []
for event in events:
if (
event.module is None
or event.timestamp_us is None
or event.event_type is None
or event.label is None
):
_LOG.error("Invalid sample")
continue
line = {
"pid": event.module,
"name": (event.label),
"ts": event.timestamp_us,
}
if event.event_type == TraceType.DURATION_START:
line["ph"] = "B"
line["tid"] = event.label
elif event.event_type == TraceType.DURATION_END:
line["ph"] = "E"
line["tid"] = event.label
elif event.event_type == TraceType.DURATION_GROUP_START:
line["ph"] = "B"
line["tid"] = event.group
elif event.event_type == TraceType.DURATION_GROUP_END:
line["ph"] = "E"
line["tid"] = event.group
elif event.event_type == TraceType.INSTANTANEOUS:
line["ph"] = "I"
line["s"] = "p"
elif event.event_type == TraceType.INSTANTANEOUS_GROUP:
line["ph"] = "I"
line["s"] = "t"
line["tid"] = event.group
elif event.event_type == TraceType.ASYNC_START:
line["ph"] = "b"
line["scope"] = event.group
line["tid"] = event.group
line["cat"] = event.module
line["id"] = event.trace_id
line["args"] = {"id": line["id"]}
elif event.event_type == TraceType.ASYNC_STEP:
line["ph"] = "n"
line["scope"] = event.group
line["tid"] = event.group
line["cat"] = event.module
line["id"] = event.trace_id
line["args"] = {"id": line["id"]}
elif event.event_type == TraceType.ASYNC_END:
line["ph"] = "e"
line["scope"] = event.group
line["tid"] = event.group
line["cat"] = event.module
line["id"] = event.trace_id
line["args"] = {"id": line["id"]}
else:
_LOG.error("Unknown event type, skipping")
continue
# Handle Data
if event.has_data:
if event.data_fmt == "@pw_arg_label":
line["name"] = event.data.decode("utf-8")
elif event.data_fmt == "@pw_arg_group":
line["tid"] = event.data.decode("utf-8")
elif event.data_fmt == "@pw_arg_counter":
line["ph"] = "C"
line["args"] = {
line["name"]: int.from_bytes(event.data, "little")
}
elif event.data_fmt.startswith("@pw_py_struct_fmt:"):
line["args"] = decode_struct_fmt_args(event)
elif event.data_fmt.startswith("@pw_py_map_fmt:"):
line["args"] = decode_map_fmt_args(event)
else:
line["args"] = {"data": event.data.hex()}
# Encode as JSON
json_lines.append(json.dumps(line))
return json_lines
|