Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return transport addresses from UnicastHostsProvider #31426

Merged
merged 2 commits into from
Jun 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
import com.microsoft.windowsazure.management.compute.models.InstanceEndpoint;
import com.microsoft.windowsazure.management.compute.models.RoleInstance;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.azure.classic.AzureServiceDisableException;
import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Discovery;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.network.InetAddresses;
Expand All @@ -47,9 +45,6 @@
import java.util.ArrayList;
import java.util.List;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;

public class AzureUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {

public enum HostType {
Expand Down Expand Up @@ -104,7 +99,7 @@ public static Deployment fromString(String string) {

private final TimeValue refreshInterval;
private long lastRefresh;
private List<DiscoveryNode> cachedDiscoNodes;
private List<TransportAddress> dynamicHosts;
private final HostType hostType;
private final String publicEndpointName;
private final String deploymentName;
Expand Down Expand Up @@ -137,30 +132,30 @@ public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureCom
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
*/
@Override
public List<DiscoveryNode> buildDynamicNodes() {
public List<TransportAddress> buildDynamicHosts() {
if (refreshInterval.millis() != 0) {
if (cachedDiscoNodes != null &&
if (dynamicHosts != null &&
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
logger.trace("using cache to retrieve node list");
return cachedDiscoNodes;
return dynamicHosts;
}
lastRefresh = System.currentTimeMillis();
}
logger.debug("start building nodes list using Azure API");

cachedDiscoNodes = new ArrayList<>();
dynamicHosts = new ArrayList<>();

HostedServiceGetDetailedResponse detailed;
try {
detailed = azureComputeService.getServiceDetails();
} catch (AzureServiceDisableException e) {
logger.debug("Azure discovery service has been disabled. Returning empty list of nodes.");
return cachedDiscoNodes;
return dynamicHosts;
} catch (AzureServiceRemoteException e) {
// We got a remote exception
logger.warn("can not get list of azure nodes: [{}]. Returning empty list of nodes.", e.getMessage());
logger.trace("AzureServiceRemoteException caught", e);
return cachedDiscoNodes;
return dynamicHosts;
}

InetAddress ipAddress = null;
Expand Down Expand Up @@ -212,18 +207,17 @@ public List<DiscoveryNode> buildDynamicNodes() {
TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1);
for (TransportAddress address : addresses) {
logger.trace("adding {}, transport_address {}", networkAddress, address);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address, emptyMap(),
emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
dynamicHosts.add(address);
}
} catch (Exception e) {
logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage());
}
}
}

logger.debug("{} node(s) added", cachedDiscoNodes.size());
logger.debug("{} addresses added", dynamicHosts.size());

return cachedDiscoNodes;
return dynamicHosts;
}

protected String resolveInstanceAddress(final HostType hostType, final RoleInstance instance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import com.amazonaws.services.ec2.model.Tag;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
Expand All @@ -46,8 +44,6 @@
import java.util.Set;

import static java.util.Collections.disjoint;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.TAG_PREFIX;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_DNS;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_IP;
Expand All @@ -70,15 +66,15 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos

private final String hostType;

private final DiscoNodesCache discoNodes;
private final TransportAddressesCache dynamicHosts;

AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service) {
super(settings);
this.transportService = transportService;
this.awsEc2Service = awsEc2Service;

this.hostType = AwsEc2Service.HOST_TYPE_SETTING.get(settings);
this.discoNodes = new DiscoNodesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));
this.dynamicHosts = new TransportAddressesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));

this.bindAnyGroup = AwsEc2Service.ANY_GROUP_SETTING.get(settings);
this.groups = new HashSet<>();
Expand All @@ -96,13 +92,13 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
}

