aboutsummaryrefslogtreecommitdiff
path: root/src/tracing/ipc/service/consumer_ipc_service.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/tracing/ipc/service/consumer_ipc_service.cc')
-rw-r--r--src/tracing/ipc/service/consumer_ipc_service.cc127
1 files changed, 100 insertions, 27 deletions
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index 0f8d64db0..073bc7457 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -29,6 +29,7 @@
#include "perfetto/ext/tracing/core/trace_stats.h"
#include "perfetto/ext/tracing/core/tracing_service.h"
#include "perfetto/tracing/core/trace_config.h"
+#include "perfetto/tracing/core/tracing_service_capabilities.h"
#include "perfetto/tracing/core/tracing_service_state.h"
namespace perfetto {
@@ -71,7 +72,7 @@ void ConsumerIPCService::EnableTracing(
}
const TraceConfig& trace_config = req.trace_config();
base::ScopedFile fd;
- if (trace_config.write_into_file())
+ if (trace_config.write_into_file() && trace_config.output_path().empty())
fd = ipc::Service::TakeReceivedFD();
remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd));
remote_consumer->enable_tracing_response = std::move(resp);
@@ -170,22 +171,15 @@ void ConsumerIPCService::ObserveEvents(
remote_consumer->observe_events_response = std::move(resp);
- bool observe_instances = false;
+ uint32_t events_mask = 0;
for (const auto& type : req.events_to_observe()) {
- switch (static_cast<int>(type)) {
- case protos::gen::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES:
- observe_instances = true;
- break;
- default:
- PERFETTO_DFATAL("Unknown ObservableEvent type: %d", type);
- break;
- }
+ events_mask |= static_cast<uint32_t>(type);
}
- remote_consumer->service_endpoint->ObserveEvents(observe_instances);
+ remote_consumer->service_endpoint->ObserveEvents(events_mask);
// If no events are to be observed, close the stream immediately so that the
// client can clean up.
- if (req.events_to_observe().size() == 0)
+ if (events_mask == 0)
remote_consumer->CloseObserveEventsResponseStream();
}
@@ -205,19 +199,6 @@ void ConsumerIPCService::QueryServiceState(
remote_consumer->service_endpoint->QueryServiceState(callback);
}
-// Called by the service in response to a service_endpoint->Flush() request.
-void ConsumerIPCService::OnFlushCallback(
- bool success,
- PendingFlushResponses::iterator pending_response_it) {
- DeferredFlushResponse response(std::move(*pending_response_it));
- pending_flush_responses_.erase(pending_response_it);
- if (success) {
- response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create());
- } else {
- response.Reject();
- }
-}
-
// Called by the service in response to service_endpoint->QueryServiceState().
void ConsumerIPCService::OnQueryServiceCallback(
bool success,
@@ -225,16 +206,108 @@ void ConsumerIPCService::OnQueryServiceCallback(
PendingQuerySvcResponses::iterator pending_response_it) {
DeferredQueryServiceStateResponse response(std::move(*pending_response_it));
pending_query_service_responses_.erase(pending_response_it);
- if (success) {
+ if (!success) {
+ response.Reject();
+ return;
+ }
+
+ // The TracingServiceState object might be too big to fit into a single IPC
+ // message because it contains the DataSourceDescriptor of each data source.
+ // Here we split it in chunks to fit in the IPC limit, observing the
+ // following rule: each chunk must be invididually a valid TracingServiceState
+ // message; all the chunks concatenated together must form the original
+ // message. This is to deal with the legacy API that was just sending one
+ // whole message (failing in presence of too many data sources, b/153142114).
+ // The message is split as follows: we take the whole TracingServiceState,
+ // take out the data sources section (which is a top-level repeated field)
+ // and re-add them one-by-one. If, in the process of appending, the IPC msg
+ // size is reached, a new chunk is created. This assumes that the rest of
+ // TracingServiceState fits in one IPC message and each DataSourceDescriptor
+ // fits in the worst case in a dedicated message (which is true, because
+ // otherwise the RegisterDataSource() which passes the descriptor in the first
+ // place would fail).
+
+ std::vector<uint8_t> chunked_reply;
+
+ // Transmits the current chunk and starts a new one.
+ bool sent_eof = false;
+ auto send_chunked_reply = [&chunked_reply, &response,
+ &sent_eof](bool has_more) {
+ PERFETTO_CHECK(!sent_eof);
+ sent_eof = !has_more;
auto resp =
ipc::AsyncResult<protos::gen::QueryServiceStateResponse>::Create();
- *resp->mutable_service_state() = svc_state;
+ resp.set_has_more(has_more);
+ PERFETTO_CHECK(resp->mutable_service_state()->ParseFromArray(
+ chunked_reply.data(), chunked_reply.size()));
+ chunked_reply.clear();
response.Resolve(std::move(resp));
+ };
+
+ // Create a copy of the whole response and cut away the data_sources section.
+ protos::gen::TracingServiceState svc_state_copy = svc_state;
+ auto data_sources = std::move(*svc_state_copy.mutable_data_sources());
+ chunked_reply = svc_state_copy.SerializeAsArray();
+
+ // Now re-add them fitting within the IPC message limits (- some margin for
+ // the outer IPC frame).
+ constexpr size_t kMaxMsgSize = ipc::kIPCBufferSize - 128;
+ for (const auto& data_source : data_sources) {
+ protos::gen::TracingServiceState tmp;
+ tmp.mutable_data_sources()->emplace_back(std::move(data_source));
+ std::vector<uint8_t> chunk = tmp.SerializeAsArray();
+ if (chunked_reply.size() + chunk.size() < kMaxMsgSize) {
+ chunked_reply.insert(chunked_reply.end(), chunk.begin(), chunk.end());
+ } else {
+ send_chunked_reply(/*has_more=*/true);
+ chunked_reply = std::move(chunk);
+ }
+ }
+
+ PERFETTO_DCHECK(!chunked_reply.empty());
+ send_chunked_reply(/*has_more=*/false);
+ PERFETTO_CHECK(sent_eof);
+}
+
+// Called by the service in response to a service_endpoint->Flush() request.
+void ConsumerIPCService::OnFlushCallback(
+ bool success,
+ PendingFlushResponses::iterator pending_response_it) {
+ DeferredFlushResponse response(std::move(*pending_response_it));
+ pending_flush_responses_.erase(pending_response_it);
+ if (success) {
+ response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create());
} else {
response.Reject();
}
}
+void ConsumerIPCService::QueryCapabilities(
+ const protos::gen::QueryCapabilitiesRequest&,
+ DeferredQueryCapabilitiesResponse resp) {
+ RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
+ auto it = pending_query_capabilities_responses_.insert(
+ pending_query_capabilities_responses_.end(), std::move(resp));
+ auto weak_this = weak_ptr_factory_.GetWeakPtr();
+ auto callback = [weak_this, it](const TracingServiceCapabilities& caps) {
+ if (weak_this)
+ weak_this->OnQueryCapabilitiesCallback(caps, std::move(it));
+ };
+ remote_consumer->service_endpoint->QueryCapabilities(callback);
+}
+
+// Called by the service in response to service_endpoint->QueryCapabilities().
+void ConsumerIPCService::OnQueryCapabilitiesCallback(
+ const TracingServiceCapabilities& caps,
+ PendingQueryCapabilitiesResponses::iterator pending_response_it) {
+ DeferredQueryCapabilitiesResponse response(std::move(*pending_response_it));
+ pending_query_capabilities_responses_.erase(pending_response_it);
+ auto resp =
+ ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse>::Create();
+ *resp->mutable_capabilities() = caps;
+ response.Resolve(std::move(resp));
+}
+
////////////////////////////////////////////////////////////////////////////////
// RemoteConsumer methods
////////////////////////////////////////////////////////////////////////////////