aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/benchmark.cc
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc/benchmark.cc')
-rw-r--r--pw_rpc/benchmark.cc37
1 files changed, 28 insertions, 9 deletions
diff --git a/pw_rpc/benchmark.cc b/pw_rpc/benchmark.cc
index dfa11b64e..a8cec9522 100644
--- a/pw_rpc/benchmark.cc
+++ b/pw_rpc/benchmark.cc
@@ -39,17 +39,36 @@ void BenchmarkService::UnaryEcho(ConstByteSpan request,
.IgnoreError();
}
+BenchmarkService::ReaderWriterId BenchmarkService::AllocateReaderWriterId() {
+ return next_reader_writer_id_++;
+}
+
void BenchmarkService::BidirectionalEcho(
RawServerReaderWriter& new_reader_writer) {
- reader_writer_ = std::move(new_reader_writer);
-
- reader_writer_.set_on_next([this](ConstByteSpan request) {
- Status status = reader_writer_.Write(request);
- if (!status.ok()) {
- reader_writer_.Finish(status)
- .IgnoreError(); // TODO(b/242598609): Handle Status properly
- }
- });
+ auto id = AllocateReaderWriterId();
+
+ struct Captures {
+ BenchmarkService* self;
+ ReaderWriterId id;
+ };
+
+ auto captures = std::make_unique<Captures>(Captures{.self = this, .id = id});
+ new_reader_writer.set_on_next(
+ [captures = std::move(captures)](ConstByteSpan request) {
+ auto& reader_writers = captures->self->reader_writers_;
+ auto rw_id = captures->id;
+ auto reader_writer = reader_writers.find(rw_id);
+ if (reader_writer == reader_writers.end()) {
+ return;
+ }
+ Status status = reader_writer->second.Write(request);
+ if (!status.ok()) {
+ reader_writer->second.Finish(status)
+ .IgnoreError(); // TODO: b/242598609 - Handle Status properly
+ reader_writers.erase(rw_id);
+ }
+ });
+ reader_writers_.insert({id, std::move(new_reader_writer)});
}
} // namespace pw::rpc