diff options
Diffstat (limited to 'python/perfetto/trace_processor/api.py')
-rw-r--r-- | python/perfetto/trace_processor/api.py | 359 |
1 file changed, 359 insertions, 0 deletions
diff --git a/python/perfetto/trace_processor/api.py b/python/perfetto/trace_processor/api.py new file mode 100644 index 000000000..a5efbcde2 --- /dev/null +++ b/python/perfetto/trace_processor/api.py @@ -0,0 +1,359 @@ +# Copyright (C) 2020 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses as dc +from urllib.parse import urlparse +from typing import List, Optional + +from perfetto.trace_processor.http import TraceProcessorHttp +from perfetto.trace_processor.platform import PlatformDelegate +from perfetto.trace_processor.protos import ProtoFactory +from perfetto.trace_processor.shell import load_shell +from perfetto.trace_uri_resolver import registry +from perfetto.trace_uri_resolver.registry import ResolverRegistry + +# Defining this field as a module variable means this can be changed by +# implementations at startup and used for all TraceProcessor objects +# without having to specify on each one. +# In Google3, this field is rewritten using Copybara to an implementation +# which integrates with internal infra. 
PLATFORM_DELEGATE = PlatformDelegate

TraceReference = registry.TraceReference


class TraceProcessorException(Exception):
  """Raised if any trace_processor function returns a response with an
  error defined."""

  def __init__(self, message):
    super().__init__(message)


@dc.dataclass
class TraceProcessorConfig:
  """Configuration for creating a TraceProcessor instance.

  Attributes:
    bin_path: path to a trace processor shell binary; when None the platform
      delegate decides how to locate one.
    unique_port: whether the spawned shell should listen on a unique port.
    verbose: enables verbose output from the shell subprocess.
    ingest_ftrace_in_raw: whether ftrace events should be ingested into the
      raw table.
    resolver_registry: custom registry used to resolve trace URIs; falls
      back to the platform default when None.
  """
  # NOTE: the previous version declared these fields without defaults and
  # duplicated them in a hand-written __init__. The dataclass-generated
  # __init__ below has the identical signature and behavior.
  bin_path: Optional[str] = None
  unique_port: bool = True
  verbose: bool = False
  ingest_ftrace_in_raw: bool = False
  resolver_registry: Optional[ResolverRegistry] = None


