Skip to content

Commit

Permalink
refactor: simplified configuration, moved key mgmt to ResultSetCache
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Ruaux committed Jul 2, 2022
1 parent dc779d7 commit b48bcc0
Show file tree
Hide file tree
Showing 19 changed files with 496 additions and 624 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,73 @@
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.sql.rowset.RowSetFactory;
import javax.sql.rowset.RowSetProvider;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
import com.fasterxml.jackson.dataformat.javaprop.JavaPropsSchema;
import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.lettucemod.json.SetMode;
import com.redis.micrometer.RedisTimeSeriesConfig;
import com.redis.micrometer.RedisTimeSeriesMeterRegistry;
import com.redis.sidecar.core.Config;
import com.redis.sidecar.config.Config;
import com.redis.sidecar.config.Pool;
import com.redis.sidecar.core.ByteArrayResultSetCodec;
import com.redis.sidecar.core.ConfigUpdater;
import com.redis.sidecar.core.ResultSetCache;
import com.redis.sidecar.core.StringResultSetCache;
import com.redis.sidecar.jdbc.SidecarConnection;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStringCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.support.ConnectionPoolSupport;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;

public class SidecarDriver implements Driver {

private static final Logger log = Logger.getLogger(SidecarDriver.class.getName());

public static final String JDBC_URL_REGEX = "jdbc\\:(rediss?(\\-(socket|sentinel))?\\:\\/\\/.*)";
private static final String JDBC_URL_REGEX = "jdbc\\:(rediss?(\\-(socket|sentinel))?\\:\\/\\/.*)";
private static final Pattern JDBC_URL_PATTERN = Pattern.compile(JDBC_URL_REGEX);

public static final String PROPERTY_PREFIX = "sidecar";
public static final String PROPERTY_KEYSPACE = PROPERTY_PREFIX + ".keyspace";
public static final String PROPERTY_CACHE_NAME = PROPERTY_PREFIX + ".cache-name";
public static final String PROPERTY_METRICS = PROPERTY_PREFIX + ".metrics";
public static final String PROPERTY_METRICS_STEP = PROPERTY_METRICS + ".step";
public static final String PROPERTY_DRIVER_PREFIX = PROPERTY_PREFIX + ".driver";
public static final String PROPERTY_DRIVER_CLASSNAME = PROPERTY_DRIVER_PREFIX + ".class-name";
public static final String PROPERTY_DRIVER_URL = PROPERTY_DRIVER_PREFIX + ".url";
public static final String PROPERTY_CLUSTER = PROPERTY_PREFIX + ".cluster";

public static final String KEY_SEPARATOR = ":";
public static final String DEFAULT_KEYSPACE = "sidecar";
public static final String DEFAULT_CACHE_NAME = "default";
public static final Duration DEFAULT_METRICS_STEP = Duration.ofMinutes(1);

static {
try {
Expand All @@ -52,66 +81,116 @@ public class SidecarDriver implements Driver {
}
}

private static final Map<String, AbstractRedisClient> redisClients = new HashMap<>();
private static final Map<String, MeterRegistry> meterRegistries = new HashMap<>();
private static final Map<String, ConfigUpdater> configUpdaters = new HashMap<>();

private final ObjectMapper mapper = new ObjectMapper();
private static AbstractRedisClient redisClient;
private static ConfigUpdater configUpdater;

@Override
public Connection connect(String url, Properties info) throws SQLException {
Config config;
try {
config = config(info);
} catch (IOException e) {
throw new SQLException("Could not load configuration", e);
}
if (isEmpty(config.getDriver().getClassName())) {
throw new SQLException("No backend driver class specified");
}
if (isEmpty(config.getDriver().getUrl())) {
throw new SQLException("No backend URL specified");
}
Connection backendConnection = backendConnection(info);
Matcher matcher = JDBC_URL_PATTERN.matcher(url);
if (!matcher.find()) {
throw new SQLException("Invalid connection URL: " + url);
}
String redisURI = matcher.group(1);
AbstractRedisClient client;
if (redisClients.containsKey(redisURI)) {
client = redisClients.get(redisURI);
} else {
client = config.getRedis().isCluster() ? RedisModulesClusterClient.create(redisURI)
: RedisModulesClient.create(redisURI);
redisClients.put(redisURI, client);
boolean cluster = Boolean.parseBoolean(info.getProperty(PROPERTY_CLUSTER));
String cacheName = info.getProperty(PROPERTY_CACHE_NAME, DEFAULT_CACHE_NAME);
String keyspace = key(info.getProperty(PROPERTY_KEYSPACE, DEFAULT_KEYSPACE), cacheName);
checkRedisClient(redisURI, cluster, keyspace);
RowSetFactory rowSetFactory = RowSetProvider.newFactory();
Config config = new Config();
try {
configUpdater.create(key(keyspace, "config"), config);
} catch (JsonProcessingException e) {
throw new SQLException("Could not initialize config object", e);
}
String configId = redisURI + ":" + config.getCacheName();
if (configUpdaters.containsKey(configId)) {
config = configUpdaters.get(configId).getConfig();
} else {
try {
ObjectWriter writer = mapper.writerFor(config.getClass());
StatefulRedisModulesConnection<String, String> connection = client instanceof RedisModulesClusterClient
? ((RedisModulesClusterClient) client).connect()
: ((RedisModulesClient) client).connect();
connection.sync().jsonSet(config.configKey(), "$", writer.writeValueAsString(config), SetMode.NX);
configUpdaters.put(configId, new ConfigUpdater(connection, mapper.readerForUpdating(config), config));
} catch (JsonProcessingException e) {
throw new SQLException("Could not initialize config updater", e);
}
ResultSetCache cache = cache(redisClient, keyspace, rowSetFactory, config);
return new SidecarConnection(backendConnection, keyspace, config, cache, rowSetFactory);
}

private static void checkRedisClient(String redisURI, boolean cluster, String keyspace) {
if (redisClient == null) {
redisClient = cluster ? RedisModulesClusterClient.create(redisURI) : RedisModulesClient.create(redisURI);
LettuceAssert.isTrue(configUpdater == null,
"Invalid state: Redis client is null but config updater is not");
configUpdater = new ConfigUpdater(redisClient instanceof RedisModulesClusterClient
? ((RedisModulesClusterClient) redisClient).connect()

: ((RedisModulesClient) redisClient).connect());
Metrics.addRegistry(new RedisTimeSeriesMeterRegistry(new RedisTimeSeriesConfig() {

@Override
public String get(String key) {
return null;
}

@Override
public String uri() {
return redisURI;
}

@Override
public boolean cluster() {
return cluster;
}

@Override
public String keyspace() {
return key(keyspace, "metrics");
}

@Override
public Duration step() {
return DEFAULT_METRICS_STEP;
}

}, Clock.SYSTEM, redisClient));
}
return new SidecarConnection(databaseConnection(config, info), client, config,
meterRegistry(redisURI, config, client));
}

private Connection databaseConnection(Config config, Properties info) throws SQLException {
private ResultSetCache cache(AbstractRedisClient client, String keyspace, RowSetFactory rowSetFactory,
Config config) {
ByteArrayResultSetCodec codec = new ByteArrayResultSetCodec(rowSetFactory, config.getBufferSize());
boolean cluster = client instanceof RedisClusterClient;
GenericObjectPool<StatefulConnection<String, ResultSet>> pool = ConnectionPoolSupport
.createGenericObjectPool(cluster ? () -> ((RedisClusterClient) client).connect(codec)
: () -> ((RedisClient) client).connect(codec), poolConfig(config));
return new StringResultSetCache(key(keyspace, "cache"), pool, sync());
}

private static <T> GenericObjectPoolConfig<T> poolConfig(Config config) {
Pool pool = config.getPool();
GenericObjectPoolConfig<T> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(pool.getMaxActive());
poolConfig.setMaxIdle(pool.getMaxIdle());
poolConfig.setMinIdle(pool.getMinIdle());
poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(pool.getTimeBetweenEvictionRuns()));
poolConfig.setMaxWait(Duration.ofMillis(pool.getMaxWait()));
return poolConfig;
}

private static Function<StatefulConnection<String, ResultSet>, RedisStringCommands<String, ResultSet>> sync() {
if (redisClient instanceof RedisClusterClient) {
return c -> ((StatefulRedisClusterConnection<String, ResultSet>) c).sync();
}
return c -> ((StatefulRedisConnection<String, ResultSet>) c).sync();
}

private Connection backendConnection(Properties info) throws SQLException {
String driverClassName = info.getProperty(PROPERTY_DRIVER_CLASSNAME);
if (isEmpty(driverClassName)) {
throw new SQLException("No backend driver class specified");
}
String driverURL = info.getProperty(PROPERTY_DRIVER_URL);
if (isEmpty(driverURL)) {
throw new SQLException("No backend URL specified");
}
java.sql.Driver driver;
try {
driver = (java.sql.Driver) Class.forName(config.getDriver().getClassName()).getConstructor().newInstance();
driver = (java.sql.Driver) Class.forName(driverClassName).getConstructor().newInstance();
} catch (Exception e) {
throw new SQLException("Cannot initialize backend driver '" + config.getDriver().getClassName() + "'", e);
throw new SQLException("Cannot initialize backend driver '" + driverClassName + "'", e);
}
return driver.connect(config.getDriver().getUrl(), info);
return driver.connect(driverURL, info);
}

public static Config config(Properties info) throws IOException {
Expand All @@ -126,42 +205,6 @@ public static Config config(Properties info) throws IOException {
return propsMapper.readPropertiesAs(properties, propsSchema, Config.class);
}

private MeterRegistry meterRegistry(String redisURI, Config config, AbstractRedisClient client) {
if (meterRegistries.containsKey(redisURI)) {
return meterRegistries.get(redisURI);
}
MeterRegistry meterRegistry = new RedisTimeSeriesMeterRegistry(new RedisTimeSeriesConfig() {

@Override
public String get(String key) {
return null;
}

@Override
public String uri() {
return redisURI;
}

@Override
public boolean cluster() {
return config.getRedis().isCluster();
}

@Override
public String keyspace() {
return config.key();
}

@Override
public Duration step() {
return Duration.ofSeconds(config.getMetrics().getPublishInterval());
}

}, Clock.SYSTEM, client);
meterRegistries.put(redisURI, meterRegistry);
return meterRegistry;
}

private boolean isEmpty(String string) {
return string == null || string.isEmpty();
}
Expand Down Expand Up @@ -203,13 +246,18 @@ public Logger getParentLogger() {
return log;
}

public static void shutdown() {
configUpdaters.values().forEach(ConfigUpdater::close);
configUpdaters.clear();
meterRegistries.values().forEach(MeterRegistry::close);
meterRegistries.clear();
redisClients.values().forEach(AbstractRedisClient::shutdown);
redisClients.clear();
public static void shutdown() throws InterruptedException, ExecutionException {
if (configUpdater != null) {
configUpdater.close();
}
if (redisClient != null) {
redisClient.shutdown();
redisClient.getResources().shutdown().get();
}
}

public static String key(String keyspace, String id) {
return keyspace + KEY_SEPARATOR + id;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.redis.sidecar.config;

public class ByteSize {

private static final int KB = 1024;
private static final int MB = KB * KB;

private final int bytes;

private ByteSize(int bytes) {
this.bytes = bytes;
}

public int toBytes() {
return bytes;
}

public static ByteSize ofMB(int number) {
return new ByteSize(number * MB);
}

public static ByteSize ofKB(int number) {
return new ByteSize(number * KB);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.redis.sidecar.config;

import java.util.Arrays;
import java.util.List;

public class Config {

public static final ByteSize DEFAULT_BUFFER_SIZE = ByteSize.ofMB(100);
public static final long TTL_NO_CACHE = 0;
public static final long TTL_NO_EXPIRATION = -1;

private int bufferSize = DEFAULT_BUFFER_SIZE.toBytes();
private List<Rule> rules = Arrays.asList(Rule.builder().build());
private Pool pool = new Pool();

public Pool getPool() {
return pool;
}

public void setPool(Pool pool) {
this.pool = pool;
}

public int getBufferSize() {
return bufferSize;
}

public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}

public List<Rule> getRules() {
return rules;
}

public void setRules(List<Rule> rules) {
this.rules = rules;
}

}
Loading

0 comments on commit b48bcc0

Please sign in to comment.