diff options
8 files changed, 124 insertions, 60 deletions
diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index fe73e1886..16ede8ae1 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -176,13 +176,15 @@ final class CdsLoadBalancer2 extends LoadBalancer { clusterState.result.lrsServerInfo(), clusterState.result.maxConcurrentRequests(), clusterState.result.upstreamTlsContext(), + clusterState.result.filterMetadata(), clusterState.result.outlierDetection()); } else { // logical DNS instance = DiscoveryMechanism.forLogicalDns( clusterState.name, clusterState.result.dnsHostName(), clusterState.result.lrsServerInfo(), clusterState.result.maxConcurrentRequests(), - clusterState.result.upstreamTlsContext()); + clusterState.result.upstreamTlsContext(), + clusterState.result.filterMetadata()); } instances.add(instance); } diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 45062f28f..e42619d9b 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Struct; import io.grpc.Attributes; import io.grpc.ClientStreamTracer; import io.grpc.ClientStreamTracer.StreamInfo; @@ -54,6 +56,7 @@ import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -140,6 +143,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer { childLbHelper.updateDropPolicies(config.dropCategories); childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests); childLbHelper.updateSslContextProviderSupplier(config.tlsContext); + childLbHelper.updateFilterMetadata(config.filterMetadata); childSwitchLb.switchTo(config.childPolicy.getProvider()); childSwitchLb.handleResolvedAddresses( @@ -189,6 +193,7 @@ final class ClusterImplLoadBalancer extends LoadBalancer { private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; @Nullable private SslContextProviderSupplier sslContextProviderSupplier; + private Map<String, Struct> filterMetadata = ImmutableMap.of(); @Nullable private final ServerInfo lrsServerInfo; @@ -201,8 +206,8 @@ final class ClusterImplLoadBalancer extends LoadBalancer { public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { currentState = newState; currentPicker = newPicker; - SubchannelPicker picker = - new RequestLimitingSubchannelPicker(newPicker, dropPolicies, maxConcurrentRequests); + SubchannelPicker picker = new RequestLimitingSubchannelPicker( + newPicker, dropPolicies, maxConcurrentRequests, filterMetadata); delegate().updateBalancingState(newState, picker); } @@ -311,20 +316,29 @@ final class ClusterImplLoadBalancer extends LoadBalancer { : null; } + private void updateFilterMetadata(Map<String, Struct> filterMetadata) { + this.filterMetadata = ImmutableMap.copyOf(filterMetadata); + } + private class RequestLimitingSubchannelPicker extends SubchannelPicker { private final SubchannelPicker delegate; private final List<DropOverload> dropPolicies; private final long maxConcurrentRequests; + private final Map<String, Struct> filterMetadata; private RequestLimitingSubchannelPicker(SubchannelPicker delegate, - List<DropOverload> dropPolicies, long maxConcurrentRequests) { + List<DropOverload> dropPolicies, long maxConcurrentRequests, + Map<String, Struct> filterMetadata) { this.delegate = delegate; this.dropPolicies = dropPolicies; this.maxConcurrentRequests = maxConcurrentRequests; + this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata"); } @Override public PickResult pickSubchannel(PickSubchannelArgs args) { + args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER) + .accept(filterMetadata); for (DropOverload dropOverload : dropPolicies) { int rand = random.nextInt(1_000_000); if (rand < dropOverload.dropsPerMillion()) { diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java index ff32779b0..b928f6dae 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancerProvider.java @@ -19,6 +19,9 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Struct; +import io.grpc.CallOptions; import io.grpc.Internal; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; @@ -34,6 +37,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import javax.annotation.Nullable; /** @@ -43,6 +47,11 @@ import javax.annotation.Nullable; */ @Internal public final class ClusterImplLoadBalancerProvider extends LoadBalancerProvider { + /** + * Consumer of filter metadata from the cluster used by the call. Consumer may not modify map. + */ + public static final CallOptions.Key<Consumer<Map<String, Struct>>> FILTER_METADATA_CONSUMER = + CallOptions.Key.createWithDefault("io.grpc.xds.internalFilterMetadataConsumer", (m) -> { }); @Override public boolean isAvailable() { @@ -89,16 +98,18 @@ public final class ClusterImplLoadBalancerProvider extends LoadBalancerProvider final List<DropOverload> dropCategories; // Provides the direct child policy and its config. final PolicySelection childPolicy; + final Map<String, Struct> filterMetadata; ClusterImplConfig(String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, List<DropOverload> dropCategories, PolicySelection childPolicy, - @Nullable UpstreamTlsContext tlsContext) { + @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) { this.cluster = checkNotNull(cluster, "cluster"); this.edsServiceName = edsServiceName; this.lrsServerInfo = lrsServerInfo; this.maxConcurrentRequests = maxConcurrentRequests; this.tlsContext = tlsContext; + this.filterMetadata = ImmutableMap.copyOf(filterMetadata); this.dropCategories = Collections.unmodifiableList( new ArrayList<>(checkNotNull(dropCategories, "dropCategories"))); this.childPolicy = checkNotNull(childPolicy, "childPolicy"); diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 881628784..f1fb6c0fb 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -21,6 +21,8 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Struct; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; import io.grpc.InternalLogId; @@ -184,10 +186,11 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { if (instance.type == DiscoveryMechanism.Type.EDS) { state = new EdsClusterState(instance.cluster, instance.edsServiceName, instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, - instance.outlierDetection); + instance.filterMetadata, instance.outlierDetection); } else { // logical DNS state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName, - instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext); + instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext, + instance.filterMetadata); } clusterStates.put(instance.cluster, state); state.start(); @@ -323,6 +326,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { protected final Long maxConcurrentRequests; @Nullable protected final UpstreamTlsContext tlsContext; + protected final Map<String, Struct> filterMetadata; @Nullable protected final OutlierDetection outlierDetection; // Resolution status, may contain most recent error encountered. @@ -337,11 +341,12 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { private ClusterState(String name, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - @Nullable OutlierDetection outlierDetection) { + Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) { this.name = name; this.lrsServerInfo = lrsServerInfo; this.maxConcurrentRequests = maxConcurrentRequests; this.tlsContext = tlsContext; + this.filterMetadata = ImmutableMap.copyOf(filterMetadata); this.outlierDetection = outlierDetection; } @@ -360,8 +365,10 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { private EdsClusterState(String name, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, @Nullable OutlierDetection outlierDetection) { - super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, outlierDetection); + @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata, + @Nullable OutlierDetection outlierDetection) { + super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, + outlierDetection); this.edsServiceName = edsServiceName; } @@ -447,8 +454,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { Map<String, PriorityChildConfig> priorityChildConfigs = generateEdsBasedPriorityChildConfigs( name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext, - outlierDetection, endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, - dropOverloads); + filterMetadata, outlierDetection, endpointLbPolicy, lbRegistry, + prioritizedLocalityWeights, dropOverloads); status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityChildConfigs, @@ -533,8 +540,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { private LogicalDnsClusterState(String name, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext) { - super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, null); + @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) { + super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName"); nameResolverFactory = checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory"); @@ -623,8 +630,8 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { addresses.add(eag); } PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( - name, lrsServerInfo, maxConcurrentRequests, tlsContext, lbRegistry, - Collections.<DropOverload>emptyList()); + name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, + lbRegistry, Collections.<DropOverload>emptyList()); status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); @@ -707,14 +714,14 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { */ private static PriorityChildConfig generateDnsBasedPriorityChildConfig( String cluster, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, LoadBalancerRegistry lbRegistry, - List<DropOverload> dropOverloads) { + @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata, + LoadBalancerRegistry lbRegistry, List<DropOverload> dropOverloads) { // Override endpoint-level LB policy with pick_first for logical DNS cluster. PolicySelection endpointLbPolicy = new PolicySelection(lbRegistry.getProvider("pick_first"), null); ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, null, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbPolicy, tlsContext); + dropOverloads, endpointLbPolicy, tlsContext, filterMetadata); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); PolicySelection clusterImplPolicy = @@ -731,6 +738,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildConfigs( String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, + Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection, PolicySelection endpointLbPolicy, LoadBalancerRegistry lbRegistry, Map<String, Map<Locality, Integer>> prioritizedLocalityWeights, List<DropOverload> dropOverloads) { @@ -738,7 +746,7 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { for (String priority : prioritizedLocalityWeights.keySet()) { ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests, - dropOverloads, endpointLbPolicy, tlsContext); + dropOverloads, endpointLbPolicy, tlsContext, filterMetadata); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); PolicySelection priorityChildPolicy = diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java index 6488a719a..48ac4155b 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java @@ -19,6 +19,8 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Struct; import io.grpc.Internal; import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.Helper; @@ -129,6 +131,7 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi final String dnsHostName; @Nullable final OutlierDetection outlierDetection; + final Map<String, Struct> filterMetadata; enum Type { EDS, @@ -138,7 +141,7 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi private DiscoveryMechanism(String cluster, Type type, @Nullable String edsServiceName, @Nullable String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - @Nullable OutlierDetection outlierDetection) { + Map<String, Struct> filterMetadata, @Nullable OutlierDetection outlierDetection) { this.cluster = checkNotNull(cluster, "cluster"); this.type = checkNotNull(type, "type"); this.edsServiceName = edsServiceName; @@ -146,28 +149,29 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi this.lrsServerInfo = lrsServerInfo; this.maxConcurrentRequests = maxConcurrentRequests; this.tlsContext = tlsContext; + this.filterMetadata = ImmutableMap.copyOf(checkNotNull(filterMetadata, "filterMetadata")); this.outlierDetection = outlierDetection; } static DiscoveryMechanism forEds(String cluster, @Nullable String edsServiceName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext, + @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata, OutlierDetection outlierDetection) { return new DiscoveryMechanism(cluster, Type.EDS, edsServiceName, null, lrsServerInfo, - maxConcurrentRequests, tlsContext, outlierDetection); + maxConcurrentRequests, tlsContext, filterMetadata, outlierDetection); } static DiscoveryMechanism forLogicalDns(String cluster, String dnsHostName, @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext) { + @Nullable UpstreamTlsContext tlsContext, Map<String, Struct> filterMetadata) { return new DiscoveryMechanism(cluster, Type.LOGICAL_DNS, null, dnsHostName, - lrsServerInfo, maxConcurrentRequests, tlsContext, null); + lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null); } @Override public int hashCode() { return Objects.hash(cluster, type, lrsServerInfo, maxConcurrentRequests, tlsContext, - edsServiceName, dnsHostName); + edsServiceName, dnsHostName, filterMetadata, outlierDetection); } @Override @@ -185,7 +189,9 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi && Objects.equals(dnsHostName, that.dnsHostName) && Objects.equals(lrsServerInfo, that.lrsServerInfo) && Objects.equals(maxConcurrentRequests, that.maxConcurrentRequests) - && Objects.equals(tlsContext, that.tlsContext); + && Objects.equals(tlsContext, that.tlsContext) + && Objects.equals(filterMetadata, that.filterMetadata) + && Objects.equals(outlierDetection, that.outlierDetection); } @Override @@ -198,7 +204,10 @@ public final class ClusterResolverLoadBalancerProvider extends LoadBalancerProvi .add("dnsHostName", dnsHostName) .add("lrsServerInfo", lrsServerInfo) // Exclude tlsContext as its string representation is cumbersome. - .add("maxConcurrentRequests", maxConcurrentRequests); + .add("maxConcurrentRequests", maxConcurrentRequests) + .add("filterMetadata", filterMetadata) + // Exclude outlierDetection as its string representation is long. + ; return toStringHelper.toString(); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java index 6b6c48972..c6340156d 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClusterResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsClusterResource.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.Duration; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; +import com.google.protobuf.Struct; import com.google.protobuf.util.Durations; import io.envoyproxy.envoy.config.cluster.v3.CircuitBreakers.Thresholds; import io.envoyproxy.envoy.config.cluster.v3.Cluster; @@ -160,6 +161,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> { } updateBuilder.lbPolicyConfig(lbPolicyConfig); + updateBuilder.filterMetadata( + ImmutableMap.copyOf(cluster.getMetadata().getFilterMetadataMap())); return updateBuilder.build(); } @@ -559,14 +562,21 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> { @Nullable abstract OutlierDetection outlierDetection(); - static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) { - checkNotNull(prioritizedClusterNames, "prioritizedClusterNames"); + abstract ImmutableMap<String, Struct> filterMetadata(); + + private static Builder newBuilder(String clusterName) { return new AutoValue_XdsClusterResource_CdsUpdate.Builder() .clusterName(clusterName) - .clusterType(ClusterType.AGGREGATE) .minRingSize(0) .maxRingSize(0) .choiceCount(0) + .filterMetadata(ImmutableMap.of()); + } + + static Builder forAggregate(String clusterName, List<String> prioritizedClusterNames) { + checkNotNull(prioritizedClusterNames, "prioritizedClusterNames"); + return newBuilder(clusterName) + .clusterType(ClusterType.AGGREGATE) .prioritizedClusterNames(ImmutableList.copyOf(prioritizedClusterNames)); } @@ -574,12 +584,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> { @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext, @Nullable OutlierDetection outlierDetection) { - return new AutoValue_XdsClusterResource_CdsUpdate.Builder() - .clusterName(clusterName) + return newBuilder(clusterName) .clusterType(ClusterType.EDS) - .minRingSize(0) - .maxRingSize(0) - .choiceCount(0) .edsServiceName(edsServiceName) .lrsServerInfo(lrsServerInfo) .maxConcurrentRequests(maxConcurrentRequests) @@ -591,12 +597,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> { @Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext upstreamTlsContext) { - return new AutoValue_XdsClusterResource_CdsUpdate.Builder() - .clusterName(clusterName) + return newBuilder(clusterName) .clusterType(ClusterType.LOGICAL_DNS) - .minRingSize(0) - .maxRingSize(0) - .choiceCount(0) .dnsHostName(dnsHostName) .lrsServerInfo(lrsServerInfo) .maxConcurrentRequests(maxConcurrentRequests) @@ -685,6 +687,8 @@ class XdsClusterResource extends XdsResourceType<CdsUpdate> { protected abstract Builder outlierDetection(OutlierDetection outlierDetection); + protected abstract Builder filterMetadata(ImmutableMap<String, Struct> filterMetadata); + abstract CdsUpdate build(); } } diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index fa08d5edd..cf140a076 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -177,7 +177,8 @@ public class ClusterImplLoadBalancerTest { Object weightedTargetConfig = new Object(); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -202,7 +203,8 @@ public class ClusterImplLoadBalancerTest { ClusterImplConfig configWithWeightedTarget = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWeightedTarget); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -215,7 +217,8 @@ public class ClusterImplLoadBalancerTest { ClusterImplConfig configWithWrrLocality = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(wrrLocalityProvider, wrrLocalityConfig), null); + new PolicySelection(wrrLocalityProvider, wrrLocalityConfig), null, + Collections.emptyMap()); deliverAddressesAndConfig(Collections.singletonList(endpoint), configWithWrrLocality); childBalancer = Iterables.getOnlyElement(downstreamBalancers); assertThat(childBalancer.name).isEqualTo(XdsLbPolicies.WRR_LOCALITY_POLICY_NAME); @@ -239,7 +242,8 @@ public class ClusterImplLoadBalancerTest { Object weightedTargetConfig = new Object(); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -258,7 +262,8 @@ public class ClusterImplLoadBalancerTest { buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -284,7 +289,8 @@ public class ClusterImplLoadBalancerTest { buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); @@ -368,7 +374,8 @@ public class ClusterImplLoadBalancerTest { buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.singletonList(DropOverload.create("throttle", 500_000)), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); when(mockRandom.nextInt(anyInt())).thenReturn(499_999, 999_999, 1_000_000); @@ -397,7 +404,8 @@ public class ClusterImplLoadBalancerTest { // Config update updates drop policies. config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.singletonList(DropOverload.create("lb", 1_000_000)), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); loadBalancer.acceptResolvedAddresses( ResolvedAddresses.newBuilder() .setAddresses(Collections.singletonList(endpoint)) @@ -444,7 +452,8 @@ public class ClusterImplLoadBalancerTest { buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, maxConcurrentRequests, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); assertThat(downstreamBalancers).hasSize(1); // one leaf balancer @@ -486,7 +495,8 @@ public class ClusterImplLoadBalancerTest { maxConcurrentRequests = 101L; config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, maxConcurrentRequests, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); result = currentPicker.pickSubchannel(pickSubchannelArgs); @@ -532,7 +542,8 @@ public class ClusterImplLoadBalancerTest { buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); assertThat(downstreamBalancers).hasSize(1); // one leaf balancer @@ -578,7 +589,8 @@ public class ClusterImplLoadBalancerTest { buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); // One locality with two endpoints. EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality); @@ -615,7 +627,8 @@ public class ClusterImplLoadBalancerTest { buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext, + Collections.emptyMap()); // One locality with two endpoints. EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality); @@ -638,7 +651,8 @@ public class ClusterImplLoadBalancerTest { // Removes UpstreamTlsContext from the config. config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null, + Collections.emptyMap()); deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config); assertThat(Iterables.getOnlyElement(downstreamBalancers)).isSameInstanceAs(leafBalancer); subchannel = leafBalancer.helper.createSubchannel(args); // creates new connections @@ -652,7 +666,8 @@ public class ClusterImplLoadBalancerTest { CommonTlsContextTestsUtil.buildUpstreamTlsContext("google_cloud_private_spiffe1", true); config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, null, Collections.<DropOverload>emptyList(), - new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext); + new PolicySelection(weightedTargetProvider, weightedTargetConfig), upstreamTlsContext, + Collections.emptyMap()); deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config); assertThat(Iterables.getOnlyElement(downstreamBalancers)).isSameInstanceAs(leafBalancer); subchannel = leafBalancer.helper.createSubchannel(args); // creates new connections diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 99b8605b4..dd503592c 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -136,15 +136,16 @@ public class ClusterResolverLoadBalancerTest { FailurePercentageEjection.create(100, 100, 100, 100)); private final DiscoveryMechanism edsDiscoveryMechanism1 = DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext, - null); + Collections.emptyMap(), null); private final DiscoveryMechanism edsDiscoveryMechanism2 = DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, tlsContext, - null); + Collections.emptyMap(), null); private final DiscoveryMechanism edsDiscoveryMechanismWithOutlierDetection = DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, tlsContext, - outlierDetection); + Collections.emptyMap(), outlierDetection); private final DiscoveryMechanism logicalDnsDiscoveryMechanism = - DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, DNS_HOST_NAME, LRS_SERVER_INFO, 300L, null); + DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, DNS_HOST_NAME, LRS_SERVER_INFO, 300L, null, + Collections.emptyMap()); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { |