Skip to content

Commit

Permalink
feat: Replaced QueryWriter component with RediSearchMeterRegistry
Browse files Browse the repository at this point in the history
Resolves #11
  • Loading branch information
Julien Ruaux committed Mar 30, 2023
1 parent b77cb10 commit 3ef3617
Show file tree
Hide file tree
Showing 12 changed files with 3,421 additions and 347 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.redis.smartcache;

import static com.redis.smartcache.core.RedisResultSetCache.METER_QUERY;
import static com.redis.smartcache.core.RedisResultSetCache.TAG_SQL;
import static com.redis.smartcache.core.RedisResultSetCache.TAG_TABLE;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -26,18 +30,22 @@
import com.redis.lettucemod.util.ClientBuilder;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.lettucemod.util.RedisURIBuilder;
import com.redis.micrometer.RedisTimeSeriesConfig;
import com.redis.micrometer.RediSearchMeterRegistry;
import com.redis.micrometer.RediSearchRegistryConfig;
import com.redis.micrometer.RedisRegistryConfig;
import com.redis.micrometer.RedisTimeSeriesMeterRegistry;
import com.redis.smartcache.core.Config;
import com.redis.smartcache.core.Config.DriverConfig;
import com.redis.smartcache.core.Config.RedisConfig;
import com.redis.smartcache.core.Config.RulesetConfig;
import com.redis.smartcache.core.ConfigManager;
import com.redis.smartcache.core.EvictingLinkedHashMap;
import com.redis.smartcache.core.KeyBuilder;
import com.redis.smartcache.core.Query;
import com.redis.smartcache.core.QueryRuleSession;
import com.redis.smartcache.core.QueryWriter;
import com.redis.smartcache.core.RedisResultSetCache;
import com.redis.smartcache.core.ResultSetCache;
import com.redis.smartcache.core.RuntimeSQLException;
import com.redis.smartcache.core.codec.ResultSetCodec;
import com.redis.smartcache.jdbc.SmartConnection;

Expand All @@ -46,6 +54,8 @@
import io.lettuce.core.codec.RedisCodec;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.config.MeterFilter;

