aboutsummaryrefslogtreecommitdiff
path: root/python/perfetto/trace_processor/api.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/perfetto/trace_processor/api.py')
-rw-r--r--python/perfetto/trace_processor/api.py359
1 files changed, 359 insertions, 0 deletions
diff --git a/python/perfetto/trace_processor/api.py b/python/perfetto/trace_processor/api.py
new file mode 100644
index 000000000..a5efbcde2
--- /dev/null
+++ b/python/perfetto/trace_processor/api.py
@@ -0,0 +1,359 @@
+# Copyright (C) 2020 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import dataclasses as dc
+from urllib.parse import urlparse
+from typing import List, Optional
+
+from perfetto.trace_processor.http import TraceProcessorHttp
+from perfetto.trace_processor.platform import PlatformDelegate
+from perfetto.trace_processor.protos import ProtoFactory
+from perfetto.trace_processor.shell import load_shell
+from perfetto.trace_uri_resolver import registry
+from perfetto.trace_uri_resolver.registry import ResolverRegistry
+
+# Defining this field as a module variable means this can be changed by
+# implementations at startup and used for all TraceProcessor objects
+# without having to specify on each one.
+# In Google3, this field is rewritten using Copybara to a implementation
+# which can integrates with internal infra.
+PLATFORM_DELEGATE = PlatformDelegate
+
+TraceReference = registry.TraceReference
+
+# Custom exception raised if any trace_processor functions return a
+# response with an error defined
+class TraceProcessorException(Exception):
+
+ def __init__(self, message):
+ super().__init__(message)
+
+
+@dc.dataclass
+class TraceProcessorConfig:
+ bin_path: Optional[str]
+ unique_port: bool
+ verbose: bool
+ ingest_ftrace_in_raw: bool
+ resolver_registry: Optional[ResolverRegistry]
+
+ def __init__(self,
+ bin_path: Optional[str] = None,
+ unique_port: bool = True,
+ verbose: bool = False,
+ ingest_ftrace_in_raw: bool = False,
+ resolver_registry: Optional[ResolverRegistry] = None):
+ self.bin_path = bin_path
+ self.unique_port = unique_port
+ self.verbose = verbose
+ self.ingest_ftrace_in_raw = ingest_ftrace_in_raw
+ self.resolver_registry = resolver_registry
+
+
+class TraceProcessor:
+
+ # Values of these constants correspond to the QueryResponse message at
+ # protos/perfetto/trace_processor/trace_processor.proto
+ QUERY_CELL_INVALID_FIELD_ID = 0
+ QUERY_CELL_NULL_FIELD_ID = 1
+ QUERY_CELL_VARINT_FIELD_ID = 2
+ QUERY_CELL_FLOAT64_FIELD_ID = 3
+ QUERY_CELL_STRING_FIELD_ID = 4
+ QUERY_CELL_BLOB_FIELD_ID = 5
+
+ # This is the class returned to the user and contains one row of the
+ # resultant query. Each column name is stored as an attribute of this
+ # class, with the value corresponding to the column name and row in
+ # the query results table.
+ class Row(object):
+
+ def __str__(self):
+ return str(self.__dict__)
+
+ def __repr__(self):
+ return self.__dict__
+
+ class QueryResultIterator:
+
+ def __init__(self, column_names, batches):
+ self.__column_names = column_names
+ self.__column_count = 0
+ self.__count = 0
+ self.__cells = []
+ self.__data_lists = [[], [], [], [], [], []]
+ self.__data_lists_index = [0, 0, 0, 0, 0, 0]
+ self.__current_index = 0
+
+ # Iterate over all the batches and collect their
+ # contents into lists based on the type of the batch
+ batch_index = 0
+ while True:
+ # It's possible on some occasions that there are non UTF-8 characters
+ # in the string_cells field. If this is the case, string_cells is
+ # a bytestring which needs to be decoded (but passing ignore so that
+ # we don't fail in decoding).
+ strings_batch_str = batches[batch_index].string_cells
+ try:
+ strings_batch_str = strings_batch_str.decode('utf-8', 'ignore')
+ except AttributeError:
+ # AttributeError can occur when |strings_batch_str| is an str which
+ # happens when everything in it is UTF-8 (protobuf automatically
+ # does the conversion if it can).
+ pass
+
+ # Null-terminated strings in a batch are concatenated
+ # into a single large byte array, so we split on the
+ # null-terminator to get the individual strings
+ strings_batch = strings_batch_str.split('\0')[:-1]
+ self.__data_lists[TraceProcessor.QUERY_CELL_STRING_FIELD_ID].extend(
+ strings_batch)
+ self.__data_lists[TraceProcessor.QUERY_CELL_VARINT_FIELD_ID].extend(
+ batches[batch_index].varint_cells)
+ self.__data_lists[TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID].extend(
+ batches[batch_index].float64_cells)
+ self.__data_lists[TraceProcessor.QUERY_CELL_BLOB_FIELD_ID].extend(
+ batches[batch_index].blob_cells)
+ self.__cells.extend(batches[batch_index].cells)
+
+ if batches[batch_index].is_last_batch:
+ break
+ batch_index += 1
+
+ # If there are no rows in the query result, don't bother updating the
+ # counts to avoid dealing with / 0 errors.
+ if len(self.__cells) == 0:
+ return
+
+ # The count we collected so far was a count of all individual columns
+ # in the query result, so we divide by the number of columns in a row
+ # to get the number of rows
+ self.__column_count = len(self.__column_names)
+ self.__count = int(len(self.__cells) / self.__column_count)
+
+ # Data integrity check - see that we have the expected amount of cells
+ # for the number of rows that we need to return
+ if len(self.__cells) % self.__column_count != 0:
+ raise TraceProcessorException("Cell count " + str(len(self.__cells)) +
+ " is not a multiple of column count " +
+ str(len(self.__column_names)))
+
+ # To use the query result as a populated Pandas dataframe, this
+ # function must be called directly after calling query inside
+ # TraceProcesor.
+ def as_pandas_dataframe(self):
+ try:
+ import pandas as pd
+
+ # Populate the dataframe with the query results
+ rows = []
+ for i in range(0, self.__count):
+ row = []
+ base_cell_index = i * self.__column_count
+ for num in range(len(self.__column_names)):
+ col_type = self.__cells[base_cell_index + num]
+ if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
+ raise TraceProcessorException('Invalid cell type')
+
+ if col_type == TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
+ row.append(None)
+ else:
+ col_index = self.__data_lists_index[col_type]
+ self.__data_lists_index[col_type] += 1
+ row.append(self.__data_lists[col_type][col_index])
+ rows.append(row)
+
+ df = pd.DataFrame(rows, columns=self.__column_names)
+ return df.astype(object).where(df.notnull(),
+ None).reset_index(drop=True)
+
+ except ModuleNotFoundError:
+ raise TraceProcessorException(
+ 'Python dependencies missing. Please pip3 install pandas numpy')
+
+ def __len__(self):
+ return self.__count
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ if self.__current_index == self.__count:
+ raise StopIteration
+ result = TraceProcessor.Row()
+ base_cell_index = self.__current_index * self.__column_count
+ for num, column_name in enumerate(self.__column_names):
+ col_type = self.__cells[base_cell_index + num]
+ if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
+ raise TraceProcessorException('Invalid cell type')
+ if col_type != TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
+ col_index = self.__data_lists_index[col_type]
+ self.__data_lists_index[col_type] += 1
+ setattr(result, column_name, self.__data_lists[col_type][col_index])
+ else:
+ setattr(result, column_name, None)
+
+ self.__current_index += 1
+ return result
+
+ def __init__(self,
+ trace: Optional[TraceReference] = None,
+ addr: Optional[str] = None,
+ config: TraceProcessorConfig = TraceProcessorConfig(),
+ file_path: Optional[str] = None):
+ """Create a trace processor instance.
+
+ Args:
+ trace: reference to a trace to be loaded into the trace
+ processor instance.
+
+ One of several types is supported:
+ 1) path to a trace file to open and read
+ 2) a file like object (file, io.BytesIO or similar) to read
+ 3) a generator yielding bytes
+ 4) a trace URI which resolves to one of the above types
+ 5) a trace URI resolver; this is a subclass of
+ resolver.TraceUriResolver which generates a reference to a
+ trace when the |resolve| method is called on it.
+
+ An URI is similar to a connection string (e.g. for a web
+ address or SQL database) which specifies where to lookup traces
+ and which traces to pick from this data source. The format of a
+ string should be as follows:
+ resolver_name:key_1=list,of,values;key_2=value
+
+ Custom resolvers can be provided to handle URIs via
+ |config.resolver_registry|.
+ addr: address of a running trace processor instance. Useful to query an
+ already loaded trace.
+ config: configuration options which customize functionality of trace
+ processor and the Python binding.
+ file_path (deprecated): path to a trace file to load. Use
+ |trace| instead of this field: specifying both will cause
+ an exception to be thrown.
+ """
+
+ if trace and file_path:
+ raise TraceProcessorException(
+ "trace and file_path cannot both be specified.")
+
+ self.config = config
+ self.platform_delegate = PLATFORM_DELEGATE()
+ self.protos = ProtoFactory(self.platform_delegate)
+ self.resolver_registry = config.resolver_registry or \
+ self.platform_delegate.default_resolver_registry()
+ self.http = self._create_tp_http(addr)
+
+ if trace or file_path:
+ self._parse_trace(trace if trace else file_path)
+
+ def query(self, sql: str):
+ """Executes passed in SQL query using class defined HTTP API, and returns
+ the response as a QueryResultIterator. Raises TraceProcessorException if
+ the response returns with an error.
+
+ Args:
+ sql: SQL query written as a String
+
+ Returns:
+ A class which can iterate through each row of the results table. This
+ can also be converted to a pandas dataframe by calling the
+ as_pandas_dataframe() function after calling query.
+ """
+ response = self.http.execute_query(sql)
+ if response.error:
+ raise TraceProcessorException(response.error)
+
+ return TraceProcessor.QueryResultIterator(response.column_names,
+ response.batch)
+
+ def metric(self, metrics: List[str]):
+ """Returns the metrics data corresponding to the passed in trace metric.
+ Raises TraceProcessorException if the response returns with an error.
+
+ Args:
+ metrics: A list of valid metrics as defined in TraceMetrics
+
+ Returns:
+ The metrics data as a proto message
+ """
+ response = self.http.compute_metric(metrics)
+ if response.error:
+ raise TraceProcessorException(response.error)
+
+ metrics = self.protos.TraceMetrics()
+ metrics.ParseFromString(response.metrics)
+ return metrics
+
+ def enable_metatrace(self):
+ """Enable metatrace for the currently running trace_processor.
+ """
+ return self.http.enable_metatrace()
+
+ def disable_and_read_metatrace(self):
+ """Disable and return the metatrace formed from the currently running
+ trace_processor. This must be enabled before attempting to disable. This
+ returns the serialized bytes of the metatrace data directly. Raises
+ TraceProcessorException if the response returns with an error.
+ """
+ response = self.http.disable_and_read_metatrace()
+ if response.error:
+ raise TraceProcessorException(response.error)
+
+ return response.metatrace
+
+ def _create_tp_http(self, addr: str) -> TraceProcessorHttp:
+ if addr:
+ p = urlparse(addr)
+ parsed = p.netloc if p.netloc else p.path
+ return TraceProcessorHttp(parsed, protos=self.protos)
+
+ url, self.subprocess = load_shell(self.config.bin_path,
+ self.config.unique_port,
+ self.config.verbose,
+ self.config.ingest_ftrace_in_raw,
+ self.platform_delegate)
+ return TraceProcessorHttp(url, protos=self.protos)
+
+ def _parse_trace(self, trace: TraceReference):
+ resolved_lst = self.resolver_registry.resolve(trace)
+ if not resolved_lst:
+ raise TraceProcessorException(
+ 'trace argument did not resolve to a trace.')
+
+ if len(resolved_lst) > 1:
+ raise TraceProcessorException(
+ 'trace argument resolved to more than one trace. Trace processor '
+ 'only supports loading a single trace; please use '
+ 'BatchTraceProcessor to operate on multiple traces.')
+
+ resolved = resolved_lst[0]
+ for chunk in resolved.generator:
+ result = self.http.parse(chunk)
+ if result.error:
+ raise TraceProcessorException(
+ f'Failed while parsing trace. Error message: {result.error}')
+ self.http.notify_eof()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, a, b, c):
+ del a, b, c # Unused.
+ self.close()
+ return False
+
+ def close(self):
+ if hasattr(self, 'subprocess'):
+ self.subprocess.kill()
+ self.http.conn.close()