diff --git a/api/src/main/java/io/grpc/InternalSubchannelAddressAttributes.java b/api/src/main/java/io/grpc/InternalSubchannelAddressAttributes.java
new file mode 100644
index 00000000000..cfc2f7c5137
--- /dev/null
+++ b/api/src/main/java/io/grpc/InternalSubchannelAddressAttributes.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2024 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc;
+
+/**
+ * An internal class. Do not use.
+ *
+ *
An interface to provide the attributes for address connected by subchannel.
+ */
+@Internal
+public interface InternalSubchannelAddressAttributes {
+
+ /**
+ * Return attributes of the server address connected by sub channel.
+ */
+ public Attributes getConnectedAddressAttributes();
+}
diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java
index 15106a5ffc6..0fbce5fa5be 100644
--- a/api/src/main/java/io/grpc/LoadBalancer.java
+++ b/api/src/main/java/io/grpc/LoadBalancer.java
@@ -1428,6 +1428,18 @@ public void updateAddresses(List addrs) {
public Object getInternalSubchannel() {
throw new UnsupportedOperationException();
}
+
+ /**
+ * (Internal use only) returns attributes of the address subchannel is connected to.
+ *
+ * Warning: this is INTERNAL API, is not supposed to be used by external users, and may
+ * change without notice. If you think you must use it, please file an issue and we can consider
+ * removing its "internal" status.
+ */
+ @Internal
+ public Attributes getConnectedAddressAttributes() {
+ throw new UnsupportedOperationException();
+ }
}
/**
diff --git a/core/src/main/java/io/grpc/internal/InternalSubchannel.java b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
index a986cb2deff..70e42e2f5f1 100644
--- a/core/src/main/java/io/grpc/internal/InternalSubchannel.java
+++ b/core/src/main/java/io/grpc/internal/InternalSubchannel.java
@@ -157,6 +157,8 @@ protected void handleNotInUse() {
private Status shutdownReason;
+ private volatile Attributes connectedAddressAttributes;
+
InternalSubchannel(List addressGroups, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
@@ -525,6 +527,13 @@ public void run() {
return channelStatsFuture;
}
+ /**
+ * Return attributes for server address connected by sub channel.
+ */
+ public Attributes getConnectedAddressAttributes() {
+ return connectedAddressAttributes;
+ }
+
ConnectivityState getState() {
return state.getState();
}
@@ -568,6 +577,7 @@ public void run() {
} else if (pendingTransport == transport) {
activeTransport = transport;
pendingTransport = null;
+ connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
gotoNonErrorState(READY);
}
}
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 7f45ca967ea..07dcf9ee7bb 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -2044,6 +2044,11 @@ public void updateAddresses(List addrs) {
subchannel.updateAddresses(addrs);
}
+ @Override
+ public Attributes getConnectedAddressAttributes() {
+ return subchannel.getConnectedAddressAttributes();
+ }
+
private List stripOverrideAuthorityAttributes(
List eags) {
List eagsWithoutOverrideAttr = new ArrayList<>();
diff --git a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
index f7631c34c0d..e4d9f27ed46 100644
--- a/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
+++ b/core/src/test/java/io/grpc/internal/InternalSubchannelTest.java
@@ -1339,6 +1339,32 @@ public void channelzStatContainsTransport() throws Exception {
assertThat(index.getCurrentAddress()).isSameInstanceAs(addr2);
}
+ @Test
+ public void connectedAddressAttributes_ready() {
+ SocketAddress addr = new SocketAddress() {};
+ Attributes attr = Attributes.newBuilder().set(Attributes.Key.create("some-key"), "1").build();
+ createInternalSubchannel(new EquivalentAddressGroup(Arrays.asList(addr), attr));
+
+ assertEquals(IDLE, internalSubchannel.getState());
+ assertNoCallbackInvoke();
+ assertNull(internalSubchannel.obtainActiveTransport());
+ assertNull(internalSubchannel.getConnectedAddressAttributes());
+
+ assertExactCallbackInvokes("onStateChange:CONNECTING");
+ assertEquals(CONNECTING, internalSubchannel.getState());
+ verify(mockTransportFactory).newClientTransport(
+ eq(addr),
+ eq(createClientTransportOptions().setEagAttributes(attr)),
+ isA(TransportLogger.class));
+ assertNull(internalSubchannel.getConnectedAddressAttributes());
+
+ internalSubchannel.obtainActiveTransport();
+ transports.peek().listener.transportReady();
+ assertExactCallbackInvokes("onStateChange:READY");
+ assertEquals(READY, internalSubchannel.getState());
+ assertEquals(attr, internalSubchannel.getConnectedAddressAttributes());
+ }
+
/** Create ClientTransportOptions. Should not be reused if it may be mutated. */
private ClientTransportFactory.ClientTransportOptions createClientTransportOptions() {
return new ClientTransportFactory.ClientTransportOptions()
diff --git a/util/src/main/java/io/grpc/util/ForwardingSubchannel.java b/util/src/main/java/io/grpc/util/ForwardingSubchannel.java
index 51f2583186e..416be378162 100644
--- a/util/src/main/java/io/grpc/util/ForwardingSubchannel.java
+++ b/util/src/main/java/io/grpc/util/ForwardingSubchannel.java
@@ -74,11 +74,17 @@ public Object getInternalSubchannel() {
return delegate().getInternalSubchannel();
}
+
@Override
public void updateAddresses(List addrs) {
delegate().updateAddresses(addrs);
}
+ @Override
+ public Attributes getConnectedAddressAttributes() {
+ return delegate().getConnectedAddressAttributes();
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
index 702b2aa6caa..0ea2c7dd75f 100644
--- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
@@ -27,6 +27,7 @@
import io.grpc.ClientStreamTracer;
import io.grpc.ClientStreamTracer.StreamInfo;
import io.grpc.ConnectivityState;
+import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InternalLogId;
import io.grpc.LoadBalancer;
@@ -59,6 +60,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
/**
@@ -77,10 +79,8 @@ final class ClusterImplLoadBalancer extends LoadBalancer {
Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
|| Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
- private static final Attributes.Key ATTR_CLUSTER_LOCALITY_STATS =
- Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats");
- private static final Attributes.Key ATTR_CLUSTER_LOCALITY_NAME =
- Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityName");
+ private static final Attributes.Key> ATTR_CLUSTER_LOCALITY =
+ Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");
private final XdsLogger logger;
private final Helper helper;
@@ -213,36 +213,45 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
List addresses = withAdditionalAttributes(args.getAddresses());
- Locality locality = args.getAddresses().get(0).getAttributes().get(
- InternalXdsAttributes.ATTR_LOCALITY); // all addresses should be in the same locality
- String localityName = args.getAddresses().get(0).getAttributes().get(
- InternalXdsAttributes.ATTR_LOCALITY_NAME);
- // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
- // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
- // In case of not (which really shouldn't), loads are aggregated under an empty locality.
- if (locality == null) {
- locality = Locality.create("", "", "");
- localityName = "";
- }
- final ClusterLocalityStats localityStats =
- (lrsServerInfo == null)
- ? null
- : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
- edsServiceName, locality);
-
+ // This value for ClusterLocality is not recommended for general use.
+ // Currently, we extract locality data from the first address, even before the subchannel is
+ // READY.
+ // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
+ // might return the subchannel before it is READY. Typically, we wouldn't report load for such
+ // selections because the channel will disregard the chosen (not-ready) subchannel.
+ // However, we needed to ensure this case is handled.
+ ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
+ args.getAddresses().get(0).getAttributes());
+ AtomicReference localityAtomicReference = new AtomicReference<>(
+ clusterLocality);
Attributes attrs = args.getAttributes().toBuilder()
- .set(ATTR_CLUSTER_LOCALITY_STATS, localityStats)
- .set(ATTR_CLUSTER_LOCALITY_NAME, localityName)
+ .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference)
.build();
args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build();
final Subchannel subchannel = delegate().createSubchannel(args);
return new ForwardingSubchannel() {
+ @Override
+ public void start(SubchannelStateListener listener) {
+ delegate().start(new SubchannelStateListener() {
+ @Override
+ public void onSubchannelState(ConnectivityStateInfo newState) {
+ if (newState.getState().equals(ConnectivityState.READY)) {
+ // Get locality based on the connected address attributes
+ ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
+ subchannel.getConnectedAddressAttributes());
+ ClusterLocality oldClusterLocality = localityAtomicReference
+ .getAndSet(updatedClusterLocality);
+ oldClusterLocality.release();
+ }
+ listener.onSubchannelState(newState);
+ }
+ });
+ }
+
@Override
public void shutdown() {
- if (localityStats != null) {
- localityStats.release();
- }
+ localityAtomicReference.get().release();
delegate().shutdown();
}
@@ -274,6 +283,28 @@ private List withAdditionalAttributes(
return newAddresses;
}
+ private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
+ Locality locality = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY);
+ String localityName = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY_NAME);
+
+ // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
+ // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
+ // In case of not (which really shouldn't), loads are aggregated under an empty
+ // locality.
+ if (locality == null) {
+ locality = Locality.create("", "", "");
+ localityName = "";
+ }
+
+ final ClusterLocalityStats localityStats =
+ (lrsServerInfo == null)
+ ? null
+ : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
+ edsServiceName, locality);
+
+ return new ClusterLocality(localityStats, localityName);
+ }
+
@Override
protected Helper delegate() {
return helper;
@@ -361,18 +392,23 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
"Cluster max concurrent requests limit exceeded"));
}
}
- final ClusterLocalityStats stats =
- result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS);
- if (stats != null) {
- String localityName =
- result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_NAME);
- args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
-
- ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
- stats, inFlights, result.getStreamTracerFactory());
- ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
- .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
- return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory);
+ final AtomicReference clusterLocality =
+ result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);
+
+ if (clusterLocality != null) {
+ ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
+ if (stats != null) {
+ String localityName =
+ result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
+ .getClusterLocalityName();
+ args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
+
+ ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
+ stats, inFlights, result.getStreamTracerFactory());
+ ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
+ .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
+ return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory);
+ }
}
}
return result;
@@ -447,4 +483,33 @@ public void onLoadReport(MetricReport report) {
stats.recordBackendLoadMetricStats(report.getNamedMetrics());
}
}
+
+ /**
+ * Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
+ */
+ static final class ClusterLocality {
+ private final ClusterLocalityStats clusterLocalityStats;
+ private final String clusterLocalityName;
+
+ @VisibleForTesting
+ ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
+ this.clusterLocalityStats = localityStats;
+ this.clusterLocalityName = localityName;
+ }
+
+ ClusterLocalityStats getClusterLocalityStats() {
+ return clusterLocalityStats;
+ }
+
+ String getClusterLocalityName() {
+ return clusterLocalityName;
+ }
+
+ @VisibleForTesting
+ void release() {
+ if (clusterLocalityStats != null) {
+ clusterLocalityStats.release();
+ }
+ }
+ }
}
diff --git a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java
index 393cce16194..be9d3587d14 100644
--- a/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java
+++ b/xds/src/main/java/io/grpc/xds/client/LoadStatsManager2.java
@@ -91,7 +91,7 @@ private synchronized void releaseClusterDropCounter(
String cluster, @Nullable String edsServiceName) {
checkState(allDropStats.containsKey(cluster)
&& allDropStats.get(cluster).containsKey(edsServiceName),
- "stats for cluster %s, edsServiceName %s not exits", cluster, edsServiceName);
+ "stats for cluster %s, edsServiceName %s do not exist", cluster, edsServiceName);
ReferenceCounted ref = allDropStats.get(cluster).get(edsServiceName);
ref.release();
}
diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java
index 4e12a5717ae..0082a2aa59d 100644
--- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java
+++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java
@@ -16,6 +16,7 @@
package io.grpc.xds;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
@@ -29,6 +30,7 @@
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityState;
+import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.InsecureChannelCredentials;
import io.grpc.LoadBalancer;
@@ -40,7 +42,9 @@
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
+import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
+import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
@@ -76,9 +80,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -145,7 +151,7 @@ public AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName) {
return new AtomicLong();
}
};
- private final Helper helper = new FakeLbHelper();
+ private final FakeLbHelper helper = new FakeLbHelper();
private PickSubchannelArgs pickSubchannelArgs = new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT,
new PickDetailsConsumer() {});
@@ -272,9 +278,10 @@ public void pick_addsLocalityLabel() {
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
- Subchannel subchannel = leafBalancer.helper.createSubchannel(
- CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
- leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
+ leafBalancer.createSubChannel();
+ FakeSubchannel fakeSubchannel = helper.subchannels.poll();
+ fakeSubchannel.setConnectedEagIndex(0);
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
assertThat(currentState).isEqualTo(ConnectivityState.READY);
PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class);
@@ -300,9 +307,10 @@ public void recordLoadStats() {
EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality);
deliverAddressesAndConfig(Collections.singletonList(endpoint), config);
FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers);
- Subchannel subchannel = leafBalancer.helper.createSubchannel(
- CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
- leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
+ Subchannel subchannel = leafBalancer.createSubChannel();
+ FakeSubchannel fakeSubchannel = helper.subchannels.poll();
+ fakeSubchannel.setConnectedEagIndex(0);
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
assertThat(currentState).isEqualTo(ConnectivityState.READY);
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isTrue();
@@ -357,7 +365,7 @@ public void recordLoadStats() {
TOLERANCE).of(0.009);
streamTracer3.streamClosed(Status.OK);
- subchannel.shutdown(); // stats recorder released
+ subchannel.shutdown(); // stats recorder released
clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER));
// Locality load is reported for one last time in case of loads occurred since the previous
// load report.
@@ -373,6 +381,95 @@ public void recordLoadStats() {
assertThat(clusterStats.upstreamLocalityStatsList()).isEmpty(); // no longer reported
}
+ // TODO(dnvindhya): This test has been added as a fix to verify
+ // https://github.com/grpc/grpc-java/issues/11434.
+ // Once we update PickFirstLeafLoadBalancer as default LoadBalancer, update the test.
+ @Test
+ public void pickFirstLoadReport_onUpdateAddress() {
+ Locality locality1 =
+ Locality.create("test-region", "test-zone", "test-subzone");
+ Locality locality2 =
+ Locality.create("other-region", "other-zone", "other-subzone");
+
+ LoadBalancerProvider pickFirstProvider = LoadBalancerRegistry
+ .getDefaultRegistry().getProvider("pick_first");
+ Object pickFirstConfig = pickFirstProvider.parseLoadBalancingPolicyConfig(new HashMap<>())
+ .getConfig();
+ ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO,
+ null, Collections.emptyList(),
+ GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(pickFirstProvider,
+ pickFirstConfig),
+ null, Collections.emptyMap());
+ EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr1", locality1);
+ EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr2", locality2);
+ deliverAddressesAndConfig(Arrays.asList(endpoint1, endpoint2), config);
+
+ // Leaf balancer is created by Pick First. Get FakeSubchannel created to update attributes
+ // A real subchannel would get these attributes from the connected address's EAG locality.
+ FakeSubchannel fakeSubchannel = helper.subchannels.poll();
+ fakeSubchannel.setConnectedEagIndex(0);
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+ assertThat(currentState).isEqualTo(ConnectivityState.READY);
+ PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
+ assertThat(result.getStatus().isOk()).isTrue();
+
+ ClientStreamTracer streamTracer1 = result.getStreamTracerFactory().newClientStreamTracer(
+ ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // first RPC call
+ streamTracer1.streamClosed(Status.OK);
+
+ ClusterStats clusterStats = Iterables.getOnlyElement(
+ loadStatsManager.getClusterStatsReports(CLUSTER));
+ UpstreamLocalityStats localityStats = Iterables.getOnlyElement(
+ clusterStats.upstreamLocalityStatsList());
+ assertThat(localityStats.locality()).isEqualTo(locality1);
+ assertThat(localityStats.totalIssuedRequests()).isEqualTo(1L);
+ assertThat(localityStats.totalSuccessfulRequests()).isEqualTo(1L);
+ assertThat(localityStats.totalErrorRequests()).isEqualTo(0L);
+
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.IDLE));
+ loadBalancer.requestConnection();
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
+
+ // Faksubchannel mimics update address and returns different locality
+ fakeSubchannel.setConnectedEagIndex(1);
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+ result = currentPicker.pickSubchannel(pickSubchannelArgs);
+ assertThat(result.getStatus().isOk()).isTrue();
+ ClientStreamTracer streamTracer2 = result.getStreamTracerFactory().newClientStreamTracer(
+ ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // second RPC call
+ streamTracer2.streamClosed(Status.UNAVAILABLE);
+
+ clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER));
+ UpstreamLocalityStats localityStats1 = Iterables.get(clusterStats.upstreamLocalityStatsList(),
+ 0);
+ assertThat(localityStats1.locality()).isEqualTo(locality1);
+ assertThat(localityStats1.totalIssuedRequests()).isEqualTo(0L);
+ assertThat(localityStats1.totalSuccessfulRequests()).isEqualTo(0L);
+ assertThat(localityStats1.totalErrorRequests()).isEqualTo(0L);
+ UpstreamLocalityStats localityStats2 = Iterables.get(clusterStats.upstreamLocalityStatsList(),
+ 1);
+ assertThat(localityStats2.locality()).isEqualTo(locality2);
+ assertThat(localityStats2.totalIssuedRequests()).isEqualTo(1L);
+ assertThat(localityStats2.totalSuccessfulRequests()).isEqualTo(0L);
+ assertThat(localityStats2.totalErrorRequests()).isEqualTo(1L);
+
+ loadBalancer.shutdown();
+ loadBalancer = null;
+ // No more references are held for localityStats1 hence dropped.
+ // Locality load is reported for one last time in case of loads occurred since the previous
+ // load report.
+ clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER));
+ localityStats2 = Iterables.getOnlyElement(clusterStats.upstreamLocalityStatsList());
+
+ assertThat(localityStats2.locality()).isEqualTo(locality2);
+ assertThat(localityStats2.totalIssuedRequests()).isEqualTo(0L);
+ assertThat(localityStats2.totalSuccessfulRequests()).isEqualTo(0L);
+ assertThat(localityStats2.totalErrorRequests()).isEqualTo(0L);
+ assertThat(localityStats2.totalRequestsInProgress()).isEqualTo(0L);
+
+ assertThat(loadStatsManager.getClusterStatsReports(CLUSTER)).isEmpty();
+ }
+
@Test
public void dropRpcsWithRespectToLbConfigDropCategories() {
LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider();
@@ -391,9 +488,11 @@ public void dropRpcsWithRespectToLbConfigDropCategories() {
assertThat(leafBalancer.name).isEqualTo("round_robin");
assertThat(Iterables.getOnlyElement(leafBalancer.addresses).getAddresses())
.isEqualTo(endpoint.getAddresses());
- Subchannel subchannel = leafBalancer.helper.createSubchannel(
- CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
- leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
+ leafBalancer.createSubChannel();
+ FakeSubchannel fakeSubchannel = helper.subchannels.poll();
+ fakeSubchannel.setConnectedEagIndex(0);
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+
assertThat(currentState).isEqualTo(ConnectivityState.READY);
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
assertThat(result.getStatus().isOk()).isFalse();
@@ -470,9 +569,11 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu
assertThat(leafBalancer.name).isEqualTo("round_robin");
assertThat(Iterables.getOnlyElement(leafBalancer.addresses).getAddresses())
.isEqualTo(endpoint.getAddresses());
- Subchannel subchannel = leafBalancer.helper.createSubchannel(
- CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
- leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
+ leafBalancer.createSubChannel();
+ FakeSubchannel fakeSubchannel = helper.subchannels.poll();
+ fakeSubchannel.setConnectedEagIndex(0);
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+ assertThat(currentState).isEqualTo(ConnectivityState.READY);
assertThat(currentState).isEqualTo(ConnectivityState.READY);
for (int i = 0; i < maxConcurrentRequests; i++) {
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
@@ -562,9 +663,11 @@ private void subtest_maxConcurrentRequests_appliedWithDefaultValue(
assertThat(leafBalancer.name).isEqualTo("round_robin");
assertThat(Iterables.getOnlyElement(leafBalancer.addresses).getAddresses())
.isEqualTo(endpoint.getAddresses());
- Subchannel subchannel = leafBalancer.helper.createSubchannel(
- CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build());
- leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY);
+ leafBalancer.createSubChannel();
+ FakeSubchannel fakeSubchannel = helper.subchannels.poll();
+ fakeSubchannel.setConnectedEagIndex(0);
+ fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+ assertThat(currentState).isEqualTo(ConnectivityState.READY);
assertThat(currentState).isEqualTo(ConnectivityState.READY);
for (int i = 0; i < ClusterImplLoadBalancer.DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; i++) {
PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs);
@@ -830,19 +933,24 @@ public void shutdown() {
downstreamBalancers.remove(this);
}
- void deliverSubchannelState(final Subchannel subchannel, ConnectivityState state) {
- SubchannelPicker picker = new SubchannelPicker() {
- @Override
- public PickResult pickSubchannel(PickSubchannelArgs args) {
- return PickResult.withSubchannel(subchannel);
+ Subchannel createSubChannel() {
+ Subchannel subchannel = helper.createSubchannel(
+ CreateSubchannelArgs.newBuilder().setAddresses(addresses).build());
+ subchannel.start(infoObject -> {
+ if (infoObject.getState() == ConnectivityState.READY) {
+ helper.updateBalancingState(
+ ConnectivityState.READY,
+ new FixedResultPicker(PickResult.withSubchannel(subchannel)));
}
- };
- helper.updateBalancingState(state, picker);
+ });
+ return subchannel;
}
}
private final class FakeLbHelper extends LoadBalancer.Helper {
+ private final Queue subchannels = new LinkedList<>();
+
@Override
public SynchronizationContext getSynchronizationContext() {
return syncContext;
@@ -857,7 +965,9 @@ public void updateBalancingState(
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
- return new FakeSubchannel(args.getAddresses(), args.getAttributes());
+ FakeSubchannel subchannel = new FakeSubchannel(args.getAddresses(), args.getAttributes());
+ subchannels.add(subchannel);
+ return subchannel;
}
@Override
@@ -869,17 +979,27 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String author
public String getAuthority() {
return AUTHORITY;
}
+
+ @Override
+ public void refreshNameResolution() {}
}
private static final class FakeSubchannel extends Subchannel {
private final List eags;
private final Attributes attrs;
+ private SubchannelStateListener listener;
+ private Attributes connectedAttributes;
private FakeSubchannel(List eags, Attributes attrs) {
this.eags = eags;
this.attrs = attrs;
}
+ @Override
+ public void start(SubchannelStateListener listener) {
+ this.listener = checkNotNull(listener, "listener");
+ }
+
@Override
public void shutdown() {
}
@@ -901,6 +1021,19 @@ public Attributes getAttributes() {
@Override
public void updateAddresses(List addrs) {
}
+
+ @Override
+ public Attributes getConnectedAddressAttributes() {
+ return connectedAttributes;
+ }
+
+ public void updateState(ConnectivityStateInfo newState) {
+ listener.onSubchannelState(newState);
+ }
+
+ public void setConnectedEagIndex(int eagIndex) {
+ this.connectedAttributes = eags.get(eagIndex).getAttributes();
+ }
}
private final class FakeXdsClient extends XdsClient {