Skip to content

Commit

Permalink
kvdb-rocksdb: optimize and rename iter_from_prefix (#365)
Browse files Browse the repository at this point in the history
* kvdb: small cleanup

* kvdb-rocksdb: rename and optimize iter_with_prefix

* fix links in changelogs

* apply suggestions from code review

* 🤦

* fmt
  • Loading branch information
ordian authored Apr 15, 2020
1 parent b99e466 commit a52fbeb
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 43 deletions.
6 changes: 3 additions & 3 deletions kvdb-memorydb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl KeyValueDB for InMemory {
}
}

fn iter_from_prefix<'a>(
fn iter_with_prefix<'a>(
&'a self,
col: u32,
prefix: &'a [u8],
Expand Down Expand Up @@ -155,9 +155,9 @@ mod tests {
}

#[test]
fn iter_from_prefix() -> io::Result<()> {
fn iter_with_prefix() -> io::Result<()> {
let db = create(1);
st::test_iter_from_prefix(&db)
st::test_iter_with_prefix(&db)
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions kvdb-rocksdb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog].
## [Unreleased]
### Breaking
- Updated to the new `kvdb` interface. [#313](https://github.com/paritytech/parity-common/pull/313)
- Rename and optimize prefix iteration. [#365](https://github.com/paritytech/parity-common/pull/365)

## [0.7.0] - 2020-03-16
- Updated dependencies. [#361](https://github.com/paritytech/parity-common/pull/361)
Expand Down
28 changes: 19 additions & 9 deletions kvdb-rocksdb/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
//!
//! Note: this crate does not use "Prefix Seek" mode which means that the prefix iterator
//! will return keys not starting with the given prefix as well (as long as `key >= prefix`).
//! To work around this we filter the data returned by rocksdb to ensure that
//! all data yielded by the iterator does start with the given prefix.
//! To work around this we set an upper bound to the prefix successor.
//! See https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes for details.
use crate::DBAndColumns;
Expand All @@ -28,6 +27,13 @@ pub type KeyValuePair = (Box<[u8]>, Box<[u8]>);
/// Iterator with built-in synchronization.
pub struct ReadGuardedIterator<'a, I, T> {
inner: OwningHandle<UnsafeStableAddress<'a, Option<T>>, DerefWrapper<Option<I>>>,
// We store the upper bound here
// to make sure it lives at least as long as the iterator.
// See https://github.com/rust-rocksdb/rust-rocksdb/pull/309.
// TODO: remove this once https://github.com/rust-rocksdb/rust-rocksdb/pull/377
// is merged and released.
#[allow(dead_code)]
upper_bound_prefix: Option<Box<[u8]>>,
}

// We can't implement `StableAddress` for a `RwLockReadGuard`
Expand Down Expand Up @@ -81,8 +87,8 @@ pub trait IterationHandler {
/// Create an `Iterator` over a `ColumnFamily` corresponding to the passed index. Takes a
/// reference to a `ReadOptions` to allow configuration of the new iterator (see
/// https://github.com/facebook/rocksdb/blob/master/include/rocksdb/options.h#L1169).
/// The iterator starts from the first key having the provided `prefix`.
fn iter_from_prefix(&self, col: u32, prefix: &[u8], read_opts: &ReadOptions) -> Self::Iterator;
/// The `Iterator` iterates over keys which start with the provided `prefix`.
fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: &ReadOptions) -> Self::Iterator;
}

impl<'a, T> ReadGuardedIterator<'a, <&'a T as IterationHandler>::Iterator, T>
Expand All @@ -92,18 +98,22 @@ where
/// Creates a new `ReadGuardedIterator` that maps `RwLock<RocksDB>` to `RwLock<DBIterator>`,
/// where `DBIterator` iterates over all keys.
pub fn new(read_lock: RwLockReadGuard<'a, Option<T>>, col: u32, read_opts: &ReadOptions) -> Self {
Self { inner: Self::new_inner(read_lock, |db| db.iter(col, read_opts)) }
Self { inner: Self::new_inner(read_lock, |db| db.iter(col, read_opts)), upper_bound_prefix: None }
}

/// Creates a new `ReadGuardedIterator` that maps `RwLock<RocksDB>` to `RwLock<DBIterator>`,
/// where `DBIterator` iterates over keys >= prefix.
pub fn new_from_prefix(
/// where `DBIterator` iterates over keys which start with the given prefix.
pub fn new_with_prefix(
read_lock: RwLockReadGuard<'a, Option<T>>,
col: u32,
prefix: &[u8],
upper_bound: Box<[u8]>,
read_opts: &ReadOptions,
) -> Self {
Self { inner: Self::new_inner(read_lock, |db| db.iter_from_prefix(col, prefix, read_opts)) }
Self {
inner: Self::new_inner(read_lock, |db| db.iter_with_prefix(col, prefix, read_opts)),
upper_bound_prefix: Some(upper_bound),
}
}

fn new_inner(
Expand All @@ -126,7 +136,7 @@ impl<'a> IterationHandler for &'a DBAndColumns {
.expect("iterator params are valid; qed")
}

fn iter_from_prefix(&self, col: u32, prefix: &[u8], read_opts: &ReadOptions) -> Self::Iterator {
fn iter_with_prefix(&self, col: u32, prefix: &[u8], read_opts: &ReadOptions) -> Self::Iterator {
self.db
.iterator_cf_opt(self.cf(col as usize), read_opts, IteratorMode::From(prefix, Direction::Forward))
.expect("iterator params are valid; qed")
Expand Down
36 changes: 22 additions & 14 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,13 @@ impl Database {
batch.delete_cf(cf, &key).map_err(other_io_err)?
}
DBOp::DeletePrefix { col: _, prefix } => {
if prefix.len() > 0 {
if !prefix.is_empty() {
let end_range = kvdb::end_prefix(&prefix[..]);
batch.delete_range_cf(cf, &prefix[..], &end_range[..]).map_err(other_io_err)?;
} else {
// Deletes all values in the column.
let end_range = &[u8::max_value()];
batch.delete_range_cf(cf, &prefix[..], &end_range[..]).map_err(other_io_err)?;
batch.delete_range_cf(cf, &[][..], &end_range[..]).map_err(other_io_err)?;
batch.delete_cf(cf, &end_range[..]).map_err(other_io_err)?;
}
}
Expand Down Expand Up @@ -492,7 +492,7 @@ impl Database {

/// Get value by partial key. Prefix size should match configured prefix size.
pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
self.iter_from_prefix(col, prefix).next().map(|(_, v)| v)
self.iter_with_prefix(col, prefix).next().map(|(_, v)| v)
}

/// Iterator over the data in the given database column index.
Expand All @@ -512,18 +512,26 @@ impl Database {
/// Iterator over data in the `col` database column index matching the given prefix.
/// Will hold a lock until the iterator is dropped
/// preventing the database from being closed.
fn iter_from_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a {
fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a {
let read_lock = self.db.read();
let optional = if read_lock.is_some() {
let guarded = iter::ReadGuardedIterator::new_from_prefix(read_lock, col, prefix, &self.read_opts);
let mut read_opts = ReadOptions::default();
read_opts.set_verify_checksums(false);
let end_prefix = kvdb::end_prefix(prefix).into_boxed_slice();
// rocksdb doesn't work with an empty upper bound
if !end_prefix.is_empty() {
// SAFETY: the end_prefix lives as long as the iterator
// See `ReadGuardedIterator` definition for more details.
unsafe {
read_opts.set_iterate_upper_bound(&end_prefix);
}
}
let guarded = iter::ReadGuardedIterator::new_with_prefix(read_lock, col, prefix, end_prefix, &read_opts);
Some(guarded)
} else {
None
};
// We're not using "Prefix Seek" mode, so the iterator will return
// keys not starting with the given prefix as well,
// see https://github.com/facebook/rocksdb/wiki/Prefix-Seek-API-Changes
optional.into_iter().flat_map(identity).take_while(move |(k, _)| k.starts_with(prefix))
optional.into_iter().flat_map(identity)
}

/// Close the database
Expand Down Expand Up @@ -648,8 +656,8 @@ impl KeyValueDB for Database {
Box::new(unboxed.into_iter())
}

fn iter_from_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
let unboxed = Database::iter_from_prefix(self, col, prefix);
fn iter_with_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
let unboxed = Database::iter_with_prefix(self, col, prefix);
Box::new(unboxed.into_iter())
}

Expand Down Expand Up @@ -729,9 +737,9 @@ mod tests {
}

#[test]
fn iter_from_prefix() -> io::Result<()> {
fn iter_with_prefix() -> io::Result<()> {
let db = create(1)?;
st::test_iter_from_prefix(&db)
st::test_iter_with_prefix(&db)
}

#[test]
Expand All @@ -742,7 +750,7 @@ mod tests {

#[test]
fn stats() -> io::Result<()> {
let db = create(st::IOSTATS_NUM_COLUMNS)?;
let db = create(st::IO_STATS_NUM_COLUMNS)?;
st::test_io_stats(&db)
}

Expand Down
18 changes: 9 additions & 9 deletions kvdb-shared-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ pub fn test_iter(db: &dyn KeyValueDB) -> io::Result<()> {
Ok(())
}

/// A test for `KeyValueDB::iter_from_prefix`.
pub fn test_iter_from_prefix(db: &dyn KeyValueDB) -> io::Result<()> {
/// A test for `KeyValueDB::iter_with_prefix`.
pub fn test_iter_with_prefix(db: &dyn KeyValueDB) -> io::Result<()> {
let key1 = b"0";
let key2 = b"ab";
let key3 = b"abc";
Expand All @@ -95,39 +95,39 @@ pub fn test_iter_from_prefix(db: &dyn KeyValueDB) -> io::Result<()> {
db.write(batch)?;

// empty prefix
let contents: Vec<_> = db.iter_from_prefix(0, b"").into_iter().collect();
let contents: Vec<_> = db.iter_with_prefix(0, b"").into_iter().collect();
assert_eq!(contents.len(), 4);
assert_eq!(&*contents[0].0, key1);
assert_eq!(&*contents[1].0, key2);
assert_eq!(&*contents[2].0, key3);
assert_eq!(&*contents[3].0, key4);

// prefix a
let contents: Vec<_> = db.iter_from_prefix(0, b"a").into_iter().collect();
let contents: Vec<_> = db.iter_with_prefix(0, b"a").into_iter().collect();
assert_eq!(contents.len(), 3);
assert_eq!(&*contents[0].0, key2);
assert_eq!(&*contents[1].0, key3);
assert_eq!(&*contents[2].0, key4);

// prefix abc
let contents: Vec<_> = db.iter_from_prefix(0, b"abc").into_iter().collect();
let contents: Vec<_> = db.iter_with_prefix(0, b"abc").into_iter().collect();
assert_eq!(contents.len(), 2);
assert_eq!(&*contents[0].0, key3);
assert_eq!(&*contents[1].0, key4);

// prefix abcde
let contents: Vec<_> = db.iter_from_prefix(0, b"abcde").into_iter().collect();
let contents: Vec<_> = db.iter_with_prefix(0, b"abcde").into_iter().collect();
assert_eq!(contents.len(), 0);

// prefix 0
let contents: Vec<_> = db.iter_from_prefix(0, b"0").into_iter().collect();
let contents: Vec<_> = db.iter_with_prefix(0, b"0").into_iter().collect();
assert_eq!(contents.len(), 1);
assert_eq!(&*contents[0].0, key1);
Ok(())
}

/// The number of columns required to run `test_io_stats`.
pub const IOSTATS_NUM_COLUMNS: u32 = 3;
pub const IO_STATS_NUM_COLUMNS: u32 = 3;

/// A test for `KeyValueDB::io_stats`.
/// Assumes that the `db` has at least 3 columns.
Expand Down Expand Up @@ -256,7 +256,7 @@ pub fn test_complex(db: &dyn KeyValueDB) -> io::Result<()> {
assert_eq!(contents[1].0.to_vec(), key2.to_vec());
assert_eq!(&*contents[1].1, b"dog");

let mut prefix_iter = db.iter_from_prefix(0, b"04c0");
let mut prefix_iter = db.iter_with_prefix(0, b"04c0");
assert_eq!(*prefix_iter.next().unwrap().1, b"caterpillar"[..]);
assert_eq!(*prefix_iter.next().unwrap().1, b"beef"[..]);
assert_eq!(*prefix_iter.next().unwrap().1, b"fish"[..]);
Expand Down
4 changes: 2 additions & 2 deletions kvdb-web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ impl KeyValueDB for Database {
}

// NOTE: clones the whole db
fn iter_from_prefix<'a>(
fn iter_with_prefix<'a>(
&'a self,
col: u32,
prefix: &'a [u8],
) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a> {
self.in_memory.iter_from_prefix(col, prefix)
self.in_memory.iter_with_prefix(col, prefix)
}

// NOTE: not supported
Expand Down
6 changes: 3 additions & 3 deletions kvdb-web/tests/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ async fn iter() {
}

#[wasm_bindgen_test]
async fn iter_from_prefix() {
let db = open_db(1, "iter_from_prefix").await;
st::test_iter_from_prefix(&db).unwrap()
async fn iter_with_prefix() {
let db = open_db(1, "iter_with_prefix").await;
st::test_iter_with_prefix(&db).unwrap()
}

#[wasm_bindgen_test]
Expand Down
2 changes: 2 additions & 0 deletions kvdb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ The format is based on [Keep a Changelog].
## [Unreleased]
### Breaking
- Removed `write_buffered` and `flush` methods. [#313](https://github.com/paritytech/parity-common/pull/313)
- Introduce a new `DeletePrefix` database operation. [#360](https://github.com/paritytech/parity-common/pull/360)
- Rename prefix iteration to `iter_with_prefix`. [#365](https://github.com/paritytech/parity-common/pull/365)

## [0.5.0] - 2020-03-16
- License changed from GPL3 to dual MIT/Apache2. [#342](https://github.com/paritytech/parity-common/pull/342)
Expand Down
8 changes: 5 additions & 3 deletions kvdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl DBTransaction {

/// Delete all values with the given key prefix.
/// Using an empty prefix here will remove all keys
/// (all keys starts with the empty prefix).
/// (all keys start with the empty prefix).
pub fn delete_prefix(&mut self, col: u32, prefix: &[u8]) {
self.ops.push(DBOp::DeletePrefix { col, prefix: DBKey::from_slice(prefix) });
}
Expand Down Expand Up @@ -119,8 +119,9 @@ pub trait KeyValueDB: Sync + Send + parity_util_mem::MallocSizeOf {
/// Iterate over the data for a given column.
fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = (Box<[u8]>, Box<[u8]>)> + 'a>;

/// Iterate over the data for a given column, starting from a given prefix.
fn iter_from_prefix<'a>(
/// Iterate over the data for a given column, returning all key/value pairs
/// where the key starts with the given prefix.
fn iter_with_prefix<'a>(
&'a self,
col: u32,
prefix: &'a [u8],
Expand Down Expand Up @@ -170,6 +171,7 @@ mod test {

assert_eq!(end_prefix(&[0x00, 0xff]), vec![0x01]);
assert_eq!(end_prefix(&[0xff]), vec![]);
assert_eq!(end_prefix(b"0"), b"1".to_vec());
assert_eq!(end_prefix(&[]), vec![]);
}
}

0 comments on commit a52fbeb

Please sign in to comment.