Make number of segment metadata files in remote segment store configurable #11329

Merged
Changes from 1 commit
@@ -45,7 +45,6 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
- import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -167,7 +166,7 @@ public void testRemoteTranslogCleanup() throws Exception {
}

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
- internalCluster().startNode();
+ String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
@@ -177,17 +176,20 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
+
+ IndexShard indexShard = getIndexShard(dataNode);
+ int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles();
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
- if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) {
+ if (numberOfIterations <= lastNMetadataFilesToKeep) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
} else {
// As delete is async, it's possible that the file gets created before the deletion or after
// deletion.
MatcherAssert.assertThat(
actualFileCount,
- is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1))
+ is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
);
}
}, 30, TimeUnit.SECONDS);
@@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING,
+ RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
@@ -345,6 +345,7 @@ Runnable getGlobalCheckpointSyncer() {

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
private final RemoteStoreFileDownloader fileDownloader;
+ private final RecoverySettings recoverySettings;

public IndexShard(
final ShardRouting shardRouting,
@@ -469,6 +470,7 @@ public boolean shouldCache(Query query) {
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteStoreStatsTrackerFactory = remoteStoreStatsTrackerFactory;
+ this.recoverySettings = recoverySettings;
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
}

@@ -567,6 +569,10 @@ public String getNodeId() {
return translogConfig.getNodeId();
}

+ public RecoverySettings getRecoverySettings() {
+ return recoverySettings;
+ }
+
public RemoteStoreFileDownloader getFileDownloader() {
return fileDownloader;
}
@@ -79,8 +79,6 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL
);

public static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
- // Visible for testing
- public static final int LAST_N_METADATA_FILES_TO_KEEP = 10;

private final IndexShard indexShard;
private final Directory storeDirectory;
@@ -205,7 +203,7 @@ private boolean syncSegments() {
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
if (isRefreshAfterCommit()) {
- remoteDirectory.deleteStaleSegmentsAsync(LAST_N_METADATA_FILES_TO_KEEP);
+ remoteDirectory.deleteStaleSegmentsAsync(indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles());
}

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
@@ -727,6 +727,10 @@ public Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
* @throws IOException in case of I/O error while reading from / writing to remote segment store
*/
public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
+ if (lastNMetadataFilesToKeep <= 0) {
+ logger.info("Stale segment deletion is disabled because cluster.remote_store.index.min.segment-metadata is set to a value < 1");
+ return;
+ }
List<String> sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
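
The guard above turns stale segment cleanup into a no-op whenever the retention threshold is non-positive. As a hedged illustration only (not part of this commit), a Mockito-style unit test could assert that short-circuit behaviour roughly as follows, assuming a hypothetical test fixture that exposes the directory under test as remoteSegmentStoreDirectory and its metadata directory as a mock named remoteMetadataDirectory:

    // Hypothetical sketch: with a non-positive retention value the method should
    // return before any remote metadata files are listed or deleted.
    // (verifyNoInteractions is org.mockito.Mockito.verifyNoInteractions.)
    remoteSegmentStoreDirectory.deleteStaleSegments(0);
    verifyNoInteractions(remoteMetadataDirectory);
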
@@ -247,7 +247,14 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
- indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
+ try {
+ indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
+ } catch (Exception e) {
+ logger.error(
+ "Exception while downloading segment files from remote store, will continue with peer-to-peer segment copy",
+ e
+ );
+ }
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
@@ -157,6 +157,17 @@ public class RecoverySettings {
Property.NodeScope
);

+ /**
+ * Controls the minimum number of metadata files to keep in the remote segment store.
+ * A value {@code < 1} disables deletion of stale segment metadata files.
+ */
+ public static final Setting<Integer> CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING = Setting.intSetting(
+ "cluster.remote_store.index.min.segment-metadata",
+ 10,
+ Property.NodeScope,
+ Property.Dynamic
+ );
+
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

@@ -171,6 +182,7 @@ public class RecoverySettings {
private volatile TimeValue internalActionTimeout;
private volatile TimeValue internalActionRetryTimeout;
private volatile TimeValue internalActionLongTimeout;
+ private volatile int minRemoteSegmentMetadataFiles;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

@@ -212,6 +224,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this::setInternalActionLongTimeout
);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout);
+ minRemoteSegmentMetadataFiles = CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.get(settings);
+ clusterSettings.addSettingsUpdateConsumer(
+ CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING,
+ this::setMinRemoteSegmentMetadataFiles
+ );
}

public RateLimiter rateLimiter() {
@@ -307,4 +324,12 @@ public int getMaxConcurrentRemoteStoreStreams() {
private void setMaxConcurrentRemoteStoreStreams(int maxConcurrentRemoteStoreStreams) {
this.maxConcurrentRemoteStoreStreams = maxConcurrentRemoteStoreStreams;
}
+
+ private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles) {
+ this.minRemoteSegmentMetadataFiles = minRemoteSegmentMetadataFiles;
+ }
+
+ public int getMinRemoteSegmentMetadataFiles() {
+ return this.minRemoteSegmentMetadataFiles;
+ }
}
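
Because the new setting is registered as Dynamic and NodeScope, the retention threshold can be changed at runtime through the cluster settings API. The following is a minimal illustrative sketch, not part of this change; it assumes an OpenSearch Client instance named client and the usual org.opensearch.common.settings.Settings import:

    // Hypothetical usage: lower the retention threshold to 5 metadata files cluster-wide.
    client.admin()
        .cluster()
        .prepareUpdateSettings()
        .setTransientSettings(
            Settings.builder().put(RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.getKey(), 5)
        )
        .get();

Setting the value below 1 would, per the guard added in deleteStaleSegments above, disable stale segment metadata cleanup entirely.
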
@@ -237,7 +237,7 @@ public void testAfterMultipleCommits() throws IOException {
setup(true, 3);
assertDocs(indexShard, "1", "2", "3");

- for (int i = 0; i < RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP + 3; i++) {
+ for (int i = 0; i < indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles() + 3; i++) {
indexDocs(4 * (i + 1), 4);
flushShard(indexShard);
}