aboutsummaryrefslogtreecommitdiff
path: root/src/trace_processor/python/perfetto/trace_processor/api.py
blob: 9cabf07dfc8b91654fb4ac99f942d0f2ab87cbac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from urllib.parse import urlparse

from .http import TraceProcessorHttp
from .loader import get_loader
from .protos import ProtoFactory
from .shell import load_shell


# Custom exception raised if any trace_processor functions return a
# response with an error defined
class TraceProcessorException(Exception):
  """Error type used by this module to report trace_processor failures.

  Args:
    message: Human readable description of the failure; it becomes the
      exception's only argument.
  """

  def __init__(self, message):
    # Hand the message straight to the base class so str(exc) yields it.
    Exception.__init__(self, message)


class TraceProcessor:
  """Client for a trace_processor instance reached over its HTTP API.

  An instance either connects to an already running trace_processor (when
  `addr` is given) or starts one through load_shell(). It can be used as a
  context manager, in which case close() is called on exit:

      with TraceProcessor(file_path='trace.pftrace') as tp:
        for row in tp.query('select ts from slice'):
          print(row.ts)
  """

  # Values of these constants correspond to the QueryResponse message at
  # protos/perfetto/trace_processor/trace_processor.proto
  # Value 0 corresponds to CELL_INVALID; it has no constant here and is
  # rejected with a TraceProcessorException. Null cells (field id 1) are
  # represented as None.
  QUERY_CELL_NULL_FIELD_ID = 1
  QUERY_CELL_VARINT_FIELD_ID = 2
  QUERY_CELL_FLOAT64_FIELD_ID = 3
  QUERY_CELL_STRING_FIELD_ID = 4
  QUERY_CELL_BLOB_FIELD_ID = 5

  # This is the class returned to the user and contains one row of the
  # resultant query. Each column name is stored as an attribute of this
  # class, with the value corresponding to the column name and row in
  # the query results table.
  class Row(object):

    def __str__(self):
      return str(self.__dict__)

    def __repr__(self):
      # __repr__ must return a str; returning the dict itself makes
      # repr(row) raise TypeError.
      return repr(self.__dict__)

  class QueryResultIterator:
    """Single-pass iterator over the rows of a query response.

    Rows are decoded lazily from the response batches. Iteration and
    as_pandas_dataframe() share one read position, so rows already consumed
    by one are not seen by the other.

    Args:
      column_names: Sequence of column names, one per cell in a row.
      batches: Sequence of batch messages. Each batch is read through its
        `cells`, `varint_cells`, `float64_cells`, `blob_cells`,
        `string_cells` and `is_last_batch` attributes.
    """

    def __init__(self, column_names, batches):
      self.__batches = batches
      self.__column_names = column_names
      self.__batch_index = 0
      self.__next_index = 0
      self.__reset_batch_cursors()

    @staticmethod
    def __split_string_cells(string_cells):
      # Splits the NUL-terminated string table of a batch into its strings.
      # TODO(lalitm): Look into changing string_cells to bytes in the protobuf
      if isinstance(string_cells, (bytes, bytearray)):
        string_cells = string_cells.decode('utf-8')
      strings = string_cells.split('\0')
      # Every string is NUL-terminated, so split() leaves one empty trailing
      # element which is not a real cell.
      if strings[-1] == '':
        strings.pop()
      return strings

    def __reset_batch_cursors(self):
      # Resets the per-type read positions for the current batch. Cursors
      # are used instead of pop(0) so the response is not mutated and each
      # cell read is O(1).
      self.__cell_offsets = {
          TraceProcessor.QUERY_CELL_VARINT_FIELD_ID: 0,
          TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID: 0,
          TraceProcessor.QUERY_CELL_BLOB_FIELD_ID: 0,
      }
      self.__string_index = 0
      if self.__batch_index < len(self.__batches):
        self.__strings = self.__split_string_cells(
            self.__batches[self.__batch_index].string_cells)
      else:
        self.__strings = []

    def __seek_to_next_row(self):
      # Moves to the next batch that still has unread cells. Returns True if
      # a row is available and False once the result is exhausted.
      batches = self.__batches
      if len(batches) == 0:
        return False
      while True:
        batch = batches[self.__batch_index]
        if self.__next_index < len(batch.cells):
          return True
        # Stop on the batch flagged as last, and also when the list simply
        # ends, so a missing flag cannot index past the end.
        if batch.is_last_batch or self.__batch_index + 1 >= len(batches):
          return False
        self.__batch_index += 1
        self.__next_index = 0
        self.__reset_batch_cursors()

    def __read_row_values(self):
      # Decodes the cells of the next row, in column order, and advances
      # the read position. Null cells are returned as None. Must only be
      # called after __seek_to_next_row() returned True.
      batch = self.__batches[self.__batch_index]
      num_columns = len(self.__column_names)
      values = []
      for column in range(num_columns):
        cell_type = batch.cells[self.__next_index + column]
        if cell_type == TraceProcessor.QUERY_CELL_STRING_FIELD_ID:
          values.append(self.__strings[self.__string_index])
          self.__string_index += 1
          continue
        cell_list = self.get_cell_list(cell_type)
        if cell_list is None:
          values.append(None)
        else:
          offset = self.__cell_offsets[cell_type]
          values.append(cell_list[offset])
          self.__cell_offsets[cell_type] = offset + 1
      self.__next_index += num_columns
      return values

    def get_cell_list(self, proto_index):
      """Returns the current batch's value list for a cell type.

      Args:
        proto_index: One of the TraceProcessor.QUERY_CELL_*_FIELD_ID values
          other than the string one (strings are decoded separately).

      Returns:
        None for the null cell type, otherwise the varint, float64 or blob
        cell list of the current batch.

      Raises:
        TraceProcessorException: if proto_index is any other value.
      """
      if proto_index == TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
        return None
      elif proto_index == TraceProcessor.QUERY_CELL_VARINT_FIELD_ID:
        return self.__batches[self.__batch_index].varint_cells
      elif proto_index == TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID:
        return self.__batches[self.__batch_index].float64_cells
      elif proto_index == TraceProcessor.QUERY_CELL_BLOB_FIELD_ID:
        return self.__batches[self.__batch_index].blob_cells
      else:
        raise TraceProcessorException('Invalid cell type')

    def cells(self):
      """Returns the list of cell type ids of the current batch."""
      return self.__batches[self.__batch_index].cells

    # To use the query result as a populated Pandas dataframe, this
    # function must be called directly after calling query inside
    # TraceProcessor.
    def as_pandas_dataframe(self):
      """Returns all rows not yet consumed as a pandas DataFrame.

      Null cells are stored as NaN. The frame has one column per column
      name and a 0..n-1 index.

      Raises:
        TraceProcessorException: if numpy or pandas cannot be imported.
      """
      # Only the imports can raise ModuleNotFoundError, so keep the try
      # block limited to them.
      try:
        import numpy as np
        import pandas as pd
      except ModuleNotFoundError as e:
        raise TraceProcessorException(
            'The sufficient libraries are not installed') from e

      # Collect the rows first and build the frame once; growing a
      # DataFrame row by row copies it on every insertion.
      rows = []
      while self.__seek_to_next_row():
        rows.append([
            np.nan if value is None else value
            for value in self.__read_row_values()
        ])
      return pd.DataFrame(rows, columns=list(self.__column_names))

    def __iter__(self):
      return self

    def __next__(self):
      """Returns the next result row as a TraceProcessor.Row.

      Raises:
        StopIteration: when every batch has been fully read.
      """
      if not self.__seek_to_next_row():
        raise StopIteration

      row = TraceProcessor.Row()
      for column_name, value in zip(self.__column_names,
                                    self.__read_row_values()):
        setattr(row, column_name, value)
      return row

  def __init__(self, addr=None, file_path=None, bin_path=None,
               unique_port=True):
    """Connects to, or starts, a trace_processor and optionally loads a trace.

    Args:
      addr: Address of an already running trace_processor. When falsy, a
        new instance is started through load_shell().
      file_path: Optional path of a trace to parse into the instance.
      bin_path: Forwarded to load_shell() when a new instance is started.
      unique_port: Forwarded to load_shell() when a new instance is started.
    """
    # Load trace_processor_shell or access via given address
    if addr:
      p = urlparse(addr)
      tp = TraceProcessorHttp(p.netloc if p.netloc else p.path)
    else:
      url, self.subprocess = load_shell(
          bin_path=bin_path, unique_port=unique_port)
      tp = TraceProcessorHttp(url)
    self.http = tp
    self.protos = ProtoFactory()

    # Parse trace by its file_path into the loaded instance of trace_processor
    if file_path:
      try:
        get_loader().parse_file(self.http, file_path)
      except BaseException:
        # The caller never receives this object when the constructor
        # raises, so release the shell and connection here.
        self.close()
        raise

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self.close()
    # Never suppress an exception raised inside the with block.
    return False

  def query(self, sql):
    """Executes passed in SQL query using class defined HTTP API, and returns
    the response as a QueryResultIterator. Raises TraceProcessorException if
    the response returns with an error.

    Args:
      sql: SQL query written as a String

    Returns:
      A class which can iterate through each row of the results table. This
      can also be converted to a pandas dataframe by calling the
      as_pandas_dataframe() function after calling query.
    """
    response = self.http.execute_query(sql)
    if response.error:
      raise TraceProcessorException(response.error)

    return TraceProcessor.QueryResultIterator(response.column_names,
                                              response.batch)

  def metric(self, metrics):
    """Returns the metrics data corresponding to the passed in trace metric.
    Raises TraceProcessorException if the response returns with an error.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      The metrics data as a proto message
    """
    response = self.http.compute_metric(metrics)
    if response.error:
      raise TraceProcessorException(response.error)

    # Use a separate name so the `metrics` argument is not shadowed.
    trace_metrics = self.protos.TraceMetrics()
    trace_metrics.ParseFromString(response.metrics)
    return trace_metrics

  def enable_metatrace(self):
    """Enable metatrace for the currently running trace_processor.
    """
    return self.http.enable_metatrace()

  def disable_and_read_metatrace(self):
    """Disable and return the metatrace formed from the currently running
    trace_processor. This must be enabled before attempting to disable. This
    returns the serialized bytes of the metatrace data directly. Raises
    TraceProcessorException if the response returns with an error.
    """
    response = self.http.disable_and_read_metatrace()
    if response.error:
      raise TraceProcessorException(response.error)

    return response.metatrace

  def close(self):
    """Kills the trace_processor started by this instance, if any, and
    closes the HTTP connection.
    """
    # self.subprocess only exists when __init__ started its own shell.
    if hasattr(self, 'subprocess'):
      self.subprocess.kill()
    self.http.conn.close()