-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xds: Fix load reporting when pick first is used for locality-routing. #11495
Changes from 2 commits
169537c
abaf93d
3a08e3f
885bf99
0a2e1fb
60fb499
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright 2024 The gRPC Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.grpc; | ||
|
||
/** | ||
* An internal class. Do not use. | ||
* | ||
* <p>An interface to provide the attributes of the address a subchannel is connected to. | ||
*/ | ||
@Internal | ||
public interface InternalSubchannelAddress { | ||
|
||
/** | ||
* Returns the attributes of the server address that the subchannel is connected to. | ||
*/ | ||
public Attributes getConnectedAddressAttributes(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import io.grpc.ClientStreamTracer; | ||
import io.grpc.ClientStreamTracer.StreamInfo; | ||
import io.grpc.ConnectivityState; | ||
import io.grpc.ConnectivityStateInfo; | ||
import io.grpc.EquivalentAddressGroup; | ||
import io.grpc.InternalLogId; | ||
import io.grpc.LoadBalancer; | ||
|
@@ -59,6 +60,7 @@ | |
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import javax.annotation.Nullable; | ||
|
||
/** | ||
|
@@ -77,10 +79,9 @@ | |
Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING")) | ||
|| Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING")); | ||
|
||
private static final Attributes.Key<ClusterLocalityStats> ATTR_CLUSTER_LOCALITY_STATS = | ||
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats"); | ||
private static final Attributes.Key<String> ATTR_CLUSTER_LOCALITY_NAME = | ||
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityName"); | ||
private static final Attributes.Key<AtomicReference<ClusterLocality>> | ||
ATTR_CLUSTER_LOCALITY = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can put ATTR_CLUSTER_LOCALITY on the same line as the type. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality"); | ||
|
||
private final XdsLogger logger; | ||
private final Helper helper; | ||
|
@@ -213,35 +214,43 @@ | |
@Override | ||
public Subchannel createSubchannel(CreateSubchannelArgs args) { | ||
List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses()); | ||
Locality locality = args.getAddresses().get(0).getAttributes().get( | ||
InternalXdsAttributes.ATTR_LOCALITY); // all addresses should be in the same locality | ||
String localityName = args.getAddresses().get(0).getAttributes().get( | ||
InternalXdsAttributes.ATTR_LOCALITY_NAME); | ||
// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain | ||
// attributes with its locality, including endpoints in LOGICAL_DNS clusters. | ||
// In case of not (which really shouldn't), loads are aggregated under an empty locality. | ||
if (locality == null) { | ||
locality = Locality.create("", "", ""); | ||
localityName = ""; | ||
} | ||
final ClusterLocalityStats localityStats = | ||
(lrsServerInfo == null) | ||
? null | ||
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, | ||
edsServiceName, locality); | ||
|
||
// This clusterLocality ideally should not be utilized. We derive locality | ||
// information from the first address, even prior to the subchannel becoming ready. | ||
// No load is reported before READY, but an LB policy such as pick_first may return | ||
// the subchannel from a pick early, so a locality must exist even in the pre-READY | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's no load reporting pre-READY. But an LB API could return the subchannel even before it is ready, and PF does this. We generally won't end up reporting load for such picks because the channel will ignore the selected (not-ready) subchannel, but we needed to handle the case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see, updated the comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you miss pushing the update? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed offline, we both are able to see the updated comment. |
||
// state of the subchannel. | ||
ClusterLocality clusterLocality = createClusterLocalityFromAttributes( | ||
args.getAddresses().get(0).getAttributes()); | ||
AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>( | ||
clusterLocality); | ||
Attributes attrs = args.getAttributes().toBuilder() | ||
.set(ATTR_CLUSTER_LOCALITY_STATS, localityStats) | ||
.set(ATTR_CLUSTER_LOCALITY_NAME, localityName) | ||
.set(ATTR_CLUSTER_LOCALITY, localityAtomicReference) | ||
.build(); | ||
args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build(); | ||
final Subchannel subchannel = delegate().createSubchannel(args); | ||
|
||
return new ForwardingSubchannel() { | ||
@Override | ||
public void start(SubchannelStateListener listener) { | ||
delegate().start(new SubchannelStateListener() { | ||
@Override | ||
public void onSubchannelState(ConnectivityStateInfo newState) { | ||
if (newState.getState().equals(ConnectivityState.READY)) { | ||
// Get locality based on the connected address attributes | ||
ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes( | ||
subchannel.getConnectedAddressAttributes()); | ||
ClusterLocality oldClusterLocality = localityAtomicReference | ||
.getAndSet(updatedClusterLocality); | ||
oldClusterLocality.release(); | ||
} | ||
listener.onSubchannelState(newState); | ||
} | ||
}); | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
if (localityStats != null) { | ||
localityStats.release(); | ||
if (localityAtomicReference.get() != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can't possibly be null AFAICT There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right. Removed the |
||
localityAtomicReference.get().release(); | ||
} | ||
delegate().shutdown(); | ||
} | ||
|
@@ -274,6 +283,28 @@ | |
return newAddresses; | ||
} | ||
|
||
// Builds a ClusterLocality from the attributes of an endpoint address. | ||
// Reads the locality and the locality name from addressAttributes, then pairs the name | ||
// with locality stats obtained from xdsClient. The stats are null when lrsServerInfo | ||
// is null. | ||
private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) { | ||
Locality locality = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY); | ||
String localityName = addressAttributes.get(InternalXdsAttributes.ATTR_LOCALITY_NAME); | ||
|
||
// Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain | ||
// attributes with their locality, including endpoints in LOGICAL_DNS clusters. | ||
// If the locality is missing (which really shouldn't happen), loads are aggregated | ||
// under an empty locality with an empty name. | ||
// NOTE(review): localityName is not null-checked when locality is non-null. | ||
if (locality == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It shouldn't be, but is possible that locality is set but locality name is null. You should probably have an else clause that does a null check on localityName. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is existing code, and it seems in general code assumes the values to be non-null. I'd much rather we handle that centrally in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you suggesting to move the locality name null check to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see now Larry's comment wasn't about the Locality struct, but instead the attribute. That had been discussed when the code was originally introduced: This PR is likely to be reverted (in some way) later, once the old PF policy goes away. So changes to the existing code are likely to be lost. |
||
locality = Locality.create("", "", ""); | ||
localityName = ""; | ||
} | ||
|
||
// Stats are only registered with the xDS client when an LRS server is configured. | ||
final ClusterLocalityStats localityStats = | ||
(lrsServerInfo == null) | ||
? null | ||
: xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, | ||
edsServiceName, locality); | ||
|
||
return new ClusterLocality(localityStats, localityName); | ||
} | ||
|
||
@Override | ||
protected Helper delegate() { | ||
return helper; | ||
|
@@ -362,10 +393,12 @@ | |
} | ||
} | ||
final ClusterLocalityStats stats = | ||
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS); | ||
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get() | ||
.getClusterLocalityStats(); | ||
if (stats != null) { | ||
ejona86 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
String localityName = | ||
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_NAME); | ||
result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get() | ||
.getClusterLocalityName(); | ||
args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName); | ||
|
||
ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory( | ||
|
@@ -447,4 +480,33 @@ | |
stats.recordBackendLoadMetricStats(report.getNamedMetrics()); | ||
} | ||
} | ||
|
||
/** | ||
* Holds the {@link ClusterLocalityStats} and the name of a cluster locality. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is actually the stats and the name. Name could be considered an attribute, the the stats aren't. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. PTAL |
||
*/ | ||
static final class ClusterLocality { | ||
// Stats object for the locality; may be null (release() checks for it). | ||
private final ClusterLocalityStats clusterLocalityStats; | ||
// Name of the locality, as supplied to the constructor. | ||
private final String clusterLocalityName; | ||
|
||
// Stores both values as given; no validation or copying is performed. | ||
@VisibleForTesting | ||
ClusterLocality(ClusterLocalityStats localityStats, String localityName) { | ||
this.clusterLocalityStats = localityStats; | ||
this.clusterLocalityName = localityName; | ||
} | ||
|
||
// Returns the stats object passed at construction, which may be null. | ||
ClusterLocalityStats getClusterLocalityStats() { | ||
return clusterLocalityStats; | ||
} | ||
|
||
// Returns the locality name passed at construction. | ||
String getClusterLocalityName() { | ||
return clusterLocalityName; | ||
} | ||
|
||
// Releases the underlying stats object if there is one; a no-op when stats are null. | ||
@VisibleForTesting | ||
void release() { | ||
if (clusterLocalityStats != null) { | ||
clusterLocalityStats.release(); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,7 +91,7 @@ private synchronized void releaseClusterDropCounter( | |
String cluster, @Nullable String edsServiceName) { | ||
checkState(allDropStats.containsKey(cluster) | ||
&& allDropStats.get(cluster).containsKey(edsServiceName), | ||
"stats for cluster %s, edsServiceName %s not exits", cluster, edsServiceName); | ||
"stats for cluster %s, edsServiceName %s not exist", cluster, edsServiceName); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/not exist/do not exist/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
ReferenceCounted<ClusterDropStats> ref = allDropStats.get(cluster).get(edsServiceName); | ||
ref.release(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This says it provides the address, but the method provides address attributes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the interface name to InternalSubchannelAddressAttributes.