@Override
public List<DiscoveryNode> buildDynamicNodes() {
return discoNodes.getOrRefresh();
public List<TransportAddress> buildDynamicHosts() {
return dynamicHosts.getOrRefresh();
}

protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {

final List<DiscoveryNode> discoNodes = new ArrayList<>();
final List<TransportAddress> dynamicHosts = new ArrayList<>();

final DescribeInstancesResult descInstances;
try (AmazonEc2Reference clientReference = awsEc2Service.client()) {
Expand All @@ -115,7 +111,7 @@ protected List<DiscoveryNode> fetchDynamicNodes() {
} catch (final AmazonClientException e) {
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
logger.debug("Full exception:", e);
return discoNodes;
return dynamicHosts;
}

logger.trace("building dynamic unicast discovery nodes...");
Expand Down Expand Up @@ -179,8 +175,7 @@ && disjoint(securityGroupIds, groups)) {
final TransportAddress[] addresses = transportService.addressesFromString(address, 1);
for (int i = 0; i < addresses.length; i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode(instance.getInstanceId(), "#cloud-" + instance.getInstanceId() + "-" + i,
addresses[i], emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
dynamicHosts.add(addresses[i]);
}
} catch (final Exception e) {
final String finalAddress = address;
Expand All @@ -194,9 +189,9 @@ && disjoint(securityGroupIds, groups)) {
}
}

logger.debug("using dynamic discovery nodes {}", discoNodes);
logger.debug("using dynamic transport addresses {}", dynamicHosts);

return discoNodes;
return dynamicHosts;
}

private DescribeInstancesRequest buildDescribeInstancesRequest() {
Expand All @@ -222,11 +217,11 @@ private DescribeInstancesRequest buildDescribeInstancesRequest() {
return describeInstancesRequest;
}

private final class DiscoNodesCache extends SingleObjectCache<List<DiscoveryNode>> {
private final class TransportAddressesCache extends SingleObjectCache<List<TransportAddress>> {

private boolean empty = true;

protected DiscoNodesCache(TimeValue refreshInterval) {
protected TransportAddressesCache(TimeValue refreshInterval) {
super(refreshInterval, new ArrayList<>());
}

Expand All @@ -236,8 +231,8 @@ protected boolean needsRefresh() {
}

@Override
protected List<DiscoveryNode> refresh() {
final List<DiscoveryNode> nodes = fetchDynamicNodes();
protected List<TransportAddress> refresh() {
final List<TransportAddress> nodes = fetchDynamicNodes();
empty = nodes.isEmpty();
return nodes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.amazonaws.services.ec2.model.Tag;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -87,16 +86,16 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
null);
}

protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes) {
return buildDynamicNodes(nodeSettings, nodes, null);
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
return buildDynamicHosts(nodeSettings, nodes, null);
}

protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
logger.debug("--> nodes found: {}", discoveryNodes);
return discoveryNodes;
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
logger.debug("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
} catch (IOException e) {
fail("Unexpected IOException");
return null;
Expand All @@ -107,7 +106,7 @@ public void testDefaultSettings() throws InterruptedException {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
List<TransportAddress> discoveryNodes = buildDynamicHosts(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
}

Expand All @@ -119,12 +118,11 @@ public void testPrivateIp() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> transportAddresses = buildDynamicHosts(nodeSettings, nodes);
assertThat(transportAddresses, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
for (TransportAddress address : transportAddresses) {
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PRIVATE_IP + node++);
assertEquals(address, expected);
}
Expand All @@ -138,12 +136,11 @@ public void testPublicIp() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
for (TransportAddress address : dynamicHosts) {
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PUBLIC_IP + node++);
assertEquals(address, expected);
}
Expand All @@ -159,13 +156,12 @@ public void testPrivateDns() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
for (TransportAddress address : dynamicHosts) {
String instanceId = "node" + node++;
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(
AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId + AmazonEC2Mock.SUFFIX_PRIVATE_DNS);
assertEquals(address, expected);
Expand All @@ -182,13 +178,12 @@ public void testPublicDns() throws InterruptedException {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
for (TransportAddress address : dynamicHosts) {
String instanceId = "node" + node++;
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(
AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId + AmazonEC2Mock.SUFFIX_PUBLIC_DNS);
assertEquals(address, expected);
Expand All @@ -201,7 +196,7 @@ public void testInvalidHostType() throws InterruptedException {
.build();

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
buildDynamicNodes(nodeSettings, 1);
buildDynamicHosts(nodeSettings, 1);
});
assertThat(exception.getMessage(), containsString("does_not_exist is unknown for discovery.ec2.host_type"));
}
Expand All @@ -227,8 +222,8 @@ public void testFilterByTags() throws InterruptedException {
}

logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(prodInstances));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(prodInstances));
}

public void testFilterByMultipleTags() throws InterruptedException {
Expand Down Expand Up @@ -258,8 +253,8 @@ public void testFilterByMultipleTags() throws InterruptedException {
}

logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(prodInstances));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(prodInstances));
}

public void testReadHostFromTag() throws InterruptedException, UnknownHostException {
Expand All @@ -285,11 +280,11 @@ public void testReadHostFromTag() throws InterruptedException, UnknownHostExcept
}

logger.info("started [{}] instances", nodes);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(nodes));
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(discoveryNode.getName());
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(nodes));
int node = 1;
for (TransportAddress address : dynamicHosts) {
TransportAddress expected = poorMansDNS.get("node" + node++);
assertEquals(address, expected);
}
}
Expand All @@ -306,13 +301,13 @@ public void testGetNodeListEmptyCache() throws Exception {
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service) {
@Override
protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {
fetchCount++;
return new ArrayList<>();
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(3));
}
Expand All @@ -323,18 +318,18 @@ public void testGetNodeListCached() throws Exception {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY)) {
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, plugin.ec2Service) {
@Override
protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {
fetchCount++;
return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1);
return Ec2DiscoveryTests.this.buildDynamicHosts(Settings.EMPTY, 1);
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(1));
Thread.sleep(1_000L); // wait for cache to expire
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(2));
}
Expand Down
Loading