aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authornanahpang <31627465+nanahpang@users.noreply.github.com>2023-11-08 14:42:20 -0800
committerGitHub <noreply@github.com>2023-11-08 14:42:20 -0800
commit1e15d00ec4d6eba6bd20621fc72ad51d0f62b654 (patch)
tree0ce8cda3621f27d95a83a1c8a10d4f0abf2ca6aa
parente58268525a9150c5fed18b9450514892dc4cd526 (diff)
downloadgrpc-grpc-1e15d00ec4d6eba6bd20621fc72ad51d0f62b654.tar.gz
[chaotic-good] Add client transport error handling. (#34611)
This is a follow-up PR of #34191, which handles the error condition of endpoints failed to write/read in chaotic-good client transport. This PR needs to be merged after #34191. <!-- If you know who should review your pull request, please assign it to that person, otherwise the pull request would get assigned randomly. If your pull request is for a specific language, please add the appropriate lang label. -->
-rw-r--r--CMakeLists.txt44
-rw-r--r--build_autogenerated.yaml30
-rw-r--r--src/core/BUILD1
-rw-r--r--src/core/ext/transport/chaotic_good/client_transport.cc19
-rw-r--r--src/core/ext/transport/chaotic_good/client_transport.h92
-rw-r--r--src/core/lib/promise/inter_activity_pipe.h11
-rw-r--r--src/core/lib/promise/mpsc.h10
-rw-r--r--test/core/transport/chaotic_good/BUILD38
-rw-r--r--test/core/transport/chaotic_good/client_transport_error_test.cc441
-rw-r--r--tools/run_tests/generated/tests.json24
10 files changed, 669 insertions, 41 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1255395029..edd4eb1223 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -957,6 +957,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx client_ssl_test)
endif()
add_dependencies(buildtests_cxx client_streaming_test)
+ add_dependencies(buildtests_cxx client_transport_error_test)
add_dependencies(buildtests_cxx client_transport_test)
add_dependencies(buildtests_cxx cmdline_test)
add_dependencies(buildtests_cxx codegen_test_full)
@@ -9408,6 +9409,49 @@ target_link_libraries(client_streaming_test
endif()
if(gRPC_BUILD_TESTS)
+add_executable(client_transport_error_test
+ ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
+ ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h
+ ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h
+ src/core/ext/transport/chaotic_good/client_transport.cc
+ src/core/ext/transport/chaotic_good/frame.cc
+ src/core/ext/transport/chaotic_good/frame_header.cc
+ src/core/lib/transport/promise_endpoint.cc
+ test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
+ test/core/transport/chaotic_good/client_transport_error_test.cc
+)
+target_compile_features(client_transport_error_test PUBLIC cxx_std_14)
+target_include_directories(client_transport_error_test
+ PRIVATE
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}/include
+ ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+ ${_gRPC_RE2_INCLUDE_DIR}
+ ${_gRPC_SSL_INCLUDE_DIR}
+ ${_gRPC_UPB_GENERATED_DIR}
+ ${_gRPC_UPB_GRPC_GENERATED_DIR}
+ ${_gRPC_UPB_INCLUDE_DIR}
+ ${_gRPC_XXHASH_INCLUDE_DIR}
+ ${_gRPC_ZLIB_INCLUDE_DIR}
+ third_party/googletest/googletest/include
+ third_party/googletest/googletest
+ third_party/googletest/googlemock/include
+ third_party/googletest/googlemock
+ ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(client_transport_error_test
+ ${_gRPC_ALLTARGETS_LIBRARIES}
+ gtest
+ ${_gRPC_PROTOBUF_LIBRARIES}
+ grpc_test_util
+)
+
+
+endif()
+if(gRPC_BUILD_TESTS)
+
add_executable(client_transport_test
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc
${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index f01b7329bd..70a8757ec1 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -7093,6 +7093,36 @@ targets:
- grpc_authorization_provider
- grpc_unsecure
- grpc_test_util
+- name: client_transport_error_test
+ gtest: true
+ build: test
+ language: c++
+ headers:
+ - src/core/ext/transport/chaotic_good/client_transport.h
+ - src/core/ext/transport/chaotic_good/frame.h
+ - src/core/ext/transport/chaotic_good/frame_header.h
+ - src/core/lib/promise/detail/join_state.h
+ - src/core/lib/promise/event_engine_wakeup_scheduler.h
+ - src/core/lib/promise/inter_activity_pipe.h
+ - src/core/lib/promise/join.h
+ - src/core/lib/promise/mpsc.h
+ - src/core/lib/promise/try_join.h
+ - src/core/lib/promise/wait_set.h
+ - src/core/lib/transport/promise_endpoint.h
+ - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h
+ src:
+ - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto
+ - src/core/ext/transport/chaotic_good/client_transport.cc
+ - src/core/ext/transport/chaotic_good/frame.cc
+ - src/core/ext/transport/chaotic_good/frame_header.cc
+ - src/core/lib/transport/promise_endpoint.cc
+ - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc
+ - test/core/transport/chaotic_good/client_transport_error_test.cc
+ deps:
+ - gtest
+ - protobuf
+ - grpc_test_util
+ uses_polling: false
- name: client_transport_test
gtest: true
build: test
diff --git a/src/core/BUILD b/src/core/BUILD
index 5bfd4ad159..901806791d 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -6227,6 +6227,7 @@ grpc_cc_library(
"arena",
"chaotic_good_frame",
"chaotic_good_frame_header",
+ "context",
"event_engine_wakeup_scheduler",
"for_each",
"grpc_promise_endpoint",
diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc
index 7049226881..686a024827 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.cc
+++ b/src/core/ext/transport/chaotic_good/client_transport.cc
@@ -63,6 +63,7 @@ ClientTransport::ClientTransport(
ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
"client_transport")),
arena_(MakeScopedArena(1024, &memory_allocator_)),
+ context_(arena_.get()),
event_engine_(event_engine) {
auto write_loop = Loop([this] {
return TrySeq(
@@ -111,11 +112,10 @@ ClientTransport::ClientTransport(
writer_ = MakeActivity(
// Continuously write next outgoing frames to promise endpoints.
std::move(write_loop), EventEngineWakeupScheduler(event_engine_),
- [](absl::Status status) {
- GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
- status.code() == absl::StatusCode::kInternal);
- // TODO(ladynana): handle the promise endpoint write failures with
- // outgoing_frames.close() once available.
+ [this](absl::Status status) {
+ if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
+ this->AbortWithError();
+ }
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
@@ -176,11 +176,10 @@ ClientTransport::ClientTransport(
reader_ = MakeActivity(
// Continuously read next incoming frames from promise endpoints.
std::move(read_loop), EventEngineWakeupScheduler(event_engine_),
- [](absl::Status status) {
- GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
- status.code() == absl::StatusCode::kInternal);
- // TODO(ladynana): handle the promise endpoint read failures with
- // iterating stream_map_ and close all the pipes once available.
+ [this](absl::Status status) {
+ if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
+ this->AbortWithError();
+ }
},
// Hold Arena in activity for GetContext<Arena> usage.
arena_.get());
diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h
index a2110f1192..4972630cf9 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.h
+++ b/src/core/ext/transport/chaotic_good/client_transport.h
@@ -34,7 +34,6 @@
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
-#include <grpc/support/log.h>
#include "src/core/ext/transport/chaotic_good/frame.h"
#include "src/core/ext/transport/chaotic_good/frame_header.h"
@@ -42,6 +41,7 @@
#include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/for_each.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/inter_activity_pipe.h"
@@ -75,6 +75,19 @@ class ClientTransport {
reader_.reset();
}
}
+ void AbortWithError() {
+ // Mark transport as unavailable when the endpoint write/read failed.
+ // Close all the available pipes.
+ if (!outgoing_frames_.IsClosed()) {
+ outgoing_frames_.MarkClosed();
+ }
+ MutexLock lock(&mu_);
+ for (const auto& pair : stream_map_) {
+ if (!pair.second->IsClose()) {
+ pair.second->MarkClose();
+ }
+ }
+ }
auto AddStream(CallArgs call_args) {
// At this point, the connection is set up.
// Start sending data frames.
@@ -119,8 +132,11 @@ class ClientTransport {
outgoing_frames.Send(ClientFrame(std::move(frame))),
[](bool success) -> absl::Status {
if (!success) {
- return absl::InternalError(
- "Send frame to outgoing_frames failed.");
+ // TODO(ladynana): propagate the actual error message
+ // from EventEngine.
+ return absl::UnavailableError(
+ "Transport closed due to endpoint write/read "
+ "failed.");
}
return absl::OkStatus();
});
@@ -137,38 +153,51 @@ class ClientTransport {
// Save incomming frame results to call_args.
[server_initial_metadata, server_to_client_messages](
absl::optional<ServerFrame> server_frame) mutable {
- GPR_ASSERT(server_frame.has_value());
- auto frame = std::move(
- absl::get<ServerFragmentFrame>(*server_frame));
+ bool transport_closed = false;
+ ServerFragmentFrame frame;
+ if (!server_frame.has_value()) {
+ // Incoming server frame pipe is closed, which only
+ // happens when transport is aborted.
+ transport_closed = true;
+ } else {
+ frame = std::move(
+ absl::get<ServerFragmentFrame>(*server_frame));
+ };
bool has_headers = (frame.headers != nullptr);
bool has_message = (frame.message != nullptr);
bool has_trailers = (frame.trailers != nullptr);
return TrySeq(
- If(
- has_headers,
- [server_initial_metadata,
- headers = std::move(frame.headers)]() mutable {
- return server_initial_metadata->Push(
- std::move(headers));
- },
- [] { return false; }),
- If(
- has_message,
- [server_to_client_messages,
- message = std::move(frame.message)]() mutable {
- return server_to_client_messages->Push(
- std::move(message));
- },
- [] { return false; }),
- If(
- has_trailers,
- [trailers = std::move(frame.trailers)]() mutable
- -> LoopCtl<ServerMetadataHandle> {
- return std::move(trailers);
- },
- []() -> LoopCtl<ServerMetadataHandle> {
- return Continue();
- }));
+ If((!transport_closed) && has_headers,
+ [server_initial_metadata,
+ headers = std::move(frame.headers)]() mutable {
+ return server_initial_metadata->Push(
+ std::move(headers));
+ },
+ [] { return false; }),
+ If((!transport_closed) && has_message,
+ [server_to_client_messages,
+ message = std::move(frame.message)]() mutable {
+ return server_to_client_messages->Push(
+ std::move(message));
+ },
+ [] { return false; }),
+ If((!transport_closed) && has_trailers,
+ [trailers = std::move(frame.trailers)]() mutable
+ -> LoopCtl<ServerMetadataHandle> {
+ return std::move(trailers);
+ },
+ [transport_closed]()
+ -> LoopCtl<ServerMetadataHandle> {
+ if (transport_closed) {
+ // TODO(ladynana): propagate the actual error
+ // message from EventEngine.
+ return ServerMetadataFromStatus(
+ absl::UnavailableError(
+ "Transport closed due to endpoint "
+ "write/read failed."));
+ }
+ return Continue();
+ }));
});
})),
[](std::tuple<Empty, ServerMetadataHandle> ret) {
@@ -204,6 +233,7 @@ class ClientTransport {
std::shared_ptr<FrameHeader> frame_header_;
MemoryAllocator memory_allocator_;
ScopedArenaPtr arena_;
+ promise_detail::Context<Arena> context_;
// Use to synchronize writer_ and reader_ activity with outside activities;
std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
};
diff --git a/src/core/lib/promise/inter_activity_pipe.h b/src/core/lib/promise/inter_activity_pipe.h
index e90a61192b..a7594fb26a 100644
--- a/src/core/lib/promise/inter_activity_pipe.h
+++ b/src/core/lib/promise/inter_activity_pipe.h
@@ -83,6 +83,11 @@ class InterActivityPipe {
on_available.Wakeup();
}
+ bool IsClosed() {
+ MutexLock lock(&mu_);
+ return closed_;
+ }
+
private:
Mutex mu_;
std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_);
@@ -108,6 +113,12 @@ class InterActivityPipe {
if (center_ != nullptr) center_->MarkClosed();
}
+ bool IsClose() { return center_->IsClosed(); }
+
+ void MarkClose() {
+ if (center_ != nullptr) center_->MarkClosed();
+ }
+
auto Push(T value) {
return [center = center_, value = std::move(value)]() mutable {
return center->Push(value);
diff --git a/src/core/lib/promise/mpsc.h b/src/core/lib/promise/mpsc.h
index 8525739068..c12544282f 100644
--- a/src/core/lib/promise/mpsc.h
+++ b/src/core/lib/promise/mpsc.h
@@ -107,6 +107,12 @@ class Center : public RefCounted<Center<T>> {
receiver_closed_ = true;
}
+ // Return whether the receiver is closed.
+ bool IsClosed() {
+ MutexLock lock(&mu_);
+ return receiver_closed_;
+ }
+
private:
Mutex mu_;
const size_t max_queued_;
@@ -164,6 +170,10 @@ class MpscReceiver {
~MpscReceiver() {
if (center_ != nullptr) center_->ReceiverClosed();
}
+ bool IsClosed() { return center_->IsClosed(); }
+ void MarkClosed() {
+ if (center_ != nullptr) center_->ReceiverClosed();
+ }
MpscReceiver(const MpscReceiver&) = delete;
MpscReceiver& operator=(const MpscReceiver&) = delete;
// Only movable until it's first polled, and so we don't need to contend with
diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD
index 036c894e32..fc698aeec4 100644
--- a/test/core/transport/chaotic_good/BUILD
+++ b/test/core/transport/chaotic_good/BUILD
@@ -118,3 +118,41 @@ grpc_cc_test(
"//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
],
)
+
+grpc_cc_test(
+ name = "client_transport_error_test",
+ srcs = ["client_transport_error_test.cc"],
+ external_deps = [
+ "absl/functional:any_invocable",
+ "absl/status",
+ "absl/status:statusor",
+ "absl/strings:str_format",
+ "absl/types:optional",
+ "gtest",
+ ],
+ language = "C++",
+ uses_event_engine = False,
+ uses_polling = False,
+ deps = [
+ "//:grpc",
+ "//:grpc_public_hdrs",
+ "//:iomgr_timer",
+ "//:ref_counted_ptr",
+ "//src/core:activity",
+ "//src/core:arena",
+ "//src/core:chaotic_good_client_transport",
+ "//src/core:event_engine_wakeup_scheduler",
+ "//src/core:grpc_promise_endpoint",
+ "//src/core:if",
+ "//src/core:join",
+ "//src/core:loop",
+ "//src/core:memory_quota",
+ "//src/core:pipe",
+ "//src/core:resource_quota",
+ "//src/core:seq",
+ "//src/core:slice",
+ "//src/core:slice_buffer",
+ "//test/core/event_engine/fuzzing_event_engine",
+ "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto",
+ ],
+)
diff --git a/test/core/transport/chaotic_good/client_transport_error_test.cc b/test/core/transport/chaotic_good/client_transport_error_test.cc
new file mode 100644
index 0000000000..3b30c4ca33
--- /dev/null
+++ b/test/core/transport/chaotic_good/client_transport_error_test.cc
@@ -0,0 +1,441 @@
+// Copyright 2023 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "absl/status/status.h"
+
+#include "src/core/ext/transport/chaotic_good/client_transport.h"
+#include "src/core/lib/transport/promise_endpoint.h"
+#include "src/core/lib/transport/transport.h"
+
+// IWYU pragma: no_include <sys/socket.h>
+
+#include <stddef.h>
+
+#include <algorithm> // IWYU pragma: keep
+#include <memory>
+#include <string> // IWYU pragma: keep
+#include <tuple>
+#include <utility>
+#include <vector> // IWYU pragma: keep
+
+#include "absl/functional/any_invocable.h"
+#include "absl/status/statusor.h" // IWYU pragma: keep
+#include "absl/strings/str_format.h" // IWYU pragma: keep
+#include "absl/types/optional.h" // IWYU pragma: keep
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include <grpc/event_engine/event_engine.h>
+#include <grpc/event_engine/memory_allocator.h>
+#include <grpc/event_engine/slice.h> // IWYU pragma: keep
+#include <grpc/event_engine/slice_buffer.h>
+#include <grpc/grpc.h>
+#include <grpc/status.h> // IWYU pragma: keep
+
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/timer_manager.h"
+#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
+#include "src/core/lib/promise/if.h"
+#include "src/core/lib/promise/join.h"
+#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/pipe.h"
+#include "src/core/lib/promise/seq.h"
+#include "src/core/lib/resource_quota/arena.h"
+#include "src/core/lib/resource_quota/memory_quota.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
+#include "src/core/lib/slice/slice_buffer.h"
+#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep
+#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep
+#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
+#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
+
+using testing::MockFunction;
+using testing::Return;
+using testing::Sequence;
+using testing::StrictMock;
+using testing::WithArgs;
+
+namespace grpc_core {
+namespace chaotic_good {
+namespace testing {
+
+class MockEndpoint
+ : public grpc_event_engine::experimental::EventEngine::Endpoint {
+ public:
+ MOCK_METHOD(
+ bool, Read,
+ (absl::AnyInvocable<void(absl::Status)> on_read,
+ grpc_event_engine::experimental::SliceBuffer* buffer,
+ const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
+ args),
+ (override));
+
+ MOCK_METHOD(
+ bool, Write,
+ (absl::AnyInvocable<void(absl::Status)> on_writable,
+ grpc_event_engine::experimental::SliceBuffer* data,
+ const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
+ args),
+ (override));
+
+ MOCK_METHOD(
+ const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
+ GetPeerAddress, (), (const, override));
+ MOCK_METHOD(
+ const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
+ GetLocalAddress, (), (const, override));
+};
+
+class ClientTransportTest : public ::testing::Test {
+ public:
+ ClientTransportTest()
+ : control_endpoint_ptr_(new StrictMock<MockEndpoint>()),
+ data_endpoint_ptr_(new StrictMock<MockEndpoint>()),
+ memory_allocator_(
+ ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator(
+ "test")),
+ control_endpoint_(*control_endpoint_ptr_),
+ data_endpoint_(*data_endpoint_ptr_),
+ event_engine_(std::make_shared<
+ grpc_event_engine::experimental::FuzzingEventEngine>(
+ []() {
+ grpc_timer_manager_set_threading(false);
+ grpc_event_engine::experimental::FuzzingEventEngine::Options
+ options;
+ return options;
+ }(),
+ fuzzing_event_engine::Actions())),
+ arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)),
+ pipe_client_to_server_messages_(arena_.get()),
+ pipe_server_to_client_messages_(arena_.get()),
+ pipe_server_intial_metadata_(arena_.get()),
+ pipe_client_to_server_messages_second_(arena_.get()),
+ pipe_server_to_client_messages_second_(arena_.get()),
+ pipe_server_intial_metadata_second_(arena_.get()) {}
+ // Initial ClientTransport with read expecations
+ void InitialClientTransport() {
+ client_transport_ = std::make_unique<ClientTransport>(
+ std::make_unique<PromiseEndpoint>(
+ std::unique_ptr<MockEndpoint>(control_endpoint_ptr_),
+ SliceBuffer()),
+ std::make_unique<PromiseEndpoint>(
+ std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()),
+ event_engine_);
+ }
+ // Send messages from client to server.
+ auto SendClientToServerMessages(
+ Pipe<MessageHandle>& pipe_client_to_server_messages,
+ int num_of_messages) {
+ return Loop([&pipe_client_to_server_messages, num_of_messages,
+ this]() mutable {
+ bool has_message = (num_of_messages > 0);
+ return If(
+ has_message,
+ Seq(pipe_client_to_server_messages.sender.Push(
+ arena_->MakePooled<Message>()),
+ [&num_of_messages]() -> LoopCtl<absl::Status> {
+ num_of_messages--;
+ return Continue();
+ }),
+ [&pipe_client_to_server_messages]() mutable -> LoopCtl<absl::Status> {
+ pipe_client_to_server_messages.sender.Close();
+ return absl::OkStatus();
+ });
+ });
+ }
+ // Add stream into client transport, and expect return trailers of
+ // "grpc-status:code".
+ auto AddStream(CallArgs args) {
+ return client_transport_->AddStream(std::move(args));
+ }
+
+ private:
+ MockEndpoint* control_endpoint_ptr_;
+ MockEndpoint* data_endpoint_ptr_;
+ size_t initial_arena_size = 1024;
+ MemoryAllocator memory_allocator_;
+
+ protected:
+ MockEndpoint& control_endpoint_;
+ MockEndpoint& data_endpoint_;
+ std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
+ event_engine_;
+ std::unique_ptr<ClientTransport> client_transport_;
+ ScopedArenaPtr arena_;
+ Pipe<MessageHandle> pipe_client_to_server_messages_;
+ Pipe<MessageHandle> pipe_server_to_client_messages_;
+ Pipe<ServerMetadataHandle> pipe_server_intial_metadata_;
+ // Added for mutliple streams tests.
+ Pipe<MessageHandle> pipe_client_to_server_messages_second_;
+ Pipe<MessageHandle> pipe_server_to_client_messages_second_;
+ Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_;
+ absl::AnyInvocable<void(absl::Status)> read_callback_;
+ Sequence control_endpoint_sequence_;
+ Sequence data_endpoint_sequence_;
+ // Added to verify received message payload.
+ const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
+};
+
+TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) {
+ // Mock write failed and read is pending.
+ EXPECT_CALL(control_endpoint_, Write)
+ .WillOnce(
+ WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
+ on_write(absl::InternalError("control endpoint write failed."));
+ return false;
+ }));
+ EXPECT_CALL(data_endpoint_, Write)
+ .WillOnce(
+ WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
+ on_write(absl::InternalError("data endpoint write failed."));
+ return false;
+ }));
+ EXPECT_CALL(control_endpoint_, Read)
+ .InSequence(control_endpoint_sequence_)
+ .WillOnce(Return(false));
+ InitialClientTransport();
+ ClientMetadataHandle md;
+ auto args = CallArgs{std::move(md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_.sender,
+ &pipe_client_to_server_messages_.receiver,
+ &pipe_server_to_client_messages_.sender};
+ StrictMock<MockFunction<void(absl::Status)>> on_done;
+ EXPECT_CALL(on_done, Call(absl::OkStatus()));
+ auto activity = MakeActivity(
+ Seq(
+ // Concurrently: write and read messages in client transport.
+ Join(
+ // Add first stream with call_args into client transport.
+ // Expect return trailers "grpc-status:unavailable".
+ AddStream(std::move(args)),
+ // Send messages to call_args.client_to_server_messages pipe,
+ // which will be eventually sent to control/data endpoints.
+ SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
+ // Once complete, verify successful sending and the received value.
+ [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+ EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+ GRPC_STATUS_UNAVAILABLE);
+ EXPECT_TRUE(std::get<1>(ret).ok());
+ return absl::OkStatus();
+ }),
+ EventEngineWakeupScheduler(event_engine_),
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ // Wait until ClientTransport's internal activities to finish.
+ event_engine_->TickUntilIdle();
+ event_engine_->UnsetGlobalHooks();
+}
+
+TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) {
+ // Mock read failed.
+ EXPECT_CALL(control_endpoint_, Read)
+ .InSequence(control_endpoint_sequence_)
+ .WillOnce(WithArgs<0>(
+ [](absl::AnyInvocable<void(absl::Status)> on_read) mutable {
+ on_read(absl::InternalError("control endpoint read failed."));
+ // Return false to mock EventEngine read not finish.
+ return false;
+ }));
+ InitialClientTransport();
+ ClientMetadataHandle md;
+ auto args = CallArgs{std::move(md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_.sender,
+ &pipe_client_to_server_messages_.receiver,
+ &pipe_server_to_client_messages_.sender};
+ StrictMock<MockFunction<void(absl::Status)>> on_done;
+ EXPECT_CALL(on_done, Call(absl::OkStatus()));
+ auto activity = MakeActivity(
+ Seq(
+ // Concurrently: write and read messages in client transport.
+ Join(
+ // Add first stream with call_args into client transport.
+ // Expect return trailers "grpc-status:unavailable".
+ AddStream(std::move(args)),
+ // Send messages to call_args.client_to_server_messages pipe.
+ SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
+ // Once complete, verify successful sending and the received value.
+ [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+ EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+ GRPC_STATUS_UNAVAILABLE);
+ EXPECT_TRUE(std::get<1>(ret).ok());
+ return absl::OkStatus();
+ }),
+ EventEngineWakeupScheduler(event_engine_),
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ // Wait until ClientTransport's internal activities to finish.
+ event_engine_->TickUntilIdle();
+ event_engine_->UnsetGlobalHooks();
+}
+
+TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) {
+ // Mock write failed at first stream and second stream's write will fail too.
+ EXPECT_CALL(control_endpoint_, Write)
+ .Times(1)
+ .WillRepeatedly(
+ WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
+ on_write(absl::InternalError("control endpoint write failed."));
+ return false;
+ }));
+ EXPECT_CALL(data_endpoint_, Write)
+ .Times(1)
+ .WillRepeatedly(
+ WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) {
+ on_write(absl::InternalError("data endpoint write failed."));
+ return false;
+ }));
+ EXPECT_CALL(control_endpoint_, Read)
+ .InSequence(control_endpoint_sequence_)
+ .WillOnce(Return(false));
+ InitialClientTransport();
+ ClientMetadataHandle first_stream_md;
+ ClientMetadataHandle second_stream_md;
+ auto first_stream_args =
+ CallArgs{std::move(first_stream_md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_.sender,
+ &pipe_client_to_server_messages_.receiver,
+ &pipe_server_to_client_messages_.sender};
+ auto second_stream_args =
+ CallArgs{std::move(second_stream_md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_second_.sender,
+ &pipe_client_to_server_messages_second_.receiver,
+ &pipe_server_to_client_messages_second_.sender};
+ StrictMock<MockFunction<void(absl::Status)>> on_done;
+ EXPECT_CALL(on_done, Call(absl::OkStatus()));
+ auto activity = MakeActivity(
+ Seq(
+ // Concurrently: write and read messages from client transport.
+ Join(
+ // Add first stream with call_args into client transport.
+ // Expect return trailers "grpc-status:unavailable".
+ AddStream(std::move(first_stream_args)),
+ // Send messages to first stream's
+ // call_args.client_to_server_messages pipe.
+ SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
+ // Once complete, verify successful sending and the received value.
+ [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+ EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+ GRPC_STATUS_UNAVAILABLE);
+ EXPECT_TRUE(std::get<1>(ret).ok());
+ return absl::OkStatus();
+ },
+ Join(
+ // Add second stream with call_args into client transport.
+ // Expect return trailers "grpc-status:unavailable".
+ AddStream(std::move(second_stream_args)),
+ // Send messages to second stream's
+ // call_args.client_to_server_messages pipe.
+ SendClientToServerMessages(pipe_client_to_server_messages_second_,
+ 1)),
+ // Once complete, verify successful sending and the received value.
+ [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+ EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+ GRPC_STATUS_UNAVAILABLE);
+ EXPECT_TRUE(std::get<1>(ret).ok());
+ return absl::OkStatus();
+ }),
+ EventEngineWakeupScheduler(event_engine_),
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ // Wait until ClientTransport's internal activities to finish.
+ event_engine_->TickUntilIdle();
+ event_engine_->UnsetGlobalHooks();
+}
+
+TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) {
+ // Mock read failed at first stream, and second stream's write will fail too.
+ EXPECT_CALL(control_endpoint_, Read)
+ .InSequence(control_endpoint_sequence_)
+ .WillOnce(WithArgs<0>(
+ [](absl::AnyInvocable<void(absl::Status)> on_read) mutable {
+ on_read(absl::InternalError("control endpoint read failed."));
+ // Return false to mock EventEngine read not finish.
+ return false;
+ }));
+ InitialClientTransport();
+ ClientMetadataHandle first_stream_md;
+ ClientMetadataHandle second_stream_md;
+ auto first_stream_args =
+ CallArgs{std::move(first_stream_md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_.sender,
+ &pipe_client_to_server_messages_.receiver,
+ &pipe_server_to_client_messages_.sender};
+ auto second_stream_args =
+ CallArgs{std::move(second_stream_md),
+ ClientInitialMetadataOutstandingToken::Empty(),
+ nullptr,
+ &pipe_server_intial_metadata_second_.sender,
+ &pipe_client_to_server_messages_second_.receiver,
+ &pipe_server_to_client_messages_second_.sender};
+ StrictMock<MockFunction<void(absl::Status)>> on_done;
+ EXPECT_CALL(on_done, Call(absl::OkStatus()));
+ auto activity = MakeActivity(
+ Seq(
+ // Concurrently: write and read messages from client transport.
+ Join(
+ // Add first stream with call_args into client transport.
+ AddStream(std::move(first_stream_args)),
+ // Send messages to first stream's
+ // call_args.client_to_server_messages pipe, which will be
+ // eventually sent to control/data endpoints.
+ SendClientToServerMessages(pipe_client_to_server_messages_, 1)),
+ // Once complete, verify successful sending and the received value.
+ [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+ EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+ GRPC_STATUS_UNAVAILABLE);
+ EXPECT_TRUE(std::get<1>(ret).ok());
+ return absl::OkStatus();
+ },
+ Join(
+ // Add second stream with call_args into client transport.
+ AddStream(std::move(second_stream_args)),
+ // Send messages to second stream's
+ // call_args.client_to_server_messages pipe, which will be
+ // eventually sent to control/data endpoints.
+ SendClientToServerMessages(pipe_client_to_server_messages_second_,
+ 1)),
+ // Once complete, verify successful sending and the received value.
+ [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) {
+ EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(),
+ GRPC_STATUS_UNAVAILABLE);
+ EXPECT_TRUE(std::get<1>(ret).ok());
+ return absl::OkStatus();
+ }),
+ EventEngineWakeupScheduler(event_engine_),
+ [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
+ // Wait until ClientTransport's internal activities to finish.
+ event_engine_->TickUntilIdle();
+ event_engine_->UnsetGlobalHooks();
+}
+
+} // namespace testing
+} // namespace chaotic_good
+} // namespace grpc_core
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ // Must call to create default EventEngine.
+ grpc_init();
+ int ret = RUN_ALL_TESTS();
+ grpc_shutdown();
+ return ret;
+}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 0050efad85..6b389d2da6 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -2220,6 +2220,30 @@
"flaky": false,
"gtest": true,
"language": "c++",
+ "name": "client_transport_error_test",
+ "platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "uses_polling": false
+ },
+ {
+ "args": [],
+ "benchmark": false,
+ "ci_platforms": [
+ "linux",
+ "mac",
+ "posix",
+ "windows"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [],
+ "flaky": false,
+ "gtest": true,
+ "language": "c++",
"name": "client_transport_test",
"platforms": [
"linux",