diff options
Diffstat (limited to 'src/tracing/ipc/service/consumer_ipc_service.cc')
-rw-r--r-- | src/tracing/ipc/service/consumer_ipc_service.cc | 127 |
1 files changed, 100 insertions, 27 deletions
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc index 0f8d64db0..073bc7457 100644 --- a/src/tracing/ipc/service/consumer_ipc_service.cc +++ b/src/tracing/ipc/service/consumer_ipc_service.cc @@ -29,6 +29,7 @@ #include "perfetto/ext/tracing/core/trace_stats.h" #include "perfetto/ext/tracing/core/tracing_service.h" #include "perfetto/tracing/core/trace_config.h" +#include "perfetto/tracing/core/tracing_service_capabilities.h" #include "perfetto/tracing/core/tracing_service_state.h" namespace perfetto { @@ -71,7 +72,7 @@ void ConsumerIPCService::EnableTracing( } const TraceConfig& trace_config = req.trace_config(); base::ScopedFile fd; - if (trace_config.write_into_file()) + if (trace_config.write_into_file() && trace_config.output_path().empty()) fd = ipc::Service::TakeReceivedFD(); remote_consumer->service_endpoint->EnableTracing(trace_config, std::move(fd)); remote_consumer->enable_tracing_response = std::move(resp); @@ -170,22 +171,15 @@ void ConsumerIPCService::ObserveEvents( remote_consumer->observe_events_response = std::move(resp); - bool observe_instances = false; + uint32_t events_mask = 0; for (const auto& type : req.events_to_observe()) { - switch (static_cast<int>(type)) { - case protos::gen::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES: - observe_instances = true; - break; - default: - PERFETTO_DFATAL("Unknown ObservableEvent type: %d", type); - break; - } + events_mask |= static_cast<uint32_t>(type); } - remote_consumer->service_endpoint->ObserveEvents(observe_instances); + remote_consumer->service_endpoint->ObserveEvents(events_mask); // If no events are to be observed, close the stream immediately so that the // client can clean up. - if (req.events_to_observe().size() == 0) + if (events_mask == 0) remote_consumer->CloseObserveEventsResponseStream(); } @@ -205,19 +199,6 @@ void ConsumerIPCService::QueryServiceState( remote_consumer->service_endpoint->QueryServiceState(callback); } -// Called by the service in response to a service_endpoint->Flush() request. -void ConsumerIPCService::OnFlushCallback( - bool success, - PendingFlushResponses::iterator pending_response_it) { - DeferredFlushResponse response(std::move(*pending_response_it)); - pending_flush_responses_.erase(pending_response_it); - if (success) { - response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create()); - } else { - response.Reject(); - } -} - // Called by the service in response to service_endpoint->QueryServiceState(). void ConsumerIPCService::OnQueryServiceCallback( bool success, @@ -225,16 +206,108 @@ void ConsumerIPCService::OnQueryServiceCallback( PendingQuerySvcResponses::iterator pending_response_it) { DeferredQueryServiceStateResponse response(std::move(*pending_response_it)); pending_query_service_responses_.erase(pending_response_it); - if (success) { + if (!success) { + response.Reject(); + return; + } + + // The TracingServiceState object might be too big to fit into a single IPC + // message because it contains the DataSourceDescriptor of each data source. + // Here we split it in chunks to fit in the IPC limit, observing the + // following rule: each chunk must be invididually a valid TracingServiceState + // message; all the chunks concatenated together must form the original + // message. This is to deal with the legacy API that was just sending one + // whole message (failing in presence of too many data sources, b/153142114). + // The message is split as follows: we take the whole TracingServiceState, + // take out the data sources section (which is a top-level repeated field) + // and re-add them one-by-one. If, in the process of appending, the IPC msg + // size is reached, a new chunk is created. This assumes that the rest of + // TracingServiceState fits in one IPC message and each DataSourceDescriptor + // fits in the worst case in a dedicated message (which is true, because + // otherwise the RegisterDataSource() which passes the descriptor in the first + // place would fail). + + std::vector<uint8_t> chunked_reply; + + // Transmits the current chunk and starts a new one. + bool sent_eof = false; + auto send_chunked_reply = [&chunked_reply, &response, + &sent_eof](bool has_more) { + PERFETTO_CHECK(!sent_eof); + sent_eof = !has_more; auto resp = ipc::AsyncResult<protos::gen::QueryServiceStateResponse>::Create(); - *resp->mutable_service_state() = svc_state; + resp.set_has_more(has_more); + PERFETTO_CHECK(resp->mutable_service_state()->ParseFromArray( + chunked_reply.data(), chunked_reply.size())); + chunked_reply.clear(); response.Resolve(std::move(resp)); + }; + + // Create a copy of the whole response and cut away the data_sources section. + protos::gen::TracingServiceState svc_state_copy = svc_state; + auto data_sources = std::move(*svc_state_copy.mutable_data_sources()); + chunked_reply = svc_state_copy.SerializeAsArray(); + + // Now re-add them fitting within the IPC message limits (- some margin for + // the outer IPC frame). + constexpr size_t kMaxMsgSize = ipc::kIPCBufferSize - 128; + for (const auto& data_source : data_sources) { + protos::gen::TracingServiceState tmp; + tmp.mutable_data_sources()->emplace_back(std::move(data_source)); + std::vector<uint8_t> chunk = tmp.SerializeAsArray(); + if (chunked_reply.size() + chunk.size() < kMaxMsgSize) { + chunked_reply.insert(chunked_reply.end(), chunk.begin(), chunk.end()); + } else { + send_chunked_reply(/*has_more=*/true); + chunked_reply = std::move(chunk); + } + } + + PERFETTO_DCHECK(!chunked_reply.empty()); + send_chunked_reply(/*has_more=*/false); + PERFETTO_CHECK(sent_eof); +} + +// Called by the service in response to a service_endpoint->Flush() request. +void ConsumerIPCService::OnFlushCallback( + bool success, + PendingFlushResponses::iterator pending_response_it) { + DeferredFlushResponse response(std::move(*pending_response_it)); + pending_flush_responses_.erase(pending_response_it); + if (success) { + response.Resolve(ipc::AsyncResult<protos::gen::FlushResponse>::Create()); } else { response.Reject(); } } +void ConsumerIPCService::QueryCapabilities( + const protos::gen::QueryCapabilitiesRequest&, + DeferredQueryCapabilitiesResponse resp) { + RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest(); + auto it = pending_query_capabilities_responses_.insert( + pending_query_capabilities_responses_.end(), std::move(resp)); + auto weak_this = weak_ptr_factory_.GetWeakPtr(); + auto callback = [weak_this, it](const TracingServiceCapabilities& caps) { + if (weak_this) + weak_this->OnQueryCapabilitiesCallback(caps, std::move(it)); + }; + remote_consumer->service_endpoint->QueryCapabilities(callback); +} + +// Called by the service in response to service_endpoint->QueryCapabilities(). +void ConsumerIPCService::OnQueryCapabilitiesCallback( + const TracingServiceCapabilities& caps, + PendingQueryCapabilitiesResponses::iterator pending_response_it) { + DeferredQueryCapabilitiesResponse response(std::move(*pending_response_it)); + pending_query_capabilities_responses_.erase(pending_response_it); + auto resp = + ipc::AsyncResult<protos::gen::QueryCapabilitiesResponse>::Create(); + *resp->mutable_capabilities() = caps; + response.Resolve(std::move(resp)); +} + //////////////////////////////////////////////////////////////////////////////// // RemoteConsumer methods //////////////////////////////////////////////////////////////////////////////// |