diff --git a/docs/en/connector-v2/sink/InfluxDB.md b/docs/en/connector-v2/sink/InfluxDB.md new file mode 100644 index 00000000000..cff9787f94f --- /dev/null +++ b/docs/en/connector-v2/sink/InfluxDB.md @@ -0,0 +1,104 @@ +# InfluxDB + +> InfluxDB sink connector + +## Description + +Write data to InfluxDB. + +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|-----------------------------|----------|----------|-------------------------------| +| url | string | yes | - | +| database | string | yes | | +| measurement | string | yes | | +| username | string | no | - | +| password | string | no | - | +| key_time | string | yes | processing time | +| key_tags | array | no | exclude `field` & `key_time` | +| batch_size | int | no | 1024 | +| batch_interval_ms | int | no | - | +| max_retries | int | no | - | +| retry_backoff_multiplier_ms | int | no | - | +| connect_timeout_ms | long | no | 15000 | + +### url +the url to connect to influxDB e.g. +``` +http://influxdb-host:8086 +``` + +### database [string] + +The name of `influxDB` database + +### measurement [string] + +The name of `influxDB` measurement + +### username [string] + +`influxDB` user username + +### password [string] + +`influxDB` user password + +### key_time [string] + +Specify field-name of the `influxDB` measurement timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp + +### key_tags [array] + +Specify field-name of the `influxDB` measurement tags in SeaTunnelRow. +If not specified, include all fields with `influxDB` measurement field + +### batch_size [int] + +For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB + +### batch_interval_ms [int] + +For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB + +### max_retries [int] + +The number of retries to flush failed + +### retry_backoff_multiplier_ms [int] + +Using as a multiplier for generating the next delay for backoff + +### max_retry_backoff_ms [int] + +The amount of time to wait before attempting to retry a request to `influxDB` + +### connect_timeout_ms [long] +the timeout for connecting to InfluxDB, in milliseconds + +## Examples +```hocon +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "sink" + key_time = "time" + key_tags = ["label"] + batch_size = 1 + } +} + +``` + +## Changelog + +### next version + +- Add InfluxDB Sink Connector \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 11d7fd4a92b..1ac1e268725 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -140,3 +140,4 @@ seatunnel.sink.S3File = connector-file-s3 seatunnel.source.Amazondynamodb = connector-amazondynamodb seatunnel.sink.Amazondynamodb = connector-amazondynamodb seatunnel.sink.StarRocks = connector-starrocks +seatunnel.sink.InfluxDB = connector-influxdb diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java index 8743d5aa2c5..3ad3a99d53b 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/client/InfluxDBClient.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.client; import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; import lombok.extern.slf4j.Slf4j; import okhttp3.HttpUrl; @@ -75,4 +76,18 @@ public Response intercept(Chain chain) throws IOException { log.info("connect influxdb successful. sever version :{}.", version); return influxDB; } + + public static void setWriteProperty(InfluxDB influxDB, SinkConfig sinkConfig) { + String rp = sinkConfig.getRp(); + if (!StringUtils.isEmpty(rp)) { + influxDB.setRetentionPolicy(rp); + } + } + + public static InfluxDB getWriteClient(SinkConfig sinkConfig) throws ConnectException { + InfluxDB influxDB = getInfluxDB(sinkConfig); + influxDB.setDatabase(sinkConfig.getDatabase()); + setWriteProperty(getInfluxDB(sinkConfig), sinkConfig); + return influxDB; + } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java index 9a04e7d4aec..1332c5cab57 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/InfluxDBConfig.java @@ -23,7 +23,6 @@ import lombok.Data; import java.io.Serializable; -import java.util.List; @Data public class InfluxDBConfig implements Serializable { @@ -33,34 +32,16 @@ public class InfluxDBConfig implements Serializable { public static final String URL = "url"; private static final String CONNECT_TIMEOUT_MS = "connect_timeout_ms"; private static final String QUERY_TIMEOUT_SEC = "query_timeout_sec"; - - public static final String SQL = "sql"; - public static final String SQL_WHERE = "where"; - public static final String DATABASES = "database"; - public static final String SPLIT_COLUMN = "split_column"; - private static final String PARTITION_NUM = "partition_num"; - private static final String UPPER_BOUND = "upper_bound"; - private static final String LOWER_BOUND = "lower_bound"; - - private static final String DEFAULT_FORMAT = "MSGPACK"; - private static final String EPOCH = "epoch"; - - public static final String DEFAULT_PARTITIONS = "0"; + protected static final String EPOCH = "epoch"; private static final int DEFAULT_QUERY_TIMEOUT_SEC = 3; private static final long DEFAULT_CONNECT_TIMEOUT_MS = 15000; - private static final String DEFAULT_EPOCH = "n"; private String url; private String username; private String password; - private String sql; - private int partitionNum = 0; - private String splitKey; - private long lowerBound; - private long upperBound; private String database; private String format = DEFAULT_FORMAT; @@ -69,11 +50,8 @@ public class InfluxDBConfig implements Serializable { private String epoch = DEFAULT_EPOCH; - List columnsIndex; - public InfluxDBConfig(Config config) { this.url = config.getString(URL); - this.sql = config.getString(SQL); if (config.hasPath(USERNAME)) { this.username = config.getString(USERNAME); @@ -81,18 +59,6 @@ public InfluxDBConfig(Config config) { if (config.hasPath(PASSWORD)) { this.password = config.getString(PASSWORD); } - if (config.hasPath(PARTITION_NUM)) { - this.partitionNum = config.getInt(PARTITION_NUM); - } - if (config.hasPath(UPPER_BOUND)) { - this.upperBound = config.getInt(UPPER_BOUND); - } - if (config.hasPath(LOWER_BOUND)) { - this.lowerBound = config.getInt(LOWER_BOUND); - } - if (config.hasPath(SPLIT_COLUMN)) { - this.splitKey = config.getString(SPLIT_COLUMN); - } if (config.hasPath(DATABASES)) { this.database = config.getString(DATABASES); } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java new file mode 100644 index 00000000000..c97d807fabf --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.util.List; + +@Setter +@Getter +@ToString +public class SinkConfig extends InfluxDBConfig{ + public SinkConfig(Config config) { + super(config); + } + + private static final String KEY_TIME = "key_time"; + private static final String KEY_TAGS = "key_tags"; + public static final String KEY_MEASUREMENT = "measurement"; + + private static final String BATCH_SIZE = "batch_size"; + private static final String BATCH_INTERVAL_MS = "batch_interval_ms"; + private static final String MAX_RETRIES = "max_retries"; + private static final String WRITE_TIMEOUT = "write_timeout"; + private static final String RETRY_BACKOFF_MULTIPLIER_MS = "retry_backoff_multiplier_ms"; + private static final String MAX_RETRY_BACKOFF_MS = "max_retry_backoff_ms"; + private static final String RETENTION_POLICY = "rp"; + private static final int DEFAULT_BATCH_SIZE = 1024; + private static final int DEFAULT_WRITE_TIMEOUT = 5; + private static final TimePrecision DEFAULT_TIME_PRECISION = TimePrecision.NS; + + private String rp; + private String measurement; + private int writeTimeout = DEFAULT_WRITE_TIMEOUT; + private String keyTime; + private List keyTags; + private int batchSize = DEFAULT_BATCH_SIZE; + private Integer batchIntervalMs; + private int maxRetries; + private int retryBackoffMultiplierMs; + private int maxRetryBackoffMs; + private TimePrecision precision = DEFAULT_TIME_PRECISION; + + public static SinkConfig loadConfig(Config config) { + SinkConfig sinkConfig = new SinkConfig(config); + + if (config.hasPath(KEY_TIME)) { + sinkConfig.setKeyTime(config.getString(KEY_TIME)); + } + if (config.hasPath(KEY_TAGS)) { + sinkConfig.setKeyTags(config.getStringList(KEY_TAGS)); + } + if (config.hasPath(BATCH_INTERVAL_MS)) { + sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS)); + } + if (config.hasPath(MAX_RETRIES)) { + sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES)); + } + if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS)) { + sinkConfig.setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS)); + } + if (config.hasPath(MAX_RETRY_BACKOFF_MS)) { + sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS)); + } + if (config.hasPath(WRITE_TIMEOUT)) { + sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT)); + } + if (config.hasPath(RETENTION_POLICY)) { + sinkConfig.setRp(config.getString(RETENTION_POLICY)); + } + if (config.hasPath(EPOCH)) { + sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH))); + } + sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT)); + return sinkConfig; + } + +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java new file mode 100644 index 00000000000..d0c3fe65e2e --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SourceConfig.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import lombok.Getter; + +import java.util.List; + +@Getter +public class SourceConfig extends InfluxDBConfig{ + public static final String SQL = "sql"; + public static final String SQL_WHERE = "where"; + public static final String SPLIT_COLUMN = "split_column"; + private static final String PARTITION_NUM = "partition_num"; + private static final String UPPER_BOUND = "upper_bound"; + private static final String LOWER_BOUND = "lower_bound"; + public static final String DEFAULT_PARTITIONS = "0"; + private String sql; + private int partitionNum = 0; + private String splitKey; + private long lowerBound; + private long upperBound; + + List columnsIndex; + + public SourceConfig(Config config) { + super(config); + } + + public static SourceConfig loadConfig(Config config) { + SourceConfig sourceConfig = new SourceConfig(config); + + sourceConfig.sql = config.getString(SQL); + + if (config.hasPath(PARTITION_NUM)) { + sourceConfig.partitionNum = config.getInt(PARTITION_NUM); + } + if (config.hasPath(UPPER_BOUND)) { + sourceConfig.upperBound = config.getInt(UPPER_BOUND); + } + if (config.hasPath(LOWER_BOUND)) { + sourceConfig.lowerBound = config.getInt(LOWER_BOUND); + } + if (config.hasPath(SPLIT_COLUMN)) { + sourceConfig.splitKey = config.getString(SPLIT_COLUMN); + } + return sourceConfig; + } + +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java new file mode 100644 index 00000000000..18af2cdd6e0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/TimePrecision.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.config; + +import java.util.concurrent.TimeUnit; + +public enum TimePrecision { + NS("NS", TimeUnit.NANOSECONDS), + U("U", TimeUnit.MICROSECONDS), + MS("MS", TimeUnit.MILLISECONDS), + S("S", TimeUnit.SECONDS), + M("M", TimeUnit.MINUTES), + H("H", TimeUnit.HOURS); + private String desc; + private TimeUnit precision; + + TimePrecision(String desc, TimeUnit precision) { + this.desc = desc; + this.precision = precision; + } + + public TimeUnit getTimeUnit() { + return this.precision; + } + + public static TimePrecision getPrecision(String desc) { + for (TimePrecision timePrecision : TimePrecision.values()) { + if (desc.equals(timePrecision.desc)) { + return timePrecision; + } + } + return TimePrecision.NS; + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java new file mode 100644 index 00000000000..8cc458939de --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/DefaultSerializer.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import com.google.common.base.Strings; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.influxdb.dto.Point; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class DefaultSerializer implements Serializer { + private SeaTunnelRowType seaTunnelRowType; + + private final BiConsumer timestampExtractor; + private final BiConsumer fieldExtractor; + private final BiConsumer tagExtractor; + private String measurement; + + private TimeUnit precision; + + public DefaultSerializer(SeaTunnelRowType seaTunnelRowType, TimeUnit precision, List tagKeys, + String timestampKey, + String measurement) { + this.measurement = measurement; + this.seaTunnelRowType = seaTunnelRowType; + this.timestampExtractor = createTimestampExtractor(seaTunnelRowType, timestampKey); + this.tagExtractor = createTagExtractor(seaTunnelRowType, tagKeys); + List fieldKeys = getFieldKeys(seaTunnelRowType, timestampKey, tagKeys); + this.fieldExtractor = createFieldExtractor(seaTunnelRowType, fieldKeys); + this.precision = precision; + } + + @Override + public Point serialize(SeaTunnelRow seaTunnelRow) { + Point.Builder builder = Point.measurement(measurement); + timestampExtractor.accept(seaTunnelRow, builder); + tagExtractor.accept(seaTunnelRow, builder); + fieldExtractor.accept(seaTunnelRow, builder); + return builder.build(); + } + + private BiConsumer createFieldExtractor(SeaTunnelRowType seaTunnelRowType, List fieldKeys) { + return (row, builder) -> { + for (int i = 0; i < fieldKeys.size(); i++) { + String field = fieldKeys.get(i); + int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(field); + SeaTunnelDataType dataType = seaTunnelRowType.getFieldType(indexOfSeaTunnelRow); + Object val = row.getField(indexOfSeaTunnelRow); + switch (dataType.getSqlType()) { + case BOOLEAN: + builder.addField(field, Boolean.valueOf((Boolean) val)); + break; + case SMALLINT: + builder.addField(field, Short.valueOf((Short) val)); + break; + case INT: + builder.addField(field, ((Number) val).intValue()); + break; + case BIGINT: + // Only timstamp support be bigint,however it is processed in specicalField + builder.addField(field, ((Number) val).longValue()); + break; + case FLOAT: + builder.addField(field, ((Number) val).floatValue()); + break; + case DOUBLE: + builder.addField(field, ((Number) val).doubleValue()); + break; + case STRING: + builder.addField(field, val.toString()); + break; + default: + throw new UnsupportedOperationException("Unsupported dataType: " + dataType); + } + } + }; + } + + private BiConsumer createTimestampExtractor(SeaTunnelRowType seaTunnelRowType, + String timeKey) { + //not config timeKey, use processing time + if (Strings.isNullOrEmpty(timeKey)) { + return (row, builder) -> builder.time(System.currentTimeMillis(), precision); + } + + int timeFieldIndex = seaTunnelRowType.indexOf(timeKey); + return (row, builder) -> { + Object time = row.getField(timeFieldIndex); + if (time == null) { + builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + SeaTunnelDataType timestampFieldType = seaTunnelRowType.getFieldType(timeFieldIndex); + switch (timestampFieldType.getSqlType()) { + case STRING: + builder.time(Long.parseLong((String) time), precision); + break; + case TIMESTAMP: + builder.time(LocalDateTime.class.cast(time) + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), precision); + break; + case BIGINT: + builder.time((Long) time, precision); + break; + default: + throw new UnsupportedOperationException("Unsupported data type: " + timestampFieldType); + } + }; + } + + private BiConsumer createTagExtractor(SeaTunnelRowType seaTunnelRowType, + List tagKeys) { + //not config tagKeys + if (CollectionUtils.isEmpty(tagKeys)) { + return (row, builder) -> {}; + } + + return (row, builder) -> { + for (int i = 0; i < tagKeys.size(); i++) { + String tagKey = tagKeys.get(i); + int indexOfSeaTunnelRow = seaTunnelRowType.indexOf(tagKey); + builder.tag(tagKey, row.getField(indexOfSeaTunnelRow).toString()); + } + }; + } + + private List getFieldKeys(SeaTunnelRowType seaTunnelRowType, + String timestampKey, + List tagKeys) { + return Stream.of(seaTunnelRowType.getFieldNames()) + .filter(name -> CollectionUtils.isEmpty(tagKeys) || !tagKeys.contains(name)) + .filter(name -> StringUtils.isEmpty(timestampKey) || !name.equals(timestampKey)) + .collect(Collectors.toList()); + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java new file mode 100644 index 00000000000..b910efafd40 --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/serialize/Serializer.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.influxdb.dto.Point; + +public interface Serializer { + Point serialize(SeaTunnelRow seaTunnelRow); +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java new file mode 100644 index 00000000000..8d3eb7290ff --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; + +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class InfluxDBSink extends AbstractSimpleSink { + + private Config pluginConfig; + private SeaTunnelRowType seaTunnelRowType; + + @Override + public String getPluginName() { + return "InfluxDB"; + } + + @Override + public void prepare(Config config) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(config, URL, KEY_MEASUREMENT); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + this.pluginConfig = config; + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType); + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java new file mode 100644 index 00000000000..809a3eaaa88 --- /dev/null +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer; +import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; + +import java.io.IOException; +import java.net.ConnectException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Slf4j +public class InfluxDBSinkWriter extends AbstractSinkWriter { + + private final Serializer serializer; + private InfluxDB influxDB; + private SinkConfig sinkConfig; + private final List batchList; + private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; + private volatile Exception flushException; + private final Integer batchIntervalMs; + + public InfluxDBSinkWriter(Config pluginConfig, + SeaTunnelRowType seaTunnelRowType) throws ConnectException { + this.sinkConfig = SinkConfig.loadConfig(pluginConfig); + this.batchIntervalMs = sinkConfig.getBatchIntervalMs(); + this.serializer = new DefaultSerializer( + seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(), sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement()); + this.batchList = new ArrayList<>(); + + if (batchIntervalMs != null) { + scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("influxDB-sink-output-%s").build()); + scheduledFuture = scheduler.scheduleAtFixedRate( + () -> { + try { + flush(); + } catch (IOException e) { + flushException = e; + } + }, + batchIntervalMs, + batchIntervalMs, + TimeUnit.MILLISECONDS); + } + + connect(); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + Point record = serializer.serialize(element); + write(record); + } + + @SneakyThrows + @Override + public Optional prepareCommit() { + // Flush to storage before snapshot state is performed + flush(); + return super.prepareCommit(); + } + + @Override + public void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduler.shutdown(); + } + + flush(); + + if (influxDB != null) { + influxDB.close(); + influxDB = null; + } + } + + public void write(Point record) throws IOException { + checkFlushException(); + + batchList.add(record); + if (sinkConfig.getBatchSize() > 0 + && batchList.size() >= sinkConfig.getBatchSize()) { + flush(); + } + } + + public void flush() throws IOException { + checkFlushException(); + if (batchList.isEmpty()) { + return; + } + BatchPoints.Builder batchPoints = BatchPoints.database(sinkConfig.getDatabase()); + for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) { + try { + batchPoints.points(batchList); + influxDB.write(batchPoints.build()); + } catch (Exception e) { + log.error("Writing records to influxdb failed, retry times = {}", i, e); + if (i >= sinkConfig.getMaxRetries()) { + throw new IOException("Writing records to InfluxDB failed.", e); + } + + try { + long backoff = Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i, + sinkConfig.getMaxRetryBackoffMs()); + Thread.sleep(backoff); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException( + "Unable to flush; interrupted while doing another attempt.", e); + } + } + } + + batchList.clear(); + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing records to InfluxDB failed.", flushException); + } + } + + public void connect() throws ConnectException { + if (influxDB == null) { + influxDB = InfluxDBClient.getWriteClient(sinkConfig); + String version = influxDB.version(); + if (!influxDB.ping().isGood()) { + String errorMessage = + String.format( + "connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}", + sinkConfig.getUrl()); + throw new ConnectException(errorMessage); + } + log.info("connect influxdb successful. sever version :{}.", version); + } + } +} diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java index bc971476bda..804e804f560 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSource.java @@ -17,8 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.source; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL; import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.source.Boundedness; @@ -33,7 +32,7 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -53,7 +52,7 @@ @AutoService(SeaTunnelSource.class) public class InfluxDBSource implements SeaTunnelSource { private SeaTunnelRowType typeInfo; - private InfluxDBConfig influxDBConfig; + private SourceConfig sourceConfig; private List columnsIndexList; @@ -66,15 +65,15 @@ public String getPluginName() { @Override public void prepare(Config config) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(config, URL, SQL); + CheckResult result = CheckConfigUtil.checkAllExists(config, SQL); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } try { - this.influxDBConfig = new InfluxDBConfig(config); + this.sourceConfig = SourceConfig.loadConfig(config); SeaTunnelSchema seatunnelSchema = SeaTunnelSchema.buildWithConfig(config); this.typeInfo = seatunnelSchema.getSeaTunnelRowType(); - this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(influxDBConfig)); + this.columnsIndexList = initColumnsIndex(InfluxDBClient.getInfluxDB(sourceConfig)); } catch (Exception e) { throw new PrepareFailException("InfluxDB", PluginType.SOURCE, e.toString()); } @@ -92,26 +91,26 @@ public SeaTunnelDataType getProducedType() { @Override public SourceReader createReader(SourceReader.Context readerContext) throws Exception { - return new InfluxdbSourceReader(influxDBConfig, readerContext, typeInfo, columnsIndexList); + return new InfluxdbSourceReader(sourceConfig, readerContext, typeInfo, columnsIndexList); } @Override public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception { - return new InfluxDBSourceSplitEnumerator(enumeratorContext, influxDBConfig); + return new InfluxDBSourceSplitEnumerator(enumeratorContext, sourceConfig); } @Override public SourceSplitEnumerator restoreEnumerator(SourceSplitEnumerator.Context enumeratorContext, InfluxDBSourceState checkpointState) throws Exception { - return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, influxDBConfig); + return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, sourceConfig); } private List initColumnsIndex(InfluxDB influxDB) { //query one row to get column info - String query = influxDBConfig.getSql() + QUERY_LIMIT; + String query = sourceConfig.getSql() + QUERY_LIMIT; List fieldNames = new ArrayList<>(); try { QueryResult queryResult = influxDB.query( - new Query(query, influxDBConfig.getDatabase())); + new Query(query, sourceConfig.getDatabase())); List serieList = queryResult.getResults().get(0).getSeries(); fieldNames.addAll(serieList.get(0).getColumns()); diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java index d22eba1166e..139a6e3ad5c 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxDBSourceSplitEnumerator.java @@ -17,10 +17,10 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.source; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.SQL_WHERE; +import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig.SQL_WHERE; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig; import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState; import lombok.extern.slf4j.Slf4j; @@ -37,17 +37,17 @@ @Slf4j public class InfluxDBSourceSplitEnumerator implements SourceSplitEnumerator { - final InfluxDBConfig config; + final SourceConfig config; private final Context context; private final Map> pendingSplit; private final Object stateLock = new Object(); private volatile boolean shouldEnumerate; - public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBConfig config) { + public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, SourceConfig config) { this(context, null, config); } - public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBSourceState sourceState, InfluxDBConfig config) { + public InfluxDBSourceSplitEnumerator(SourceSplitEnumerator.Context context, InfluxDBSourceState sourceState, SourceConfig config) { this.context = context; this.config = config; this.pendingSplit = new HashMap<>(); @@ -113,7 +113,7 @@ private Set getInfluxDBSplit() { Set influxDBSourceSplits = new HashSet<>(); // no need numPartitions, use one partition if (config.getPartitionNum() == 0) { - influxDBSourceSplits.add(new InfluxDBSourceSplit(InfluxDBConfig.DEFAULT_PARTITIONS, sql)); + influxDBSourceSplits.add(new InfluxDBSourceSplit(SourceConfig.DEFAULT_PARTITIONS, sql)); return influxDBSourceSplits; } //calculate numRange base on (lowerBound upperBound partitionNum) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml index a3b93102e55..652782b0631 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/pom.xml @@ -26,6 +26,7 @@ connector-influxdb-e2e + org.apache.seatunnel connector-influxdb @@ -34,9 +35,9 @@ org.apache.seatunnel - connector-assert + connector-console ${project.version} test - \ No newline at end of file + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java deleted file mode 100644 index d39aa4f395e..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxDBSourceToAssertIT.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.e2e.connector.influxdb; - -import static org.awaitility.Awaitility.given; - -import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; -import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; -import org.apache.seatunnel.e2e.common.TestResource; -import org.apache.seatunnel.e2e.common.TestSuiteBase; -import org.apache.seatunnel.e2e.common.container.TestContainer; - -import lombok.extern.slf4j.Slf4j; -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; -import org.influxdb.dto.Point; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerLoggerFactory; - -import java.io.IOException; -import java.net.ConnectException; -import java.util.Date; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; - -@Slf4j -public class InfluxDBSourceToAssertIT extends TestSuiteBase implements TestResource { - - private static final String INFLUXDB_DOCKER_IMAGE = "influxdb:1.8"; - private static final String INFLUXDB_CONTAINER_HOST = "influxdb-host"; - private static final int INFLUXDB_CONTAINER_PORT = 8086; - private static final String INFLUXDB_DATABASE = "test"; - private static final String INFLUXDB_MEASUREMENT = "test"; - - private GenericContainer influxDBServer; - private InfluxDB influxDB; - - @BeforeAll - @Override - public void startUp() { - influxDBServer = new GenericContainer<>(INFLUXDB_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(INFLUXDB_CONTAINER_HOST) - .withExposedPorts(INFLUXDB_CONTAINER_PORT) - .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(INFLUXDB_DOCKER_IMAGE))); - Startables.deepStart(Stream.of(influxDBServer)).join(); - log.info("influxdb container started"); - given().ignoreExceptions() - .await() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> initializeInfluxDBClient()); - batchInsertData(); - } - - @TestTemplate - public void testInfluxDBSource(TestContainer container) throws IOException, InterruptedException { - Container.ExecResult execResult = container.executeJob("/influxdb_source_to_assert.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - } - - private void initializeInfluxDBClient() throws ConnectException { - InfluxDBConfig influxDBConfig = new InfluxDBConfig(String.format("http://%s:%s", influxDBServer.getHost(), influxDBServer.getFirstMappedPort())); - influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); - } - - public void batchInsertData() { - influxDB.createDatabase(INFLUXDB_DATABASE); - BatchPoints batchPoints = BatchPoints - .database(INFLUXDB_DATABASE) - .build(); - for (int i = 0; i < 100; i++) { - Point point = Point.measurement(INFLUXDB_MEASUREMENT) - .time(new Date().getTime(), TimeUnit.NANOSECONDS) - .tag("label", String.format("label_%s", i)) - .addField("f1", String.format("f1_%s", i)) - .addField("f2", Double.valueOf(i + 1)) - .addField("f3", Long.valueOf(i + 2)) - .addField("f4", Float.valueOf(i + 3)) - .addField("f5", Integer.valueOf(i)) - .addField("f6", (short) (i + 4)) - .addField("f7", i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE) - .build(); - batchPoints.point(point); - } - influxDB.write(batchPoints); - } - - @AfterAll - @Override - public void tearDown() { - if (influxDB != null) { - influxDB.close(); - } - if (influxDBServer != null) { - influxDBServer.stop(); - } - } -} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java new file mode 100644 index 00000000000..20cc6dce012 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.influxdb; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import lombok.extern.slf4j.Slf4j; +import org.influxdb.InfluxDB; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.io.IOException; +import java.net.ConnectException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import scala.Tuple2; + +@Slf4j +public class InfluxdbIT extends TestSuiteBase implements TestResource { + private static final String IMAGE = "influxdb:1.8"; + private static final String HOST = "influxdb-host"; + private static final int PORT = 8086; + private static final String INFLUXDB_DATABASE = "test"; + private static final String INFLUXDB_SOURCE_MEASUREMENT = "source"; + private static final String INFLUXDB_SINK_MEASUREMENT = "sink"; + + + private static final Tuple2> TEST_DATASET = generateTestDataSet(); + + private GenericContainer influxdbContainer; + private String influxDBConnectUrl; + + private InfluxDB influxDB; + + @BeforeAll + @Override + public void startUp() throws Exception { + this.influxdbContainer = new GenericContainer<>(DockerImageName.parse(IMAGE)) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withExposedPorts(PORT) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE))) + .waitingFor(new HostPortWaitStrategy() + .withStartupTimeout(Duration.ofMinutes(2))); + Startables.deepStart(Stream.of(influxdbContainer)).join(); + influxDBConnectUrl = String.format("http://%s:%s", influxdbContainer.getHost(), influxdbContainer.getFirstMappedPort()); + log.info("Influxdb container started"); + this.initializeInfluxDBClient(); + this.initSourceData(); + } + + private void initSourceData() { + influxDB.createDatabase(INFLUXDB_DATABASE); + BatchPoints batchPoints = BatchPoints + .database(INFLUXDB_DATABASE) + .build(); + List rows = TEST_DATASET._2(); + SeaTunnelRowType rowType = TEST_DATASET._1(); + + for (int i = 0; i < rows.size(); i++) { + SeaTunnelRow row = rows.get(i); + Point point = Point.measurement(INFLUXDB_SOURCE_MEASUREMENT) + .time((Long) row.getField(0), TimeUnit.NANOSECONDS) + .tag(rowType.getFieldName(1), (String) row.getField(1)) + .addField(rowType.getFieldName(2), (String) row.getField(2)) + .addField(rowType.getFieldName(3), (Double) row.getField(3)) + .addField(rowType.getFieldName(4), (Long) row.getField(4)) + .addField(rowType.getFieldName(5), (Float) row.getField(5)) + .addField(rowType.getFieldName(6), (Integer) row.getField(6)) + .addField(rowType.getFieldName(7), (Short) row.getField(7)) + .addField(rowType.getFieldName(8), (Boolean) row.getField(8)) + .build(); + batchPoints.point(point); + } + influxDB.write(batchPoints); + } + + private static Tuple2> generateTestDataSet() { + SeaTunnelRowType rowType = new SeaTunnelRowType( + new String[]{ + "time", + "label", + "c_string", + "c_double", + "c_bigint", + "c_float", + "c_int", + "c_smallint", + "c_boolean" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE, + BasicType.STRING_TYPE, + BasicType.STRING_TYPE, + BasicType.DOUBLE_TYPE, + BasicType.LONG_TYPE, + BasicType.FLOAT_TYPE, + BasicType.INT_TYPE, + BasicType.SHORT_TYPE, + BasicType.BOOLEAN_TYPE + } + ); + + List rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + new Date().getTime(), + String.format("label_%s", i), + String.format("f1_%s", i), + Double.parseDouble("1.1"), + Long.parseLong("1"), + Float.parseFloat("1.1"), + Integer.valueOf(i), + Short.parseShort("1"), + i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE + }); + rows.add(row); + } + return Tuple2.apply(rowType, rows); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + influxDB.close(); + influxdbContainer.stop(); + } + + @TestTemplate + public void testInfluxdb(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/influxdb-to-influxdb.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + String sourceSql = String.format("select * from %s order by time", INFLUXDB_SOURCE_MEASUREMENT); + String sinkSql = String.format("select * from %s order by time", INFLUXDB_SINK_MEASUREMENT); + QueryResult sourceQueryResult = influxDB.query(new Query(sourceSql, INFLUXDB_DATABASE)); + QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE)); + //assert data count + Assertions.assertEquals(sourceQueryResult.getResults().size(), sinkQueryResult.getResults().size()); + //assert data values + List> sourceValues = sourceQueryResult.getResults().get(0).getSeries().get(0).getValues(); + List> sinkValues = sinkQueryResult.getResults().get(0).getSeries().get(0).getValues(); + int rowSize = sourceValues.size(); + int colSize = sourceValues.get(0).size(); + + for (int row = 0; row < rowSize; row++) { + for (int col = 0; col < colSize; col++) { + Object sourceColValue = sourceValues.get(row).get(col); + Object sinkColValue = sinkValues.get(row).get(col); + + if (!Objects.deepEquals(sourceColValue, sinkColValue)) { + Assertions.assertEquals(sourceColValue, sinkColValue); + } + } + + } + } + + private void initializeInfluxDBClient() throws ConnectException { + InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl); + influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf new file mode 100644 index 00000000000..f95af29a2c6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb-to-influxdb.conf @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + InfluxDB { + url = "http://influxdb-host:8086" + sql = "select label, c_string, c_double, c_bigint, c_float, c_int, c_smallint, c_boolean from source" + database = "test" + upper_bound = 99 + lower_bound = 0 + partition_num = 4 + split_column = "c_int" + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } +} + +transform { +} + +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + measurement = "sink" + key_time = "time" + key_tags = ["label"] + batch_size = 1 + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf deleted file mode 100644 index ea0e6e17740..00000000000 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/influxdb_source_to_assert.conf +++ /dev/null @@ -1,188 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -###### -###### This config file is a demonstration of streaming processing in seatunnel config -###### - -env { - execution.parallelism = 1 - job.mode = "BATCH" - - # You can set spark configuration here - spark.app.name = "SeaTunnel" - spark.executor.instances = 2 - spark.executor.cores = 1 - spark.executor.memory = "1g" - spark.master = local -} - -source { - # This is a example source plugin **only for test and demonstrate the feature source plugin** - InfluxDB { - url = "http://influxdb-host:8086" - sql = "select label, f1, f2, f3, f4, f5, f6, f7 from test" - database = "test" - upper_bound = 99 - lower_bound = 0 - partition_num = 4 - split_column = "f5" - fields { - label = STRING - f1 = STRING - f2 = DOUBLE - f3 = BIGINT - f4 = FLOAT - f5 = INT - f6 = SMALLINT - f7 = BOOLEAN - } - } -} - -sink { - Assert { - rules = - { - row_rules = [ - { - rule_type = MAX_ROW - rule_value = 100 - }, - { - rule_type = MIN_ROW - rule_value = 100 - } - ], - field_rules = [{ - field_name = f1 - field_type = string - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN_LENGTH - rule_value = 4 - }, - { - rule_type = MAX_LENGTH - rule_value = 5 - } - ] - },{ - field_name = f2 - field_type = double - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 1 - }, - { - rule_type = MAX - rule_value = 100 - } - ] - },{ - field_name = f3 - field_type = long - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 2 - }, - { - rule_type = MAX - rule_value = 101 - } - ] - },{ - field_name = f4 - field_type = float - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 3 - }, - { - rule_type = MAX - rule_value = 102 - } - ] - },{ - field_name = f5 - field_type = int - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 99 - } - ] - },{ - field_name = f6 - field_type = short - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 4 - }, - { - rule_type = MAX - rule_value = 103 - } - ] - },{ - field_name = f7 - field_type = boolean - field_value = [ - { - rule_type = NOT_NULL - }, - { - rule_type = MIN - rule_value = 0 - }, - { - rule_type = MAX - rule_value = 1 - } - ] - } - ] - } - } - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/category/sink-v2 -} \ No newline at end of file