Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/roothash: Batch history reindex writes #6050

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/6069.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/roothash: Optimize runtime history reindex

During runtime history reindex, we batch writes resulting in significant
speed-up of history reindex.
216 changes: 138 additions & 78 deletions go/consensus/cometbft/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import (
runtimeRegistry "github.com/oasisprotocol/oasis-core/go/runtime/registry"
)

const crashPointBlockBeforeIndex = "roothash.before_index"
const (
crashPointBlockBeforeIndex = "roothash.before_index"
reindexWriteBatchSize = 1000
)

// ServiceClient is the roothash service client interface.
type ServiceClient interface {
Expand Down Expand Up @@ -387,7 +390,7 @@ func (sc *serviceClient) getRuntimeNotifiers(id common.Namespace) *runtimeBroker
return notifiers
}

func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory) (uint64, error) {
func (sc *serviceClient) reindexBlocks(ctx context.Context, currentHeight int64, bh api.BlockHistory) (uint64, error) {
lastRound := api.RoundInvalid
if currentHeight <= 0 {
return lastRound, nil
Expand All @@ -402,15 +405,15 @@ func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory)
sc.logger.Error("failed to get last indexed height",
"err", err,
)
return lastRound, fmt.Errorf("failed to get last indexed height: %w", err)
return 0, fmt.Errorf("failed to get last indexed height: %w", err)
}
// +1 since we want the last non-seen height.
lastHeight++

// Take prune strategy into account.
lastRetainedHeight, err := sc.backend.GetLastRetainedVersion(sc.ctx)
lastRetainedHeight, err := sc.backend.GetLastRetainedVersion(ctx)
if err != nil {
return lastRound, fmt.Errorf("failed to get last retained height: %w", err)
return 0, fmt.Errorf("failed to get last retained height: %w", err)
}
if lastHeight < lastRetainedHeight {
logger.Debug("last height pruned, skipping until last retained",
Expand All @@ -421,32 +424,81 @@ func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory)
}

// Take initial genesis height into account.
genesisDoc, err := sc.backend.GetGenesisDocument(sc.ctx)
genesisDoc, err := sc.backend.GetGenesisDocument(ctx)
if err != nil {
return lastRound, fmt.Errorf("failed to get genesis document: %w", err)
return 0, fmt.Errorf("failed to get genesis document: %w", err)
}
if lastHeight < genesisDoc.Height {
lastHeight = genesisDoc.Height
}

// Scan all blocks between last indexed height and current height.
logger.Debug("reindexing blocks",
"last_indexed_height", lastHeight,
logger.Info("reindexing blocks",
"last_height", lastHeight,
"current_height", currentHeight,
logging.LogEvent, api.LogEventHistoryReindexing,
)

for height := lastHeight; height <= currentHeight; height++ {
for height := lastHeight; height <= currentHeight; height += reindexWriteBatchSize {
end := height + reindexWriteBatchSize - 1
if end > currentHeight {
end = currentHeight
}
last, err := sc.reindexBatch(ctx, runtimeID, bh, height, end)
if err != nil {
return 0, fmt.Errorf("failed to commit batch to history keeper: %w", err)
}
if last != api.RoundInvalid {
// New rounds indexed.
lastRound = last
}
}

if lastRound == api.RoundInvalid {
sc.logger.Debug("no new round reindexed, return latest known round")
switch blk, err := bh.GetCommittedBlock(ctx, api.RoundLatest); err {
case api.ErrNotFound:
case nil:
lastRound = blk.Header.Round
default:
return lastRound, fmt.Errorf("failed to get latest block: %w", err)
}
}

sc.logger.Info("block reindex complete",
"last_round", lastRound,
)

return lastRound, nil
}

func (sc *serviceClient) reindexBatch(
ctx context.Context,
runtimeID common.Namespace,
bh api.BlockHistory,
start int64,
end int64,
) (uint64, error) {
sc.logger.Debug("reindexing batch",
"runtime_id", runtimeID,
"start", start,
"end", end,
)

lastRound := api.RoundInvalid
var blocks []*api.AnnotatedBlock
var roundResults []*api.RoundResults
for height := start; height <= end; height++ {
var results *cmtrpctypes.ResultBlockResults
results, err = sc.backend.GetBlockResults(sc.ctx, height)
results, err := sc.backend.GetBlockResults(ctx, height)
if err != nil {
// XXX: could soft-fail first few heights in case more heights were
// pruned right after the GetLastRetainedVersion query.
logger.Error("failed to get cometbft block results",
sc.logger.Error("failed to get cometbft block results",
"err", err,
"height", height,
)
return lastRound, fmt.Errorf("failed to get cometbft block results: %w", err)
return 0, fmt.Errorf("failed to get cometbft block results: %w", err)
Comment on lines +493 to +501
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking, if we have prunning set to only retain few blocks (possibly one), or the time interval is very small, could batching always receive an error? Let's say on my machine to process a batch it takes 200-300ms. Would it be better to defensively check what is the last retained round again if we receive an error, and simply update the latest height if it has changed?

cc @ptrus

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the minimum interval is 1 second currently, and in practice it's likely much more (e.g. in minutes).

Likely not problematic in practice, so I don't really have strong opinions on how to handle it.

}

// Index block.
Expand Down Expand Up @@ -481,7 +533,7 @@ func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory)
case eventsAPI.IsAttributeKind(key, &api.FinalizedEvent{}):
var e api.FinalizedEvent
if err = eventsAPI.DecodeValue(val, &e); err != nil {
logger.Error("failed to unmarshal finalized event",
sc.logger.Error("failed to unmarshal finalized event",
"err", err,
"height", height,
)
Expand All @@ -500,28 +552,29 @@ func (sc *serviceClient) reindexBlocks(currentHeight int64, bh api.BlockHistory)
if !evRtID.Equal(&runtimeID) {
continue
}
if err = sc.processFinalizedEvent(sc.ctx, height, *evRtID, &ev.Round, true); err != nil {
return 0, fmt.Errorf("failed to process finalized event: %w", err)

annBlk, rr, err := sc.fetchFinalizedRound(ctx, height, runtimeID, &ev.Round)
if err != nil {
return 0, fmt.Errorf("failed to fetch roothash finalized round: %w", err)
}
blocks = append(blocks, annBlk)
roundResults = append(roundResults, rr)

lastRound = ev.Round
}
}

if lastRound == api.RoundInvalid {
sc.logger.Debug("no new round reindexed, return latest known round")
switch blk, err := bh.GetCommittedBlock(sc.ctx, api.RoundLatest); err {
case api.ErrNotFound:
case nil:
lastRound = blk.Header.Round
default:
return lastRound, fmt.Errorf("failed to get latest block: %w", err)
}
// Do not notify watchers during history reindex.
err := bh.CommitBatch(blocks, roundResults)
if err != nil {
sc.logger.Error("failed to commit batch to history keeper",
"err", err,
"runtime_id", runtimeID,
"start", start,
"end", end,
)
return 0, err
}

sc.logger.Debug("block reindex complete",
"last_round", lastRound,
)

return lastRound, nil
}

Expand Down Expand Up @@ -573,7 +626,7 @@ func (sc *serviceClient) DeliverCommand(ctx context.Context, height int64, cmd i
}

// Emit latest block.
if err := sc.processFinalizedEvent(ctx, rs.LastBlockHeight, tr.runtimeID, nil, false); err != nil {
if err := sc.processFinalizedEvent(ctx, rs.LastBlockHeight, tr.runtimeID, nil); err != nil {
sc.logger.Warn("failed to emit latest block",
"err", err,
"runtime_id", tr.runtimeID,
Expand Down Expand Up @@ -607,7 +660,7 @@ func (sc *serviceClient) DeliverEvent(ctx context.Context, height int64, tx cmtt
if sc.trackedRuntime[ev.RuntimeID] == nil {
continue
}
if err = sc.processFinalizedEvent(ctx, height, ev.RuntimeID, &ev.Finalized.Round, false); err != nil { //nolint:gosec
if err = sc.processFinalizedEvent(ctx, height, ev.RuntimeID, &ev.Finalized.Round); err != nil { //nolint:gosec
return fmt.Errorf("roothash: failed to process finalized event: %w", err)
}
}
Expand All @@ -626,7 +679,6 @@ func (sc *serviceClient) processFinalizedEvent(
height int64,
runtimeID common.Namespace,
round *uint64,
isReindex bool,
) (err error) {
tr := sc.trackedRuntime[runtimeID]
if tr == nil {
Expand All @@ -650,39 +702,9 @@ func (sc *serviceClient) processFinalizedEvent(
}

// Process finalized event.
var blk *block.Block
if blk, err = sc.getLatestBlockAt(ctx, runtimeID, height); err != nil {
sc.logger.Error("failed to fetch latest block",
"err", err,
"height", height,
"runtime_id", runtimeID,
)
return fmt.Errorf("roothash: failed to fetch latest block: %w", err)
}
if round != nil && blk.Header.Round != *round {
sc.logger.Error("finalized event/query round mismatch",
"block_round", blk.Header.Round,
"event_round", *round,
)
return fmt.Errorf("roothash: finalized event/query round mismatch")
}

roundResults, err := sc.GetLastRoundResults(ctx, &api.RuntimeRequest{
RuntimeID: runtimeID,
Height: height,
})
annBlk, roundResults, err := sc.fetchFinalizedRound(ctx, height, runtimeID, round)
if err != nil {
sc.logger.Error("failed to fetch round results",
"err", err,
"height", height,
"runtime_id", runtimeID,
)
return fmt.Errorf("roothash: failed to fetch round results: %w", err)
}

annBlk := &api.AnnotatedBlock{
Height: height,
Block: blk,
return fmt.Errorf("failed to fetch roothash finalized round: %w", err)
}

// Commit the block to history if needed.
Expand All @@ -691,10 +713,10 @@ func (sc *serviceClient) processFinalizedEvent(

// Perform reindex if required.
lastRound := api.RoundInvalid
if !isReindex && !tr.reindexDone {
if !tr.reindexDone {
// Note that we need to reindex up to the previous height as the current height is
// already being processed right now.
if lastRound, err = sc.reindexBlocks(height-1, tr.blockHistory); err != nil {
if lastRound, err = sc.reindexBlocks(ctx, height-1, tr.blockHistory); err != nil {
sc.logger.Error("failed to reindex blocks",
"err", err,
"runtime_id", runtimeID,
Expand All @@ -707,45 +729,83 @@ func (sc *serviceClient) processFinalizedEvent(
// Only commit the block in case it was not already committed during reindex. Note that even
// in case we only reindex up to height-1 this can still happen on the first emitted block
// since that height is not guaranteed to be the one that contains a round finalized event.
if lastRound == api.RoundInvalid || blk.Header.Round > lastRound {
if lastRound == api.RoundInvalid || annBlk.Block.Header.Round > lastRound {
sc.logger.Debug("commit block",
"runtime_id", runtimeID,
"height", height,
"round", blk.Header.Round,
"round", annBlk.Block.Header.Round,
)

err = tr.blockHistory.Commit(annBlk, roundResults, !isReindex)
err = tr.blockHistory.Commit(annBlk, roundResults)
if err != nil {
sc.logger.Error("failed to commit block to history keeper",
"err", err,
"runtime_id", runtimeID,
"height", height,
"round", blk.Header.Round,
"round", annBlk.Block.Header.Round,
)
return fmt.Errorf("failed to commit block to history keeper: %w", err)
}
}
}

// Skip emitting events if we are reindexing.
if isReindex {
return nil
}

notifiers := sc.getRuntimeNotifiers(runtimeID)
// Ensure latest block is set.
notifiers.Lock()
notifiers.lastBlock = blk
notifiers.lastBlock = annBlk.Block
notifiers.lastBlockHeight = height
notifiers.Unlock()

sc.allBlockNotifier.Broadcast(blk)
sc.allBlockNotifier.Broadcast(annBlk.Block)
notifiers.blockNotifier.Broadcast(annBlk)
tr.height = height

return nil
}

func (sc *serviceClient) fetchFinalizedRound(
ctx context.Context,
height int64,
runtimeID common.Namespace,
round *uint64,
) (*api.AnnotatedBlock, *api.RoundResults, error) {
blk, err := sc.getLatestBlockAt(ctx, runtimeID, height)
if err != nil {
sc.logger.Error("failed to fetch latest block",
"err", err,
"height", height,
"runtime_id", runtimeID,
)
return nil, nil, fmt.Errorf("roothash: failed to fetch latest block: %w", err)
}
if round != nil && blk.Header.Round != *round {
sc.logger.Error("finalized event/query round mismatch",
"block_round", blk.Header.Round,
"event_round", *round,
)
return nil, nil, fmt.Errorf("roothash: finalized event/query round mismatch")
}

roundResults, err := sc.GetLastRoundResults(ctx, &api.RuntimeRequest{
RuntimeID: runtimeID,
Height: height,
})
if err != nil {
sc.logger.Error("failed to fetch round results",
"err", err,
"height", height,
"runtime_id", runtimeID,
)
return nil, nil, fmt.Errorf("roothash: failed to fetch round results: %w", err)
}

annBlk := &api.AnnotatedBlock{
Height: height,
Block: blk,
}
return annBlk, roundResults, nil
}

// EventsFromCometBFT extracts staking events from CometBFT events.
func EventsFromCometBFT(
tx cmttypes.Tx,
Expand Down
Loading
Loading