diff options
Diffstat (limited to 'src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc')
-rw-r--r-- | src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc | 164 |
1 files changed, 85 insertions, 79 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc index 0b877298ad..2bcf45a211 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc @@ -32,6 +32,7 @@ #include <vector> #include "absl/base/thread_annotations.h" +#include "absl/meta/type_traits.h" #include "absl/random/random.h" #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -73,6 +74,7 @@ #include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy_factory.h" #include "src/core/lib/load_balancing/subchannel_interface.h" +#include "src/core/lib/resolver/endpoint_addresses.h" #include "src/core/lib/resolver/server_address.h" #include "src/core/lib/transport/connectivity_state.h" @@ -245,14 +247,13 @@ class OldWeightedRoundRobin : public LoadBalancingPolicy { WeightedRoundRobinSubchannelData> { public: WeightedRoundRobinSubchannelList(OldWeightedRoundRobin* policy, - ServerAddressList addresses, + EndpointAddressesIterator* addresses, const ChannelArgs& args) : SubchannelList(policy, (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) ? "WeightedRoundRobinSubchannelList" : nullptr), - std::move(addresses), policy->channel_control_helper(), - args) { + addresses, policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' // pollset_sets will include the LB policy's pollset_set. @@ -609,10 +610,9 @@ void OldWeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { scheduler_ = std::move(scheduler); } // Start timer. - WeakRefCountedPtr<Picker> self = WeakRef(); timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter( config_->weight_update_period(), - [self = std::move(self), + [self = WeakRefAsSubclass<Picker>(), work_serializer = wrr_->work_serializer()]() mutable { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; @@ -672,12 +672,11 @@ void OldWeightedRoundRobin::ResetBackoffLocked() { absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) { global_stats().IncrementWrrUpdates(); - config_ = std::move(args.config); - ServerAddressList addresses; + config_ = args.config.TakeAsSubclass<WeightedRoundRobinConfig>(); + std::shared_ptr<EndpointAddressesIterator> addresses; if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { - gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses", - this, args.addresses->size()); + gpr_log(GPR_INFO, "[WRR %p] received update", this); } // Weed out duplicate addresses. Also sort the addresses so that if // the set of the addresses don't change, their indexes in the @@ -696,10 +695,12 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) { return memcmp(addr1.addr, addr2.addr, addr1.len) < 0; } }; - std::set<ServerAddress, AddressLessThan> ordered_addresses( - args.addresses->begin(), args.addresses->end()); - addresses = - ServerAddressList(ordered_addresses.begin(), ordered_addresses.end()); + std::set<ServerAddress, AddressLessThan> ordered_addresses; + (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { + ordered_addresses.insert(endpoint); + }); + addresses = std::make_shared<EndpointAddressesListIterator>( + ServerAddressList(ordered_addresses.begin(), ordered_addresses.end())); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this, @@ -716,8 +717,8 @@ absl::Status OldWeightedRoundRobin::UpdateLocked(UpdateArgs args) { this, latest_pending_subchannel_list_.get()); } latest_pending_subchannel_list_ = - MakeRefCounted<WeightedRoundRobinSubchannelList>( - this, std::move(addresses), args.args); + MakeRefCounted<WeightedRoundRobinSubchannelList>(this, addresses.get(), + args.args); latest_pending_subchannel_list_->StartWatchingLocked(args.args); // If the new list is empty, immediately promote it to // subchannel_list_ and report TRANSIENT_FAILURE. @@ -755,8 +756,9 @@ OldWeightedRoundRobin::GetOrCreateWeight(const grpc_resolved_address& address) { auto weight = it->second->RefIfNonZero(); if (weight != nullptr) return weight; } - auto weight = - MakeRefCounted<AddressWeight>(Ref(DEBUG_LOCATION, "AddressWeight"), *key); + auto weight = MakeRefCounted<AddressWeight>( + RefAsSubclass<OldWeightedRoundRobin>(DEBUG_LOCATION, "AddressWeight"), + *key); address_weight_map_.emplace(*key, weight.get()); return weight; } @@ -831,7 +833,8 @@ void OldWeightedRoundRobin::WeightedRoundRobinSubchannelList:: } p->channel_control_helper()->UpdateState( GRPC_CHANNEL_READY, absl::Status(), - MakeRefCounted<Picker>(p->Ref(), this)); + MakeRefCounted<Picker>(p->RefAsSubclass<OldWeightedRoundRobin>(), + this)); } else if (num_connecting_ > 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] reporting CONNECTING with subchannel list %p", @@ -1009,7 +1012,8 @@ class WeightedRoundRobin : public LoadBalancingPolicy { // Represents the weight for a given address. class EndpointWeight : public RefCounted<EndpointWeight> { public: - EndpointWeight(RefCountedPtr<WeightedRoundRobin> wrr, std::string key) + EndpointWeight(RefCountedPtr<WeightedRoundRobin> wrr, + EndpointAddressSet key) : wrr_(std::move(wrr)), key_(std::move(key)) {} ~EndpointWeight() override; @@ -1023,7 +1027,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy { private: RefCountedPtr<WeightedRoundRobin> wrr_; - const std::string key_; + const EndpointAddressSet key_; Mutex mu_; float weight_ ABSL_GUARDED_BY(&mu_) = 0; @@ -1035,13 +1039,13 @@ class WeightedRoundRobin : public LoadBalancingPolicy { public: class WrrEndpoint : public Endpoint { public: - WrrEndpoint(RefCountedPtr<WrrEndpointList> endpoint_list, - const ServerAddress& address, const ChannelArgs& args, + WrrEndpoint(RefCountedPtr<EndpointList> endpoint_list, + const EndpointAddresses& addresses, const ChannelArgs& args, std::shared_ptr<WorkSerializer> work_serializer) : Endpoint(std::move(endpoint_list)), weight_(policy<WeightedRoundRobin>()->GetOrCreateWeight( - address.address())) { - Init(address, args, std::move(work_serializer)); + addresses.addresses())) { + Init(addresses, args, std::move(work_serializer)); } RefCountedPtr<EndpointWeight> weight() const { return weight_; } @@ -1063,7 +1067,9 @@ class WeightedRoundRobin : public LoadBalancingPolicy { }; RefCountedPtr<SubchannelInterface> CreateSubchannel( - ServerAddress address, const ChannelArgs& args) override; + const grpc_resolved_address& address, + const ChannelArgs& per_address_args, + const ChannelArgs& args) override; // Called when the child policy reports a connectivity state update. void OnStateUpdate(absl::optional<grpc_connectivity_state> old_state, @@ -1074,16 +1080,17 @@ class WeightedRoundRobin : public LoadBalancingPolicy { }; WrrEndpointList(RefCountedPtr<WeightedRoundRobin> wrr, - const ServerAddressList& addresses, const ChannelArgs& args) + EndpointAddressesIterator* endpoints, + const ChannelArgs& args) : EndpointList(std::move(wrr), GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) ? "WrrEndpointList" : nullptr) { - Init(addresses, args, - [&](RefCountedPtr<WrrEndpointList> endpoint_list, - const ServerAddress& address, const ChannelArgs& args) { + Init(endpoints, args, + [&](RefCountedPtr<EndpointList> endpoint_list, + const EndpointAddresses& addresses, const ChannelArgs& args) { return MakeOrphanable<WrrEndpoint>( - std::move(endpoint_list), address, args, + std::move(endpoint_list), addresses, args, policy<WeightedRoundRobin>()->work_serializer()); }); } @@ -1192,7 +1199,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy { void ShutdownLocked() override; RefCountedPtr<EndpointWeight> GetOrCreateWeight( - const grpc_resolved_address& address); + const std::vector<grpc_resolved_address>& addresses); RefCountedPtr<WeightedRoundRobinConfig> config_; @@ -1205,7 +1212,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy { OrphanablePtr<WrrEndpointList> latest_pending_endpoint_list_; Mutex endpoint_weight_map_mu_; - std::map<std::string, EndpointWeight*, std::less<>> endpoint_weight_map_ + std::map<EndpointAddressSet, EndpointWeight*> endpoint_weight_map_ ABSL_GUARDED_BY(&endpoint_weight_map_mu_); bool shutdown_ = false; @@ -1245,7 +1252,7 @@ void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight( gpr_log(GPR_INFO, "[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f: " "error_util_penalty=%f, weight=%f (not updating)", - wrr_.get(), key_.c_str(), qps, eps, utilization, + wrr_.get(), key_.ToString().c_str(), qps, eps, utilization, error_utilization_penalty, weight); } return; @@ -1258,7 +1265,7 @@ void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight( "[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f " "error_util_penalty=%f : setting weight=%f weight_=%f now=%s " "last_update_time_=%s non_empty_since_=%s", - wrr_.get(), key_.c_str(), qps, eps, utilization, + wrr_.get(), key_.ToString().c_str(), qps, eps, utilization, error_utilization_penalty, weight, weight_, now.ToString().c_str(), last_update_time_.ToString().c_str(), non_empty_since_.ToString().c_str()); @@ -1277,7 +1284,7 @@ float WeightedRoundRobin::EndpointWeight::GetWeight( "[WRR %p] subchannel %s: getting weight: now=%s " "weight_expiration_period=%s blackout_period=%s " "last_update_time_=%s non_empty_since_=%s weight_=%f", - wrr_.get(), key_.c_str(), now.ToString().c_str(), + wrr_.get(), key_.ToString().c_str(), now.ToString().c_str(), weight_expiration_period.ToString().c_str(), blackout_period.ToString().c_str(), last_update_time_.ToString().c_str(), @@ -1446,10 +1453,9 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { gpr_log(GPR_INFO, "[WRR %p picker %p] scheduling timer for %s", wrr_.get(), this, config_->weight_update_period().ToString().c_str()); } - WeakRefCountedPtr<Picker> self = WeakRef(); timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter( config_->weight_update_period(), - [self = std::move(self), + [self = WeakRefAsSubclass<Picker>(), work_serializer = wrr_->work_serializer()]() mutable { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; @@ -1509,60 +1515,59 @@ void WeightedRoundRobin::ResetBackoffLocked() { absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { global_stats().IncrementWrrUpdates(); - config_ = std::move(args.config); - ServerAddressList addresses; + config_ = args.config.TakeAsSubclass<WeightedRoundRobinConfig>(); + std::shared_ptr<EndpointAddressesIterator> addresses; if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { - gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses", - this, args.addresses->size()); + gpr_log(GPR_INFO, "[WRR %p] received update", this); } - // Weed out duplicate addresses. Also sort the addresses so that if - // the set of the addresses don't change, their indexes in the - // subchannel list don't change, since this avoids unnecessary churn - // in the picker. Note that this does not ensure that if a given - // address remains present that it will have the same index; if, - // for example, an address at the end of the list is replaced with one - // that sorts much earlier in the list, then all of the addresses in - // between those two positions will have changed indexes. - struct AddressLessThan { - bool operator()(const ServerAddress& address1, - const ServerAddress& address2) const { - const grpc_resolved_address& addr1 = address1.address(); - const grpc_resolved_address& addr2 = address2.address(); - if (addr1.len != addr2.len) return addr1.len < addr2.len; - return memcmp(addr1.addr, addr2.addr, addr1.len) < 0; + // Weed out duplicate endpoints. Also sort the endpoints so that if + // the set of endpoints doesn't change, their indexes in the endpoint + // list don't change, since this avoids unnecessary churn in the + // picker. Note that this does not ensure that if a given endpoint + // remains present that it will have the same index; if, for example, + // an endpoint at the end of the list is replaced with one that sorts + // much earlier in the list, then all of the endpoints in between those + // two positions will have changed indexes. + struct EndpointAddressesLessThan { + bool operator()(const EndpointAddresses& endpoint1, + const EndpointAddresses& endpoint2) const { + // Compare unordered addresses only, not channel args. + EndpointAddressSet e1(endpoint1.addresses()); + EndpointAddressSet e2(endpoint2.addresses()); + return e1 < e2; } }; - std::set<ServerAddress, AddressLessThan> ordered_addresses( - args.addresses->begin(), args.addresses->end()); + std::set<EndpointAddresses, EndpointAddressesLessThan> ordered_addresses; + (*args.addresses)->ForEach([&](const EndpointAddresses& endpoint) { + ordered_addresses.insert(endpoint); + }); addresses = - ServerAddressList(ordered_addresses.begin(), ordered_addresses.end()); + std::make_shared<EndpointAddressesListIterator>(EndpointAddressesList( + ordered_addresses.begin(), ordered_addresses.end())); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this, args.addresses.status().ToString().c_str()); } - // If we already have a subchannel list, then keep using the existing + // If we already have an endpoint list, then keep using the existing // list, but still report back that the update was not accepted. if (endpoint_list_ != nullptr) return args.addresses.status(); } - // Create new subchannel list, replacing the previous pending list, if any. + // Create new endpoint list, replacing the previous pending list, if any. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) && latest_pending_endpoint_list_ != nullptr) { - gpr_log(GPR_INFO, "[WRR %p] replacing previous pending subchannel list %p", + gpr_log(GPR_INFO, "[WRR %p] replacing previous pending endpoint list %p", this, latest_pending_endpoint_list_.get()); } - latest_pending_endpoint_list_ = - MakeOrphanable<WrrEndpointList>(Ref(), std::move(addresses), args.args); + latest_pending_endpoint_list_ = MakeOrphanable<WrrEndpointList>( + RefAsSubclass<WeightedRoundRobin>(), addresses.get(), args.args); // If the new list is empty, immediately promote it to // endpoint_list_ and report TRANSIENT_FAILURE. - // TODO(roth): As part of adding dualstack backend support, we need to - // also handle the case where the list of addresses for a given - // endpoint is empty. if (latest_pending_endpoint_list_->size() == 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) && endpoint_list_ != nullptr) { - gpr_log(GPR_INFO, "[WRR %p] replacing previous subchannel list %p", this, + gpr_log(GPR_INFO, "[WRR %p] replacing previous endpoint list %p", this, endpoint_list_.get()); } endpoint_list_ = std::move(latest_pending_endpoint_list_); @@ -1584,18 +1589,18 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { } RefCountedPtr<WeightedRoundRobin::EndpointWeight> -WeightedRoundRobin::GetOrCreateWeight(const grpc_resolved_address& address) { - auto key = grpc_sockaddr_to_uri(&address); - if (!key.ok()) return nullptr; +WeightedRoundRobin::GetOrCreateWeight( + const std::vector<grpc_resolved_address>& addresses) { + EndpointAddressSet key(addresses); MutexLock lock(&endpoint_weight_map_mu_); - auto it = endpoint_weight_map_.find(*key); + auto it = endpoint_weight_map_.find(key); if (it != endpoint_weight_map_.end()) { auto weight = it->second->RefIfNonZero(); if (weight != nullptr) return weight; } auto weight = MakeRefCounted<EndpointWeight>( - Ref(DEBUG_LOCATION, "EndpointWeight"), *key); - endpoint_weight_map_.emplace(*key, weight.get()); + RefAsSubclass<WeightedRoundRobin>(DEBUG_LOCATION, "EndpointWeight"), key); + endpoint_weight_map_.emplace(key, weight.get()); return weight; } @@ -1619,10 +1624,11 @@ void WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OobWatcher:: RefCountedPtr<SubchannelInterface> WeightedRoundRobin::WrrEndpointList::WrrEndpoint::CreateSubchannel( - ServerAddress address, const ChannelArgs& args) { + const grpc_resolved_address& address, const ChannelArgs& per_address_args, + const ChannelArgs& args) { auto* wrr = policy<WeightedRoundRobin>(); - auto subchannel = - wrr->channel_control_helper()->CreateSubchannel(std::move(address), args); + auto subchannel = wrr->channel_control_helper()->CreateSubchannel( + address, per_address_args, args); // Start OOB watch if configured. if (wrr->config_->enable_oob_load_report()) { subchannel->AddDataWatcher(MakeOobBackendMetricWatcher( @@ -1657,7 +1663,7 @@ void WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OnStateUpdate( } else if (new_state == GRPC_CHANNEL_READY) { // If we transition back to READY state, restart the blackout period. // Skip this if this is the initial notification for this - // subchannel (which happens whenever we get updated addresses and + // endpoint (which happens whenever we get updated addresses and // create a new endpoint list). Also skip it if the previous state // was READY (which should never happen in practice, but we've seen // at least one bug that caused this in the outlier_detection @@ -1753,7 +1759,7 @@ void WeightedRoundRobin::WrrEndpointList:: } wrr->channel_control_helper()->UpdateState( GRPC_CHANNEL_READY, absl::Status(), - MakeRefCounted<Picker>(wrr->Ref(), this)); + MakeRefCounted<Picker>(wrr->RefAsSubclass<WeightedRoundRobin>(), this)); } else if (num_connecting_ > 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] reporting CONNECTING with endpoint list %p", |