Diffstat (limited to 'src/tracing/core/shared_memory_arbiter_impl.cc')
-rw-r--r--  src/tracing/core/shared_memory_arbiter_impl.cc  248
1 file changed, 32 insertions, 216 deletions
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc
index 39ddd4b29..c19c96a54 100644
--- a/src/tracing/core/shared_memory_arbiter_impl.cc
+++ b/src/tracing/core/shared_memory_arbiter_impl.cc
@@ -249,9 +249,7 @@ void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
MaybeUnboundBufferID target_buffer,
PatchList* patch_list) {
// Note: chunk will be invalid if the call came from SendPatches().
- base::TaskRunner* task_runner_to_post_delayed_callback_on = nullptr;
- // The delay with which the flush will be posted.
- uint32_t flush_delay_ms = 0;
+ base::TaskRunner* task_runner_to_post_callback_on = nullptr;
base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
{
std::lock_guard<std::mutex> scoped_lock(lock_);
@@ -261,11 +259,9 @@ void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
// Flushing the commit is only supported while we're |fully_bound_|. If we
// aren't, we'll flush when |fully_bound_| is updated.
- if (fully_bound_ && !delayed_flush_scheduled_) {
+ if (fully_bound_) {
weak_this = weak_ptr_factory_.GetWeakPtr();
- task_runner_to_post_delayed_callback_on = task_runner_;
- flush_delay_ms = batch_commits_duration_ms_;
- delayed_flush_scheduled_ = true;
+ task_runner_to_post_callback_on = task_runner_;
}
}
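
Note: the hunk above replaces the delayed, batched flush with an immediate one. The task runner and a weak pointer are captured while |lock_| is held, and the callback is posted only after the lock_guard goes out of scope. Below is a minimal standalone sketch of that pattern; TaskRunner and Arbiter here are simplified stand-ins (not Perfetto's real base::TaskRunner or SharedMemoryArbiterImpl), and the sketch assumes the arbiter is owned by a std::shared_ptr, whereas the real code uses a WeakPtrFactory.

#include <functional>
#include <memory>
#include <mutex>
#include <vector>

struct TaskRunner {  // stand-in for base::TaskRunner
  void PostTask(std::function<void()> f) { tasks.push_back(std::move(f)); }
  std::vector<std::function<void()>> tasks;
};

class Arbiter : public std::enable_shared_from_this<Arbiter> {
 public:
  explicit Arbiter(TaskRunner* tr) : task_runner_(tr) {}

  void UpdateCommitDataRequest() {
    TaskRunner* runner_to_post_on = nullptr;
    std::weak_ptr<Arbiter> weak_this;
    {
      std::lock_guard<std::mutex> scoped_lock(lock_);
      // ... mutate the pending commit request here ...
      if (fully_bound_) {  // only flush once fully bound
        weak_this = weak_from_this();
        runner_to_post_on = task_runner_;
      }
    }
    // Post only after releasing the lock: the posted closure re-acquires
    // the arbiter's lock when it runs, so a task runner that executed it
    // inline while we still held |lock_| would deadlock.
    if (runner_to_post_on) {
      runner_to_post_on->PostTask([weak_this] {
        if (auto self = weak_this.lock())
          self->FlushPendingCommitDataRequests();
      });
    }
  }

  void FlushPendingCommitDataRequests() { /* send the batched request */ }

  bool fully_bound_ = true;

 private:
  std::mutex lock_;
  TaskRunner* task_runner_;
};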
@@ -274,31 +270,10 @@ void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
PERFETTO_DCHECK(chunk.writer_id() == writer_id);
uint8_t chunk_idx = chunk.chunk_idx();
bytes_pending_commit_ += chunk.size();
- size_t page_idx;
- // If the chunk needs patching, it should not be marked as complete yet,
- // because this would indicate to the service that the producer will not
- // be writing to it anymore, while the producer might still apply patches
- // to the chunk later on. In particular, when re-reading (e.g. because of
- // periodic scraping) a completed chunk, the service expects the flags of
- // that chunk not to be removed between reads. So, let's say the producer
- // marked the chunk as complete here and the service then read it for the
- // first time. If the producer then fully patched the chunk, thus removing
- // the kChunkNeedsPatching flag, and the service re-read the chunk after
- // the patching, the service would be thrown off by the removed flag.
- if (direct_patching_enabled_ &&
- (chunk.GetPacketCountAndFlags().second &
- SharedMemoryABI::ChunkHeader::kChunkNeedsPatching)) {
- page_idx = shmem_abi_.GetPageAndChunkIndex(std::move(chunk)).first;
- } else {
- // If the chunk doesn't need patching, we can mark it as complete
- // immediately. This allows the service to read it in full while
- // scraping, which would not be the case if the chunk was left in a
- // kChunkBeingWritten state.
- page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
- }
+ size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
+
+ // DO NOT access |chunk| after this point, it has been std::move()-d above.
- // DO NOT access |chunk| after this point, it has been std::move()-d
- // above.
CommitDataRequest::ChunksToMove* ctm =
commit_data_req_->add_chunks_to_move();
ctm->set_page(static_cast<uint32_t>(page_idx));
@@ -306,172 +281,43 @@ void SharedMemoryArbiterImpl::UpdateCommitDataRequest(
ctm->set_target_buffer(target_buffer);
}
- // Process the completed patches for previous chunks from the |patch_list|.
- CommitDataRequest::ChunkToPatch* last_patch_req = nullptr;
+ // Get the completed patches for previous chunks from the |patch_list|
+ // and attach them.
+ ChunkID last_chunk_id = 0; // 0 is irrelevant but keeps the compiler happy.
+ CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr;
while (!patch_list->empty() && patch_list->front().is_patched()) {
- Patch curr_patch = patch_list->front();
- patch_list->pop_front();
- // Patches for the same chunk are contiguous in the |patch_list|. So, to
- // determine if there are any other patches that apply to the chunk that
- // is being patched, check if the next patch in the |patch_list| applies
- // to the same chunk.
- bool chunk_needs_more_patching =
- !patch_list->empty() &&
- patch_list->front().chunk_id == curr_patch.chunk_id;
-
- if (direct_patching_enabled_ &&
- TryDirectPatchLocked(writer_id, curr_patch,
- chunk_needs_more_patching)) {
- continue;
+ if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) {
+ last_chunk_req = commit_data_req_->add_chunks_to_patch();
+ last_chunk_req->set_writer_id(writer_id);
+ last_chunk_id = patch_list->front().chunk_id;
+ last_chunk_req->set_chunk_id(last_chunk_id);
+ last_chunk_req->set_target_buffer(target_buffer);
}
-
- // The chunk that this patch applies to has already been released to the
- // service, so it cannot be patched here. Add the patch to the commit data
- // request, so that it can be sent to the service and applied there.
- if (!last_patch_req ||
- last_patch_req->chunk_id() != curr_patch.chunk_id) {
- last_patch_req = commit_data_req_->add_chunks_to_patch();
- last_patch_req->set_writer_id(writer_id);
- last_patch_req->set_chunk_id(curr_patch.chunk_id);
- last_patch_req->set_target_buffer(target_buffer);
- }
- auto* patch = last_patch_req->add_patches();
- patch->set_offset(curr_patch.offset);
- patch->set_data(&curr_patch.size_field[0], curr_patch.size_field.size());
+ auto* patch_req = last_chunk_req->add_patches();
+ patch_req->set_offset(patch_list->front().offset);
+ patch_req->set_data(&patch_list->front().size_field[0],
+ patch_list->front().size_field.size());
+ patch_list->pop_front();
}
-
// Patches are enqueued in the |patch_list| in order and are notified to
// the service when the chunk is returned. The only case when the current
// patch list is incomplete is if there is an unpatched entry at the head of
// the |patch_list| that belongs to the same ChunkID as the last one we are
// about to send to the service.
- if (last_patch_req && !patch_list->empty() &&
- patch_list->front().chunk_id == last_patch_req->chunk_id()) {
- last_patch_req->set_has_more_patches(true);
- }
-
- // If the buffer is filling up or if we are given a patch for a chunk
- // that was already sent to the service, we don't want to wait for the next
- // delayed flush to happen and we flush immediately. Otherwise, if we
- // accumulate the patch and a crash occurs before the patch is sent, the
- // service will not know of the patch and won't be able to reconstruct the
- // trace.
- if (fully_bound_ &&
- (last_patch_req || bytes_pending_commit_ >= shmem_abi_.size() / 2)) {
- weak_this = weak_ptr_factory_.GetWeakPtr();
- task_runner_to_post_delayed_callback_on = task_runner_;
- flush_delay_ms = 0;
+ if (last_chunk_req && !patch_list->empty() &&
+ patch_list->front().chunk_id == last_chunk_id) {
+ last_chunk_req->set_has_more_patches(true);
}
} // scoped_lock(lock_)
- // We shouldn't post tasks while locked.
- // |task_runner_to_post_delayed_callback_on| remains valid after unlocking,
- // because |task_runner_| is never reset.
- if (task_runner_to_post_delayed_callback_on) {
- task_runner_to_post_delayed_callback_on->PostDelayedTask(
- [weak_this] {
- if (!weak_this)
- return;
- {
- std::lock_guard<std::mutex> scoped_lock(weak_this->lock_);
- // Clear |delayed_flush_scheduled_|, allowing the next call to
- // UpdateCommitDataRequest to start another batching period.
- weak_this->delayed_flush_scheduled_ = false;
- }
- weak_this->FlushPendingCommitDataRequests();
- },
- flush_delay_ms);
- }
-}
-
-bool SharedMemoryArbiterImpl::TryDirectPatchLocked(
- WriterID writer_id,
- const Patch& patch,
- bool chunk_needs_more_patching) {
- // Search the chunks that are being batched in |commit_data_req_| for a chunk
- // that needs patching and that matches the provided |writer_id| and
- // |patch.chunk_id|. Iterate |commit_data_req_| in reverse, since
- // |commit_data_req_| is appended to at the end with newly-returned chunks,
- // and patches are more likely to apply to chunks that have been returned
- // recently.
- SharedMemoryABI::Chunk chunk;
- bool chunk_found = false;
- auto& chunks_to_move = commit_data_req_->chunks_to_move();
- for (auto ctm_it = chunks_to_move.rbegin(); ctm_it != chunks_to_move.rend();
- ++ctm_it) {
- uint32_t layout = shmem_abi_.GetPageLayout(ctm_it->page());
- auto chunk_state =
- shmem_abi_.GetChunkStateFromLayout(layout, ctm_it->chunk());
- // Note: the subset of |commit_data_req_| chunks that still need patching is
- // also the subset of chunks that are still being written to. The rest of
- // the chunks in |commit_data_req_| do not need patching and have already
- // been marked as complete.
- if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
- continue;
-
- chunk =
- shmem_abi_.GetChunkUnchecked(ctm_it->page(), layout, ctm_it->chunk());
- if (chunk.writer_id() == writer_id &&
- chunk.header()->chunk_id.load(std::memory_order_relaxed) ==
- patch.chunk_id) {
- chunk_found = true;
- break;
- }
- }
-
- if (!chunk_found) {
- // The chunk has already been committed to the service and the patch cannot
- // be applied in the producer.
- return false;
- }
-
- // Apply the patch.
- size_t page_idx;
- uint8_t chunk_idx;
- std::tie(page_idx, chunk_idx) = shmem_abi_.GetPageAndChunkIndex(chunk);
- PERFETTO_DCHECK(shmem_abi_.GetChunkState(page_idx, chunk_idx) ==
- SharedMemoryABI::ChunkState::kChunkBeingWritten);
- auto chunk_begin = chunk.payload_begin();
- uint8_t* ptr = chunk_begin + patch.offset;
- PERFETTO_CHECK(ptr <= chunk.end() - SharedMemoryABI::kPacketHeaderSize);
- // DCHECK that we are writing into a zero-filled size field and not into
- // valid data. It relies on ScatteredStreamWriter::ReserveBytes() to
- // zero-fill reservations in debug builds.
- const char zero[SharedMemoryABI::kPacketHeaderSize]{};
- PERFETTO_DCHECK(memcmp(ptr, &zero, SharedMemoryABI::kPacketHeaderSize) == 0);
-
- memcpy(ptr, &patch.size_field[0], SharedMemoryABI::kPacketHeaderSize);
-
- if (!chunk_needs_more_patching) {
- // Mark that the chunk doesn't need more patching and mark it as complete,
- // as the producer will not write to it anymore. This allows the service to
- // read the chunk in full while scraping, which would not be the case if the
- // chunk was left in a kChunkBeingWritten state.
- chunk.ClearNeedsPatchingFlag();
- shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
+ // We shouldn't post tasks while locked. |task_runner_to_post_callback_on|
+ // remains valid after unlocking, because |task_runner_| is never reset.
+ if (task_runner_to_post_callback_on) {
+ task_runner_to_post_callback_on->PostTask([weak_this] {
+ if (weak_this)
+ weak_this->FlushPendingCommitDataRequests();
+ });
}
-
- return true;
-}
-
-void SharedMemoryArbiterImpl::SetBatchCommitsDuration(
- uint32_t batch_commits_duration_ms) {
- std::lock_guard<std::mutex> scoped_lock(lock_);
- batch_commits_duration_ms_ = batch_commits_duration_ms;
-}
-
-bool SharedMemoryArbiterImpl::EnableDirectSMBPatching() {
- std::lock_guard<std::mutex> scoped_lock(lock_);
- if (!direct_patching_supported_by_service_) {
- return false;
- }
-
- return direct_patching_enabled_ = true;
-}
-
-void SharedMemoryArbiterImpl::SetDirectSMBPatchingSupportedByService() {
- std::lock_guard<std::mutex> scoped_lock(lock_);
- direct_patching_supported_by_service_ = true;
}
// This function is quite subtle. When making changes keep in mind these two
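
Before moving on to FlushPendingCommitDataRequests(), here is the new patch-grouping loop of UpdateCommitDataRequest() above re-expressed as a standalone sketch. The Patch and ChunkToPatch structs are simplified stand-ins for Perfetto's Patch and the CommitDataRequest::ChunkToPatch proto, and DrainCompletedPatches is a hypothetical helper name. Patches for the same chunk are contiguous in the patch list, so a new entry is only opened when the ChunkID at the front changes, and a still-unpatched entry for the same chunk marks the last entry as has_more_patches.

#include <cstdint>
#include <deque>
#include <vector>

using ChunkID = uint32_t;

struct Patch {
  ChunkID chunk_id = 0;
  uint16_t offset = 0;
  uint8_t size_field[4] = {};
  bool patched = false;
  bool is_patched() const { return patched; }
};

struct ChunkToPatch {
  ChunkID chunk_id = 0;
  std::vector<Patch> patches;
  bool has_more_patches = false;
};

// Drains the completed patches at the front of |patch_list|, grouping
// contiguous patches for the same chunk into one ChunkToPatch entry.
std::vector<ChunkToPatch> DrainCompletedPatches(std::deque<Patch>* patch_list) {
  std::vector<ChunkToPatch> out;
  ChunkToPatch* last_chunk_req = nullptr;
  ChunkID last_chunk_id = 0;  // Irrelevant until |last_chunk_req| is set.
  while (!patch_list->empty() && patch_list->front().is_patched()) {
    if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) {
      out.emplace_back();
      last_chunk_req = &out.back();
      last_chunk_req->chunk_id = patch_list->front().chunk_id;
      last_chunk_id = last_chunk_req->chunk_id;
    }
    last_chunk_req->patches.push_back(patch_list->front());
    patch_list->pop_front();
  }
  // An unpatched entry still queued for the same chunk means the service
  // should expect more patches for it later.
  if (last_chunk_req && !patch_list->empty() &&
      patch_list->front().chunk_id == last_chunk_id) {
    last_chunk_req->has_more_patches = true;
  }
  return out;
}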
@@ -524,27 +370,6 @@ void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
// should have been replaced.
PERFETTO_DCHECK(all_placeholders_replaced);
- // In order to allow patching in the producer we delay the kChunkComplete
- // transition and keep batched chunks in the kChunkBeingWritten state.
- // Since we are about to notify the service of all batched chunks, it will
- // not be possible to apply any more patches to them and we need to move
- // them to kChunkComplete - otherwise the service won't look at them.
- for (auto& ctm : commit_data_req_->chunks_to_move()) {
- uint32_t layout = shmem_abi_.GetPageLayout(ctm.page());
- auto chunk_state =
- shmem_abi_.GetChunkStateFromLayout(layout, ctm.chunk());
- // Note: the subset of |commit_data_req_| chunks that still need
- // patching is also the subset of chunks that are still being written
- // to. The rest of the chunks in |commit_data_req_| do not need patching
- // and have already been marked as complete.
- if (chunk_state != SharedMemoryABI::kChunkBeingWritten)
- continue;
-
- SharedMemoryABI::Chunk chunk =
- shmem_abi_.GetChunkUnchecked(ctm.page(), layout, ctm.chunk());
- shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));
- }
-
req = std::move(commit_data_req_);
bytes_pending_commit_ = 0;
}
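
The hunk above removes the deferred ReleaseChunkAsComplete() loop; what remains of FlushPendingCommitDataRequests() around it is a drain-under-lock pattern, sketched below with simplified stand-in types (CommitDataRequest and SendToService are placeholders, not the real proto or endpoint call): the pending request is moved out and the byte counter reset while |lock_| is held, and the send happens only afterwards.

#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

struct CommitDataRequest {};  // stand-in for the real proto.

class Arbiter {
 public:
  void FlushPendingCommitDataRequests() {
    std::unique_ptr<CommitDataRequest> req;
    {
      std::lock_guard<std::mutex> scoped_lock(lock_);
      req = std::move(commit_data_req_);  // leaves the member null
      bytes_pending_commit_ = 0;
    }
    // The potentially slow or re-entrant send happens outside the lock.
    if (req)
      SendToService(*req);
  }

 private:
  void SendToService(const CommitDataRequest&) {}  // hypothetical helper
  std::mutex lock_;
  std::unique_ptr<CommitDataRequest> commit_data_req_;
  size_t bytes_pending_commit_ = 0;
};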
@@ -561,13 +386,6 @@ void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
}
}
-bool SharedMemoryArbiterImpl::TryShutdown() {
- std::lock_guard<std::mutex> scoped_lock(lock_);
- did_shutdown_ = true;
- // Shutdown is safe if there are no active trace writers for this arbiter.
- return active_writer_ids_.IsEmpty();
-}
-
std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
BufferID target_buffer,
BufferExhaustedPolicy buffer_exhausted_policy) {
@@ -791,10 +609,8 @@ std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal(
{
std::lock_guard<std::mutex> scoped_lock(lock_);
- if (did_shutdown_)
- return std::unique_ptr<TraceWriter>(new NullTraceWriter());
-
id = active_writer_ids_.Allocate();
+
if (!id)
return std::unique_ptr<TraceWriter>(new NullTraceWriter());
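
The last hunk drops the did_shutdown_ early-out but keeps the existing fallback: when no writer ID can be allocated, the caller still receives a usable object whose writes go nowhere. A hypothetical standalone illustration of that null-object pattern follows; the TraceWriter interface here is simplified, not Perfetto's real one.

#include <cstddef>
#include <memory>

struct TraceWriter {
  virtual ~TraceWriter() = default;
  virtual void WritePacket(const void* data, size_t size) = 0;
};

// Null-object fallback: callers never have to check for nullptr.
struct NullTraceWriter : TraceWriter {
  void WritePacket(const void*, size_t) override {}  // silently drops data
};

struct RealTraceWriter : TraceWriter {
  void WritePacket(const void*, size_t) override { /* write to shared memory */ }
};

std::unique_ptr<TraceWriter> CreateTraceWriter(bool id_allocated) {
  if (!id_allocated)
    return std::unique_ptr<TraceWriter>(new NullTraceWriter());
  return std::unique_ptr<TraceWriter>(new RealTraceWriter());
}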