diff options
Diffstat (limited to 'src/core/ext/transport/chttp2/transport/internal.h')
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 124 |
1 files changed, 63 insertions, 61 deletions
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index d8799e4cb1..d41d6356f2 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -29,7 +29,6 @@ #include <utility> #include "absl/container/flat_hash_map.h" -#include "absl/meta/type_traits.h" #include "absl/random/random.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" @@ -58,8 +57,10 @@ #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h" #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h" #include "src/core/ext/transport/chttp2/transport/write_size_policy.h" +#include "src/core/lib/channel/call_tracer.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channelz.h" +#include "src/core/lib/channel/tcp_tracer.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/bitset.h" #include "src/core/lib/gprpp/debug_location.h" @@ -70,6 +71,7 @@ #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/resource_quota/arena.h" #include "src/core/lib/resource_quota/memory_quota.h" #include "src/core/lib/slice/slice.h" @@ -78,8 +80,6 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata_batch.h" #include "src/core/lib/transport/transport.h" -#include "src/core/lib/transport/transport_fwd.h" -#include "src/core/lib/transport/transport_impl.h" // Flag that this closure barrier may be covering a write in a pollset, and so // we should not complete this closure until we can prove that the write got @@ -203,18 +203,6 @@ struct grpc_chttp2_stream_link { grpc_chttp2_stream* next; grpc_chttp2_stream* prev; }; -// We keep several sets of connection wide parameters -typedef enum { - // The settings our peer has asked for (and we have acked) - GRPC_PEER_SETTINGS = 0, - // The settings we'd like to have - GRPC_LOCAL_SETTINGS, - // The settings we've published to our peer - GRPC_SENT_SETTINGS, - // The settings the peer has acked - GRPC_ACKED_SETTINGS, - GRPC_NUM_SETTING_SETS -} grpc_chttp2_setting_set; typedef enum { GRPC_CHTTP2_NO_GOAWAY_SEND, @@ -236,26 +224,40 @@ typedef enum { GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED, } grpc_chttp2_keepalive_state; -struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { +struct grpc_chttp2_transport final + : public grpc_core::Transport, + public grpc_core::FilterStackTransport, + public grpc_core::RefCounted<grpc_chttp2_transport, + grpc_core::NonPolymorphicRefCount>, + public grpc_core::KeepsGrpcInitialized { grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep, bool is_client); - ~grpc_chttp2_transport(); - - // Make this be able to be contained in RefCountedPtr<> - // Can't yet make this derive from RefCounted because we need to keep - // `grpc_transport base` first. - // TODO(ctiller): Make a transport interface. - void IncrementRefCount() { refs.Ref(); } - void Unref() { - if (refs.Unref()) delete this; - } - grpc_core::RefCountedPtr<grpc_chttp2_transport> Ref() { - IncrementRefCount(); - return grpc_core::RefCountedPtr<grpc_chttp2_transport>(this); + ~grpc_chttp2_transport() override; + + void Orphan() override; + + size_t SizeOfStream() const override; + bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override; + void PerformStreamOp(grpc_stream* gs, + grpc_transport_stream_op_batch* op) override; + void DestroyStream(grpc_stream* gs, + grpc_closure* then_schedule_closure) override; + + grpc_core::FilterStackTransport* filter_stack_transport() override { + return this; } + grpc_core::ClientTransport* client_transport() override { return nullptr; } + grpc_core::ServerTransport* server_transport() override { return nullptr; } + + absl::string_view GetTransportName() const override; + void InitStream(grpc_stream* gs, grpc_stream_refcount* refcount, + const void* server_data, grpc_core::Arena* arena) override; + void SetPollset(grpc_stream* stream, grpc_pollset* pollset) override; + void SetPollsetSet(grpc_stream* stream, + grpc_pollset_set* pollset_set) override; + void PerformOp(grpc_transport_op* op) override; + grpc_endpoint* GetEndpoint() override; - grpc_transport base; // must be first - grpc_core::RefCount refs; grpc_endpoint* ep; grpc_core::Slice peer_string; @@ -305,7 +307,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { }; grpc_closure write_action_begin_locked; - grpc_closure write_action; grpc_closure write_action_end_locked; grpc_closure read_action_locked; @@ -319,7 +320,7 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_chttp2_stream** accepting_stream = nullptr; // accept stream callback - void (*accept_stream_cb)(void* user_data, grpc_transport* transport, + void (*accept_stream_cb)(void* user_data, grpc_core::Transport* transport, const void* server_data); // registered_method_matcher_cb is called before invoking the recv initial // metadata callback. @@ -346,12 +347,8 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND; - /// bitmask of setting indexes to send out - /// Hack: it's common for implementations to assume 65536 bytes initial send - /// window -- this should by rights be 0 - uint32_t force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE; /// settings values - uint32_t settings[GRPC_NUM_SETTING_SETS][GRPC_CHTTP2_NUM_SETTINGS]; + grpc_core::Http2SettingsManager settings; grpc_event_engine::experimental::EventEngine::TaskHandle settings_ack_watchdog = @@ -371,8 +368,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_core::Chttp2PingAbusePolicy ping_abuse_policy; grpc_core::Chttp2PingRatePolicy ping_rate_policy; grpc_core::Chttp2PingCallbacks ping_callbacks; - absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle> - delayed_ping_timer_handle; + grpc_event_engine::experimental::EventEngine::TaskHandle + delayed_ping_timer_handle = + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid; grpc_closure retry_initiate_ping_locked; grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy; @@ -444,21 +442,20 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { grpc_closure destructive_reclaimer_locked; // next bdp ping timer handle - absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle> - next_bdp_ping_timer_handle; + grpc_event_engine::experimental::EventEngine::TaskHandle + next_bdp_ping_timer_handle = + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid; // keep-alive ping support /// Closure to initialize a keepalive ping grpc_closure init_keepalive_ping_locked; - /// Closure to run when the keepalive ping is sent - grpc_closure start_keepalive_ping_locked; /// Closure to run when the keepalive ping ack is received grpc_closure finish_keepalive_ping_locked; - /// Closure to run when the keepalive ping timeouts - grpc_closure keepalive_watchdog_fired_locked; /// timer to initiate ping events - absl::optional<grpc_event_engine::experimental::EventEngine::TaskHandle> - keepalive_ping_timer_handle; + grpc_event_engine::experimental::EventEngine::TaskHandle + keepalive_ping_timer_handle = + grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid; + ; /// time duration in between pings grpc_core::Duration keepalive_time; /// grace period to wait for data after sending a ping before keepalives @@ -492,6 +489,9 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow; + /// write execution state of the transport + grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; + /// policy for how much data we're willing to put into one http2 write grpc_core::Chttp2WriteSizePolicy write_size_policy; @@ -511,8 +511,6 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { /// if keepalive pings are allowed when there's no outstanding streams bool keepalive_permit_without_calls = false; - /// If start_keepalive_ping_locked has been called - bool keepalive_ping_started = false; // bdp estimator bool bdp_ping_blocked = @@ -521,30 +519,22 @@ struct grpc_chttp2_transport : public grpc_core::KeepsGrpcInitialized { /// is the transport destroying itself? uint8_t destroying = false; - /// is there a read request to the endpoint outstanding? - uint8_t endpoint_reading = 1; - /// is this a client? bool is_client; - /// are the local settings dirty and need to be sent? - bool dirtied_local_settings = true; - /// have local settings been sent? - bool sent_local_settings = false; - /// If start_bdp_ping_locked has been called bool bdp_ping_started = false; // True if pings should be acked bool ack_pings = true; /// True if the keepalive system wants to see some data incoming bool keepalive_incoming_data_wanted = false; + /// True if we count stream allocation (instead of HTTP2 concurrency) for + /// MAX_CONCURRENT_STREAMS + bool max_concurrent_streams_overload_protection = false; // What percentage of rst_stream frames on the server should cause a ping // frame to be generated. uint8_t ping_on_rst_stream_percent; - - /// write execution state of the transport - grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE; }; typedef enum { @@ -650,6 +640,12 @@ struct grpc_chttp2_stream { /// Byte counter for number of bytes written size_t byte_counter = 0; + /// Only set when enabled. + grpc_core::CallTracerInterface* call_tracer = nullptr; + + /// Only set when enabled. + std::shared_ptr<grpc_core::TcpTracerInterface> tcp_tracer; + // time this stream was created gpr_timespec creation_time = gpr_now(GPR_CLOCK_MONOTONIC); @@ -670,6 +666,12 @@ struct grpc_chttp2_stream { #define GRPC_ARG_PING_TIMEOUT_MS "grpc.http2.ping_timeout_ms" +// EXPERIMENTAL: provide protection against overloading a server with too many +// requests: wait for streams to be deallocated before they stop counting +// against MAX_CONCURRENT_STREAMS +#define GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION \ + "grpc.http.overload_protection" + /// Transport writing call flow: /// grpc_chttp2_initiate_write() is called anywhere that we know bytes need to /// go out on the wire. |