diff --git a/libsql-server/tests/embedded_replica/mod.rs b/libsql-server/tests/embedded_replica/mod.rs index 3ca71ff2802..b20703e07df 100644 --- a/libsql-server/tests/embedded_replica/mod.rs +++ b/libsql-server/tests/embedded_replica/mod.rs @@ -81,6 +81,7 @@ fn embedded_replica() { "http://foo.primary:8080", "", TurmoilConnector, + false, ) .await?; @@ -150,6 +151,7 @@ fn execute_batch() { "http://foo.primary:8080", "", TurmoilConnector, + false, ) .await?; @@ -256,6 +258,7 @@ fn replica_primary_reset() { "http://primary:8080", "", TurmoilConnector, + false, ) .await .unwrap(); @@ -311,6 +314,7 @@ fn replica_primary_reset() { "http://primary:8080", "", TurmoilConnector, + false, ) .await .unwrap(); @@ -412,6 +416,7 @@ fn replica_no_resync_on_restart() { "http://primary:8080", "", TurmoilConnector, + false, ) .await .unwrap(); @@ -426,6 +431,7 @@ fn replica_no_resync_on_restart() { "http://primary:8080", "", TurmoilConnector, + false, ) .await .unwrap(); @@ -498,6 +504,7 @@ fn replicate_with_snapshots() { "http://primary:8080", "", TurmoilConnector, + false, ) .await .unwrap(); @@ -523,3 +530,45 @@ fn replicate_with_snapshots() { sim.run().unwrap(); } + +#[test] +fn read_your_writes() { + let mut sim = Builder::new().build(); + + let tmp_embedded = tempdir().unwrap(); + let tmp_host = tempdir().unwrap(); + let tmp_embedded_path = tmp_embedded.path().to_owned(); + let tmp_host_path = tmp_host.path().to_owned(); + + make_primary(&mut sim, tmp_host_path.clone()); + + sim.client("client", async move { + let client = Client::new(); + client + .post("http://primary:9090/v1/namespaces/foo/create", json!({})) + .await?; + + let path = tmp_embedded_path.join("embedded"); + let db = Database::open_with_remote_sync_connector( + path.to_str().unwrap(), + "http://foo.primary:8080", + "", + TurmoilConnector, + true, + ) + .await?; + + let conn = db.connect()?; + + conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ()) + .await?; + + conn.execute("INSERT INTO user(id) VALUES (1)", ()) + .await + .unwrap(); + + Ok(()) + }); + + sim.run().unwrap(); +} diff --git a/libsql/src/database.rs b/libsql/src/database.rs index 0bcb47aaf0c..2148b7a3c6e 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -109,7 +109,24 @@ cfg_replication! { http.enforce_http(false); http.set_nodelay(true); - Self::open_with_remote_sync_connector(db_path, url, token, http).await + Self::open_with_remote_sync_connector(db_path, url, token, http, false).await + } + + /// Open a local database file with the ability to sync from a remote database + /// in consistent mode. + /// + /// Consistent mode means that when a write happens it will not complete until + /// that write is visible in the local db. + pub async fn open_with_remote_sync_consistent( + db_path: impl Into, + url: impl Into, + token: impl Into, + ) -> Result { + let mut http = hyper::client::HttpConnector::new(); + http.enforce_http(false); + http.set_nodelay(true); + + Self::open_with_remote_sync_connector(db_path, url, token, http, true).await } #[doc(hidden)] @@ -117,13 +134,21 @@ cfg_replication! { db_path: impl Into, url: impl Into, token: impl Into, - version: Option + version: Option, + read_your_writes: bool, ) -> Result { let mut http = hyper::client::HttpConnector::new(); http.enforce_http(false); http.set_nodelay(true); - Self::open_with_remote_sync_connector_internal(db_path, url, token, http, version).await + Self::open_with_remote_sync_connector_internal( + db_path, + url, + token, + http, + version, + read_your_writes + ).await } #[doc(hidden)] @@ -132,6 +157,7 @@ cfg_replication! { url: impl Into, token: impl Into, connector: C, + read_your_writes: bool, ) -> Result where C: tower::Service + Send + Clone + Sync + 'static, @@ -139,7 +165,14 @@ cfg_replication! { C::Future: Send + 'static, C::Error: Into>, { - Self::open_with_remote_sync_connector_internal(db_path, url, token, connector, None).await + Self::open_with_remote_sync_connector_internal( + db_path, + url, + token, + connector, + None, + read_your_writes + ).await } #[doc(hidden)] @@ -148,7 +181,8 @@ cfg_replication! { url: impl Into, token: impl Into, connector: C, - version: Option + version: Option, + read_your_writes: bool, ) -> Result where C: tower::Service + Send + Clone + Sync + 'static, @@ -169,7 +203,8 @@ cfg_replication! { db_path.into(), url.into(), token.into(), - version + version, + read_your_writes ).await?; Ok(Database { diff --git a/libsql/src/local/connection.rs b/libsql/src/local/connection.rs index 054cb3c0ec5..e8a73399415 100644 --- a/libsql/src/local/connection.rs +++ b/libsql/src/local/connection.rs @@ -7,10 +7,10 @@ use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction}; use crate::TransactionBehavior; use libsql_sys::ffi; -use std::{ffi::c_int, sync::Arc}; +use std::{ffi::c_int, fmt, sync::Arc}; /// A connection to a libSQL database. -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct Connection { pub(crate) raw: *mut ffi::sqlite3, @@ -227,3 +227,9 @@ impl Connection { self.writer.as_ref() } } + +impl fmt::Debug for Connection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Connection").finish() + } +} diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index e57777fed38..f65651d9c61 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -1,27 +1,25 @@ use std::sync::Once; cfg_replication!( - use tokio::sync::Mutex; - use libsql_replication::replicator::Replicator; use libsql_replication::frame::FrameNo; - use libsql_replication::replicator::Either; use crate::replication::client::Client; use crate::replication::local_client::LocalClient; use crate::replication::remote_client::RemoteClient; + use crate::replication::EmbeddedReplicator; pub use crate::replication::Frames; + + pub struct ReplicationContext { + pub(crate) replicator: EmbeddedReplicator, + client: Option, + read_your_writes: bool, + } ); use crate::{database::OpenFlags, local::connection::Connection}; use crate::{Error::ConnectionFailed, Result}; use libsql_sys::ffi; -#[cfg(feature = "replication")] -pub struct ReplicationContext { - pub replicator: Mutex>>, - client: Option, -} - // A libSQL database. pub struct Database { pub db_path: String, @@ -54,34 +52,7 @@ impl Database { endpoint: String, auth_token: String, ) -> Result { - use std::path::PathBuf; - - use crate::util::coerce_url_scheme; - - let mut db = Database::open(&db_path, OpenFlags::default())?; - - let endpoint = coerce_url_scheme(&endpoint); - let remote = crate::replication::client::Client::new( - connector, - endpoint.as_str().try_into().unwrap(), - auth_token, - None, - ) - .unwrap(); - let path = PathBuf::from(db_path); - let client = RemoteClient::new(remote.clone(), &path).await.unwrap(); - let replicator = Mutex::new( - Replicator::new(Either::Left(client), path, 1000) - .await - .unwrap(), - ); - - db.replication_ctx = Some(ReplicationContext { - replicator, - client: Some(remote), - }); - - Ok(db) + Self::open_http_sync_internal(connector, db_path, endpoint, auth_token, None, false).await } #[cfg(feature = "replication")] @@ -92,6 +63,7 @@ impl Database { endpoint: String, auth_token: String, version: Option, + read_your_writes: bool, ) -> Result { use std::path::PathBuf; @@ -108,16 +80,16 @@ impl Database { ) .unwrap(); let path = PathBuf::from(db_path); - let client = RemoteClient::new(remote.clone(), &path).await.map_err(|e| crate::errors::Error::ConnectionFailed(e.to_string()))?; - let replicator = Mutex::new( - Replicator::new(Either::Left(client), path, 1000) + let client = RemoteClient::new(remote.clone(), &path) .await - .map_err(|e| crate::errors::Error::ConnectionFailed(e.to_string()))? - ); + .map_err(|e| crate::errors::Error::ConnectionFailed(e.to_string()))?; + + let replicator = EmbeddedReplicator::with_remote(client, path, 1000).await; db.replication_ctx = Some(ReplicationContext { replicator, client: Some(remote), + read_your_writes, }); Ok(db) @@ -132,14 +104,13 @@ impl Database { let path = PathBuf::from(db_path); let client = LocalClient::new(&path).await.unwrap(); - let replicator = Mutex::new( - Replicator::new(Either::Right(client), path, 1000) - .await - .map_err(|e| ConnectionFailed(format!("{e}")))?, - ); + + let replicator = EmbeddedReplicator::with_local(client, path, 1000).await; + db.replication_ctx = Some(ReplicationContext { replicator, client: None, + read_your_writes: false, }); Ok(db) @@ -184,13 +155,19 @@ impl Database { #[cfg(feature = "replication")] pub fn writer(&self) -> Result> { use crate::replication::Writer; -if let Some(ReplicationContext { + if let Some(ReplicationContext { client: Some(ref client), - .. + replicator, + read_your_writes, }) = &self.replication_ctx { Ok(Some(Writer { client: client.clone(), + replicator: if *read_your_writes { + Some(replicator.clone()) + } else { + None + }, })) } else { Ok(None) @@ -201,43 +178,8 @@ if let Some(ReplicationContext { /// Perform a sync step, returning the new replication index, or None, if the nothing was /// replicated yet pub async fn sync_oneshot(&self) -> Result> { - use libsql_replication::replicator::ReplicatorClient; - if let Some(ref ctx) = self.replication_ctx { - let mut replicator = ctx.replicator.lock().await; - if !matches!(replicator.client_mut(), Either::Left(_)) { - return Err(crate::errors::Error::Misuse( - "Trying to replicate from HTTP, but this is a local replicator".into(), - )); - } - - // we force a handshake to get the most up to date replication index from the primary. - replicator.force_handshake(); - - loop { - match replicator - .replicate() - .await { - Err(libsql_replication::replicator::Error::Meta(libsql_replication::meta::Error::LogIncompatible)) => { - // The meta must have been marked as dirty, replicate again from scratch - // this time. - tracing::debug!("re-replicating database after LogIncompatible error"); - replicator.replicate().await.map_err(|e| crate::Error::Replication(e.into()))?; - } - Err(e) => return Err(crate::Error::Replication(e.into())), - Ok(_) => { - let Either::Left(client) = replicator.client_mut() else { unreachable!() }; - let Some(primary_index) = client.last_handshake_replication_index() else { return Ok(None) }; - if let Some(replica_index) = replicator.client_mut().committed_frame_no() { - if replica_index >= primary_index { - break - } - } - }, - } - } - - Ok(replicator.client_mut().committed_frame_no()) + ctx.replicator.sync_oneshot().await } else { Err(crate::errors::Error::Misuse( "No replicator available. Use Database::with_replicator() to enable replication" @@ -265,26 +207,8 @@ if let Some(ReplicationContext { #[cfg(feature = "replication")] pub async fn sync_frames(&self, frames: Frames) -> Result> { - use libsql_replication::replicator::ReplicatorClient; - if let Some(ref ctx) = self.replication_ctx { - let mut replicator = ctx.replicator.lock().await; - match replicator.client_mut() { - Either::Right(c) => { - c.load_frames(frames); - } - Either::Left(_) => { - return Err(crate::errors::Error::Misuse( - "Trying to call sync_frames with an HTTP replicator".into(), - )) - } - } - replicator - .replicate() - .await - .map_err(|e| crate::Error::Replication(e.into()))?; - - Ok(replicator.client_mut().committed_frame_no()) + ctx.replicator.sync_frames(frames).await } else { Err(crate::errors::Error::Misuse( "No replicator available. Use Database::with_replicator() to enable replication" @@ -295,15 +219,8 @@ if let Some(ReplicationContext { #[cfg(feature = "replication")] pub async fn flush_replicator(&self) -> Result> { - use libsql_replication::replicator::ReplicatorClient; - if let Some(ref ctx) = self.replication_ctx { - let mut replicator = ctx.replicator.lock().await; - replicator - .flush() - .await - .map_err(|e| crate::Error::Replication(e.into()))?; - Ok(replicator.client_mut().committed_frame_no()) + ctx.replicator.flush().await } else { Err(crate::errors::Error::Misuse( "No replicator available. Use Database::with_replicator() to enable replication" @@ -314,10 +231,8 @@ if let Some(ReplicationContext { #[cfg(feature = "replication")] pub async fn replication_index(&self) -> Result> { - use libsql_replication::replicator::ReplicatorClient; - if let Some(ref ctx) = self.replication_ctx { - Ok(ctx.replicator.lock().await.client_mut().committed_frame_no()) + Ok(ctx.replicator.committed_frame_no().await) } else { Err(crate::errors::Error::Misuse( "No replicator available. Use Database::with_replicator() to enable replication" diff --git a/libsql/src/replication/connection.rs b/libsql/src/replication/connection.rs index cb086758b17..198fb6a6165 100644 --- a/libsql/src/replication/connection.rs +++ b/libsql/src/replication/connection.rs @@ -191,6 +191,10 @@ impl RemoteConnection { .into(); } + if let Some(replicator) = writer.replicator() { + replicator.sync_oneshot().await?; + } + Ok(res) } diff --git a/libsql/src/replication/mod.rs b/libsql/src/replication/mod.rs index 13362c6fc03..aa17b36934e 100644 --- a/libsql/src/replication/mod.rs +++ b/libsql/src/replication/mod.rs @@ -1,14 +1,25 @@ -use libsql_replication::frame::Frame; +use std::path::PathBuf; +use std::sync::Arc; + +use libsql_replication::frame::{Frame, FrameNo}; +use libsql_replication::replicator::{Either, Replicator}; use libsql_replication::rpc::proxy::{ query::Params, DescribeRequest, DescribeResult, ExecuteResults, Positional, Program, ProgramReq, Query, Step, }; use libsql_replication::snapshot::SnapshotFile; +use tokio::sync::Mutex; use crate::parser::Statement; +use crate::Result; + +use libsql_replication::replicator::ReplicatorClient; pub use connection::RemoteConnection; +use self::local_client::LocalClient; +use self::remote_client::RemoteClient; + pub(crate) mod client; mod connection; pub(crate) mod local_client; @@ -22,9 +33,10 @@ pub enum Frames { Snapshot(SnapshotFile), } -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Writer { pub(crate) client: client::Client, + pub(crate) replicator: Option, } impl Writer { @@ -70,4 +82,119 @@ impl Writer { }) .await } + + pub(crate) fn replicator(&self) -> Option<&EmbeddedReplicator> { + self.replicator.as_ref() + } +} + +#[derive(Clone)] +pub(crate) struct EmbeddedReplicator { + replicator: Arc>>>, +} + +impl EmbeddedReplicator { + pub async fn with_remote(client: RemoteClient, db_path: PathBuf, auto_checkpoint: u32) -> Self { + let replicator = Arc::new(Mutex::new( + Replicator::new(Either::Left(client), db_path, auto_checkpoint) + .await + .unwrap(), + )); + + Self { replicator } + } + + pub async fn with_local(client: LocalClient, db_path: PathBuf, auto_checkpoint: u32) -> Self { + let replicator = Arc::new(Mutex::new( + Replicator::new(Either::Right(client), db_path, auto_checkpoint) + .await + .unwrap(), + )); + + Self { replicator } + } + + pub async fn sync_oneshot(&self) -> Result> { + use libsql_replication::replicator::ReplicatorClient; + + let mut replicator = self.replicator.lock().await; + if !matches!(replicator.client_mut(), Either::Left(_)) { + return Err(crate::errors::Error::Misuse( + "Trying to replicate from HTTP, but this is a local replicator".into(), + )); + } + + // we force a handshake to get the most up to date replication index from the primary. + replicator.force_handshake(); + + loop { + match replicator.replicate().await { + Err(libsql_replication::replicator::Error::Meta( + libsql_replication::meta::Error::LogIncompatible, + )) => { + // The meta must have been marked as dirty, replicate again from scratch + // this time. + tracing::debug!("re-replicating database after LogIncompatible error"); + replicator + .replicate() + .await + .map_err(|e| crate::Error::Replication(e.into()))?; + } + Err(e) => return Err(crate::Error::Replication(e.into())), + Ok(_) => { + let Either::Left(client) = replicator.client_mut() else { + unreachable!() + }; + let Some(primary_index) = client.last_handshake_replication_index() else { + return Ok(None); + }; + if let Some(replica_index) = replicator.client_mut().committed_frame_no() { + if replica_index >= primary_index { + break; + } + } + } + } + } + + Ok(replicator.client_mut().committed_frame_no()) + } + + pub async fn sync_frames(&self, frames: Frames) -> Result> { + let mut replicator = self.replicator.lock().await; + + match replicator.client_mut() { + Either::Right(c) => { + c.load_frames(frames); + } + Either::Left(_) => { + return Err(crate::errors::Error::Misuse( + "Trying to call sync_frames with an HTTP replicator".into(), + )) + } + } + replicator + .replicate() + .await + .map_err(|e| crate::Error::Replication(e.into()))?; + + Ok(replicator.client_mut().committed_frame_no()) + } + + pub async fn flush(&self) -> Result> { + let mut replicator = self.replicator.lock().await; + replicator + .flush() + .await + .map_err(|e| crate::Error::Replication(e.into()))?; + Ok(replicator.client_mut().committed_frame_no()) + } + + pub async fn committed_frame_no(&self) -> Option { + self.replicator + .lock() + .await + .client_mut() + .committed_frame_no() + } }