/**
* The Java SQL framework allows for multiple database drivers. Each driver
Expand Down Expand Up @@ -82,12 +92,11 @@ public class Driver implements java.sql.Driver {
private static final JavaPropsMapper PROPS_MAPPER = propsMapper();
private static final JsonMapper JSON_MAPPER = jsonMapper();

private static final Map<String, java.sql.Driver> drivers = new HashMap<>();
private static final Map<RedisConfig, AbstractRedisClient> clients = new HashMap<>();
private static final Map<Config, MeterRegistry> registries = new HashMap<>();
private static final Map<Config, QueryRuleSession> sessions = new HashMap<>();
private static final Map<Config, ConfigManager<RulesetConfig>> configManagers = new HashMap<>();
private static final Map<Config, QueryWriter> queryWriters = new HashMap<>();
private final Map<RedisConfig, AbstractRedisClient> clients = new HashMap<>();
private final Map<Config, MeterRegistry> registries = new HashMap<>();
private final Map<Config, QueryRuleSession> sessions = new HashMap<>();
private final Map<Config, ConfigManager<RulesetConfig>> configManagers = new HashMap<>();
private final Map<Config, Map<String, Query>> queryCaches = new HashMap<>();

public static JavaPropsMapper propsMapper() {
return JavaPropsMapper.builder().serializationInclusion(Include.NON_NULL)
Expand Down Expand Up @@ -148,52 +157,55 @@ public SmartConnection connect(String url, Properties info) throws SQLException
}
config.getRedis().setUri(redisUri);
log.fine("Creating backend connection");
configManager(config);
Connection backendConnection = backendConnection(config.getDriver(), info);
log.fine("Creating SmartCache connection");
return makeConnection(config, backendConnection);
}

private SmartConnection makeConnection(Config conf, Connection backendConnection) throws SQLException {
QueryRuleSession ruleSession = sessions.computeIfAbsent(conf, this::ruleSession);
checkConfigManager(conf);
KeyBuilder cacheKeyBuilder = keyBuilder(conf, KEYSPACE_CACHE);
MeterRegistry meterRegistry = registries.computeIfAbsent(conf, this::createMeterRegistry);
QueryWriter queryWriter = queryWriters.computeIfAbsent(conf, this::createQueryWriter);
AbstractRedisClient client = redisClient(conf);
RedisCodec<String, ResultSet> codec = resultSetCodec(conf);
StatefulRedisModulesConnection<String, ResultSet> redisConnection = RedisModulesUtils.connection(client, codec);
ResultSetCache resultSetCache = new RedisResultSetCache(redisConnection, meterRegistry, cacheKeyBuilder,
ruleSession, queryWriter);
return new SmartConnection(backendConnection, resultSetCache);
private SmartConnection makeConnection(Config config, Connection backendConnection) {
return new SmartConnection(backendConnection, resultSetCache(config));
}

private void checkConfigManager(Config conf) throws SQLException {
if (configManagers.containsKey(conf)) {
return;
}
private ResultSetCache resultSetCache(Config config) {
QueryRuleSession session = sessions.computeIfAbsent(config, this::ruleSession);
KeyBuilder keyBuilder = keyBuilder(config, KEYSPACE_CACHE);
MeterRegistry registry = registries.computeIfAbsent(config, this::createMeterRegistry);
AbstractRedisClient client = client(config);
RedisCodec<String, ResultSet> codec = resultSetCodec(config);
StatefulRedisModulesConnection<String, ResultSet> connection = RedisModulesUtils.connection(client, codec);
Map<String, Query> queryCache = queryCache(config);
return new RedisResultSetCache(connection, registry, keyBuilder, session, queryCache);
}

private ConfigManager<RulesetConfig> configManager(Config config) {
return configManagers.computeIfAbsent(config, this::createConfigManager);
}

private ConfigManager<RulesetConfig> createConfigManager(Config config) {
String key = keyBuilder(config).create(KEY_CONFIG);
AbstractRedisClient redisClient = client(config);
Duration period = duration(config.getRuleset().getRefresh());
log.log(Level.FINE, "Creating config manager on key {0} with refresh interval {1}",
new Object[] { key, period });
try {
String key = keyBuilder(conf).create(KEY_CONFIG);
AbstractRedisClient redisClient = redisClient(conf);
Duration period = duration(conf.getRuleset().getRefresh());
log.log(Level.FINE, "Creating config manager on key {0} with refresh interval {1}",
new Object[] { key, period });
configManagers.put(conf, new ConfigManager<>(redisClient, JSON_MAPPER, key, conf.getRuleset(), period));
return new ConfigManager<>(redisClient, JSON_MAPPER, key, config.getRuleset(), period);
} catch (JsonProcessingException e) {
throw new SQLException("Could not create config manager", e);
throw new RuntimeSQLException("Could not create config manager", e);
}
}

private RedisCodec<String, ResultSet> resultSetCodec(Config conf) {
int bufferSize = Math.toIntExact(conf.getRedis().getCodecBufferCapacity().toBytes());
return new ResultSetCodec(bufferSize);
private Map<String, Query> queryCache(Config config) {
return queryCaches.computeIfAbsent(config, this::createQueryCache);
}

private QueryWriter createQueryWriter(Config conf) {
String index = conf.getRedis().getKey().getPrefix() + "-" + KEYSPACE_QUERIES + "-idx";
KeyBuilder keyBuilder = keyBuilder(conf, KEYSPACE_QUERIES);
log.log(Level.FINE, "Creating query writer on index {0} and keyspace {1}",
new Object[] { index, keyBuilder.getKeyspace() });
return new QueryWriter(redisClient(conf), conf.getAnalyzer(), index, keyBuilder);
private Map<String, Query> createQueryCache(Config config) {
return new EvictingLinkedHashMap<>(config.getAnalyzer().getCacheCapacity());
}

private static RedisCodec<String, ResultSet> resultSetCodec(Config config) {
int bufferSize = Math.toIntExact(config.getRedis().getCodecBufferCapacity().toBytes());
return new ResultSetCodec(bufferSize);
}

public static KeyBuilder keyBuilder(Config config, String prefix) {
Expand All @@ -205,51 +217,88 @@ private static KeyBuilder keyBuilder(Config config) {
return new KeyBuilder(config.getRedis().getKey().getPrefix(), config.getRedis().getKey().getSeparator());
}

private AbstractRedisClient redisClient(Config conf) {
return clients.computeIfAbsent(conf.getRedis(), this::createRedisClient);
private AbstractRedisClient client(Config config) {
return clients.computeIfAbsent(config.getRedis(), this::createRedisClient);
}

private Duration duration(io.airlift.units.Duration duration) {
return Duration.ofMillis(duration.toMillis());
}

private RedisTimeSeriesMeterRegistry createMeterRegistry(Config conf) {
KeyBuilder keyBuilder = keyBuilder(conf).builder(KEYSPACE_METRICS);
String keyPrefix = keyBuilder.create(""); // Registry expects prefix that includes trailing separator
String keySeparator = keyBuilder.getSeparator();
Duration step = duration(conf.getMetrics().getStep());
RedisTimeSeriesConfig registryConfig = new RedisTimeSeriesConfig() {
private MeterRegistry createMeterRegistry(Config config) {
AbstractRedisClient client = client(config);
log.fine("Creating meter registry");
KeyBuilder keyBuilder = keyBuilder(config);
Duration step = duration(config.getMetrics().getStep());
String tsKeyspace = keyBuilder.create(KEYSPACE_METRICS);
RedisTimeSeriesMeterRegistry tsRegistry = new RedisTimeSeriesMeterRegistry(new RedisRegistryConfig() {

@Override
public String get(String key) {
return null;
}

@Override
public String keyPrefix() {
return keyPrefix;
public String keyspace() {
return tsKeyspace;
}

@Override
public String keySeparator() {
return keySeparator;
return keyBuilder.getSeparator();
}

@Override
public Duration step() {
return step;
}
}, Clock.SYSTEM, client);
tsRegistry.config().meterFilter(MeterFilter.ignoreTags(TAG_SQL, TAG_TABLE));
String searchKeyspace = keyBuilder.getKeyspace();
RediSearchMeterRegistry searchRegistry = new RediSearchMeterRegistry(new RediSearchRegistryConfig() {

@Override
public boolean enabled() {
return conf.getMetrics().isEnabled();
public String get(String key) {
return null;
}
};
log.fine("Creating meter registry");
return new RedisTimeSeriesMeterRegistry(registryConfig, Clock.SYSTEM, redisClient(conf));

@Override
public String keyspace() {
return searchKeyspace;
}

@Override
public String keySeparator() {
return keyBuilder.getSeparator();
}

@Override
public Duration step() {
return step;
}

@Override
public String[] nonKeyTags() {
return new String[] { TAG_SQL, TAG_TABLE };
}

@Override
public String indexPrefix() {
return keyBuilder.getKeyspace();
}

@Override
public String indexSuffix() {
return "idx";
}

}, Clock.SYSTEM, client);
searchRegistry.config().meterFilter(MeterFilter.acceptNameStartsWith(METER_QUERY))
.meterFilter(MeterFilter.deny());
return new CompositeMeterRegistry().add(tsRegistry).add(searchRegistry);
}

private static Connection backendConnection(DriverConfig config, Properties info) throws SQLException {
private Connection backendConnection(DriverConfig config, Properties info) throws SQLException {
Properties backendInfo = new Properties();
for (String name : info.stringPropertyNames()) {
if (name.startsWith(PROPS_SCHEMA.prefix())) {
Expand All @@ -266,20 +315,16 @@ private static Connection backendConnection(DriverConfig config, Properties info
return driver.connect(url, backendInfo);
}

public static synchronized java.sql.Driver backendDriver(String className) throws SQLException {
public synchronized java.sql.Driver backendDriver(String className) throws SQLException {
if (className == null || className.isEmpty()) {
throw new SQLException("No backend driver class specified");
}
if (drivers.containsKey(className)) {
return drivers.get(className);
}
java.sql.Driver driver;
try {
driver = (java.sql.Driver) Class.forName(className).getConstructor().newInstance();
} catch (Exception e) {
throw new SQLException("Could not load backend driver class '" + className + "'", e);
}
drivers.put(className, driver);
return driver;
}

Expand Down Expand Up @@ -342,19 +387,17 @@ public static void deregister() throws SQLException {
throw new IllegalStateException(
"Driver is not registered (or it has not been registered using Driver.register() method)");
}
clear();
registeredDriver.clear();
DriverManager.deregisterDriver(registeredDriver);
registeredDriver = null;
}

public static void clear() {
drivers.clear();
public void clear() {
configManagers.values().forEach(ConfigManager::close);
configManagers.clear();
registries.values().forEach(MeterRegistry::close);
registries.clear();
queryWriters.values().forEach(QueryWriter::close);
queryWriters.clear();
queryCaches.clear();
sessions.clear();
clients.values().forEach(c -> {
c.shutdown();
Expand All @@ -374,20 +417,20 @@ private QueryRuleSession ruleSession(Config config) {
return ruleSession;
}

private RedisURI redisURI(RedisConfig conf) {
private RedisURI redisURI(RedisConfig config) {
RedisURIBuilder builder = RedisURIBuilder.create();
builder.uri(conf.getUri());
builder.username(conf.getUsername());
builder.password(conf.getPassword());
builder.ssl(conf.isTls());
builder.sslVerifyMode(conf.getTlsVerify());
builder.uri(config.getUri());
builder.username(config.getUsername());
builder.password(config.getPassword());
builder.ssl(config.isTls());
builder.sslVerifyMode(config.getTlsVerify());
return builder.build();
}

private AbstractRedisClient createRedisClient(RedisConfig conf) {
RedisURI redisURI = redisURI(conf);
private AbstractRedisClient createRedisClient(RedisConfig config) {
RedisURI redisURI = redisURI(config);
log.log(Level.FINE, "Creating Redis client with URI {0}", redisURI);
return ClientBuilder.create(redisURI).cluster(conf.isCluster()).build();
return ClientBuilder.create(redisURI).cluster(config.isCluster()).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ public Duration getStep() {
return step;
}

public void setStep(Duration seconds) {
this.step = seconds;
public void setStep(Duration duration) {
this.step = duration;
}

public boolean isEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public String create(String... ids) {
* this KeyBuilder is in keyspace "root" and the given id is "sub" then the
* returned KeyBuilder will create keys under "root:sub:"
*
* @param id sub-keyspace element
* @param id subkeyspace element
* @return KeyBuilder for sub-keyspace "id"
*/
public KeyBuilder builder(String id) {
Expand Down
Loading

0 comments on commit 3ef3617

Please sign in to comment.