diff options
author | nanahpang <31627465+nanahpang@users.noreply.github.com> | 2023-11-08 14:42:20 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-08 14:42:20 -0800 |
commit | 1e15d00ec4d6eba6bd20621fc72ad51d0f62b654 (patch) | |
tree | 0ce8cda3621f27d95a83a1c8a10d4f0abf2ca6aa | |
parent | e58268525a9150c5fed18b9450514892dc4cd526 (diff) | |
download | grpc-grpc-1e15d00ec4d6eba6bd20621fc72ad51d0f62b654.tar.gz |
[chaotic-good] Add client transport error handling. (#34611)
This is a follow-up PR of #34191, which handles the error condition of
endpoints failed to write/read in chaotic-good client transport.
This PR needs to be merged after #34191.
<!--
If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.
If your pull request is for a specific language, please add the
appropriate
lang label.
-->
-rw-r--r-- | CMakeLists.txt | 44 | ||||
-rw-r--r-- | build_autogenerated.yaml | 30 | ||||
-rw-r--r-- | src/core/BUILD | 1 | ||||
-rw-r--r-- | src/core/ext/transport/chaotic_good/client_transport.cc | 19 | ||||
-rw-r--r-- | src/core/ext/transport/chaotic_good/client_transport.h | 92 | ||||
-rw-r--r-- | src/core/lib/promise/inter_activity_pipe.h | 11 | ||||
-rw-r--r-- | src/core/lib/promise/mpsc.h | 10 | ||||
-rw-r--r-- | test/core/transport/chaotic_good/BUILD | 38 | ||||
-rw-r--r-- | test/core/transport/chaotic_good/client_transport_error_test.cc | 441 | ||||
-rw-r--r-- | tools/run_tests/generated/tests.json | 24 |
10 files changed, 669 insertions, 41 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 1255395029..edd4eb1223 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -957,6 +957,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx client_ssl_test) endif() add_dependencies(buildtests_cxx client_streaming_test) + add_dependencies(buildtests_cxx client_transport_error_test) add_dependencies(buildtests_cxx client_transport_test) add_dependencies(buildtests_cxx cmdline_test) add_dependencies(buildtests_cxx codegen_test_full) @@ -9408,6 +9409,49 @@ target_link_libraries(client_streaming_test endif() if(gRPC_BUILD_TESTS) +add_executable(client_transport_error_test + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h + ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.h + src/core/ext/transport/chaotic_good/client_transport.cc + src/core/ext/transport/chaotic_good/frame.cc + src/core/ext/transport/chaotic_good/frame_header.cc + src/core/lib/transport/promise_endpoint.cc + test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + test/core/transport/chaotic_good/client_transport_error_test.cc +) +target_compile_features(client_transport_error_test PUBLIC cxx_std_14) +target_include_directories(client_transport_error_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(client_transport_error_test + ${_gRPC_ALLTARGETS_LIBRARIES} + gtest + ${_gRPC_PROTOBUF_LIBRARIES} + grpc_test_util +) + + +endif() +if(gRPC_BUILD_TESTS) + add_executable(client_transport_test ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.cc ${_gRPC_PROTO_GENS_DIR}/test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.grpc.pb.cc diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index f01b7329bd..70a8757ec1 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -7093,6 +7093,36 @@ targets: - grpc_authorization_provider - grpc_unsecure - grpc_test_util +- name: client_transport_error_test + gtest: true + build: test + language: c++ + headers: + - src/core/ext/transport/chaotic_good/client_transport.h + - src/core/ext/transport/chaotic_good/frame.h + - src/core/ext/transport/chaotic_good/frame_header.h + - src/core/lib/promise/detail/join_state.h + - src/core/lib/promise/event_engine_wakeup_scheduler.h + - src/core/lib/promise/inter_activity_pipe.h + - src/core/lib/promise/join.h + - src/core/lib/promise/mpsc.h + - src/core/lib/promise/try_join.h + - src/core/lib/promise/wait_set.h + - src/core/lib/transport/promise_endpoint.h + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h + src: + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.proto + - src/core/ext/transport/chaotic_good/client_transport.cc + - src/core/ext/transport/chaotic_good/frame.cc + - src/core/ext/transport/chaotic_good/frame_header.cc + - src/core/lib/transport/promise_endpoint.cc + - test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.cc + - test/core/transport/chaotic_good/client_transport_error_test.cc + deps: + - gtest + - protobuf + - grpc_test_util + uses_polling: false - name: client_transport_test gtest: true build: test diff --git a/src/core/BUILD b/src/core/BUILD index 5bfd4ad159..901806791d 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -6227,6 +6227,7 @@ grpc_cc_library( "arena", "chaotic_good_frame", "chaotic_good_frame_header", + "context", "event_engine_wakeup_scheduler", "for_each", "grpc_promise_endpoint", diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 7049226881..686a024827 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -63,6 +63,7 @@ ClientTransport::ClientTransport( ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( "client_transport")), arena_(MakeScopedArena(1024, &memory_allocator_)), + context_(arena_.get()), event_engine_(event_engine) { auto write_loop = Loop([this] { return TrySeq( @@ -111,11 +112,10 @@ ClientTransport::ClientTransport( writer_ = MakeActivity( // Continuously write next outgoing frames to promise endpoints. std::move(write_loop), EventEngineWakeupScheduler(event_engine_), - [](absl::Status status) { - GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || - status.code() == absl::StatusCode::kInternal); - // TODO(ladynana): handle the promise endpoint write failures with - // outgoing_frames.close() once available. + [this](absl::Status status) { + if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) { + this->AbortWithError(); + } }, // Hold Arena in activity for GetContext<Arena> usage. arena_.get()); @@ -176,11 +176,10 @@ ClientTransport::ClientTransport( reader_ = MakeActivity( // Continuously read next incoming frames from promise endpoints. std::move(read_loop), EventEngineWakeupScheduler(event_engine_), - [](absl::Status status) { - GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || - status.code() == absl::StatusCode::kInternal); - // TODO(ladynana): handle the promise endpoint read failures with - // iterating stream_map_ and close all the pipes once available. + [this](absl::Status status) { + if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) { + this->AbortWithError(); + } }, // Hold Arena in activity for GetContext<Arena> usage. arena_.get()); diff --git a/src/core/ext/transport/chaotic_good/client_transport.h b/src/core/ext/transport/chaotic_good/client_transport.h index a2110f1192..4972630cf9 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.h +++ b/src/core/ext/transport/chaotic_good/client_transport.h @@ -34,7 +34,6 @@ #include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/memory_allocator.h> -#include <grpc/support/log.h> #include "src/core/ext/transport/chaotic_good/frame.h" #include "src/core/ext/transport/chaotic_good/frame_header.h" @@ -42,6 +41,7 @@ #include "src/core/ext/transport/chttp2/transport/hpack_parser.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/context.h" #include "src/core/lib/promise/for_each.h" #include "src/core/lib/promise/if.h" #include "src/core/lib/promise/inter_activity_pipe.h" @@ -75,6 +75,19 @@ class ClientTransport { reader_.reset(); } } + void AbortWithError() { + // Mark transport as unavailable when the endpoint write/read failed. + // Close all the available pipes. + if (!outgoing_frames_.IsClosed()) { + outgoing_frames_.MarkClosed(); + } + MutexLock lock(&mu_); + for (const auto& pair : stream_map_) { + if (!pair.second->IsClose()) { + pair.second->MarkClose(); + } + } + } auto AddStream(CallArgs call_args) { // At this point, the connection is set up. // Start sending data frames. @@ -119,8 +132,11 @@ class ClientTransport { outgoing_frames.Send(ClientFrame(std::move(frame))), [](bool success) -> absl::Status { if (!success) { - return absl::InternalError( - "Send frame to outgoing_frames failed."); + // TODO(ladynana): propagate the actual error message + // from EventEngine. + return absl::UnavailableError( + "Transport closed due to endpoint write/read " + "failed."); } return absl::OkStatus(); }); @@ -137,38 +153,51 @@ class ClientTransport { // Save incomming frame results to call_args. [server_initial_metadata, server_to_client_messages]( absl::optional<ServerFrame> server_frame) mutable { - GPR_ASSERT(server_frame.has_value()); - auto frame = std::move( - absl::get<ServerFragmentFrame>(*server_frame)); + bool transport_closed = false; + ServerFragmentFrame frame; + if (!server_frame.has_value()) { + // Incoming server frame pipe is closed, which only + // happens when transport is aborted. + transport_closed = true; + } else { + frame = std::move( + absl::get<ServerFragmentFrame>(*server_frame)); + }; bool has_headers = (frame.headers != nullptr); bool has_message = (frame.message != nullptr); bool has_trailers = (frame.trailers != nullptr); return TrySeq( - If( - has_headers, - [server_initial_metadata, - headers = std::move(frame.headers)]() mutable { - return server_initial_metadata->Push( - std::move(headers)); - }, - [] { return false; }), - If( - has_message, - [server_to_client_messages, - message = std::move(frame.message)]() mutable { - return server_to_client_messages->Push( - std::move(message)); - }, - [] { return false; }), - If( - has_trailers, - [trailers = std::move(frame.trailers)]() mutable - -> LoopCtl<ServerMetadataHandle> { - return std::move(trailers); - }, - []() -> LoopCtl<ServerMetadataHandle> { - return Continue(); - })); + If((!transport_closed) && has_headers, + [server_initial_metadata, + headers = std::move(frame.headers)]() mutable { + return server_initial_metadata->Push( + std::move(headers)); + }, + [] { return false; }), + If((!transport_closed) && has_message, + [server_to_client_messages, + message = std::move(frame.message)]() mutable { + return server_to_client_messages->Push( + std::move(message)); + }, + [] { return false; }), + If((!transport_closed) && has_trailers, + [trailers = std::move(frame.trailers)]() mutable + -> LoopCtl<ServerMetadataHandle> { + return std::move(trailers); + }, + [transport_closed]() + -> LoopCtl<ServerMetadataHandle> { + if (transport_closed) { + // TODO(ladynana): propagate the actual error + // message from EventEngine. + return ServerMetadataFromStatus( + absl::UnavailableError( + "Transport closed due to endpoint " + "write/read failed.")); + } + return Continue(); + })); }); })), [](std::tuple<Empty, ServerMetadataHandle> ret) { @@ -204,6 +233,7 @@ class ClientTransport { std::shared_ptr<FrameHeader> frame_header_; MemoryAllocator memory_allocator_; ScopedArenaPtr arena_; + promise_detail::Context<Arena> context_; // Use to synchronize writer_ and reader_ activity with outside activities; std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; }; diff --git a/src/core/lib/promise/inter_activity_pipe.h b/src/core/lib/promise/inter_activity_pipe.h index e90a61192b..a7594fb26a 100644 --- a/src/core/lib/promise/inter_activity_pipe.h +++ b/src/core/lib/promise/inter_activity_pipe.h @@ -83,6 +83,11 @@ class InterActivityPipe { on_available.Wakeup(); } + bool IsClosed() { + MutexLock lock(&mu_); + return closed_; + } + private: Mutex mu_; std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_); @@ -108,6 +113,12 @@ class InterActivityPipe { if (center_ != nullptr) center_->MarkClosed(); } + bool IsClose() { return center_->IsClosed(); } + + void MarkClose() { + if (center_ != nullptr) center_->MarkClosed(); + } + auto Push(T value) { return [center = center_, value = std::move(value)]() mutable { return center->Push(value); diff --git a/src/core/lib/promise/mpsc.h b/src/core/lib/promise/mpsc.h index 8525739068..c12544282f 100644 --- a/src/core/lib/promise/mpsc.h +++ b/src/core/lib/promise/mpsc.h @@ -107,6 +107,12 @@ class Center : public RefCounted<Center<T>> { receiver_closed_ = true; } + // Return whether the receiver is closed. + bool IsClosed() { + MutexLock lock(&mu_); + return receiver_closed_; + } + private: Mutex mu_; const size_t max_queued_; @@ -164,6 +170,10 @@ class MpscReceiver { ~MpscReceiver() { if (center_ != nullptr) center_->ReceiverClosed(); } + bool IsClosed() { return center_->IsClosed(); } + void MarkClosed() { + if (center_ != nullptr) center_->ReceiverClosed(); + } MpscReceiver(const MpscReceiver&) = delete; MpscReceiver& operator=(const MpscReceiver&) = delete; // Only movable until it's first polled, and so we don't need to contend with diff --git a/test/core/transport/chaotic_good/BUILD b/test/core/transport/chaotic_good/BUILD index 036c894e32..fc698aeec4 100644 --- a/test/core/transport/chaotic_good/BUILD +++ b/test/core/transport/chaotic_good/BUILD @@ -118,3 +118,41 @@ grpc_cc_test( "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", ], ) + +grpc_cc_test( + name = "client_transport_error_test", + srcs = ["client_transport_error_test.cc"], + external_deps = [ + "absl/functional:any_invocable", + "absl/status", + "absl/status:statusor", + "absl/strings:str_format", + "absl/types:optional", + "gtest", + ], + language = "C++", + uses_event_engine = False, + uses_polling = False, + deps = [ + "//:grpc", + "//:grpc_public_hdrs", + "//:iomgr_timer", + "//:ref_counted_ptr", + "//src/core:activity", + "//src/core:arena", + "//src/core:chaotic_good_client_transport", + "//src/core:event_engine_wakeup_scheduler", + "//src/core:grpc_promise_endpoint", + "//src/core:if", + "//src/core:join", + "//src/core:loop", + "//src/core:memory_quota", + "//src/core:pipe", + "//src/core:resource_quota", + "//src/core:seq", + "//src/core:slice", + "//src/core:slice_buffer", + "//test/core/event_engine/fuzzing_event_engine", + "//test/core/event_engine/fuzzing_event_engine:fuzzing_event_engine_proto", + ], +) diff --git a/test/core/transport/chaotic_good/client_transport_error_test.cc b/test/core/transport/chaotic_good/client_transport_error_test.cc new file mode 100644 index 0000000000..3b30c4ca33 --- /dev/null +++ b/test/core/transport/chaotic_good/client_transport_error_test.cc @@ -0,0 +1,441 @@ +// Copyright 2023 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "absl/status/status.h" + +#include "src/core/ext/transport/chaotic_good/client_transport.h" +#include "src/core/lib/transport/promise_endpoint.h" +#include "src/core/lib/transport/transport.h" + +// IWYU pragma: no_include <sys/socket.h> + +#include <stddef.h> + +#include <algorithm> // IWYU pragma: keep +#include <memory> +#include <string> // IWYU pragma: keep +#include <tuple> +#include <utility> +#include <vector> // IWYU pragma: keep + +#include "absl/functional/any_invocable.h" +#include "absl/status/statusor.h" // IWYU pragma: keep +#include "absl/strings/str_format.h" // IWYU pragma: keep +#include "absl/types/optional.h" // IWYU pragma: keep +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +#include <grpc/event_engine/event_engine.h> +#include <grpc/event_engine/memory_allocator.h> +#include <grpc/event_engine/slice.h> // IWYU pragma: keep +#include <grpc/event_engine/slice_buffer.h> +#include <grpc/grpc.h> +#include <grpc/status.h> // IWYU pragma: keep + +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/timer_manager.h" +#include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/event_engine_wakeup_scheduler.h" +#include "src/core/lib/promise/if.h" +#include "src/core/lib/promise/join.h" +#include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/pipe.h" +#include "src/core/lib/promise/seq.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/resource_quota/resource_quota.h" +#include "src/core/lib/slice/slice_buffer.h" +#include "src/core/lib/slice/slice_internal.h" // IWYU pragma: keep +#include "src/core/lib/transport/metadata_batch.h" // IWYU pragma: keep +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" +#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h" + +using testing::MockFunction; +using testing::Return; +using testing::Sequence; +using testing::StrictMock; +using testing::WithArgs; + +namespace grpc_core { +namespace chaotic_good { +namespace testing { + +class MockEndpoint + : public grpc_event_engine::experimental::EventEngine::Endpoint { + public: + MOCK_METHOD( + bool, Read, + (absl::AnyInvocable<void(absl::Status)> on_read, + grpc_event_engine::experimental::SliceBuffer* buffer, + const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs* + args), + (override)); + + MOCK_METHOD( + bool, Write, + (absl::AnyInvocable<void(absl::Status)> on_writable, + grpc_event_engine::experimental::SliceBuffer* data, + const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs* + args), + (override)); + + MOCK_METHOD( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress&, + GetPeerAddress, (), (const, override)); + MOCK_METHOD( + const grpc_event_engine::experimental::EventEngine::ResolvedAddress&, + GetLocalAddress, (), (const, override)); +}; + +class ClientTransportTest : public ::testing::Test { + public: + ClientTransportTest() + : control_endpoint_ptr_(new StrictMock<MockEndpoint>()), + data_endpoint_ptr_(new StrictMock<MockEndpoint>()), + memory_allocator_( + ResourceQuota::Default()->memory_quota()->CreateMemoryAllocator( + "test")), + control_endpoint_(*control_endpoint_ptr_), + data_endpoint_(*data_endpoint_ptr_), + event_engine_(std::make_shared< + grpc_event_engine::experimental::FuzzingEventEngine>( + []() { + grpc_timer_manager_set_threading(false); + grpc_event_engine::experimental::FuzzingEventEngine::Options + options; + return options; + }(), + fuzzing_event_engine::Actions())), + arena_(MakeScopedArena(initial_arena_size, &memory_allocator_)), + pipe_client_to_server_messages_(arena_.get()), + pipe_server_to_client_messages_(arena_.get()), + pipe_server_intial_metadata_(arena_.get()), + pipe_client_to_server_messages_second_(arena_.get()), + pipe_server_to_client_messages_second_(arena_.get()), + pipe_server_intial_metadata_second_(arena_.get()) {} + // Initial ClientTransport with read expecations + void InitialClientTransport() { + client_transport_ = std::make_unique<ClientTransport>( + std::make_unique<PromiseEndpoint>( + std::unique_ptr<MockEndpoint>(control_endpoint_ptr_), + SliceBuffer()), + std::make_unique<PromiseEndpoint>( + std::unique_ptr<MockEndpoint>(data_endpoint_ptr_), SliceBuffer()), + event_engine_); + } + // Send messages from client to server. + auto SendClientToServerMessages( + Pipe<MessageHandle>& pipe_client_to_server_messages, + int num_of_messages) { + return Loop([&pipe_client_to_server_messages, num_of_messages, + this]() mutable { + bool has_message = (num_of_messages > 0); + return If( + has_message, + Seq(pipe_client_to_server_messages.sender.Push( + arena_->MakePooled<Message>()), + [&num_of_messages]() -> LoopCtl<absl::Status> { + num_of_messages--; + return Continue(); + }), + [&pipe_client_to_server_messages]() mutable -> LoopCtl<absl::Status> { + pipe_client_to_server_messages.sender.Close(); + return absl::OkStatus(); + }); + }); + } + // Add stream into client transport, and expect return trailers of + // "grpc-status:code". + auto AddStream(CallArgs args) { + return client_transport_->AddStream(std::move(args)); + } + + private: + MockEndpoint* control_endpoint_ptr_; + MockEndpoint* data_endpoint_ptr_; + size_t initial_arena_size = 1024; + MemoryAllocator memory_allocator_; + + protected: + MockEndpoint& control_endpoint_; + MockEndpoint& data_endpoint_; + std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> + event_engine_; + std::unique_ptr<ClientTransport> client_transport_; + ScopedArenaPtr arena_; + Pipe<MessageHandle> pipe_client_to_server_messages_; + Pipe<MessageHandle> pipe_server_to_client_messages_; + Pipe<ServerMetadataHandle> pipe_server_intial_metadata_; + // Added for mutliple streams tests. + Pipe<MessageHandle> pipe_client_to_server_messages_second_; + Pipe<MessageHandle> pipe_server_to_client_messages_second_; + Pipe<ServerMetadataHandle> pipe_server_intial_metadata_second_; + absl::AnyInvocable<void(absl::Status)> read_callback_; + Sequence control_endpoint_sequence_; + Sequence data_endpoint_sequence_; + // Added to verify received message payload. + const std::string message_ = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}; +}; + +TEST_F(ClientTransportTest, AddOneStreamWithWriteFailed) { + // Mock write failed and read is pending. + EXPECT_CALL(control_endpoint_, Write) + .WillOnce( + WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) { + on_write(absl::InternalError("control endpoint write failed.")); + return false; + })); + EXPECT_CALL(data_endpoint_, Write) + .WillOnce( + WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) { + on_write(absl::InternalError("data endpoint write failed.")); + return false; + })); + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence_) + .WillOnce(Return(false)); + InitialClientTransport(); + ClientMetadataHandle md; + auto args = CallArgs{std::move(md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; + StrictMock<MockFunction<void(absl::Status)>> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + auto activity = MakeActivity( + Seq( + // Concurrently: write and read messages in client transport. + Join( + // Add first stream with call_args into client transport. + // Expect return trailers "grpc-status:unavailable". + AddStream(std::move(args)), + // Send messages to call_args.client_to_server_messages pipe, + // which will be eventually sent to control/data endpoints. + SendClientToServerMessages(pipe_client_to_server_messages_, 1)), + // Once complete, verify successful sending and the received value. + [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) { + EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(), + GRPC_STATUS_UNAVAILABLE); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }), + EventEngineWakeupScheduler(event_engine_), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +TEST_F(ClientTransportTest, AddOneStreamWithReadFailed) { + // Mock read failed. + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence_) + .WillOnce(WithArgs<0>( + [](absl::AnyInvocable<void(absl::Status)> on_read) mutable { + on_read(absl::InternalError("control endpoint read failed.")); + // Return false to mock EventEngine read not finish. + return false; + })); + InitialClientTransport(); + ClientMetadataHandle md; + auto args = CallArgs{std::move(md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; + StrictMock<MockFunction<void(absl::Status)>> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + auto activity = MakeActivity( + Seq( + // Concurrently: write and read messages in client transport. + Join( + // Add first stream with call_args into client transport. + // Expect return trailers "grpc-status:unavailable". + AddStream(std::move(args)), + // Send messages to call_args.client_to_server_messages pipe. + SendClientToServerMessages(pipe_client_to_server_messages_, 1)), + // Once complete, verify successful sending and the received value. + [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) { + EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(), + GRPC_STATUS_UNAVAILABLE); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }), + EventEngineWakeupScheduler(event_engine_), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +TEST_F(ClientTransportTest, AddMultipleStreamWithWriteFailed) { + // Mock write failed at first stream and second stream's write will fail too. + EXPECT_CALL(control_endpoint_, Write) + .Times(1) + .WillRepeatedly( + WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) { + on_write(absl::InternalError("control endpoint write failed.")); + return false; + })); + EXPECT_CALL(data_endpoint_, Write) + .Times(1) + .WillRepeatedly( + WithArgs<0>([](absl::AnyInvocable<void(absl::Status)> on_write) { + on_write(absl::InternalError("data endpoint write failed.")); + return false; + })); + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence_) + .WillOnce(Return(false)); + InitialClientTransport(); + ClientMetadataHandle first_stream_md; + ClientMetadataHandle second_stream_md; + auto first_stream_args = + CallArgs{std::move(first_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; + auto second_stream_args = + CallArgs{std::move(second_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_second_.sender, + &pipe_client_to_server_messages_second_.receiver, + &pipe_server_to_client_messages_second_.sender}; + StrictMock<MockFunction<void(absl::Status)>> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + auto activity = MakeActivity( + Seq( + // Concurrently: write and read messages from client transport. + Join( + // Add first stream with call_args into client transport. + // Expect return trailers "grpc-status:unavailable". + AddStream(std::move(first_stream_args)), + // Send messages to first stream's + // call_args.client_to_server_messages pipe. + SendClientToServerMessages(pipe_client_to_server_messages_, 1)), + // Once complete, verify successful sending and the received value. + [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) { + EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(), + GRPC_STATUS_UNAVAILABLE); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }, + Join( + // Add second stream with call_args into client transport. + // Expect return trailers "grpc-status:unavailable". + AddStream(std::move(second_stream_args)), + // Send messages to second stream's + // call_args.client_to_server_messages pipe. + SendClientToServerMessages(pipe_client_to_server_messages_second_, + 1)), + // Once complete, verify successful sending and the received value. + [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) { + EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(), + GRPC_STATUS_UNAVAILABLE); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }), + EventEngineWakeupScheduler(event_engine_), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +TEST_F(ClientTransportTest, AddMultipleStreamWithReadFailed) { + // Mock read failed at first stream, and second stream's write will fail too. + EXPECT_CALL(control_endpoint_, Read) + .InSequence(control_endpoint_sequence_) + .WillOnce(WithArgs<0>( + [](absl::AnyInvocable<void(absl::Status)> on_read) mutable { + on_read(absl::InternalError("control endpoint read failed.")); + // Return false to mock EventEngine read not finish. + return false; + })); + InitialClientTransport(); + ClientMetadataHandle first_stream_md; + ClientMetadataHandle second_stream_md; + auto first_stream_args = + CallArgs{std::move(first_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_.sender, + &pipe_client_to_server_messages_.receiver, + &pipe_server_to_client_messages_.sender}; + auto second_stream_args = + CallArgs{std::move(second_stream_md), + ClientInitialMetadataOutstandingToken::Empty(), + nullptr, + &pipe_server_intial_metadata_second_.sender, + &pipe_client_to_server_messages_second_.receiver, + &pipe_server_to_client_messages_second_.sender}; + StrictMock<MockFunction<void(absl::Status)>> on_done; + EXPECT_CALL(on_done, Call(absl::OkStatus())); + auto activity = MakeActivity( + Seq( + // Concurrently: write and read messages from client transport. + Join( + // Add first stream with call_args into client transport. + AddStream(std::move(first_stream_args)), + // Send messages to first stream's + // call_args.client_to_server_messages pipe, which will be + // eventually sent to control/data endpoints. + SendClientToServerMessages(pipe_client_to_server_messages_, 1)), + // Once complete, verify successful sending and the received value. + [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) { + EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(), + GRPC_STATUS_UNAVAILABLE); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }, + Join( + // Add second stream with call_args into client transport. + AddStream(std::move(second_stream_args)), + // Send messages to second stream's + // call_args.client_to_server_messages pipe, which will be + // eventually sent to control/data endpoints. + SendClientToServerMessages(pipe_client_to_server_messages_second_, + 1)), + // Once complete, verify successful sending and the received value. + [](const std::tuple<ServerMetadataHandle, absl::Status>& ret) { + EXPECT_EQ(std::get<0>(ret)->get(GrpcStatusMetadata()).value(), + GRPC_STATUS_UNAVAILABLE); + EXPECT_TRUE(std::get<1>(ret).ok()); + return absl::OkStatus(); + }), + EventEngineWakeupScheduler(event_engine_), + [&on_done](absl::Status status) { on_done.Call(std::move(status)); }); + // Wait until ClientTransport's internal activities to finish. + event_engine_->TickUntilIdle(); + event_engine_->UnsetGlobalHooks(); +} + +} // namespace testing +} // namespace chaotic_good +} // namespace grpc_core + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + // Must call to create default EventEngine. + grpc_init(); + int ret = RUN_ALL_TESTS(); + grpc_shutdown(); + return ret; +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 0050efad85..6b389d2da6 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -2220,6 +2220,30 @@ "flaky": false, "gtest": true, "language": "c++", + "name": "client_transport_error_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": false + }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", "name": "client_transport_test", "platforms": [ "linux", |