Skip to content

Commit

Permalink
libsql: add read your writes support
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco committed Dec 14, 2023
1 parent 47ec93e commit 53db85d
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 125 deletions.
49 changes: 49 additions & 0 deletions libsql-server/tests/embedded_replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fn embedded_replica() {
"http://foo.primary:8080",
"",
TurmoilConnector,
false,
)
.await?;

Expand Down Expand Up @@ -150,6 +151,7 @@ fn execute_batch() {
"http://foo.primary:8080",
"",
TurmoilConnector,
false,
)
.await?;

Expand Down Expand Up @@ -256,6 +258,7 @@ fn replica_primary_reset() {
"http://primary:8080",
"",
TurmoilConnector,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -311,6 +314,7 @@ fn replica_primary_reset() {
"http://primary:8080",
"",
TurmoilConnector,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -412,6 +416,7 @@ fn replica_no_resync_on_restart() {
"http://primary:8080",
"",
TurmoilConnector,
false,
)
.await
.unwrap();
Expand All @@ -426,6 +431,7 @@ fn replica_no_resync_on_restart() {
"http://primary:8080",
"",
TurmoilConnector,
false,
)
.await
.unwrap();
Expand Down Expand Up @@ -498,6 +504,7 @@ fn replicate_with_snapshots() {
"http://primary:8080",
"",
TurmoilConnector,
false,
)
.await
.unwrap();
Expand All @@ -523,3 +530,45 @@ fn replicate_with_snapshots() {

sim.run().unwrap();
}

#[test]
fn read_your_writes() {
let mut sim = Builder::new().build();

let tmp_embedded = tempdir().unwrap();
let tmp_host = tempdir().unwrap();
let tmp_embedded_path = tmp_embedded.path().to_owned();
let tmp_host_path = tmp_host.path().to_owned();

make_primary(&mut sim, tmp_host_path.clone());

sim.client("client", async move {
let client = Client::new();
client
.post("http://primary:9090/v1/namespaces/foo/create", json!({}))
.await?;

let path = tmp_embedded_path.join("embedded");
let db = Database::open_with_remote_sync_connector(
path.to_str().unwrap(),
"http://foo.primary:8080",
"",
TurmoilConnector,
true,
)
.await?;

let conn = db.connect()?;

conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
.await?;

conn.execute("INSERT INTO user(id) VALUES (1)", ())
.await
.unwrap();

Ok(())
});

sim.run().unwrap();
}
47 changes: 41 additions & 6 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,46 @@ cfg_replication! {
http.enforce_http(false);
http.set_nodelay(true);

Self::open_with_remote_sync_connector(db_path, url, token, http).await
Self::open_with_remote_sync_connector(db_path, url, token, http, false).await
}

/// Open a local database file with the ability to sync from a remote database
/// in consistent mode.
///
/// Consistent mode means that when a write happens it will not complete until
/// that write is visible in the local db.
pub async fn open_with_remote_sync_consistent(
db_path: impl Into<String>,
url: impl Into<String>,
token: impl Into<String>,
) -> Result<Database> {
let mut http = hyper::client::HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(true);

Self::open_with_remote_sync_connector(db_path, url, token, http, true).await
}

#[doc(hidden)]
pub async fn open_with_remote_sync_internal(
db_path: impl Into<String>,
url: impl Into<String>,
token: impl Into<String>,
version: Option<String>
version: Option<String>,
read_your_writes: bool,
) -> Result<Database> {
let mut http = hyper::client::HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(true);

Self::open_with_remote_sync_connector_internal(db_path, url, token, http, version).await
Self::open_with_remote_sync_connector_internal(
db_path,
url,
token,
http,
version,
read_your_writes
).await
}

#[doc(hidden)]
Expand All @@ -132,14 +157,22 @@ cfg_replication! {
url: impl Into<String>,
token: impl Into<String>,
connector: C,
read_your_writes: bool,
) -> Result<Database>
where
C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
C::Response: crate::util::Socket,
C::Future: Send + 'static,
C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
Self::open_with_remote_sync_connector_internal(db_path, url, token, connector, None).await
Self::open_with_remote_sync_connector_internal(
db_path,
url,
token,
connector,
None,
read_your_writes
).await
}

#[doc(hidden)]
Expand All @@ -148,7 +181,8 @@ cfg_replication! {
url: impl Into<String>,
token: impl Into<String>,
connector: C,
version: Option<String>
version: Option<String>,
read_your_writes: bool,
) -> Result<Database>
where
C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
Expand All @@ -169,7 +203,8 @@ cfg_replication! {
db_path.into(),
url.into(),
token.into(),
version
version,
read_your_writes
).await?;

Ok(Database {
Expand Down
10 changes: 8 additions & 2 deletions libsql/src/local/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use super::{Database, Error, Result, Rows, RowsFuture, Statement, Transaction};
use crate::TransactionBehavior;

use libsql_sys::ffi;
use std::{ffi::c_int, sync::Arc};
use std::{ffi::c_int, fmt, sync::Arc};

/// A connection to a libSQL database.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct Connection {
pub(crate) raw: *mut ffi::sqlite3,

Expand Down Expand Up @@ -227,3 +227,9 @@ impl Connection {
self.writer.as_ref()
}
}

impl fmt::Debug for Connection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection").finish()
}
}
Loading

0 comments on commit 53db85d

Please sign in to comment.