Skip to content

Commit

Permalink
fix tests instead of updating disconnectFromNodesExcept
Browse files Browse the repository at this point in the history
Signed-off-by: Rahul Karajgikar <[email protected]>
  • Loading branch information
Rahul Karajgikar committed Sep 6, 2024
1 parent 9e322c4 commit 85a9e37
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.opensearch.common.settings.Setting.Property;
import static org.opensearch.common.settings.Setting.positiveTimeSetting;
Expand Down Expand Up @@ -104,7 +103,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {

// contains an entry for every node in the latest cluster state, as well as for nodes from which we are in the process of
// disconnecting
private final Map<DiscoveryNode, ConnectionTarget> targetsByNode = new HashMap<>();
protected final Map<DiscoveryNode, ConnectionTarget> targetsByNode = new HashMap<>();

private final TimeValue reconnectInterval;
private volatile ConnectionChecker connectionChecker;
Expand All @@ -116,6 +115,11 @@ public NodeConnectionsService(Settings settings, ThreadPool threadPool, Transpor
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
}

/**
 * Creates a new {@link ConnectionTarget} for the given node.
 * <p>
 * Exposed (as {@code protected}) for testing.
 *
 * @param discoveryNode the node the new target is created for
 * @return a newly constructed {@code ConnectionTarget} for {@code discoveryNode}
 */
protected ConnectionTarget createConnectionTarget(DiscoveryNode discoveryNode) {
return new ConnectionTarget(discoveryNode);
}

/**
* Connect to all the given nodes, but do not disconnect from any extra nodes. Calls the completion handler on completion of all
* connection attempts to _new_ nodes, but not on attempts to re-establish connections to nodes that are already known.
Expand Down Expand Up @@ -175,18 +179,6 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
runnables.add(targetsByNode.get(discoveryNode).disconnect());
}

// There might be some stale nodes that are in pendingDisconnect set from before but are not connected anymore
// So these nodes would not be there in targetsByNode and would not have disconnect() called for them
// This code block clears the pending disconnect for these nodes that don't have entries in targetsByNode
// to avoid permanently blocking node joins
// This situation should ideally not happen, this is just for extra safety
transportService.removePendingDisconnections(
transportService.getPendingDisconnections()
.stream()
.filter(discoveryNode -> !discoveryNodes.nodeExists(discoveryNode) && !targetsByNode.containsKey(discoveryNode))
.collect(Collectors.toSet())
);
}
runnables.forEach(Runnable::run);
}
Expand Down Expand Up @@ -334,7 +326,7 @@ private enum ActivityType {
*
* @opensearch.internal
*/
private class ConnectionTarget {
protected class ConnectionTarget {
private final DiscoveryNode discoveryNode;

private PlainListenableActionFuture<Void> future = PlainListenableActionFuture.newListenableFuture();
Expand Down Expand Up @@ -524,8 +516,6 @@ private void onCompletion(ActivityType completedActivityType, @Nullable Exceptio

if (completedActivityType.equals(ActivityType.DISCONNECTING)) {
final ConnectionTarget removedTarget = targetsByNode.remove(discoveryNode);
// if we remove from targetsByNode, we also remove from underlying pendingDisconnects for consistency
// transportService.markDisconnectAsCompleted(new HashSet<>(Collections.singleton(discoveryNode)));
assert removedTarget == this : removedTarget + " vs " + this;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,23 +247,12 @@ public void disconnectFromNode(DiscoveryNode node) {
logger.debug("Removed node [{}] from pending disconnections list", node);
}

/**
 * Returns the set of nodes currently marked as pending disconnection.
 * <p>
 * The {@code pendingDisconnections} field itself is returned, not a copy.
 *
 * @return the {@code pendingDisconnections} set
 */
@Override
public Set<DiscoveryNode> getPendingDisconnections() {
// NOTE(review): callers receive the backing set directly; confirm none of them mutate it
return pendingDisconnections;
}

/**
 * Marks the given node as pending disconnection by adding it to {@code pendingDisconnections}.
 * A debug message is logged before the node is added.
 *
 * @param node the node whose disconnection is pending
 */
@Override
public void setPendingDisconnection(DiscoveryNode node) {
logger.debug("marking disconnection as pending for node: [{}]", node);
pendingDisconnections.add(node);
}

/**
 * Clears the pending-disconnection mark for the given node by removing it from
 * {@code pendingDisconnections}. A debug message is logged before the node is removed.
 *
 * @param node the node whose disconnection is treated as completed
 */
@Override
public void removePendingDisconnection(DiscoveryNode node) {
logger.debug("marking disconnection as completed for node: [{}]", node);
pendingDisconnections.remove(node);
}

/**
* Returns the number of nodes this manager is connected to.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,8 @@ void connectToNode(

void disconnectFromNode(DiscoveryNode node);

/**
 * Returns the nodes that are currently marked as pending disconnection.
 *
 * @return the set of nodes with a pending disconnection
 */
Set<DiscoveryNode> getPendingDisconnections();

/**
 * Marks the given node as pending disconnection.
 *
 * @param node the node to mark
 */
void setPendingDisconnection(DiscoveryNode node);

/**
 * Removes the pending-disconnection mark from the given node.
 *
 * @param node the node to unmark
 */
void removePendingDisconnection(DiscoveryNode node);

Set<DiscoveryNode> getAllConnectedNodes();

int size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,11 @@ public void disconnectFromNode(DiscoveryNode node) {
delegate.disconnectFromNode(node);
}

/**
 * Returns the pending disconnections reported by the wrapped {@code delegate}.
 *
 * @return whatever {@code delegate.getPendingDisconnections()} returns
 */
@Override
public Set<DiscoveryNode> getPendingDisconnections() {
return delegate.getPendingDisconnections();
}

/**
 * Forwards the pending-disconnection mark for the given node to the wrapped {@code delegate}.
 *
 * @param node the node to mark as pending disconnection
 */
@Override
public void setPendingDisconnection(DiscoveryNode node) {
delegate.setPendingDisconnection(node);
}

/**
 * Forwards removal of the pending-disconnection mark for the given node to the wrapped
 * {@code delegate}.
 *
 * @param node the node to unmark
 */
@Override
public void removePendingDisconnection(DiscoveryNode node) {
delegate.removePendingDisconnection(node);
}

@Override
public ConnectionProfile getConnectionProfile() {
return delegate.getConnectionProfile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,18 +773,10 @@ public void disconnectFromNode(DiscoveryNode node) {
connectionManager.disconnectFromNode(node);
}

/**
 * Returns the pending disconnections as reported by the {@code connectionManager}.
 *
 * @return whatever {@code connectionManager.getPendingDisconnections()} returns
 */
public Set<DiscoveryNode> getPendingDisconnections() {
return connectionManager.getPendingDisconnections();
}

/**
 * Marks every node in the given set as pending disconnection on the {@code connectionManager}.
 *
 * @param nodes the nodes to mark, one {@code setPendingDisconnection} call per node
 */
public void setPendingDisconnections(Set<DiscoveryNode> nodes) {
    for (final DiscoveryNode pendingNode : nodes) {
        connectionManager.setPendingDisconnection(pendingNode);
    }
}

/**
 * Removes the pending-disconnection mark from every node in the given set on the
 * {@code connectionManager}.
 *
 * @param nodes the nodes to unmark, one {@code removePendingDisconnection} call per node
 */
public void removePendingDisconnections(Set<DiscoveryNode> nodes) {
    for (final DiscoveryNode completedNode : nodes) {
        connectionManager.removePendingDisconnection(completedNode);
    }
}

public void addMessageListener(TransportMessageListener listener) {
messageListener.listeners.add(listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1923,11 +1923,6 @@ private final class TestClusterNode {
protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() {
return new MockSinglePrioritizingExecutor(node.getName(), deterministicTaskQueue, threadPool);
}

/**
 * No-op override: the body is intentionally empty, so nothing is done for the new cluster
 * state and the caller is never blocked.
 *
 * @param newClusterState the new cluster state; unused
 */
@Override
protected void connectToNodesAndWait(ClusterState newClusterState) {
// don't do anything, and don't block
}
}
);
recoverySettings = new RecoverySettings(settings, clusterSettings);
Expand Down Expand Up @@ -2094,7 +2089,7 @@ public void onFailure(final Exception e) {
rerouteService,
threadPool
);
nodeConnectionsService = new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService);
nodeConnectionsService = createTestNodeConnectionsService(clusterService.getSettings(), threadPool, transportService);
final MetadataMappingService metadataMappingService = new MetadataMappingService(clusterService, indicesService);
indicesClusterStateService = new IndicesClusterStateService(
settings,
Expand Down Expand Up @@ -2490,6 +2485,24 @@ protected void assertSnapshotOrGenericThread() {
}
}

/**
 * Builds a {@link NodeConnectionsService} for tests whose {@code connectToNodes} only registers
 * a connection target per node and then runs the completion callback immediately.
 *
 * @param settings         settings passed to the service constructor
 * @param threadPool       thread pool passed to the service constructor
 * @param transportService transport service passed to the service constructor
 * @return the test-specific {@code NodeConnectionsService}
 */
public NodeConnectionsService createTestNodeConnectionsService(
    Settings settings,
    ThreadPool threadPool,
    TransportService transportService
) {
    final NodeConnectionsService testService = new NodeConnectionsService(settings, threadPool, transportService) {
        @Override
        public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
            // Only targetsByNode is populated here. A later disconnect needs these entries to run for
            // the nodes, and disconnect is what keeps pendingDisconnects in step so node-joins can happen.
            discoveryNodes.forEach(clusterNode -> this.targetsByNode.put(clusterNode, createConnectionTarget(clusterNode)));
            onCompletion.run();
        }
    };
    return testService;
}

/**
 * Returns the {@code clusterInfoService} held by this object.
 *
 * @return the {@code clusterInfoService} field
 */
public ClusterInfoService getMockClusterInfoService() {
return clusterInfoService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -1150,9 +1151,12 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE)
);
clusterService = new ClusterService(settings, clusterSettings, clusterManagerService, clusterApplierService);
clusterService.setNodeConnectionsService(
new NodeConnectionsService(clusterService.getSettings(), threadPool, transportService)
NodeConnectionsService nodeConnectionsService = createTestNodeConnectionsService(
clusterService.getSettings(),
threadPool,
transportService
);
clusterService.setNodeConnectionsService(nodeConnectionsService);
repositoriesService = new RepositoriesService(
settings,
clusterService,
Expand Down Expand Up @@ -1588,6 +1592,24 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
}
}

/**
 * Builds a {@link NodeConnectionsService} for tests whose {@code connectToNodes} only registers
 * a connection target per node and then runs the completion callback immediately.
 *
 * @param settings         settings passed to the service constructor
 * @param threadPool       thread pool passed to the service constructor
 * @param transportService transport service passed to the service constructor
 * @return the test-specific {@code NodeConnectionsService}
 */
public static NodeConnectionsService createTestNodeConnectionsService(
    Settings settings,
    ThreadPool threadPool,
    TransportService transportService
) {
    final NodeConnectionsService testService = new NodeConnectionsService(settings, threadPool, transportService) {
        @Override
        public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) {
            // Only targetsByNode is populated here. A later disconnect needs these entries to run for
            // the nodes, and disconnect is what keeps pendingDisconnects in step so node-joins can happen.
            discoveryNodes.forEach(clusterNode -> this.targetsByNode.put(clusterNode, createConnectionTarget(clusterNode)));
            onCompletion.run();
        }
    };
    return testService;
}

static class DisruptableClusterApplierService extends ClusterApplierService {
private final String nodeName;
private final DeterministicTaskQueue deterministicTaskQueue;
Expand Down Expand Up @@ -1641,11 +1663,6 @@ public void onNewClusterState(String source, Supplier<ClusterState> clusterState
}
}

/**
 * No-op override: the body is intentionally empty, so nothing is done for the new cluster
 * state and the caller is never blocked.
 *
 * @param newClusterState the new cluster state; unused
 */
@Override
protected void connectToNodesAndWait(ClusterState newClusterState) {
// don't do anything, and don't block
}

@Override
protected boolean applicationMayFail() {
return this.applicationMayFail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,21 +123,11 @@ public void disconnectFromNode(DiscoveryNode node) {
delegate.disconnectFromNode(node);
}

/**
 * Returns the pending disconnections reported by the wrapped {@code delegate}.
 *
 * @return whatever {@code delegate.getPendingDisconnections()} returns
 */
@Override
public Set<DiscoveryNode> getPendingDisconnections() {
return delegate.getPendingDisconnections();
}

/**
 * Forwards the pending-disconnection mark for the given node to the wrapped {@code delegate}.
 *
 * @param node the node to mark as pending disconnection
 */
@Override
public void setPendingDisconnection(DiscoveryNode node) {
delegate.setPendingDisconnection(node);
}

/**
 * Forwards removal of the pending-disconnection mark for the given node to the wrapped
 * {@code delegate}.
 *
 * @param node the node to unmark
 */
@Override
public void removePendingDisconnection(DiscoveryNode node) {
delegate.removePendingDisconnection(node);
}

@Override
public int size() {
return delegate.size();
Expand Down

0 comments on commit 85a9e37

Please sign in to comment.