Skip to content

Commit

Permalink
Run code snippets in ReindexDocumentationIT (#35165)
Browse files Browse the repository at this point in the history
Closes #32093

(cherry picked from commit 12072b2)
  • Loading branch information
vladimirdolzhenko committed Nov 2, 2018
1 parent d91dc73 commit 16c7bcc
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,73 +23,133 @@
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequestBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.CancelTests;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.index.reindex.ReindexRequestBuilder;
import org.elasticsearch.index.reindex.RethrottleAction;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.hamcrest.Matcher;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;

public class ReindexDocumentationIT extends ESIntegTestCase {

public void reindex() {
// Semaphore used to allow & block indexing operations during the test
private static final Semaphore ALLOWED_OPERATIONS = new Semaphore(0);
private static final String INDEX_NAME = "source_index";

@Override
protected boolean ignoreExternalCluster() {
return true;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class, ReindexCancellationPlugin.class);
}

@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Collections.singletonList(ReindexPlugin.class);
}

@Before
public void setup() {
client().admin().indices().prepareCreate(INDEX_NAME).get();
}

public void testReindex() {
Client client = client();
// tag::reindex1
BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client)
BulkByScrollResponse response =
ReindexAction.INSTANCE.newRequestBuilder(client)
.source("source_index")
.destination("target_index")
.filter(QueryBuilders.matchQuery("category", "xzy")) // <1>
.get();
// end::reindex1
}

public void updateByQuery() {
public void testUpdateByQuery() {
Client client = client();
client.admin().indices().prepareCreate("foo").get();
client.admin().indices().prepareCreate("bar").get();
client.admin().indices().preparePutMapping(INDEX_NAME).setType("_doc").setSource("cat", "type=keyword").get();
{
// tag::update-by-query
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();
// end::update-by-query
}
{
// tag::update-by-query-filter
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
.filter(QueryBuilders.termQuery("level", "awesome"))
.size(1000)
.script(new Script(ScriptType.INLINE, "ctx._source.awesome = 'absolutely'", "painless", Collections.emptyMap()));
.script(new Script(ScriptType.INLINE,
"ctx._source.awesome = 'absolutely'",
"painless",
Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();
// end::update-by-query-filter
}
{
// tag::update-by-query-size
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
.source().setSize(500);
.source()
.setSize(500);
BulkByScrollResponse response = updateByQuery.get();
// end::update-by-query-size
}
{
// tag::update-by-query-sort
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
.source().addSort("cat", SortOrder.DESC);
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
.size(100)
.source()
.addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();
// end::update-by-query-sort
}
{
// tag::update-by-query-script
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
.script(new Script(
ScriptType.INLINE,
Expand All @@ -106,66 +166,86 @@ public void updateByQuery() {
}
{
// tag::update-by-query-multi-index
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();
// end::update-by-query-multi-index
}
{
// tag::update-by-query-routing
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();
// end::update-by-query-routing
}
{
// tag::update-by-query-pipeline
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
UpdateByQueryRequestBuilder updateByQuery =
UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();
// end::update-by-query-pipeline
}
}

public void testTasks() throws Exception {
final Client client = client();
final ReindexRequestBuilder builder = reindexAndPartiallyBlock();

{
// tag::update-by-query-list-tasks
ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
.setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
TaskId taskId = info.getTaskId();
BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
BulkByScrollTask.Status status =
(BulkByScrollTask.Status) info.getStatus();
// do stuff
}
// end::update-by-query-list-tasks
}

TaskInfo mainTask = CancelTests.findTaskToCancel(ReindexAction.NAME, builder.request().getSlices());
BulkByScrollTask.Status status = (BulkByScrollTask.Status) mainTask.getStatus();
assertNull(status.getReasonCancelled());
TaskId taskId = mainTask.getTaskId();
{
TaskId taskId = null;
// tag::update-by-query-get-task
GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();
// end::update-by-query-get-task
}
{
TaskId taskId = null;
// tag::update-by-query-cancel-task
// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
client.admin().cluster().prepareCancelTasks()
.setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();
client.admin().cluster().prepareCancelTasks()
.setTaskId(taskId).get().getTasks();
// end::update-by-query-cancel-task
}
{
TaskId taskId = null;
// tag::update-by-query-rethrottle
RethrottleAction.INSTANCE.newRequestBuilder(client)
.setTaskId(taskId)
.setRequestsPerSecond(2.0f)
.get();
// end::update-by-query-rethrottle
}

// unblocking the blocked update
ALLOWED_OPERATIONS.release(builder.request().getSlices());
}

public void deleteByQuery() {
public void testDeleteByQuery() {
Client client = client();
client.admin().indices().prepareCreate("persons").get();

// tag::delete-by-query-sync
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) // <1>
.source("persons") // <2>
.get(); // <3>
Expand All @@ -189,4 +269,76 @@ public void onFailure(Exception e) {
// end::delete-by-query-async
}

/**
* Similar to what CancelTests does: blocks some operations to be able to catch some tasks in running state
* @see CancelTests#testCancel(String, AbstractBulkByScrollRequestBuilder, CancelTests.CancelAssertion, Matcher)
*/
private ReindexRequestBuilder reindexAndPartiallyBlock() throws Exception {
final Client client = client();
final int numDocs = randomIntBetween(10, 100);
ALLOWED_OPERATIONS.release(numDocs);

indexRandom(true, false, true, IntStream.range(0, numDocs)
.mapToObj(i -> client().prepareIndex(INDEX_NAME, "_doc", Integer.toString(i)).setSource("n", Integer.toString(i)))
.collect(Collectors.toList()));

// Checks that the all documents have been indexed and correctly counted
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), numDocs);
assertThat(ALLOWED_OPERATIONS.drainPermits(), equalTo(0));

ReindexRequestBuilder builder = new ReindexRequestBuilder(client, ReindexAction.INSTANCE).source(INDEX_NAME)
.destination("target_index", "_doc");
// Scroll by 1 so that cancellation is easier to control
builder.source().setSize(1);

int numModifiedDocs = randomIntBetween(builder.request().getSlices() * 2, numDocs);
// chose to modify some of docs - rest is still blocked
ALLOWED_OPERATIONS.release(numModifiedDocs - builder.request().getSlices());

// Now execute the reindex action...
builder.execute();

// 10 seconds is usually fine but on heavily loaded machines this can take a while
assertTrue("updates blocked", awaitBusy(
() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0,
1, TimeUnit.MINUTES));
return builder;
}

public static class ReindexCancellationPlugin extends Plugin {

@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addIndexOperationListener(new BlockingOperationListener());
}
}

public static class BlockingOperationListener implements IndexingOperationListener {

@Override
public Engine.Index preIndex(ShardId shardId, Engine.Index index) {
return preCheck(index, index.type());
}

@Override
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
return preCheck(delete, delete.type());
}

private <T extends Engine.Operation> T preCheck(T operation, String type) {
if (("_doc".equals(type) == false) || (operation.origin() != Engine.Operation.Origin.PRIMARY)) {
return operation;
}

try {
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
return operation;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new IllegalStateException("Something went wrong");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private void testCancel(String action, AbstractBulkByScrollRequestBuilder<?, ?>
assertion.assertThat(response, numDocs, numModifiedDocs);
}

private TaskInfo findTaskToCancel(String actionName, int workerCount) {
public static TaskInfo findTaskToCancel(String actionName, int workerCount) {
ListTasksResponse tasks;
long start = System.nanoTime();
do {
Expand Down

0 comments on commit 16c7bcc

Please sign in to comment.