Skip to content

Commit

Permalink
Target Index Settings if create index during rollup (opensearch-proje…
Browse files Browse the repository at this point in the history
…ct#1377)

Co-authored-by: bowenlan-amzn <[email protected]>
  • Loading branch information
MrChaos1993 and bowenlan-amzn authored Feb 25, 2025
1 parent d330ebf commit 727b345
Show file tree
Hide file tree
Showing 21 changed files with 343 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class RollupMapperService(
} else {
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin)
val response = createTargetIndex(targetIndexResolvedName, job.targetIndexSettings, hasLegacyPlugin)
if (response.isAcknowledged) {
updateRollupIndexMappings(job, targetIndexResolvedName)
} else {
Expand Down Expand Up @@ -228,13 +228,17 @@ class RollupMapperService(
return RollupJobValidationResult.Valid
}

private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings =
if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
private suspend fun createTargetIndex(targetIndexName: String, targetIndexSettings: Settings?, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = Settings.builder().apply {
targetIndexSettings?.let { put(it) }
val rollupIndexSetting = if (hasLegacyPlugin) {
LegacyOpenDistroRollupSettings.ROLLUP_INDEX
} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
RollupSettings.ROLLUP_INDEX
}
put(rollupIndexSetting.key, true)
}.build()

val request =
CreateIndexRequest(targetIndexName)
.settings(settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.indexmanagement.rollup.model

import org.apache.commons.codec.digest.DigestUtils
import org.opensearch.Version
import org.opensearch.common.settings.Settings
import org.opensearch.commons.authuser.User
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
Expand All @@ -29,6 +31,7 @@ import java.time.temporal.ChronoUnit
data class ISMRollup(
val description: String,
val targetIndex: String,
val targetIndexSettings: Settings?,
val pageSize: Int,
val dimensions: List<Dimension>,
val metrics: List<RollupMetrics>,
Expand All @@ -55,6 +58,11 @@ data class ISMRollup(
.field(Rollup.PAGE_SIZE_FIELD, pageSize)
.field(Rollup.DIMENSIONS_FIELD, dimensions)
.field(Rollup.METRICS_FIELD, metrics)
if (targetIndexSettings != null) {
builder.startObject(Rollup.TARGET_INDEX_SETTINGS_FIELD)
targetIndexSettings.toXContent(builder, params)
builder.endObject()
}
builder.endObject()
return builder
}
Expand All @@ -74,6 +82,7 @@ data class ISMRollup(
description = this.description,
sourceIndex = sourceIndex,
targetIndex = this.targetIndex,
targetIndexSettings = this.targetIndexSettings,
metadataID = null,
pageSize = pageSize,
delay = null,
Expand All @@ -88,6 +97,11 @@ data class ISMRollup(
constructor(sin: StreamInput) : this(
description = sin.readString(),
targetIndex = sin.readString(),
targetIndexSettings = if (sin.version.onOrAfter(Version.V_3_0_0) && sin.readBoolean()) {
Settings.readSettingsFromStream(sin)
} else {
null
},
pageSize = sin.readInt(),
dimensions =
sin.let {
Expand All @@ -111,6 +125,7 @@ data class ISMRollup(
override fun toString(): String {
val sb = StringBuffer()
sb.append(targetIndex)
sb.append(targetIndexSettings)
sb.append(pageSize)
dimensions.forEach {
sb.append(it.type)
Expand All @@ -129,6 +144,10 @@ data class ISMRollup(
override fun writeTo(out: StreamOutput) {
out.writeString(description)
out.writeString(targetIndex)
if (out.version.onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(targetIndexSettings != null)
if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out)
}
out.writeInt(pageSize)
out.writeVInt(dimensions.size)
for (dimension in dimensions) {
Expand All @@ -151,6 +170,7 @@ data class ISMRollup(
): ISMRollup {
var description = ""
var targetIndex = ""
var targetIndexSettings: Settings? = null
var pageSize = 0
val dimensions = mutableListOf<Dimension>()
val metrics = mutableListOf<RollupMetrics>()
Expand All @@ -164,6 +184,14 @@ data class ISMRollup(
when (fieldName) {
Rollup.DESCRIPTION_FIELD -> description = xcp.text()
Rollup.TARGET_INDEX_FIELD -> targetIndex = xcp.text()
Rollup.TARGET_INDEX_SETTINGS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT,
xcp.currentToken(),
xcp,
)
targetIndexSettings = Settings.fromXContent(xcp)
}
Rollup.PAGE_SIZE_FIELD -> pageSize = xcp.intValue()
Rollup.DIMENSIONS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
Expand Down Expand Up @@ -195,6 +223,7 @@ data class ISMRollup(
dimensions = dimensions,
metrics = metrics,
targetIndex = targetIndex,
targetIndexSettings = targetIndexSettings,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.indexmanagement.rollup.model

import org.opensearch.Version
import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.common.settings.Settings
import org.opensearch.commons.authuser.User
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
Expand Down Expand Up @@ -47,6 +50,7 @@ data class Rollup(
val description: String,
val sourceIndex: String,
val targetIndex: String,
val targetIndexSettings: Settings?,
val metadataID: String?,
@Deprecated("Will be ignored, to check the roles use user field") val roles: List<String> = listOf(),
val pageSize: Int,
Expand Down Expand Up @@ -87,6 +91,9 @@ data class Rollup(
}
}
require(sourceIndex != targetIndex) { "Your source and target index cannot be the same" }
if (targetIndexSettings != null) {
IndexScopedSettings(null, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS).validate(targetIndexSettings, true)
}
require(dimensions.filter { it.type == Dimension.Type.DATE_HISTOGRAM }.size == 1) {
"Must specify precisely one date histogram dimension" // this covers empty dimensions case too
}
Expand Down Expand Up @@ -129,6 +136,11 @@ data class Rollup(
description = sin.readString(),
sourceIndex = sin.readString(),
targetIndex = sin.readString(),
targetIndexSettings = if (sin.getVersion().onOrAfter(Version.V_3_0_0) && sin.readBoolean()) {
Settings.readSettingsFromStream(sin)
} else {
null
},
metadataID = sin.readOptionalString(),
roles = sin.readStringArray().toList(),
pageSize = sin.readInt(),
Expand Down Expand Up @@ -177,6 +189,11 @@ data class Rollup(
.field(CONTINUOUS_FIELD, continuous)
.field(DIMENSIONS_FIELD, dimensions.toTypedArray())
.field(RollupMetrics.METRICS_FIELD, metrics.toTypedArray())
if (targetIndexSettings != null) {
builder.startObject(TARGET_INDEX_SETTINGS_FIELD)
targetIndexSettings.toXContent(builder, params)
builder.endObject()
}
if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user)
if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject()
builder.endObject()
Expand All @@ -200,6 +217,10 @@ data class Rollup(
out.writeString(description)
out.writeString(sourceIndex)
out.writeString(targetIndex)
if (out.version.onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(targetIndexSettings != null)
if (targetIndexSettings != null) Settings.writeSettingsToStream(targetIndexSettings, out)
}
out.writeOptionalString(metadataID)
out.writeStringArray(emptyList<String>().toTypedArray())
out.writeInt(pageSize)
Expand Down Expand Up @@ -237,6 +258,7 @@ data class Rollup(
const val DESCRIPTION_FIELD = "description"
const val SOURCE_INDEX_FIELD = "source_index"
const val TARGET_INDEX_FIELD = "target_index"
const val TARGET_INDEX_SETTINGS_FIELD = "target_index_settings"
const val METADATA_ID_FIELD = "metadata_id"
const val ROLES_FIELD = "roles"
const val PAGE_SIZE_FIELD = "page_size"
Expand Down Expand Up @@ -275,6 +297,7 @@ data class Rollup(
var description: String? = null
var sourceIndex: String? = null
var targetIndex: String? = null
var targetIndexSettings: Settings? = null
var metadataID: String? = null
var pageSize: Int? = null
var delay: Long? = null
Expand All @@ -301,6 +324,10 @@ data class Rollup(
DESCRIPTION_FIELD -> description = xcp.text()
SOURCE_INDEX_FIELD -> sourceIndex = xcp.text()
TARGET_INDEX_FIELD -> targetIndex = xcp.text()
TARGET_INDEX_SETTINGS_FIELD -> {
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
targetIndexSettings = Settings.fromXContent(xcp)
}
METADATA_ID_FIELD -> metadataID = xcp.textOrNull()
ROLES_FIELD -> {
// Parsing but not storing the field, deprecated
Expand Down Expand Up @@ -357,6 +384,7 @@ data class Rollup(
description = requireNotNull(description) { "Rollup description is null" },
sourceIndex = requireNotNull(sourceIndex) { "Rollup source index is null" },
targetIndex = requireNotNull(targetIndex) { "Rollup target index is null" },
targetIndexSettings = targetIndexSettings,
metadataID = metadataID,
pageSize = requireNotNull(pageSize) { "Rollup page size is null" },
delay = delay,
Expand Down
10 changes: 9 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 22
"schema_version": 23
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -380,6 +380,10 @@
}
}
},
"target_index_settings": {
"dynamic": "true",
"type": "object"
},
"page_size": {
"type": "long"
},
Expand Down Expand Up @@ -1004,6 +1008,10 @@
}
}
},
"target_index_settings": {
"dynamic": "true",
"type": "object"
},
"page_size": {
"type": "long"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {
val configSchemaVersion = 22
val configSchemaVersion = 23
val historySchemaVersion = 7

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() {
private fun createISMRollup(targetIdxRollup: String): ISMRollup = ISMRollup(
description = "basic search test",
targetIndex = targetIdxRollup,
targetIndexSettings = null,
pageSize = 100,
dimensions =
listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class RollupSecurityBehaviorIT : SecurityRestTestCase() {
description = "basic stats test",
sourceIndex = sourceIndex,
targetIndex = targetIndex,
targetIndexSettings = null,
metadataID = null,
roles = emptyList(),
pageSize = 100,
Expand Down
Loading

0 comments on commit 727b345

Please sign in to comment.