diff options
Diffstat (limited to 'mojo/edk/system/data_pipe_consumer_dispatcher.cc')
-rw-r--r-- | mojo/edk/system/data_pipe_consumer_dispatcher.cc | 562 |
1 files changed, 0 insertions, 562 deletions
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc deleted file mode 100644 index f338732..0000000 --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc +++ /dev/null @@ -1,562 +0,0 @@ -// Copyright 2013 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "mojo/edk/system/data_pipe_consumer_dispatcher.h" - -#include <stddef.h> -#include <stdint.h> - -#include <algorithm> -#include <limits> -#include <utility> - -#include "base/bind.h" -#include "base/logging.h" -#include "base/memory/ref_counted.h" -#include "base/message_loop/message_loop.h" -#include "mojo/edk/embedder/embedder_internal.h" -#include "mojo/edk/embedder/platform_shared_buffer.h" -#include "mojo/edk/system/core.h" -#include "mojo/edk/system/data_pipe_control_message.h" -#include "mojo/edk/system/node_controller.h" -#include "mojo/edk/system/ports_message.h" -#include "mojo/edk/system/request_context.h" -#include "mojo/public/c/system/data_pipe.h" - -namespace mojo { -namespace edk { - -namespace { - -const uint8_t kFlagPeerClosed = 0x01; - -#pragma pack(push, 1) - -struct SerializedState { - MojoCreateDataPipeOptions options; - uint64_t pipe_id; - uint32_t read_offset; - uint32_t bytes_available; - uint8_t flags; - char padding[7]; -}; - -static_assert(sizeof(SerializedState) % 8 == 0, - "Invalid SerializedState size."); - -#pragma pack(pop) - -} // namespace - -// A PortObserver which forwards to a DataPipeConsumerDispatcher. This owns a -// reference to the dispatcher to ensure it lives as long as the observed port. -class DataPipeConsumerDispatcher::PortObserverThunk - : public NodeController::PortObserver { - public: - explicit PortObserverThunk( - scoped_refptr<DataPipeConsumerDispatcher> dispatcher) - : dispatcher_(dispatcher) {} - - private: - ~PortObserverThunk() override {} - - // NodeController::PortObserver: - void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } - - scoped_refptr<DataPipeConsumerDispatcher> dispatcher_; - - DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); -}; - -DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( - NodeController* node_controller, - const ports::PortRef& control_port, - scoped_refptr<PlatformSharedBuffer> shared_ring_buffer, - const MojoCreateDataPipeOptions& options, - bool initialized, - uint64_t pipe_id) - : options_(options), - node_controller_(node_controller), - control_port_(control_port), - pipe_id_(pipe_id), - watchers_(this), - shared_ring_buffer_(shared_ring_buffer) { - if (initialized) { - base::AutoLock lock(lock_); - InitializeNoLock(); - } -} - -Dispatcher::Type DataPipeConsumerDispatcher::GetType() const { - return Type::DATA_PIPE_CONSUMER; -} - -MojoResult DataPipeConsumerDispatcher::Close() { - base::AutoLock lock(lock_); - DVLOG(1) << "Closing data pipe consumer " << pipe_id_; - return CloseNoLock(); -} - -MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, - uint32_t* num_bytes, - MojoReadDataFlags flags) { - base::AutoLock lock(lock_); - - if (!shared_ring_buffer_ || in_transit_) - return MOJO_RESULT_INVALID_ARGUMENT; - - if (in_two_phase_read_) - return MOJO_RESULT_BUSY; - - const bool had_new_data = new_data_available_; - new_data_available_ = false; - - if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { - if ((flags & MOJO_READ_DATA_FLAG_PEEK) || - (flags & MOJO_READ_DATA_FLAG_DISCARD)) - return MOJO_RESULT_INVALID_ARGUMENT; - DCHECK(!(flags & MOJO_READ_DATA_FLAG_DISCARD)); // Handled above. - DVLOG_IF(2, elements) - << "Query mode: ignoring non-null |elements|"; - *num_bytes = static_cast<uint32_t>(bytes_available_); - - if (had_new_data) - watchers_.NotifyState(GetHandleSignalsStateNoLock()); - return MOJO_RESULT_OK; - } - - bool discard = false; - if ((flags & MOJO_READ_DATA_FLAG_DISCARD)) { - // These flags are mutally exclusive. - if (flags & MOJO_READ_DATA_FLAG_PEEK) - return MOJO_RESULT_INVALID_ARGUMENT; - DVLOG_IF(2, elements) - << "Discard mode: ignoring non-null |elements|"; - discard = true; - } - - uint32_t max_num_bytes_to_read = *num_bytes; - if (max_num_bytes_to_read % options_.element_num_bytes != 0) - return MOJO_RESULT_INVALID_ARGUMENT; - - bool all_or_none = flags & MOJO_READ_DATA_FLAG_ALL_OR_NONE; - uint32_t min_num_bytes_to_read = - all_or_none ? max_num_bytes_to_read : 0; - - if (min_num_bytes_to_read > bytes_available_) { - if (had_new_data) - watchers_.NotifyState(GetHandleSignalsStateNoLock()); - return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION - : MOJO_RESULT_OUT_OF_RANGE; - } - - uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); - if (bytes_to_read == 0) { - if (had_new_data) - watchers_.NotifyState(GetHandleSignalsStateNoLock()); - return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION - : MOJO_RESULT_SHOULD_WAIT; - } - - if (!discard) { - uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); - CHECK(data); - - uint8_t* destination = static_cast<uint8_t*>(elements); - CHECK(destination); - - DCHECK_LE(read_offset_, options_.capacity_num_bytes); - uint32_t tail_bytes_to_copy = - std::min(options_.capacity_num_bytes - read_offset_, bytes_to_read); - uint32_t head_bytes_to_copy = bytes_to_read - tail_bytes_to_copy; - if (tail_bytes_to_copy > 0) - memcpy(destination, data + read_offset_, tail_bytes_to_copy); - if (head_bytes_to_copy > 0) - memcpy(destination + tail_bytes_to_copy, data, head_bytes_to_copy); - } - *num_bytes = bytes_to_read; - - bool peek = !!(flags & MOJO_READ_DATA_FLAG_PEEK); - if (discard || !peek) { - read_offset_ = (read_offset_ + bytes_to_read) % options_.capacity_num_bytes; - bytes_available_ -= bytes_to_read; - - base::AutoUnlock unlock(lock_); - NotifyRead(bytes_to_read); - } - - // We may have just read the last available data and thus changed the signals - // state. - watchers_.NotifyState(GetHandleSignalsStateNoLock()); - - return MOJO_RESULT_OK; -} - -MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, - uint32_t* buffer_num_bytes, - MojoReadDataFlags flags) { - base::AutoLock lock(lock_); - if (!shared_ring_buffer_ || in_transit_) - return MOJO_RESULT_INVALID_ARGUMENT; - - if (in_two_phase_read_) - return MOJO_RESULT_BUSY; - - // These flags may not be used in two-phase mode. - if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || - (flags & MOJO_READ_DATA_FLAG_QUERY) || - (flags & MOJO_READ_DATA_FLAG_PEEK)) - return MOJO_RESULT_INVALID_ARGUMENT; - - const bool had_new_data = new_data_available_; - new_data_available_ = false; - - if (bytes_available_ == 0) { - if (had_new_data) - watchers_.NotifyState(GetHandleSignalsStateNoLock()); - return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION - : MOJO_RESULT_SHOULD_WAIT; - } - - DCHECK_LT(read_offset_, options_.capacity_num_bytes); - uint32_t bytes_to_read = std::min(bytes_available_, - options_.capacity_num_bytes - read_offset_); - - CHECK(ring_buffer_mapping_); - uint8_t* data = static_cast<uint8_t*>(ring_buffer_mapping_->GetBase()); - CHECK(data); - - in_two_phase_read_ = true; - *buffer = data + read_offset_; - *buffer_num_bytes = bytes_to_read; - two_phase_max_bytes_read_ = bytes_to_read; - - if (had_new_data) - watchers_.NotifyState(GetHandleSignalsStateNoLock()); - - return MOJO_RESULT_OK; -} - -MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { - base::AutoLock lock(lock_); - if (!in_two_phase_read_) - return MOJO_RESULT_FAILED_PRECONDITION; - - if (in_transit_) - return MOJO_RESULT_INVALID_ARGUMENT; - - CHECK(shared_ring_buffer_); - - MojoResult rv; - if (num_bytes_read > two_phase_max_bytes_read_ || - num_bytes_read % options_.element_num_bytes != 0) { - rv = MOJO_RESULT_INVALID_ARGUMENT; - } else { - rv = MOJO_RESULT_OK; - read_offset_ = - (read_offset_ + num_bytes_read) % options_.capacity_num_bytes; - - DCHECK_GE(bytes_available_, num_bytes_read); - bytes_available_ -= num_bytes_read; - - base::AutoUnlock unlock(lock_); - NotifyRead(num_bytes_read); - } - - in_two_phase_read_ = false; - two_phase_max_bytes_read_ = 0; - - watchers_.NotifyState(GetHandleSignalsStateNoLock()); - - return rv; -} - -HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { - base::AutoLock lock(lock_); - return GetHandleSignalsStateNoLock(); -} - -MojoResult DataPipeConsumerDispatcher::AddWatcherRef( - const scoped_refptr<WatcherDispatcher>& watcher, - uintptr_t context) { - base::AutoLock lock(lock_); - if (is_closed_ || in_transit_) - return MOJO_RESULT_INVALID_ARGUMENT; - return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); -} - -MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( - WatcherDispatcher* watcher, - uintptr_t context) { - base::AutoLock lock(lock_); - if (is_closed_ || in_transit_) - return MOJO_RESULT_INVALID_ARGUMENT; - return watchers_.Remove(watcher, context); -} - -void DataPipeConsumerDispatcher::StartSerialize(uint32_t* num_bytes, - uint32_t* num_ports, - uint32_t* num_handles) { - base::AutoLock lock(lock_); - DCHECK(in_transit_); - *num_bytes = static_cast<uint32_t>(sizeof(SerializedState)); - *num_ports = 1; - *num_handles = 1; -} - -bool DataPipeConsumerDispatcher::EndSerialize( - void* destination, - ports::PortName* ports, - PlatformHandle* platform_handles) { - SerializedState* state = static_cast<SerializedState*>(destination); - memcpy(&state->options, &options_, sizeof(MojoCreateDataPipeOptions)); - memset(state->padding, 0, sizeof(state->padding)); - - base::AutoLock lock(lock_); - DCHECK(in_transit_); - state->pipe_id = pipe_id_; - state->read_offset = read_offset_; - state->bytes_available = bytes_available_; - state->flags = peer_closed_ ? kFlagPeerClosed : 0; - - ports[0] = control_port_.name(); - - buffer_handle_for_transit_ = shared_ring_buffer_->DuplicatePlatformHandle(); - platform_handles[0] = buffer_handle_for_transit_.get(); - - return true; -} - -bool DataPipeConsumerDispatcher::BeginTransit() { - base::AutoLock lock(lock_); - if (in_transit_) - return false; - in_transit_ = !in_two_phase_read_; - return in_transit_; -} - -void DataPipeConsumerDispatcher::CompleteTransitAndClose() { - node_controller_->SetPortObserver(control_port_, nullptr); - - base::AutoLock lock(lock_); - DCHECK(in_transit_); - in_transit_ = false; - transferred_ = true; - ignore_result(buffer_handle_for_transit_.release()); - CloseNoLock(); -} - -void DataPipeConsumerDispatcher::CancelTransit() { - base::AutoLock lock(lock_); - DCHECK(in_transit_); - in_transit_ = false; - buffer_handle_for_transit_.reset(); - UpdateSignalsStateNoLock(); -} - -// static -scoped_refptr<DataPipeConsumerDispatcher> -DataPipeConsumerDispatcher::Deserialize(const void* data, - size_t num_bytes, - const ports::PortName* ports, - size_t num_ports, - PlatformHandle* handles, - size_t num_handles) { - if (num_ports != 1 || num_handles != 1 || - num_bytes != sizeof(SerializedState)) { - return nullptr; - } - - const SerializedState* state = static_cast<const SerializedState*>(data); - - NodeController* node_controller = internal::g_core->GetNodeController(); - ports::PortRef port; - if (node_controller->node()->GetPort(ports[0], &port) != ports::OK) - return nullptr; - - PlatformHandle buffer_handle; - std::swap(buffer_handle, handles[0]); - scoped_refptr<PlatformSharedBuffer> ring_buffer = - PlatformSharedBuffer::CreateFromPlatformHandle( - state->options.capacity_num_bytes, - false /* read_only */, - ScopedPlatformHandle(buffer_handle)); - if (!ring_buffer) { - DLOG(ERROR) << "Failed to deserialize shared buffer handle."; - return nullptr; - } - - scoped_refptr<DataPipeConsumerDispatcher> dispatcher = - new DataPipeConsumerDispatcher(node_controller, port, ring_buffer, - state->options, false /* initialized */, - state->pipe_id); - - { - base::AutoLock lock(dispatcher->lock_); - dispatcher->read_offset_ = state->read_offset; - dispatcher->bytes_available_ = state->bytes_available; - dispatcher->new_data_available_ = state->bytes_available > 0; - dispatcher->peer_closed_ = state->flags & kFlagPeerClosed; - dispatcher->InitializeNoLock(); - dispatcher->UpdateSignalsStateNoLock(); - } - - return dispatcher; -} - -DataPipeConsumerDispatcher::~DataPipeConsumerDispatcher() { - DCHECK(is_closed_ && !shared_ring_buffer_ && !ring_buffer_mapping_ && - !in_transit_); -} - -void DataPipeConsumerDispatcher::InitializeNoLock() { - lock_.AssertAcquired(); - - if (shared_ring_buffer_) { - DCHECK(!ring_buffer_mapping_); - ring_buffer_mapping_ = - shared_ring_buffer_->Map(0, options_.capacity_num_bytes); - if (!ring_buffer_mapping_) { - DLOG(ERROR) << "Failed to map shared buffer."; - shared_ring_buffer_ = nullptr; - } - } - - base::AutoUnlock unlock(lock_); - node_controller_->SetPortObserver( - control_port_, - make_scoped_refptr(new PortObserverThunk(this))); -} - -MojoResult DataPipeConsumerDispatcher::CloseNoLock() { - lock_.AssertAcquired(); - if (is_closed_ || in_transit_) - return MOJO_RESULT_INVALID_ARGUMENT; - is_closed_ = true; - ring_buffer_mapping_.reset(); - shared_ring_buffer_ = nullptr; - - watchers_.NotifyClosed(); - if (!transferred_) { - base::AutoUnlock unlock(lock_); - node_controller_->ClosePort(control_port_); - } - - return MOJO_RESULT_OK; -} - -HandleSignalsState -DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { - lock_.AssertAcquired(); - - HandleSignalsState rv; - if (shared_ring_buffer_ && bytes_available_) { - if (!in_two_phase_read_) { - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; - if (new_data_available_) - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; - } - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; - } else if (!peer_closed_ && shared_ring_buffer_) { - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; - } - - if (shared_ring_buffer_) { - if (new_data_available_ || !peer_closed_) - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_NEW_DATA_READABLE; - } - - if (peer_closed_) - rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; - rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; - - return rv; -} - -void DataPipeConsumerDispatcher::NotifyRead(uint32_t num_bytes) { - DVLOG(1) << "Data pipe consumer " << pipe_id_ << " notifying peer: " - << num_bytes << " bytes read. [control_port=" - << control_port_.name() << "]"; - - SendDataPipeControlMessage(node_controller_, control_port_, - DataPipeCommand::DATA_WAS_READ, num_bytes); -} - -void DataPipeConsumerDispatcher::OnPortStatusChanged() { - DCHECK(RequestContext::current()); - - base::AutoLock lock(lock_); - - // We stop observing the control port as soon it's transferred, but this can - // race with events which are raised right before that happens. This is fine - // to ignore. - if (transferred_) - return; - - DVLOG(1) << "Control port status changed for data pipe producer " << pipe_id_; - - UpdateSignalsStateNoLock(); -} - -void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { - lock_.AssertAcquired(); - - bool was_peer_closed = peer_closed_; - size_t previous_bytes_available = bytes_available_; - - ports::PortStatus port_status; - int rv = node_controller_->node()->GetStatus(control_port_, &port_status); - if (rv != ports::OK || !port_status.receiving_messages) { - DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware of peer closure" - << " [control_port=" << control_port_.name() << "]"; - peer_closed_ = true; - } else if (rv == ports::OK && port_status.has_messages && !in_transit_) { - ports::ScopedMessage message; - do { - int rv = node_controller_->node()->GetMessage( - control_port_, &message, nullptr); - if (rv != ports::OK) - peer_closed_ = true; - if (message) { - if (message->num_payload_bytes() < sizeof(DataPipeControlMessage)) { - peer_closed_ = true; - break; - } - - const DataPipeControlMessage* m = - static_cast<const DataPipeControlMessage*>( - message->payload_bytes()); - - if (m->command != DataPipeCommand::DATA_WAS_WRITTEN) { - DLOG(ERROR) << "Unexpected control message from producer."; - peer_closed_ = true; - break; - } - - if (static_cast<size_t>(bytes_available_) + m->num_bytes > - options_.capacity_num_bytes) { - DLOG(ERROR) << "Producer claims to have written too many bytes."; - peer_closed_ = true; - break; - } - - DVLOG(1) << "Data pipe consumer " << pipe_id_ << " is aware that " - << m->num_bytes << " bytes were written. [control_port=" - << control_port_.name() << "]"; - - bytes_available_ += m->num_bytes; - } - } while (message); - } - - bool has_new_data = bytes_available_ != previous_bytes_available; - if (has_new_data) - new_data_available_ = true; - - if (peer_closed_ != was_peer_closed || has_new_data) - watchers_.NotifyState(GetHandleSignalsStateNoLock()); -} - -} // namespace edk -} // namespace mojo |