diff options
Diffstat (limited to 'src/core/ext/xds/xds_cluster.cc')
-rw-r--r-- | src/core/ext/xds/xds_cluster.cc | 115 |
1 files changed, 104 insertions, 11 deletions
diff --git a/src/core/ext/xds/xds_cluster.cc b/src/core/ext/xds/xds_cluster.cc index f4cddf8f95..5c2cca5933 100644 --- a/src/core/ext/xds/xds_cluster.cc +++ b/src/core/ext/xds/xds_cluster.cc @@ -38,13 +38,17 @@ #include "envoy/config/core/v3/address.upb.h" #include "envoy/config/core/v3/base.upb.h" #include "envoy/config/core/v3/config_source.upb.h" +#include "envoy/config/core/v3/extension.upb.h" #include "envoy/config/core/v3/health_check.upb.h" +#include "envoy/config/core/v3/protocol.upb.h" #include "envoy/config/endpoint/v3/endpoint.upb.h" #include "envoy/config/endpoint/v3/endpoint_components.upb.h" #include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h" #include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h" +#include "envoy/extensions/upstreams/http/v3/http_protocol_options.upb.h" #include "google/protobuf/any.upb.h" #include "google/protobuf/duration.upb.h" +#include "google/protobuf/struct.upb.h" #include "google/protobuf/wrappers.upb.h" #include "upb/base/string_view.h" #include "upb/text/encode.h" @@ -105,17 +109,14 @@ std::string XdsClusterResource::ToString() const { contents.push_back( absl::StrCat("common_tls_context=", common_tls_context.ToString())); } + if (connection_idle_timeout != Duration::Zero()) { + contents.push_back(absl::StrCat("connection_idle_timeout=", + connection_idle_timeout.ToString())); + } contents.push_back( absl::StrCat("max_concurrent_requests=", max_concurrent_requests)); - if (!override_host_statuses.empty()) { - std::vector<const char*> statuses; - statuses.reserve(override_host_statuses.size()); - for (const auto& status : override_host_statuses) { - statuses.push_back(status.ToString()); - } - contents.push_back(absl::StrCat("override_host_statuses={", - absl::StrJoin(statuses, ", "), "}")); - } + contents.push_back(absl::StrCat("override_host_statuses=", + override_host_statuses.ToString())); return absl::StrCat("{", absl::StrJoin(contents, ", "), "}"); } @@ -407,6 +408,50 @@ void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context, } } +void ParseUpstreamConfig( + const XdsResourceType::DecodeContext& context, + const envoy_config_core_v3_TypedExtensionConfig* upstream_config, + XdsClusterResource* cds_update, ValidationErrors* errors) { + ValidationErrors::ScopedField field(errors, ".typed_config"); + const auto* typed_config = + envoy_config_core_v3_TypedExtensionConfig_typed_config(upstream_config); + auto extension = ExtractXdsExtension(context, typed_config, errors); + if (!extension.has_value()) return; + if (extension->type != + "envoy.extensions.upstreams.http.v3.HttpProtocolOptions") { + ValidationErrors::ScopedField field(errors, ".type_url"); + errors->AddError("unsupported upstream config type"); + return; + } + absl::string_view* serialized_http_protocol_options = + absl::get_if<absl::string_view>(&extension->value); + if (serialized_http_protocol_options == nullptr) { + errors->AddError("can't decode HttpProtocolOptions"); + return; + } + const auto* http_protocol_options = + envoy_extensions_upstreams_http_v3_HttpProtocolOptions_parse( + serialized_http_protocol_options->data(), + serialized_http_protocol_options->size(), context.arena); + if (http_protocol_options == nullptr) { + errors->AddError("can't decode HttpProtocolOptions"); + return; + } + ValidationErrors::ScopedField field2(errors, ".common_http_protocol_options"); + const auto* common_http_protocol_options = + envoy_extensions_upstreams_http_v3_HttpProtocolOptions_common_http_protocol_options( + http_protocol_options); + if (common_http_protocol_options != nullptr) { + const auto* idle_timeout = + envoy_config_core_v3_HttpProtocolOptions_idle_timeout( + common_http_protocol_options); + if (idle_timeout != nullptr) { + ValidationErrors::ScopedField field(errors, ".idle_timeout"); + cds_update->connection_idle_timeout = ParseDuration(idle_timeout, errors); + } + } +} + absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse( const XdsResourceType::DecodeContext& context, const envoy_config_cluster_v3_Cluster* cluster) { @@ -474,6 +519,13 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse( cds_update->lrs_load_reporting_server.emplace( static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(context.server)); } + // Protocol options. + auto* upstream_config = + envoy_config_cluster_v3_Cluster_upstream_config(cluster); + if (upstream_config != nullptr) { + ValidationErrors::ScopedField field(&errors, ".upstream_config"); + ParseUpstreamConfig(context, upstream_config, cds_update.get(), &errors); + } // The Cluster resource encodes the circuit breaking parameters in a list of // Thresholds messages, where each message specifies the parameters for a // particular RoutingPriority. we will look only at the first entry in the @@ -625,6 +677,7 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse( // Validate override host status. const auto* common_lb_config = envoy_config_cluster_v3_Cluster_common_lb_config(cluster); + bool override_host_status_found = false; if (common_lb_config != nullptr) { ValidationErrors::ScopedField field(&errors, ".common_lb_config"); const auto* override_host_status = @@ -638,9 +691,48 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse( for (size_t i = 0; i < size; ++i) { auto status = XdsHealthStatus::FromUpb(statuses[i]); if (status.has_value()) { - cds_update->override_host_statuses.insert(*status); + cds_update->override_host_statuses.Add(*status); + } + } + override_host_status_found = true; + } + } + // If the field is not set, we default to [UNKNOWN, HEALTHY]. + if (!override_host_status_found) { + cds_update->override_host_statuses.Add( + XdsHealthStatus(XdsHealthStatus::kUnknown)); + cds_update->override_host_statuses.Add( + XdsHealthStatus(XdsHealthStatus::kHealthy)); + } + // Record telemetry labels (if any). + const envoy_config_core_v3_Metadata* metadata = + envoy_config_cluster_v3_Cluster_metadata(cluster); + if (metadata != nullptr) { + google_protobuf_Struct* telemetry_labels_struct; + if (envoy_config_core_v3_Metadata_filter_metadata_get( + metadata, + StdStringToUpbString( + absl::string_view("com.google.csm.telemetry_labels")), + &telemetry_labels_struct)) { + auto telemetry_labels = + std::make_shared<std::map<std::string, std::string>>(); + size_t iter = kUpb_Map_Begin; + const google_protobuf_Struct_FieldsEntry* fields_entry; + while ((fields_entry = google_protobuf_Struct_fields_next( + telemetry_labels_struct, &iter)) != nullptr) { + // Adds any entry whose value is a string to telemetry_labels. + const google_protobuf_Value* value = + google_protobuf_Struct_FieldsEntry_value(fields_entry); + if (google_protobuf_Value_has_string_value(value)) { + telemetry_labels->emplace( + UpbStringToStdString( + google_protobuf_Struct_FieldsEntry_key(fields_entry)), + UpbStringToStdString(google_protobuf_Value_string_value(value))); } } + if (!telemetry_labels->empty()) { + cds_update->telemetry_labels = std::move(telemetry_labels); + } } } // Return result. @@ -658,7 +750,8 @@ void MaybeLogCluster(const XdsResourceType::DecodeContext& context, const upb_MessageDef* msg_type = envoy_config_cluster_v3_Cluster_getmsgdef(context.symtab); char buf[10240]; - upb_TextEncode(cluster, msg_type, nullptr, 0, buf, sizeof(buf)); + upb_TextEncode(reinterpret_cast<const upb_Message*>(cluster), msg_type, + nullptr, 0, buf, sizeof(buf)); gpr_log(GPR_DEBUG, "[xds_client %p] Cluster: %s", context.client, buf); } } |