Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.x] Adding provision to invoke stop-replication from other plugins #1502

Open
wants to merge 3 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions .github/workflows/maven-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ name: Publish snapshots to maven
on:
workflow_dispatch:
push:
branches: [
main
1.*
2.*
]

branches:
- main
- '[0-9]+.[0-9]+'
- '[0-9]+.x'
jobs:
build-and-publish-snapshots:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.opensearch.gradle.test.RestIntegTestTask
buildscript {
ext {
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.19.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.20.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")
// e.g. 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT
version_tokens = opensearch_version.tokenize('-')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ import org.opensearch.replication.action.status.ReplicationStatusAction
import org.opensearch.replication.action.status.ShardsInfoAction
import org.opensearch.replication.action.status.TranportShardsInfoAction
import org.opensearch.replication.action.status.TransportReplicationStatusAction
import org.opensearch.replication.action.stop.StopIndexReplicationAction
import org.opensearch.replication.action.stop.TransportStopIndexReplicationAction
import org.opensearch.replication.action.stop.TransportInternalStopIndexReplicationAction
import org.opensearch.replication.action.update.TransportUpdateIndexReplicationAction
import org.opensearch.replication.action.update.UpdateIndexReplicationAction
import org.opensearch.replication.metadata.ReplicationMetadataManager
Expand Down Expand Up @@ -99,6 +99,8 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.commons.utils.OpenForTesting
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
Expand Down Expand Up @@ -233,7 +235,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
ActionHandler(GetFileChunkAction.INSTANCE, TransportGetFileChunkAction::class.java),
ActionHandler(UpdateAutoFollowPatternAction.INSTANCE, TransportUpdateAutoFollowPatternAction::class.java),
ActionHandler(AutoFollowClusterManagerNodeAction.INSTANCE, TransportAutoFollowClusterManagerNodeAction::class.java),
ActionHandler(StopIndexReplicationAction.INSTANCE, TransportStopIndexReplicationAction::class.java),
ActionHandler(STOP_REPLICATION_ACTION_TYPE, TransportStopIndexReplicationAction::class.java),
ActionHandler(INTERNAL_STOP_REPLICATION_ACTION_TYPE, TransportInternalStopIndexReplicationAction::class.java),
ActionHandler(PauseIndexReplicationAction.INSTANCE, TransportPauseIndexReplicationAction::class.java),
ActionHandler(ResumeIndexReplicationAction.INSTANCE, TransportResumeIndexReplicationAction::class.java),
ActionHandler(UpdateIndexReplicationAction.INSTANCE, TransportUpdateIndexReplicationAction::class.java),
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.replication.action.stop

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.ActionRequest
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.common.inject.Inject
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_NAME
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.util.coroutineContext
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.replication.util.suspendExecute
import org.opensearch.tasks.Task
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService

/* Internal transport action used by Index Management plugin to invoke stop replication
It transforms the request, and invokes the actual stop replication action (TransportStopIndexReplicationAction)
*/
class TransportInternalStopIndexReplicationAction @Inject constructor (
val name: String,
val transportService: TransportService,
val clusterService: ClusterService,
val threadPool: ThreadPool,
val client: Client,
val actionFilters: ActionFilters,
val indexNameExpressionResolver: IndexNameExpressionResolver,
val replicationMetadataManager: ReplicationMetadataManager,
): HandledTransportAction<ActionRequest, AcknowledgedResponse> (INTERNAL_STOP_REPLICATION_ACTION_NAME, transportService, actionFilters, ::StopIndexReplicationRequest),
CoroutineScope by GlobalScope {
companion object {
private val log = LogManager.getLogger(TransportInternalStopIndexReplicationAction::class.java)
}

@Throws(Exception::class)
override fun doExecute(task: Task?, request: ActionRequest?, listener: ActionListener<AcknowledgedResponse>?) {
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
val transformedRequest = if (request is StopIndexReplicationRequest) {
request
} else {
request?.let { recreateObject(it) { StopIndexReplicationRequest(it) } }
?: throw IllegalArgumentException("Request cannot be null")
}

try {
val response = client.suspendExecute(STOP_REPLICATION_ACTION_TYPE, transformedRequest, true)
log.info("Stop replication successful for index[${transformedRequest.indexName}] with response: " + response.isAcknowledged)
listener?.onResponse(AcknowledgedResponse(true))
} catch (e: Exception) {
log.error("Stop replication failed for index[${transformedRequest.indexName}] with error ${e.stackTraceToString()}")
listener?.onFailure(e)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

package org.opensearch.replication.action.stop

import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATED_INDEX_SETTING
import org.opensearch.replication.action.index.block.IndexBlockUpdateType
import org.opensearch.replication.action.index.block.UpdateIndexBlockAction
Expand Down Expand Up @@ -60,6 +62,15 @@ import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
import java.io.IOException

/*
The classes StopIndexReplicationRequest and StopIndexReplicationAction have been moved from ccr to common-utils
and are imported here through org.opensearch.commons.replication.
This helps in making these classes re-usable by other plugins like ism.
PR details:
[1] https://github.com/opensearch-project/common-utils/pull/789
[2] https://github.com/opensearch-project/cross-cluster-replication/pull/1502
*/

class TransportStopIndexReplicationAction @Inject constructor(transportService: TransportService,
clusterService: ClusterService,
threadPool: ThreadPool,
Expand All @@ -68,7 +79,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
IndexNameExpressionResolver,
val client: Client,
val replicationMetadataManager: ReplicationMetadataManager) :
TransportMasterNodeAction<StopIndexReplicationRequest, AcknowledgedResponse> (StopIndexReplicationAction.NAME,
TransportMasterNodeAction<StopIndexReplicationRequest, AcknowledgedResponse> (STOP_REPLICATION_ACTION_NAME,
transportService, clusterService, threadPool, actionFilters, ::StopIndexReplicationRequest,
indexNameExpressionResolver), CoroutineScope by GlobalScope {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

package org.opensearch.replication.rest

import org.opensearch.replication.action.stop.StopIndexReplicationAction
import org.opensearch.replication.action.stop.StopIndexReplicationRequest
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_TYPE
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestChannel
Expand All @@ -38,7 +38,7 @@ class StopIndexReplicationHandler : BaseRestHandler() {
val stopReplicationRequest = StopIndexReplicationRequest.fromXContent(parser, followIndex)
return RestChannelConsumer { channel: RestChannel? ->
client.admin().cluster()
.execute(StopIndexReplicationAction.INSTANCE, stopReplicationRequest, RestToXContentListener(channel))
.execute(STOP_REPLICATION_ACTION_TYPE, stopReplicationRequest, RestToXContentListener(channel))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import org.opensearch.replication.action.repository.GetFileChunkAction
import org.opensearch.replication.action.repository.GetStoreMetadataAction
import org.opensearch.replication.action.resume.ResumeIndexReplicationAction
import org.opensearch.replication.action.status.ReplicationStatusAction
import org.opensearch.replication.action.stop.StopIndexReplicationAction
import org.opensearch.replication.action.update.UpdateIndexReplicationAction
import org.opensearch.replication.metadata.ReplicationMetadataManager
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.authuser.User
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME
import org.opensearch.commons.replication.action.ReplicationActions.INTERNAL_STOP_REPLICATION_ACTION_NAME
import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.core.action.ActionResponse
Expand All @@ -49,7 +50,7 @@ class SecurityContext {
val LEADER_USER_ACTIONS = listOf(GetChangesAction.NAME, GetFileChunkAction.NAME)
val FOLLOWER_USER_ACTIONS = listOf(ReplayChangesAction.NAME,
ReplicateIndexAction.NAME, PauseIndexReplicationAction.NAME,
ResumeIndexReplicationAction.NAME, StopIndexReplicationAction.NAME,
ResumeIndexReplicationAction.NAME, STOP_REPLICATION_ACTION_NAME, INTERNAL_STOP_REPLICATION_ACTION_NAME,
UpdateIndexReplicationAction.NAME, ReplicationStatusAction.NAME,
UpdateAutoFollowPatternAction.NAME)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.commons.replication.action.ReplicationActions.STOP_REPLICATION_ACTION_NAME
import org.opensearch.snapshots.SnapshotState
import org.opensearch.tasks.TaskInfo
import org.opensearch.test.OpenSearchTestCase
Expand Down Expand Up @@ -324,7 +325,7 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
"indices:admin/plugins/replication/index/start",
"indices:admin/plugins/replication/index/pause",
"indices:admin/plugins/replication/index/resume",
"indices:admin/plugins/replication/index/stop",
"$STOP_REPLICATION_ACTION_NAME",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, if user wants to use CCR in ISM in a security enabled cluster, they will still need the internal replication action allowed, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats right !

For current ccr users, no change - they can continue to use roles like cross_cluster_replication_follower_full_access for all ccr operations.
For users who use ccr + ism, they'll additionally need permission for the internal replication action indices:internal/plugins/replication/index/stop

"indices:admin/plugins/replication/index/update",
"indices:admin/plugins/replication/index/status_check"
]
Expand Down
Loading
Loading