aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc/server.cc')
-rw-r--r--pw_rpc/server.cc72
1 files changed, 42 insertions, 30 deletions
diff --git a/pw_rpc/server.cc b/pw_rpc/server.cc
index d7634ab6b..4af9ff4e6 100644
--- a/pw_rpc/server.cc
+++ b/pw_rpc/server.cc
@@ -23,23 +23,21 @@
#include "pw_log/log.h"
#include "pw_rpc/internal/endpoint.h"
#include "pw_rpc/internal/packet.h"
+#include "pw_rpc/service_id.h"
namespace pw::rpc {
namespace {
using internal::Packet;
-using internal::PacketType;
+using internal::pwpb::PacketType;
} // namespace
-Status Server::ProcessPacket(ConstByteSpan packet_data,
- ChannelOutput* interface) {
+Status Server::ProcessPacket(ConstByteSpan packet_data) {
PW_TRY_ASSIGN(Packet packet,
Endpoint::ProcessPacket(packet_data, Packet::kServer));
internal::rpc_lock().lock();
- internal::ServerCall* const call =
- static_cast<internal::ServerCall*>(FindCall(packet));
// Verbose log for debugging.
// PW_LOG_DEBUG("RPC server received packet type %u for %u:%08x/%08x",
@@ -50,15 +48,6 @@ Status Server::ProcessPacket(ConstByteSpan packet_data,
internal::Channel* channel = GetInternalChannel(packet.channel_id());
if (channel == nullptr) {
- // If an interface was provided, respond with a SERVER_ERROR to indicate
- // that the channel is not available on this server. Don't send responses to
- // error messages, though, to avoid potential infinite cycles.
- if (interface != nullptr && packet.type() != PacketType::CLIENT_ERROR) {
- internal::Channel(packet.channel_id(), interface)
- .Send(Packet::ServerError(packet, Status::Unavailable()))
- .IgnoreError();
- }
-
internal::rpc_lock().unlock();
PW_LOG_WARN("RPC server received packet for unknown channel %u",
static_cast<unsigned>(packet.channel_id()));
@@ -74,24 +63,30 @@ Status Server::ProcessPacket(ConstByteSpan packet_data,
.IgnoreError();
}
internal::rpc_lock().unlock();
+ PW_LOG_DEBUG("Received packet on channel %u for unknown RPC %08x/%08x",
+ static_cast<unsigned>(packet.channel_id()),
+ static_cast<unsigned>(packet.service_id()),
+ static_cast<unsigned>(packet.method_id()));
return OkStatus(); // OK since the packet was handled.
}
+ // Handle request packets separately to avoid an unnecessary call lookup. The
+ // Call constructor looks up and cancels any duplicate calls.
+ if (packet.type() == PacketType::REQUEST) {
+ const internal::CallContext context(
+ *this, packet.channel_id(), *service, *method, packet.call_id());
+ method->Invoke(context, packet);
+ return OkStatus();
+ }
+
+ IntrusiveList<internal::Call>::iterator call = FindCall(packet);
+
switch (packet.type()) {
- case PacketType::REQUEST: {
- // If the REQUEST is for an ongoing RPC, the existing call will be
- // cancelled when the new call object is created.
- const internal::CallContext context(
- *this, channel->id(), *service, *method, packet.call_id());
- method->Invoke(context, packet);
- break;
- }
case PacketType::CLIENT_STREAM:
HandleClientStreamPacket(packet, *channel, call);
break;
case PacketType::CLIENT_ERROR:
- case PacketType::DEPRECATED_CANCEL:
- if (call != nullptr && call->id() == packet.call_id()) {
+ if (call != calls_end()) {
call->HandleError(packet.status());
} else {
internal::rpc_lock().unlock();
@@ -100,6 +95,10 @@ Status Server::ProcessPacket(ConstByteSpan packet_data,
case PacketType::CLIENT_STREAM_END:
HandleClientStreamPacket(packet, *channel, call);
break;
+ case PacketType::REQUEST: // Handled above
+ case PacketType::RESPONSE:
+ case PacketType::SERVER_ERROR:
+ case PacketType::SERVER_STREAM:
default:
internal::rpc_lock().unlock();
PW_LOG_WARN("pw_rpc server unable to handle packet of type %u",
@@ -113,7 +112,7 @@ std::tuple<Service*, const internal::Method*> Server::FindMethod(
const internal::Packet& packet) {
// Packets always include service and method IDs.
auto service = std::find_if(services_.begin(), services_.end(), [&](auto& s) {
- return s.id() == packet.service_id();
+ return internal::UnwrapServiceId(s.service_id()) == packet.service_id();
});
if (service == services_.end()) {
@@ -123,10 +122,11 @@ std::tuple<Service*, const internal::Method*> Server::FindMethod(
return {&(*service), service->FindMethod(packet.method_id())};
}
-void Server::HandleClientStreamPacket(const internal::Packet& packet,
- internal::Channel& channel,
- internal::ServerCall* call) const {
- if (call == nullptr || call->id() != packet.call_id()) {
+void Server::HandleClientStreamPacket(
+ const internal::Packet& packet,
+ internal::Channel& channel,
+ IntrusiveList<internal::Call>::iterator call) const {
+ if (call == calls_end()) {
channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
.IgnoreError(); // Errors are logged in Channel::Send.
internal::rpc_lock().unlock();
@@ -142,6 +142,12 @@ void Server::HandleClientStreamPacket(const internal::Packet& packet,
channel.Send(Packet::ServerError(packet, Status::InvalidArgument()))
.IgnoreError(); // Errors are logged in Channel::Send.
internal::rpc_lock().unlock();
+ PW_LOG_DEBUG(
+ "Received client stream packet for %u:%08x/%08x, which doesn't have a "
+ "client stream",
+ static_cast<unsigned>(packet.channel_id()),
+ static_cast<unsigned>(packet.service_id()),
+ static_cast<unsigned>(packet.method_id()));
return;
}
@@ -149,13 +155,19 @@ void Server::HandleClientStreamPacket(const internal::Packet& packet,
channel.Send(Packet::ServerError(packet, Status::FailedPrecondition()))
.IgnoreError(); // Errors are logged in Channel::Send.
internal::rpc_lock().unlock();
+ PW_LOG_DEBUG(
+ "Received client stream packet for %u:%08x/%08x, but its client stream "
+ "is closed",
+ static_cast<unsigned>(packet.channel_id()),
+ static_cast<unsigned>(packet.service_id()),
+ static_cast<unsigned>(packet.method_id()));
return;
}
if (packet.type() == PacketType::CLIENT_STREAM) {
call->HandlePayload(packet.payload());
} else { // Handle PacketType::CLIENT_STREAM_END.
- call->HandleClientStreamEnd();
+ static_cast<internal::ServerCall&>(*call).HandleClientStreamEnd();
}
}