aboutsummaryrefslogtreecommitdiff
path: root/src/tracing/internal/tracing_muxer_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/tracing/internal/tracing_muxer_impl.cc')
-rw-r--r--src/tracing/internal/tracing_muxer_impl.cc958
1 files changed, 0 insertions, 958 deletions
diff --git a/src/tracing/internal/tracing_muxer_impl.cc b/src/tracing/internal/tracing_muxer_impl.cc
deleted file mode 100644
index 1ce6f9387..000000000
--- a/src/tracing/internal/tracing_muxer_impl.cc
+++ /dev/null
@@ -1,958 +0,0 @@
-/*
- * Copyright (C) 2019 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "src/tracing/internal/tracing_muxer_impl.h"
-
-#include <algorithm>
-#include <atomic>
-#include <mutex>
-#include <vector>
-
-#include "perfetto/base/build_config.h"
-#include "perfetto/base/logging.h"
-#include "perfetto/base/task_runner.h"
-#include "perfetto/ext/base/hash.h"
-#include "perfetto/ext/base/thread_checker.h"
-#include "perfetto/ext/base/waitable_event.h"
-#include "perfetto/ext/tracing/core/trace_packet.h"
-#include "perfetto/ext/tracing/core/trace_writer.h"
-#include "perfetto/ext/tracing/core/tracing_service.h"
-#include "perfetto/tracing/buffer_exhausted_policy.h"
-#include "perfetto/tracing/core/data_source_config.h"
-#include "perfetto/tracing/data_source.h"
-#include "perfetto/tracing/internal/data_source_internal.h"
-#include "perfetto/tracing/trace_writer_base.h"
-#include "perfetto/tracing/tracing.h"
-#include "perfetto/tracing/tracing_backend.h"
-#include "protos/perfetto/config/trace_config.pb.h"
-#include "src/tracing/internal/in_process_tracing_backend.h"
-#include "src/tracing/internal/system_tracing_backend.h"
-
-namespace perfetto {
-namespace internal {
-
-namespace {
-
-class StopArgsImpl : public DataSourceBase::StopArgs {
- public:
- std::function<void()> HandleStopAsynchronously() const override {
- auto closure = std::move(async_stop_closure);
- async_stop_closure = std::function<void()>();
- return closure;
- }
-
- mutable std::function<void()> async_stop_closure;
-};
-
-uint64_t ComputeConfigHash(const DataSourceConfig& config) {
- base::Hash hasher;
- perfetto::protos::DataSourceConfig config_proto;
- config.ToProto(&config_proto);
- std::string config_bytes;
- bool serialized = config_proto.SerializeToString(&config_bytes);
- PERFETTO_DCHECK(serialized);
- hasher.Update(&config_bytes[0], config_bytes.size());
- return hasher.digest();
-}
-
-} // namespace
-
-// ----- Begin of TracingMuxerImpl::ProducerImpl
-TracingMuxerImpl::ProducerImpl::ProducerImpl(TracingMuxerImpl* muxer,
- TracingBackendId backend_id)
- : muxer_(muxer), backend_id_(backend_id) {}
-TracingMuxerImpl::ProducerImpl::~ProducerImpl() = default;
-
-void TracingMuxerImpl::ProducerImpl::Initialize(
- std::unique_ptr<ProducerEndpoint> endpoint) {
- service_ = std::move(endpoint);
-}
-
-void TracingMuxerImpl::ProducerImpl::OnConnect() {
- PERFETTO_DLOG("Producer connected");
- PERFETTO_DCHECK_THREAD(thread_checker_);
- PERFETTO_DCHECK(!connected_);
- connected_ = true;
- muxer_->UpdateDataSourcesOnAllBackends();
-}
-
-void TracingMuxerImpl::ProducerImpl::OnDisconnect() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- connected_ = false;
- // TODO: handle more graceful.
- PERFETTO_ELOG("Cannot connect to traced. Is it running?");
-}
-
-void TracingMuxerImpl::ProducerImpl::OnTracingSetup() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
-}
-
-void TracingMuxerImpl::ProducerImpl::SetupDataSource(
- DataSourceInstanceID id,
- const DataSourceConfig& cfg) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- muxer_->SetupDataSource(backend_id_, id, cfg);
-}
-
-void TracingMuxerImpl::ProducerImpl::StartDataSource(DataSourceInstanceID id,
- const DataSourceConfig&) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- muxer_->StartDataSource(backend_id_, id);
- service_->NotifyDataSourceStarted(id);
-}
-
-void TracingMuxerImpl::ProducerImpl::StopDataSource(DataSourceInstanceID id) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- muxer_->StopDataSource_AsyncBegin(backend_id_, id);
-}
-
-void TracingMuxerImpl::ProducerImpl::Flush(FlushRequestID flush_id,
- const DataSourceInstanceID*,
- size_t) {
- // Flush is not plumbed for now, we just ack straight away.
- PERFETTO_DCHECK_THREAD(thread_checker_);
- service_->NotifyFlushComplete(flush_id);
-}
-
-void TracingMuxerImpl::ProducerImpl::ClearIncrementalState(
- const DataSourceInstanceID*,
- size_t) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- // TODO(skyostil): Mark each affected data source's incremental state as
- // needing to be cleared.
-}
-// ----- End of TracingMuxerImpl::ProducerImpl methods.
-
-// ----- Begin of TracingMuxerImpl::ConsumerImpl
-TracingMuxerImpl::ConsumerImpl::ConsumerImpl(TracingMuxerImpl* muxer,
- TracingBackendId backend_id,
- TracingSessionGlobalID session_id)
- : muxer_(muxer), backend_id_(backend_id), session_id_(session_id) {}
-
-TracingMuxerImpl::ConsumerImpl::~ConsumerImpl() = default;
-
-void TracingMuxerImpl::ConsumerImpl::Initialize(
- std::unique_ptr<ConsumerEndpoint> endpoint) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- service_ = std::move(endpoint);
- // Observe data source instance events so we get notified when tracing starts.
- service_->ObserveEvents(ConsumerEndpoint::kDataSourceInstances);
-}
-
-void TracingMuxerImpl::ConsumerImpl::OnConnect() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- PERFETTO_DCHECK(!connected_);
- connected_ = true;
-
- // If the API client configured and started tracing before we connected,
- // tell the backend about it now.
- if (trace_config_) {
- muxer_->SetupTracingSession(session_id_, trace_config_);
- if (start_pending_)
- muxer_->StartTracingSession(session_id_);
- if (stop_pending_)
- muxer_->StopTracingSession(session_id_);
- }
-}
-
-void TracingMuxerImpl::ConsumerImpl::OnDisconnect() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- // It shouldn't be necessary to call StopTracingSession. If we get this call
- // it means that the service did shutdown before us, so there is no point
- // trying it to ask it to stop the session. We should just remember to cleanup
- // the consumer vector.
- connected_ = false;
-
- // TODO notify the client somehow.
-
- // Notify the muxer that it is safe to destroy |this|. This is needed because
- // the ConsumerEndpoint stored in |service_| requires that |this| be safe to
- // access until OnDisconnect() is called.
- muxer_->OnConsumerDisconnected(this);
-}
-
-void TracingMuxerImpl::ConsumerImpl::Disconnect() {
- // This is weird and deserves a comment.
- //
- // When we called the ConnectConsumer method on the service it returns
- // us a ConsumerEndpoint which we stored in |service_|, however this
- // ConsumerEndpoint holds a pointer to the ConsumerImpl pointed to by
- // |this|. Part of the API contract to TracingService::ConnectConsumer is that
- // the ConsumerImpl pointer has to be valid until the
- // ConsumerImpl::OnDisconnect method is called. Therefore we reset the
- // ConsumerEndpoint |service_|. Eventually this will call
- // ConsumerImpl::OnDisconnect and we will inform the muxer it is safe to
- // call the destructor of |this|.
- service_.reset();
-}
-
-void TracingMuxerImpl::ConsumerImpl::OnTracingDisabled() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- PERFETTO_DCHECK(!stopped_);
- stopped_ = true;
- // If we're still waiting for the start event, fire it now. This may happen if
- // there are no active data sources in the session.
- NotifyStartComplete();
- NotifyStopComplete();
-}
-
-void TracingMuxerImpl::ConsumerImpl::NotifyStartComplete() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- if (blocking_start_complete_callback_) {
- muxer_->task_runner_->PostTask(
- std::move(blocking_start_complete_callback_));
- blocking_start_complete_callback_ = nullptr;
- }
-}
-
-void TracingMuxerImpl::ConsumerImpl::NotifyStopComplete() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- if (stop_complete_callback_) {
- muxer_->task_runner_->PostTask(std::move(stop_complete_callback_));
- stop_complete_callback_ = nullptr;
- }
- if (blocking_stop_complete_callback_) {
- muxer_->task_runner_->PostTask(std::move(blocking_stop_complete_callback_));
- blocking_stop_complete_callback_ = nullptr;
- }
-}
-
-void TracingMuxerImpl::ConsumerImpl::OnTraceData(
- std::vector<TracePacket> packets,
- bool has_more) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- if (!read_trace_callback_)
- return;
-
- size_t capacity = 0;
- for (const auto& packet : packets) {
- // 16 is an over-estimation of the proto preamble size
- capacity += packet.size() + 16;
- }
-
- // The shared_ptr is to avoid making a copy of the buffer when PostTask-ing.
- std::shared_ptr<std::vector<char>> buf(new std::vector<char>());
- buf->reserve(capacity);
- for (auto& packet : packets) {
- char* start;
- size_t size;
- std::tie(start, size) = packet.GetProtoPreamble();
- buf->insert(buf->end(), start, start + size);
- for (auto& slice : packet.slices()) {
- const auto* slice_data = reinterpret_cast<const char*>(slice.start);
- buf->insert(buf->end(), slice_data, slice_data + slice.size);
- }
- }
-
- auto callback = read_trace_callback_;
- muxer_->task_runner_->PostTask([callback, buf, has_more] {
- TracingSession::ReadTraceCallbackArgs callback_arg{};
- callback_arg.data = &(*buf)[0];
- callback_arg.size = buf->size();
- callback_arg.has_more = has_more;
- callback(callback_arg);
- });
-
- if (!has_more)
- read_trace_callback_ = nullptr;
-}
-
-void TracingMuxerImpl::ConsumerImpl::OnObservableEvents(
- const ObservableEvents& events) {
- if (events.instance_state_changes_size()) {
- for (const auto& state_change : events.instance_state_changes()) {
- DataSourceHandle handle{state_change.producer_name(),
- state_change.data_source_name()};
- data_source_states_[handle] =
- state_change.state() ==
- ObservableEvents::DATA_SOURCE_INSTANCE_STATE_STARTED;
- }
- // Data sources are first reported as being stopped before starting, so once
- // all the data sources we know about have started we can declare tracing
- // begun.
- if (blocking_start_complete_callback_) {
- bool all_data_sources_started = std::all_of(
- data_source_states_.cbegin(), data_source_states_.cend(),
- [](std::pair<DataSourceHandle, bool> state) { return state.second; });
- if (all_data_sources_started)
- NotifyStartComplete();
- }
- }
-}
-
-// The callbacks below are not used.
-void TracingMuxerImpl::ConsumerImpl::OnDetach(bool) {}
-void TracingMuxerImpl::ConsumerImpl::OnAttach(bool, const TraceConfig&) {}
-void TracingMuxerImpl::ConsumerImpl::OnTraceStats(bool, const TraceStats&) {}
-// ----- End of TracingMuxerImpl::ConsumerImpl
-
-// ----- Begin of TracingMuxerImpl::TracingSessionImpl
-
-// TracingSessionImpl is the RAII object returned to API clients when they
-// invoke Tracing::CreateTracingSession. They use it for starting/stopping
-// tracing.
-
-TracingMuxerImpl::TracingSessionImpl::TracingSessionImpl(
- TracingMuxerImpl* muxer,
- TracingSessionGlobalID session_id)
- : muxer_(muxer), session_id_(session_id) {}
-
-// Can be destroyed from any thread.
-TracingMuxerImpl::TracingSessionImpl::~TracingSessionImpl() {
- auto* muxer = muxer_;
- auto session_id = session_id_;
- muxer->task_runner_->PostTask(
- [muxer, session_id] { muxer->DestroyTracingSession(session_id); });
-}
-
-// Can be called from any thread.
-void TracingMuxerImpl::TracingSessionImpl::Setup(const TraceConfig& cfg,
- int fd) {
- auto* muxer = muxer_;
- auto session_id = session_id_;
- std::shared_ptr<TraceConfig> trace_config(new TraceConfig(cfg));
- if (fd >= 0) {
- trace_config->set_write_into_file(true);
- fd = dup(fd);
- }
- muxer->task_runner_->PostTask([muxer, session_id, trace_config, fd] {
- muxer->SetupTracingSession(session_id, trace_config, base::ScopedFile(fd));
- });
-}
-
-// Can be called from any thread.
-void TracingMuxerImpl::TracingSessionImpl::Start() {
- auto* muxer = muxer_;
- auto session_id = session_id_;
- muxer->task_runner_->PostTask(
- [muxer, session_id] { muxer->StartTracingSession(session_id); });
-}
-
-// Can be called from any thread except the service thread.
-void TracingMuxerImpl::TracingSessionImpl::StartBlocking() {
- PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
- auto* muxer = muxer_;
- auto session_id = session_id_;
- base::WaitableEvent tracing_started;
- muxer->task_runner_->PostTask([muxer, session_id, &tracing_started] {
- auto* consumer = muxer->FindConsumer(session_id);
- PERFETTO_DCHECK(!consumer->blocking_start_complete_callback_);
- consumer->blocking_start_complete_callback_ = [&] {
- tracing_started.Notify();
- };
- muxer->StartTracingSession(session_id);
- });
- tracing_started.Wait();
-}
-
-// Can be called from any thread.
-void TracingMuxerImpl::TracingSessionImpl::Stop() {
- auto* muxer = muxer_;
- auto session_id = session_id_;
- muxer->task_runner_->PostTask(
- [muxer, session_id] { muxer->StopTracingSession(session_id); });
-}
-
-// Can be called from any thread except the service thread.
-void TracingMuxerImpl::TracingSessionImpl::StopBlocking() {
- PERFETTO_DCHECK(!muxer_->task_runner_->RunsTasksOnCurrentThread());
- auto* muxer = muxer_;
- auto session_id = session_id_;
- base::WaitableEvent tracing_stopped;
- muxer->task_runner_->PostTask([muxer, session_id, &tracing_stopped] {
- auto* consumer = muxer->FindConsumer(session_id);
- PERFETTO_DCHECK(!consumer->blocking_stop_complete_callback_);
- consumer->blocking_stop_complete_callback_ = [&] {
- tracing_stopped.Notify();
- };
- muxer->StopTracingSession(session_id);
- });
- tracing_stopped.Wait();
-}
-
-// Can be called from any thread.
-void TracingMuxerImpl::TracingSessionImpl::ReadTrace(ReadTraceCallback cb) {
- auto* muxer = muxer_;
- auto session_id = session_id_;
- muxer->task_runner_->PostTask([muxer, session_id, cb] {
- muxer->ReadTracingSessionData(session_id, std::move(cb));
- });
-}
-
-// Can be called from any thread.
-void TracingMuxerImpl::TracingSessionImpl::SetOnStopCallback(
- std::function<void()> cb) {
- auto* muxer = muxer_;
- auto session_id = session_id_;
- muxer->task_runner_->PostTask([muxer, session_id, cb] {
- auto* consumer = muxer->FindConsumer(session_id);
- consumer->stop_complete_callback_ = cb;
- });
-}
-// ----- End of TracingMuxerImpl::TracingSessionImpl
-
-// static
-TracingMuxer* TracingMuxer::instance_ = nullptr;
-
-// This is called by perfetto::Tracing::Initialize().
-// Can be called on any thread. Typically, but not necessarily, that will be
-// the embedder's main thread.
-TracingMuxerImpl::TracingMuxerImpl(const TracingInitArgs& args)
- : TracingMuxer(args.platform ? args.platform
- : Platform::GetDefaultPlatform()) {
- PERFETTO_DETACH_FROM_THREAD(thread_checker_);
-
- // Create the thread where muxer, producers and service will live.
- task_runner_ = platform_->CreateTaskRunner({});
-
- // Run the initializer on that thread.
- task_runner_->PostTask([this, args] { Initialize(args); });
-}
-
-void TracingMuxerImpl::Initialize(const TracingInitArgs& args) {
- PERFETTO_DCHECK_THREAD(thread_checker_); // Rebind the thread checker.
-
- auto add_backend = [this, &args](TracingBackend* backend, BackendType type) {
- TracingBackendId backend_id = backends_.size();
- backends_.emplace_back();
- RegisteredBackend& rb = backends_.back();
- rb.backend = backend;
- rb.id = backend_id;
- rb.type = type;
- rb.producer.reset(new ProducerImpl(this, backend_id));
- TracingBackend::ConnectProducerArgs conn_args;
- conn_args.producer = rb.producer.get();
- conn_args.producer_name = platform_->GetCurrentProcessName();
- conn_args.task_runner = task_runner_.get();
- conn_args.shmem_size_hint_bytes = args.shmem_size_hint_kb * 1024;
- conn_args.shmem_page_size_hint_bytes = args.shmem_page_size_hint_kb * 1024;
- rb.producer->Initialize(rb.backend->ConnectProducer(conn_args));
- };
-
- if (args.backends & kSystemBackend) {
-#if (PERFETTO_BUILDFLAG(PERFETTO_IPC))
- add_backend(SystemTracingBackend::GetInstance(), kSystemBackend);
-#else
- PERFETTO_ELOG("System backend not supporteed in the current configuration");
-#endif
- }
-
- if (args.backends & kInProcessBackend)
- add_backend(InProcessTracingBackend::GetInstance(), kInProcessBackend);
-
- if (args.backends & kCustomBackend) {
- PERFETTO_CHECK(args.custom_backend);
- add_backend(args.custom_backend, kCustomBackend);
- }
-
- if (args.backends & ~(kSystemBackend | kInProcessBackend | kCustomBackend)) {
- PERFETTO_FATAL("Unsupported tracing backend type");
- }
-}
-
-// Can be called from any thread (but not concurrently).
-bool TracingMuxerImpl::RegisterDataSource(
- const DataSourceDescriptor& descriptor,
- DataSourceFactory factory,
- DataSourceStaticState* static_state) {
- // Ignore repeated registrations.
- if (static_state->index != kMaxDataSources)
- return true;
-
- static std::atomic<uint32_t> last_id{};
- uint32_t new_index = last_id++;
- if (new_index >= kMaxDataSources) {
- PERFETTO_DLOG(
- "RegisterDataSource failed: too many data sources already registered");
- return false;
- }
-
- // Initialize the static state.
- static_assert(sizeof(static_state->instances[0]) >= sizeof(DataSourceState),
- "instances[] size mismatch");
- for (size_t i = 0; i < static_state->instances.size(); i++)
- new (&static_state->instances[i]) DataSourceState{};
-
- static_state->index = new_index;
-
- task_runner_->PostTask([this, descriptor, factory, static_state] {
- data_sources_.emplace_back();
- RegisteredDataSource& rds = data_sources_.back();
- rds.descriptor = descriptor;
- rds.factory = factory;
- rds.static_state = static_state;
- UpdateDataSourcesOnAllBackends();
- });
- return true;
-}
-
-// Called by the service of one of the backends.
-void TracingMuxerImpl::SetupDataSource(TracingBackendId backend_id,
- DataSourceInstanceID instance_id,
- const DataSourceConfig& cfg) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- PERFETTO_DLOG("Setting up data source %" PRIu64 " %s", instance_id,
- cfg.name().c_str());
- uint64_t config_hash = ComputeConfigHash(cfg);
-
- for (const auto& rds : data_sources_) {
- if (rds.descriptor.name() != cfg.name())
- continue;
- DataSourceStaticState& static_state = *rds.static_state;
-
- // If this data source is already active for this exact config, don't start
- // another instance. This happens when we have several data sources with the
- // same name, in which case the service sends one SetupDataSource event for
- // each one. Since we can't map which event maps to which data source, we
- // ensure each event only starts one data source instance.
- // TODO(skyostil): Register a unique id with each data source to the service
- // to disambiguate.
- bool active_for_config = false;
- for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
- if (!static_state.TryGet(i))
- continue;
- auto* internal_state =
- reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
- if (internal_state->backend_id == backend_id &&
- internal_state->config_hash == config_hash) {
- active_for_config = true;
- break;
- }
- }
- if (active_for_config) {
- PERFETTO_DLOG(
- "Data source %s is already active with this config, skipping",
- cfg.name().c_str());
- continue;
- }
-
- for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
- // Find a free slot.
- if (static_state.TryGet(i))
- continue;
-
- auto* internal_state =
- reinterpret_cast<DataSourceState*>(&static_state.instances[i]);
- std::lock_guard<std::recursive_mutex> guard(internal_state->lock);
- static_assert(
- std::is_same<decltype(internal_state->data_source_instance_id),
- DataSourceInstanceID>::value,
- "data_source_instance_id type mismatch");
- internal_state->backend_id = backend_id;
- internal_state->data_source_instance_id = instance_id;
- internal_state->buffer_id =
- static_cast<internal::BufferId>(cfg.target_buffer());
- internal_state->config_hash = config_hash;
- internal_state->data_source = rds.factory();
-
- // This must be made at the end. See matching acquire-load in
- // DataSource::Trace().
- static_state.valid_instances.fetch_or(1 << i, std::memory_order_release);
-
- DataSourceBase::SetupArgs setup_args;
- setup_args.config = &cfg;
- setup_args.internal_instance_index = i;
- internal_state->data_source->OnSetup(setup_args);
- return;
- }
- PERFETTO_ELOG(
- "Maximum number of data source instances exhausted. "
- "Dropping data source %" PRIu64,
- instance_id);
- break;
- }
-}
-
-// Called by the service of one of the backends.
-void TracingMuxerImpl::StartDataSource(TracingBackendId backend_id,
- DataSourceInstanceID instance_id) {
- PERFETTO_DLOG("Starting data source %" PRIu64, instance_id);
- PERFETTO_DCHECK_THREAD(thread_checker_);
-
- auto ds = FindDataSource(backend_id, instance_id);
- if (!ds) {
- PERFETTO_ELOG("Could not find data source to start");
- return;
- }
-
- DataSourceBase::StartArgs start_args{};
- start_args.internal_instance_index = ds.instance_idx;
-
- std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
- ds.internal_state->trace_lambda_enabled = true;
- ds.internal_state->data_source->OnStart(start_args);
-}
-
-// Called by the service of one of the backends.
-void TracingMuxerImpl::StopDataSource_AsyncBegin(
- TracingBackendId backend_id,
- DataSourceInstanceID instance_id) {
- PERFETTO_DLOG("Stopping data source %" PRIu64, instance_id);
- PERFETTO_DCHECK_THREAD(thread_checker_);
-
- auto ds = FindDataSource(backend_id, instance_id);
- if (!ds) {
- PERFETTO_ELOG("Could not find data source to stop");
- return;
- }
-
- StopArgsImpl stop_args{};
- stop_args.internal_instance_index = ds.instance_idx;
- stop_args.async_stop_closure = [this, backend_id, instance_id] {
- // TracingMuxerImpl is long lived, capturing |this| is okay.
- // The notification closure can be moved out of the StopArgs by the
- // embedder to handle stop asynchronously. The embedder might then
- // call the closure on a different thread than the current one, hence
- // this nested PostTask().
- task_runner_->PostTask([this, backend_id, instance_id] {
- StopDataSource_AsyncEnd(backend_id, instance_id);
- });
- };
-
- {
- std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
- ds.internal_state->data_source->OnStop(stop_args);
- }
-
- // If the embedder hasn't called StopArgs.HandleStopAsynchronously() run the
- // async closure here. In theory we could avoid the PostTask and call
- // straight into CompleteDataSourceAsyncStop(). We keep that to reduce
- // divergencies between the deferred-stop vs non-deferred-stop code paths.
- if (stop_args.async_stop_closure)
- std::move(stop_args.async_stop_closure)();
-}
-
-void TracingMuxerImpl::StopDataSource_AsyncEnd(
- TracingBackendId backend_id,
- DataSourceInstanceID instance_id) {
- PERFETTO_DLOG("Ending async stop of data source %" PRIu64, instance_id);
- PERFETTO_DCHECK_THREAD(thread_checker_);
-
- auto ds = FindDataSource(backend_id, instance_id);
- if (!ds) {
- PERFETTO_ELOG(
- "Async stop of data source %" PRIu64
- " failed. This might be due to calling the async_stop_closure twice.",
- instance_id);
- return;
- }
-
- const uint32_t mask = ~(1 << ds.instance_idx);
- ds.static_state->valid_instances.fetch_and(mask, std::memory_order_acq_rel);
-
- // Take the mutex to prevent that the data source is in the middle of
- // a Trace() execution where it called GetDataSourceLocked() while we
- // destroy it.
- {
- std::lock_guard<std::recursive_mutex> guard(ds.internal_state->lock);
- ds.internal_state->trace_lambda_enabled = false;
- ds.internal_state->data_source.reset();
- }
-
- // The other fields of internal_state are deliberately *not* cleared.
- // See races-related comments of DataSource::Trace().
-
- TracingMuxer::generation_++;
-
- // |backends_| is append-only, Backend instances are always valid.
- PERFETTO_CHECK(backend_id < backends_.size());
- ProducerImpl* producer = backends_[backend_id].producer.get();
- if (producer && producer->connected_)
- producer->service_->NotifyDataSourceStopped(instance_id);
-}
-
-void TracingMuxerImpl::DestroyStoppedTraceWritersForCurrentThread() {
- // Iterate across all possible data source types.
- auto cur_generation = generation_.load(std::memory_order_acquire);
- auto* root_tls = GetOrCreateTracingTLS();
-
- auto destroy_stopped_instances = [](DataSourceThreadLocalState& tls) {
- // |tls| has a vector of per-data-source-instance thread-local state.
- DataSourceStaticState* static_state = tls.static_state;
- if (!static_state)
- return; // Slot not used.
-
- // Iterate across all possible instances for this data source.
- for (uint32_t inst = 0; inst < kMaxDataSourceInstances; inst++) {
- DataSourceInstanceThreadLocalState& ds_tls = tls.per_instance[inst];
- if (!ds_tls.trace_writer)
- continue;
-
- DataSourceState* ds_state = static_state->TryGet(inst);
- if (ds_state && ds_state->backend_id == ds_tls.backend_id &&
- ds_state->buffer_id == ds_tls.buffer_id) {
- continue;
- }
-
- // The DataSource instance has been destroyed or recycled.
- ds_tls.Reset(); // Will also destroy the |ds_tls.trace_writer|.
- }
- };
-
- for (size_t ds_idx = 0; ds_idx < kMaxDataSources; ds_idx++) {
- // |tls| has a vector of per-data-source-instance thread-local state.
- DataSourceThreadLocalState& tls = root_tls->data_sources_tls[ds_idx];
- destroy_stopped_instances(tls);
- }
- destroy_stopped_instances(root_tls->track_event_tls);
- root_tls->generation = cur_generation;
-}
-
-// Called both when a new data source is registered or when a new backend
-// connects. In both cases we want to be sure we reflected the data source
-// registrations on the backends.
-void TracingMuxerImpl::UpdateDataSourcesOnAllBackends() {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredDataSource& rds : data_sources_) {
- for (RegisteredBackend& backend : backends_) {
- // We cannot call RegisterDataSource on the backend before it connects.
- if (!backend.producer->connected_)
- continue;
-
- PERFETTO_DCHECK(rds.static_state->index < kMaxDataSourceInstances);
- if (backend.producer->registered_data_sources_.test(
- rds.static_state->index))
- continue;
-
- rds.descriptor.set_will_notify_on_start(true);
- rds.descriptor.set_will_notify_on_stop(true);
- backend.producer->service_->RegisterDataSource(rds.descriptor);
- backend.producer->registered_data_sources_.set(rds.static_state->index);
- }
- }
-}
-
-void TracingMuxerImpl::SetupTracingSession(
- TracingSessionGlobalID session_id,
- const std::shared_ptr<TraceConfig>& trace_config,
- base::ScopedFile trace_fd) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- PERFETTO_CHECK(!trace_fd || trace_config->write_into_file());
-
- auto* consumer = FindConsumer(session_id);
- if (!consumer)
- return;
-
- consumer->trace_config_ = trace_config;
- if (trace_fd)
- consumer->trace_fd_ = std::move(trace_fd);
-
- if (!consumer->connected_)
- return;
-
- // Only used in the deferred start mode.
- if (trace_config->deferred_start()) {
- consumer->service_->EnableTracing(*trace_config,
- std::move(consumer->trace_fd_));
- }
-}
-
-void TracingMuxerImpl::StartTracingSession(TracingSessionGlobalID session_id) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
-
- auto* consumer = FindConsumer(session_id);
-
- if (!consumer)
- return;
-
- if (!consumer->trace_config_) {
- PERFETTO_ELOG("Must call Setup(config) first");
- return;
- }
-
- if (!consumer->connected_) {
- consumer->start_pending_ = true;
- return;
- }
-
- consumer->start_pending_ = false;
- if (consumer->trace_config_->deferred_start()) {
- consumer->service_->StartTracing();
- } else {
- consumer->service_->EnableTracing(*consumer->trace_config_,
- std::move(consumer->trace_fd_));
- }
-
- // TODO implement support for the deferred-start + fast-triggering case.
-}
-
-void TracingMuxerImpl::StopTracingSession(TracingSessionGlobalID session_id) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- auto* consumer = FindConsumer(session_id);
- if (!consumer)
- return;
-
- if (consumer->start_pending_) {
- // If the session hasn't started yet, wait until it does before stopping.
- consumer->stop_pending_ = true;
- return;
- }
-
- consumer->stop_pending_ = false;
- if (consumer->stopped_) {
- // If the session was already stopped (e.g., it failed to start), don't try
- // stopping again.
- consumer->NotifyStopComplete();
- } else if (!consumer->trace_config_) {
- PERFETTO_ELOG("Must call Setup(config) and Start() first");
- return;
- } else {
- consumer->service_->DisableTracing();
- }
-
- consumer->trace_config_.reset();
-}
-
-void TracingMuxerImpl::DestroyTracingSession(
- TracingSessionGlobalID session_id) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
- // We need to find the consumer (if any) and call Disconnect as we destroy
- // the tracing session. We can't call Disconnect() inside this for loop
- // because in the in-process case this will end up to a synchronous call to
- // OnConsumerDisconnect which will invalidate all the iterators to
- // |backend.consumers|.
- ConsumerImpl* consumer = nullptr;
- for (auto& con : backend.consumers) {
- if (con->session_id_ == session_id) {
- consumer = con.get();
- break;
- }
- }
- if (consumer) {
- // We broke out of the loop above on the assumption that each backend will
- // only have a single consumer per session. This DCHECK ensures that
- // this is the case.
- PERFETTO_DCHECK(
- std::count_if(backend.consumers.begin(), backend.consumers.end(),
- [session_id](const std::unique_ptr<ConsumerImpl>& con) {
- return con->session_id_ == session_id;
- }) == 1u);
- consumer->Disconnect();
- }
- }
-}
-
-void TracingMuxerImpl::ReadTracingSessionData(
- TracingSessionGlobalID session_id,
- std::function<void(TracingSession::ReadTraceCallbackArgs)> callback) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- auto* consumer = FindConsumer(session_id);
- if (!consumer)
- return;
- PERFETTO_DCHECK(!consumer->read_trace_callback_);
- consumer->read_trace_callback_ = std::move(callback);
- consumer->service_->ReadBuffers();
-}
-
-TracingMuxerImpl::ConsumerImpl* TracingMuxerImpl::FindConsumer(
- TracingSessionGlobalID session_id) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
- for (auto& consumer : backend.consumers) {
- if (consumer->session_id_ == session_id) {
- PERFETTO_DCHECK(consumer->service_);
- return consumer.get();
- }
- }
- }
- return nullptr;
-}
-
-void TracingMuxerImpl::OnConsumerDisconnected(ConsumerImpl* consumer) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- for (RegisteredBackend& backend : backends_) {
- auto pred = [consumer](const std::unique_ptr<ConsumerImpl>& con) {
- return con.get() == consumer;
- };
- backend.consumers.erase(std::remove_if(backend.consumers.begin(),
- backend.consumers.end(), pred),
- backend.consumers.end());
- }
-}
-
-TracingMuxerImpl::FindDataSourceRes TracingMuxerImpl::FindDataSource(
- TracingBackendId backend_id,
- DataSourceInstanceID instance_id) {
- PERFETTO_DCHECK_THREAD(thread_checker_);
- for (const auto& rds : data_sources_) {
- DataSourceStaticState* static_state = rds.static_state;
- for (uint32_t i = 0; i < kMaxDataSourceInstances; i++) {
- auto* internal_state = static_state->TryGet(i);
- if (internal_state && internal_state->backend_id == backend_id &&
- internal_state->data_source_instance_id == instance_id) {
- return FindDataSourceRes(static_state, internal_state, i);
- }
- }
- }
- return FindDataSourceRes();
-}
-
-// Can be called from any thread.
-std::unique_ptr<TraceWriterBase> TracingMuxerImpl::CreateTraceWriter(
- DataSourceState* data_source,
- BufferExhaustedPolicy buffer_exhausted_policy) {
- ProducerImpl* producer = backends_[data_source->backend_id].producer.get();
- return producer->service_->CreateTraceWriter(data_source->buffer_id,
- buffer_exhausted_policy);
-}
-
-// This is called via the public API Tracing::NewTrace().
-// Can be called from any thread.
-std::unique_ptr<TracingSession> TracingMuxerImpl::CreateTracingSession(
- BackendType backend_type) {
- TracingSessionGlobalID session_id = ++next_tracing_session_id_;
-
- // |backend_type| can only specify one backend, not an OR-ed mask.
- PERFETTO_CHECK((backend_type & (backend_type - 1)) == 0);
-
- // Capturing |this| is fine because the TracingMuxer is a leaky singleton.
- task_runner_->PostTask([this, backend_type, session_id] {
- for (RegisteredBackend& backend : backends_) {
- if (backend_type && backend.type != backend_type)
- continue;
-
- backend.consumers.emplace_back(
- new ConsumerImpl(this, backend.id, session_id));
- auto& consumer = backend.consumers.back();
- TracingBackend::ConnectConsumerArgs conn_args;
- conn_args.consumer = consumer.get();
- conn_args.task_runner = task_runner_.get();
- consumer->Initialize(backend.backend->ConnectConsumer(conn_args));
- return;
- }
- PERFETTO_ELOG(
- "Cannot create tracing session, no tracing backend ready for type=%d",
- backend_type);
- });
-
- return std::unique_ptr<TracingSession>(
- new TracingSessionImpl(this, session_id));
-}
-
-void TracingMuxerImpl::InitializeInstance(const TracingInitArgs& args) {
- if (instance_)
- PERFETTO_FATAL("Tracing already initialized");
- instance_ = new TracingMuxerImpl(args);
-}
-
-TracingMuxer::~TracingMuxer() = default;
-
-static_assert(std::is_same<internal::BufferId, BufferID>::value,
- "public's BufferId and tracing/core's BufferID diverged");
-
-} // namespace internal
-} // namespace perfetto