Skip to content

Commit

Permalink
feat: Redis Publish Subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
ankurdubey521 committed Jun 19, 2024
1 parent 8e13c1f commit 134dfe2
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 20 deletions.
25 changes: 16 additions & 9 deletions crates/routing_engine/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<

Ok::<
estimator::DataPoint<f64, f64>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore, Producer>,
>(estimator::DataPoint {
x: input_value_in_usd,
y: fee_in_usd,
Expand All @@ -102,7 +102,7 @@ impl<
.collect::<Vec<
Result<
estimator::DataPoint<f64, f64>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore, Producer>,
>,
>>()
.await
Expand All @@ -125,7 +125,7 @@ impl<
>(
&mut self,
values: Vec<(&&BucketConfig, &Estimator)>,
) -> Result<(), IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>> {
) -> Result<(), IndexerErrors<TokenPriceProvider, RouteSource, ModelStore, Producer>> {
let values_transformed = values
.iter()
.map(|(k, v)| {
Expand All @@ -150,7 +150,7 @@ impl<
&mut self,
) -> Result<
HashMap<&'config BucketConfig, Estimator>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore>,
IndexerErrors<TokenPriceProvider, RouteSource, ModelStore, Producer>,
> {
// Build Estimators
let estimator_map: HashMap<&BucketConfig, Estimator> =
Expand Down Expand Up @@ -189,6 +189,7 @@ enum IndexerErrors<
T: token_price::TokenPriceProvider,
S: source::RouteSource,
R: storage::RoutingModelStore,
U: storage::MessageQueue,
> {
#[display(fmt = "Route build error: {}", _0)]
RouteBuildError(RouteError),
Expand All @@ -206,15 +207,15 @@ enum IndexerErrors<
PublishEstimatorErrors(Vec<R::Error>),

#[display(fmt = "Indexer update message error: {}", _0)]
PublishIndexerUpdateMessageError(String),
PublishIndexerUpdateMessageError(U::Error),
}

#[cfg(test)]
mod tests {
use std::env;

use config::Config;
use storage::{MessageQueue, RoutingModelStore};
use storage::{ControlFlow, MessageQueue, Msg, RoutingModelStore};

use crate::CostType;
use crate::estimator::{Estimator, LinearRegressionEstimator};
Expand Down Expand Up @@ -246,12 +247,18 @@ mod tests {

struct ProducerStub;
impl MessageQueue for ProducerStub {
async fn publish(&mut self, topic: &str, message: &str) -> Result<(), String> {
type Error = ();

async fn publish(&mut self, topic: &str, message: &str) -> Result<(), ()> {
Ok(())
}

async fn subscribe(&mut self, topic: &str) -> Result<String, String> {
Ok("Subscribed".to_string())
fn subscribe<U>(
&mut self,
topic: &str,
callback: impl FnMut(Msg) -> ControlFlow<U>,
) -> Result<(), Self::Error> {
Ok(())
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/routing_engine/src/source/bungee/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ impl RouteSource for BungeeClient {
.iter()
.map(|route| match estimation_type {
CostType::Fee => Some(
route.total_gas_fees_in_usd + route.output_value_in_usd?
- route.input_value_in_usd?,
route.total_gas_fees_in_usd + route.input_value_in_usd?
- route.output_value_in_usd?,
),
_ => None,
})
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2021"
config = { path = "../config" }
redis = { version = "0.25.4", features = ["aio", "tokio-comp"] }
thiserror = "1.0.61"
tokio = { version = "1.38.0", features = ["macros"] }
tokio = { version = "1.38.0", features = ["macros", "rt"] }
12 changes: 10 additions & 2 deletions crates/storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::fmt::Debug;

pub use ::redis::{ControlFlow, Msg};

mod redis;

pub trait RoutingModelStore {
Expand All @@ -15,7 +17,13 @@ pub trait RoutingModelStore {
}

pub trait MessageQueue {
async fn publish(&mut self, topic: &str, message: &str) -> Result<(), String>;
type Error: Debug;

async fn publish(&mut self, topic: &str, message: &str) -> Result<(), Self::Error>;

async fn subscribe(&mut self, topic: &str) -> Result<String, String>;
fn subscribe<U>(
&mut self,
topic: &str,
callback: impl FnMut(Msg) -> ControlFlow<U>,
) -> Result<(), Self::Error>;
}
49 changes: 43 additions & 6 deletions crates/storage/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use redis;
use redis::{aio, AsyncCommands};
use redis::{aio, AsyncCommands, Commands, ControlFlow, Msg, PubSubCommands};
use redis::RedisError;
use thiserror::Error;

Expand Down Expand Up @@ -41,12 +41,20 @@ impl RoutingModelStore for RedisClient {
}

impl MessageQueue for RedisClient {
async fn publish(&mut self, topic: &str, message: &str) -> Result<(), String> {
todo!()
type Error = RedisClientError;

async fn publish(&mut self, topic: &str, message: &str) -> Result<(), Self::Error> {
self.connection.publish(topic, message).await.map_err(RedisClientError::RedisLibraryError)
}

async fn subscribe(&mut self, topic: &str) -> Result<String, String> {
todo!()
fn subscribe<U>(
&mut self,
topic: &str,
callback: impl FnMut(Msg) -> ControlFlow<U>,
) -> Result<(), Self::Error> {
let mut connection = self.client.get_connection()?;
connection.subscribe(topic, callback)?;
Ok(())
}
}

Expand All @@ -58,6 +66,8 @@ pub enum RedisClientError {

#[cfg(test)]
mod tests {
use std::sync::mpsc::channel;

use tokio;

use super::*;
Expand All @@ -68,7 +78,7 @@ mod tests {
}

#[tokio::test]
async fn test_redis_client() {
async fn test_key_store() {
let mut client = setup().await;

let keys = vec!["test_key1".to_string(), "test_key2".to_string()];
Expand Down Expand Up @@ -100,4 +110,31 @@ mod tests {
let values = client.get_multiple(&keys).await.unwrap();
assert_eq!(values, vec!["test_value1".to_string(), "test_value2".to_string()]);
}

#[tokio::test]
async fn test_pub_sub() {
let (tx, mut rx) = channel::<String>();
let mut client = setup().await;

let handle = tokio::task::spawn_blocking(move || {
client
.subscribe("TOPIC", |msg: Msg| -> ControlFlow<String> {
let message = msg.get_payload().unwrap();
tx.send(message).expect("Sending message failed");

ControlFlow::Break("DONE".to_string())
})
.unwrap();
});

let mut client = setup().await;
client.publish("TOPIC", "HELLO").await.unwrap();

loop {
if let Ok(data) = rx.recv() {
assert_eq!(data, "HELLO".to_string());
break;
}
}
}
}

0 comments on commit 134dfe2

Please sign in to comment.