Skip to content
This repository has been archived by the owner on Feb 18, 2025. It is now read-only.

Commit

Permalink
core, eth/downloader: pbss fix release v1.13.1 (#614)
Browse files Browse the repository at this point in the history
* eth/downloader: prevent pivot moves after state commit (#28126)

* core, eth/downloader: fix genesis state missing due to state sync (#28124)

* core: fix chain repair corner case in path-based scheme

* eth/downloader: disable trie database whenever state sync is launched

---------

Co-authored-by: Péter Szilágyi <[email protected]>
Co-authored-by: rjl493456442 <[email protected]>
  • Loading branch information
3 people authored and huyngopt1994 committed Feb 17, 2025
1 parent 975b458 commit 7a22b9a
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 54 deletions.
98 changes: 55 additions & 43 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,28 +373,38 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if !bc.HasState(head.Root()) {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash(), "snaproot", diskRoot)

snapDisk, err := bc.setHeadBeyondRoot(head.NumberU64(), diskRoot, true)
if err != nil {
return nil, err
}
// Chain rewound, persist old snapshot number to indicate recovery procedure
if snapDisk != 0 {
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
}
if head.NumberU64() == 0 {
// The genesis state is missing, which is only possible in the path-based
// scheme. This situation occurs when the state syncer overwrites it.
//
// The solution is to reset the state to the genesis state. Although it may not
// match the sync target, the state healer will later address and correct any
// inconsistencies.
bc.resetState()
} else {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if _, err := bc.setHeadBeyondRoot(head.NumberU64(), common.Hash{}, true); err != nil {
return nil, err
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)

snapDisk, err := bc.setHeadBeyondRoot(head.NumberU64(), diskRoot, true)
if err != nil {
return nil, err
}
// Chain rewound, persist old snapshot number to indicate recovery procedure
if snapDisk != 0 {
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
}
} else {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash())
if _, err := bc.setHeadBeyondRoot(head.NumberU64(), common.Hash{}, true); err != nil {
return nil, err
}
}
}
}
Expand Down Expand Up @@ -666,6 +676,28 @@ func (bc *BlockChain) SetHead(head uint64) error {
return err
}

// resetState resets the persistent state to genesis state if it's not present.
func (bc *BlockChain) resetState() {
// Short circuit if the genesis state is already present.
root := bc.genesisBlock.Root()
if bc.HasState(root) {
return
}
// Reset the state database to empty for committing genesis state.
// Note, it should only happen in path-based scheme and Reset function
// is also only call-able in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(types.EmptyRootHash); err != nil {
log.Crit("Failed to clean state", "err", err) // Shouldn't happen
}
}
// Write genesis state into database.
if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil {
log.Crit("Failed to commit genesis state", "err", err)
}
log.Info("Reset state to genesis", "root", root)
}

// setHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. This method is meant to be
// used when rewinding with snapshots enabled to ensure that we go back further than
Expand All @@ -688,26 +720,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
pivot := rawdb.ReadLastPivotNumber(bc.db)
frozen, _ := bc.db.Ancients()

// resetState resets the persistent state to genesis if it's not available.
resetState := func() {
// Short circuit if the genesis state is already present.
if bc.HasState(bc.genesisBlock.Root()) {
return
}
// Reset the state database to empty for committing genesis state.
// Note, it should only happen in path-based scheme and Reset function
// is also only call-able in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(types.EmptyRootHash); err != nil {
log.Crit("Failed to clean state", "err", err) // Shouldn't happen
}
}
// Write genesis state into database.
if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil {
log.Crit("Failed to commit genesis state", "err", err)
}
}

updateFn := func(db ethdb.KeyValueWriter, header *types.Header) (uint64, bool) {
// Rewind the blockchain, ensuring we don't end up with a stateless head
// block. Note, depth equality is permitted to allow using SetHead as a
Expand All @@ -717,7 +729,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
resetState()
bc.resetState()
} else {
// Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold
Expand Down Expand Up @@ -748,7 +760,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bo
}
if beyondRoot || newHeadBlock.NumberU64() == 0 {
if newHeadBlock.NumberU64() == 0 {
resetState()
bc.resetState()
} else if !bc.HasState(newHeadBlock.Root()) {
// Rewind to a block with recoverable state. If the state is
// missing, run the state recovery here.
Expand Down
44 changes: 33 additions & 11 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// but until snap becomes prevalent, we should support both. TODO(karalabe).
if mode == SnapSync {
if !d.snapSync {
// Snap sync will directly modify the persistent state, making the entire
// trie database unusable until the state is fully synced. To prevent any
// subsequent state reads, explicitly disable the trie database and state
// syncer is responsible to address and correct any state missing.
if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme {
d.blockchain.TrieDB().Reset(types.EmptyRootHash)
}
// Snap sync uses the snapshot namespace to store potentially flakey data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean
// time to prevent access.
Expand Down Expand Up @@ -1777,17 +1784,30 @@ func (d *Downloader) processFastSyncContent() error {

// To cater for moving pivot points, track the pivot block and subsequently
// accumulated download results separately.
//
// These will be nil up to the point where we reach the pivot, and will only
// be set temporarily if the synced blocks are piling up, but the pivot is
// still busy downloading. In that case, we need to occasionally check for
// pivot moves, so need to unblock the loop. These fields will accumulate
// the results in the meantime.
//
// Note, there's no issue with memory piling up since after 64 blocks the
// pivot will forcefully move so these accumulators will be dropped.
var (
oldPivot *fetchResult // Locked in pivot block, might change eventually
oldTail []*fetchResult // Downloaded content after the pivot
)
for {
// Wait for the next batch of downloaded data to be available, and if the pivot
// block became stale, move the goalpost
results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
// Wait for the next batch of downloaded data to be available. If we have
// not yet reached the pivot point, wait blockingly as there's no need to
// spin-loop check for pivot moves. If we reached the pivot but have not
// yet processed it, check for results async, so we might notice pivot
// moves while state syncing. If the pivot was passed fully, block again
// as there's no more reason to check for pivot moves at all.
results := d.queue.Results(oldPivot == nil)
if len(results) == 0 {
// If pivot sync is done, stop
if oldPivot == nil {
if atomic.LoadInt32(&d.committed) == 1 {
return sync.Cancel()
}
// If sync failed, stop
Expand All @@ -1807,21 +1827,23 @@ func (d *Downloader) processFastSyncContent() error {
pivot := d.pivotHeader
d.pivotLock.RUnlock()

if oldPivot == nil {
if pivot.Root != sync.root {
sync.Cancel()
sync = d.syncState(pivot.Root)
if oldPivot == nil { // no results piling up, we can move the pivot
if atomic.LoadInt32(&d.committed) == 0 { // not yet passed the pivot, we can move the pivot
if pivot.Root != sync.root { // pivot position changed, we can move the pivot
sync.Cancel()
sync = d.syncState(pivot.Root)

go closeOnErr(sync)
go closeOnErr(sync)
}
}
} else {
} else { // results already piled up, consume before handling pivot move
results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
}
// Split around the pivot block and process the two sides via fast/full sync
if atomic.LoadInt32(&d.committed) == 0 {
latest := results[len(results)-1].Header
// If the height is above the pivot block by 2 sets, it means the pivot
// become stale in the network and it was garbage collected, move to a
// become stale in the network, and it was garbage collected, move to a
// new pivot.
//
// Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those
Expand Down

0 comments on commit 7a22b9a

Please sign in to comment.