diff options
Diffstat (limited to 'mojo/edk/system/channel_posix.cc')
-rw-r--r-- | mojo/edk/system/channel_posix.cc | 572 |
1 files changed, 0 insertions, 572 deletions
diff --git a/mojo/edk/system/channel_posix.cc b/mojo/edk/system/channel_posix.cc deleted file mode 100644 index 8b4ca7f..0000000 --- a/mojo/edk/system/channel_posix.cc +++ /dev/null @@ -1,572 +0,0 @@ -// Copyright 2016 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "mojo/edk/system/channel.h" - -#include <errno.h> -#include <sys/socket.h> - -#include <algorithm> -#include <deque> -#include <limits> -#include <memory> - -#include "base/bind.h" -#include "base/location.h" -#include "base/macros.h" -#include "base/memory/ref_counted.h" -#include "base/message_loop/message_loop.h" -#include "base/synchronization/lock.h" -#include "base/task_runner.h" -#include "mojo/edk/embedder/platform_channel_utils_posix.h" -#include "mojo/edk/embedder/platform_handle_vector.h" - -#if !defined(OS_NACL) -#include <sys/uio.h> -#endif - -namespace mojo { -namespace edk { - -namespace { - -const size_t kMaxBatchReadCapacity = 256 * 1024; - -// A view over a Channel::Message object. The write queue uses these since -// large messages may need to be sent in chunks. -class MessageView { - public: - // Owns |message|. |offset| indexes the first unsent byte in the message. - MessageView(Channel::MessagePtr message, size_t offset) - : message_(std::move(message)), - offset_(offset), - handles_(message_->TakeHandlesForTransport()) { - DCHECK_GT(message_->data_num_bytes(), offset_); - } - - MessageView(MessageView&& other) { *this = std::move(other); } - - MessageView& operator=(MessageView&& other) { - message_ = std::move(other.message_); - offset_ = other.offset_; - handles_ = std::move(other.handles_); - return *this; - } - - ~MessageView() {} - - const void* data() const { - return static_cast<const char*>(message_->data()) + offset_; - } - - size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } - - size_t data_offset() const { return offset_; } - void advance_data_offset(size_t num_bytes) { - DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); - offset_ += num_bytes; - } - - ScopedPlatformHandleVectorPtr TakeHandles() { return std::move(handles_); } - Channel::MessagePtr TakeMessage() { return std::move(message_); } - - void SetHandles(ScopedPlatformHandleVectorPtr handles) { - handles_ = std::move(handles); - } - - private: - Channel::MessagePtr message_; - size_t offset_; - ScopedPlatformHandleVectorPtr handles_; - - DISALLOW_COPY_AND_ASSIGN(MessageView); -}; - -class ChannelPosix : public Channel, - public base::MessageLoop::DestructionObserver, - public base::MessageLoopForIO::Watcher { - public: - ChannelPosix(Delegate* delegate, - ConnectionParams connection_params, - scoped_refptr<base::TaskRunner> io_task_runner) - : Channel(delegate), - self_(this), - handle_(connection_params.TakeChannelHandle()), - io_task_runner_(io_task_runner) -#if defined(OS_MACOSX) - , - handles_to_close_(new PlatformHandleVector) -#endif - { - CHECK(handle_.is_valid()); - } - - void Start() override { - if (io_task_runner_->RunsTasksOnCurrentThread()) { - StartOnIOThread(); - } else { - io_task_runner_->PostTask( - FROM_HERE, base::Bind(&ChannelPosix::StartOnIOThread, this)); - } - } - - void ShutDownImpl() override { - // Always shut down asynchronously when called through the public interface. - io_task_runner_->PostTask( - FROM_HERE, base::Bind(&ChannelPosix::ShutDownOnIOThread, this)); - } - - void Write(MessagePtr message) override { - bool write_error = false; - { - base::AutoLock lock(write_lock_); - if (reject_writes_) - return; - if (outgoing_messages_.empty()) { - if (!WriteNoLock(MessageView(std::move(message), 0))) - reject_writes_ = write_error = true; - } else { - outgoing_messages_.emplace_back(std::move(message), 0); - } - } - if (write_error) { - // Do not synchronously invoke OnError(). Write() may have been called by - // the delegate and we don't want to re-enter it. - io_task_runner_->PostTask(FROM_HERE, - base::Bind(&ChannelPosix::OnError, this)); - } - } - - void LeakHandle() override { - DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); - leak_handle_ = true; - } - - bool GetReadPlatformHandles( - size_t num_handles, - const void* extra_header, - size_t extra_header_size, - ScopedPlatformHandleVectorPtr* handles) override { - if (num_handles > std::numeric_limits<uint16_t>::max()) - return false; -#if defined(OS_MACOSX) && !defined(OS_IOS) - // On OSX, we can have mach ports which are located in the extra header - // section. - using MachPortsEntry = Channel::Message::MachPortsEntry; - using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader; - CHECK(extra_header_size >= - sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry)); - const MachPortsExtraHeader* mach_ports_header = - reinterpret_cast<const MachPortsExtraHeader*>(extra_header); - size_t num_mach_ports = mach_ports_header->num_ports; - CHECK(num_mach_ports <= num_handles); - if (incoming_platform_handles_.size() + num_mach_ports < num_handles) { - handles->reset(); - return true; - } - - handles->reset(new PlatformHandleVector(num_handles)); - const MachPortsEntry* mach_ports = mach_ports_header->entries; - for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) { - if (mach_port_index < num_mach_ports && - mach_ports[mach_port_index].index == i) { - (*handles)->at(i) = PlatformHandle( - static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port)); - CHECK((*handles)->at(i).type == PlatformHandle::Type::MACH); - // These are actually just Mach port names until they're resolved from - // the remote process. - (*handles)->at(i).type = PlatformHandle::Type::MACH_NAME; - mach_port_index++; - } else { - CHECK(!incoming_platform_handles_.empty()); - (*handles)->at(i) = incoming_platform_handles_.front(); - incoming_platform_handles_.pop_front(); - } - } -#else - if (incoming_platform_handles_.size() < num_handles) { - handles->reset(); - return true; - } - - handles->reset(new PlatformHandleVector(num_handles)); - for (size_t i = 0; i < num_handles; ++i) { - (*handles)->at(i) = incoming_platform_handles_.front(); - incoming_platform_handles_.pop_front(); - } -#endif - - return true; - } - - private: - ~ChannelPosix() override { - DCHECK(!read_watcher_); - DCHECK(!write_watcher_); - for (auto handle : incoming_platform_handles_) - handle.CloseIfNecessary(); - } - - void StartOnIOThread() { - DCHECK(!read_watcher_); - DCHECK(!write_watcher_); - read_watcher_.reset( - new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE)); - base::MessageLoop::current()->AddDestructionObserver(this); - if (handle_.get().needs_connection) { - base::MessageLoopForIO::current()->WatchFileDescriptor( - handle_.get().handle, false /* persistent */, - base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); - } else { - write_watcher_.reset( - new base::MessageLoopForIO::FileDescriptorWatcher(FROM_HERE)); - base::MessageLoopForIO::current()->WatchFileDescriptor( - handle_.get().handle, true /* persistent */, - base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this); - base::AutoLock lock(write_lock_); - FlushOutgoingMessagesNoLock(); - } - } - - void WaitForWriteOnIOThread() { - base::AutoLock lock(write_lock_); - WaitForWriteOnIOThreadNoLock(); - } - - void WaitForWriteOnIOThreadNoLock() { - if (pending_write_) - return; - if (!write_watcher_) - return; - if (io_task_runner_->RunsTasksOnCurrentThread()) { - pending_write_ = true; - base::MessageLoopForIO::current()->WatchFileDescriptor( - handle_.get().handle, false /* persistent */, - base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(), this); - } else { - io_task_runner_->PostTask( - FROM_HERE, base::Bind(&ChannelPosix::WaitForWriteOnIOThread, this)); - } - } - - void ShutDownOnIOThread() { - base::MessageLoop::current()->RemoveDestructionObserver(this); - - read_watcher_.reset(); - write_watcher_.reset(); - if (leak_handle_) - ignore_result(handle_.release()); - handle_.reset(); -#if defined(OS_MACOSX) - handles_to_close_.reset(); -#endif - - // May destroy the |this| if it was the last reference. - self_ = nullptr; - } - - // base::MessageLoop::DestructionObserver: - void WillDestroyCurrentMessageLoop() override { - DCHECK(io_task_runner_->RunsTasksOnCurrentThread()); - if (self_) - ShutDownOnIOThread(); - } - - // base::MessageLoopForIO::Watcher: - void OnFileCanReadWithoutBlocking(int fd) override { - CHECK_EQ(fd, handle_.get().handle); - if (handle_.get().needs_connection) { -#if !defined(OS_NACL) - read_watcher_.reset(); - base::MessageLoop::current()->RemoveDestructionObserver(this); - - ScopedPlatformHandle accept_fd; - ServerAcceptConnection(handle_.get(), &accept_fd); - if (!accept_fd.is_valid()) { - OnError(); - return; - } - handle_ = std::move(accept_fd); - StartOnIOThread(); -#else - NOTREACHED(); -#endif - return; - } - - bool read_error = false; - size_t next_read_size = 0; - size_t buffer_capacity = 0; - size_t total_bytes_read = 0; - size_t bytes_read = 0; - do { - buffer_capacity = next_read_size; - char* buffer = GetReadBuffer(&buffer_capacity); - DCHECK_GT(buffer_capacity, 0u); - - ssize_t read_result = PlatformChannelRecvmsg( - handle_.get(), - buffer, - buffer_capacity, - &incoming_platform_handles_); - - if (read_result > 0) { - bytes_read = static_cast<size_t>(read_result); - total_bytes_read += bytes_read; - if (!OnReadComplete(bytes_read, &next_read_size)) { - read_error = true; - break; - } - } else if (read_result == 0 || - (errno != EAGAIN && errno != EWOULDBLOCK)) { - read_error = true; - break; - } - } while (bytes_read == buffer_capacity && - total_bytes_read < kMaxBatchReadCapacity && - next_read_size > 0); - if (read_error) { - // Stop receiving read notifications. - read_watcher_.reset(); - - OnError(); - } - } - - void OnFileCanWriteWithoutBlocking(int fd) override { - bool write_error = false; - { - base::AutoLock lock(write_lock_); - pending_write_ = false; - if (!FlushOutgoingMessagesNoLock()) - reject_writes_ = write_error = true; - } - if (write_error) - OnError(); - } - - // Attempts to write a message directly to the channel. If the full message - // cannot be written, it's queued and a wait is initiated to write the message - // ASAP on the I/O thread. - bool WriteNoLock(MessageView message_view) { - if (handle_.get().needs_connection) { - outgoing_messages_.emplace_front(std::move(message_view)); - return true; - } - size_t bytes_written = 0; - do { - message_view.advance_data_offset(bytes_written); - - ssize_t result; - ScopedPlatformHandleVectorPtr handles = message_view.TakeHandles(); - if (handles && handles->size()) { - iovec iov = { - const_cast<void*>(message_view.data()), - message_view.data_num_bytes() - }; - // TODO: Handle lots of handles. - result = PlatformChannelSendmsgWithHandles( - handle_.get(), &iov, 1, handles->data(), handles->size()); - if (result >= 0) { -#if defined(OS_MACOSX) - // There is a bug on OSX which makes it dangerous to close - // a file descriptor while it is in transit. So instead we - // store the file descriptor in a set and send a message to - // the recipient, which is queued AFTER the message that - // sent the FD. The recipient will reply to the message, - // letting us know that it is now safe to close the file - // descriptor. For more information, see: - // http://crbug.com/298276 - std::vector<int> fds; - for (auto& handle : *handles) - fds.push_back(handle.handle); - { - base::AutoLock l(handles_to_close_lock_); - for (auto& handle : *handles) - handles_to_close_->push_back(handle); - } - MessagePtr fds_message( - new Channel::Message(sizeof(fds[0]) * fds.size(), 0, - Message::MessageType::HANDLES_SENT)); - memcpy(fds_message->mutable_payload(), fds.data(), - sizeof(fds[0]) * fds.size()); - outgoing_messages_.emplace_back(std::move(fds_message), 0); - handles->clear(); -#else - handles.reset(); -#endif // defined(OS_MACOSX) - } - } else { - result = PlatformChannelWrite(handle_.get(), message_view.data(), - message_view.data_num_bytes()); - } - - if (result < 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK -#if defined(OS_MACOSX) - // On OS X if sendmsg() is trying to send fds between processes and - // there isn't enough room in the output buffer to send the fd - // structure over atomically then EMSGSIZE is returned. - // - // EMSGSIZE presents a problem since the system APIs can only call - // us when there's room in the socket buffer and not when there is - // "enough" room. - // - // The current behavior is to return to the event loop when EMSGSIZE - // is received and hopefull service another FD. This is however - // still technically a busy wait since the event loop will call us - // right back until the receiver has read enough data to allow - // passing the FD over atomically. - && errno != EMSGSIZE -#endif - ) { - return false; - } - message_view.SetHandles(std::move(handles)); - outgoing_messages_.emplace_front(std::move(message_view)); - WaitForWriteOnIOThreadNoLock(); - return true; - } - - bytes_written = static_cast<size_t>(result); - } while (bytes_written < message_view.data_num_bytes()); - - return FlushOutgoingMessagesNoLock(); - } - - bool FlushOutgoingMessagesNoLock() { - std::deque<MessageView> messages; - std::swap(outgoing_messages_, messages); - - while (!messages.empty()) { - if (!WriteNoLock(std::move(messages.front()))) - return false; - - messages.pop_front(); - if (!outgoing_messages_.empty()) { - // The message was requeued by WriteNoLock(), so we have to wait for - // pipe to become writable again. Repopulate the message queue and exit. - // If sending the message triggered any control messages, they may be - // in |outgoing_messages_| in addition to or instead of the message - // being sent. - std::swap(messages, outgoing_messages_); - while (!messages.empty()) { - outgoing_messages_.push_front(std::move(messages.back())); - messages.pop_back(); - } - return true; - } - } - - return true; - } - -#if defined(OS_MACOSX) - bool OnControlMessage(Message::MessageType message_type, - const void* payload, - size_t payload_size, - ScopedPlatformHandleVectorPtr handles) override { - switch (message_type) { - case Message::MessageType::HANDLES_SENT: { - if (payload_size == 0) - break; - MessagePtr message(new Channel::Message( - payload_size, 0, Message::MessageType::HANDLES_SENT_ACK)); - memcpy(message->mutable_payload(), payload, payload_size); - Write(std::move(message)); - return true; - } - - case Message::MessageType::HANDLES_SENT_ACK: { - size_t num_fds = payload_size / sizeof(int); - if (num_fds == 0 || payload_size % sizeof(int) != 0) - break; - - const int* fds = reinterpret_cast<const int*>(payload); - if (!CloseHandles(fds, num_fds)) - break; - return true; - } - - default: - break; - } - - return false; - } - - // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if - // |fds| does not match a sequence of handles in |handles_to_close_|. - bool CloseHandles(const int* fds, size_t num_fds) { - base::AutoLock l(handles_to_close_lock_); - if (!num_fds) - return false; - - auto start = - std::find_if(handles_to_close_->begin(), handles_to_close_->end(), - [&fds](const PlatformHandle& handle) { - return handle.handle == fds[0]; - }); - if (start == handles_to_close_->end()) - return false; - - auto it = start; - size_t i = 0; - // The FDs in the message should match a sequence of handles in - // |handles_to_close_|. - for (; i < num_fds && it != handles_to_close_->end(); i++, ++it) { - if (it->handle != fds[i]) - return false; - - it->CloseIfNecessary(); - } - if (i != num_fds) - return false; - - handles_to_close_->erase(start, it); - return true; - } -#endif // defined(OS_MACOSX) - - // Keeps the Channel alive at least until explicit shutdown on the IO thread. - scoped_refptr<Channel> self_; - - ScopedPlatformHandle handle_; - scoped_refptr<base::TaskRunner> io_task_runner_; - - // These watchers must only be accessed on the IO thread. - std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_; - std::unique_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_; - - std::deque<PlatformHandle> incoming_platform_handles_; - - // Protects |pending_write_| and |outgoing_messages_|. - base::Lock write_lock_; - bool pending_write_ = false; - bool reject_writes_ = false; - std::deque<MessageView> outgoing_messages_; - - bool leak_handle_ = false; - -#if defined(OS_MACOSX) - base::Lock handles_to_close_lock_; - ScopedPlatformHandleVectorPtr handles_to_close_; -#endif - - DISALLOW_COPY_AND_ASSIGN(ChannelPosix); -}; - -} // namespace - -// static -scoped_refptr<Channel> Channel::Create( - Delegate* delegate, - ConnectionParams connection_params, - scoped_refptr<base::TaskRunner> io_task_runner) { - return new ChannelPosix(delegate, std::move(connection_params), - io_task_runner); -} - -} // namespace edk -} // namespace mojo |