Skip to content

Commit

Permalink
feat: Added config update acks to StreamConfigManager.
Browse files Browse the repository at this point in the history
Resolves #25
  • Loading branch information
jruaux committed Jul 23, 2023
1 parent c2709cd commit 9a4e160
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@

public class RuleSessionManager implements AutoCloseable {

public static final String KEY_CONFIG = "config";

private final JavaPropsMapper mapper = Mappers.propsMapper();

private final Map<Config, ConfigManager<RulesetConfig>> configManagers = new HashMap<>();
Expand All @@ -32,9 +30,7 @@ public QueryRuleSession getRuleSession(Config config) {

private ConfigManager<RulesetConfig> createConfigManager(Config config) {
AbstractRedisClient client = clientManager.getClient(config.getRedis());
String key = KeyBuilder.of(config).build(KEY_CONFIG);
RulesetConfig ruleset = config.getRuleset();
StreamConfigManager<RulesetConfig> configManager = new StreamConfigManager<>(client, key, ruleset, mapper);
StreamConfigManager configManager = new StreamConfigManager(client, config, mapper);
try {
configManager.start();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import org.springframework.beans.BeanUtils;

import com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.smartcache.core.config.Config;
import com.redis.smartcache.core.config.RulesetConfig;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Limit;
Expand All @@ -26,7 +26,7 @@
import io.lettuce.core.XReadArgs;
import io.lettuce.core.XReadArgs.StreamOffset;

public class StreamConfigManager<T> implements ConfigManager<T>, Consumer<StreamMessage<String, String>> {
public class StreamConfigManager implements ConfigManager<RulesetConfig>, Consumer<StreamMessage<String, String>> {

private static final Logger log = Logger.getLogger(StreamConfigManager.class.getName());

Expand All @@ -36,11 +36,9 @@ public class StreamConfigManager<T> implements ConfigManager<T>, Consumer<Stream

private final AbstractRedisClient client;

private final String key;

private final JavaPropsMapper mapper;

private final T config;
private final Config config;

private Duration block = DEFAULT_BLOCK;

Expand All @@ -50,10 +48,11 @@ public class StreamConfigManager<T> implements ConfigManager<T>, Consumer<Stream

private ExecutorService executor;

public StreamConfigManager(AbstractRedisClient client, String key, T config, JavaPropsMapper mapper) {
this.client = client;
this.key = key;
private StatefulRedisModulesConnection<String, String> connection;

public StreamConfigManager(AbstractRedisClient client, Config config, JavaPropsMapper mapper) {
this.config = config;
this.client = client;
this.mapper = mapper;
}

Expand All @@ -71,11 +70,12 @@ public void setCount(OptionalLong count) {

@Override
public void start() throws IOException {
StatefulRedisModulesConnection<String, String> connection = RedisModulesUtils.connection(client);
String key = key();
connection = RedisModulesUtils.connection(client);
List<StreamMessage<String, String>> messages = connection.sync().xrevrange(key, Range.create("-", "+"),
Limit.create(0, 1));
if (messages.isEmpty()) {
Map<String, String> map = mapper.writeValueAsMap(config);
Map<String, String> map = mapper.writeValueAsMap(config.getRuleset());
if (!map.isEmpty()) {
connection.sync().xadd(key, map);
}
Expand All @@ -87,20 +87,57 @@ public void start() throws IOException {
executor.submit(poller);
}

@SuppressWarnings("unchecked")
public String key() {
return keyBuilder().build();
}

private KeyBuilder keyBuilder() {
return KeyBuilder.of(config).withKeyspace("config");
}

@Override
public void accept(StreamMessage<String, String> message) {
try {
T newConfig = (T) mapper.readMapAs(message.getBody(), config.getClass());
RulesetConfig newConfig = mapper.readMapAs(message.getBody(), RulesetConfig.class);
if (newConfig != null) {
BeanUtils.copyProperties(newConfig, config);
log.log(Level.INFO, "Updated configuration id {0}: {1}", new Object[] { message.getId(), config });
config.getRuleset().setRules(newConfig.getRules());
double score = score(message);
String appInstanceId = appInstanceId();
String ackKey = ackKey();
connection.sync().zadd(ackKey, score, appInstanceId);
log.log(Level.INFO, "Updated configuration id {0}: {1}", new Object[] { message.getId(), config.getRuleset() });
}
} catch (Exception e) {
log.log(Level.SEVERE, "Could not parse config", e);
}
}

private String appInstanceId() {
if (config.getId() == null) {
return clientId();
}
return config.getId();
}

public String clientId() {
return String.valueOf(connection.sync().clientId());
}

public String ackKey() {
return keyBuilder().build("ack");
}

private double score(StreamMessage<String, String> message) {
String id = message.getId();
int offsetPosition = id.indexOf("-");
if (offsetPosition == -1) {
return Long.parseLong(id);
}
long millis = Long.parseLong(id.substring(0, offsetPosition));
double sequence = Long.parseLong(id.substring(offsetPosition + 1));
return millis + sequence / 1000;
}

public boolean isRunning() {
return poller.getState() == State.STARTED;
}
Expand All @@ -115,8 +152,8 @@ private XReadArgs xreadArgs() {
}

@Override
public T get() {
return config;
public RulesetConfig get() {
return config.getRuleset();
}

@Override
Expand All @@ -125,6 +162,8 @@ public void stop() throws InterruptedException, ExecutionException, TimeoutExcep
poller = null;
executor.shutdown();
executor = null;
connection.close();
connection = null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public void run() {
while (!stop) {
connection.sync().xread(xreadArgs, offset).forEach(consumer);
}
connection.close();
this.state = State.STOPPED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class Config {

private String name = DEFAULT_NAME;

private String id;

private int queryCacheCapacity = DEFAULT_QUERY_CACHE_CAPACITY;

private DriverConfig driver = new DriverConfig();
Expand All @@ -22,6 +24,22 @@ public class Config {

private MetricsConfig metrics = new MetricsConfig();

/**
*
* @return ID that uniquely identifies this application instance.
*/
public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

/**
*
* @return Name of this application. Should be the same across application instances.
*/
public String getName() {
return name;
}
Expand Down
Loading

0 comments on commit 9a4e160

Please sign in to comment.