Skip to content

Commit

Permalink
fix(baseapp): introduce mutex to state (backport #18846) (#18863)
Browse files Browse the repository at this point in the history
Co-authored-by: Nikhil Vasan <[email protected]>
Co-authored-by: marbar3778 <[email protected]>
Co-authored-by: Aleksandr Bezobchuk <[email protected]>
  • Loading branch information
4 people authored Dec 22, 2023
1 parent 0930b39 commit 52c3db2
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 49 deletions.
85 changes: 43 additions & 42 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
// done after the finalizeBlockState and context have been set as it's persisted
// to state.
if req.ConsensusParams != nil {
err := app.StoreConsensusParams(app.finalizeBlockState.ctx, *req.ConsensusParams)
err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams)
if err != nil {
return nil, err
}
Expand All @@ -81,28 +81,28 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
// handler, the block height is zero by default. However, after Commit is called
// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
app.checkState.ctx = app.checkState.ctx.WithBlockHeader(initHeader).
app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
})
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockHeader(initHeader).
}))
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
})
}))
}()

if app.initChainer == nil {
return &abci.ResponseInitChain{}, nil
}

// add block gas meter for any genesis transactions (allow infinite gas)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(storetypes.NewInfiniteGasMeter())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))

res, err := app.initChainer(app.finalizeBlockState.ctx, req)
res, err := app.initChainer(app.finalizeBlockState.Context(), req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -398,7 +398,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
return nil, errors.New("PrepareProposal called with invalid height")
}

app.prepareProposalState.ctx = app.getContextForProposal(app.prepareProposalState.ctx, req.Height).
app.prepareProposalState.SetContext(app.getContextForProposal(app.prepareProposalState.Context(), req.Height).
WithVoteInfos(toVoteInfo(req.LocalLastCommit.Votes)). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithBlockTime(req.Time).
Expand All @@ -409,11 +409,11 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
ChainID: app.chainID,
Height: req.Height,
Time: req.Time,
})
}))

app.prepareProposalState.ctx = app.prepareProposalState.ctx.
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.ctx)).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.ctx))
app.prepareProposalState.SetContext(app.prepareProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.Context())))

defer func() {
if err := recover(); err != nil {
Expand All @@ -428,7 +428,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
}
}()

resp, err = app.prepareProposal(app.prepareProposalState.ctx, req)
resp, err = app.prepareProposal(app.prepareProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to prepare proposal", "height", req.Height, "time", req.Time, "err", err)
return &abci.ResponsePrepareProposal{Txs: req.Txs}, nil
Expand Down Expand Up @@ -486,7 +486,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
app.setState(execModeFinalize, header)
}

app.processProposalState.ctx = app.getContextForProposal(app.processProposalState.ctx, req.Height).
app.processProposalState.SetContext(app.getContextForProposal(app.processProposalState.Context(), req.Height).
WithVoteInfos(req.ProposedLastCommit.Votes). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithBlockTime(req.Time).
Expand All @@ -498,11 +498,11 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
ChainID: app.chainID,
Height: req.Height,
Time: req.Time,
})
}))

app.processProposalState.ctx = app.processProposalState.ctx.
WithConsensusParams(app.GetConsensusParams(app.processProposalState.ctx)).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.ctx))
app.processProposalState.SetContext(app.processProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.Context())))

defer func() {
if err := recover(); err != nil {
Expand All @@ -517,7 +517,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
}
}()

resp, err = app.processProposal(app.processProposalState.ctx, req)
resp, err = app.processProposal(app.processProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
Expand Down Expand Up @@ -557,7 +557,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.RequestExtendVote) (
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
} else {
emptyHeader := cmtproto.Header{ChainID: app.chainID, Height: req.Height}
ms := app.cms.CacheMultiStore()
Expand Down Expand Up @@ -632,7 +632,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
} else {
emptyHeader := cmtproto.Header{ChainID: app.chainID, Height: req.Height}
ms := app.cms.CacheMultiStore()
Expand Down Expand Up @@ -675,7 +675,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r

// internalFinalizeBlock executes the block, called by the Optimistic
// Execution flow or by the FinalizeBlock ABCI method. The context received is
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context()
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
var events []abci.Event
Expand Down Expand Up @@ -711,7 +711,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}

// Context is now updated with Header information.
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().
WithBlockHeader(header).
WithHeaderHash(req.Hash).
WithHeaderInfo(coreheader.Info{
Expand All @@ -721,24 +721,24 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
Hash: req.Hash,
AppHash: app.LastCommitID().Hash,
}).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.ctx)).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())).
WithVoteInfos(req.DecidedLastCommit.Votes).
WithExecMode(sdk.ExecModeFinalize).
WithCometInfo(cometInfo{
Misbehavior: req.Misbehavior,
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: req.DecidedLastCommit,
})
}))

// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.ctx)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter)
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))

if app.checkState != nil {
app.checkState.ctx = app.checkState.ctx.
app.checkState.SetContext(app.checkState.Context().
WithBlockGasMeter(gasMeter).
WithHeaderHash(req.Hash)
WithHeaderHash(req.Hash))
}

