Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[typed store] iterators: deprecate skip_to/skip_prior_to/skip_to_last/reverse methods #21289

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions consensus/core/src/storage/rocksdb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,10 @@ impl Store for RocksDBStore {
let mut refs = VecDeque::new();
for kv in self
.digests_by_authorities
.safe_range_iter((
Included((author, Round::MIN, BlockDigest::MIN)),
Included((author, before_round, BlockDigest::MAX)),
))
.skip_to_last()
.reverse()
.reversed_safe_iter_with_bounds(
Some((author, Round::MIN, BlockDigest::MIN)),
Some((author, before_round, BlockDigest::MAX)),
)?
.take(num_of_rounds as usize)
{
let ((author, round, digest), _) = kv?;
Expand All @@ -247,7 +245,11 @@ impl Store for RocksDBStore {
}

fn read_last_commit(&self) -> ConsensusResult<Option<TrustedCommit>> {
let Some(result) = self.commits.safe_iter().skip_to_last().next() else {
let Some(result) = self
.commits
.reversed_safe_iter_with_bounds(None, None)?
.next()
else {
return Ok(None);
};
let ((_index, digest), serialized) = result?;
Expand Down Expand Up @@ -289,7 +291,11 @@ impl Store for RocksDBStore {
}

fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
let Some(result) = self.commit_info.safe_iter().skip_to_last().next() else {
let Some(result) = self
.commit_info
.reversed_safe_iter_with_bounds(None, None)?
.next()
else {
return Ok(None);
};
let (key, commit_info) = result.map_err(ConsensusError::RocksDBFailure)?;
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2788,7 +2788,7 @@ impl AuthorityState {
Some(seq) => self
.checkpoint_store
.get_checkpoint_by_sequence_number(seq)?,
None => self.checkpoint_store.get_latest_certified_checkpoint(),
None => self.checkpoint_store.get_latest_certified_checkpoint()?,
}
.map(|v| v.into_inner());
let contents = match &summary {
Expand All @@ -2813,7 +2813,7 @@ impl AuthorityState {
Some(seq) => self
.checkpoint_store
.get_checkpoint_by_sequence_number(seq)?,
None => self.checkpoint_store.get_latest_certified_checkpoint(),
None => self.checkpoint_store.get_latest_certified_checkpoint()?,
}
.map(|v| v.into_inner());
summary.map(CheckpointSummaryResponse::Certified)
Expand All @@ -2822,7 +2822,7 @@ impl AuthorityState {
Some(seq) => self.checkpoint_store.get_locally_computed_checkpoint(seq)?,
None => self
.checkpoint_store
.get_latest_locally_computed_checkpoint(),
.get_latest_locally_computed_checkpoint()?,
};
summary.map(CheckpointSummaryResponse::Pending)
};
Expand Down Expand Up @@ -3167,7 +3167,7 @@ impl AuthorityState {

let highest_locally_built_checkpoint_seq = self
.checkpoint_store
.get_latest_locally_computed_checkpoint()
.get_latest_locally_computed_checkpoint()?
.map(|c| *c.sequence_number())
.unwrap_or(0);

Expand Down
39 changes: 21 additions & 18 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,14 +703,18 @@ impl AuthorityEpochTables {
checkpoint_seq: CheckpointSequenceNumber,
starting_index: u64,
) -> SuiResult<
impl Iterator<Item = ((CheckpointSequenceNumber, u64), CheckpointSignatureMessage)> + '_,
impl Iterator<
Item = Result<
((CheckpointSequenceNumber, u64), CheckpointSignatureMessage),
TypedStoreError,
>,
> + '_,
> {
let key = (checkpoint_seq, starting_index);
trace!("Scanning pending checkpoint signatures from {:?}", key);
let iter = self
.pending_checkpoint_signatures
.unbounded_iter()
.skip_to(&key)?;
.safe_iter_with_bounds(Some(key), None);
Ok::<_, SuiError>(iter)
}

Expand Down Expand Up @@ -890,7 +894,7 @@ impl AuthorityPerEpochStore {

let mut jwk_aggregator = JwkAggregator::new(committee.clone());

for ((authority, id, jwk), _) in tables.pending_jwks.unbounded_iter().seek_to_first() {
for ((authority, id, jwk), _) in tables.pending_jwks.unbounded_iter() {
jwk_aggregator.insert(authority, (id, jwk));
}

Expand Down Expand Up @@ -1151,9 +1155,9 @@ impl AuthorityPerEpochStore {
Ok(self
.tables()?
.running_root_accumulators
.unbounded_iter()
.skip_to_last()
.next())
.reversed_safe_iter_with_bounds(None, None)?
.next()
.transpose()?)
}

pub fn insert_running_root_accumulator(
Expand Down Expand Up @@ -4045,11 +4049,10 @@ impl AuthorityPerEpochStore {
// Reading from the db table is only need when upgrading to data quarantining
// for the first time.
let tables = self.tables()?;
let mut db_iter = tables.pending_checkpoints_v2.unbounded_iter();
if let Some(last_processed_height) = last {
db_iter = db_iter.skip_to(&(last_processed_height + 1))?;
}
db_iter.collect()
let db_iter = tables
.pending_checkpoints_v2
.safe_iter_with_bounds(last.map(|height| height + 1), None);
db_iter.collect::<Result<Vec<_>, _>>()?
} else {
vec![]
};
Expand Down Expand Up @@ -4137,9 +4140,9 @@ impl AuthorityPerEpochStore {
Ok(self
.tables()?
.builder_checkpoint_summary_v2
.unbounded_iter()
.skip_to_last()
.reversed_safe_iter_with_bounds(None, None)?
.next()
.transpose()?
.map(|(_, s)| s))
}

Expand All @@ -4159,9 +4162,9 @@ impl AuthorityPerEpochStore {
let seq = self
.tables()?
.builder_checkpoint_summary_v2
.unbounded_iter()
.skip_to_last()
.reversed_safe_iter_with_bounds(None, None)?
.next()
.transpose()?
.map(|(seq, s)| (seq, s.summary));
debug!(
"returning last_built_summary from builder_checkpoint_summary_v2: {:?}",
Expand Down Expand Up @@ -4217,9 +4220,9 @@ impl AuthorityPerEpochStore {
Ok(self
.tables()?
.pending_checkpoint_signatures
.unbounded_iter()
.skip_to_last()
.reversed_safe_iter_with_bounds(None, None)?
.next()
.transpose()?
.map(|((_, index), _)| index)
.unwrap_or_default())
}
Expand Down
30 changes: 15 additions & 15 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,7 @@ impl AuthorityStore {
let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table_v2
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
Expand All @@ -442,8 +441,7 @@ impl AuthorityStore {
let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.reversed_safe_iter_with_bounds(Some(min_key), Some(max_key))?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
Expand Down Expand Up @@ -564,10 +562,12 @@ impl AuthorityStore {
let mut iterator = self
.perpetual_tables
.objects
.unbounded_iter()
.skip_prior_to(&ObjectKey(*object_id, prior_version))?;
.reversed_safe_iter_with_bounds(
Some(ObjectKey::min_for_id(object_id)),
Some(ObjectKey(*object_id, prior_version)),
)?;

if let Some((object_key, value)) = iterator.next() {
if let Some((object_key, value)) = iterator.next().transpose()? {
if object_key.0 == *object_id {
return Ok(Some(
self.perpetual_tables.object_reference(&object_key, value)?,
Expand Down Expand Up @@ -620,10 +620,9 @@ impl AuthorityStore {
let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.unbounded_iter()
.skip_prior_to(&marker_key)?
.reversed_safe_iter_with_bounds(None, Some(marker_key))?
.next();
match marker_entry {
match marker_entry.transpose()? {
Some(((epoch, key), marker)) => {
// Make sure object id matches and version is >= `version`
let object_data_ok = key.0 == *object_id && key.1 >= version;
Expand Down Expand Up @@ -1048,11 +1047,13 @@ impl AuthorityStore {
let mut iterator = self
.perpetual_tables
.live_owned_object_markers
.unbounded_iter()
// Make the max possible entry for this object ID.
.skip_prior_to(&(object_id, SequenceNumber::MAX, ObjectDigest::MAX))?;
.reversed_safe_iter_with_bounds(
None,
Some((object_id, SequenceNumber::MAX, ObjectDigest::MAX)),
)?;
Ok(iterator
.next()
.transpose()?
.and_then(|value| {
if value.0 .0 == object_id {
Some(value)
Expand Down Expand Up @@ -1819,8 +1820,7 @@ impl AccumulatorStore for AuthorityStore {
Ok(self
.perpetual_tables
.root_state_hash_by_epoch
.safe_iter()
.skip_to_last()
.reversed_safe_iter_with_bounds(None, None)?
.next()
.transpose()?)
}
Expand Down
42 changes: 18 additions & 24 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,11 @@ impl AuthorityPerpetualTables {
object_id: ObjectID,
version: SequenceNumber,
) -> SuiResult<Option<Object>> {
let iter = self
.objects
.safe_range_iter(ObjectKey::min_for_id(&object_id)..=ObjectKey::max_for_id(&object_id))
.skip_prior_to(&ObjectKey(object_id, version))?;
match iter.reverse().next() {
let mut iter = self.objects.reversed_safe_iter_with_bounds(
Some(ObjectKey::min_for_id(&object_id)),
Some(ObjectKey(object_id, version)),
)?;
match iter.next() {
Some(Ok((key, o))) => self.object(&key, o),
Some(Err(e)) => Err(e.into()),
None => Ok(None),
Expand Down Expand Up @@ -298,12 +298,12 @@ impl AuthorityPerpetualTables {
&self,
object_id: ObjectID,
) -> Result<Option<ObjectRef>, SuiError> {
let mut iterator = self
.objects
.unbounded_iter()
.skip_prior_to(&ObjectKey::max_for_id(&object_id))?;
let mut iterator = self.objects.reversed_safe_iter_with_bounds(
Some(ObjectKey::min_for_id(&object_id)),
Some(ObjectKey::max_for_id(&object_id)),
)?;

if let Some((object_key, value)) = iterator.next() {
if let Some(Ok((object_key, value))) = iterator.next() {
if object_key.0 == object_id {
return Ok(Some(self.object_reference(&object_key, value)?));
}
Expand All @@ -315,12 +315,12 @@ impl AuthorityPerpetualTables {
&self,
object_id: ObjectID,
) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, SuiError> {
let mut iterator = self
.objects
.unbounded_iter()
.skip_prior_to(&ObjectKey::max_for_id(&object_id))?;
let mut iterator = self.objects.reversed_safe_iter_with_bounds(
Some(ObjectKey::min_for_id(&object_id)),
Some(ObjectKey::max_for_id(&object_id)),
)?;

if let Some((object_key, value)) = iterator.next() {
if let Some(Ok((object_key, value))) = iterator.next() {
if object_key.0 == object_id {
return Ok(Some((object_key, value)));
}
Expand Down Expand Up @@ -415,12 +415,7 @@ impl AuthorityPerpetualTables {
}

pub fn database_is_empty(&self) -> SuiResult<bool> {
Ok(self
.objects
.unbounded_iter()
.skip_to(&ObjectKey::ZERO)?
.next()
.is_none())
Ok(self.objects.unbounded_iter().next().is_none())
}

pub fn iter_live_object_set(&self, include_wrapped_object: bool) -> LiveSetIter<'_> {
Expand Down Expand Up @@ -506,11 +501,10 @@ impl AuthorityPerpetualTables {
pub fn get_object_fallible(&self, object_id: &ObjectID) -> SuiResult<Option<Object>> {
let obj_entry = self
.objects
.unbounded_iter()
.skip_prior_to(&ObjectKey::max_for_id(object_id))?
.reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))?
.next();

match obj_entry {
match obj_entry.transpose()? {
Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => {
Ok(self.object(&ObjectKey(obj_id, version), obj)?)
}
Expand Down
Loading
Loading