Skip to content

Commit

Permalink
libsql: add periodic background sync (#995)
Browse files Browse the repository at this point in the history
This adds a new periodic sync configuration item to the `Builder` that
will allow users to set a duration at which they want the background
sync to sync.
  • Loading branch information
LucioFranco authored Feb 7, 2024
1 parent a72c066 commit c18caa4
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 12 deletions.
50 changes: 50 additions & 0 deletions libsql-server/tests/embedded_replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,3 +663,53 @@ fn proxy_write_returning_row() {

sim.run().unwrap();
}

#[test]
fn periodic_sync() {
let mut sim = Builder::new().build();

let tmp_embedded = tempdir().unwrap();
let tmp_host = tempdir().unwrap();
let tmp_embedded_path = tmp_embedded.path().to_owned();
let tmp_host_path = tmp_host.path().to_owned();

make_primary(&mut sim, tmp_host_path.clone());

sim.client("client", async move {
let client = Client::new();
client
.post("http://primary:9090/v1/namespaces/foo/create", json!({}))
.await?;

let path = tmp_embedded_path.join("embedded");
let db = libsql::Builder::new_remote_replica(
path.to_str().unwrap(),
"http://foo.primary:8080".to_string(),
"".to_string(),
)
.connector(TurmoilConnector)
.periodic_sync(Duration::from_millis(100))
.build()
.await?;

let conn = db.connect()?;

conn.execute("create table test (x)", ()).await?;

conn.execute("insert into test values (12)", ())
.await
.unwrap();

tokio::time::sleep(Duration::from_millis(500)).await;

let mut rows = conn.query("select * from test", ()).await.unwrap();

let row = rows.next().await.unwrap().unwrap();

assert_eq!(row.get::<u64>(0).unwrap(), 12);

Ok(())
});

sim.run().unwrap();
}
7 changes: 6 additions & 1 deletion libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ cfg_replication! {
None,
read_your_writes,
encryption_key,
None
).await
}

Expand All @@ -249,6 +250,7 @@ cfg_replication! {
version: Option<String>,
read_your_writes: bool,
encryption_key: Option<bytes::Bytes>,
periodic_sync: Option<std::time::Duration>,
) -> Result<Database> {
let https = connector();

Expand All @@ -260,6 +262,7 @@ cfg_replication! {
version,
read_your_writes,
encryption_key,
periodic_sync
).await
}

Expand All @@ -272,6 +275,7 @@ cfg_replication! {
version: Option<String>,
read_your_writes: bool,
encryption_key: Option<bytes::Bytes>,
periodic_sync: Option<std::time::Duration>,
) -> Result<Database>
where
C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
Expand All @@ -294,7 +298,8 @@ cfg_replication! {
token.into(),
version,
read_your_writes,
encryption_key.clone()
encryption_key.clone(),
periodic_sync
).await?;

Ok(Database {
Expand Down
12 changes: 12 additions & 0 deletions libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl Builder<()> {
},
encryption_key: None,
read_your_writes: false,
periodic_sync: None
},
}
}
Expand Down Expand Up @@ -139,6 +140,7 @@ cfg_replication! {
remote: Remote,
encryption_key: Option<bytes::Bytes>,
read_your_writes: bool,
periodic_sync: Option<std::time::Duration>,
}

/// Local replica configuration type in [`Builder`].
Expand Down Expand Up @@ -178,6 +180,14 @@ cfg_replication! {
self
}

/// Set the duration at which the replicator will automatically call `sync` in the
/// background. The sync will continue for the duration that the resulted `Database`
/// type is alive for, once it is dropped the background task will get dropped and stop.
pub fn periodic_sync(mut self, duration: std::time::Duration) -> Builder<RemoteReplica> {
self.inner.periodic_sync = Some(duration);
self
}

#[doc(hidden)]
pub fn version(mut self, version: String) -> Builder<RemoteReplica> {
self.inner.remote = self.inner.remote.version(version);
Expand All @@ -197,6 +207,7 @@ cfg_replication! {
},
encryption_key,
read_your_writes,
periodic_sync
} = self.inner;

