diff options
Diffstat (limited to 'pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h')
-rw-r--r-- | pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h | 150 |
1 files changed, 124 insertions, 26 deletions
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h index e66c42092..d47582165 100644 --- a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h +++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h @@ -21,8 +21,13 @@ #include "pw_function/function.h" #include "pw_rpc/channel.h" #include "pw_rpc/internal/client_call.h" +#include "pw_rpc/internal/config.h" #include "pw_rpc/pwpb/internal/common.h" +#if PW_RPC_DYNAMIC_ALLOCATION +#include PW_RPC_MAKE_UNIQUE_PTR_INCLUDE +#endif // PW_RPC_DYNAMIC_ALLOCATION + namespace pw::rpc { namespace internal { @@ -47,17 +52,38 @@ class PwpbUnaryResponseClientCall : public UnaryResponseClientCall { rpc_lock().lock(); CallType call( client.ClaimLocked(), channel_id, service_id, method_id, serde); + SetCallbacksAndSendRequest(call, + client, + serde, + std::move(on_completed), + std::move(on_error), + request...); + return call; + } - call.set_pwpb_on_completed_locked(std::move(on_completed)); - call.set_on_error_locked(std::move(on_error)); - - if constexpr (sizeof...(Request) == 0u) { - call.SendInitialClientRequest({}); - } else { - PwpbSendInitialRequest(call, serde.request(), request...); - } - - client.CleanUpCalls(); + template <typename CallType, typename... Request> + static auto StartDynamic( + Endpoint& client, + uint32_t channel_id, + uint32_t service_id, + uint32_t method_id, + const PwpbMethodSerde& serde, + Function<void(const Response&, Status)>&& on_completed, + Function<void(Status)>&& on_error, + const Request&... request) PW_LOCKS_EXCLUDED(rpc_lock()) { + rpc_lock().lock(); + auto call = PW_RPC_MAKE_UNIQUE_PTR(CallType, + client.ClaimLocked(), + channel_id, + service_id, + method_id, + serde); + SetCallbacksAndSendRequest(*call, + client, + serde, + std::move(on_completed), + std::move(on_error), + request...); return call; } @@ -91,6 +117,8 @@ class PwpbUnaryResponseClientCall : public UnaryResponseClientCall { return *this; } + ~PwpbUnaryResponseClientCall() { DestroyClientCall(); } + // Implement moving by copying the serde pointer and on_completed function. void MovePwpbUnaryResponseClientCallFrom(PwpbUnaryResponseClientCall& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { @@ -123,6 +151,26 @@ class PwpbUnaryResponseClientCall : public UnaryResponseClientCall { } private: + template <typename CallType, typename... Request> + static void SetCallbacksAndSendRequest( + CallType& call, + Endpoint& client, + const PwpbMethodSerde& serde, + Function<void(const Response&, Status)>&& on_completed, + Function<void(Status)>&& on_error, + const Request&... request) PW_UNLOCK_FUNCTION(rpc_lock()) { + call.set_pwpb_on_completed_locked(std::move(on_completed)); + call.set_on_error_locked(std::move(on_error)); + + if constexpr (sizeof...(Request) == 0u) { + call.SendInitialClientRequest({}); + } else { + PwpbSendInitialRequest(call, serde.request(), request...); + } + + client.CleanUpCalls(); + } + void set_pwpb_on_completed_locked( Function<void(const Response& response, Status)>&& on_completed) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { @@ -163,17 +211,41 @@ class PwpbStreamResponseClientCall : public StreamResponseClientCall { rpc_lock().lock(); CallType call( client.ClaimLocked(), channel_id, service_id, method_id, serde); + SetCallbacksAndSendRequest(call, + client, + serde, + std::move(on_next), + std::move(on_completed), + std::move(on_error), + request...); + return call; + } - call.set_pwpb_on_next_locked(std::move(on_next)); - call.set_on_completed_locked(std::move(on_completed)); - call.set_on_error_locked(std::move(on_error)); - - if constexpr (sizeof...(Request) == 0u) { - call.SendInitialClientRequest({}); - } else { - PwpbSendInitialRequest(call, serde.request(), request...); - } - client.CleanUpCalls(); + template <typename CallType, typename... Request> + static auto StartDynamic(Endpoint& client, + uint32_t channel_id, + uint32_t service_id, + uint32_t method_id, + const PwpbMethodSerde& serde, + Function<void(const Response&)>&& on_next, + Function<void(Status)>&& on_completed, + Function<void(Status)>&& on_error, + const Request&... request) + PW_LOCKS_EXCLUDED(rpc_lock()) { + rpc_lock().lock(); + auto call = PW_RPC_MAKE_UNIQUE_PTR(CallType, + client.ClaimLocked(), + channel_id, + service_id, + method_id, + serde); + SetCallbacksAndSendRequest(*call, + client, + serde, + std::move(on_next), + std::move(on_completed), + std::move(on_error), + request...); return call; } @@ -207,6 +279,8 @@ class PwpbStreamResponseClientCall : public StreamResponseClientCall { return *this; } + ~PwpbStreamResponseClientCall() { DestroyClientCall(); } + // Implement moving by copying the serde pointer and on_next function. void MovePwpbStreamResponseClientCallFrom(PwpbStreamResponseClientCall& other) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { @@ -238,6 +312,27 @@ class PwpbStreamResponseClientCall : public StreamResponseClientCall { } private: + template <typename CallType, typename... Request> + static void SetCallbacksAndSendRequest( + CallType& call, + Endpoint& client, + const PwpbMethodSerde& serde, + Function<void(const Response&)>&& on_next, + Function<void(Status)>&& on_completed, + Function<void(Status)>&& on_error, + const Request&... request) PW_UNLOCK_FUNCTION(rpc_lock()) { + call.set_pwpb_on_next_locked(std::move(on_next)); + call.set_on_completed_locked(std::move(on_completed)); + call.set_on_error_locked(std::move(on_error)); + + if constexpr (sizeof...(Request) == 0u) { + call.SendInitialClientRequest({}); + } else { + PwpbSendInitialRequest(call, serde.request(), request...); + } + client.CleanUpCalls(); + } + void set_pwpb_on_next_locked( Function<void(const Response& response)>&& on_next) PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) { @@ -288,16 +383,18 @@ class PwpbClientReaderWriter request); } - // Notifies the server that no further client stream messages will be sent. - using internal::ClientCall::CloseClientStream; + // Notifies the server that the client has requested to stop communication by + // sending CLIENT_REQUEST_COMPLETION. + using internal::ClientCall::RequestCompletion; // Cancels this RPC. Closes the call locally and sends a CANCELLED error to // the server. using internal::Call::Cancel; - // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation - // packet. Future packets for this RPC are dropped, and the client sends a - // FAILED_PRECONDITION error in response because the call is not active. + // Closes this RPC locally. Sends a CLIENT_REQUEST_COMPLETION, but no + // cancellation packet. Future packets for this RPC are dropped, and the + // client sends a FAILED_PRECONDITION error in response because the call is + // not active. using internal::ClientCall::Abandon; // Functions for setting RPC event callbacks. @@ -343,6 +440,7 @@ class PwpbClientReader using internal::StreamResponseClientCall::channel_id; using internal::Call::Cancel; + using internal::Call::RequestCompletion; using internal::ClientCall::Abandon; // Functions for setting RPC event callbacks. @@ -401,7 +499,7 @@ class PwpbClientWriter } using internal::Call::Cancel; - using internal::Call::CloseClientStream; + using internal::Call::RequestCompletion; using internal::ClientCall::Abandon; // Functions for setting RPC event callbacks. |