aboutsummaryrefslogtreecommitdiff
path: root/src/core/ext/xds/xds_cluster.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/xds/xds_cluster.cc')
-rw-r--r--src/core/ext/xds/xds_cluster.cc115
1 files changed, 104 insertions, 11 deletions
diff --git a/src/core/ext/xds/xds_cluster.cc b/src/core/ext/xds/xds_cluster.cc
index f4cddf8f95..5c2cca5933 100644
--- a/src/core/ext/xds/xds_cluster.cc
+++ b/src/core/ext/xds/xds_cluster.cc
@@ -38,13 +38,17 @@
#include "envoy/config/core/v3/address.upb.h"
#include "envoy/config/core/v3/base.upb.h"
#include "envoy/config/core/v3/config_source.upb.h"
+#include "envoy/config/core/v3/extension.upb.h"
#include "envoy/config/core/v3/health_check.upb.h"
+#include "envoy/config/core/v3/protocol.upb.h"
#include "envoy/config/endpoint/v3/endpoint.upb.h"
#include "envoy/config/endpoint/v3/endpoint_components.upb.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.upb.h"
#include "envoy/extensions/transport_sockets/tls/v3/tls.upb.h"
+#include "envoy/extensions/upstreams/http/v3/http_protocol_options.upb.h"
#include "google/protobuf/any.upb.h"
#include "google/protobuf/duration.upb.h"
+#include "google/protobuf/struct.upb.h"
#include "google/protobuf/wrappers.upb.h"
#include "upb/base/string_view.h"
#include "upb/text/encode.h"
@@ -105,17 +109,14 @@ std::string XdsClusterResource::ToString() const {
contents.push_back(
absl::StrCat("common_tls_context=", common_tls_context.ToString()));
}
+ if (connection_idle_timeout != Duration::Zero()) {
+ contents.push_back(absl::StrCat("connection_idle_timeout=",
+ connection_idle_timeout.ToString()));
+ }
contents.push_back(
absl::StrCat("max_concurrent_requests=", max_concurrent_requests));
- if (!override_host_statuses.empty()) {
- std::vector<const char*> statuses;
- statuses.reserve(override_host_statuses.size());
- for (const auto& status : override_host_statuses) {
- statuses.push_back(status.ToString());
- }
- contents.push_back(absl::StrCat("override_host_statuses={",
- absl::StrJoin(statuses, ", "), "}"));
- }
+ contents.push_back(absl::StrCat("override_host_statuses=",
+ override_host_statuses.ToString()));
return absl::StrCat("{", absl::StrJoin(contents, ", "), "}");
}
@@ -407,6 +408,50 @@ void ParseLbPolicyConfig(const XdsResourceType::DecodeContext& context,
}
}
+void ParseUpstreamConfig(
+ const XdsResourceType::DecodeContext& context,
+ const envoy_config_core_v3_TypedExtensionConfig* upstream_config,
+ XdsClusterResource* cds_update, ValidationErrors* errors) {
+ ValidationErrors::ScopedField field(errors, ".typed_config");
+ const auto* typed_config =
+ envoy_config_core_v3_TypedExtensionConfig_typed_config(upstream_config);
+ auto extension = ExtractXdsExtension(context, typed_config, errors);
+ if (!extension.has_value()) return;
+ if (extension->type !=
+ "envoy.extensions.upstreams.http.v3.HttpProtocolOptions") {
+ ValidationErrors::ScopedField field(errors, ".type_url");
+ errors->AddError("unsupported upstream config type");
+ return;
+ }
+ absl::string_view* serialized_http_protocol_options =
+ absl::get_if<absl::string_view>(&extension->value);
+ if (serialized_http_protocol_options == nullptr) {
+ errors->AddError("can't decode HttpProtocolOptions");
+ return;
+ }
+ const auto* http_protocol_options =
+ envoy_extensions_upstreams_http_v3_HttpProtocolOptions_parse(
+ serialized_http_protocol_options->data(),
+ serialized_http_protocol_options->size(), context.arena);
+ if (http_protocol_options == nullptr) {
+ errors->AddError("can't decode HttpProtocolOptions");
+ return;
+ }
+ ValidationErrors::ScopedField field2(errors, ".common_http_protocol_options");
+ const auto* common_http_protocol_options =
+ envoy_extensions_upstreams_http_v3_HttpProtocolOptions_common_http_protocol_options(
+ http_protocol_options);
+ if (common_http_protocol_options != nullptr) {
+ const auto* idle_timeout =
+ envoy_config_core_v3_HttpProtocolOptions_idle_timeout(
+ common_http_protocol_options);
+ if (idle_timeout != nullptr) {
+ ValidationErrors::ScopedField field(errors, ".idle_timeout");
+ cds_update->connection_idle_timeout = ParseDuration(idle_timeout, errors);
+ }
+ }
+}
+
absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
const XdsResourceType::DecodeContext& context,
const envoy_config_cluster_v3_Cluster* cluster) {
@@ -474,6 +519,13 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
cds_update->lrs_load_reporting_server.emplace(
static_cast<const GrpcXdsBootstrap::GrpcXdsServer&>(context.server));
}
+ // Protocol options.
+ auto* upstream_config =
+ envoy_config_cluster_v3_Cluster_upstream_config(cluster);
+ if (upstream_config != nullptr) {
+ ValidationErrors::ScopedField field(&errors, ".upstream_config");
+ ParseUpstreamConfig(context, upstream_config, cds_update.get(), &errors);
+ }
// The Cluster resource encodes the circuit breaking parameters in a list of
// Thresholds messages, where each message specifies the parameters for a
// particular RoutingPriority. we will look only at the first entry in the
@@ -625,6 +677,7 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
// Validate override host status.
const auto* common_lb_config =
envoy_config_cluster_v3_Cluster_common_lb_config(cluster);
+ bool override_host_status_found = false;
if (common_lb_config != nullptr) {
ValidationErrors::ScopedField field(&errors, ".common_lb_config");
const auto* override_host_status =
@@ -638,9 +691,48 @@ absl::StatusOr<std::shared_ptr<const XdsClusterResource>> CdsResourceParse(
for (size_t i = 0; i < size; ++i) {
auto status = XdsHealthStatus::FromUpb(statuses[i]);
if (status.has_value()) {
- cds_update->override_host_statuses.insert(*status);
+ cds_update->override_host_statuses.Add(*status);
+ }
+ }
+ override_host_status_found = true;
+ }
+ }
+ // If the field is not set, we default to [UNKNOWN, HEALTHY].
+ if (!override_host_status_found) {
+ cds_update->override_host_statuses.Add(
+ XdsHealthStatus(XdsHealthStatus::kUnknown));
+ cds_update->override_host_statuses.Add(
+ XdsHealthStatus(XdsHealthStatus::kHealthy));
+ }
+ // Record telemetry labels (if any).
+ const envoy_config_core_v3_Metadata* metadata =
+ envoy_config_cluster_v3_Cluster_metadata(cluster);
+ if (metadata != nullptr) {
+ google_protobuf_Struct* telemetry_labels_struct;
+ if (envoy_config_core_v3_Metadata_filter_metadata_get(
+ metadata,
+ StdStringToUpbString(
+ absl::string_view("com.google.csm.telemetry_labels")),
+ &telemetry_labels_struct)) {
+ auto telemetry_labels =
+ std::make_shared<std::map<std::string, std::string>>();
+ size_t iter = kUpb_Map_Begin;
+ const google_protobuf_Struct_FieldsEntry* fields_entry;
+ while ((fields_entry = google_protobuf_Struct_fields_next(
+ telemetry_labels_struct, &iter)) != nullptr) {
+ // Adds any entry whose value is a string to telemetry_labels.
+ const google_protobuf_Value* value =
+ google_protobuf_Struct_FieldsEntry_value(fields_entry);
+ if (google_protobuf_Value_has_string_value(value)) {
+ telemetry_labels->emplace(
+ UpbStringToStdString(
+ google_protobuf_Struct_FieldsEntry_key(fields_entry)),
+ UpbStringToStdString(google_protobuf_Value_string_value(value)));
}
}
+ if (!telemetry_labels->empty()) {
+ cds_update->telemetry_labels = std::move(telemetry_labels);
+ }
}
}
// Return result.
@@ -658,7 +750,8 @@ void MaybeLogCluster(const XdsResourceType::DecodeContext& context,
const upb_MessageDef* msg_type =
envoy_config_cluster_v3_Cluster_getmsgdef(context.symtab);
char buf[10240];
- upb_TextEncode(cluster, msg_type, nullptr, 0, buf, sizeof(buf));
+ upb_TextEncode(reinterpret_cast<const upb_Message*>(cluster), msg_type,
+ nullptr, 0, buf, sizeof(buf));
gpr_log(GPR_DEBUG, "[xds_client %p] Cluster: %s", context.client, buf);
}
}