The Redis Flink Connector is a high-performance, scalable Flink source and sink connector for Redis. It provides a simple, scalable way to use Redis as a source and sink for your stream-processing use cases in Flink.
The Redis Flink Connector supports partitioned streams, letting you configure how many separate partitions you want for your stream of data. This allows you to scale your stream across a Redis Cluster while Flink manages the work of coordinating which consumer owns which partition.
The Redis Flink Connector supports exactly-once semantics, tied into Flink's checkpointing mechanism. Note that "exactly once" applies at the checkpoint level: if your pipeline fails, messages within a checkpoint may be delivered more than once.
To use the Flink stream source, create a `RedisSourceConfig`. The following table describes the fields in that class:
| Field | Type | Default Value | Required |
|---|---|---|---|
| | | | No |
| | | | No |
| | | | No |
| | | | No |
| | | N/A | Yes |
| | | N/A | Yes |
| | | N/A | Yes |
| | | | No |
| | | | No |
| | | | No |
| | | | No |
You can then initialize the source builder using:

```java
RedisSourceBuilder<RedisMessage> sourceBuilder = new RedisSourceBuilder<>(sourceConfig, new RedisMessageDeserializer());
```
After that, all that's left is to use your environment to add the source to your pipeline:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(globalConfig);
env.enableCheckpointing(5000);
env.setParallelism(4);
TypeInformation<RedisMessage> typeInfo = TypeInformation.of(RedisMessage.class);
String sourceName = "Redis to Redis";
env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), sourceName, typeInfo).sinkTo(sinkBuilder.build());
```
To use the Redis stream sink, initialize a `RedisSinkConfig` object. The following table describes the fields in that class:
| Field | Type | Default Value | Required | Description |
|---|---|---|---|---|
| | | | No | the Redis host name |
| | | | No | the Redis port |
| | | | No | the Redis password |
| | | | No | the Redis user |
| | | N/A | Yes | the topic name |
| | | N/A | Yes | the number of partitions |
| | | | No | whether to flush writes on checkpoint |
| | | | No | the stream name to send serialization errors to |
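As a hedged sketch of putting these settings together (the builder method names below are assumptions inferred from the field descriptions, not confirmed `RedisSinkConfig` API):

```java
// Hypothetical sketch only: these builder method names are inferred from the
// field descriptions above and may not match the real RedisSinkConfig API.
RedisSinkConfig sinkConfig = RedisSinkConfig.builder()
        .host("localhost")            // the Redis host name (optional)
        .port(6379)                   // the Redis port (optional)
        .topicName("my-topic")        // required: the topic name
        .numPartitions(4)             // required: the number of partitions
        .flushOnCheckpoint(true)      // flush writes on checkpoint (optional)
        .build();
```

The required fields (the topic name and the number of partitions) have no defaults, so they must always be supplied.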
You then initialize the sink builder and add it to your pipeline:

```java
RedisSinkBuilder<RedisMessage> sinkBuilder = new RedisSinkBuilder<>(new RedisPassthroughSerializer(), sinkConfig);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(globalConfig);
env.enableCheckpointing(5000);
env.setParallelism(4);
TypeInformation<RedisMessage> typeInfo = TypeInformation.of(RedisMessage.class);
String sourceName = "Redis to Redis";
env.fromSource(sourceBuilder.build(), WatermarkStrategy.noWatermarks(), sourceName, typeInfo).sinkTo(sinkBuilder.build());
```
The Redis Flink Connector's natural data type is the `RedisMessage` class. This class contains the data of the message (a `Map<String,String>`) and the key for the message (a `String`).
The `RedisPassthroughSerializer` and the `RedisMessageDeserializer` are a simple serializer/deserializer pair that lets you work directly with the `RedisMessage` object.
The `RedisObjectSerializer` and `RedisObjectDeserializer` are generic serializers/deserializers that let you work with your standard serializable POJOs.
You can use these if you want to work with your own domain objects; the object is serialized to JSON and added as the `data` field of the stream message that is sent to Redis. If you need to add specific modules to the `ObjectMapper` (e.g. `JavaTimeModule`), you can do so by passing an `ObjectMapperSupplier` to the `RedisObjectSerializer` and `RedisObjectDeserializer` constructors. For example:
```java
RedisObjectSerializer<Person> serializer = new RedisObjectSerializer<>(() -> {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.registerModule(new JavaTimeModule());
    return objectMapper;
});
```
If you use these, you may also want to provide a `RedisKeyExtractor` to extract the key from the object; otherwise, a hashcode computed from the object's JSON payload acts as the key.
The key determines which partition a message is sent to.
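The connector's actual partition-assignment logic isn't shown here; as a sketch of the general idea (hashing the key and taking it modulo the partition count is an assumption about the implementation, not the confirmed algorithm), it might look like:

```java
// Sketch of key-based partition assignment. This mirrors the common
// hash-mod-N approach; the connector's actual algorithm may differ.
public class KeyPartitionSketch {

    // Map a message key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Messages with the same key always land on the same partition
        System.out.println("partition=" + partitionFor("person:alice", 4));
    }
}
```

Because the mapping is deterministic, all messages sharing a key stay in order within one partition.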
You can configure the serializer and key extractor in the `RedisSinkBuilder`:

```java
RedisSinkBuilder<Person> sinkBuilder = new RedisSinkBuilder<Person>(new RedisObjectSerializer<>(), sinkConfig).keyExtractor(Person::getName);
```
And you can configure which deserializer to use in the `RedisSourceBuilder`:

```java
RedisSourceBuilder<Person> sourceBuilder = new RedisSourceBuilder<>(sourceConfig, new RedisObjectDeserializer<>(Person.class));
```
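The `Person` type in these examples stands in for any standard serializable POJO; a minimal version (this exact class is an illustration, not part of the connector) could be:

```java
// Minimal POJO for the serializer examples above; not part of the connector.
// A no-arg constructor plus getters/setters keeps it friendly to JSON mapping.
public class Person implements java.io.Serializable {
    private String name;

    public Person() { }                       // no-arg constructor for Jackson
    public Person(String name) { this.name = name; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```

Any POJO that the `ObjectMapper` can round-trip to JSON works the same way.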
You can run the demo in this repo by running:

```shell
docker compose up -d
./example-redis-job.sh
```
This will spin up Redis, a Flink Job Manager and Task Manager, and start a Job with Redis as the Source and Sink.
The Redis Flink Connector is supported by Redis, Inc. for enterprise-tier customers as a 'Developer Tool' under the Redis Software Support Policy. For non-enterprise-tier customers, we provide support for the Redis Flink Connector on a good-faith basis. To report bugs, request features, or receive assistance, please file an issue.
Redis Flink Connector is licensed under the Business Source License 1.1. Copyright © 2024 Redis, Inc. See LICENSE for details.