Skip to content

Commit

Permalink
Merge pull request #311 from mwl/bug/311-nse-on-search-proxy
Browse files Browse the repository at this point in the history
Scheduler Search API endpoint returns "java.util.NoSuchElementException"
  • Loading branch information
James H. Fisher committed Oct 9, 2015
2 parents d64f3ac + b7d0a76 commit 8394231
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,7 @@ public static Task from(Protos.TaskInfo taskInfo, Protos.TaskStatus taskStatus)
String hostName = data.getProperty("hostname", "UNKNOWN");
String ipAddress = data.getProperty("ipAddress", hostName);
ZonedDateTime startedAt = ZonedDateTime.parse(data.getProperty("startedAt", ZonedDateTime.now().toString()));
Protos.TaskState taskState = null;
if (taskStatus == null) {
taskState = Protos.TaskState.TASK_STAGING;
} else {
taskState = taskStatus.getState();
}
final Protos.TaskState taskState = taskStatus == null ? Protos.TaskState.TASK_STAGING : taskStatus.getState();
return new Task(
hostName,
taskInfo.getTaskId().getValue(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.mesos.elasticsearch.scheduler.controllers;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
Expand All @@ -10,7 +11,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.*;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
Expand Down Expand Up @@ -50,15 +50,14 @@ public ResponseEntity<InputStreamResource> stats() throws IOException {
}

@RequestMapping("/_search")
public ResponseEntity<InputStreamResource> search(@RequestParam("q") String query, @RequestHeader(value = "X-ElasticSearch-Host", required = false) String elasticSearchHost) throws IOException {
HttpHost httpHost = null;
public ResponseEntity<InputStreamResource> search(@RequestParam("q") String query, @RequestParam(value = "node", required = false) String elasticSearchTaskId) throws IOException {
HttpHost httpHost;
Collection<Task> tasks = scheduler.getTasks().values();
Stream<HttpHost> httpHostStream = tasks.stream().map(task -> toHttpHost(task.getClientAddress()));

if (elasticSearchHost != null) {
httpHost = httpHostStream.filter(host -> host.toHostString().equalsIgnoreCase(elasticSearchHost)).findAny().get();
if (StringUtils.isNotBlank(elasticSearchTaskId)) {
httpHost = tasks.stream().filter(task -> task.getTaskId().equals(elasticSearchTaskId)).map(task -> toHttpHost(task.getClientAddress())).findAny().get();
} else {
httpHost = httpHostStream.skip(RandomUtils.nextInt(tasks.size())).findAny().get();
httpHost = tasks.stream().skip(RandomUtils.nextInt(tasks.size())).map(task -> toHttpHost(task.getClientAddress())).findAny().get();
}

HttpResponse esSearchResponse = httpClient.execute(httpHost, new HttpGet("/_search?q=" + URLEncoder.encode(query, "UTF-8")));
Expand Down
34 changes: 16 additions & 18 deletions scheduler/src/main/resources/public/app/controllers.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,26 +255,24 @@ controllers.controller('QueryBrowserController', function ($scope, $http, $locat
results: null,
};

$scope.$parent.$watch('nodes', function(value) {
if (value.length && $scope.query.node == '') {
$scope.query.node = value[0];
}
});

$scope.querySubmit = function() {
if ($scope.query.node && $scope.query.string) {
$http.defaults.headers.common['X-ElasticSearch-Host'] = $scope.query.node;
var success = function(data) {
$scope.query.results = data.hits;
}
var error = function(data) {
if (data.hasOwnProperty('error')) {
$scope.query.error = data.error;
} else {
$scope.query.error = "Unknown error"
if ($scope.query.string) {
Search.get(
{
q: $scope.query.string,
node: $scope.query.node
},
function(data) {
$scope.query.results = data.hits;
},
function(data) {
if (data.hasOwnProperty('error')) {
$scope.query.error = data.error;
} else {
$scope.query.error = "Unknown error"
}
}
}
Search.get({q: $scope.query.string}, success, error);
);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ <h3>Elasticsearch Query Browser</h3>
<input type="text" class="form-control query-string-input" ng-model="query.string" placeholder="Enter your query...">
</div>
<div class="form-group">
<select class="form-control" ng-model="query.node" ng-options="node for node in nodes"></select>
<select class="form-control" ng-model="query.node" ng-options="task.id as task.id for task in tasks">
<option value="">Any</option>
</select>
</div>
<button type="submit" class="btn btn-primary">Search</button>
<button type="button" class="btn btn-default" ng-click="resetQuery()">Reset</button>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.CreateContainerCmd;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import org.apache.commons.lang.StringUtils;
import org.apache.mesos.elasticsearch.common.cli.ElasticsearchCLIParameter;
import org.apache.mesos.elasticsearch.common.cli.ZookeeperCLIParameter;
Expand Down Expand Up @@ -50,7 +52,8 @@ protected CreateContainerCmd dockerCommand() {
return dockerClient
.createContainerCmd(SCHEDULER_IMAGE)
.withName(SCHEDULER_NAME + "_" + new SecureRandom().nextInt())
.withEnv("JAVA_OPTS=-Xms128m -Xmx256m")
.withExposedPorts(ExposedPort.tcp(5005)).withPortBindings(PortBinding.parse("5005:5005"))
.withEnv("JAVA_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Xms128m -Xmx256m")
.withExtraHosts(IntStream.rangeClosed(1, 3).mapToObj(value -> "slave" + value + ":" + mesosIp).toArray(String[]::new))
.withCmd(
ZookeeperCLIParameter.ZOOKEEPER_MESOS_URL, getZookeeperMesosUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class TasksResponse {
public TasksResponse(String schedulerIpAddress, int nodesCount) {
this.schedulerIpAddress = schedulerIpAddress;
this.nodesCount = nodesCount;
await().atMost(60, TimeUnit.SECONDS).until(new TasksCall());
await().atMost(60, TimeUnit.SECONDS).pollDelay(1, TimeUnit.SECONDS).until(new TasksCall());
}

public TasksResponse(String schedulerIpAddress, int nodesCount, String nodesState) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.apache.mesos.elasticsearch.systemtest;

import com.github.dockerjava.api.command.CreateContainerCmd;
import com.jayway.awaitility.Awaitility;
import com.mashape.unirest.http.HttpResponse;
import com.mashape.unirest.http.JsonNode;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
import org.apache.mesos.mini.container.AbstractContainer;
import org.json.JSONObject;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

/**
* Test Search Proxy controller
*/
public class SearchProxySystemTest extends TestBase {

private static AbstractContainer dataImporter;

private static String searchEndpoint;
private static List<String> slavesElasticAddresses;
private static List<JSONObject> tasks;

@BeforeClass
public static void importData() throws Exception {
tasks = new TasksResponse(getScheduler().getIpAddress(), CLUSTER.getConfig().getNumberOfSlaves()).getTasks();
slavesElasticAddresses = tasks.stream().map(task -> task.getString("http_address")).collect(toList());

dataImporter = new AbstractContainer(CLUSTER.getConfig().dockerClient) {
private String imageName = "mwldk/shakespeare-import";

@Override
protected void pullImage() {
pullImage(imageName, "latest");
}

@Override
protected CreateContainerCmd dockerCommand() {
return dockerClient.createContainerCmd(imageName).withEnv("ELASTIC_SEARCH_URL=" + "http://" + slavesElasticAddresses.get(0));
}
};


Awaitility.await().atMost(5, TimeUnit.MINUTES).pollDelay(2, TimeUnit.SECONDS).until(() -> {
try {
return Unirest.get("http://" + slavesElasticAddresses.get(0) + "/_nodes").asJson().getBody().getObject().getJSONObject("nodes").length() == 3;
} catch (UnirestException e) {
return false;
}
});
final String importerId = CLUSTER.addAndStartContainer(dataImporter);
Awaitility.await().atMost(2, TimeUnit.MINUTES).pollDelay(1, TimeUnit.SECONDS).until(() -> !CLUSTER.getConfig().dockerClient.inspectContainerCmd(importerId).exec().getState().isRunning());

searchEndpoint = "http://" + getScheduler().getIpAddress() + ":31100/v1/es/_search";
}

@Test
public void canRetrieveSearchResultFromRandomNode() throws Exception {
final HttpResponse<JsonNode> response = Unirest.get(searchEndpoint).queryString("q", "love").asJson();
assertEquals(200, response.getStatus());
assertFalse(response.getBody().getObject().getBoolean("timed_out"));
assertEquals(10, response.getBody().getObject().getJSONObject("hits").getJSONArray("hits").length());
}

@Test
public void canRetrieveSearchResultFromParticularNode() throws Exception {
AtomicInteger evaluatedHosts = new AtomicInteger(0);
tasks.stream().forEach(task -> {
try {
final HttpResponse<JsonNode> response = Unirest.get(searchEndpoint).queryString("q", "love").queryString("node", task.getString("id")).asJson();
assertEquals(200, response.getStatus());
assertFalse(response.getBody().getObject().getBoolean("timed_out"));
assertEquals(10, response.getBody().getObject().getJSONObject("hits").getJSONArray("hits").length());
evaluatedHosts.getAndIncrement();
} catch (UnirestException e) {
throw new RuntimeException("Failed to contact search proxy: " + task, e);
}
});
assertEquals(tasks.size(), evaluatedHosts.get());
}
}

0 comments on commit 8394231

Please sign in to comment.