diff options
Diffstat (limited to 'src/tracing/core/shared_memory_arbiter_impl.cc')
-rw-r--r-- | src/tracing/core/shared_memory_arbiter_impl.cc | 248 |
1 files changed, 32 insertions, 216 deletions
diff --git a/src/tracing/core/shared_memory_arbiter_impl.cc b/src/tracing/core/shared_memory_arbiter_impl.cc index 39ddd4b29..c19c96a54 100644 --- a/src/tracing/core/shared_memory_arbiter_impl.cc +++ b/src/tracing/core/shared_memory_arbiter_impl.cc @@ -249,9 +249,7 @@ void SharedMemoryArbiterImpl::UpdateCommitDataRequest( MaybeUnboundBufferID target_buffer, PatchList* patch_list) { // Note: chunk will be invalid if the call came from SendPatches(). - base::TaskRunner* task_runner_to_post_delayed_callback_on = nullptr; - // The delay with which the flush will be posted. - uint32_t flush_delay_ms = 0; + base::TaskRunner* task_runner_to_post_callback_on = nullptr; base::WeakPtr<SharedMemoryArbiterImpl> weak_this; { std::lock_guard<std::mutex> scoped_lock(lock_); @@ -261,11 +259,9 @@ void SharedMemoryArbiterImpl::UpdateCommitDataRequest( // Flushing the commit is only supported while we're |fully_bound_|. If we // aren't, we'll flush when |fully_bound_| is updated. - if (fully_bound_ && !delayed_flush_scheduled_) { + if (fully_bound_) { weak_this = weak_ptr_factory_.GetWeakPtr(); - task_runner_to_post_delayed_callback_on = task_runner_; - flush_delay_ms = batch_commits_duration_ms_; - delayed_flush_scheduled_ = true; + task_runner_to_post_callback_on = task_runner_; } } @@ -274,31 +270,10 @@ void SharedMemoryArbiterImpl::UpdateCommitDataRequest( PERFETTO_DCHECK(chunk.writer_id() == writer_id); uint8_t chunk_idx = chunk.chunk_idx(); bytes_pending_commit_ += chunk.size(); - size_t page_idx; - // If the chunk needs patching, it should not be marked as complete yet, - // because this would indicate to the service that the producer will not - // be writing to it anymore, while the producer might still apply patches - // to the chunk later on. In particular, when re-reading (e.g. because of - // periodic scraping) a completed chunk, the service expects the flags of - // that chunk not to be removed between reads. So, let's say the producer - // marked the chunk as complete here and the service then read it for the - // first time. If the producer then fully patched the chunk, thus removing - // the kChunkNeedsPatching flag, and the service re-read the chunk after - // the patching, the service would be thrown off by the removed flag. - if (direct_patching_enabled_ && - (chunk.GetPacketCountAndFlags().second & - SharedMemoryABI::ChunkHeader::kChunkNeedsPatching)) { - page_idx = shmem_abi_.GetPageAndChunkIndex(std::move(chunk)).first; - } else { - // If the chunk doesn't need patching, we can mark it as complete - // immediately. This allows the service to read it in full while - // scraping, which would not be the case if the chunk was left in a - // kChunkBeingWritten state. - page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk)); - } + size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk)); + + // DO NOT access |chunk| after this point, has been std::move()-d above. - // DO NOT access |chunk| after this point, it has been std::move()-d - // above. CommitDataRequest::ChunksToMove* ctm = commit_data_req_->add_chunks_to_move(); ctm->set_page(static_cast<uint32_t>(page_idx)); @@ -306,172 +281,43 @@ void SharedMemoryArbiterImpl::UpdateCommitDataRequest( ctm->set_target_buffer(target_buffer); } - // Process the completed patches for previous chunks from the |patch_list|. - CommitDataRequest::ChunkToPatch* last_patch_req = nullptr; + // Get the completed patches for previous chunks from the |patch_list| + // and attach them. + ChunkID last_chunk_id = 0; // 0 is irrelevant but keeps the compiler happy. + CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr; while (!patch_list->empty() && patch_list->front().is_patched()) { - Patch curr_patch = patch_list->front(); - patch_list->pop_front(); - // Patches for the same chunk are contiguous in the |patch_list|. So, to - // determine if there are any other patches that apply to the chunk that - // is being patched, check if the next patch in the |patch_list| applies - // to the same chunk. - bool chunk_needs_more_patching = - !patch_list->empty() && - patch_list->front().chunk_id == curr_patch.chunk_id; - - if (direct_patching_enabled_ && - TryDirectPatchLocked(writer_id, curr_patch, - chunk_needs_more_patching)) { - continue; + if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) { + last_chunk_req = commit_data_req_->add_chunks_to_patch(); + last_chunk_req->set_writer_id(writer_id); + last_chunk_id = patch_list->front().chunk_id; + last_chunk_req->set_chunk_id(last_chunk_id); + last_chunk_req->set_target_buffer(target_buffer); } - - // The chunk that this patch applies to has already been released to the - // service, so it cannot be patches here. Add the patch to the commit data - // request, so that it can be sent to the service and applied there. - if (!last_patch_req || - last_patch_req->chunk_id() != curr_patch.chunk_id) { - last_patch_req = commit_data_req_->add_chunks_to_patch(); - last_patch_req->set_writer_id(writer_id); - last_patch_req->set_chunk_id(curr_patch.chunk_id); - last_patch_req->set_target_buffer(target_buffer); - } - auto* patch = last_patch_req->add_patches(); - patch->set_offset(curr_patch.offset); - patch->set_data(&curr_patch.size_field[0], curr_patch.size_field.size()); + auto* patch_req = last_chunk_req->add_patches(); + patch_req->set_offset(patch_list->front().offset); + patch_req->set_data(&patch_list->front().size_field[0], + patch_list->front().size_field.size()); + patch_list->pop_front(); } - // Patches are enqueued in the |patch_list| in order and are notified to // the service when the chunk is returned. The only case when the current // patch list is incomplete is if there is an unpatched entry at the head of // the |patch_list| that belongs to the same ChunkID as the last one we are // about to send to the service. - if (last_patch_req && !patch_list->empty() && - patch_list->front().chunk_id == last_patch_req->chunk_id()) { - last_patch_req->set_has_more_patches(true); - } - - // If the buffer is filling up or if we are given a patch for a chunk - // that was already sent to the service, we don't want to wait for the next - // delayed flush to happen and we flush immediately. Otherwise, if we - // accumulate the patch and a crash occurs before the patch is sent, the - // service will not know of the patch and won't be able to reconstruct the - // trace. - if (fully_bound_ && - (last_patch_req || bytes_pending_commit_ >= shmem_abi_.size() / 2)) { - weak_this = weak_ptr_factory_.GetWeakPtr(); - task_runner_to_post_delayed_callback_on = task_runner_; - flush_delay_ms = 0; + if (last_chunk_req && !patch_list->empty() && + patch_list->front().chunk_id == last_chunk_id) { + last_chunk_req->set_has_more_patches(true); } } // scoped_lock(lock_) - // We shouldn't post tasks while locked. - // |task_runner_to_post_delayed_callback_on| remains valid after unlocking, - // because |task_runner_| is never reset. - if (task_runner_to_post_delayed_callback_on) { - task_runner_to_post_delayed_callback_on->PostDelayedTask( - [weak_this] { - if (!weak_this) - return; - { - std::lock_guard<std::mutex> scoped_lock(weak_this->lock_); - // Clear |delayed_flush_scheduled_|, allowing the next call to - // UpdateCommitDataRequest to start another batching period. - weak_this->delayed_flush_scheduled_ = false; - } - weak_this->FlushPendingCommitDataRequests(); - }, - flush_delay_ms); - } -} - -bool SharedMemoryArbiterImpl::TryDirectPatchLocked( - WriterID writer_id, - const Patch& patch, - bool chunk_needs_more_patching) { - // Search the chunks that are being batched in |commit_data_req_| for a chunk - // that needs patching and that matches the provided |writer_id| and - // |patch.chunk_id|. Iterate |commit_data_req_| in reverse, since - // |commit_data_req_| is appended to at the end with newly-returned chunks, - // and patches are more likely to apply to chunks that have been returned - // recently. - SharedMemoryABI::Chunk chunk; - bool chunk_found = false; - auto& chunks_to_move = commit_data_req_->chunks_to_move(); - for (auto ctm_it = chunks_to_move.rbegin(); ctm_it != chunks_to_move.rend(); - ++ctm_it) { - uint32_t layout = shmem_abi_.GetPageLayout(ctm_it->page()); - auto chunk_state = - shmem_abi_.GetChunkStateFromLayout(layout, ctm_it->chunk()); - // Note: the subset of |commit_data_req_| chunks that still need patching is - // also the subset of chunks that are still being written to. The rest of - // the chunks in |commit_data_req_| do not need patching and have already - // been marked as complete. - if (chunk_state != SharedMemoryABI::kChunkBeingWritten) - continue; - - chunk = - shmem_abi_.GetChunkUnchecked(ctm_it->page(), layout, ctm_it->chunk()); - if (chunk.writer_id() == writer_id && - chunk.header()->chunk_id.load(std::memory_order_relaxed) == - patch.chunk_id) { - chunk_found = true; - break; - } - } - - if (!chunk_found) { - // The chunk has already been committed to the service and the patch cannot - // be applied in the producer. - return false; - } - - // Apply the patch. - size_t page_idx; - uint8_t chunk_idx; - std::tie(page_idx, chunk_idx) = shmem_abi_.GetPageAndChunkIndex(chunk); - PERFETTO_DCHECK(shmem_abi_.GetChunkState(page_idx, chunk_idx) == - SharedMemoryABI::ChunkState::kChunkBeingWritten); - auto chunk_begin = chunk.payload_begin(); - uint8_t* ptr = chunk_begin + patch.offset; - PERFETTO_CHECK(ptr <= chunk.end() - SharedMemoryABI::kPacketHeaderSize); - // DCHECK that we are writing into a zero-filled size field and not into - // valid data. It relies on ScatteredStreamWriter::ReserveBytes() to - // zero-fill reservations in debug builds. - const char zero[SharedMemoryABI::kPacketHeaderSize]{}; - PERFETTO_DCHECK(memcmp(ptr, &zero, SharedMemoryABI::kPacketHeaderSize) == 0); - - memcpy(ptr, &patch.size_field[0], SharedMemoryABI::kPacketHeaderSize); - - if (!chunk_needs_more_patching) { - // Mark that the chunk doesn't need more patching and mark it as complete, - // as the producer will not write to it anymore. This allows the service to - // read the chunk in full while scraping, which would not be the case if the - // chunk was left in a kChunkBeingWritten state. - chunk.ClearNeedsPatchingFlag(); - shmem_abi_.ReleaseChunkAsComplete(std::move(chunk)); + // We shouldn't post tasks while locked. |task_runner_to_post_callback_on| + // remains valid after unlocking, because |task_runner_| is never reset. + if (task_runner_to_post_callback_on) { + task_runner_to_post_callback_on->PostTask([weak_this] { + if (weak_this) + weak_this->FlushPendingCommitDataRequests(); + }); } - - return true; -} - -void SharedMemoryArbiterImpl::SetBatchCommitsDuration( - uint32_t batch_commits_duration_ms) { - std::lock_guard<std::mutex> scoped_lock(lock_); - batch_commits_duration_ms_ = batch_commits_duration_ms; -} - -bool SharedMemoryArbiterImpl::EnableDirectSMBPatching() { - std::lock_guard<std::mutex> scoped_lock(lock_); - if (!direct_patching_supported_by_service_) { - return false; - } - - return direct_patching_enabled_ = true; -} - -void SharedMemoryArbiterImpl::SetDirectSMBPatchingSupportedByService() { - std::lock_guard<std::mutex> scoped_lock(lock_); - direct_patching_supported_by_service_ = true; } // This function is quite subtle. When making changes keep in mind these two @@ -524,27 +370,6 @@ void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests( // should have been replaced. PERFETTO_DCHECK(all_placeholders_replaced); - // In order to allow patching in the producer we delay the kChunkComplete - // transition and keep batched chunks in the kChunkBeingWritten state. - // Since we are about to notify the service of all batched chunks, it will - // not be possible to apply any more patches to them and we need to move - // them to kChunkComplete - otherwise the service won't look at them. - for (auto& ctm : commit_data_req_->chunks_to_move()) { - uint32_t layout = shmem_abi_.GetPageLayout(ctm.page()); - auto chunk_state = - shmem_abi_.GetChunkStateFromLayout(layout, ctm.chunk()); - // Note: the subset of |commit_data_req_| chunks that still need - // patching is also the subset of chunks that are still being written - // to. The rest of the chunks in |commit_data_req_| do not need patching - // and have already been marked as complete. - if (chunk_state != SharedMemoryABI::kChunkBeingWritten) - continue; - - SharedMemoryABI::Chunk chunk = - shmem_abi_.GetChunkUnchecked(ctm.page(), layout, ctm.chunk()); - shmem_abi_.ReleaseChunkAsComplete(std::move(chunk)); - } - req = std::move(commit_data_req_); bytes_pending_commit_ = 0; } @@ -561,13 +386,6 @@ void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests( } } -bool SharedMemoryArbiterImpl::TryShutdown() { - std::lock_guard<std::mutex> scoped_lock(lock_); - did_shutdown_ = true; - // Shutdown is safe if there are no active trace writers for this arbiter. - return active_writer_ids_.IsEmpty(); -} - std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter( BufferID target_buffer, BufferExhaustedPolicy buffer_exhausted_policy) { @@ -791,10 +609,8 @@ std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriterInternal( { std::lock_guard<std::mutex> scoped_lock(lock_); - if (did_shutdown_) - return std::unique_ptr<TraceWriter>(new NullTraceWriter()); - id = active_writer_ids_.Allocate(); + if (!id) return std::unique_ptr<TraceWriter>(new NullTraceWriter()); |