Skip to content

Commit

Permalink
Adding Unit test for checkpointRefreshListener
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 committed May 6, 2022
1 parent b216116 commit 4928270
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
Expand Down Expand Up @@ -674,8 +673,7 @@ public static final IndexShard newIndexShard(
Arrays.asList(listeners),
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
SegmentReplicationCheckpointPublisher.EMPTY
cbs
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.SnapshotId;
Expand Down Expand Up @@ -178,25 +179,8 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.opensearch.common.lucene.Lucene.cleanLuceneIndex;
import static org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS;
Expand Down Expand Up @@ -3424,6 +3408,17 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

/**
* here we are mocking a SegmentReplicationcheckpointPublisher and testing that when a refresh happens on index shard if CheckpointRefreshListener is added to the InternalrefreshListerners List
*/
public void testCheckpointRefreshListener() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(p -> newShard(mock), true);
shard.refresh("test");
assertEquals(shard.getEngine().config().getInternalRefreshListener().get(1).getClass(), CheckpointRefreshListener.class);
closeShards(shard);
}

public void testIndexCheckOnStartup() throws Exception {
final IndexShard indexShard = newStartedShard(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,66 @@ protected IndexShard newShard(
);
}

/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* current node id the shard is assigned to.
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetadata indexMetadata for the shard, including any mapping
* @param storeProvider an optional custom store provider to use. If null a default file based store will be created
* @param indexReaderWrapper an optional wrapper to be used during search
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index event listener
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(
ShardRouting routing,
ShardPath shardPath,
IndexMetadata indexMetadata,
@Nullable CheckedFunction<IndexSettings, Store, IOException> storeProvider,
@Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
@Nullable EngineFactory engineFactory,
@Nullable EngineConfigFactory engineConfigFactory,
Runnable globalCheckpointSyncer,
RetentionLeaseSyncer retentionLeaseSyncer,
IndexEventListener indexEventListener,
IndexingOperationListener... listeners
) throws IOException {
return newShard(routing, shardPath, indexMetadata, storeProvider, indexReaderWrapper, engineFactory, engineConfigFactory, globalCheckpointSyncer, retentionLeaseSyncer, indexEventListener, SegmentReplicationCheckpointPublisher.EMPTY, listeners);
}

/**
* creates a new initializing shard. The shard will will be put in its proper path under the
* current node id the shard is assigned to.
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*/
protected IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
randomAlphaOfLength(10),
true,
ShardRoutingState.INITIALIZING,
RecoverySource.EmptyStoreRecoverySource.INSTANCE
);
final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir());
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);

Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000))
.put(Settings.EMPTY)
.build();
IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName())
.settings(indexSettings)
.primaryTerm(0, primaryTerm)
.putMapping("{ \"properties\": {} }").build();
return newShard(shardRouting, shardPath, metadata, null, null, new InternalEngineFactory(), new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), () -> {}, RetentionLeaseSyncer.EMPTY, EMPTY_EVENT_LISTENER, checkpointPublisher);
}

/**
* creates a new initializing shard.
* @param routing shard routing to use
Expand All @@ -418,6 +478,7 @@ protected IndexShard newShard(
* @param indexReaderWrapper an optional wrapper to be used during search
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index event listener
* @param checkpointPublisher segment Replication Checkpoint Publisher to publish checkpoint
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(
Expand All @@ -431,6 +492,7 @@ protected IndexShard newShard(
Runnable globalCheckpointSyncer,
RetentionLeaseSyncer retentionLeaseSyncer,
IndexEventListener indexEventListener,
SegmentReplicationCheckpointPublisher checkpointPublisher,
IndexingOperationListener... listeners
) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
Expand Down Expand Up @@ -479,7 +541,7 @@ protected IndexShard newShard(
globalCheckpointSyncer,
retentionLeaseSyncer,
breakerService,
SegmentReplicationCheckpointPublisher.EMPTY
checkpointPublisher
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
success = true;
Expand Down

0 comments on commit 4928270

Please sign in to comment.