aboutsummaryrefslogtreecommitdiff
path: root/src/core/ext/transport/chttp2/transport/internal.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/internal.h')
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h124
1 files changed, 63 insertions, 61 deletions
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index d8799e4cb1..d41d6356f2 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -29,7 +29,6 @@
#include <utility>
#include "absl/container/flat_hash_map.h"
-#include "absl/meta/type_traits.h"
#include "absl/random/random.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
@@ -58,8 +57,10 @@
#include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
#include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
#include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
+#include "src/core/lib/channel/call_tracer.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channelz.h"
+#include "src/core/lib/channel/tcp_tracer.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/bitset.h"
#include "src/core/lib/gprpp/debug_location.h"
@@ -70,6 +71,7 @@
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/slice/slice.h"
@@ -78,8 +80,6 @@
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
-#include "src/core/lib/transport/transport_fwd.h"
-#include "src/core/lib/transport/transport_impl.h"
// Flag that this closure barrier may be covering a write in a pollset, and so
// we should not complete this closure until we can prove that the write got
@@ -203,18 +203,6 @@ struct grpc_chttp2_stream_link {
grpc_chttp2_stream* next;
grpc_chttp2_stream* prev;
};
-// We keep several sets of connection wide parameters
-typedef enum {
- // The settings our peer has asked for (and we have acked)
- GRPC_PEER_SETTINGS = 0,
- // The settings we'd like to have
- GRPC_LOCAL_SETTINGS,
- // The settings we've published to our peer
- GRPC_SENT_SETTINGS,
- // The settings the peer has acked
- GRPC_ACKED_SETTINGS,
- GRPC_NUM_SETTING_SETS
-} grpc_chttp2_setting_set;
typedef enum {
GRPC_CHTTP2_NO_GOAWAY_SEND,
@@ -236,26 +224,40 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;
-struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
+struct grpc_chttp2_transport final
+ : public grpc_core::Transport,
+ public grpc_core::FilterStackTransport,
+ public grpc_core::RefCounted<grpc_chttp2_transport,
+ grpc_core::NonPolymorphicRefCount>,
+ public grpc_core::KeepsGrpcInitialized {
grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args,
grpc_endpoint* ep, bool is_client);
- ~grpc_chttp2_transport();
-
- // Make this be able to be contained in RefCountedPtr<>
- // Can't yet make this derive from RefCounted because we need to keep
- // `grpc_transport base` first.
- // TODO(ctiller): Make a transport interface.
- void IncrementRefCount() { refs.Ref(); }
- void Unref() {
- if (refs.Unref()) delete this;
- }
- grpc_core::RefCountedPtr<grpc_chttp2_transport> Ref() {
- IncrementRefCount();
- return grpc_core::RefCountedPtr<grpc_chttp2_transport>(this);
+ ~grpc_chttp2_transport() override;
+
+ void Orphan() override;
+
+ size_t SizeOfStream() const override;
+ bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override;
+ void PerformStreamOp(grpc_stream* gs,
+ grpc_transport_stream_op_batch* op) override;
+ void DestroyStream(grpc_stream* gs,
+ grpc_closure* then_schedule_closure) override;
+
+ grpc_core::FilterStackTransport* filter_stack_transport() override {
+ return this;
}
+ grpc_core::ClientTransport* client_transport() override { return nullptr; }
+ grpc_core::ServerTransport* server_transport() override { return nullptr; }
+
+ absl::string_view GetTransportName() const override;
+ void InitStream(grpc_stream* gs, grpc_stream_refcount* refcount,
+ const void* server_data, grpc_core::Arena* arena) override;
+ void SetPollset(grpc_stream* stream, grpc_pollset* pollset) override;
+ void SetPollsetSet(grpc_stream* stream,
+ grpc_pollset_set* pollset_set) override;
+ void PerformOp(grpc_transport_op* op) override;
+ grpc_endpoint* GetEndpoint() override;
- grpc_transport base; // must be first
- grpc_core::RefCount refs;
grpc_endpoint* ep;
grpc_core::Slice peer_string;
@@ -305,7 +307,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
};
grpc_closure write_action_begin_locked;
- grpc_closure write_action;
grpc_closure write_action_end_locked;
grpc_closure read_action_locked;
@@ -319,7 +320,7 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_chttp2_stream** accepting_stream = nullptr;
// accept stream callback
- void (*accept_stream_cb)(void* user_data, grpc_transport* transport,
+ void (*accept_stream_cb)(void* user_data, grpc_core::Transport* transport,
const void* server_data);
// registered_method_matcher_cb is called before invoking the recv initial
// metadata callback.
@@ -346,12 +347,8 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
- /// bitmask of setting indexes to send out
- /// Hack: it's common for implementations to assume 65536 bytes initial send
- /// window -- this should by rights be 0
- uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
/// settings values
- uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS];
+ grpc_core::Http2SettingsManager settings;
grpc_event_engine::experimental::EventEngine::TaskHandle
settings_ack_watchdog =
@@ -371,8 +368,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_core::Chttp2PingAbusePolicy ping_abuse_policy;
grpc_core::Chttp2PingRatePolicy ping_rate_policy;
grpc_core::Chttp2PingCallbacks ping_callbacks;
- absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
- delayed_ping_timer_handle;
+ grpc_event_engine::experimental::EventEngine::TaskHandle
+ delayed_ping_timer_handle =
+ grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
grpc_closure retry_initiate_ping_locked;
grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy;
@@ -444,21 +442,20 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
grpc_closure destructive_reclaimer_locked;
// next bdp ping timer handle
- absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
- next_bdp_ping_timer_handle;
+ grpc_event_engine::experimental::EventEngine::TaskHandle
+ next_bdp_ping_timer_handle =
+ grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
// keep-alive ping support
/// Closure to initialize a keepalive ping
grpc_closure init_keepalive_ping_locked;
- /// Closure to run when the keepalive ping is sent
- grpc_closure start_keepalive_ping_locked;
/// Closure to run when the keepalive ping ack is received
grpc_closure finish_keepalive_ping_locked;
- /// Closure to run when the keepalive ping timeouts
- grpc_closure keepalive_watchdog_fired_locked;
/// timer to initiate ping events
- absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle>
- keepalive_ping_timer_handle;
+ grpc_event_engine::experimental::EventEngine::TaskHandle
+ keepalive_ping_timer_handle =
+ grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
+ ;
/// time duration in between pings
grpc_core::Duration keepalive_time;
/// grace period to wait for data after sending a ping before keepalives
@@ -492,6 +489,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
+ /// write execution state of the transport
+ grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
+
/// policy for how much data we're willing to put into one http2 write
grpc_core::Chttp2WriteSizePolicy write_size_policy;
@@ -511,8 +511,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// if keepalive pings are allowed when there's no outstanding streams
bool keepalive_permit_without_calls = false;
- /// If start_keepalive_ping_locked has been called
- bool keepalive_ping_started = false;
// bdp estimator
bool bdp_ping_blocked =
@@ -521,30 +519,22 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized {
/// is the transport destroying itself?
uint8_t destroying = false;
- /// is there a read request to the endpoint outstanding?
- uint8_t endpoint_reading = 1;
-
/// is this a client?
bool is_client;
- /// are the local settings dirty and need to be sent?
- bool dirtied_local_settings = true;
- /// have local settings been sent?
- bool sent_local_settings = false;
-
/// If start_bdp_ping_locked has been called
bool bdp_ping_started = false;
// True if pings should be acked
bool ack_pings = true;
/// True if the keepalive system wants to see some data incoming
bool keepalive_incoming_data_wanted = false;
+ /// True if we count stream allocation (instead of HTTP2 concurrency) for
+ /// MAX_CONCURRENT_STREAMS
+ bool max_concurrent_streams_overload_protection = false;
// What percentage of rst_stream frames on the server should cause a ping
// frame to be generated.
uint8_t ping_on_rst_stream_percent;
-
- /// write execution state of the transport
- grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
};
typedef enum {
@@ -650,6 +640,12 @@ struct grpc_chttp2_stream {
/// Byte counter for number of bytes written
size_t byte_counter = 0;
+ /// Only set when enabled.
+ grpc_core::CallTracerInterface* call_tracer = nullptr;
+
+ /// Only set when enabled.
+ std::shared_ptr<grpc_core::TcpTracerInterface> tcp_tracer;
+
// time this stream was created
gpr_timespec creation_time = gpr_now(GPR_CLOCK_MONOTONIC);
@@ -670,6 +666,12 @@ struct grpc_chttp2_stream {
#define GRPC_ARG_PING_TIMEOUT_MS "grpc.http2.ping_timeout_ms"
+// EXPERIMENTAL: provide protection against overloading a server with too many
+// requests: wait for streams to be deallocated before they stop counting
+// against MAX_CONCURRENT_STREAMS
+#define GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION \
+ "grpc.http.overload_protection"
+
/// Transport writing call flow:
/// grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
/// go out on the wire.