aboutsummaryrefslogtreecommitdiff
path: root/pw_trace/py/pw_trace/trace.py
blob: 571d66f0cad305c4d6bdd407740eeb500a369fed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
r"""

Trace module which creates trace files from a list of trace events.

This is a work in progress, future work will look to add:
    - Config options to customize output.
    - A method of providing custom data formatters.
    - Perfetto support.
"""
from enum import Enum
import json
import logging
import struct
from typing import Iterable, NamedTuple

_LOG = logging.getLogger('pw_trace')
_ORDERING_CHARS = ("@", "=", "<", ">", "!")


class TraceType(Enum):
    INVALID = 0
    INSTANTANEOUS = 1
    INSTANTANEOUS_GROUP = 2
    ASYNC_START = 3
    ASYNC_STEP = 4
    ASYNC_END = 5
    DURATION_START = 6
    DURATION_END = 7
    DURATION_GROUP_START = 8
    DURATION_GROUP_END = 9

    # TODO(hepler): Remove these aliases for the original style-incompliant
    #     names when users have migrated.
    DurationStart = 6  # pylint: disable=invalid-name
    DurationEnd = 7  # pylint: disable=invalid-name


class TraceEvent(NamedTuple):
    event_type: TraceType
    module: str
    label: str
    timestamp_us: float
    group: str = ""
    trace_id: int = 0
    flags: int = 0
    has_data: bool = False
    data_fmt: str = ""
    data: bytes = b''


def event_has_trace_id(event_type):
    return event_type in {
        "PW_TRACE_EVENT_TYPE_ASYNC_START",
        "PW_TRACE_EVENT_TYPE_ASYNC_STEP",
        "PW_TRACE_EVENT_TYPE_ASYNC_END",
    }


def decode_struct_fmt_args(event):
    """Decodes the trace's event data for struct-formatted data"""
    args = {}
    # we assume all data is packed, little-endian ordering if not specified
    struct_fmt = event.data_fmt[len("@pw_py_struct_fmt:") :]
    if not struct_fmt.startswith(_ORDERING_CHARS):
        struct_fmt = "<" + struct_fmt
    try:
        # needed in case the buffer is larger than expected
        assert struct.calcsize(struct_fmt) == len(event.data)
        items = struct.unpack_from(struct_fmt, event.data)
        for i, item in enumerate(items):
            args["data_" + str(i)] = item
    except (AssertionError, struct.error):
        args["error"] = (
            f"Mismatched struct/data format {event.data_fmt} "
            f"expected data len {struct.calcsize(struct_fmt)} "
            f"data {event.data.hex()} "
            f"data len {len(event.data)}"
        )
    return args


def decode_map_fmt_args(event):
    """Decodes the trace's event data for map-formatted data"""
    args = {}
    fmt = event.data_fmt[len("@pw_py_map_fmt:") :]

    # we assume all data is packed, little-endian ordering if not specified
    if not fmt.startswith(_ORDERING_CHARS):
        fmt = '<' + fmt

    try:
        (fmt_bytes, fmt_list) = fmt.split("{")
        fmt_list = fmt_list.strip("}").split(",")

        names = []
        for pair in fmt_list:
            (name, fmt_char) = (s.strip() for s in pair.split(":"))
            names.append(name)
            fmt_bytes += fmt_char
    except ValueError:
        args["error"] = f"Invalid map format {event.data_fmt}"
    else:
        try:
            # needed in case the buffer is larger than expected
            assert struct.calcsize(fmt_bytes) == len(event.data)
            items = struct.unpack_from(fmt_bytes, event.data)
            for i, item in enumerate(items):
                args[names[i]] = item
        except (AssertionError, struct.error):
            args["error"] = (
                f"Mismatched map/data format {event.data_fmt} "
                f"expected data len {struct.calcsize(fmt_bytes)} "
                f"data {event.data.hex()} "
                f"data len {len(event.data)}"
            )
    return args


def generate_trace_json(events: Iterable[TraceEvent]):
    """Generates a list of JSON lines from provided trace events."""
    json_lines = []
    for event in events:
        if (
            event.module is None
            or event.timestamp_us is None
            or event.event_type is None
            or event.label is None
        ):
            _LOG.error("Invalid sample")
            continue

        line = {
            "pid": event.module,
            "name": (event.label),
            "ts": event.timestamp_us,
        }
        if event.event_type == TraceType.DURATION_START:
            line["ph"] = "B"
            line["tid"] = event.label
        elif event.event_type == TraceType.DURATION_END:
            line["ph"] = "E"
            line["tid"] = event.label
        elif event.event_type == TraceType.DURATION_GROUP_START:
            line["ph"] = "B"
            line["tid"] = event.group
        elif event.event_type == TraceType.DURATION_GROUP_END:
            line["ph"] = "E"
            line["tid"] = event.group
        elif event.event_type == TraceType.INSTANTANEOUS:
            line["ph"] = "I"
            line["s"] = "p"
        elif event.event_type == TraceType.INSTANTANEOUS_GROUP:
            line["ph"] = "I"
            line["s"] = "t"
            line["tid"] = event.group
        elif event.event_type == TraceType.ASYNC_START:
            line["ph"] = "b"
            line["scope"] = event.group
            line["tid"] = event.group
            line["cat"] = event.module
            line["id"] = event.trace_id
            line["args"] = {"id": line["id"]}
        elif event.event_type == TraceType.ASYNC_STEP:
            line["ph"] = "n"
            line["scope"] = event.group
            line["tid"] = event.group
            line["cat"] = event.module
            line["id"] = event.trace_id
            line["args"] = {"id": line["id"]}
        elif event.event_type == TraceType.ASYNC_END:
            line["ph"] = "e"
            line["scope"] = event.group
            line["tid"] = event.group
            line["cat"] = event.module
            line["id"] = event.trace_id
            line["args"] = {"id": line["id"]}
        else:
            _LOG.error("Unknown event type, skipping")
            continue

        # Handle Data
        if event.has_data:
            if event.data_fmt == "@pw_arg_label":
                line["name"] = event.data.decode("utf-8")
            elif event.data_fmt == "@pw_arg_group":
                line["tid"] = event.data.decode("utf-8")
            elif event.data_fmt == "@pw_arg_counter":
                line["ph"] = "C"
                line["args"] = {
                    line["name"]: int.from_bytes(event.data, "little")
                }
            elif event.data_fmt.startswith("@pw_py_struct_fmt:"):
                line["args"] = decode_struct_fmt_args(event)
            elif event.data_fmt.startswith("@pw_py_map_fmt:"):
                line["args"] = decode_map_fmt_args(event)
            else:
                line["args"] = {"data": event.data.hex()}

        # Encode as JSON
        json_lines.append(json.dumps(line))

    return json_lines