Skip to content

Commit

Permalink
Merge pull request #6603 from multiversx/MX-16107-no-more-notify
Browse files Browse the repository at this point in the history
Don't notify cache about nonces; pass the account nonce provider, instead
  • Loading branch information
andreibancioiu authored Nov 27, 2024
2 parents 4d7e013 + 25b2594 commit e04679e
Show file tree
Hide file tree
Showing 75 changed files with 491 additions and 918 deletions.
4 changes: 2 additions & 2 deletions consensus/broadcast/delayedBroadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func (dbb *delayedBlockBroadcaster) interceptedHeader(_ string, headerHash []byt
)

alarmsToCancel := make([]string, 0)
dbb.mutDataForBroadcast.RLock()
dbb.mutDataForBroadcast.Lock()
for i, broadcastData := range dbb.valHeaderBroadcastData {
samePrevRandSeed := bytes.Equal(broadcastData.header.GetPrevRandSeed(), headerHandler.GetPrevRandSeed())
sameRound := broadcastData.header.GetRound() == headerHandler.GetRound()
Expand All @@ -663,7 +663,7 @@ func (dbb *delayedBlockBroadcaster) interceptedHeader(_ string, headerHash []byt
}
}

dbb.mutDataForBroadcast.RUnlock()
dbb.mutDataForBroadcast.Unlock()

