aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h')
-rw-r--r--pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h150
1 files changed, 124 insertions, 26 deletions
diff --git a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
index e66c42092..d47582165 100644
--- a/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
+++ b/pw_rpc/pwpb/public/pw_rpc/pwpb/client_reader_writer.h
@@ -21,8 +21,13 @@
#include "pw_function/function.h"
#include "pw_rpc/channel.h"
#include "pw_rpc/internal/client_call.h"
+#include "pw_rpc/internal/config.h"
#include "pw_rpc/pwpb/internal/common.h"
+#if PW_RPC_DYNAMIC_ALLOCATION
+#include PW_RPC_MAKE_UNIQUE_PTR_INCLUDE
+#endif // PW_RPC_DYNAMIC_ALLOCATION
+
namespace pw::rpc {
namespace internal {
@@ -47,17 +52,38 @@ class PwpbUnaryResponseClientCall : public UnaryResponseClientCall {
rpc_lock().lock();
CallType call(
client.ClaimLocked(), channel_id, service_id, method_id, serde);
+ SetCallbacksAndSendRequest(call,
+ client,
+ serde,
+ std::move(on_completed),
+ std::move(on_error),
+ request...);
+ return call;
+ }
- call.set_pwpb_on_completed_locked(std::move(on_completed));
- call.set_on_error_locked(std::move(on_error));
-
- if constexpr (sizeof...(Request) == 0u) {
- call.SendInitialClientRequest({});
- } else {
- PwpbSendInitialRequest(call, serde.request(), request...);
- }
-
- client.CleanUpCalls();
+ template <typename CallType, typename... Request>
+ static auto StartDynamic(
+ Endpoint& client,
+ uint32_t channel_id,
+ uint32_t service_id,
+ uint32_t method_id,
+ const PwpbMethodSerde& serde,
+ Function<void(const Response&, Status)>&& on_completed,
+ Function<void(Status)>&& on_error,
+ const Request&... request) PW_LOCKS_EXCLUDED(rpc_lock()) {
+ rpc_lock().lock();
+ auto call = PW_RPC_MAKE_UNIQUE_PTR(CallType,
+ client.ClaimLocked(),
+ channel_id,
+ service_id,
+ method_id,
+ serde);
+ SetCallbacksAndSendRequest(*call,
+ client,
+ serde,
+ std::move(on_completed),
+ std::move(on_error),
+ request...);
return call;
}
@@ -91,6 +117,8 @@ class PwpbUnaryResponseClientCall : public UnaryResponseClientCall {
return *this;
}
+ ~PwpbUnaryResponseClientCall() { DestroyClientCall(); }
+
// Implement moving by copying the serde pointer and on_completed function.
void MovePwpbUnaryResponseClientCallFrom(PwpbUnaryResponseClientCall& other)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
@@ -123,6 +151,26 @@ class PwpbUnaryResponseClientCall : public UnaryResponseClientCall {
}
private:
+ template <typename CallType, typename... Request>
+ static void SetCallbacksAndSendRequest(
+ CallType& call,
+ Endpoint& client,
+ const PwpbMethodSerde& serde,
+ Function<void(const Response&, Status)>&& on_completed,
+ Function<void(Status)>&& on_error,
+ const Request&... request) PW_UNLOCK_FUNCTION(rpc_lock()) {
+ call.set_pwpb_on_completed_locked(std::move(on_completed));
+ call.set_on_error_locked(std::move(on_error));
+
+ if constexpr (sizeof...(Request) == 0u) {
+ call.SendInitialClientRequest({});
+ } else {
+ PwpbSendInitialRequest(call, serde.request(), request...);
+ }
+
+ client.CleanUpCalls();
+ }
+
void set_pwpb_on_completed_locked(
Function<void(const Response& response, Status)>&& on_completed)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
@@ -163,17 +211,41 @@ class PwpbStreamResponseClientCall : public StreamResponseClientCall {
rpc_lock().lock();
CallType call(
client.ClaimLocked(), channel_id, service_id, method_id, serde);
+ SetCallbacksAndSendRequest(call,
+ client,
+ serde,
+ std::move(on_next),
+ std::move(on_completed),
+ std::move(on_error),
+ request...);
+ return call;
+ }
- call.set_pwpb_on_next_locked(std::move(on_next));
- call.set_on_completed_locked(std::move(on_completed));
- call.set_on_error_locked(std::move(on_error));
-
- if constexpr (sizeof...(Request) == 0u) {
- call.SendInitialClientRequest({});
- } else {
- PwpbSendInitialRequest(call, serde.request(), request...);
- }
- client.CleanUpCalls();
+ template <typename CallType, typename... Request>
+ static auto StartDynamic(Endpoint& client,
+ uint32_t channel_id,
+ uint32_t service_id,
+ uint32_t method_id,
+ const PwpbMethodSerde& serde,
+ Function<void(const Response&)>&& on_next,
+ Function<void(Status)>&& on_completed,
+ Function<void(Status)>&& on_error,
+ const Request&... request)
+ PW_LOCKS_EXCLUDED(rpc_lock()) {
+ rpc_lock().lock();
+ auto call = PW_RPC_MAKE_UNIQUE_PTR(CallType,
+ client.ClaimLocked(),
+ channel_id,
+ service_id,
+ method_id,
+ serde);
+ SetCallbacksAndSendRequest(*call,
+ client,
+ serde,
+ std::move(on_next),
+ std::move(on_completed),
+ std::move(on_error),
+ request...);
return call;
}
@@ -207,6 +279,8 @@ class PwpbStreamResponseClientCall : public StreamResponseClientCall {
return *this;
}
+ ~PwpbStreamResponseClientCall() { DestroyClientCall(); }
+
// Implement moving by copying the serde pointer and on_next function.
void MovePwpbStreamResponseClientCallFrom(PwpbStreamResponseClientCall& other)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
@@ -238,6 +312,27 @@ class PwpbStreamResponseClientCall : public StreamResponseClientCall {
}
private:
+ template <typename CallType, typename... Request>
+ static void SetCallbacksAndSendRequest(
+ CallType& call,
+ Endpoint& client,
+ const PwpbMethodSerde& serde,
+ Function<void(const Response&)>&& on_next,
+ Function<void(Status)>&& on_completed,
+ Function<void(Status)>&& on_error,
+ const Request&... request) PW_UNLOCK_FUNCTION(rpc_lock()) {
+ call.set_pwpb_on_next_locked(std::move(on_next));
+ call.set_on_completed_locked(std::move(on_completed));
+ call.set_on_error_locked(std::move(on_error));
+
+ if constexpr (sizeof...(Request) == 0u) {
+ call.SendInitialClientRequest({});
+ } else {
+ PwpbSendInitialRequest(call, serde.request(), request...);
+ }
+ client.CleanUpCalls();
+ }
+
void set_pwpb_on_next_locked(
Function<void(const Response& response)>&& on_next)
PW_EXCLUSIVE_LOCKS_REQUIRED(rpc_lock()) {
@@ -288,16 +383,18 @@ class PwpbClientReaderWriter
request);
}
- // Notifies the server that no further client stream messages will be sent.
- using internal::ClientCall::CloseClientStream;
+ // Notifies the server that the client has requested to stop communication by
+ // sending CLIENT_REQUEST_COMPLETION.
+ using internal::ClientCall::RequestCompletion;
// Cancels this RPC. Closes the call locally and sends a CANCELLED error to
// the server.
using internal::Call::Cancel;
- // Closes this RPC locally. Sends a CLIENT_STREAM_END, but no cancellation
- // packet. Future packets for this RPC are dropped, and the client sends a
- // FAILED_PRECONDITION error in response because the call is not active.
+ // Closes this RPC locally. Sends a CLIENT_REQUEST_COMPLETION, but no
+ // cancellation packet. Future packets for this RPC are dropped, and the
+ // client sends a FAILED_PRECONDITION error in response because the call is
+ // not active.
using internal::ClientCall::Abandon;
// Functions for setting RPC event callbacks.
@@ -343,6 +440,7 @@ class PwpbClientReader
using internal::StreamResponseClientCall::channel_id;
using internal::Call::Cancel;
+ using internal::Call::RequestCompletion;
using internal::ClientCall::Abandon;
// Functions for setting RPC event callbacks.
@@ -401,7 +499,7 @@ class PwpbClientWriter
}
using internal::Call::Cancel;
- using internal::Call::CloseClientStream;
+ using internal::Call::RequestCompletion;
using internal::ClientCall::Abandon;
// Functions for setting RPC event callbacks.