Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/rawdb, ethdb: introduce batched/atomic reads from ancients #23566

Merged
merged 13 commits into from
Oct 25, 2021
199 changes: 76 additions & 123 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,15 @@ import (

// ReadCanonicalHash retrieves the hash assigned to a canonical block number.
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
data, _ := db.Ancient(freezerHashTable, number)
if len(data) == 0 {
data, _ = db.Get(headerHashKey(number))
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
data, _ = reader.Ancient(freezerHashTable, number)
if len(data) == 0 {
data, _ = db.Ancient(freezerHashTable, number)
// Get it by hash from leveldb
data, _ = db.Get(headerHashKey(number))
}
}
if len(data) == 0 {
return common.Hash{}
}
return nil
})
return common.BytesToHash(data)
}

Expand Down Expand Up @@ -304,32 +299,25 @@ func WriteFastTxLookupLimit(db ethdb.KeyValueWriter, number uint64) {

// ReadHeaderRLP retrieves a block header in its raw RLP database encoding.
func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return data
}
// Then try to look up the data in leveldb.
data, _ = db.Get(headerKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return data
}
return nil // Can't find the data anywhere.
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ = reader.Ancient(freezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return nil
}
// If not, try reading from leveldb
data, _ = db.Get(headerKey(number, hash))
return nil
})
return data
}

// HasHeader verifies the existence of a block header corresponding to the hash.
func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool {
if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
if isCanon(db, number, hash) {
return true
}
if has, err := db.Has(headerKey(number, hash)); !has || err != nil {
Expand Down Expand Up @@ -389,53 +377,48 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number
}
}

// isCanon is an internal utility method, to check whether the given number/hash
// is part of the ancient (canon) set.
func isCanon(reader ethdb.AncientReader, number uint64, hash common.Hash) bool {
h, err := reader.Ancient(freezerHashTable, number)
if err != nil {
return false
}
return bytes.Equal(h, hash[:])
}

// ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding.
func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerBodiesTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
}
}
// Then try to look up the data in leveldb.
data, _ = db.Get(blockBodyKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerBodiesTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(freezerBodiesTable, number)
return nil
}
}
return nil // Can't find the data anywhere.
// If not, try reading from leveldb
data, _ = db.Get(blockBodyKey(number, hash))
return nil
})
return data
}

// ReadCanonicalBodyRLP retrieves the block body (transactions and uncles) for the canonical
// block at number, in RLP encoding.
func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue {
// If it's an ancient one, we don't need the canonical hash
data, _ := db.Ancient(freezerBodiesTable, number)
if len(data) == 0 {
// Need to get the hash
data, _ = db.Get(blockBodyKey(number, ReadCanonicalHash(db, number)))
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
if len(data) == 0 {
data, _ = db.Ancient(freezerBodiesTable, number)
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
data, _ = reader.Ancient(freezerBodiesTable, number)
if len(data) > 0 {
return nil
}
}
// Get it by hash from leveldb
data, _ = db.Get(blockBodyKey(number, ReadCanonicalHash(db, number)))
return nil
})
return data
}

Expand All @@ -448,7 +431,7 @@ func WriteBodyRLP(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rlp

// HasBody verifies the existence of a block body corresponding to the hash.
func HasBody(db ethdb.Reader, hash common.Hash, number uint64) bool {
if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
if isCanon(db, number, hash) {
return true
}
if has, err := db.Has(blockBodyKey(number, hash)); !has || err != nil {
Expand Down Expand Up @@ -489,33 +472,18 @@ func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {

// ReadTdRLP retrieves a block's total difficulty corresponding to the hash in RLP encoding.
func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerDifficultyTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
}
}
// Then try to look up the data in leveldb.
data, _ = db.Get(headerTDKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerDifficultyTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(freezerDifficultyTable, number)
return nil
}
}
return nil // Can't find the data anywhere.
// If not, try reading from leveldb
data, _ = db.Get(headerTDKey(number, hash))
return nil
})
return data
}