for _, alarmID := range alarmsToCancel {
dbb.alarm.Cancel(alarmID)
Expand Down
6 changes: 0 additions & 6 deletions dataRetriever/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,6 @@ var ErrCacheConfigInvalidSize = errors.New("cache parameter [size] is not valid,
// ErrCacheConfigInvalidShards signals that the cache parameter "shards" is invalid
var ErrCacheConfigInvalidShards = errors.New("cache parameter [shards] is not valid, it must be a positive number")

// ErrCacheConfigInvalidEconomics signals that an economics parameter required by the cache is invalid
var ErrCacheConfigInvalidEconomics = errors.New("cache-economics parameter is not valid")

// ErrCacheConfigInvalidSharding signals that a sharding parameter required by the cache is invalid
var ErrCacheConfigInvalidSharding = errors.New("cache-sharding parameter is not valid")

Expand Down Expand Up @@ -265,6 +262,3 @@ var ErrNilValidatorInfoStorage = errors.New("nil validator info storage")

// ErrValidatorInfoNotFound signals that no validator info was found
var ErrValidatorInfoNotFound = errors.New("validator info not found")

// ErrNilAccountNonceProvider signals that a nil AccountNonceProvider has been provided
var ErrNilAccountNonceProvider = errors.New("nil account nonce provider")
20 changes: 9 additions & 11 deletions dataRetriever/factory/dataPoolFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@ var log = logger.GetOrCreate("dataRetriever/factory")

// ArgsDataPool holds the arguments needed for NewDataPoolFromConfig function
type ArgsDataPool struct {
Config *config.Config
EconomicsData process.EconomicsDataHandler
ShardCoordinator sharding.Coordinator
Marshalizer marshal.Marshalizer
PathManager storage.PathManagerHandler
AccountNonceProvider dataRetriever.AccountNonceProvider
Config *config.Config
EconomicsData process.EconomicsDataHandler
ShardCoordinator sharding.Coordinator
Marshalizer marshal.Marshalizer
PathManager storage.PathManagerHandler
}

// NewDataPoolFromConfig will return a new instance of a PoolsHolder
Expand All @@ -64,11 +63,10 @@ func NewDataPoolFromConfig(args ArgsDataPool) (dataRetriever.PoolsHolder, error)
mainConfig := args.Config

txPool, err := txpool.NewShardedTxPool(txpool.ArgShardedTxPool{
Config: factory.GetCacherFromConfig(mainConfig.TxDataPool),
NumberOfShards: args.ShardCoordinator.NumberOfShards(),
SelfShardID: args.ShardCoordinator.SelfId(),
TxGasHandler: args.EconomicsData,
AccountNonceProvider: args.AccountNonceProvider,
Config: factory.GetCacherFromConfig(mainConfig.TxDataPool),
NumberOfShards: args.ShardCoordinator.NumberOfShards(),
SelfShardID: args.ShardCoordinator.SelfId(),
TxGasHandler: args.EconomicsData,
})
if err != nil {
return nil, fmt.Errorf("%w while creating the cache for the transactions", err)
Expand Down
17 changes: 5 additions & 12 deletions dataRetriever/factory/dataPoolFactory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ func TestNewDataPoolFromConfig_MissingDependencyShouldErr(t *testing.T) {
holder, err = NewDataPoolFromConfig(args)
require.Nil(t, holder)
require.Equal(t, dataRetriever.ErrNilPathManager, err)

args = getGoodArgs()
args.AccountNonceProvider = nil
holder, err = NewDataPoolFromConfig(args)
require.Nil(t, holder)
require.ErrorContains(t, err, "nil account nonce provider while creating the cache for the transactions")
}

func TestNewDataPoolFromConfig_BadConfigShouldErr(t *testing.T) {
Expand Down Expand Up @@ -153,11 +147,10 @@ func getGoodArgs() ArgsDataPool {
config := testscommon.GetGeneralConfig()

return ArgsDataPool{
Config: &config,
EconomicsData: testEconomics,
ShardCoordinator: mock.NewMultipleShardsCoordinatorMock(),
Marshalizer: &mock.MarshalizerMock{},
PathManager: &testscommon.PathManagerStub{},
AccountNonceProvider: testscommon.NewAccountNonceProviderMock(),
Config: &config,
EconomicsData: testEconomics,
ShardCoordinator: mock.NewMultipleShardsCoordinatorMock(),
Marshalizer: &mock.MarshalizerMock{},
PathManager: &testscommon.PathManagerStub{},
}
}
8 changes: 0 additions & 8 deletions dataRetriever/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ type ShardedDataCacherNotifier interface {
RemoveSetOfDataFromPool(keys [][]byte, cacheId string)
ImmunizeSetOfDataAgainstEviction(keys [][]byte, cacheId string)
RemoveDataFromAllShards(key []byte)
ForgetAllAccountNoncesInMempool()
MergeShardStores(sourceCacheID, destCacheID string)
Clear()
ClearShardStore(cacheId string)
Expand Down Expand Up @@ -358,10 +357,3 @@ type PeerAuthenticationPayloadValidator interface {
ValidateTimestamp(payloadTimestamp int64) error
IsInterfaceNil() bool
}

// AccountNonceProvider defines the behavior of a component able to provide the nonce for an account
type AccountNonceProvider interface {
GetAccountNonce(accountKey []byte) (uint64, error)
SetAccountsAdapter(accountsAdapter state.AccountsAdapter) error
IsInterfaceNil() bool
}
32 changes: 19 additions & 13 deletions dataRetriever/requestHandlers/requestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/epochStart"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-logger-go"
logger "github.com/multiversx/mx-chain-logger-go"
)

var _ epochStart.RequestHandler = (*resolverRequestHandler)(nil)
Expand Down Expand Up @@ -571,10 +571,12 @@ func (rrh *resolverRequestHandler) RequestValidatorInfo(hash []byte) {
return
}

epoch := rrh.getEpoch()

log.Debug("requesting validator info messages from network",
"topic", common.ValidatorInfoTopic,
"hash", hash,
"epoch", rrh.epoch,
"epoch", epoch,
)

requester, err := rrh.requestersFinder.MetaChainRequester(common.ValidatorInfoTopic)
Expand All @@ -583,20 +585,20 @@ func (rrh *resolverRequestHandler) RequestValidatorInfo(hash []byte) {
"error", err.Error(),
"topic", common.ValidatorInfoTopic,
"hash", hash,
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}

rrh.whiteList.Add([][]byte{hash})

err = requester.RequestDataFromHash(hash, rrh.epoch)
err = requester.RequestDataFromHash(hash, epoch)
if err != nil {
log.Debug("RequestValidatorInfo.RequestDataFromHash",
"error", err.Error(),
"topic", common.ValidatorInfoTopic,
"hash", hash,
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}
Expand All @@ -611,10 +613,12 @@ func (rrh *resolverRequestHandler) RequestValidatorsInfo(hashes [][]byte) {
return
}

epoch := rrh.getEpoch()

log.Debug("requesting validator info messages from network",
"topic", common.ValidatorInfoTopic,
"num hashes", len(unrequestedHashes),
"epoch", rrh.epoch,
"epoch", epoch,
)

requester, err := rrh.requestersFinder.MetaChainRequester(common.ValidatorInfoTopic)
Expand All @@ -623,7 +627,7 @@ func (rrh *resolverRequestHandler) RequestValidatorsInfo(hashes [][]byte) {
"error", err.Error(),
"topic", common.ValidatorInfoTopic,
"num hashes", len(unrequestedHashes),
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}
Expand All @@ -636,13 +640,13 @@ func (rrh *resolverRequestHandler) RequestValidatorsInfo(hashes [][]byte) {

rrh.whiteList.Add(unrequestedHashes)

err = validatorInfoRequester.RequestDataFromHashArray(unrequestedHashes, rrh.epoch)
err = validatorInfoRequester.RequestDataFromHashArray(unrequestedHashes, epoch)
if err != nil {
log.Debug("RequestValidatorInfo.RequestDataFromHash",
"error", err.Error(),
"topic", common.ValidatorInfoTopic,
"num hashes", len(unrequestedHashes),
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}
Expand Down Expand Up @@ -827,11 +831,13 @@ func (rrh *resolverRequestHandler) GetNumPeersToQuery(key string) (int, int, err

// RequestPeerAuthenticationsByHashes asks for peer authentication messages from specific peers hashes
func (rrh *resolverRequestHandler) RequestPeerAuthenticationsByHashes(destShardID uint32, hashes [][]byte) {
epoch := rrh.getEpoch()

log.Debug("requesting peer authentication messages from network",
"topic", common.PeerAuthenticationTopic,
"shard", destShardID,
"num hashes", len(hashes),
"epoch", rrh.epoch,
"epoch", epoch,
)

requester, err := rrh.requestersFinder.MetaChainRequester(common.PeerAuthenticationTopic)
Expand All @@ -840,7 +846,7 @@ func (rrh *resolverRequestHandler) RequestPeerAuthenticationsByHashes(destShardI
"error", err.Error(),
"topic", common.PeerAuthenticationTopic,
"shard", destShardID,
"epoch", rrh.epoch,
"epoch", epoch,
)
return
}
Expand All @@ -851,13 +857,13 @@ func (rrh *resolverRequestHandler) RequestPeerAuthenticationsByHashes(destShardI
return
}

err = peerAuthRequester.RequestDataFromHashArray(hashes, rrh.epoch)
err = peerAuthRequester.RequestDataFromHashArray(hashes, epoch)
if err != nil {
log.Debug("RequestPeerAuthenticationsByHashes.RequestDataFromHashArray",
"error", err.Error(),
"topic", common.PeerAuthenticationTopic,
"shard", destShardID,
"epoch", rrh.epoch,
"epoch", epoch,
)
}
}
18 changes: 13 additions & 5 deletions dataRetriever/shardedData/shardedData.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sync"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/counting"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/dataRetriever"
Expand Down Expand Up @@ -161,14 +162,25 @@ func (sd *shardedData) RemoveSetOfDataFromPool(keys [][]byte, cacheID string) {
return
}

stopWatch := core.NewStopWatch()
stopWatch.Start("removal")

numRemoved := 0
for _, key := range keys {
if store.cache.RemoveWithResult(key) {
numRemoved++
}
}

log.Trace("shardedData.removeTxBulk()", "name", sd.name, "cacheID", cacheID, "numToRemove", len(keys), "numRemoved", numRemoved)
stopWatch.Stop("removal")

log.Debug("shardedData.removeTxBulk",
"name", sd.name,
"cacheID", cacheID,
"numToRemove", len(keys),
"numRemoved", numRemoved,
"duration", stopWatch.GetMeasurement("removal"),
)
}

// ImmunizeSetOfDataAgainstEviction marks the items as non-evictable
Expand All @@ -178,10 +190,6 @@ func (sd *shardedData) ImmunizeSetOfDataAgainstEviction(keys [][]byte, cacheID s
log.Trace("shardedData.ImmunizeSetOfDataAgainstEviction()", "name", sd.name, "cacheID", cacheID, "len(keys)", len(keys), "numNow", numNow, "numFuture", numFuture)
}

// ForgetAllAccountNoncesInMempool does nothing
func (sd *shardedData) ForgetAllAccountNoncesInMempool() {
}

// RemoveData will remove data hash from the corresponding shard store
func (sd *shardedData) RemoveData(key []byte, cacheID string) {
store := sd.shardStore(cacheID)
Expand Down
12 changes: 4 additions & 8 deletions dataRetriever/txpool/argShardedTxPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import (

// ArgShardedTxPool is the argument for ShardedTxPool's constructor
type ArgShardedTxPool struct {
Config storageunit.CacheConfig
TxGasHandler txcache.TxGasHandler
AccountNonceProvider dataRetriever.AccountNonceProvider
NumberOfShards uint32
SelfShardID uint32
Config storageunit.CacheConfig
TxGasHandler txcache.TxGasHandler
NumberOfShards uint32
SelfShardID uint32
}

// TODO: Upon further analysis and brainstorming, add some sensible minimum accepted values for the appropriate fields.
Expand All @@ -40,9 +39,6 @@ func (args *ArgShardedTxPool) verify() error {
if check.IfNil(args.TxGasHandler) {
return fmt.Errorf("%w: TxGasHandler is not valid", dataRetriever.ErrNilTxGasHandler)
}
if check.IfNil(args.AccountNonceProvider) {
return dataRetriever.ErrNilAccountNonceProvider
}
if args.NumberOfShards == 0 {
return fmt.Errorf("%w: NumberOfShards is not valid", dataRetriever.ErrCacheConfigInvalidSharding)
}
Expand Down
2 changes: 0 additions & 2 deletions dataRetriever/txpool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ type txCache interface {
storage.Cacher

AddTx(tx *txcache.WrappedTransaction) (ok bool, added bool)
NotifyAccountNonce(accountKey []byte, nonce uint64)
ForgetAllAccountNonces()
GetByTxHash(txHash []byte) (*txcache.WrappedTransaction, bool)
RemoveTxByHash(txHash []byte) bool
ImmunizeTxsAgainstEviction(keys [][]byte)
Expand Down
24 changes: 11 additions & 13 deletions dataRetriever/txpool/memorytests/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/dataRetriever/txpool"
"github.com/multiversx/mx-chain-go/storage/storageunit"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/multiversx/mx-chain-go/testscommon/txcachemocks"
"github.com/stretchr/testify/require"
)
Expand All @@ -36,18 +35,18 @@ func TestShardedTxPool_MemoryFootprint(t *testing.T) {

journals = append(journals, runScenario(t, newScenario(200, 1, core.MegabyteSize, "0"), memoryAssertion{200, 200}, memoryAssertion{0, 1}))
journals = append(journals, runScenario(t, newScenario(10, 1000, 20480, "0"), memoryAssertion{190, 205}, memoryAssertion{1, 4}))
journals = append(journals, runScenario(t, newScenario(10000, 1, 1024, "0"), memoryAssertion{10, 16}, memoryAssertion{4, 10}))
journals = append(journals, runScenario(t, newScenario(1, 60000, 256, "0"), memoryAssertion{30, 40}, memoryAssertion{10, 16}))
journals = append(journals, runScenario(t, newScenario(10, 10000, 100, "0"), memoryAssertion{36, 52}, memoryAssertion{16, 24}))
journals = append(journals, runScenario(t, newScenario(10000, 1, 1024, "0"), memoryAssertion{10, 16}, memoryAssertion{0, 10}))
journals = append(journals, runScenario(t, newScenario(1, 60000, 256, "0"), memoryAssertion{30, 40}, memoryAssertion{10, 24}))
journals = append(journals, runScenario(t, newScenario(10, 10000, 100, "0"), memoryAssertion{36, 52}, memoryAssertion{16, 36}))
journals = append(journals, runScenario(t, newScenario(100000, 1, 1024, "0"), memoryAssertion{120, 138}, memoryAssertion{32, 60}))

// With larger memory footprint

journals = append(journals, runScenario(t, newScenario(100000, 3, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{80, 120}))
journals = append(journals, runScenario(t, newScenario(150000, 2, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{90, 140}))
journals = append(journals, runScenario(t, newScenario(100000, 3, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{80, 148}))
journals = append(journals, runScenario(t, newScenario(150000, 2, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{90, 160}))
journals = append(journals, runScenario(t, newScenario(300000, 1, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{100, 190}))
journals = append(journals, runScenario(t, newScenario(30, 10000, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{60, 90}))
journals = append(journals, runScenario(t, newScenario(300, 1000, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{60, 90}))
journals = append(journals, runScenario(t, newScenario(30, 10000, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{60, 132}))
journals = append(journals, runScenario(t, newScenario(300, 1000, 650, "0"), memoryAssertion{290, 335}, memoryAssertion{60, 148}))

// Scenarios where destination == me

Expand Down Expand Up @@ -111,11 +110,10 @@ func newPool() dataRetriever.ShardedDataCacherNotifier {
}

args := txpool.ArgShardedTxPool{
Config: config,
TxGasHandler: txcachemocks.NewTxGasHandlerMock(),
AccountNonceProvider: testscommon.NewAccountNonceProviderMock(),
NumberOfShards: 2,
SelfShardID: 0,
Config: config,
TxGasHandler: txcachemocks.NewTxGasHandlerMock(),
NumberOfShards: 2,
SelfShardID: 0,
}
pool, err := txpool.NewShardedTxPool(args)
if err != nil {
Expand Down
Loading

0 comments on commit e04679e

Please sign in to comment.