class TraceProcessor:
  """Loads a trace into a trace processor instance (local shell subprocess
  or a remote HTTP endpoint) and exposes SQL queries and trace metrics."""

  # Values of these constants correspond to the QueryResponse message at
  # protos/perfetto/trace_processor/trace_processor.proto
  QUERY_CELL_INVALID_FIELD_ID = 0
  QUERY_CELL_NULL_FIELD_ID = 1
  QUERY_CELL_VARINT_FIELD_ID = 2
  QUERY_CELL_FLOAT64_FIELD_ID = 3
  QUERY_CELL_STRING_FIELD_ID = 4
  QUERY_CELL_BLOB_FIELD_ID = 5

  # This is the class returned to the user and contains one row of the
  # resultant query. Each column name is stored as an attribute of this
  # class, with the value corresponding to the column name and row in
  # the query results table.
  class Row(object):

    def __str__(self):
      return str(self.__dict__)

    def __repr__(self):
      # Bug fix: __repr__ must return a str. The previous implementation
      # returned self.__dict__ (a dict), which made repr(row) raise
      # TypeError at runtime.
      return str(self.__dict__)

  class QueryResultIterator:
    """Iterates over the rows of a query result; may alternatively be
    converted (once, immediately after query()) to a Pandas dataframe."""

    def __init__(self, column_names, batches):
      self.__column_names = column_names
      self.__column_count = 0
      self.__count = 0
      # Flat list of per-cell type ids (QUERY_CELL_*), row-major.
      self.__cells = []
      # One list per QUERY_CELL_* type id; cells of each type are stored
      # contiguously in the order they appear across all batches.
      self.__data_lists = [[], [], [], [], [], []]
      # Per-type read cursors, advanced as cells are consumed.
      self.__data_lists_index = [0, 0, 0, 0, 0, 0]
      self.__current_index = 0

      # Iterate over all the batches and collect their contents into lists
      # based on the type of the batch.
      for batch in batches:
        # It's possible on some occasions that there are non UTF-8
        # characters in the string_cells field. If this is the case,
        # string_cells is a bytestring which needs to be decoded (but
        # passing ignore so that we don't fail in decoding).
        strings_batch_str = batch.string_cells
        try:
          strings_batch_str = strings_batch_str.decode('utf-8', 'ignore')
        except AttributeError:
          # AttributeError can occur when |strings_batch_str| is an str
          # which happens when everything in it is UTF-8 (protobuf
          # automatically does the conversion if it can).
          pass

        # Null-terminated strings in a batch are concatenated into a
        # single large byte array, so we split on the null-terminator to
        # get the individual strings.
        strings_batch = strings_batch_str.split('\0')[:-1]
        self.__data_lists[TraceProcessor.QUERY_CELL_STRING_FIELD_ID].extend(
            strings_batch)
        self.__data_lists[TraceProcessor.QUERY_CELL_VARINT_FIELD_ID].extend(
            batch.varint_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID].extend(
            batch.float64_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_BLOB_FIELD_ID].extend(
            batch.blob_cells)
        self.__cells.extend(batch.cells)

        if batch.is_last_batch:
          break

      # If there are no rows in the query result, don't bother updating the
      # counts to avoid dealing with divide-by-zero errors.
      if not self.__cells:
        return

      # |cells| holds one entry per (row, column) pair across the whole
      # result, so dividing by the column count gives the row count.
      self.__column_count = len(self.__column_names)
      self.__count = len(self.__cells) // self.__column_count

      # Data integrity check - see that we have the expected amount of
      # cells for the number of rows that we need to return.
      if len(self.__cells) % self.__column_count != 0:
        raise TraceProcessorException("Cell count " + str(len(self.__cells)) +
                                      " is not a multiple of column count " +
                                      str(len(self.__column_names)))

    # To use the query result as a populated Pandas dataframe, this
    # function must be called directly after calling query inside
    # TraceProcessor.
    def as_pandas_dataframe(self):
      """Converts the query result into a Pandas dataframe.

      NOTE(review): consuming cells advances the shared per-type cursors,
      so mixing iteration and this call on the same result object is not
      supported — call it once, before iterating.

      Raises:
        TraceProcessorException: if pandas is not installed or a cell has
          an invalid type id.
      """
      try:
        import pandas as pd

        # Populate the dataframe with the query results.
        rows = []
        for i in range(0, self.__count):
          row = []
          base_cell_index = i * self.__column_count
          for num in range(len(self.__column_names)):
            col_type = self.__cells[base_cell_index + num]
            if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
              raise TraceProcessorException('Invalid cell type')

            if col_type == TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
              row.append(None)
            else:
              col_index = self.__data_lists_index[col_type]
              self.__data_lists_index[col_type] += 1
              row.append(self.__data_lists[col_type][col_index])
          rows.append(row)

        df = pd.DataFrame(rows, columns=self.__column_names)
        # Force object dtype so NULL cells stay None instead of being
        # coerced to NaN by pandas.
        return df.astype(object).where(df.notnull(),
                                       None).reset_index(drop=True)

      except ModuleNotFoundError:
        raise TraceProcessorException(
            'Python dependencies missing. Please pip3 install pandas numpy')

    def __len__(self):
      return self.__count

    def __iter__(self):
      return self

    def __next__(self):
      if self.__current_index == self.__count:
        raise StopIteration
      result = TraceProcessor.Row()
      base_cell_index = self.__current_index * self.__column_count
      for num, column_name in enumerate(self.__column_names):
        col_type = self.__cells[base_cell_index + num]
        if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
          raise TraceProcessorException('Invalid cell type')
        if col_type != TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
          col_index = self.__data_lists_index[col_type]
          self.__data_lists_index[col_type] += 1
          setattr(result, column_name, self.__data_lists[col_type][col_index])
        else:
          setattr(result, column_name, None)

      self.__current_index += 1
      return result

  def __init__(self,
               trace: Optional[TraceReference] = None,
               addr: Optional[str] = None,
               config: Optional[TraceProcessorConfig] = None,
               file_path: Optional[str] = None):
    """Create a trace processor instance.

    Args:
      trace: reference to a trace to be loaded into the trace
        processor instance.

        One of several types is supported:
        1) path to a trace file to open and read
        2) a file like object (file, io.BytesIO or similar) to read
        3) a generator yielding bytes
        4) a trace URI which resolves to one of the above types
        5) a trace URI resolver; this is a subclass of
           resolver.TraceUriResolver which generates a reference to a
           trace when the |resolve| method is called on it.

        An URI is similar to a connection string (e.g. for a web
        address or SQL database) which specifies where to lookup traces
        and which traces to pick from this data source. The format of a
        string should be as follows:
        resolver_name:key_1=list,of,values;key_2=value

        Custom resolvers can be provided to handle URIs via
        |config.resolver_registry|.
      addr: address of a running trace processor instance. Useful to query
        an already loaded trace.
      config: configuration options which customize functionality of trace
        processor and the Python binding. Defaults to a fresh
        TraceProcessorConfig() when not given.
      file_path (deprecated): path to a trace file to load. Use
        |trace| instead of this field: specifying both will cause
        an exception to be thrown.

    Raises:
      TraceProcessorException: if both |trace| and |file_path| are given.
    """
    # Bug fix: the config default used to be the mutable default argument
    # `config=TraceProcessorConfig()`, evaluated once at def-time and
    # therefore shared by every TraceProcessor ever constructed without an
    # explicit config. Construct a fresh instance per call instead.
    if config is None:
      config = TraceProcessorConfig()

    if trace and file_path:
      raise TraceProcessorException(
          "trace and file_path cannot both be specified.")

    self.config = config
    self.platform_delegate = PLATFORM_DELEGATE()
    self.protos = ProtoFactory(self.platform_delegate)
    self.resolver_registry = config.resolver_registry or \
        self.platform_delegate.default_resolver_registry()
    self.http = self._create_tp_http(addr)

    if trace or file_path:
      self._parse_trace(trace if trace else file_path)

  def query(self, sql: str):
    """Executes passed in SQL query using class defined HTTP API, and
    returns the response as a QueryResultIterator. Raises
    TraceProcessorException if the response returns with an error.

    Args:
      sql: SQL query written as a String

    Returns:
      A class which can iterate through each row of the results table. This
      can also be converted to a pandas dataframe by calling the
      as_pandas_dataframe() function after calling query.
    """
    response = self.http.execute_query(sql)
    if response.error:
      raise TraceProcessorException(response.error)

    return TraceProcessor.QueryResultIterator(response.column_names,
                                              response.batch)

  def metric(self, metrics: List[str]):
    """Returns the metrics data corresponding to the passed in trace metric.
    Raises TraceProcessorException if the response returns with an error.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      The metrics data as a proto message
    """
    response = self.http.compute_metric(metrics)
    if response.error:
      raise TraceProcessorException(response.error)

    # Use a distinct local name rather than rebinding the |metrics|
    # parameter as the previous version did.
    parsed = self.protos.TraceMetrics()
    parsed.ParseFromString(response.metrics)
    return parsed

  def enable_metatrace(self):
    """Enable metatrace for the currently running trace_processor.
    """
    return self.http.enable_metatrace()

  def disable_and_read_metatrace(self):
    """Disable and return the metatrace formed from the currently running
    trace_processor. This must be enabled before attempting to disable. This
    returns the serialized bytes of the metatrace data directly. Raises
    TraceProcessorException if the response returns with an error.
    """
    response = self.http.disable_and_read_metatrace()
    if response.error:
      raise TraceProcessorException(response.error)

    return response.metatrace

  def _create_tp_http(self, addr: Optional[str]) -> TraceProcessorHttp:
    # Connect to an already-running instance when an address is given;
    # otherwise spawn a local shell subprocess and connect to that.
    if addr:
      p = urlparse(addr)
      # Accept both full URLs ("http://host:port") and bare "host:port"
      # strings (urlparse puts the latter entirely in |path|).
      parsed = p.netloc if p.netloc else p.path
      return TraceProcessorHttp(parsed, protos=self.protos)

    url, self.subprocess = load_shell(self.config.bin_path,
                                      self.config.unique_port,
                                      self.config.verbose,
                                      self.config.ingest_ftrace_in_raw,
                                      self.platform_delegate)
    return TraceProcessorHttp(url, protos=self.protos)

  def _parse_trace(self, trace: TraceReference):
    # Resolve the trace reference to exactly one concrete trace and stream
    # its chunks to the trace processor, signalling EOF at the end.
    resolved_lst = self.resolver_registry.resolve(trace)
    if not resolved_lst:
      raise TraceProcessorException(
          'trace argument did not resolve to a trace.')

    if len(resolved_lst) > 1:
      raise TraceProcessorException(
          'trace argument resolved to more than one trace. Trace processor '
          'only supports loading a single trace; please use '
          'BatchTraceProcessor to operate on multiple traces.')

    resolved = resolved_lst[0]
    for chunk in resolved.generator:
      result = self.http.parse(chunk)
      if result.error:
        raise TraceProcessorException(
            f'Failed while parsing trace. Error message: {result.error}')
    self.http.notify_eof()

  def __enter__(self):
    return self

  def __exit__(self, a, b, c):
    del a, b, c  # Unused.
    self.close()
    return False

  def close(self):
    # Kill the shell subprocess (if we spawned one) and close the HTTP
    # connection.
    if hasattr(self, 'subprocess'):
      self.subprocess.kill()
    self.http.conn.close()