aboutsummaryrefslogtreecommitdiff
path: root/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc')
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc373
1 files changed, 243 insertions, 130 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
index 158428996f..c495d83cd5 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
@@ -37,9 +37,11 @@
#include <grpc/impl/connectivity_state.h>
#include <grpc/support/log.h>
+#include "src/core/ext/filters/client_channel/client_channel_internal.h"
#include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h"
+#include "src/core/ext/filters/client_channel/resolver/xds/xds_dependency_manager.h"
#include "src/core/ext/xds/xds_bootstrap.h"
#include "src/core/ext/xds/xds_bootstrap_grpc.h"
#include "src/core/ext/xds/xds_client.h"
@@ -56,6 +58,7 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@@ -64,7 +67,8 @@
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
+#include "src/core/lib/security/credentials/xds/xds_credentials.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -73,6 +77,10 @@ TraceFlag grpc_xds_cluster_impl_lb_trace(false, "xds_cluster_impl_lb");
namespace {
+using OptionalLabelComponent =
+ ClientCallTracer::CallAttemptTracer::OptionalLabelComponent;
+using XdsConfig = XdsDependencyManager::XdsConfig;
+
//
// global circuit breaker atomic map
//
@@ -155,37 +163,24 @@ class XdsClusterImplLbConfig : public LoadBalancingPolicy::Config {
absl::string_view name() const override { return kXdsClusterImpl; }
+ const std::string& cluster_name() const { return cluster_name_; }
RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
return child_policy_;
}
- const std::string& cluster_name() const { return cluster_name_; }
- const std::string& eds_service_name() const { return eds_service_name_; }
- const absl::optional<GrpcXdsBootstrap::GrpcXdsServer>&
- lrs_load_reporting_server() const {
- return lrs_load_reporting_server_;
- };
- uint32_t max_concurrent_requests() const { return max_concurrent_requests_; }
- RefCountedPtr<XdsEndpointResource::DropConfig> drop_config() const {
- return drop_config_;
- }
static const JsonLoaderInterface* JsonLoader(const JsonArgs&);
void JsonPostLoad(const Json& json, const JsonArgs& args,
ValidationErrors* errors);
private:
- RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
std::string cluster_name_;
- std::string eds_service_name_;
- absl::optional<GrpcXdsBootstrap::GrpcXdsServer> lrs_load_reporting_server_;
- uint32_t max_concurrent_requests_;
- RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
+ RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
};
// xDS Cluster Impl LB policy.
class XdsClusterImplLb : public LoadBalancingPolicy {
public:
- XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client, Args args);
+ XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client, Args args);
absl::string_view name() const override { return kXdsClusterImpl; }
@@ -223,6 +218,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
uint32_t max_concurrent_requests_;
+ std::shared_ptr<std::map<std::string, std::string>> service_labels_;
RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
RefCountedPtr<XdsClusterDropStats> drop_stats_;
RefCountedPtr<SubchannelPicker> picker_;
@@ -236,7 +232,8 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
std::move(xds_cluster_impl_policy)) {}
RefCountedPtr<SubchannelInterface> CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override;
+ const grpc_resolved_address& address,
+ const ChannelArgs& per_address_args, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr<SubchannelPicker> picker) override;
};
@@ -245,16 +242,25 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
void ShutdownLocked() override;
+ void ResetState();
+ void ReportTransientFailure(absl::Status status);
+
OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
const ChannelArgs& args);
absl::Status UpdateChildPolicyLocked(
- absl::StatusOr<ServerAddressList> addresses, std::string resolution_note,
- const ChannelArgs& args);
+ absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
+ std::string resolution_note, const ChannelArgs& args);
+
+ absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
+ MaybeCreateCertificateProviderLocked(
+ const XdsClusterResource& cluster_resource) const;
void MaybeUpdatePickerLocked();
// Current config from the resolver.
RefCountedPtr<XdsClusterImplLbConfig> config_;
+ std::shared_ptr<const XdsClusterResource> cluster_resource_;
+ RefCountedPtr<XdsEndpointResource::DropConfig> drop_config_;
// Current concurrent number of requests.
RefCountedPtr<CircuitBreakerCallCounterMap::CallCounter> call_counter_;
@@ -263,7 +269,7 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
bool shutting_down_ = false;
// The xds client.
- RefCountedPtr<XdsClient> xds_client_;
+ RefCountedPtr<GrpcXdsClient> xds_client_;
// The stats for client-side load reporting.
RefCountedPtr<XdsClusterDropStats> drop_stats_;
@@ -355,8 +361,9 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
RefCountedPtr<SubchannelPicker> picker)
: call_counter_(xds_cluster_impl_lb->call_counter_),
max_concurrent_requests_(
- xds_cluster_impl_lb->config_->max_concurrent_requests()),
- drop_config_(xds_cluster_impl_lb->config_->drop_config()),
+ xds_cluster_impl_lb->cluster_resource_->max_concurrent_requests),
+ service_labels_(xds_cluster_impl_lb->cluster_resource_->telemetry_labels),
+ drop_config_(xds_cluster_impl_lb->drop_config_),
drop_stats_(xds_cluster_impl_lb->drop_stats_),
picker_(std::move(picker)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
@@ -367,9 +374,14 @@ XdsClusterImplLb::Picker::Picker(XdsClusterImplLb* xds_cluster_impl_lb,
LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
LoadBalancingPolicy::PickArgs args) {
+ auto* call_state = static_cast<ClientChannelLbCallState*>(args.call_state);
+ if (call_state->GetCallAttemptTracer() != nullptr) {
+ call_state->GetCallAttemptTracer()->AddOptionalLabels(
+ OptionalLabelComponent::kXdsServiceLabels, service_labels_);
+ }
// Handle EDS drops.
const std::string* drop_category;
- if (drop_config_->ShouldDrop(&drop_category)) {
+ if (drop_config_ != nullptr && drop_config_->ShouldDrop(&drop_category)) {
if (drop_stats_ != nullptr) drop_stats_->AddCallDropped(*drop_category);
return PickResult::Drop(absl::UnavailableError(
absl::StrCat("EDS-configured drop: ", *drop_category)));
@@ -421,7 +433,7 @@ LoadBalancingPolicy::PickResult XdsClusterImplLb::Picker::Pick(
// XdsClusterImplLb
//
-XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<XdsClient> xds_client,
+XdsClusterImplLb::XdsClusterImplLb(RefCountedPtr<GrpcXdsClient> xds_client,
Args args)
: LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
@@ -443,6 +455,11 @@ void XdsClusterImplLb::ShutdownLocked() {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] shutting down", this);
}
shutting_down_ = true;
+ ResetState();
+ xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
+}
+
+void XdsClusterImplLb::ResetState() {
// Remove the child policy's interested_parties pollset_set from the
// xDS policy.
if (child_policy_ != nullptr) {
@@ -454,7 +471,18 @@ void XdsClusterImplLb::ShutdownLocked() {
// the child.
picker_.reset();
drop_stats_.reset();
- xds_client_.reset(DEBUG_LOCATION, "XdsClusterImpl");
+}
+
+void XdsClusterImplLb::ReportTransientFailure(absl::Status status) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
+ gpr_log(GPR_INFO,
+ "[xds_cluster_impl_lb %p] reporting TRANSIENT_FAILURE: %s", this,
+ status.ToString().c_str());
+ }
+ ResetState();
+ channel_control_helper()->UpdateState(
+ GRPC_CHANNEL_TRANSIENT_FAILURE, status,
+ MakeRefCounted<TransientFailurePicker>(status));
}
void XdsClusterImplLb::ExitIdleLocked() {
@@ -467,56 +495,174 @@ void XdsClusterImplLb::ResetBackoffLocked() {
if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked();
}
+std::string GetEdsResourceName(const XdsClusterResource& cluster_resource) {
+ auto* eds = absl::get_if<XdsClusterResource::Eds>(&cluster_resource.type);
+ if (eds == nullptr) return "";
+ return eds->eds_service_name;
+}
+
absl::Status XdsClusterImplLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO, "[xds_cluster_impl_lb %p] Received update", this);
}
- // Update config.
- const bool is_initial_update = config_ == nullptr;
- auto old_config = std::move(config_);
- config_ = std::move(args.config);
- // On initial update, create drop stats.
- if (is_initial_update) {
- if (config_->lrs_load_reporting_server().has_value()) {
- drop_stats_ = xds_client_->AddClusterDropStats(
- config_->lrs_load_reporting_server().value(), config_->cluster_name(),
- config_->eds_service_name());
- if (drop_stats_ == nullptr) {
- gpr_log(GPR_ERROR,
- "[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
- "LRS server %s, cluster %s, EDS service name %s, load "
- "reporting for drops will not be done.",
- this,
- config_->lrs_load_reporting_server()->server_uri().c_str(),
- config_->cluster_name().c_str(),
- config_->eds_service_name().c_str());
- }
+ // Grab new LB policy config.
+ auto new_config = args.config.TakeAsSubclass<XdsClusterImplLbConfig>();
+ // Cluster name should never change, because the cds policy will assign a
+ // different priority child name if that happens, which means that this
+ // policy instance will get replaced instead of being updated.
+ if (config_ != nullptr) {
+ GPR_ASSERT(config_->cluster_name() == new_config->cluster_name());
+ }
+ // Get xDS config.
+ auto new_xds_config = args.args.GetObjectRef<XdsConfig>();
+ if (new_xds_config == nullptr) {
+ // Should never happen.
+ absl::Status status = absl::InternalError(
+ "xDS config not passed to xds_cluster_impl LB policy");
+ ReportTransientFailure(status);
+ return status;
+ }
+ auto it = new_xds_config->clusters.find(new_config->cluster_name());
+ if (it == new_xds_config->clusters.end() || !it->second.ok() ||
+ it->second->cluster == nullptr) {
+ // Should never happen.
+ absl::Status status = absl::InternalError(absl::StrCat(
+ "xDS config has no entry for cluster ", new_config->cluster_name()));
+ ReportTransientFailure(status);
+ return status;
+ }
+ auto& new_cluster_config = *it->second;
+ auto* endpoint_config =
+ absl::get_if<XdsConfig::ClusterConfig::EndpointConfig>(
+ &new_cluster_config.children);
+ if (endpoint_config == nullptr) {
+ // Should never happen.
+ absl::Status status = absl::InternalError(
+ absl::StrCat("cluster config for ", new_config->cluster_name(),
+ " has no endpoint config"));
+ ReportTransientFailure(status);
+ return status;
+ }
+ auto xds_cert_provider =
+ MaybeCreateCertificateProviderLocked(*new_cluster_config.cluster);
+ if (!xds_cert_provider.ok()) {
+ // Should never happen.
+ ReportTransientFailure(xds_cert_provider.status());
+ return xds_cert_provider.status();
+ }
+ if (*xds_cert_provider != nullptr) {
+ args.args = args.args.SetObject(std::move(*xds_cert_provider));
+ }
+ // Now we've verified the new config is good.
+ // Get new and old (if any) EDS service name.
+ std::string new_eds_service_name =
+ GetEdsResourceName(*new_cluster_config.cluster);
+ std::string old_eds_service_name =
+ cluster_resource_ == nullptr ? ""
+ : GetEdsResourceName(*cluster_resource_);
+ // Update drop stats if needed.
+ // Note: We need a drop stats object whenever load reporting is enabled,
+ // even if we have no EDS drop config, because we also use it when
+ // reporting circuit breaker drops.
+ if (!new_cluster_config.cluster->lrs_load_reporting_server.has_value()) {
+ drop_stats_.reset();
+ } else if (cluster_resource_ == nullptr ||
+ old_eds_service_name != new_eds_service_name ||
+ cluster_resource_->lrs_load_reporting_server !=
+ new_cluster_config.cluster->lrs_load_reporting_server) {
+ drop_stats_ = xds_client_->AddClusterDropStats(
+ *new_cluster_config.cluster->lrs_load_reporting_server,
+ new_config->cluster_name(), new_eds_service_name);
+ if (drop_stats_ == nullptr) {
+ gpr_log(
+ GPR_ERROR,
+ "[xds_cluster_impl_lb %p] Failed to get cluster drop stats for "
+ "LRS server %s, cluster %s, EDS service name %s, load "
+ "reporting for drops will not be done.",
+ this,
+ new_cluster_config.cluster->lrs_load_reporting_server->server_uri()
+ .c_str(),
+ new_config->cluster_name().c_str(), new_eds_service_name.c_str());
}
- call_counter_ = g_call_counter_map->GetOrCreate(
- config_->cluster_name(), config_->eds_service_name());
- } else {
- // Cluster name, EDS service name, and LRS server name should never
- // change, because the xds_cluster_resolver policy above us should be
- // swapped out if that happens.
- GPR_ASSERT(config_->cluster_name() == old_config->cluster_name());
- GPR_ASSERT(config_->eds_service_name() == old_config->eds_service_name());
- GPR_ASSERT(config_->lrs_load_reporting_server() ==
- old_config->lrs_load_reporting_server());
- }
- // Update picker if max_concurrent_requests has changed.
- if (is_initial_update || config_->max_concurrent_requests() !=
- old_config->max_concurrent_requests()) {
- MaybeUpdatePickerLocked();
}
+ // Update call counter if needed.
+ if (cluster_resource_ == nullptr ||
+ old_eds_service_name != new_eds_service_name) {
+ call_counter_ = g_call_counter_map->GetOrCreate(new_config->cluster_name(),
+ new_eds_service_name);
+ }
+ // Update config state, now that we're done comparing old and new fields.
+ config_ = std::move(new_config);
+ cluster_resource_ = new_cluster_config.cluster;
+ drop_config_ = endpoint_config->endpoints != nullptr
+ ? endpoint_config->endpoints->drop_config
+ : nullptr;
+ // Update picker in case some dependent config field changed.
+ MaybeUpdatePickerLocked();
// Update child policy.
return UpdateChildPolicyLocked(std::move(args.addresses),
std::move(args.resolution_note), args.args);
}
+absl::StatusOr<RefCountedPtr<XdsCertificateProvider>>
+XdsClusterImplLb::MaybeCreateCertificateProviderLocked(
+ const XdsClusterResource& cluster_resource) const {
+ // If the channel is not using XdsCreds, do nothing.
+ auto channel_credentials = channel_control_helper()->GetChannelCredentials();
+ if (channel_credentials == nullptr ||
+ channel_credentials->type() != XdsCredentials::Type()) {
+ return nullptr;
+ }
+ // Configure root cert.
+ absl::string_view root_provider_instance_name =
+ cluster_resource.common_tls_context.certificate_validation_context
+ .ca_certificate_provider_instance.instance_name;
+ absl::string_view root_cert_name =
+ cluster_resource.common_tls_context.certificate_validation_context
+ .ca_certificate_provider_instance.certificate_name;
+ RefCountedPtr<grpc_tls_certificate_provider> root_cert_provider;
+ if (!root_provider_instance_name.empty()) {
+ root_cert_provider =
+ xds_client_->certificate_provider_store()
+ .CreateOrGetCertificateProvider(root_provider_instance_name);
+ if (root_cert_provider == nullptr) {
+ return absl::InternalError(
+ absl::StrCat("Certificate provider instance name: \"",
+ root_provider_instance_name, "\" not recognized."));
+ }
+ }
+ // Configure identity cert.
+ absl::string_view identity_provider_instance_name =
+ cluster_resource.common_tls_context.tls_certificate_provider_instance
+ .instance_name;
+ absl::string_view identity_cert_name =
+ cluster_resource.common_tls_context.tls_certificate_provider_instance
+ .certificate_name;
+ RefCountedPtr<grpc_tls_certificate_provider> identity_cert_provider;
+ if (!identity_provider_instance_name.empty()) {
+ identity_cert_provider =
+ xds_client_->certificate_provider_store()
+ .CreateOrGetCertificateProvider(identity_provider_instance_name);
+ if (identity_cert_provider == nullptr) {
+ return absl::InternalError(
+ absl::StrCat("Certificate provider instance name: \"",
+ identity_provider_instance_name, "\" not recognized."));
+ }
+ }
+ // Configure SAN matchers.
+ const std::vector<StringMatcher>& san_matchers =
+ cluster_resource.common_tls_context.certificate_validation_context
+ .match_subject_alt_names;
+ // Create xds cert provider.
+ return MakeRefCounted<XdsCertificateProvider>(
+ root_cert_provider, root_cert_name, identity_cert_provider,
+ identity_cert_name, san_matchers);
+}
+
void XdsClusterImplLb::MaybeUpdatePickerLocked() {
// If we're dropping all calls, report READY, regardless of what (or
// whether) the child has reported.
- if (config_->drop_config() != nullptr && config_->drop_config()->drop_all()) {
+ if (drop_config_ != nullptr && drop_config_->drop_all()) {
auto drop_picker = MakeRefCounted<Picker>(this, picker_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_impl_lb_trace)) {
gpr_log(GPR_INFO,
@@ -548,8 +694,8 @@ OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
LoadBalancingPolicy::Args lb_policy_args;
lb_policy_args.work_serializer = work_serializer();
lb_policy_args.args = args;
- lb_policy_args.channel_control_helper =
- std::make_unique<Helper>(Ref(DEBUG_LOCATION, "Helper"));
+ lb_policy_args.channel_control_helper = std::make_unique<Helper>(
+ RefAsSubclass<XdsClusterImplLb>(DEBUG_LOCATION, "Helper"));
OrphanablePtr<LoadBalancingPolicy> lb_policy =
MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
&grpc_xds_cluster_impl_lb_trace);
@@ -567,8 +713,8 @@ OrphanablePtr<LoadBalancingPolicy> XdsClusterImplLb::CreateChildPolicyLocked(
}
absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
- absl::StatusOr<ServerAddressList> addresses, std::string resolution_note,
- const ChannelArgs& args) {
+ absl::StatusOr<std::shared_ptr<EndpointAddressesIterator>> addresses,
+ std::string resolution_note, const ChannelArgs& args) {
// Create policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
@@ -594,36 +740,39 @@ absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
//
RefCountedPtr<SubchannelInterface> XdsClusterImplLb::Helper::CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) {
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) {
if (parent()->shutting_down_) return nullptr;
// If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the Picker.
- if (parent()->config_->lrs_load_reporting_server().has_value()) {
- auto locality_name = address.args().GetObjectRef<XdsLocalityName>();
+ if (parent()->cluster_resource_->lrs_load_reporting_server.has_value()) {
+ auto locality_name = per_address_args.GetObjectRef<XdsLocalityName>();
RefCountedPtr<XdsClusterLocalityStats> locality_stats =
parent()->xds_client_->AddClusterLocalityStats(
- parent()->config_->lrs_load_reporting_server().value(),
+ parent()->cluster_resource_->lrs_load_reporting_server.value(),
parent()->config_->cluster_name(),
- parent()->config_->eds_service_name(), std::move(locality_name));
+ GetEdsResourceName(*parent()->cluster_resource_),
+ std::move(locality_name));
if (locality_stats != nullptr) {
return MakeRefCounted<StatsSubchannelWrapper>(
parent()->channel_control_helper()->CreateSubchannel(
- std::move(address), args),
+ address, per_address_args, args),
std::move(locality_stats));
}
- gpr_log(
- GPR_ERROR,
- "[xds_cluster_impl_lb %p] Failed to get locality stats object for "
- "LRS server %s, cluster %s, EDS service name %s; load reports will "
- "not be generated (not wrapping subchannel)",
- parent(),
- parent()->config_->lrs_load_reporting_server()->server_uri().c_str(),
- parent()->config_->cluster_name().c_str(),
- parent()->config_->eds_service_name().c_str());
+ gpr_log(GPR_ERROR,
+ "[xds_cluster_impl_lb %p] Failed to get locality stats object for "
+ "LRS server %s, cluster %s, EDS service name %s; load reports will "
+ "not be generated (not wrapping subchannel)",
+ parent(),
+ parent()
+ ->cluster_resource_->lrs_load_reporting_server->server_uri()
+ .c_str(),
+ parent()->config_->cluster_name().c_str(),
+ GetEdsResourceName(*parent()->cluster_resource_).c_str());
}
// Load reporting not enabled, so don't wrap the subchannel.
return parent()->channel_control_helper()->CreateSubchannel(
- std::move(address), args);
+ address, per_address_args, args);
}
void XdsClusterImplLb::Helper::UpdateState(
@@ -649,67 +798,31 @@ void XdsClusterImplLb::Helper::UpdateState(
// factory
//
-struct DropCategory {
- std::string category;
- uint32_t requests_per_million;
-
- static const JsonLoaderInterface* JsonLoader(const JsonArgs&) {
- static const auto* loader =
- JsonObjectLoader<DropCategory>()
- .Field("category", &DropCategory::category)
- .Field("requests_per_million", &DropCategory::requests_per_million)
- .Finish();
- return loader;
- }
-};
-
const JsonLoaderInterface* XdsClusterImplLbConfig::JsonLoader(const JsonArgs&) {
static const auto* loader =
JsonObjectLoader<XdsClusterImplLbConfig>()
// Note: Some fields require custom processing, so they are
// handled in JsonPostLoad() instead.
.Field("clusterName", &XdsClusterImplLbConfig::cluster_name_)
- .OptionalField("edsServiceName",
- &XdsClusterImplLbConfig::eds_service_name_)
- .OptionalField("lrsLoadReportingServer",
- &XdsClusterImplLbConfig::lrs_load_reporting_server_)
- .OptionalField("maxConcurrentRequests",
- &XdsClusterImplLbConfig::max_concurrent_requests_)
.Finish();
return loader;
}
-void XdsClusterImplLbConfig::JsonPostLoad(const Json& json,
- const JsonArgs& args,
+void XdsClusterImplLbConfig::JsonPostLoad(const Json& json, const JsonArgs&,
ValidationErrors* errors) {
// Parse "childPolicy" field.
- {
- ValidationErrors::ScopedField field(errors, ".childPolicy");
- auto it = json.object().find("childPolicy");
- if (it == json.object().end()) {
- errors->AddError("field not present");
+ ValidationErrors::ScopedField field(errors, ".childPolicy");
+ auto it = json.object().find("childPolicy");
+ if (it == json.object().end()) {
+ errors->AddError("field not present");
+ } else {
+ auto lb_config =
+ CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
+ it->second);
+ if (!lb_config.ok()) {
+ errors->AddError(lb_config.status().message());
} else {
- auto lb_config = CoreConfiguration::Get()
- .lb_policy_registry()
- .ParseLoadBalancingConfig(it->second);
- if (!lb_config.ok()) {
- errors->AddError(lb_config.status().message());
- } else {
- child_policy_ = std::move(*lb_config);
- }
- }
- }
- // Parse "dropCategories" field.
- {
- auto value = LoadJsonObjectField<std::vector<DropCategory>>(
- json.object(), args, "dropCategories", errors);
- if (value.has_value()) {
- drop_config_ = MakeRefCounted<XdsEndpointResource::DropConfig>();
- for (size_t i = 0; i < value->size(); ++i) {
- DropCategory& drop_category = (*value)[i];
- drop_config_->AddCategory(std::move(drop_category.category),
- drop_category.requests_per_million);
- }
+ child_policy_ = std::move(*lb_config);
}
}
}