diff options
Diffstat (limited to 'src/core/ext/transport/chaotic_good/client_transport.cc')
-rw-r--r-- | src/core/ext/transport/chaotic_good/client_transport.cc | 299 |
1 files changed, 233 insertions, 66 deletions
diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc index 88364499d2..b0d5f6b3b2 100644 --- a/src/core/ext/transport/chaotic_good/client_transport.cc +++ b/src/core/ext/transport/chaotic_good/client_transport.cc @@ -16,10 +16,15 @@ #include "src/core/ext/transport/chaotic_good/client_transport.h" +#include <cstdint> +#include <cstdlib> #include <memory> #include <string> #include <tuple> +#include <utility> +#include "absl/random/bit_gen_ref.h" +#include "absl/random/random.h" #include "absl/status/statusor.h" #include <grpc/event_engine/event_engine.h> @@ -30,10 +35,18 @@ #include "src/core/ext/transport/chaotic_good/frame_header.h" #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h" #include "src/core/lib/gprpp/match.h" +#include "src/core/lib/gprpp/ref_counted_ptr.h" +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/promise/activity.h" +#include "src/core/lib/promise/all_ok.h" #include "src/core/lib/promise/event_engine_wakeup_scheduler.h" -#include "src/core/lib/promise/join.h" #include "src/core/lib/promise/loop.h" +#include "src/core/lib/promise/map.h" +#include "src/core/lib/promise/promise.h" +#include "src/core/lib/promise/try_join.h" +#include "src/core/lib/promise/try_seq.h" +#include "src/core/lib/resource_quota/arena.h" +#include "src/core/lib/resource_quota/resource_quota.h" #include "src/core/lib/slice/slice.h" #include "src/core/lib/slice/slice_buffer.h" #include "src/core/lib/slice/slice_internal.h" @@ -42,79 +55,233 @@ namespace grpc_core { namespace chaotic_good { -ClientTransport::ClientTransport( - std::unique_ptr<PromiseEndpoint> control_endpoint, - std::unique_ptr<PromiseEndpoint> data_endpoint, - std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine) - : outgoing_frames_(MpscReceiver<ClientFrame>(4)), - control_endpoint_(std::move(control_endpoint)), - data_endpoint_(std::move(data_endpoint)), - control_endpoint_write_buffer_(SliceBuffer()), - data_endpoint_write_buffer_(SliceBuffer()), - hpack_compressor_(std::make_unique<HPackCompressor>()), - event_engine_(event_engine) { - auto write_loop = Loop([this] { - return Seq( +auto ChaoticGoodClientTransport::TransportWriteLoop() { + return Loop([this] { + return TrySeq( // Get next outgoing frame. - this->outgoing_frames_.Next(), - // Construct data buffers that will be sent to the endpoints. + outgoing_frames_.Next(), + // Serialize and write it out. [this](ClientFrame client_frame) { - MatchMutable( - &client_frame, - [this](ClientFragmentFrame* frame) mutable { - control_endpoint_write_buffer_.Append( - frame->Serialize(hpack_compressor_.get())); - if (frame->message != nullptr) { - auto frame_header = - FrameHeader::Parse( - reinterpret_cast<const uint8_t*>(GRPC_SLICE_START_PTR( - control_endpoint_write_buffer_.c_slice_buffer() - ->slices[0]))) - .value(); - std::string message_padding(frame_header.message_padding, - '0'); - Slice slice(grpc_slice_from_cpp_string(message_padding)); - // Append message payload to data_endpoint_buffer. - data_endpoint_write_buffer_.Append(std::move(slice)); - // Append message payload to data_endpoint_buffer. - frame->message->payload()->MoveFirstNBytesIntoSliceBuffer( - frame->message->payload()->Length(), - data_endpoint_write_buffer_); - } - }, - [this](CancelFrame* frame) mutable { - control_endpoint_write_buffer_.Append( - frame->Serialize(hpack_compressor_.get())); - }); - return absl::OkStatus(); + return transport_.WriteFrame(GetFrameInterface(client_frame)); }, - // Write buffers to corresponding endpoints concurrently. - [this]() { - return Join(this->control_endpoint_->Write( - std::move(control_endpoint_write_buffer_)), - this->data_endpoint_->Write( - std::move(data_endpoint_write_buffer_))); - }, - // Finish writes and return status. - [](std::tuple<absl::Status, absl::Status> ret) - -> LoopCtl<absl::Status> { - // If writes failed, return failure status. - if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) { - // TODO(ladynana): handle the promise endpoint write failures with - // closing the transport. - return absl::InternalError("Promise endpoint writes failed."); - } + []() -> LoopCtl<absl::Status> { + // The write failures will be caught in TrySeq and exit loop. + // Therefore, only need to return Continue() in the last lambda + // function. return Continue(); }); }); - writer_ = MakeActivity( - // Continuously write next outgoing frames to promise endpoints. - std::move(write_loop), EventEngineWakeupScheduler(event_engine_), - [](absl::Status status) { - GPR_ASSERT(status.code() == absl::StatusCode::kCancelled || - status.code() == absl::StatusCode::kInternal); +} + +absl::optional<CallHandler> ChaoticGoodClientTransport::LookupStream( + uint32_t stream_id) { + MutexLock lock(&mu_); + auto it = stream_map_.find(stream_id); + if (it == stream_map_.end()) { + return absl::nullopt; + } + return it->second; +} + +auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame, + CallHandler call_handler) { + auto& headers = frame.headers; + return TrySeq( + If( + headers != nullptr, + [call_handler, &headers]() mutable { + return call_handler.PushServerInitialMetadata(std::move(headers)); + }, + []() -> StatusFlag { return Success{}; }), + [call_handler, message = std::move(frame.message)]() mutable { + return If( + message.has_value(), + [&call_handler, &message]() mutable { + return call_handler.PushMessage(std::move(message->message)); + }, + []() -> StatusFlag { return Success{}; }); + }, + [call_handler, trailers = std::move(frame.trailers)]() mutable { + return If( + trailers != nullptr, + [&call_handler, &trailers]() mutable { + return call_handler.PushServerTrailingMetadata( + std::move(trailers)); + }, + []() -> StatusFlag { return Success{}; }); }); } +auto ChaoticGoodClientTransport::TransportReadLoop() { + return Loop([this] { + return TrySeq( + transport_.ReadFrameBytes(), + [](std::tuple<FrameHeader, BufferPair> frame_bytes) + -> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> { + const auto& frame_header = std::get<0>(frame_bytes); + if (frame_header.type != FrameType::kFragment) { + return absl::InternalError( + absl::StrCat("Expected fragment frame, got ", + static_cast<int>(frame_header.type))); + } + return frame_bytes; + }, + [this](std::tuple<FrameHeader, BufferPair> frame_bytes) { + const auto& frame_header = std::get<0>(frame_bytes); + auto& buffers = std::get<1>(frame_bytes); + absl::optional<CallHandler> call_handler = + LookupStream(frame_header.stream_id); + ServerFragmentFrame frame; + absl::Status deserialize_status; + if (call_handler.has_value()) { + deserialize_status = transport_.DeserializeFrame( + frame_header, std::move(buffers), call_handler->arena(), frame, + FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1}); + } else { + // Stream not found, skip the frame. + transport_.SkipFrame(frame_header, std::move(buffers)); + deserialize_status = absl::OkStatus(); + } + return If( + deserialize_status.ok() && call_handler.has_value(), + [this, &frame, &call_handler]() { + return call_handler->SpawnWaitable( + "push-frame", [this, call_handler = *call_handler, + frame = std::move(frame)]() mutable { + return Map(call_handler.CancelIfFails(PushFrameIntoCall( + std::move(frame), call_handler)), + [](StatusFlag f) { + return StatusCast<absl::Status>(f); + }); + }); + }, + [&deserialize_status]() -> absl::Status { + // Stream not found, nothing to do. + return std::move(deserialize_status); + }); + }, + []() -> LoopCtl<absl::Status> { return Continue{}; }); + }); +} + +auto ChaoticGoodClientTransport::OnTransportActivityDone() { + return [this](absl::Status status) { + if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) { + this->AbortWithError(); + } + }; +} + +ChaoticGoodClientTransport::ChaoticGoodClientTransport( + std::unique_ptr<PromiseEndpoint> control_endpoint, + std::unique_ptr<PromiseEndpoint> data_endpoint, + std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine) + : outgoing_frames_(4), + transport_(std::move(control_endpoint), std::move(data_endpoint)), + writer_{ + MakeActivity( + // Continuously write next outgoing frames to promise endpoints. + TransportWriteLoop(), EventEngineWakeupScheduler(event_engine), + OnTransportActivityDone()), + }, + reader_{MakeActivity( + // Continuously read next incoming frames from promise endpoints. + TransportReadLoop(), EventEngineWakeupScheduler(event_engine), + OnTransportActivityDone())} {} + +ChaoticGoodClientTransport::~ChaoticGoodClientTransport() { + if (writer_ != nullptr) { + writer_.reset(); + } + if (reader_ != nullptr) { + reader_.reset(); + } +} + +void ChaoticGoodClientTransport::AbortWithError() { + // Mark transport as unavailable when the endpoint write/read failed. + // Close all the available pipes. + outgoing_frames_.MarkClosed(); + ReleasableMutexLock lock(&mu_); + StreamMap stream_map = std::move(stream_map_); + stream_map_.clear(); + lock.Release(); + for (const auto& pair : stream_map) { + auto call_handler = pair.second; + call_handler.SpawnInfallible("cancel", [call_handler]() mutable { + call_handler.Cancel(ServerMetadataFromStatus( + absl::UnavailableError("Transport closed."))); + return Empty{}; + }); + } +} + +uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) { + ReleasableMutexLock lock(&mu_); + const uint32_t stream_id = next_stream_id_++; + stream_map_.emplace(stream_id, call_handler); + lock.Release(); + call_handler.OnDone([this, stream_id]() { + MutexLock lock(&mu_); + stream_map_.erase(stream_id); + }); + return stream_id; +} + +auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id, + CallHandler call_handler) { + auto send_fragment = [stream_id, + outgoing_frames = outgoing_frames_.MakeSender()]( + ClientFragmentFrame frame) mutable { + frame.stream_id = stream_id; + return Map(outgoing_frames.Send(std::move(frame)), + [](bool success) -> absl::Status { + if (!success) { + // Failed to send outgoing frame. + return absl::UnavailableError("Transport closed."); + } + return absl::OkStatus(); + }); + }; + return TrySeq( + // Wait for initial metadata then send it out. + call_handler.PullClientInitialMetadata(), + [send_fragment](ClientMetadataHandle md) mutable { + ClientFragmentFrame frame; + frame.headers = std::move(md); + return send_fragment(std::move(frame)); + }, + // Continuously send client frame with client to server messages. + ForEach(OutgoingMessages(call_handler), + [send_fragment, + aligned_bytes = aligned_bytes_](MessageHandle message) mutable { + ClientFragmentFrame frame; + // Construct frame header (flags, header_length and + // trailer_length will be added in serialization). + const uint32_t message_length = message->payload()->Length(); + const uint32_t padding = + message_length % aligned_bytes == 0 + ? 0 + : aligned_bytes - message_length % aligned_bytes; + GPR_ASSERT((message_length + padding) % aligned_bytes == 0); + frame.message = FragmentMessage(std::move(message), padding, + message_length); + return send_fragment(std::move(frame)); + }), + [send_fragment]() mutable { + ClientFragmentFrame frame; + frame.end_of_stream = true; + return send_fragment(std::move(frame)); + }); +} + +void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) { + // At this point, the connection is set up. + // Start sending data frames. + call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable { + return CallOutboundLoop(MakeStream(call_handler), call_handler); + }); +} + } // namespace chaotic_good } // namespace grpc_core |