-
Notifications
You must be signed in to change notification settings - Fork 117
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[2.x] Adding new stop-replication action in ism #1370
base: 2.x
Are you sure you want to change the base?
Changes from all commits
f252711
af906fc
c040f26
1580ea5
579945e
158a94b
35f79e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -35,6 +35,7 @@ buildscript { | |||||||
job_scheduler_no_snapshot = opensearch_build | ||||||||
notifications_no_snapshot = opensearch_build | ||||||||
security_no_snapshot = opensearch_build | ||||||||
ccr_no_snapshot = opensearch_build | ||||||||
if (buildVersionQualifier) { | ||||||||
opensearch_build += "-${buildVersionQualifier}" | ||||||||
job_scheduler_no_snapshot += "-${buildVersionQualifier}" | ||||||||
|
@@ -66,6 +67,10 @@ buildscript { | |||||||
kotlin_version = System.getProperty("kotlin.version", "1.8.21") | ||||||||
|
||||||||
security_plugin_version = System.getProperty("security.version", opensearch_build) | ||||||||
ccr_version = System.getProperty("ccr.version", opensearch_build) | ||||||||
ccr_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + | ||||||||
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-cross-cluster-replication-' + ccr_no_snapshot + '.zip' | ||||||||
ccr_resource_folder = "src/test/resources/replication" | ||||||||
Comment on lines
+71
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
} | ||||||||
|
||||||||
repositories { | ||||||||
|
@@ -230,6 +235,7 @@ dependencies { | |||||||
opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip" | ||||||||
opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip" | ||||||||
opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip" | ||||||||
opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip" | ||||||||
} | ||||||||
|
||||||||
repositories { | ||||||||
|
@@ -313,6 +319,7 @@ def jobSchedulerFile = resolvePluginFile("opensearch-job-scheduler") | |||||||
def notificationsCoreFile = resolvePluginFile("opensearch-notifications-core") | ||||||||
def notificationsFile = resolvePluginFile("notifications") | ||||||||
def securityPluginFile = resolvePluginFile("opensearch-security") | ||||||||
def ccrFile = resolvePluginFile("opensearch-security") | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
ext.getPluginResource = { download_to_folder, download_from_src -> | ||||||||
def src_split = download_from_src.split("/") | ||||||||
|
@@ -393,6 +400,7 @@ testClusters.integTest { | |||||||
if (securityEnabled) { | ||||||||
plugin(provider(securityPluginFile)) | ||||||||
} | ||||||||
plugin(provider(ccrFile)) | ||||||||
setting 'path.repo', repo.absolutePath | ||||||||
} | ||||||||
|
||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
## Version 2.19.1.0 2025-02-27 | ||
|
||
Compatible with OpenSearch 2.19.1 | ||
|
||
### Maintenance | ||
* CVE fix - Bump logback to 1.5.16 ([#1383](https://github.com/opensearch-project/index-management/pull/1383/files)) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.action | ||
|
||
import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication.AttemptStopReplicationStep | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext | ||
|
||
/** | ||
* ISM action to stop replication on indices replicated on a follower cluster. | ||
*/ | ||
class StopReplicationAction( | ||
index: Int, | ||
) : Action(name, index) { | ||
companion object { | ||
const val name = "stop_replication" | ||
} | ||
|
||
private val attemptStopReplicationStep = AttemptStopReplicationStep() | ||
|
||
private val steps = listOf(attemptStopReplicationStep) | ||
|
||
override fun getStepToExecute(context: StepContext): Step = attemptStopReplicationStep | ||
|
||
override fun getSteps(): List<Step> = steps | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.action | ||
|
||
import org.opensearch.core.common.io.stream.StreamInput | ||
import org.opensearch.core.xcontent.XContentParser | ||
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser | ||
|
||
class StopReplicationActionParser : ActionParser() { | ||
override fun fromStreamInput(sin: StreamInput): Action { | ||
val index = sin.readInt() | ||
return StopReplicationAction(index) | ||
} | ||
|
||
override fun fromXContent(xcp: XContentParser, index: Int): Action { | ||
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) | ||
ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) | ||
|
||
return StopReplicationAction(index) | ||
} | ||
|
||
override fun getActionType(): String = StopReplicationAction.name | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication | ||
|
||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.ExceptionsHelper | ||
import org.opensearch.action.support.master.AcknowledgedResponse | ||
import org.opensearch.commons.replication.ReplicationPluginInterface | ||
import org.opensearch.commons.replication.action.StopIndexReplicationRequest | ||
import org.opensearch.indexmanagement.opensearchapi.suspendUntil | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData | ||
import org.opensearch.snapshots.SnapshotInProgressException | ||
import org.opensearch.transport.RemoteTransportException | ||
|
||
class AttemptStopReplicationStep : Step(name) { | ||
private val logger = LogManager.getLogger(javaClass) | ||
private var stepStatus = StepStatus.STARTING | ||
private var info: Map<String, Any>? = null | ||
private var replicationPluginInterface: ReplicationPluginInterface = ReplicationPluginInterface() | ||
fun setReplicationPluginInterface(replicationPluginInterface: ReplicationPluginInterface) { | ||
this.replicationPluginInterface = replicationPluginInterface | ||
} | ||
|
||
override suspend fun execute(): Step { | ||
val context = this.context ?: return this | ||
val indexName = context.metadata.index | ||
try { | ||
val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName) | ||
val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil { | ||
replicationPluginInterface.stopReplication( | ||
context.client, | ||
stopIndexReplicationRequestObj, | ||
it, | ||
) | ||
} | ||
if (response.isAcknowledged) { | ||
stepStatus = StepStatus.COMPLETED | ||
info = mapOf("message" to getSuccessMessage(indexName)) | ||
} else { | ||
val message = getFailedMessage(indexName) | ||
logger.warn(message) | ||
stepStatus = StepStatus.FAILED | ||
info = mapOf("message" to message) | ||
} | ||
} catch (e: RemoteTransportException) { | ||
val cause = ExceptionsHelper.unwrapCause(e) | ||
if (cause is SnapshotInProgressException) { | ||
handleSnapshotException(indexName, cause) | ||
} else { | ||
handleException(indexName, cause as Exception) | ||
} | ||
} catch (e: SnapshotInProgressException) { | ||
handleSnapshotException(indexName, e) | ||
} catch (e: Exception) { | ||
handleException(indexName, e) | ||
} | ||
return this | ||
} | ||
|
||
private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) { | ||
val message = getSnapshotMessage(indexName) | ||
logger.error(message, e) | ||
stepStatus = StepStatus.FAILED | ||
info = mapOf("message" to message) | ||
} | ||
|
||
private fun handleException(indexName: String, e: Exception) { | ||
val message = getFailedMessage(indexName) | ||
logger.error(message, e) | ||
stepStatus = StepStatus.FAILED | ||
val mutableInfo = mutableMapOf("message" to message) | ||
val errorMessage = e.message | ||
if (errorMessage != null) mutableInfo["cause"] = errorMessage | ||
info = mutableInfo.toMap() | ||
} | ||
|
||
override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy( | ||
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), | ||
transitionTo = null, | ||
info = info, | ||
) | ||
|
||
override fun isIdempotent() = false | ||
|
||
companion object { | ||
const val name = "attempt_stop_replication" | ||
|
||
fun getFailedMessage(index: String) = "Failed to stop replication [index=$index]" | ||
|
||
fun getSuccessMessage(index: String) = "Successfully stopped replication [index=$index]" | ||
|
||
fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying stop replication [index=$index]" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.validation | ||
|
||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.cluster.metadata.MetadataCreateIndexService | ||
import org.opensearch.cluster.service.ClusterService | ||
import org.opensearch.common.settings.Settings | ||
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate | ||
import org.opensearch.indexmanagement.util.OpenForTesting | ||
import org.opensearch.indices.InvalidIndexNameException | ||
import org.opensearch.monitor.jvm.JvmService | ||
|
||
@OpenForTesting | ||
class ValidateStopReplication( | ||
settings: Settings, | ||
clusterService: ClusterService, | ||
jvmService: JvmService, | ||
) : Validate(settings, clusterService, jvmService) { | ||
private val logger = LogManager.getLogger(javaClass) | ||
|
||
@Suppress("ReturnSuppressCount", "ReturnCount") | ||
override fun execute(indexName: String): Validate { | ||
// if these conditions are false, fail validation and do not execute stop_replication action | ||
if (!indexExists(indexName) || !validIndex(indexName)) { | ||
validationStatus = ValidationStatus.FAILED | ||
return this | ||
} | ||
validationMessage = getValidationPassedMessage(indexName) | ||
return this | ||
} | ||
|
||
private fun indexExists(indexName: String): Boolean { | ||
val isIndexExists = clusterService.state().metadata.indices.containsKey(indexName) | ||
if (!isIndexExists) { | ||
val message = getNoIndexMessage(indexName) | ||
logger.warn(message) | ||
validationMessage = message | ||
return false | ||
} | ||
return true | ||
} | ||
|
||
private fun validIndex(indexName: String): Boolean { | ||
val exceptionGenerator: (String, String) -> RuntimeException = { index_name, reason -> InvalidIndexNameException(index_name, reason) } | ||
// If the index name is invalid for any reason, this will throw an exception giving the reason why in the message. | ||
// That will be displayed to the user as the cause. | ||
try { | ||
MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator) | ||
} catch (e: Exception) { | ||
val message = getIndexNotValidMessage(indexName) | ||
logger.warn(message) | ||
validationMessage = message | ||
return false | ||
} | ||
return true | ||
} | ||
|
||
@Suppress("TooManyFunctions") | ||
companion object { | ||
const val name = "validate_stop_replication" | ||
|
||
fun getNoIndexMessage(index: String) = "No such index [index=$index] for stop replication action." | ||
|
||
fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid. Abort stop replication action on it." | ||
|
||
fun getValidationPassedMessage(index: String) = "Stop replication action validation passed for [index=$index]" | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.indexmanagement.indexstatemanagement.action | ||
|
||
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase | ||
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy | ||
import org.opensearch.indexmanagement.indexstatemanagement.model.State | ||
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification | ||
import org.opensearch.indexmanagement.waitFor | ||
import java.time.Instant | ||
import java.time.temporal.ChronoUnit | ||
import java.util.Locale | ||
|
||
class StopReplicationActionIT : IndexStateManagementRestTestCase() { | ||
private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) | ||
|
||
fun `test stop_replication on a non-replicated index`() { | ||
val indexName = "${testIndexName}_index_1" | ||
val policyID = "${testIndexName}_testPolicyName_1" | ||
val actionConfig = StopReplicationAction(0) | ||
val states = | ||
listOf( | ||
State("StopReplicationState", listOf(actionConfig), listOf()), | ||
) | ||
|
||
val policy = | ||
Policy( | ||
id = policyID, | ||
description = "$testIndexName description", | ||
schemaVersion = 1L, | ||
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), | ||
errorNotification = randomErrorNotification(), | ||
defaultState = states[0].name, | ||
states = states, | ||
) | ||
createPolicy(policy, policyID) | ||
createIndex(indexName, policyID) | ||
val managedIndexConfig = getExistingManagedIndexConfig(indexName) | ||
// Change the start time so the job will trigger in 2 seconds. | ||
updateManagedIndexConfigStartTime(managedIndexConfig) | ||
|
||
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } | ||
|
||
// Need to wait two cycles. | ||
// Change the start time so the job will trigger in 2 seconds. | ||
updateManagedIndexConfigStartTime(managedIndexConfig) | ||
waitFor { | ||
val metadataInfo = getExplainManagedIndexMetaData(indexName).info.toString() | ||
assertTrue( | ||
metadataInfo.contains("cause=No replication in progress for index:" + indexName), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a failure? Can we also assert sth like step status is failed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes thats right, we're testing the negative case here. |
||
) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.