let connector = if let Some(connector) = connector {
Expand All @@ -222,6 +233,7 @@ cfg_replication! {
version,
read_your_writes,
encryption_key.clone(),
periodic_sync
)
.await?;

Expand Down
7 changes: 6 additions & 1 deletion libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl Database {
endpoint: String,
auth_token: String,
encryption_key: Option<bytes::Bytes>,
periodic_sync: Option<std::time::Duration>,
) -> Result<Database> {
Self::open_http_sync_internal(
connector,
Expand All @@ -61,6 +62,7 @@ impl Database {
None,
false,
encryption_key,
periodic_sync,
)
.await
}
Expand All @@ -75,6 +77,7 @@ impl Database {
version: Option<String>,
read_your_writes: bool,
encryption_key: Option<bytes::Bytes>,
periodic_sync: Option<std::time::Duration>,
) -> Result<Database> {
use std::path::PathBuf;

Expand All @@ -95,7 +98,9 @@ impl Database {
.await
.map_err(|e| crate::errors::Error::ConnectionFailed(e.to_string()))?;

let replicator = EmbeddedReplicator::with_remote(client, path, 1000, encryption_key).await;
let replicator =
EmbeddedReplicator::with_remote(client, path, 1000, encryption_key, periodic_sync)
.await;

db.replication_ctx = Some(ReplicationContext {
replicator,
Expand Down
80 changes: 70 additions & 10 deletions libsql/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

pub use libsql_replication::frame::{Frame, FrameNo};
use libsql_replication::replicator::{Either, Replicator};
Expand All @@ -12,6 +13,8 @@ use libsql_replication::rpc::proxy::{
ProgramReq, Query, Step,
};
use tokio::sync::Mutex;
use tokio::task::AbortHandle;
use tracing::Instrument;

use crate::parser::Statement;
use crate::Result;
Expand Down Expand Up @@ -95,27 +98,76 @@ impl Writer {
#[derive(Clone)]
pub(crate) struct EmbeddedReplicator {
replicator: Arc<Mutex<Replicator<Either<RemoteClient, LocalClient>>>>,
bg_abort: Option<Arc<DropAbort>>,
}

impl EmbeddedReplicator {
pub async fn with_remote(client: RemoteClient, db_path: PathBuf, auto_checkpoint: u32, encryption_key: Option<bytes::Bytes>) -> Self {
pub async fn with_remote(
client: RemoteClient,
db_path: PathBuf,
auto_checkpoint: u32,
encryption_key: Option<bytes::Bytes>,
perodic_sync: Option<Duration>,
) -> Self {
let replicator = Arc::new(Mutex::new(
Replicator::new(Either::Left(client), db_path, auto_checkpoint, encryption_key)
.await
.unwrap(),
Replicator::new(
Either::Left(client),
db_path,
auto_checkpoint,
encryption_key,
)
.await
.unwrap(),
));

Self { replicator }
let mut replicator = Self {
replicator,
bg_abort: None,
};

if let Some(sync_duration) = perodic_sync {
let replicator2 = replicator.clone();

let jh = tokio::spawn(
async move {
loop {
if let Err(e) = replicator2.sync_oneshot().await {
tracing::error!("replicator sync error: {}", e);
}

tokio::time::sleep(sync_duration).await;
}
}
.instrument(tracing::info_span!("periodic_sync")),
);

replicator.bg_abort = Some(Arc::new(DropAbort(jh.abort_handle())));
}

replicator
}

pub async fn with_local(client: LocalClient, db_path: PathBuf, auto_checkpoint: u32, encryption_key: Option<bytes::Bytes>) -> Self {
pub async fn with_local(
client: LocalClient,
db_path: PathBuf,
auto_checkpoint: u32,
encryption_key: Option<bytes::Bytes>,
) -> Self {
let replicator = Arc::new(Mutex::new(
Replicator::new(Either::Right(client), db_path, auto_checkpoint, encryption_key)
.await
.unwrap(),
Replicator::new(
Either::Right(client),
db_path,
auto_checkpoint,
encryption_key,
)
.await
.unwrap(),
));

Self { replicator }
Self {
replicator,
bg_abort: None,
}
}

pub async fn sync_oneshot(&self) -> Result<Option<FrameNo>> {
Expand Down Expand Up @@ -202,3 +254,11 @@ impl EmbeddedReplicator {
.committed_frame_no()
}
}

struct DropAbort(AbortHandle);

impl Drop for DropAbort {
fn drop(&mut self) {
self.0.abort();
}
}

0 comments on commit c18caa4

Please sign in to comment.