// ReadTd retrieves a block's total difficulty corresponding to the hash.
Expand Down Expand Up @@ -553,7 +521,7 @@ func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
// HasReceipts verifies the existence of all the transaction receipts belonging
// to a block.
func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool {
if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash {
if isCanon(db, number, hash) {
return true
}
if has, err := db.Has(blockReceiptsKey(number, hash)); !has || err != nil {
Expand All @@ -564,33 +532,18 @@ func HasReceipts(db ethdb.Reader, hash common.Hash, number uint64) bool {

// ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding.
func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ := db.Ancient(freezerReceiptTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
}
}
// Then try to look up the data in leveldb.
data, _ = db.Get(blockReceiptsKey(number, hash))
if len(data) > 0 {
return data
}
// In the background freezer is moving data from leveldb to flatten files.
// So during the first check for ancient db, the data is not yet in there,
// but when we reach into leveldb, the data was already moved. That would
// result in a not found error.
data, _ = db.Ancient(freezerReceiptTable, number)
if len(data) > 0 {
h, _ := db.Ancient(freezerHashTable, number)
if common.BytesToHash(h) == hash {
return data
var data []byte
db.ReadAncients(func(reader ethdb.AncientReader) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(freezerReceiptTable, number)
return nil
}
}
return nil // Can't find the data anywhere.
// If not, try reading from leveldb
data, _ = db.Get(blockReceiptsKey(number, hash))
return nil
})
return data
}

// ReadRawReceipts retrieves all the transaction receipts belonging to a block.
Expand Down
20 changes: 18 additions & 2 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func (db *nofreezedb) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}

// ReadAncients returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) ReadAncients(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
// AncientRange returns an error as we don't have a backing chain freezer.
func (db *nofreezedb) AncientRange(kind string, start, max, maxByteSize uint64) ([][]byte, error) {
return nil, errNotSupported
}

Expand Down Expand Up @@ -119,6 +119,22 @@ func (db *nofreezedb) Sync() error {
return errNotSupported
}

func (db *nofreezedb) ReadAncients(fn func(reader ethdb.AncientReader) error) (err error) {
// Unlike other ancient-related methods, this method does not return
// errNotSupported when invoked.
// The reason for this is that the caller might want to do several things:
// 1. Check if something is in freezer,
// 2. If not, check leveldb.
//
// This will work, since the ancient-checks inside 'fn' will return errors,
// and the leveldb work will continue.
//
// If we instead were to return errNotSupported here, then the caller would
// have to explicitly check for that, having an extra clause to do the
// non-ancient operations.
return fn(db)
}

// NewDatabase creates a high level database on top of a given key-value data
// store without a freezer moving immutable chain segments into cold storage.
func NewDatabase(db ethdb.KeyValueStore) ethdb.Database {
Expand Down
21 changes: 15 additions & 6 deletions core/rawdb/freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ type freezer struct {
frozen uint64 // Number of blocks already frozen
threshold uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

// This lock synchronizes writers and the truncate operation.
writeLock sync.Mutex
// This lock synchronizes writers and the truncate operation, as well as
// the "atomic" (batched) read operations.
writeLock sync.RWMutex
writeBatch *freezerBatch

readonly bool
Expand Down Expand Up @@ -201,12 +202,12 @@ func (f *freezer) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errUnknownTable
}

// ReadAncients retrieves multiple items in sequence, starting from the index 'start'.
// AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'max' items,
// - at least 1 item (even if exceeding the maxByteSize), but will otherwise
// return as many items as fit into maxByteSize.
func (f *freezer) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) {
func (f *freezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
if table := f.tables[kind]; table != nil {
return table.RetrieveItems(start, count, maxBytes)
}
Expand All @@ -222,15 +223,23 @@ func (f *freezer) Ancients() (uint64, error) {
func (f *freezer) AncientSize(kind string) (uint64, error) {
// This needs the write lock to avoid data races on table fields.
// Speed doesn't matter here, AncientSize is for debugging.
f.writeLock.Lock()
defer f.writeLock.Unlock()
f.writeLock.RLock()
defer f.writeLock.RUnlock()

if table := f.tables[kind]; table != nil {
return table.size()
}
return 0, errUnknownTable
}

// ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer.
func (f *freezer) ReadAncients(fn func(ethdb.AncientReader) error) (err error) {
f.writeLock.RLock()
defer f.writeLock.RUnlock()
return fn(f)
}

// ModifyAncients runs the given write operation.
func (f *freezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
if f.readonly {
Expand Down
10 changes: 7 additions & 3 deletions core/rawdb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func (t *table) Ancient(kind string, number uint64) ([]byte, error) {
return t.db.Ancient(kind, number)
}

// ReadAncients is a noop passthrough that just forwards the request to the underlying
// AncientRange is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) ReadAncients(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return t.db.ReadAncients(kind, start, count, maxBytes)
func (t *table) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return t.db.AncientRange(kind, start, count, maxBytes)
}

// Ancients is a noop passthrough that just forwards the request to the underlying
Expand All @@ -85,6 +85,10 @@ func (t *table) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (int64, erro
return t.db.ModifyAncients(fn)
}

func (t *table) ReadAncients(fn func(reader ethdb.AncientReader) error) (err error) {
return t.db.ReadAncients(fn)
}

// TruncateAncients is a noop passthrough that just forwards the request to the underlying
// database.
func (t *table) TruncateAncients(items uint64) error {
Expand Down
Loading