if err := app.preBlock(req); err != nil {
Expand All @@ -762,8 +762,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
events = append(events, beginBlock.Events...)

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.ctx)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter)
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
Expand Down Expand Up @@ -804,7 +804,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.ctx)
endBlock, err := app.endBlock(app.finalizeBlockState.Context())
if err != nil {
return nil, err
}
Expand All @@ -818,7 +818,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())

return &abci.ResponseFinalizeBlock{
Events: events,
Expand Down Expand Up @@ -866,7 +866,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons

// call the streaming service hooks with the FinalizeBlock messages
for _, streamingListener := range app.streamingManager.ABCIListeners {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.ctx, *req, *res); err != nil {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); err != nil {
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
}
}
Expand Down Expand Up @@ -900,11 +900,11 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error {
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
header := app.finalizeBlockState.ctx.BlockHeader()
header := app.finalizeBlockState.Context().BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

if app.precommiter != nil {
app.precommiter(app.finalizeBlockState.ctx)
app.precommiter(app.finalizeBlockState.Context())
}

rms, ok := app.cms.(*rootmulti.Store)
Expand All @@ -920,7 +920,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {

abciListeners := app.streamingManager.ABCIListeners
if len(abciListeners) > 0 {
ctx := app.finalizeBlockState.ctx
ctx := app.finalizeBlockState.Context()
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()

Expand All @@ -940,7 +940,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
app.finalizeBlockState = nil

if app.prepareCheckStater != nil {
app.prepareCheckStater(app.checkState.ctx)
app.prepareCheckStater(app.checkState.Context())
}

// The SnapshotIfApplicable method will create the snapshot by starting the goroutine
Expand Down Expand Up @@ -1105,7 +1105,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.ResponseQuery {
// access any state changes made in InitChain.
func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context {
if height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()

// clear all context data set during InitChain to avoid inconsistent behavior
ctx = ctx.WithBlockHeader(cmtproto.Header{}).WithHeaderInfo(coreheader.Info{})
Expand Down Expand Up @@ -1212,10 +1212,11 @@ func (app *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, e
}

// branch the commit multi-store for safety
ctx := sdk.NewContext(cacheMS, app.checkState.ctx.BlockHeader(), true, app.logger).
header := app.checkState.Context().BlockHeader()
ctx := sdk.NewContext(cacheMS, header, true, app.logger).
WithMinGasPrices(app.minGasPrices).
WithBlockHeight(height).
WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit))
WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)).WithBlockHeader(header)

if height != lastBlockHeight {
rms, ok := app.cms.(*rootmulti.Store)
Expand Down Expand Up @@ -1283,7 +1284,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 {
// evidence parameters instead of computing an estimated number of blocks based
// on the unbonding period and block commitment time as the two should be
// equivalent.
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 {
retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks
}
Expand Down
12 changes: 6 additions & 6 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ func (app *BaseApp) setState(mode execMode, header cmtproto.Header) {

switch mode {
case execModeCheck:
baseState.ctx = baseState.ctx.WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)
baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices))
app.checkState = baseState

case execModePrepareProposal:
Expand Down Expand Up @@ -643,7 +643,7 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.ctx.
ctx := modeState.Context().
WithTxBytes(txBytes)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

Expand Down Expand Up @@ -681,7 +681,7 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context

func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error {
if app.preBlocker != nil {
ctx := app.finalizeBlockState.ctx
ctx := app.finalizeBlockState.Context()
rsp, err := app.preBlocker(ctx, req)
if err != nil {
return err
Expand All @@ -693,7 +693,7 @@ func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error {
// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(ctx)
ctx = ctx.WithBlockGasMeter(gasMeter)
app.finalizeBlockState.ctx = ctx
app.finalizeBlockState.SetContext(ctx)
}
}
return nil
Expand All @@ -706,7 +706,7 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock,
)

if app.beginBlocker != nil {
resp, err = app.beginBlocker(app.finalizeBlockState.ctx)
resp, err = app.beginBlocker(app.finalizeBlockState.Context())
if err != nil {
return resp, err
}
Expand Down Expand Up @@ -768,7 +768,7 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) {
var endblock sdk.EndBlock

if app.endBlocker != nil {
eb, err := app.endBlocker(app.finalizeBlockState.ctx)
eb, err := app.endBlocker(app.finalizeBlockState.Context())
if err != nil {
return endblock, err
}
Expand Down
15 changes: 14 additions & 1 deletion baseapp/state.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package baseapp

import (
"sync"

storetypes "cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)

type state struct {
ms storetypes.CacheMultiStore
ms storetypes.CacheMultiStore

mtx sync.RWMutex
ctx sdk.Context
}

Expand All @@ -17,7 +21,16 @@ func (st *state) CacheMultiStore() storetypes.CacheMultiStore {
return st.ms.CacheMultiStore()
}

// SetContext updates the state's context to the context provided.
func (st *state) SetContext(ctx sdk.Context) {
st.mtx.Lock()
defer st.mtx.Unlock()
st.ctx = ctx
}

// Context returns the Context of the state.
func (st *state) Context() sdk.Context {
st.mtx.RLock()
defer st.mtx.RUnlock()
return st.ctx
}

0 comments on commit 52c3db2

Please sign in to comment.