diff --git a/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/RuleSessionManager.java b/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/RuleSessionManager.java index f1399e4..ea08a2d 100644 --- a/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/RuleSessionManager.java +++ b/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/RuleSessionManager.java @@ -12,8 +12,6 @@ public class RuleSessionManager implements AutoCloseable { - public static final String KEY_CONFIG = "config"; - private final JavaPropsMapper mapper = Mappers.propsMapper(); private final Map> configManagers = new HashMap<>(); @@ -32,9 +30,7 @@ public QueryRuleSession getRuleSession(Config config) { private ConfigManager createConfigManager(Config config) { AbstractRedisClient client = clientManager.getClient(config.getRedis()); - String key = KeyBuilder.of(config).build(KEY_CONFIG); - RulesetConfig ruleset = config.getRuleset(); - StreamConfigManager configManager = new StreamConfigManager<>(client, key, ruleset, mapper); + StreamConfigManager configManager = new StreamConfigManager(client, config, mapper); try { configManager.start(); } catch (IOException e) { diff --git a/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/StreamConfigManager.java b/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/StreamConfigManager.java index e367daa..c1f7020 100644 --- a/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/StreamConfigManager.java +++ b/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/StreamConfigManager.java @@ -13,11 +13,11 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.springframework.beans.BeanUtils; - import com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper; import com.redis.lettucemod.api.StatefulRedisModulesConnection; import com.redis.lettucemod.util.RedisModulesUtils; +import com.redis.smartcache.core.config.Config; +import com.redis.smartcache.core.config.RulesetConfig; import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.Limit; @@ -26,7 +26,7 @@ import io.lettuce.core.XReadArgs; import io.lettuce.core.XReadArgs.StreamOffset; -public class StreamConfigManager implements ConfigManager, Consumer> { +public class StreamConfigManager implements ConfigManager, Consumer> { private static final Logger log = Logger.getLogger(StreamConfigManager.class.getName()); @@ -36,11 +36,9 @@ public class StreamConfigManager implements ConfigManager, Consumer implements ConfigManager, Consumer connection; + + public StreamConfigManager(AbstractRedisClient client, Config config, JavaPropsMapper mapper) { this.config = config; + this.client = client; this.mapper = mapper; } @@ -71,11 +70,12 @@ public void setCount(OptionalLong count) { @Override public void start() throws IOException { - StatefulRedisModulesConnection connection = RedisModulesUtils.connection(client); + String key = key(); + connection = RedisModulesUtils.connection(client); List> messages = connection.sync().xrevrange(key, Range.create("-", "+"), Limit.create(0, 1)); if (messages.isEmpty()) { - Map map = mapper.writeValueAsMap(config); + Map map = mapper.writeValueAsMap(config.getRuleset()); if (!map.isEmpty()) { connection.sync().xadd(key, map); } @@ -87,20 +87,57 @@ public void start() throws IOException { executor.submit(poller); } - @SuppressWarnings("unchecked") + public String key() { + return keyBuilder().build(); + } + + private KeyBuilder keyBuilder() { + return KeyBuilder.of(config).withKeyspace("config"); + } + @Override public void accept(StreamMessage message) { try { - T newConfig = (T) mapper.readMapAs(message.getBody(), config.getClass()); + RulesetConfig newConfig = mapper.readMapAs(message.getBody(), RulesetConfig.class); if (newConfig != null) { - BeanUtils.copyProperties(newConfig, config); - log.log(Level.INFO, "Updated configuration id {0}: {1}", new Object[] { message.getId(), config }); + config.getRuleset().setRules(newConfig.getRules()); + double score = score(message); + String appInstanceId = appInstanceId(); + String ackKey = ackKey(); + connection.sync().zadd(ackKey, score, appInstanceId); + log.log(Level.INFO, "Updated configuration id {0}: {1}", new Object[] { message.getId(), config.getRuleset() }); } } catch (Exception e) { log.log(Level.SEVERE, "Could not parse config", e); } } + private String appInstanceId() { + if (config.getId() == null) { + return clientId(); + } + return config.getId(); + } + + public String clientId() { + return String.valueOf(connection.sync().clientId()); + } + + public String ackKey() { + return keyBuilder().build("ack"); + } + + private double score(StreamMessage message) { + String id = message.getId(); + int offsetPosition = id.indexOf("-"); + if (offsetPosition == -1) { + return Long.parseLong(id); + } + long millis = Long.parseLong(id.substring(0, offsetPosition)); + double sequence = Long.parseLong(id.substring(offsetPosition + 1)); + return millis + sequence / 1000; + } + public boolean isRunning() { return poller.getState() == State.STARTED; } @@ -115,8 +152,8 @@ private XReadArgs xreadArgs() { } @Override - public T get() { - return config; + public RulesetConfig get() { + return config.getRuleset(); } @Override @@ -125,6 +162,8 @@ public void stop() throws InterruptedException, ExecutionException, TimeoutExcep poller = null; executor.shutdown(); executor = null; + connection.close(); + connection = null; } } diff --git a/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/StreamPoller.java b/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/StreamPoller.java index 9443905..e7e8420 100644 --- a/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/StreamPoller.java +++ b/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/StreamPoller.java @@ -37,7 +37,6 @@ public void run() { while (!stop) { connection.sync().xread(xreadArgs, offset).forEach(consumer); } - connection.close(); this.state = State.STOPPED; } diff --git a/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/config/Config.java b/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/config/Config.java index bb8f6be..a7b926c 100644 --- a/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/config/Config.java +++ b/core/redis-smart-cache-core/src/main/java/com/redis/smartcache/core/config/Config.java @@ -10,6 +10,8 @@ public class Config { private String name = DEFAULT_NAME; + private String id; + private int queryCacheCapacity = DEFAULT_QUERY_CACHE_CAPACITY; private DriverConfig driver = new DriverConfig(); @@ -22,6 +24,22 @@ public class Config { private MetricsConfig metrics = new MetricsConfig(); + /** + * + * @return ID that uniquely identifies this application instance. + */ + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + /** + * + * @return Name of this application. Should be the same across application instances. + */ public String getName() { return name; } diff --git a/core/redis-smart-cache-core/src/test/java/com/redis/smartcache/core/ConfigTests.java b/core/redis-smart-cache-core/src/test/java/com/redis/smartcache/core/ConfigTests.java index 1d6ed38..8ff19c3 100644 --- a/core/redis-smart-cache-core/src/test/java/com/redis/smartcache/core/ConfigTests.java +++ b/core/redis-smart-cache-core/src/test/java/com/redis/smartcache/core/ConfigTests.java @@ -25,6 +25,7 @@ import io.airlift.units.Duration; import io.lettuce.core.Range; +import io.lettuce.core.ScoredValue; import io.lettuce.core.StreamMessage; import io.lettuce.core.XReadArgs.StreamOffset; @@ -49,50 +50,50 @@ void keyBuilder() { @SuppressWarnings("unchecked") @Test void updateStreamConfig() throws Exception { - String key = "updateStreamConfig"; try (RedisModulesClient client = RedisModulesClient.create(redis.getRedisURI()); StatefulRedisModulesConnection connection = client.connect()) { - RulesetConfig conf = new RulesetConfig(); JavaPropsMapper mapper = Mappers.propsMapper(); - try (StreamConfigManager manager = new StreamConfigManager<>(client, key, conf, mapper)) { + Config config = new Config(); + try (StreamConfigManager manager = new StreamConfigManager(client, config, mapper)) { manager.start(); + String key = manager.key(); Assertions.assertNotNull(connection.sync().xread(StreamOffset.latest(key))); await().until(manager::isRunning); Map body = new HashMap<>(); body.put("rules[0].ttl", "123s"); connection.sync().xadd(key, body); - await().until(() -> conf.getRules().length == 1); - await().until(() -> conf.getRules()[0].getTtl().getValue(TimeUnit.SECONDS) == 123); + await().until(() -> config.getRuleset().getRules().length == 1); + await().until(() -> config.getRuleset().getRules()[0].getTtl().getValue(TimeUnit.SECONDS) == 123); body.put("rules[0].ttl", "456s"); connection.sync().xadd(key, body); - await().until(() -> conf.getRules().length == 1); - await().until(() -> conf.getRules()[0].getTtl().getValue(TimeUnit.SECONDS) == 456); + await().until(() -> config.getRuleset().getRules().length == 1); + await().until(() -> config.getRuleset().getRules()[0].getTtl().getValue(TimeUnit.SECONDS) == 456); } - RulesetConfig conf2 = new RulesetConfig(); - try (StreamConfigManager manager2 = new StreamConfigManager<>(client, key, conf2, mapper)) { + Config config2 = new Config(); + try (StreamConfigManager manager2 = new StreamConfigManager(client, config2, mapper)) { manager2.start(); - await().until(() -> conf2.getRules().length == 1); - await().until(() -> conf2.getRules()[0].getTtl().getValue(TimeUnit.SECONDS) == 456); + await().until(() -> config2.getRuleset().getRules().length == 1); + await().until(() -> config2.getRuleset().getRules()[0].getTtl().getValue(TimeUnit.SECONDS) == 456); } } } @Test void duplicateConfig() throws Exception { - String key = "duplicateStreamConfig"; + String key = "smartcache:config"; try (RedisModulesClient client = RedisModulesClient.create(redis.getRedisURI()); StatefulRedisModulesConnection connection = client.connect()) { + Config config = new Config(); Map body = new HashMap<>(); body.put("rules[0].ttl", "0s"); connection.sync().xadd(key, body); - RulesetConfig conf = new RulesetConfig(); - conf.setRules(RuleConfig.passthrough().build()); + config.getRuleset().setRules(RuleConfig.passthrough().build()); JavaPropsMapper mapper = Mappers.propsMapper(); - try (StreamConfigManager manager = new StreamConfigManager<>(client, key, conf, mapper)) { + try (StreamConfigManager manager = new StreamConfigManager(client, config, mapper)) { manager.start(); await().until(manager::isRunning); - Assertions.assertEquals(1, conf.getRules().length); - Assertions.assertEquals(0, conf.getRules()[0].getTtl().getValue(TimeUnit.SECONDS)); + Assertions.assertEquals(1, config.getRuleset().getRules().length); + Assertions.assertEquals(0, config.getRuleset().getRules()[0].getTtl().getValue(TimeUnit.SECONDS)); } } } @@ -100,27 +101,27 @@ void duplicateConfig() throws Exception { @Test void initialStreamConfig() throws Exception { JavaPropsMapper mapper = Mappers.propsMapper(); - Map properties = new HashMap<>(); - properties.put("rules.1.tables.1", "customer"); - properties.put("rules.1.ttl", "1.00h"); - properties.put("rules.2.regex", "SELECT \\* FROM customers"); - properties.put("rules.2.ttl", "30.00m"); - properties.put("rules.3.ttl", "10.00s"); - properties.put("rules.4.query-ids.1", "ab324499"); - properties.put("rules.4.ttl", "10.00s"); - RulesetConfig conf = new RulesetConfig(); + Config config = new Config(); RuleConfig rule1 = RuleConfig.tables("customer").ttl(Duration.valueOf("1h")).build(); RuleConfig rule2 = RuleConfig.regex("SELECT \\* FROM customers").ttl(Duration.valueOf("30m")).build(); RuleConfig rule3 = RuleConfig.passthrough().ttl(Duration.valueOf("10s")).build(); RuleConfig rule4 = RuleConfig.queryIds("ab324499").ttl(Duration.valueOf("10s")).build(); - conf.setRules(rule1, rule2, rule3, rule4); - String key = "config"; + config.getRuleset().setRules(rule1, rule2, rule3, rule4); try (RedisModulesClient client = RedisModulesClient.create(redis.getRedisURI()); StatefulRedisModulesConnection connection = client.connect(); - StreamConfigManager manager = new StreamConfigManager<>(client, key, conf, mapper)) { + StreamConfigManager manager = new StreamConfigManager(client, config, mapper)) { manager.start(); + String key = manager.key(); List> messages = connection.sync().xrange(key, Range.unbounded()); Assertions.assertEquals(1, messages.size()); + Map properties = new HashMap<>(); + properties.put("rules.1.tables.1", "customer"); + properties.put("rules.1.ttl", "1.00h"); + properties.put("rules.2.regex", "SELECT \\* FROM customers"); + properties.put("rules.2.ttl", "30.00m"); + properties.put("rules.3.ttl", "10.00s"); + properties.put("rules.4.query-ids.1", "ab324499"); + properties.put("rules.4.ttl", "10.00s"); Assertions.assertEquals(properties, messages.get(0).getBody()); } } @@ -149,24 +150,24 @@ void configMapper() throws IOException { @SuppressWarnings("unchecked") @Test void disableCaching() throws Exception { - String key = "disableCaching"; try (RedisModulesClient client = RedisModulesClient.create(redis.getRedisURI()); StatefulRedisModulesConnection connection = client.connect()) { - RulesetConfig conf = new RulesetConfig(); + Config config = new Config(); RuleConfig rule1 = RuleConfig.tables("customer").ttl(Duration.valueOf("1h")).build(); RuleConfig rule2 = RuleConfig.regex("SELECT \\* FROM customers").ttl(Duration.valueOf("30m")).build(); RuleConfig rule3 = RuleConfig.passthrough().ttl(Duration.valueOf("10s")).build(); - conf.setRules(rule1, rule2, rule3); + config.getRuleset().setRules(rule1, rule2, rule3); JavaPropsMapper mapper = Mappers.propsMapper(); - try (StreamConfigManager manager = new StreamConfigManager<>(client, key, conf, mapper)) { + try (StreamConfigManager manager = new StreamConfigManager(client, config, mapper)) { manager.start(); + String key = manager.key(); Assertions.assertNotNull(connection.sync().xread(StreamOffset.latest(key))); await().until(manager::isRunning); Map body = new HashMap<>(); body.put("rules[0].ttl", "0s"); connection.sync().xadd(key, body); - await().until(() -> conf.getRules().length == 1); - await().until(() -> conf.getRules()[0].getTtl().getValue(TimeUnit.SECONDS) == 0); + await().until(() -> config.getRuleset().getRules().length == 1); + await().until(() -> config.getRuleset().getRules()[0].getTtl().getValue(TimeUnit.SECONDS) == 0); } } } @@ -214,15 +215,15 @@ void rulesetEquals() { @SuppressWarnings("unchecked") @Test void ruleSessionUpdate() throws Exception { - String key = "updateStreamConfig"; try (RedisModulesClient client = RedisModulesClient.create(redis.getRedisURI()); StatefulRedisModulesConnection connection = client.connect()) { - RulesetConfig rulesetConfig = new RulesetConfig(); - QueryRuleSession session = QueryRuleSession.of(rulesetConfig); - rulesetConfig.addPropertyChangeListener(session); + Config config = new Config(); + QueryRuleSession session = QueryRuleSession.of(config.getRuleset()); + config.getRuleset().addPropertyChangeListener(session); JavaPropsMapper mapper = Mappers.propsMapper(); - try (StreamConfigManager manager = new StreamConfigManager<>(client, key, rulesetConfig, mapper)) { + try (StreamConfigManager manager = new StreamConfigManager(client, config, mapper)) { manager.start(); + String key = manager.key(); Assertions.assertNotNull(connection.sync().xread(StreamOffset.latest(key))); await().until(manager::isRunning); Map body = new HashMap<>(); @@ -245,4 +246,48 @@ void ruleSessionUpdate() throws Exception { } } + @Test + void configUpdateAck() throws Exception { + Config config = new Config(); + assertConfigUpdateAck(config); + } + + @Test + void configUpdateAckWithID() throws Exception { + String id = "myAppId"; + Config config = new Config(); + config.setId(id); + assertConfigUpdateAck(config); + } + + @SuppressWarnings("unchecked") + private void assertConfigUpdateAck(Config config) throws Exception { + try (RedisModulesClient client = RedisModulesClient.create(redis.getRedisURI()); + StatefulRedisModulesConnection connection = client.connect()) { + JavaPropsMapper mapper = Mappers.propsMapper(); + try (StreamConfigManager manager = new StreamConfigManager(client, config, mapper)) { + manager.start(); + Assertions.assertNotNull(connection.sync().xread(StreamOffset.latest(manager.key()))); + String id = config.getId() == null ? manager.clientId() : config.getId(); + await().until(manager::isRunning); + Map body = new HashMap<>(); + body.put("rules[0].ttl", "123s"); + assertMessageIdEquals(connection, manager.ackKey(), connection.sync().xadd(manager.key(), body), id); + body.put("rules[0].ttl", "456s"); + assertMessageIdEquals(connection, manager.ackKey(), connection.sync().xadd(manager.key(), body), id); + } + } + } + + private void assertMessageIdEquals(StatefulRedisModulesConnection connection, String ackKey, + String messageId, String appInstanceId) { + Awaitility.await().until(() -> !connection.sync().zrangeWithScores(ackKey, 0, -1).isEmpty()); + List> scoredValues = connection.sync().zrangeWithScores(ackKey, 0, -1); + Assertions.assertEquals(1, scoredValues.size()); + double score = scoredValues.get(0).getScore(); + Assertions.assertEquals(messageId, (long) score + "-0"); + Assertions.assertEquals(appInstanceId, scoredValues.get(0).getValue()); + + } + }