diff options
Diffstat (limited to 'mojo/public/cpp/bindings/lib/interface_endpoint_client.cc')
-rw-r--r-- | mojo/public/cpp/bindings/lib/interface_endpoint_client.cc | 412 |
1 files changed, 0 insertions, 412 deletions
diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc deleted file mode 100644 index 4682e72..0000000 --- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc +++ /dev/null @@ -1,412 +0,0 @@ -// Copyright 2015 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "mojo/public/cpp/bindings/interface_endpoint_client.h" - -#include <stdint.h> - -#include <utility> - -#include "base/bind.h" -#include "base/location.h" -#include "base/logging.h" -#include "base/macros.h" -#include "base/memory/ptr_util.h" -#include "base/single_thread_task_runner.h" -#include "base/stl_util.h" -#include "mojo/public/cpp/bindings/associated_group.h" -#include "mojo/public/cpp/bindings/associated_group_controller.h" -#include "mojo/public/cpp/bindings/interface_endpoint_controller.h" -#include "mojo/public/cpp/bindings/lib/validation_util.h" -#include "mojo/public/cpp/bindings/sync_call_restrictions.h" - -namespace mojo { - -// ---------------------------------------------------------------------------- - -namespace { - -void DCheckIfInvalid(const base::WeakPtr<InterfaceEndpointClient>& client, - const std::string& message) { - bool is_valid = client && !client->encountered_error(); - DCHECK(!is_valid) << message; -} - -// When receiving an incoming message which expects a repsonse, -// InterfaceEndpointClient creates a ResponderThunk object and passes it to the -// incoming message receiver. When the receiver finishes processing the message, -// it can provide a response using this object. -class ResponderThunk : public MessageReceiverWithStatus { - public: - explicit ResponderThunk( - const base::WeakPtr<InterfaceEndpointClient>& endpoint_client, - scoped_refptr<base::SingleThreadTaskRunner> runner) - : endpoint_client_(endpoint_client), - accept_was_invoked_(false), - task_runner_(std::move(runner)) {} - ~ResponderThunk() override { - if (!accept_was_invoked_) { - // The Service handled a message that was expecting a response - // but did not send a response. - // We raise an error to signal the calling application that an error - // condition occurred. Without this the calling application would have no - // way of knowing it should stop waiting for a response. - if (task_runner_->RunsTasksOnCurrentThread()) { - // Please note that even if this code is run from a different task - // runner on the same thread as |task_runner_|, it is okay to directly - // call InterfaceEndpointClient::RaiseError(), because it will raise - // error from the correct task runner asynchronously. - if (endpoint_client_) { - endpoint_client_->RaiseError(); - } - } else { - task_runner_->PostTask( - FROM_HERE, - base::Bind(&InterfaceEndpointClient::RaiseError, endpoint_client_)); - } - } - } - - // MessageReceiver implementation: - bool Accept(Message* message) override { - DCHECK(task_runner_->RunsTasksOnCurrentThread()); - accept_was_invoked_ = true; - DCHECK(message->has_flag(Message::kFlagIsResponse)); - - bool result = false; - - if (endpoint_client_) - result = endpoint_client_->Accept(message); - - return result; - } - - // MessageReceiverWithStatus implementation: - bool IsValid() override { - DCHECK(task_runner_->RunsTasksOnCurrentThread()); - return endpoint_client_ && !endpoint_client_->encountered_error(); - } - - void DCheckInvalid(const std::string& message) override { - if (task_runner_->RunsTasksOnCurrentThread()) { - DCheckIfInvalid(endpoint_client_, message); - } else { - task_runner_->PostTask( - FROM_HERE, base::Bind(&DCheckIfInvalid, endpoint_client_, message)); - } - } - - private: - base::WeakPtr<InterfaceEndpointClient> endpoint_client_; - bool accept_was_invoked_; - scoped_refptr<base::SingleThreadTaskRunner> task_runner_; - - DISALLOW_COPY_AND_ASSIGN(ResponderThunk); -}; - -} // namespace - -// ---------------------------------------------------------------------------- - -InterfaceEndpointClient::SyncResponseInfo::SyncResponseInfo( - bool* in_response_received) - : response_received(in_response_received) {} - -InterfaceEndpointClient::SyncResponseInfo::~SyncResponseInfo() {} - -// ---------------------------------------------------------------------------- - -InterfaceEndpointClient::HandleIncomingMessageThunk::HandleIncomingMessageThunk( - InterfaceEndpointClient* owner) - : owner_(owner) {} - -InterfaceEndpointClient::HandleIncomingMessageThunk:: - ~HandleIncomingMessageThunk() {} - -bool InterfaceEndpointClient::HandleIncomingMessageThunk::Accept( - Message* message) { - return owner_->HandleValidatedMessage(message); -} - -// ---------------------------------------------------------------------------- - -InterfaceEndpointClient::InterfaceEndpointClient( - ScopedInterfaceEndpointHandle handle, - MessageReceiverWithResponderStatus* receiver, - std::unique_ptr<MessageReceiver> payload_validator, - bool expect_sync_requests, - scoped_refptr<base::SingleThreadTaskRunner> runner, - uint32_t interface_version) - : expect_sync_requests_(expect_sync_requests), - handle_(std::move(handle)), - incoming_receiver_(receiver), - thunk_(this), - filters_(&thunk_), - task_runner_(std::move(runner)), - control_message_proxy_(this), - control_message_handler_(interface_version), - weak_ptr_factory_(this) { - DCHECK(handle_.is_valid()); - - // TODO(yzshen): the way to use validator (or message filter in general) - // directly is a little awkward. - if (payload_validator) - filters_.Append(std::move(payload_validator)); - - if (handle_.pending_association()) { - handle_.SetAssociationEventHandler(base::Bind( - &InterfaceEndpointClient::OnAssociationEvent, base::Unretained(this))); - } else { - InitControllerIfNecessary(); - } -} - -InterfaceEndpointClient::~InterfaceEndpointClient() { - DCHECK(thread_checker_.CalledOnValidThread()); - - if (controller_) - handle_.group_controller()->DetachEndpointClient(handle_); -} - -AssociatedGroup* InterfaceEndpointClient::associated_group() { - if (!associated_group_) - associated_group_ = base::MakeUnique<AssociatedGroup>(handle_); - return associated_group_.get(); -} - -ScopedInterfaceEndpointHandle InterfaceEndpointClient::PassHandle() { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK(!has_pending_responders()); - - if (!handle_.is_valid()) - return ScopedInterfaceEndpointHandle(); - - handle_.SetAssociationEventHandler( - ScopedInterfaceEndpointHandle::AssociationEventCallback()); - - if (controller_) { - controller_ = nullptr; - handle_.group_controller()->DetachEndpointClient(handle_); - } - - return std::move(handle_); -} - -void InterfaceEndpointClient::AddFilter( - std::unique_ptr<MessageReceiver> filter) { - filters_.Append(std::move(filter)); -} - -void InterfaceEndpointClient::RaiseError() { - DCHECK(thread_checker_.CalledOnValidThread()); - - if (!handle_.pending_association()) - handle_.group_controller()->RaiseError(); -} - -void InterfaceEndpointClient::CloseWithReason(uint32_t custom_reason, - const std::string& description) { - DCHECK(thread_checker_.CalledOnValidThread()); - - auto handle = PassHandle(); - handle.ResetWithReason(custom_reason, description); -} - -bool InterfaceEndpointClient::Accept(Message* message) { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK(!message->has_flag(Message::kFlagExpectsResponse)); - DCHECK(!handle_.pending_association()); - - // This has to been done even if connection error has occurred. For example, - // the message contains a pending associated request. The user may try to use - // the corresponding associated interface pointer after sending this message. - // That associated interface pointer has to join an associated group in order - // to work properly. - if (!message->associated_endpoint_handles()->empty()) - message->SerializeAssociatedEndpointHandles(handle_.group_controller()); - - if (encountered_error_) - return false; - - InitControllerIfNecessary(); - - return controller_->SendMessage(message); -} - -bool InterfaceEndpointClient::AcceptWithResponder( - Message* message, - std::unique_ptr<MessageReceiver> responder) { - DCHECK(thread_checker_.CalledOnValidThread()); - DCHECK(message->has_flag(Message::kFlagExpectsResponse)); - DCHECK(!handle_.pending_association()); - - // Please see comments in Accept(). - if (!message->associated_endpoint_handles()->empty()) - message->SerializeAssociatedEndpointHandles(handle_.group_controller()); - - if (encountered_error_) - return false; - - InitControllerIfNecessary(); - - // Reserve 0 in case we want it to convey special meaning in the future. - uint64_t request_id = next_request_id_++; - if (request_id == 0) - request_id = next_request_id_++; - - message->set_request_id(request_id); - - bool is_sync = message->has_flag(Message::kFlagIsSync); - if (!controller_->SendMessage(message)) - return false; - - if (!is_sync) { - async_responders_[request_id] = std::move(responder); - return true; - } - - SyncCallRestrictions::AssertSyncCallAllowed(); - - bool response_received = false; - sync_responses_.insert(std::make_pair( - request_id, base::MakeUnique<SyncResponseInfo>(&response_received))); - - base::WeakPtr<InterfaceEndpointClient> weak_self = - weak_ptr_factory_.GetWeakPtr(); - controller_->SyncWatch(&response_received); - // Make sure that this instance hasn't been destroyed. - if (weak_self) { - DCHECK(base::ContainsKey(sync_responses_, request_id)); - auto iter = sync_responses_.find(request_id); - DCHECK_EQ(&response_received, iter->second->response_received); - if (response_received) - ignore_result(responder->Accept(&iter->second->response)); - sync_responses_.erase(iter); - } - - return true; -} - -bool InterfaceEndpointClient::HandleIncomingMessage(Message* message) { - DCHECK(thread_checker_.CalledOnValidThread()); - return filters_.Accept(message); -} - -void InterfaceEndpointClient::NotifyError( - const base::Optional<DisconnectReason>& reason) { - DCHECK(thread_checker_.CalledOnValidThread()); - - if (encountered_error_) - return; - encountered_error_ = true; - - // Response callbacks may hold on to resource, and there's no need to keep - // them alive any longer. Note that it's allowed that a pending response - // callback may own this endpoint, so we simply move the responders onto the - // stack here and let them be destroyed when the stack unwinds. - AsyncResponderMap responders = std::move(async_responders_); - - control_message_proxy_.OnConnectionError(); - - if (!error_handler_.is_null()) { - base::Closure error_handler = std::move(error_handler_); - error_handler.Run(); - } else if (!error_with_reason_handler_.is_null()) { - ConnectionErrorWithReasonCallback error_with_reason_handler = - std::move(error_with_reason_handler_); - if (reason) { - error_with_reason_handler.Run(reason->custom_reason, reason->description); - } else { - error_with_reason_handler.Run(0, std::string()); - } - } -} - -void InterfaceEndpointClient::QueryVersion( - const base::Callback<void(uint32_t)>& callback) { - control_message_proxy_.QueryVersion(callback); -} - -void InterfaceEndpointClient::RequireVersion(uint32_t version) { - control_message_proxy_.RequireVersion(version); -} - -void InterfaceEndpointClient::FlushForTesting() { - control_message_proxy_.FlushForTesting(); -} - -void InterfaceEndpointClient::InitControllerIfNecessary() { - if (controller_ || handle_.pending_association()) - return; - - controller_ = handle_.group_controller()->AttachEndpointClient(handle_, this, - task_runner_); - if (expect_sync_requests_) - controller_->AllowWokenUpBySyncWatchOnSameThread(); -} - -void InterfaceEndpointClient::OnAssociationEvent( - ScopedInterfaceEndpointHandle::AssociationEvent event) { - if (event == ScopedInterfaceEndpointHandle::ASSOCIATED) { - InitControllerIfNecessary(); - } else if (event == - ScopedInterfaceEndpointHandle::PEER_CLOSED_BEFORE_ASSOCIATION) { - task_runner_->PostTask(FROM_HERE, - base::Bind(&InterfaceEndpointClient::NotifyError, - weak_ptr_factory_.GetWeakPtr(), - handle_.disconnect_reason())); - } -} - -bool InterfaceEndpointClient::HandleValidatedMessage(Message* message) { - DCHECK_EQ(handle_.id(), message->interface_id()); - - if (encountered_error_) { - // This message is received after error has been encountered. For associated - // interfaces, this means the remote side sends a - // PeerAssociatedEndpointClosed event but continues to send more messages - // for the same interface. Close the pipe because this shouldn't happen. - DVLOG(1) << "A message is received for an interface after it has been " - << "disconnected. Closing the pipe."; - return false; - } - - if (message->has_flag(Message::kFlagExpectsResponse)) { - std::unique_ptr<MessageReceiverWithStatus> responder = - base::MakeUnique<ResponderThunk>(weak_ptr_factory_.GetWeakPtr(), - task_runner_); - if (mojo::internal::ControlMessageHandler::IsControlMessage(message)) { - return control_message_handler_.AcceptWithResponder(message, - std::move(responder)); - } else { - return incoming_receiver_->AcceptWithResponder(message, - std::move(responder)); - } - } else if (message->has_flag(Message::kFlagIsResponse)) { - uint64_t request_id = message->request_id(); - - if (message->has_flag(Message::kFlagIsSync)) { - auto it = sync_responses_.find(request_id); - if (it == sync_responses_.end()) - return false; - it->second->response = std::move(*message); - *it->second->response_received = true; - return true; - } - - auto it = async_responders_.find(request_id); - if (it == async_responders_.end()) - return false; - std::unique_ptr<MessageReceiver> responder = std::move(it->second); - async_responders_.erase(it); - return responder->Accept(message); - } else { - if (mojo::internal::ControlMessageHandler::IsControlMessage(message)) - return control_message_handler_.Accept(message); - - return incoming_receiver_->Accept(message); - } -} - -} // namespace mojo |