Skip to content

Commit

Permalink
Merge pull request #212 from lxfind/force-db-update-mutable
Browse files Browse the repository at this point in the history
Ensure all mutable objects are mutated
  • Loading branch information
lxfind authored Jan 20, 2022
2 parents 0d83a72 + 05022c7 commit 909e1fc
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 78 deletions.
3 changes: 2 additions & 1 deletion fastpay_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use move_core_types::{
};
use move_vm_runtime::native_functions::NativeFunctionTable;
use std::{
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
};

Expand Down Expand Up @@ -285,6 +285,7 @@ impl AuthorityState {
Ok(()) => ExecutionStatus::Success,
Err(err) => ExecutionStatus::Failure(Box::new(err)),
};
temporary_store.ensure_active_inputs_mutated();
(temporary_store, status)
}

Expand Down
39 changes: 18 additions & 21 deletions fastpay_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,26 +320,25 @@ impl AuthorityStore {

// Delete objects
write_batch = write_batch
.delete_batch(
&self.objects,
deleted.iter().map(|deleted_ref| deleted_ref.0),
)
.delete_batch(&self.objects, deleted.clone().into_iter())
.map_err(|_| FastPayError::StorageError)?;

// Make an iterator over all objects that are either deleted or have changed owner,
// along with their old owner. This is used to update the owner index.
let old_object_owners =
deleted
.iter()
.map(|(id, _, _)| (objects[id].owner, *id))
.chain(written.iter().filter_map(
|((id, _, _), new_object)| match objects.get(id) {
Some(old_object) if old_object.owner != new_object.owner => {
Some((old_object.owner, *id))
}
_ => None,
},
));
.map(|id| (objects[id].owner, *id))
.chain(
written
.iter()
.filter_map(|(id, new_object)| match objects.get(id) {
Some(old_object) if old_object.owner != new_object.owner => {
Some((old_object.owner, *id))
}
_ => None,
}),
);

// Delete the old owner index entries
write_batch = write_batch
Expand All @@ -352,17 +351,17 @@ impl AuthorityStore {
&self.parent_sync,
written
.iter()
.map(|(output_ref, _)| (*output_ref, transaction_digest)),
.map(|(_, object)| (object.to_object_reference(), transaction_digest)),
)
.map_err(|_| FastPayError::StorageError)?;

// Create locks for new objects, if they are not immutable
write_batch = write_batch
.insert_batch(
&self.order_lock,
written.iter().filter_map(|(output_ref, new_object)| {
written.iter().filter_map(|(_, new_object)| {
if !new_object.is_read_only() {
Some((*output_ref, None))
Some((new_object.to_object_reference(), None))
} else {
None
}
Expand All @@ -374,8 +373,8 @@ impl AuthorityStore {
write_batch = write_batch
.insert_batch(
&self.owner_index,
written.iter().map(|(output_ref, new_object)| {
((new_object.owner, output_ref.0), *output_ref)
written.iter().map(|(id, new_object)| {
((new_object.owner, *id), new_object.to_object_reference())
}),
)
.map_err(|_| FastPayError::StorageError)?;
Expand All @@ -384,9 +383,7 @@ impl AuthorityStore {
write_batch = write_batch
.insert_batch(
&self.objects,
written
.into_iter()
.map(|(output_ref, new_object)| (output_ref.0, new_object)),
written.into_iter().map(|(id, new_object)| (id, new_object)),
)
.map_err(|_| FastPayError::StorageError)?;

Expand Down
61 changes: 39 additions & 22 deletions fastpay_core/src/authority/temporary_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ use super::*;
pub type InnerTemporaryStore = (
BTreeMap<ObjectID, Object>,
Vec<ObjectRef>,
BTreeMap<ObjectRef, Object>,
Vec<ObjectRef>,
BTreeMap<ObjectID, Object>,
BTreeSet<ObjectID>,
);

pub struct AuthorityTemporaryStore {
object_store: Arc<AuthorityStore>,
objects: BTreeMap<ObjectID, Object>,
active_inputs: Vec<ObjectRef>, // Inputs that are not read only
written: BTreeMap<ObjectRef, Object>, // Objects written
deleted: Vec<ObjectRef>, // Objects actively deleted
// TODO: We need to study whether it's worth to optimize the lookup of
// object reference by caching object reference in the map as well.
// Object reference calculation involves hashing which could be expensive.
written: BTreeMap<ObjectID, Object>, // Objects written
deleted: BTreeSet<ObjectID>, // Objects actively deleted
}

impl AuthorityTemporaryStore {
Expand All @@ -31,7 +34,7 @@ impl AuthorityTemporaryStore {
.map(|v| v.to_object_reference())
.collect(),
written: BTreeMap::new(),
deleted: Vec::new(),
deleted: BTreeSet::new(),
}
}

Expand All @@ -41,11 +44,11 @@ impl AuthorityTemporaryStore {
&self.objects
}

pub fn written(&self) -> &BTreeMap<ObjectRef, Object> {
pub fn written(&self) -> &BTreeMap<ObjectID, Object> {
&self.written
}

pub fn deleted(&self) -> &Vec<ObjectRef> {
pub fn deleted(&self) -> &BTreeSet<ObjectID> {
&self.deleted
}

Expand All @@ -58,6 +61,20 @@ impl AuthorityTemporaryStore {
(self.objects, self.active_inputs, self.written, self.deleted)
}

/// For every object from active_inputs (i.e. all mutable objects), if they are not
/// mutated during the order execution, force mutating them by incrementing the
/// sequence number.
pub fn ensure_active_inputs_mutated(&mut self) {
for (id, _seq, _) in self.active_inputs.iter() {
if !self.written.contains_key(id) && !self.deleted.contains(id) {
let mut object = self.objects[id].clone();
// Active input object must be Move object.
object.data.try_as_move_mut().unwrap().increment_version();
self.written.insert(*id, object);
}
}
}

pub fn to_signed_effects(
&self,
authority_name: &AuthorityName,
Expand All @@ -71,9 +88,13 @@ impl AuthorityTemporaryStore {
mutated: self
.written
.iter()
.map(|(object_ref, object)| (*object_ref, object.owner))
.map(|(_, object)| (object.to_object_reference(), object.owner))
.collect(),
deleted: self
.deleted
.iter()
.map(|id| self.objects[id].to_object_reference())
.collect(),
deleted: self.deleted.clone(),
};
let signature = Signature::new(&effects, secret);

Expand All @@ -93,7 +114,7 @@ impl AuthorityTemporaryStore {
debug_assert!(
{
let mut used = HashSet::new();
self.deleted.iter().all(move |elt| used.insert(elt.0))
self.deleted.iter().all(move |elt| used.insert(elt))
},
"Duplicate object reference in self.deleted."
);
Expand All @@ -102,8 +123,8 @@ impl AuthorityTemporaryStore {
debug_assert!(
{
let mut used = HashSet::new();
self.written.iter().all(|(elt, _)| used.insert(elt.0));
self.deleted.iter().all(move |elt| used.insert(elt.0))
self.written.iter().all(|(elt, _)| used.insert(elt));
self.deleted.iter().all(move |elt| used.insert(elt))
},
"Object both written and deleted."
);
Expand All @@ -112,10 +133,10 @@ impl AuthorityTemporaryStore {
debug_assert!(
{
let mut used = HashSet::new();
self.written.iter().all(|(elt, _)| used.insert(elt.0));
self.deleted.iter().all(|elt| used.insert(elt.0));
self.written.iter().all(|(elt, _)| used.insert(elt));
self.deleted.iter().all(|elt| used.insert(elt));

self.active_inputs.iter().all(|elt| !used.insert(elt.0))
self.active_inputs.iter().all(|elt| !used.insert(&elt.0))
},
"Mutable input neither written nor deleted."
);
Expand Down Expand Up @@ -161,7 +182,7 @@ impl Storage for AuthorityTemporaryStore {
}
}

self.written.insert(object.to_object_reference(), object);
self.written.insert(object.id(), object);
}

fn delete_object(&mut self, id: &ObjectID) {
Expand All @@ -176,12 +197,8 @@ impl Storage for AuthorityTemporaryStore {
}

// Mark it for deletion
self.deleted.push(
self.objects
.get(id)
.expect("Internal invariant: object must exist to be deleted.")
.to_object_reference(),
);
debug_assert!(self.objects.get(id).is_some());
self.deleted.insert(*id);
}
}

Expand Down
Loading

1 comment on commit 909e1fc

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bench results

�[0m�[0m�[1m�[32m Finished�[0m release [optimized + debuginfo] target(s) in 0.93s
�[0m�[0m�[1m�[32m Running�[0m target/release/bench
[2022-01-20T05:34:30Z INFO bench] Starting benchmark: OrdersAndCerts
[2022-01-20T05:34:30Z INFO bench] Preparing accounts.
[2022-01-20T05:34:30Z INFO bench] Open database on path: "/tmp/DB_02BB88AF003AFE826EE66817B357D7F1"
[2022-01-20T05:34:34Z INFO bench] Preparing transactions.
[2022-01-20T05:34:43Z INFO fastpay::network] Listening to Tcp traffic on 127.0.0.1:9555
[2022-01-20T05:34:44Z INFO bench] Number of TCP connections: 2
[2022-01-20T05:34:44Z INFO bench] Set max_in_flight to 500
[2022-01-20T05:34:44Z INFO bench] Sending requests.
[2022-01-20T05:34:44Z INFO fastpay::network] Sending Tcp requests to 127.0.0.1:9555
[2022-01-20T05:34:45Z INFO fastpay::network] 127.0.0.1:9555 has processed 5000 packets
[2022-01-20T05:34:46Z INFO fastpay::network] In flight 500 Remaining 35000
[2022-01-20T05:34:46Z INFO fastpay::network] 127.0.0.1:9555 has processed 10000 packets
[2022-01-20T05:34:46Z INFO fastpay::network] 127.0.0.1:9555 has processed 15000 packets
[2022-01-20T05:34:47Z INFO fastpay::network] In flight 500 Remaining 30000
[2022-01-20T05:34:47Z INFO fastpay::network] 127.0.0.1:9555 has processed 20000 packets
[2022-01-20T05:34:48Z INFO fastpay::network] 127.0.0.1:9555 has processed 25000 packets
[2022-01-20T05:34:49Z INFO fastpay::network] In flight 500 Remaining 25000
[2022-01-20T05:34:49Z INFO fastpay::network] 127.0.0.1:9555 has processed 30000 packets
[2022-01-20T05:34:50Z INFO fastpay::network] 127.0.0.1:9555 has processed 35000 packets
[2022-01-20T05:34:50Z INFO fastpay::network] In flight 500 Remaining 20000
[2022-01-20T05:34:50Z INFO fastpay::network] 127.0.0.1:9555 has processed 40000 packets
[2022-01-20T05:34:51Z INFO fastpay::network] 127.0.0.1:9555 has processed 45000 packets
[2022-01-20T05:34:52Z INFO fastpay::network] In flight 500 Remaining 15000
[2022-01-20T05:34:52Z INFO fastpay::network] 127.0.0.1:9555 has processed 50000 packets
[2022-01-20T05:34:53Z INFO fastpay::network] 127.0.0.1:9555 has processed 55000 packets
[2022-01-20T05:34:53Z INFO fastpay::network] In flight 500 Remaining 10000
[2022-01-20T05:34:54Z INFO fastpay::network] 127.0.0.1:9555 has processed 60000 packets
[2022-01-20T05:34:54Z INFO fastpay::network] 127.0.0.1:9555 has processed 65000 packets
[2022-01-20T05:34:55Z INFO fastpay::network] In flight 500 Remaining 5000
[2022-01-20T05:34:55Z INFO fastpay::network] 127.0.0.1:9555 has processed 70000 packets
[2022-01-20T05:34:56Z INFO fastpay::network] 127.0.0.1:9555 has processed 75000 packets
[2022-01-20T05:34:57Z INFO fastpay::network] Done sending Tcp requests to 127.0.0.1:9555
[2022-01-20T05:34:57Z INFO fastpay::network] 127.0.0.1:9555 has processed 80000 packets
[2022-01-20T05:34:57Z INFO bench] Received 80000 responses.
[2022-01-20T05:34:57Z WARN bench] Completed benchmark for OrdersAndCerts
Total time: 12897804us, items: 40000, tx/sec: 3101.303136564953

Please sign in to comment.