Skip to content

redis-field-engineering/redis-flink-connector-dist

Repository files navigation

Redis Flink Connector

The Redis Flink Connector is a highly performant, scalable Flink Source and Sink connector for Redis. It is designed and built to provide a simple, scalable means of using Redis as a source and Sink for your stream-processing use cases in Flink.

Partitioned Streams

The Redis Flink Connector supports partitioned streams, allowing you to configure how many separate partitions you want for your stream of data. This allows you to scale your stream across a Redis Cluster, allowing Flink to manage the work of coordinating which consumer owns which stream.

Exactly-Once Semantics

The Redis Flink Connector supports exactly-once semantics. This is tied into the checkpointing mechanism in Flink. Please note that "exactly once" refers to is at the checkpoint level, so in the case of a failure in your pipeline you may see messages within a checkpoint being delivered more than once

Gradle

Add the following to your build.gradle file

build.gradle
dependencies {
    implementation 'com.redis:redis-flink-connector-spring:0.0.4'
}

Using the Stream Source

To use the Flink stream source, you can create a RedisSourceConfig.

The configuration options are as follows:

The following table describes the fields in that class:

Field Type Default Value Required

host

String

"localhost"

No

port

int

6379

No

password

String

"" (empty string)

No

user

String

"default"

No

consumerGroup

String

N/A

Yes

topicName

String

N/A

Yes

numPartitions

int

N/A

Yes

useClusterApi

boolean

false

No

requireAck

boolean

true

No

startingId

StreamEntryID

StreamEntryID.XGROUP_LAST_ENTRY

No

failedDeserializationStreamName

String

"" (empty string)

No

You can then initialize the Source Builder using:

RedisSourceBuilder<RedisMessage> sourceBuilder = new RedisSourceBuilder<>(sourceConfig, new RedisMessageDeserializer());

After that, all that’s left is to use your environment to add the source to your pipeline:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(globalConfig);
env.enableCheckpointing(5000);
env.setParallelism(4);
TypeInformation<RedisMessage> typeInfo = TypeInformation.of(RedisMessage.class);
String sourceName = "Redis to Redis";
env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), sourceName, typeInfo).sinkTo(sinkBuilder.build());

Using the Redis Stream Sink

To use the Redis Stream Sink, you can initialize a RedisSinkConfig object with the following:

The following table describes the fields in that class:

Field Type Default Value Required description

host

String

"localhost"

No

the Redis host name

port

int

6379

No

the Redis port

password

String

"" (empty string)

No

the Redis password

user

String

"default"

No

the Redis user

topicName

String

N/A

Yes

the Topic Name

numPartitions

int

N/A

Yes

the number of partitions

flushOnCheckpoint

boolean

false

No

whether to flush writes on checkpoint

failedSerializationStreamName

String

"" (empty string)

No

the stream name to serialization errors to

You then have to initialize the builder and sink to it:

RedisSinkBuilder<RedisMessage> sinkBuilder = new RedisSinkBuilder<>(new RedisPassthroughSerializer(), sinkConfig);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(globalConfig);
env.enableCheckpointing(5000);
env.setParallelism(4);
TypeInformation<RedisMessage> typeInfo = TypeInformation.of(RedisMessage.class);
String sourceName = "Redis to Redis";
env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), sourceName, typeInfo).sinkTo(sinkBuilder.build());

Serializers and Keys

The Redis Flink Connector’s natural data type is the RedisMessage class. This class contains the data of the message (a Map<String,String>) and the key for the Message (a String). The RedisPasssthroughSerializer and the RedisMessageDeserializer are a simple serializer/deserializer pair that allows you to work directly with the RedisMessage object.

The RedisObjectSerializer and RedisObjectDeserializer are generic serializers/deserializers that allow you to work with your standard serializable POJOs. You can use these if you want to work with your own objects domain objects, the object is serialized to JSON and added as the data field of the Stream Message that is sent to Redis. If you need to add specific modules to the ObjectMapper (e.g. JavaTimeModule), you can do so by passing in an ObjectMapperSupplier to the RedisObjectSerializer and RedisObjectDeserializer constructors. E.g.

RedisObjectSerializer<Person> serializer = new RedisObjectSerializer<>(() -> {
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.registerModule(new JavaTimeModule());
            return objectMapper;
        });

If you use these, you may also want to provide a RedisKeyExtractor to extract the key from the object, otherwise, a hashcode extracted from the JSON payload of the object will act as the key.

The key determines what partition that a message will be sent to.

Configure Serializer and Key Extractor

You can configure the serializer and key extractor in the RedisSinkBuilder:

RedisSinkBuilder<Person> sinkBuilder = new RedisSinkBuilder<Person>(new RedisObjectSerializer<>(), sinkConfig).keyExtractor(Person::getName);

And you can configure which deserializer to use in the RedisSourceBuilder:

RedisSourceBuilder<Person> sourceBuilder = new RedisSourceBuilder<>(sourceConfig, new RedisObjectDeserializer<>(Person.class));

Quick Start

You can run the demo in this repo by running:

docker compose up -d
./example-redis-job.sh

This will spin up Redis, a Flink Job Manager and Task Manager, and start a Job with Redis as the Source and Sink.

Support

Redis Flink Connector is supported by Redis, Inc. for enterprise-tier customers as a 'Developer Tool' under the Redis Software Support Policy. For non enterprise-tier customers we supply support for Redis Flink Connector on a good-faith basis. To report bugs, request features, or receive assistance, please file an issue.

License

Redis Flink Connector is licensed under the Business Source License 1.1. Copyright © 2024 Redis, Inc. See LICENSE for details.

About

Flink Source and Sink for Redis Streams

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages