aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornanahpang <31627465+nanahpang@users.noreply.github.com>2023-11-06 12:24:06 -0800
committerGitHub <noreply@github.com>2023-11-06 12:24:06 -0800
commit3869ef09a554dd4e8e0703b4099cc9891bbdd8c2 (patch)
treeef8e9a7d8e83c8d1a3bfab5ea568f44bc7b6306f
parent1dbbdb5820d42bb22160537dfac002c17a118b8d (diff)
downloadgrpc-grpc-3869ef09a554dd4e8e0703b4099cc9891bbdd8c2.tar.gz
[chaotic-good] Add chaotic good client transport read (roll-forward) (#34806)
Roll forward #34657, which was reverted in #34761. Previous error in CMake: ``` [ RUN ] ClientTransportTest.AddOneStreamMultipleMessages unknown file: Failure Unexpected mock function call - returning directly. Function call: Call(CANCELLED: ) Google Mock tried the following 1 expectation, but it didn't match: /[var/local/git/grpc/test/core/transport/chaotic_good/client_transport_test.cc:484](https://cs.corp.google.com/piper///depot/google3/var/local/git/grpc/test/core/transport/chaotic_good/client_transport_test.cc?l=484): EXPECT_CALL(on_done, Call(absl::OkStatus()))... Expected arg #0: is equal to OK Actual: CANCELLED: Expected: to be called once Actual: never called - unsatisfied and active /[var/local/git/grpc/test/core/transport/chaotic_good/client_transport_test.cc:484](https://cs.corp.google.com/piper///depot/google3/var/local/git/grpc/test/core/transport/chaotic_good/client_transport_test.cc?l=484): Failure Actual function call count doesn't match EXPECT_CALL(on_done, Call(absl::OkStatus()))... Expected: to be called once Actual: never called - unsatisfied and active real 0.24 user 0.00 sys 0.00 2023-10-20 01:50:32,776 FAILED: cmake/build/client_transport_test --gtest_filter=ClientTransportTest.AddOneStreamMultipleMessages GRPC_POLL_STRATEGY=epoll1 [ret=139, pid=1663532, time=0.3sec] ``` <!-- If you know who should review your pull request, please assign it to that person, otherwise the pull request would get assigned randomly. If your pull request is for a specific language, please add the appropriate lang label. -->
-rw-r--r--build_autogenerated.yaml3
-rw-r--r--src/core/BUILD18
-rw-r--r--src/core/ext/transport/chaotic_good/client_transport.cc111
-rw-r--r--src/core/ext/transport/chaotic_good/client_transport.h106
-rw-r--r--src/core/ext/transport/chaotic_good/frame.cc6
-rw-r--r--src/core/ext/transport/chaotic_good/frame.h1
-rw-r--r--src/core/lib/transport/promise_endpoint.cc8
-rw-r--r--test/core/transport/chaotic_good/BUILD8
-rw-r--r--test/core/transport/chaotic_good/client_transport_test.cc489
9 files changed, 518 insertions, 232 deletions
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index a82085a2dc..a2addbdeaa 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -7082,12 +7082,13 @@ targets:
- src/core/ext/transport/chaotic_good/frame_header.h
- src/core/lib/promise/detail/join_state.h
- src/core/lib/promise/event_engine_wakeup_scheduler.h
+ - src/core/lib/promise/inter_activity_pipe.h
- src/core/lib/promise/join.h
- src/core/lib/promise/mpsc.h
+ - src/core/lib/promise/try_join.h
- src/core/lib/promise/wait_set.h
- src/core/lib/transport/promise_endpoint.h
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
- - test/core/promise/test_wakeup_schedulers.h
src:
- test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
- src/core/ext/transport/chaotic_good/client_transport.cc
diff --git a/src/core/BUILD b/src/core/BUILD
index d5a6b668a6..d0064a8610 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -6050,6 +6050,7 @@ grpc_cc_library(
"arena",
"bitset",
"chaotic_good_frame_header",
+ "context",
"no_destruct",
"slice",
"slice_buffer",
@@ -6212,29 +6213,42 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
+ "absl/random",
+ "absl/random:bit_gen_ref",
"absl/status",
+ "absl/status:statusor",
+ "absl/types:optional",
"absl/types:variant",
],
language = "c++",
deps = [
"activity",
+ "arena",
"chaotic_good_frame",
"chaotic_good_frame_header",
"event_engine_wakeup_scheduler",
"for_each",
"grpc_promise_endpoint",
- "join",
+ "if",
+ "inter_activity_pipe",
"loop",
"match",
+ "memory_quota",
"mpsc",
"pipe",
- "seq",
+ "poll",
+ "resource_quota",
"slice",
"slice_buffer",
+ "try_join",
+ "try_seq",
+ "//:exec_ctx",
"//:gpr",
"//:gpr_platform",
"//:grpc_base",
"//:hpack_encoder",
+ "//:hpack_parser",
+ "//:ref_counted_ptr",
],
)
diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc
index 6eb6e4b86b..7049226881 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.cc
+++ b/src/core/ext/transport/chaotic_good/client_transport.cc
@@ -20,17 +20,26 @@
#include <string>
#include <tuple>
+#include "absl/random/bit_gen_ref.h"
+#include "absl/random/random.h"
+#include "absl/status/statusor.h"
+
#include <grpc/event_engine/event_engine.h>
+#include <grpc/slice.h>
#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/gprpp/match.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
-#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/try_join.h"
+#include "src/core/lib/resource_quota/arena.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -49,9 +58,14 @@ ClientTransport::ClientTransport(
control_endpoint_write_buffer_(SliceBuffer()),
data_endpoint_write_buffer_(SliceBuffer()),
hpack_compressor_(std::make_unique<HPackCompressor>()),
+ hpack_parser_(std::make_unique<HPackParser>()),
+ memory_allocator_(
+ ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
+ "client_transport")),
+ arena_(MakeScopedArena(1024, &memory_allocator_)),
event_engine_(event_engine) {
auto write_loop = Loop([this] {
- return Seq(
+ return TrySeq(
// Get next outgoing frame.
this->outgoing_frames_.Next(),
// Construct data buffers that will be sent to the endpoints.
@@ -81,20 +95,16 @@ ClientTransport::ClientTransport(
},
// Write buffers to corresponding endpoints concurrently.
[this]() {
- return Join(this->control_endpoint_->Write(
- std::move(control_endpoint_write_buffer_)),
- this->data_endpoint_->Write(
- std::move(data_endpoint_write_buffer_)));
+ return TryJoin(
+ control_endpoint_->Write(
+ std::move(control_endpoint_write_buffer_)),
+ data_endpoint_->Write(std::move(data_endpoint_write_buffer_)));
},
- // Finish writes and return status.
- [](std::tuple<absl::Status, absl::Status> ret)
- -> LoopCtl<absl::Status> {
- // If writes failed, return failure status.
- if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) {
- // TODO(ladynana): handle the promise endpoint write failures with
- // closing the transport.
- return absl::InternalError("Promise endpoint writes failed.");
- }
+ // Finish writes to difference endpoints and continue the loop.
+ []() -> LoopCtl<absl::Status> {
+ // The write failures will be caught in TrySeq and exit loop.
+ // Therefore, only need to return Continue() in the last lambda
+ // function.
return Continue();
});
});
@@ -104,7 +114,76 @@ ClientTransport::ClientTransport(
[](absl::Status status) {
GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
status.code() == absl::StatusCode::kInternal);
- });
+ // TODO(ladynana): handle the promise endpoint write failures with
+ // outgoing_frames.close() once available.
+ },
+ // Hold Arena in activity for GetContext<Arena> usage.
+ arena_.get());
+ auto read_loop = Loop([this] {
+ return TrySeq(
+ // Read frame header from control endpoint.
+ // TODO(ladynana): remove memcpy in ReadSlice.
+ this->control_endpoint_->ReadSlice(FrameHeader::frame_header_size_),
+ // Read different parts of the server frame from control/data endpoints
+ // based on frame header.
+ [this](Slice read_buffer) mutable {
+ frame_header_ = std::make_shared<FrameHeader>(
+ FrameHeader::Parse(
+ reinterpret_cast<const uint8_t*>(
+ GRPC_SLICE_START_PTR(read_buffer.c_slice())))
+ .value());
+ // Read header and trailers from control endpoint.
+ // Read message padding and message from data endpoint.
+ return TryJoin(
+ control_endpoint_->Read(frame_header_->GetFrameLength()),
+ data_endpoint_->Read(frame_header_->message_padding +
+ frame_header_->message_length));
+ },
+ // Construct and send the server frame to corresponding stream.
+ [this](std::tuple<SliceBuffer, SliceBuffer> ret) mutable {
+ control_endpoint_read_buffer_ = std::move(std::get<0>(ret));
+ // Discard message padding and only keep message in data read buffer.
+ std::get<1>(ret).MoveLastNBytesIntoSliceBuffer(
+ frame_header_->message_length, data_endpoint_read_buffer_);
+ ServerFragmentFrame frame;
+ // Initialized to get this_cpu() info in global_stat().
+ ExecCtx exec_ctx;
+ // Deserialize frame from read buffer.
+ absl::BitGen bitgen;
+ auto status = frame.Deserialize(hpack_parser_.get(), *frame_header_,
+ absl::BitGenRef(bitgen),
+ control_endpoint_read_buffer_);
+ GPR_ASSERT(status.ok());
+ // Move message into frame.
+ frame.message = arena_->MakePooled<Message>(
+ std::move(data_endpoint_read_buffer_), 0);
+ auto stream_id = frame.frame_header.stream_id;
+ {
+ MutexLock lock(&mu_);
+ return stream_map_[stream_id]->Push(ServerFrame(std::move(frame)));
+ }
+ },
+ // Check if send frame to corresponding stream successfully.
+ [](bool ret) -> LoopCtl<absl::Status> {
+ if (ret) {
+ // Send incoming frames successfully.
+ return Continue();
+ } else {
+ return absl::InternalError("Send incoming frames failed.");
+ }
+ });
+ });
+ reader_ = MakeActivity(
+ // Continuously read next incoming frames from promise endpoints.
+ std::move(read_loop), EventEngineWakeupScheduler(event_engine_),
+ [](absl::Status status) {
+ GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
+ status.code() == absl::StatusCode::kInternal);
+ // TODO(ladynana): handle the promise endpoint read failures with
+ // iterating stream_map_ and close all the pipes once available.
+ },
+ // Hold Arena in activity for GetContext<Arena> usage.
+ arena_.get());
}
} // namespace chaotic_good
diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h
index 9b2129c083..a2110f1192 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.h
+++ b/src/core/ext/transport/chaotic_good/client_transport.h
@@ -17,29 +17,42 @@
#include <grpc/support/port_platform.h>
-#include <stddef.h>
#include <stdint.h>
+#include <stdio.h>
#include <initializer_list> // IWYU pragma: keep
+#include <map>
#include <memory>
+#include <tuple>
#include <type_traits>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
+#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h>
+#include <grpc/event_engine/memory_allocator.h>
+#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
+#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/for_each.h"
+#include "src/core/lib/promise/if.h"
+#include "src/core/lib/promise/inter_activity_pipe.h"
+#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/mpsc.h"
#include "src/core/lib/promise/pipe.h"
-#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/promise/poll.h"
+#include "src/core/lib/promise/try_join.h"
+#include "src/core/lib/promise/try_seq.h"
+#include "src/core/lib/resource_quota/arena.h"
+#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "src/core/lib/transport/promise_endpoint.h"
@@ -58,18 +71,31 @@ class ClientTransport {
if (writer_ != nullptr) {
writer_.reset();
}
+ if (reader_ != nullptr) {
+ reader_.reset();
+ }
}
auto AddStream(CallArgs call_args) {
// At this point, the connection is set up.
// Start sending data frames.
uint32_t stream_id;
+ InterActivityPipe<ServerFrame, server_frame_queue_size_> pipe_server_frames;
{
MutexLock lock(&mu_);
stream_id = next_stream_id_++;
+ stream_map_.insert(
+ std::pair<uint32_t,
+ std::shared_ptr<InterActivityPipe<
+ ServerFrame, server_frame_queue_size_>::Sender>>(
+ stream_id, std::make_shared<InterActivityPipe<
+ ServerFrame, server_frame_queue_size_>::Sender>(
+ std::move(pipe_server_frames.sender))));
}
- return Seq(
- // Continuously send data frame with client to server messages.
- ForEach(std::move(*call_args.client_to_server_messages),
+ return TrySeq(
+ TryJoin(
+ // Continuously send client frame with client to server messages.
+ ForEach(
+ std::move(*call_args.client_to_server_messages),
[stream_id, initial_frame = true,
client_initial_metadata =
std::move(call_args.client_initial_metadata),
@@ -89,7 +115,7 @@ class ClientTransport {
frame.headers = std::move(client_initial_metadata);
initial_frame = false;
}
- return Seq(
+ return TrySeq(
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
@@ -98,24 +124,86 @@ class ClientTransport {
}
return absl::OkStatus();
});
- }));
+ }),
+ // Continuously receive server frames from endpoints and save
+ // results to call_args.
+ Loop([server_initial_metadata = call_args.server_initial_metadata,
+ server_to_client_messages =
+ call_args.server_to_client_messages,
+ receiver = std::move(pipe_server_frames.receiver)]() mutable {
+ return TrySeq(
+ // Receive incoming server frame.
+ receiver.Next(),
+ // Save incomming frame results to call_args.
+ [server_initial_metadata, server_to_client_messages](
+ absl::optional<ServerFrame> server_frame) mutable {
+ GPR_ASSERT(server_frame.has_value());
+ auto frame = std::move(
+ absl::get<ServerFragmentFrame>(*server_frame));
+ bool has_headers = (frame.headers != nullptr);
+ bool has_message = (frame.message != nullptr);
+ bool has_trailers = (frame.trailers != nullptr);
+ return TrySeq(
+ If(
+ has_headers,
+ [server_initial_metadata,
+ headers = std::move(frame.headers)]() mutable {
+ return server_initial_metadata->Push(
+ std::move(headers));
+ },
+ [] { return false; }),
+ If(
+ has_message,
+ [server_to_client_messages,
+ message = std::move(frame.message)]() mutable {
+ return server_to_client_messages->Push(
+ std::move(message));
+ },
+ [] { return false; }),
+ If(
+ has_trailers,
+ [trailers = std::move(frame.trailers)]() mutable
+ -> LoopCtl<ServerMetadataHandle> {
+ return std::move(trailers);
+ },
+ []() -> LoopCtl<ServerMetadataHandle> {
+ return Continue();
+ }));
+ });
+ })),
+ [](std::tuple<Empty, ServerMetadataHandle> ret) {
+ return std::move(std::get<1>(ret));
+ });
}
private:
// Max buffer is set to 4, so that for stream writes each time it will queue
// at most 2 frames.
MpscReceiver<ClientFrame> outgoing_frames_;
- Mutex mu_;
- uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
+ // Queue size of each stream pipe is set to 2, so that for each stream read it
+ // will queue at most 2 frames.
+ static const size_t server_frame_queue_size_ = 2;
// Assigned aligned bytes from setting frame.
size_t aligned_bytes = 64;
+ Mutex mu_;
+ uint32_t next_stream_id_ ABSL_GUARDED_BY(mu_) = 1;
+ // Map of stream incoming server frames, key is stream_id.
+ std::map<uint32_t, std::shared_ptr<InterActivityPipe<
+ ServerFrame, server_frame_queue_size_>::Sender>>
+ stream_map_ ABSL_GUARDED_BY(mu_);
ActivityPtr writer_;
ActivityPtr reader_;
std::unique_ptr<PromiseEndpoint> control_endpoint_;
std::unique_ptr<PromiseEndpoint> data_endpoint_;
SliceBuffer control_endpoint_write_buffer_;
SliceBuffer data_endpoint_write_buffer_;
+ SliceBuffer control_endpoint_read_buffer_;
+ SliceBuffer data_endpoint_read_buffer_;
std::unique_ptr<HPackCompressor> hpack_compressor_;
+ std::unique_ptr<HPackParser> hpack_parser_;
+ std::shared_ptr<FrameHeader> frame_header_;
+ MemoryAllocator memory_allocator_;
+ ScopedArenaPtr arena_;
// Use to synchronize writer_ and reader_ activity with outside activities;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};
diff --git a/src/core/ext/transport/chaotic_good/frame.cc b/src/core/ext/transport/chaotic_good/frame.cc
index a463c2e02b..2f7f938e7c 100644
--- a/src/core/ext/transport/chaotic_good/frame.cc
+++ b/src/core/ext/transport/chaotic_good/frame.cc
@@ -32,6 +32,8 @@
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/no_destruct.h"
#include "src/core/lib/gprpp/status_helper.h"
+#include "src/core/lib/promise/context.h"
+#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
@@ -134,7 +136,9 @@ absl::StatusOr<Arena::PoolPtr<Metadata>> ReadMetadata(
absl::BitGenRef bitsrc) {
if (!maybe_slices.ok()) return maybe_slices.status();
auto& slices = *maybe_slices;
- Arena::PoolPtr<Metadata> metadata;
+ auto arena = GetContext<Arena>();
+ GPR_ASSERT(arena != nullptr);
+ Arena::PoolPtr<Metadata> metadata = arena->MakePooled<Metadata>(arena);
parser->BeginFrame(
metadata.get(), std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::max(),
diff --git a/src/core/ext/transport/chaotic_good/frame.h b/src/core/ext/transport/chaotic_good/frame.h
index eca8200a1a..8e5031802e 100644
--- a/src/core/ext/transport/chaotic_good/frame.h
+++ b/src/core/ext/transport/chaotic_good/frame.h
@@ -97,6 +97,7 @@ struct ServerFragmentFrame final : public FrameInterface {
FrameHeader frame_header;
ServerMetadataHandle headers;
+ MessageHandle message;
ServerMetadataHandle trailers;
bool operator==(const ServerFragmentFrame& other) const {
diff --git a/src/core/lib/transport/promise_endpoint.cc b/src/core/lib/transport/promise_endpoint.cc
index 660d183a64..e9bc70a2e1 100644
--- a/src/core/lib/transport/promise_endpoint.cc
+++ b/src/core/lib/transport/promise_endpoint.cc
@@ -46,10 +46,10 @@ PromiseEndpoint::PromiseEndpoint(
}
PromiseEndpoint::~PromiseEndpoint() {
- // Last write result has not been polled.
- GPR_ASSERT(!write_result_.has_value());
- // Last read result has not been polled.
- GPR_ASSERT(!read_result_.has_value());
+ // Promise endpoint close when last write result has not been polled.
+ write_result_.reset();
+ // Promise endpoint close when last read result has not been polled.
+ read_result_.reset();
}
const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD
index 9c7cd464cd..036c894e32 100644
--- a/test/core/transport/chaotic_good/BUILD
+++ b/test/core/transport/chaotic_good/BUILD
@@ -87,7 +87,9 @@ grpc_cc_test(
srcs = ["client_transport_test.cc"],
external_deps = [
"absl/functional:any_invocable",
+ "absl/status:statusor",
"absl/strings:str_format",
+ "absl/types:optional",
"gtest",
],
language = "C++",
@@ -95,12 +97,17 @@ grpc_cc_test(
uses_polling = False,
deps = [
"//:grpc",
+ "//:grpc_public_hdrs",
"//:iomgr_timer",
"//:ref_counted_ptr",
"//src/core:activity",
"//src/core:arena",
"//src/core:chaotic_good_client_transport",
+ "//src/core:event_engine_wakeup_scheduler",
+ "//src/core:if",
"//src/core:join",
+ "//src/core:loop",
+ "//src/core:map",
"//src/core:memory_quota",
"//src/core:pipe",
"//src/core:resource_quota",
@@ -109,6 +116,5 @@ grpc_cc_test(
"//src/core:slice_buffer",
"//test/core/event_engine/fuzzing_event_engine",
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
- "//test/core/promise:test_wakeup_schedulers",
],
)
diff --git a/test/core/transport/chaotic_good/client_transport_test.cc b/test/core/transport/chaotic_good/client_transport_test.cc
index 96889082a2..86bbf578c2 100644
--- a/test/core/transport/chaotic_good/client_transport_test.cc
+++ b/test/core/transport/chaotic_good/client_transport_test.cc
@@ -18,36 +18,46 @@
#include <algorithm> // IWYU pragma: keep
#include <memory>
+#include <string> // IWYU pragma: keep
#include <tuple>
#include <vector> // IWYU pragma: keep
#include "absl/functional/any_invocable.h"
+#include "absl/status/statusor.h" // IWYU pragma: keep
#include "absl/strings/str_format.h" // IWYU pragma: keep
+#include "absl/types/optional.h" // IWYU pragma: keep
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
+#include <grpc/event_engine/slice.h> // IWYU pragma: keep
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/grpc.h>
+#include <grpc/status.h> // IWYU pragma: keep
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/iomgr/timer_manager.h"
#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
+#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/join.h"
+#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
-#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
+#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep
+#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
-#include "test/core/promise/test_wakeup_schedulers.h"
using testing::MockFunction;
using testing::Return;
+using testing::Sequence;
using testing::StrictMock;
using testing::WithArgs;
@@ -101,29 +111,187 @@ class ClientTransportTest : public ::testing::Test {
return options;
}(),
fuzzing_event_engine::Actions())),
- client_transport_(
- std::make_unique<PromiseEndpoint>(
- std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
- SliceBuffer()),
- std::make_unique<PromiseEndpoint>(
- std::unique_ptr<MockEndpoint>(data_endpoint_ptr_),
- SliceBuffer()),
- std::static_pointer_cast<
- grpc_event_engine::experimental::EventEngine>(event_engine_)),
arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
pipe_client_to_server_messages_(arena_.get()),
- pipe_client_to_server_messages_second_(arena_.get()) {}
-
- std::vector<MessageHandle> CreateMessages(int num_of_messages) {
- std::vector<MessageHandle> messages;
- for (int i = 0; i < num_of_messages; i++) {
- SliceBuffer buffer;
- buffer.Append(
- Slice::FromCopiedString(absl::StrFormat("test message %d", i)));
- auto message = arena_->MakePooled<Message>(std::move(buffer), 0);
- messages.push_back(std::move(message));
+ pipe_server_to_client_messages_(arena_.get()),
+ pipe_server_intial_metadata_(arena_.get()),
+ pipe_client_to_server_messages_second_(arena_.get()),
+ pipe_server_to_client_messages_second_(arena_.get()),
+ pipe_server_intial_metadata_second_(arena_.get()) {}
+ // Expect how client transport will read from control/data endpoints with a
+ // test frame.
+ void AddReadExpectations(int num_of_streams) {
+ for (int i = 0; i < num_of_streams; i++) {
+ EXPECT_CALL(control_endpoint_, Read)
+ .InSequence(control_endpoint_sequence)
+ .WillOnce(WithArgs<0, 1>(
+ [this, i](absl::AnyInvocable<void(absl::Status)> on_read,
+ grpc_event_engine::experimental::SliceBuffer*
+ buffer) mutable {
+ // Construct test frame for EventEngine read: headers (15
+ // bytes), message(16 bytes), message padding (48 byte),
+ // trailers (15 bytes).
+ const std::string frame_header = {
+ static_cast<char>(0x80), // frame type = fragment
+ 0x03, // flag = has header + has trailer
+ 0x00,
+ 0x00,
+ static_cast<char>(i + 1), // stream id = 1
+ 0x00,
+ 0x00,
+ 0x00,
+ 0x1a, // header length = 26
+ 0x00,
+ 0x00,
+ 0x00,
+ 0x08, // message length = 8
+ 0x00,
+ 0x00,
+ 0x00,
+ 0x38, // message padding =56
+ 0x00,
+ 0x00,
+ 0x00,
+ 0x0f, // trailer length = 15
+ 0x00,
+ 0x00,
+ 0x00};
+ // Schedule mock_endpoint to read buffer.
+ grpc_event_engine::experimental::Slice slice(
+ grpc_slice_from_cpp_string(frame_header));
+ buffer->Append(std::move(slice));
+ // Execute read callback later to control when read starts.
+ if (i == 0) {
+ read_callback_ = std::move(on_read);
+ // Return false to mock EventEngine read not finish.
+ return false;
+ } else {
+ return true;
+ }
+ }));
+ EXPECT_CALL(control_endpoint_, Read)
+ .InSequence(control_endpoint_sequence)
+ .WillOnce(WithArgs<1>(
+ [](grpc_event_engine::experimental::SliceBuffer* buffer) {
+ // Encoded string of header ":path: /demo.Service/Step".
+ const std::string header = {
+ 0x10, 0x05, 0x3a, 0x70, 0x61, 0x74, 0x68, 0x12, 0x2f,
+ 0x64, 0x65, 0x6d, 0x6f, 0x2e, 0x53, 0x65, 0x72, 0x76,
+ 0x69, 0x63, 0x65, 0x2f, 0x53, 0x74, 0x65, 0x70};
+ // Encoded string of trailer "grpc-status: 0".
+ const std::string trailers = {0x10, 0x0b, 0x67, 0x72, 0x70,
+ 0x63, 0x2d, 0x73, 0x74, 0x61,
+ 0x74, 0x75, 0x73, 0x01, 0x30};
+ // Schedule mock_endpoint to read buffer.
+ grpc_event_engine::experimental::Slice slice(
+ grpc_slice_from_cpp_string(header + trailers));
+ buffer->Append(std::move(slice));
+ return true;
+ }));
+ }
+ EXPECT_CALL(control_endpoint_, Read)
+ .InSequence(control_endpoint_sequence)
+ .WillOnce(Return(false));
+ for (int i = 0; i < num_of_streams; i++) {
+ EXPECT_CALL(data_endpoint_, Read)
+ .InSequence(data_endpoint_sequence)
+ .WillOnce(WithArgs<1>(
+ [this](grpc_event_engine::experimental::SliceBuffer* buffer) {
+ const std::string message_padding = {
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+ grpc_event_engine::experimental::Slice slice(
+ grpc_slice_from_cpp_string(message_padding + message_));
+ buffer->Append(std::move(slice));
+ return true;
+ }));
}
- return messages;
+ }
+ // Initial ClientTransport with read expecations
+ void InitialClientTransport(int num_of_streams) {
+ // Read expectaions need to be added before transport initialization since
+ // reader_ activity loop is started in ClientTransport initialization,
+ AddReadExpectations(num_of_streams);
+ client_transport_ = std::make_unique<ClientTransport>(
+ std::make_unique<PromiseEndpoint>(
+ std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
+ SliceBuffer()),
+ std::make_unique<PromiseEndpoint>(
+ std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
+ event_engine_);
+ }
+ // Send messages from client to server.
+ auto SendClientToServerMessages(
+ Pipe<MessageHandle>& pipe_client_to_server_messages,
+ int num_of_messages) {
+ return Loop([&pipe_client_to_server_messages, num_of_messages,
+ this]() mutable {
+ bool has_message = (num_of_messages > 0);
+ return If(
+ has_message,
+ Seq(pipe_client_to_server_messages.sender.Push(
+ arena_->MakePooled<Message>()),
+ [&num_of_messages]() -> LoopCtl<absl::Status> {
+ num_of_messages--;
+ return Continue();
+ }),
+ [&pipe_client_to_server_messages]() mutable -> LoopCtl<absl::Status> {
+ pipe_client_to_server_messages.sender.Close();
+ return absl::OkStatus();
+ });
+ });
+ }
+ // Add stream into client transport, and expect return trailers of
+ // "grpc-status:code".
+ auto AddStream(CallArgs args, const grpc_status_code trailers) {
+ return Seq(client_transport_->AddStream(std::move(args)),
+ [trailers](ServerMetadataHandle ret) {
+ // AddStream will finish with server trailers:
+ // "grpc-status:code".
+ EXPECT_EQ(ret->get(GrpcStatusMetadata()).value(), trailers);
+ return trailers;
+ });
+ }
+ // Start read from control endpoints.
+ auto StartRead(const absl::Status& read_status) {
+ return [read_status, this] {
+ read_callback_(read_status);
+ return read_status;
+ };
+ }
+ // Receive messages from server to client.
+ auto ReceiveServerToClientMessages(
+ Pipe<ServerMetadataHandle>& pipe_server_intial_metadata,
+ Pipe<MessageHandle>& pipe_server_to_client_messages) {
+ return Seq(
+ // Receive server initial metadata.
+ Map(pipe_server_intial_metadata.receiver.Next(),
+ [](NextResult<ServerMetadataHandle> r) {
+ // Expect value: ":path: /demo.Service/Step"
+ EXPECT_TRUE(r.has_value());
+ EXPECT_EQ(
+ r.value()->get_pointer(HttpPathMetadata())->as_string_view(),
+ "/demo.Service/Step");
+ return absl::OkStatus();
+ }),
+ // Receive server to client messages.
+ Map(pipe_server_to_client_messages.receiver.Next(),
+ [this](NextResult<MessageHandle> r) {
+ EXPECT_TRUE(r.has_value());
+ EXPECT_EQ(r.value()->payload()->JoinIntoString(), message_);
+ return absl::OkStatus();
+ }),
+ [&pipe_server_intial_metadata,
+ &pipe_server_to_client_messages]() mutable {
+ // Close pipes after receive message.
+ pipe_server_to_client_messages.sender.Close();
+ pipe_server_intial_metadata.sender.Close();
+ return absl::OkStatus();
+ });
}
private:
@@ -131,97 +299,65 @@ class ClientTransportTest : public ::testing::Test {
MockEndpoint* data_endpoint_ptr_;
size_t initial_arena_size = 1024;
MemoryAllocator memory_allocator_;
+ Sequence control_endpoint_sequence;
+ Sequence data_endpoint_sequence;
protected:
MockEndpoint& control_endpoint_;
MockEndpoint& data_endpoint_;
std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
event_engine_;
- ClientTransport client_transport_;
+ std::unique_ptr<ClientTransport> client_transport_;
ScopedArenaPtr arena_;
Pipe<MessageHandle> pipe_client_to_server_messages_;
+ Pipe<MessageHandle> pipe_server_to_client_messages_;
+ Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
// Added for mutliple streams tests.
Pipe<MessageHandle> pipe_client_to_server_messages_second_;
-
- const absl::Status kDummyErrorStatus =
- absl::ErrnoToStatus(5566, "just an error");
- static constexpr size_t kDummyRequestSize = 5566u;
+ Pipe<MessageHandle> pipe_server_to_client_messages_second_;
+ Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_;
+ absl::AnyInvocable<void(absl::Status)> read_callback_;
+ // Added to verify received message payload.
+ const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
};
TEST_F(ClientTransportTest, AddOneStream) {
- auto messages = CreateMessages(1);
+ InitialClientTransport(1);
ClientMetadataHandle md;
- auto args = CallArgs{
- std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
- nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
+ auto args = CallArgs{std::move(md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_.sender,
+ &pipe_client_to_server_messages_.receiver,
+ &pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).WillOnce(Return(true));
EXPECT_CALL(data_endpoint_, Write).WillOnce(Return(true));
auto activity = MakeActivity(
Seq(
- // Concurrently: send message into the pipe, and receive from the
- // pipe.
- Join(Seq(pipe_client_to_server_messages_.sender.Push(
- std::move(messages[0])),
- [this] {
- this->pipe_client_to_server_messages_.sender.Close();
- return absl::OkStatus();
- }),
- client_transport_.AddStream(std::move(args))),
- // Once complete, verify successful sending and the received value.
- [](const std::tuple<absl::Status, absl::Status>& ret) {
- EXPECT_TRUE(std::get<0>(ret).ok());
- EXPECT_TRUE(std::get<1>(ret).ok());
- return absl::OkStatus();
- }),
- InlineWakeupScheduler(),
- [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
- // Wait until ClientTransport's internal activities to finish.
- event_engine_->TickUntilIdle();
- event_engine_->UnsetGlobalHooks();
-}
-
-TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) {
- auto messages = CreateMessages(1);
- ClientMetadataHandle md;
- auto args = CallArgs{
- std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
- nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
- StrictMock<MockFunction<void(absl::Status)>> on_done;
- EXPECT_CALL(on_done, Call(absl::OkStatus()));
- EXPECT_CALL(control_endpoint_, Write)
- .WillOnce(
- WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
- on_write(this->kDummyErrorStatus);
- return false;
- }));
- EXPECT_CALL(data_endpoint_, Write)
- .WillOnce(
- WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
- on_write(this->kDummyErrorStatus);
- return false;
- }));
- auto activity = MakeActivity(
- Seq(
- // Concurrently: send message into the pipe, and receive from the
- // pipe.
- Join(Seq(pipe_client_to_server_messages_.sender.Push(
- std::move(messages[0])),
- [this] {
- this->pipe_client_to_server_messages_.sender.Close();
- return absl::OkStatus();
- }),
- client_transport_.AddStream(std::move(args))),
+ // Concurrently: write and read messages in client transport.
+ Join(
+ // Add first stream with call_args into client transport.
+ AddStream(std::move(args), GRPC_STATUS_OK),
+ // Start read from control endpoints.
+ StartRead(absl::OkStatus()),
+ // Send messages to call_args.client_to_server_messages pipe,
+ // which will be eventually sent to control/data endpoints.
+ SendClientToServerMessages(pipe_client_to_server_messages_, 1),
+ // Receive messages from control/data endpoints.
+ ReceiveServerToClientMessages(pipe_server_intial_metadata_,
+ pipe_server_to_client_messages_)),
// Once complete, verify successful sending and the received value.
- [](const std::tuple<absl::Status, absl::Status>& ret) {
- // TODO(ladynana): change these expectations to errors after the
- // writer activity closes transport for EE failures.
- EXPECT_TRUE(std::get<0>(ret).ok());
+ [](const std::tuple<grpc_status_code, absl::Status, absl::Status,
+ absl::Status>& ret) {
+ EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK);
EXPECT_TRUE(std::get<1>(ret).ok());
+ EXPECT_TRUE(std::get<2>(ret).ok());
+ EXPECT_TRUE(std::get<3>(ret).ok());
return absl::OkStatus();
}),
- InlineWakeupScheduler(),
+ EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
@@ -229,89 +365,42 @@ TEST_F(ClientTransportTest, AddOneStreamWithEEFailed) {
}
TEST_F(ClientTransportTest, AddOneStreamMultipleMessages) {
- auto messages = CreateMessages(3);
+ InitialClientTransport(1);
ClientMetadataHandle md;
- auto args = CallArgs{
- std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
- nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
+ auto args = CallArgs{std::move(md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_.sender,
+ &pipe_client_to_server_messages_.receiver,
+ &pipe_server_to_client_messages_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(3).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
- // Concurrently: send messages into the pipe, and receive from the
- // pipe.
- Join(Seq(pipe_client_to_server_messages_.sender.Push(
- std::move(messages[0])),
- pipe_client_to_server_messages_.sender.Push(
- std::move(messages[1])),
- pipe_client_to_server_messages_.sender.Push(
- std::move(messages[2])),
- [this] {
- this->pipe_client_to_server_messages_.sender.Close();
- return absl::OkStatus();
- }),
- client_transport_.AddStream(std::move(args))),
- // Once complete, verify successful sending and the received value.
- [](const std::tuple<absl::Status, absl::Status>& ret) {
- EXPECT_TRUE(std::get<0>(ret).ok());
- EXPECT_TRUE(std::get<1>(ret).ok());
- return absl::OkStatus();
- }),
- InlineWakeupScheduler(),
- [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
- // Wait until ClientTransport's internal activities to finish.
- event_engine_->TickUntilIdle();
- event_engine_->UnsetGlobalHooks();
-}
-
-TEST_F(ClientTransportTest, AddMultipleStreams) {
- auto messages = CreateMessages(2);
- ClientMetadataHandle md;
- auto first_stream_args = CallArgs{
- std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
- nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
- auto second_stream_args = CallArgs{
- std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
- nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr};
- StrictMock<MockFunction<void(absl::Status)>> on_done;
- EXPECT_CALL(on_done, Call(absl::OkStatus()));
- EXPECT_CALL(control_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
- EXPECT_CALL(data_endpoint_, Write).Times(2).WillRepeatedly(Return(true));
- auto activity = MakeActivity(
- Seq(
- // Concurrently: send messages into the pipe, and receive from the
- // pipe.
+ // Concurrently: write and read messages in client transport.
Join(
- // Send message to first stream pipe.
- Seq(pipe_client_to_server_messages_.sender.Push(
- std::move(messages[0])),
- [this] {
- pipe_client_to_server_messages_.sender.Close();
- return absl::OkStatus();
- }),
- // Send message to second stream pipe.
- Seq(pipe_client_to_server_messages_second_.sender.Push(
- std::move(messages[1])),
- [this] {
- pipe_client_to_server_messages_second_.sender.Close();
- return absl::OkStatus();
- }),
- // Receive message from first stream pipe.
- client_transport_.AddStream(std::move(first_stream_args)),
- // Receive message from second stream pipe.
- client_transport_.AddStream(std::move(second_stream_args))),
+ // Add first stream with call_args into client transport.
+ AddStream(std::move(args), GRPC_STATUS_OK),
+ // Start read from control endpoints.
+ StartRead(absl::OkStatus()),
+ // Send messages to call_args.client_to_server_messages pipe,
+ // which will be eventually sent to control/data endpoints.
+ SendClientToServerMessages(pipe_client_to_server_messages_, 3),
+ // Receive messages from control/data endpoints.
+ ReceiveServerToClientMessages(pipe_server_intial_metadata_,
+ pipe_server_to_client_messages_)),
// Once complete, verify successful sending and the received value.
- [](const std::tuple<absl::Status, absl::Status, absl::Status,
+ [](const std::tuple<grpc_status_code, absl::Status, absl::Status,
absl::Status>& ret) {
- EXPECT_TRUE(std::get<0>(ret).ok());
+ EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK);
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
EXPECT_TRUE(std::get<3>(ret).ok());
return absl::OkStatus();
}),
- InlineWakeupScheduler(),
+ EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();
@@ -319,59 +408,63 @@ TEST_F(ClientTransportTest, AddMultipleStreams) {
}
TEST_F(ClientTransportTest, AddMultipleStreamsMultipleMessages) {
- auto messages = CreateMessages(6);
- ClientMetadataHandle md;
- auto first_stream_args = CallArgs{
- std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
- nullptr, &pipe_client_to_server_messages_.receiver, nullptr};
- auto second_stream_args = CallArgs{
- std::move(md), ClientInitialMetadataOutstandingToken::Empty(), nullptr,
- nullptr, &pipe_client_to_server_messages_second_.receiver, nullptr};
+ InitialClientTransport(2);
+ ClientMetadataHandle first_stream_md;
+ ClientMetadataHandle second_stream_md;
+ auto first_stream_args =
+ CallArgs{std::move(first_stream_md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_.sender,
+ &pipe_client_to_server_messages_.receiver,
+ &pipe_server_to_client_messages_.sender};
+ auto second_stream_args =
+ CallArgs{std::move(second_stream_md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_second_.sender,
+ &pipe_client_to_server_messages_second_.receiver,
+ &pipe_server_to_client_messages_second_.sender};
StrictMock<MockFunction<void(absl::Status)>> on_done;
EXPECT_CALL(on_done, Call(absl::OkStatus()));
EXPECT_CALL(control_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
EXPECT_CALL(data_endpoint_, Write).Times(6).WillRepeatedly(Return(true));
auto activity = MakeActivity(
Seq(
- // Concurrently: send messages into the pipe, and receive from the
- // pipe.
+ // Concurrently: write and read messages from client transport.
+ Join(
+ // Add first stream with call_args into client transport.
+ AddStream(std::move(first_stream_args), GRPC_STATUS_OK),
+ // Start read from control endpoints.
+ StartRead(absl::OkStatus()),
+ // Send messages to first stream's
+ // call_args.client_to_server_messages pipe, which will be
+ // eventually sent to control/data endpoints.
+ SendClientToServerMessages(pipe_client_to_server_messages_, 3),
+ // Receive first stream's messages from control/data endpoints.
+ ReceiveServerToClientMessages(pipe_server_intial_metadata_,
+ pipe_server_to_client_messages_)),
Join(
- // Send messages to first stream pipe.
- Seq(pipe_client_to_server_messages_.sender.Push(
- std::move(messages[0])),
- pipe_client_to_server_messages_.sender.Push(
- std::move(messages[1])),
- pipe_client_to_server_messages_.sender.Push(
- std::move(messages[2])),
- [this] {
- pipe_client_to_server_messages_.sender.Close();
- return absl::OkStatus();
- }),
- // Send messages to second stream pipe.
- Seq(pipe_client_to_server_messages_second_.sender.Push(
- std::move(messages[3])),
- pipe_client_to_server_messages_second_.sender.Push(
- std::move(messages[4])),
- pipe_client_to_server_messages_second_.sender.Push(
- std::move(messages[5])),
- [this] {
- pipe_client_to_server_messages_second_.sender.Close();
- return absl::OkStatus();
- }),
- // Receive messages from first stream pipe.
- client_transport_.AddStream(std::move(first_stream_args)),
- // Receive messages from second stream pipe.
- client_transport_.AddStream(std::move(second_stream_args))),
+ // Add second stream with call_args into client transport.
+ AddStream(std::move(second_stream_args), GRPC_STATUS_OK),
+ // Send messages to second stream's
+ // call_args.client_to_server_messages pipe, which will be
+ // eventually sent to control/data endpoints.
+ SendClientToServerMessages(pipe_client_to_server_messages_second_,
+ 3),
+ // Receive second stream's messages from control/data endpoints.
+ ReceiveServerToClientMessages(
+ pipe_server_intial_metadata_second_,
+ pipe_server_to_client_messages_second_)),
// Once complete, verify successful sending and the received value.
- [](const std::tuple<absl::Status, absl::Status, absl::Status,
- absl::Status>& ret) {
- EXPECT_TRUE(std::get<0>(ret).ok());
+ [](const std::tuple<grpc_status_code, absl::Status, absl::Status>&
+ ret) {
+ EXPECT_EQ(std::get<0>(ret), GRPC_STATUS_OK);
EXPECT_TRUE(std::get<1>(ret).ok());
EXPECT_TRUE(std::get<2>(ret).ok());
- EXPECT_TRUE(std::get<3>(ret).ok());
return absl::OkStatus();
}),
- InlineWakeupScheduler(),
+ EventEngineWakeupScheduler(event_engine_),
[&on_done](absl::Status status) { on_done.Call(std::move(status)); });
// Wait until ClientTransport's internal activities to finish.
event_engine_->TickUntilIdle();