aboutsummaryrefslogtreecommitdiff
path: root/src/core/ext/transport/chaotic_good/client_transport.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chaotic_good/client_transport.cc')
-rw-r--r--src/core/ext/transport/chaotic_good/client_transport.cc299
1 files changed, 233 insertions, 66 deletions
diff --git a/src/core/ext/transport/chaotic_good/client_transport.cc b/src/core/ext/transport/chaotic_good/client_transport.cc
index 88364499d2..b0d5f6b3b2 100644
--- a/src/core/ext/transport/chaotic_good/client_transport.cc
+++ b/src/core/ext/transport/chaotic_good/client_transport.cc
@@ -16,10 +16,15 @@
#include "src/core/ext/transport/chaotic_good/client_transport.h"
+#include <cstdint>
+#include <cstdlib>
#include <memory>
#include <string>
#include <tuple>
+#include <utility>
+#include "absl/random/bit_gen_ref.h"
+#include "absl/random/random.h"
#include "absl/status/statusor.h"
#include <grpc/event_engine/event_engine.h>
@@ -30,10 +35,18 @@
#include "src/core/ext/transport/chaotic_good/frame_header.h"
#include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
#include "src/core/lib/gprpp/match.h"
+#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
+#include "src/core/lib/promise/all_ok.h"
#include "src/core/lib/promise/event_engine_wakeup_scheduler.h"
-#include "src/core/lib/promise/join.h"
#include "src/core/lib/promise/loop.h"
+#include "src/core/lib/promise/map.h"
+#include "src/core/lib/promise/promise.h"
+#include "src/core/lib/promise/try_join.h"
+#include "src/core/lib/promise/try_seq.h"
+#include "src/core/lib/resource_quota/arena.h"
+#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -42,79 +55,233 @@
namespace grpc_core {
namespace chaotic_good {
-ClientTransport::ClientTransport(
- std::unique_ptr<PromiseEndpoint> control_endpoint,
- std::unique_ptr<PromiseEndpoint> data_endpoint,
- std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
- : outgoing_frames_(MpscReceiver<ClientFrame>(4)),
- control_endpoint_(std::move(control_endpoint)),
- data_endpoint_(std::move(data_endpoint)),
- control_endpoint_write_buffer_(SliceBuffer()),
- data_endpoint_write_buffer_(SliceBuffer()),
- hpack_compressor_(std::make_unique<HPackCompressor>()),
- event_engine_(event_engine) {
- auto write_loop = Loop([this] {
- return Seq(
+auto ChaoticGoodClientTransport::TransportWriteLoop() {
+ return Loop([this] {
+ return TrySeq(
// Get next outgoing frame.
- this->outgoing_frames_.Next(),
- // Construct data buffers that will be sent to the endpoints.
+ outgoing_frames_.Next(),
+ // Serialize and write it out.
[this](ClientFrame client_frame) {
- MatchMutable(
- &client_frame,
- [this](ClientFragmentFrame* frame) mutable {
- control_endpoint_write_buffer_.Append(
- frame->Serialize(hpack_compressor_.get()));
- if (frame->message != nullptr) {
- auto frame_header =
- FrameHeader::Parse(
- reinterpret_cast<const uint8_t*>(GRPC_SLICE_START_PTR(
- control_endpoint_write_buffer_.c_slice_buffer()
- ->slices[0])))
- .value();
- std::string message_padding(frame_header.message_padding,
- '0');
- Slice slice(grpc_slice_from_cpp_string(message_padding));
- // Append message payload to data_endpoint_buffer.
- data_endpoint_write_buffer_.Append(std::move(slice));
- // Append message payload to data_endpoint_buffer.
- frame->message->payload()->MoveFirstNBytesIntoSliceBuffer(
- frame->message->payload()->Length(),
- data_endpoint_write_buffer_);
- }
- },
- [this](CancelFrame* frame) mutable {
- control_endpoint_write_buffer_.Append(
- frame->Serialize(hpack_compressor_.get()));
- });
- return absl::OkStatus();
+ return transport_.WriteFrame(GetFrameInterface(client_frame));
},
- // Write buffers to corresponding endpoints concurrently.
- [this]() {
- return Join(this->control_endpoint_->Write(
- std::move(control_endpoint_write_buffer_)),
- this->data_endpoint_->Write(
- std::move(data_endpoint_write_buffer_)));
- },
- // Finish writes and return status.
- [](std::tuple<absl::Status, absl::Status> ret)
- -> LoopCtl<absl::Status> {
- // If writes failed, return failure status.
- if (!(std::get<0>(ret).ok() || std::get<1>(ret).ok())) {
- // TODO(ladynana): handle the promise endpoint write failures with
- // closing the transport.
- return absl::InternalError("Promise endpoint writes failed.");
- }
+ []() -> LoopCtl<absl::Status> {
+ // The write failures will be caught in TrySeq and exit loop.
+ // Therefore, only need to return Continue() in the last lambda
+ // function.
return Continue();
});
});
- writer_ = MakeActivity(
- // Continuously write next outgoing frames to promise endpoints.
- std::move(write_loop), EventEngineWakeupScheduler(event_engine_),
- [](absl::Status status) {
- GPR_ASSERT(status.code() == absl::StatusCode::kCancelled ||
- status.code() == absl::StatusCode::kInternal);
+}
+
+absl::optional<CallHandler> ChaoticGoodClientTransport::LookupStream(
+ uint32_t stream_id) {
+ MutexLock lock(&mu_);
+ auto it = stream_map_.find(stream_id);
+ if (it == stream_map_.end()) {
+ return absl::nullopt;
+ }
+ return it->second;
+}
+
+auto ChaoticGoodClientTransport::PushFrameIntoCall(ServerFragmentFrame frame,
+ CallHandler call_handler) {
+ auto& headers = frame.headers;
+ return TrySeq(
+ If(
+ headers != nullptr,
+ [call_handler, &headers]() mutable {
+ return call_handler.PushServerInitialMetadata(std::move(headers));
+ },
+ []() -> StatusFlag { return Success{}; }),
+ [call_handler, message = std::move(frame.message)]() mutable {
+ return If(
+ message.has_value(),
+ [&call_handler, &message]() mutable {
+ return call_handler.PushMessage(std::move(message->message));
+ },
+ []() -> StatusFlag { return Success{}; });
+ },
+ [call_handler, trailers = std::move(frame.trailers)]() mutable {
+ return If(
+ trailers != nullptr,
+ [&call_handler, &trailers]() mutable {
+ return call_handler.PushServerTrailingMetadata(
+ std::move(trailers));
+ },
+ []() -> StatusFlag { return Success{}; });
});
}
+auto ChaoticGoodClientTransport::TransportReadLoop() {
+ return Loop([this] {
+ return TrySeq(
+ transport_.ReadFrameBytes(),
+ [](std::tuple<FrameHeader, BufferPair> frame_bytes)
+ -> absl::StatusOr<std::tuple<FrameHeader, BufferPair>> {
+ const auto& frame_header = std::get<0>(frame_bytes);
+ if (frame_header.type != FrameType::kFragment) {
+ return absl::InternalError(
+ absl::StrCat("Expected fragment frame, got ",
+ static_cast<int>(frame_header.type)));
+ }
+ return frame_bytes;
+ },
+ [this](std::tuple<FrameHeader, BufferPair> frame_bytes) {
+ const auto& frame_header = std::get<0>(frame_bytes);
+ auto& buffers = std::get<1>(frame_bytes);
+ absl::optional<CallHandler> call_handler =
+ LookupStream(frame_header.stream_id);
+ ServerFragmentFrame frame;
+ absl::Status deserialize_status;
+ if (call_handler.has_value()) {
+ deserialize_status = transport_.DeserializeFrame(
+ frame_header, std::move(buffers), call_handler->arena(), frame,
+ FrameLimits{1024 * 1024 * 1024, aligned_bytes_ - 1});
+ } else {
+ // Stream not found, skip the frame.
+ transport_.SkipFrame(frame_header, std::move(buffers));
+ deserialize_status = absl::OkStatus();
+ }
+ return If(
+ deserialize_status.ok() && call_handler.has_value(),
+ [this, &frame, &call_handler]() {
+ return call_handler->SpawnWaitable(
+ "push-frame", [this, call_handler = *call_handler,
+ frame = std::move(frame)]() mutable {
+ return Map(call_handler.CancelIfFails(PushFrameIntoCall(
+ std::move(frame), call_handler)),
+ [](StatusFlag f) {
+ return StatusCast<absl::Status>(f);
+ });
+ });
+ },
+ [&deserialize_status]() -> absl::Status {
+ // Stream not found, nothing to do.
+ return std::move(deserialize_status);
+ });
+ },
+ []() -> LoopCtl<absl::Status> { return Continue{}; });
+ });
+}
+
+auto ChaoticGoodClientTransport::OnTransportActivityDone() {
+ return [this](absl::Status status) {
+ if (!(status.ok() || status.code() == absl::StatusCode::kCancelled)) {
+ this->AbortWithError();
+ }
+ };
+}
+
+ChaoticGoodClientTransport::ChaoticGoodClientTransport(
+ std::unique_ptr<PromiseEndpoint> control_endpoint,
+ std::unique_ptr<PromiseEndpoint> data_endpoint,
+ std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)
+ : outgoing_frames_(4),
+ transport_(std::move(control_endpoint), std::move(data_endpoint)),
+ writer_{
+ MakeActivity(
+ // Continuously write next outgoing frames to promise endpoints.
+ TransportWriteLoop(), EventEngineWakeupScheduler(event_engine),
+ OnTransportActivityDone()),
+ },
+ reader_{MakeActivity(
+ // Continuously read next incoming frames from promise endpoints.
+ TransportReadLoop(), EventEngineWakeupScheduler(event_engine),
+ OnTransportActivityDone())} {}
+
+ChaoticGoodClientTransport::~ChaoticGoodClientTransport() {
+ if (writer_ != nullptr) {
+ writer_.reset();
+ }
+ if (reader_ != nullptr) {
+ reader_.reset();
+ }
+}
+
+void ChaoticGoodClientTransport::AbortWithError() {
+ // Mark transport as unavailable when the endpoint write/read failed.
+ // Close all the available pipes.
+ outgoing_frames_.MarkClosed();
+ ReleasableMutexLock lock(&mu_);
+ StreamMap stream_map = std::move(stream_map_);
+ stream_map_.clear();
+ lock.Release();
+ for (const auto& pair : stream_map) {
+ auto call_handler = pair.second;
+ call_handler.SpawnInfallible("cancel", [call_handler]() mutable {
+ call_handler.Cancel(ServerMetadataFromStatus(
+ absl::UnavailableError("Transport closed.")));
+ return Empty{};
+ });
+ }
+}
+
+uint32_t ChaoticGoodClientTransport::MakeStream(CallHandler call_handler) {
+ ReleasableMutexLock lock(&mu_);
+ const uint32_t stream_id = next_stream_id_++;
+ stream_map_.emplace(stream_id, call_handler);
+ lock.Release();
+ call_handler.OnDone([this, stream_id]() {
+ MutexLock lock(&mu_);
+ stream_map_.erase(stream_id);
+ });
+ return stream_id;
+}
+
+auto ChaoticGoodClientTransport::CallOutboundLoop(uint32_t stream_id,
+ CallHandler call_handler) {
+ auto send_fragment = [stream_id,
+ outgoing_frames = outgoing_frames_.MakeSender()](
+ ClientFragmentFrame frame) mutable {
+ frame.stream_id = stream_id;
+ return Map(outgoing_frames.Send(std::move(frame)),
+ [](bool success) -> absl::Status {
+ if (!success) {
+ // Failed to send outgoing frame.
+ return absl::UnavailableError("Transport closed.");
+ }
+ return absl::OkStatus();
+ });
+ };
+ return TrySeq(
+ // Wait for initial metadata then send it out.
+ call_handler.PullClientInitialMetadata(),
+ [send_fragment](ClientMetadataHandle md) mutable {
+ ClientFragmentFrame frame;
+ frame.headers = std::move(md);
+ return send_fragment(std::move(frame));
+ },
+ // Continuously send client frame with client to server messages.
+ ForEach(OutgoingMessages(call_handler),
+ [send_fragment,
+ aligned_bytes = aligned_bytes_](MessageHandle message) mutable {
+ ClientFragmentFrame frame;
+ // Construct frame header (flags, header_length and
+ // trailer_length will be added in serialization).
+ const uint32_t message_length = message->payload()->Length();
+ const uint32_t padding =
+ message_length % aligned_bytes == 0
+ ? 0
+ : aligned_bytes - message_length % aligned_bytes;
+ GPR_ASSERT((message_length + padding) % aligned_bytes == 0);
+ frame.message = FragmentMessage(std::move(message), padding,
+ message_length);
+ return send_fragment(std::move(frame));
+ }),
+ [send_fragment]() mutable {
+ ClientFragmentFrame frame;
+ frame.end_of_stream = true;
+ return send_fragment(std::move(frame));
+ });
+}
+
+void ChaoticGoodClientTransport::StartCall(CallHandler call_handler) {
+ // At this point, the connection is set up.
+ // Start sending data frames.
+ call_handler.SpawnGuarded("outbound_loop", [this, call_handler]() mutable {
+ return CallOutboundLoop(MakeStream(call_handler), call_handler);
+ });
+}
+
} // namespace chaotic_good
} // namespace grpc_core