Skip to content

Commit

Permalink
feat(web-keyvalue): add Redis support
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <[email protected]>
  • Loading branch information
rvolosatovs committed Nov 6, 2024
1 parent f5c0b4e commit df09a16
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/web/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ wrpc-transport-quic = { workspace = true }
wrpc-transport-web = { workspace = true }
wrpc-wasi-keyvalue = { workspace = true }
wrpc-wasi-keyvalue-mem = { workspace = true }
wrpc-wasi-keyvalue-redis = { workspace = true }
wtransport = { workspace = true, features = ["self-signed"] }
29 changes: 23 additions & 6 deletions examples/web/rust/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
} else if (conn === 'nats') {
let addr = obj['nats-addr'];
if (addr == null || addr === '') {
alert("NATS.io server URL must be set");
alert("NATS.io server address must be set");
return
}
identifier = 'wrpc+nats://' + addr;
Expand All @@ -46,6 +46,13 @@
if (bucket != null && bucket != '') {
identifier = identifier + ";" + bucket;
}
} else if (conn === 'redis') {
let url = obj['redis-url'];
if (url == null || url === '') {
alert("Redis URL must be set");
return
}
identifier = url;
} else if (conn === 'quic') {
let addr = obj.addr;
if (addr == null || addr === '') {
Expand Down Expand Up @@ -318,11 +325,12 @@ <h1 class="title">wRPC WebTransport demo</h1>
<div class="select">
<select id="connection" form="settings" name="connection">
<option value="mem">In-memory</option>
<option value="nats">NATS.io</option>
<option value="quic">QUIC</option>
<option value="tcp">TCP</option>
<option value="uds">Unix domain sockets</option>
<option value="web">WebTransport</option>
<option value="redis">Redis</option>
<option value="nats">wRPC/NATS.io</option>
<option value="quic">wRPC/QUIC</option>
<option value="tcp">wRPC/TCP</option>
<option value="uds">wRPC/Unix domain sockets</option>
<option value="web">wRPC/WebTransport</option>
</select>
</div>
</div>
Expand Down Expand Up @@ -351,6 +359,15 @@ <h1 class="title">wRPC WebTransport demo</h1>
<template class="form-fields" data-option="mem">
</template>

<template class="form-fields" data-option="redis">
<div class="field">
<label class="label">Redis server URL</label>
<div class="control">
<input class="input" type="text" name="redis-url" placeholder="redis://localhost:6379" value="redis://localhost:6379">
</div>
</div>
</template>

<template class="form-fields" data-option="nats">
<div class="field">
<label class="label">Bucket identifier</label>
Expand Down
53 changes: 30 additions & 23 deletions examples/web/rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl ServerCertVerifier for Insecure {
#[derive(Clone, Debug)]
enum Bucket {
Mem(ResourceOwn<store::Bucket>),
Redis(ResourceOwn<store::Bucket>),
Nats(
ResourceOwn<wrpc_wasi_keyvalue::wasi::keyvalue::store::Bucket>,
wrpc_transport_nats::Client,
Expand All @@ -101,10 +102,11 @@ enum Bucket {
),
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
struct Handler {
buckets: Arc<RwLock<HashMap<Bytes, Bucket>>>,
mem: wrpc_wasi_keyvalue_mem::Handler,
redis: wrpc_wasi_keyvalue_redis::Handler,
}

impl Handler {
Expand Down Expand Up @@ -144,7 +146,7 @@ fn client_tls_config() -> rustls::ClientConfig {
}

impl<C: Send + Sync> store::Handler<C> for Handler {
#[instrument(level = "trace", skip(cx), ret(level = "trace"))]
#[instrument(level = "trace", skip(self, cx), ret(level = "trace"))]
async fn open(
&self,
cx: C,
Expand All @@ -161,12 +163,18 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
buckets.insert(id.clone(), Bucket::Mem(bucket));
return Ok(Ok(ResourceOwn::from(id)));
}
let (url, identifier) = identifier.split_once(';').unwrap_or((&identifier, ""));
let (url, suffix) = identifier.split_once(';').unwrap_or((&identifier, ""));
let mut url = match Url::parse(url) {
Ok(url) => url,
Err(err) => return Ok(Err(store::Error::Other(err.to_string()))),
};
let bucket = match url.scheme() {
"redis" | "rediss" | "redis+sentinel" | "rediss+sentinel" => {
match self.redis.open(cx, identifier).await? {
Ok(bucket) => Bucket::Redis(bucket),
Err(err) => return Ok(Err(err)),
}
}
"wrpc+nats" => {
let nats = match async_nats::connect_with_options(
url.authority(),
Expand All @@ -188,9 +196,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
Ok(wrpc) => wrpc,
Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
};
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, None, &identifier)
.await?
{
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, None, &suffix).await? {
Ok(bucket) => Bucket::Nats(bucket, wrpc),
Err(err) => return Ok(Err(err.into())),
}
Expand Down Expand Up @@ -225,9 +231,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
};
let wrpc = wrpc_transport_quic::Client::from(conn);
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, (), &identifier)
.await?
{
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, (), &suffix).await? {
Ok(bucket) => Bucket::Quic(bucket, wrpc),
Err(err) => return Ok(Err(err.into())),
}
Expand All @@ -238,9 +242,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
Err(err) => return Ok(Err(err)),
};
let wrpc = wrpc_transport::frame::tcp::Client::from(addr);
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, (), &identifier)
.await?
{
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, (), &suffix).await? {
Ok(bucket) => Bucket::Tcp(bucket, wrpc),
Err(err) => return Ok(Err(err.into())),
}
Expand All @@ -256,9 +258,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
)));
};
let wrpc = wrpc_transport::frame::unix::Client::from(path);
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, (), &identifier)
.await?
{
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, (), &suffix).await? {
Ok(bucket) => Bucket::Unix(bucket, wrpc),
Err(err) => return Ok(Err(err.into())),
}
Expand Down Expand Up @@ -288,9 +288,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
Err(err) => return Ok(Err(store::Error::Other(format!("{err:#}")))),
};
let wrpc = wrpc_transport_web::Client::from(conn);
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, (), &identifier)
.await?
{
match wrpc_wasi_keyvalue::wasi::keyvalue::store::open(&wrpc, (), &suffix).await? {
Ok(bucket) => Bucket::Web(bucket, wrpc),
Err(err) => return Ok(Err(err.into())),
}
Expand All @@ -310,7 +308,7 @@ impl<C: Send + Sync> store::Handler<C> for Handler {
}

impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
#[instrument(level = "trace", skip(cx), ret(level = "trace"))]
#[instrument(level = "trace", skip(self, cx), ret(level = "trace"))]
async fn get(
&self,
cx: C,
Expand All @@ -319,6 +317,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
) -> anyhow::Result<Result<Option<Bytes>>> {
let res = match self.bucket(bucket).await? {
Bucket::Mem(bucket) => return self.mem.get(cx, bucket.as_borrow(), key).await,
Bucket::Redis(bucket) => return self.redis.get(cx, bucket.as_borrow(), key).await,
Bucket::Nats(bucket, wrpc) => {
wrpc_wasi_keyvalue::wasi::keyvalue::store::Bucket::get(
&wrpc,
Expand Down Expand Up @@ -372,7 +371,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
}
}

#[instrument(level = "trace", skip(cx), ret(level = "trace"))]
#[instrument(level = "trace", skip(self, cx), ret(level = "trace"))]
async fn set(
&self,
cx: C,
Expand All @@ -382,6 +381,9 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
) -> anyhow::Result<Result<()>> {
let res = match self.bucket(bucket).await? {
Bucket::Mem(bucket) => return self.mem.set(cx, bucket.as_borrow(), key, value).await,
Bucket::Redis(bucket) => {
return self.redis.set(cx, bucket.as_borrow(), key, value).await
}
Bucket::Nats(bucket, wrpc) => {
wrpc_wasi_keyvalue::wasi::keyvalue::store::Bucket::set(
&wrpc,
Expand Down Expand Up @@ -440,7 +442,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
}
}

#[instrument(level = "trace", skip(cx), ret(level = "trace"))]
#[instrument(level = "trace", skip(self, cx), ret(level = "trace"))]
async fn delete(
&self,
cx: C,
Expand All @@ -449,6 +451,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
) -> anyhow::Result<Result<()>> {
let res = match self.bucket(bucket).await? {
Bucket::Mem(bucket) => return self.mem.delete(cx, bucket.as_borrow(), key).await,
Bucket::Redis(bucket) => return self.redis.delete(cx, bucket.as_borrow(), key).await,
Bucket::Nats(bucket, wrpc) => {
wrpc_wasi_keyvalue::wasi::keyvalue::store::Bucket::delete(
&wrpc,
Expand Down Expand Up @@ -502,7 +505,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
}
}

#[instrument(level = "trace", skip(cx), ret(level = "trace"))]
#[instrument(level = "trace", skip(self, cx), ret(level = "trace"))]
async fn exists(
&self,
cx: C,
Expand All @@ -511,6 +514,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
) -> anyhow::Result<Result<bool>> {
let res = match self.bucket(bucket).await? {
Bucket::Mem(bucket) => return self.mem.exists(cx, bucket.as_borrow(), key).await,
Bucket::Redis(bucket) => return self.redis.exists(cx, bucket.as_borrow(), key).await,
Bucket::Nats(bucket, wrpc) => {
wrpc_wasi_keyvalue::wasi::keyvalue::store::Bucket::exists(
&wrpc,
Expand Down Expand Up @@ -564,7 +568,7 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
}
}

#[instrument(level = "trace", skip(cx), ret(level = "trace"))]
#[instrument(level = "trace", skip(self, cx), ret(level = "trace"))]
async fn list_keys(
&self,
cx: C,
Expand All @@ -573,6 +577,9 @@ impl<C: Send + Sync> store::HandlerBucket<C> for Handler {
) -> anyhow::Result<Result<store::KeyResponse>> {
let res = match self.bucket(bucket).await? {
Bucket::Mem(bucket) => return self.mem.list_keys(cx, bucket.as_borrow(), cursor).await,
Bucket::Redis(bucket) => {
return self.redis.list_keys(cx, bucket.as_borrow(), cursor).await
}
Bucket::Nats(bucket, wrpc) => {
wrpc_wasi_keyvalue::wasi::keyvalue::store::Bucket::list_keys(
&wrpc,
Expand Down

0 comments on commit df09a16

Please sign in to comment.