diff options
Diffstat (limited to 'pw_rpc/server.cc')
-rw-r--r-- | pw_rpc/server.cc | 72 |
1 files changed, 42 insertions, 30 deletions
diff --git a/pw_rpc/server.cc b/pw_rpc/server.cc index d7634ab6b..4af9ff4e6 100644 --- a/pw_rpc/server.cc +++ b/pw_rpc/server.cc @@ -23,23 +23,21 @@ #include "pw_log/log.h" #include "pw_rpc/internal/endpoint.h" #include "pw_rpc/internal/packet.h" +#include "pw_rpc/service_id.h" namespace pw::rpc { namespace { using internal::Packet; -using internal::PacketType; +using internal::pwpb::PacketType; } // namespace -Status Server::ProcessPacket(ConstByteSpan packet_data, - ChannelOutput* interface) { +Status Server::ProcessPacket(ConstByteSpan packet_data) { PW_TRY_ASSIGN(Packet packet, Endpoint::ProcessPacket(packet_data, Packet::kServer)); internal::rpc_lock().lock(); - internal::ServerCall* const call = - static_cast<internal::ServerCall*>(FindCall(packet)); // Verbose log for debugging. // PW_LOG_DEBUG("RPC server received packet type %u for %u:%08x/%08x", @@ -50,15 +48,6 @@ Status Server::ProcessPacket(ConstByteSpan packet_data, internal::Channel* channel = GetInternalChannel(packet.channel_id()); if (channel == nullptr) { - // If an interface was provided, respond with a SERVER_ERROR to indicate - // that the channel is not available on this server. Don't send responses to - // error messages, though, to avoid potential infinite cycles. - if (interface != nullptr && packet.type() != PacketType::CLIENT_ERROR) { - internal::Channel(packet.channel_id(), interface) - .Send(Packet::ServerError(packet, Status::Unavailable())) - .IgnoreError(); - } - internal::rpc_lock().unlock(); PW_LOG_WARN("RPC server received packet for unknown channel %u", static_cast<unsigned>(packet.channel_id())); @@ -74,24 +63,30 @@ Status Server::ProcessPacket(ConstByteSpan packet_data, .IgnoreError(); } internal::rpc_lock().unlock(); + PW_LOG_DEBUG("Received packet on channel %u for unknown RPC %08x/%08x", + static_cast<unsigned>(packet.channel_id()), + static_cast<unsigned>(packet.service_id()), + static_cast<unsigned>(packet.method_id())); return OkStatus(); // OK since the packet was handled. } + // Handle request packets separately to avoid an unnecessary call lookup. The + // Call constructor looks up and cancels any duplicate calls. + if (packet.type() == PacketType::REQUEST) { + const internal::CallContext context( + *this, packet.channel_id(), *service, *method, packet.call_id()); + method->Invoke(context, packet); + return OkStatus(); + } + + IntrusiveList<internal::Call>::iterator call = FindCall(packet); + switch (packet.type()) { - case PacketType::REQUEST: { - // If the REQUEST is for an ongoing RPC, the existing call will be - // cancelled when the new call object is created. - const internal::CallContext context( - *this, channel->id(), *service, *method, packet.call_id()); - method->Invoke(context, packet); - break; - } case PacketType::CLIENT_STREAM: HandleClientStreamPacket(packet, *channel, call); break; case PacketType::CLIENT_ERROR: - case PacketType::DEPRECATED_CANCEL: - if (call != nullptr && call->id() == packet.call_id()) { + if (call != calls_end()) { call->HandleError(packet.status()); } else { internal::rpc_lock().unlock(); @@ -100,6 +95,10 @@ Status Server::ProcessPacket(ConstByteSpan packet_data, case PacketType::CLIENT_STREAM_END: HandleClientStreamPacket(packet, *channel, call); break; + case PacketType::REQUEST: // Handled above + case PacketType::RESPONSE: + case PacketType::SERVER_ERROR: + case PacketType::SERVER_STREAM: default: internal::rpc_lock().unlock(); PW_LOG_WARN("pw_rpc server unable to handle packet of type %u", @@ -113,7 +112,7 @@ std::tuple<Service*, const internal::Method*> Server::FindMethod( const internal::Packet& packet) { // Packets always include service and method IDs. auto service = std::find_if(services_.begin(), services_.end(), [&](auto& s) { - return s.id() == packet.service_id(); + return internal::UnwrapServiceId(s.service_id()) == packet.service_id(); }); if (service == services_.end()) { @@ -123,10 +122,11 @@ std::tuple<Service*, const internal::Method*> Server::FindMethod( return {&(*service), service->FindMethod(packet.method_id())}; } -void Server::HandleClientStreamPacket(const internal::Packet& packet, - internal::Channel& channel, - internal::ServerCall* call) const { - if (call == nullptr || call->id() != packet.call_id()) { +void Server::HandleClientStreamPacket( + const internal::Packet& packet, + internal::Channel& channel, + IntrusiveList<internal::Call>::iterator call) const { + if (call == calls_end()) { channel.Send(Packet::ServerError(packet, Status::FailedPrecondition())) .IgnoreError(); // Errors are logged in Channel::Send. internal::rpc_lock().unlock(); @@ -142,6 +142,12 @@ void Server::HandleClientStreamPacket(const internal::Packet& packet, channel.Send(Packet::ServerError(packet, Status::InvalidArgument())) .IgnoreError(); // Errors are logged in Channel::Send. internal::rpc_lock().unlock(); + PW_LOG_DEBUG( + "Received client stream packet for %u:%08x/%08x, which doesn't have a " + "client stream", + static_cast<unsigned>(packet.channel_id()), + static_cast<unsigned>(packet.service_id()), + static_cast<unsigned>(packet.method_id())); return; } @@ -149,13 +155,19 @@ void Server::HandleClientStreamPacket(const internal::Packet& packet, channel.Send(Packet::ServerError(packet, Status::FailedPrecondition())) .IgnoreError(); // Errors are logged in Channel::Send. internal::rpc_lock().unlock(); + PW_LOG_DEBUG( + "Received client stream packet for %u:%08x/%08x, but its client stream " + "is closed", + static_cast<unsigned>(packet.channel_id()), + static_cast<unsigned>(packet.service_id()), + static_cast<unsigned>(packet.method_id())); return; } if (packet.type() == PacketType::CLIENT_STREAM) { call->HandlePayload(packet.payload()); } else { // Handle PacketType::CLIENT_STREAM_END. - call->HandleClientStreamEnd(); + static_cast<internal::ServerCall&>(*call).HandleClientStreamEnd(); } } |