Skip to content

Commit

Permalink
[Txpool] Prune inactive account enqueued transactions when promotion …
Browse files Browse the repository at this point in the history
…outdated (#153)

* Upgrade default outdate duration to 1 hour

* Refactor txpool associated contantants to one package

* Refactor txpool associated constants to one file

* Re-enable pruning outdated account transaction feature

* Extract account function

* More refactor

* More efficient codes when prune stale account

* More clear test case comments

* Add pruneStaleAccounts benchmark

* Fix limnt error
  • Loading branch information
DarianShawn authored Sep 1, 2022
1 parent 96ffb10 commit 9e509f5
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 63 deletions.
19 changes: 7 additions & 12 deletions command/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@ package command
import "github.com/dogechain-lab/dogechain/server"

const (
DefaultGenesisFileName = "genesis.json"
DefaultChainName = "dogechain"
DefaultChainID = 100
DefaultPremineBalance = "0x3635C9ADC5DEA00000" // 1000 ETH
DefaultConsensus = server.IBFTConsensus
DefaultPriceLimit = 0
DefaultMaxSlots = 4096
DefaultMaxAccountDemotions = 10 // account demotion counter limit
DefaultPruneTickSeconds = 300 // ticker duration for pruning account future transactions
DefaultPromoteOutdateSeconds = 1800 // not promoted account for a long time would be pruned
DefaultGenesisGasUsed = 458752 // 0x70000
DefaultGenesisGasLimit = 5242880 // 0x500000
DefaultGenesisFileName = "genesis.json"
DefaultChainName = "dogechain"
DefaultChainID = 100
DefaultPremineBalance = "0x3635C9ADC5DEA00000" // 1000 ETH
DefaultConsensus = server.IBFTConsensus
DefaultGenesisGasUsed = 458752 // 0x70000
DefaultGenesisGasLimit = 5242880 // 0x500000
)

const (
Expand Down
13 changes: 6 additions & 7 deletions command/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"io/ioutil"
"strings"

"github.com/dogechain-lab/dogechain/command"
"github.com/dogechain-lab/dogechain/jsonrpc"
"github.com/dogechain-lab/dogechain/network"

"github.com/dogechain-lab/dogechain/txpool"
"github.com/hashicorp/hcl"
)

Expand Down Expand Up @@ -86,11 +85,11 @@ func DefaultConfig() *Config {
Telemetry: &Telemetry{},
ShouldSeal: false,
TxPool: &TxPool{
PriceLimit: command.DefaultPriceLimit,
MaxSlots: command.DefaultMaxSlots,
MaxAccountDemotions: command.DefaultMaxAccountDemotions,
PruneTickSeconds: command.DefaultPruneTickSeconds,
PromoteOutdateSeconds: command.DefaultPromoteOutdateSeconds,
PriceLimit: 0,
MaxSlots: txpool.DefaultMaxSlots,
MaxAccountDemotions: txpool.DefaultMaxAccountDemotions,
PruneTickSeconds: txpool.DefaultPruneTickSeconds,
PromoteOutdateSeconds: txpool.DefaultPromoteOutdateSeconds,
},
LogLevel: "INFO",
RestoreFile: "",
Expand Down
9 changes: 5 additions & 4 deletions command/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/dogechain-lab/dogechain/command"
"github.com/dogechain-lab/dogechain/crypto"
"github.com/dogechain-lab/dogechain/helper/daemon"
"github.com/dogechain-lab/dogechain/txpool"
"github.com/howeyc/gopass"
"github.com/spf13/cobra"

Expand Down Expand Up @@ -174,28 +175,28 @@ func setFlags(cmd *cobra.Command) {
cmd.Flags().Uint64Var(
&params.rawConfig.TxPool.MaxSlots,
maxSlotsFlag,
command.DefaultMaxSlots,
txpool.DefaultMaxSlots,
"maximum slots in the pool",
)

cmd.Flags().Uint64Var(
&params.rawConfig.TxPool.MaxAccountDemotions,
maxAccountDemotionsFlag,
command.DefaultMaxAccountDemotions,
txpool.DefaultMaxAccountDemotions,
"maximum account demontion counter limit in the pool",
)

cmd.Flags().Uint64Var(
&params.rawConfig.TxPool.PruneTickSeconds,
pruneTickSecondsFlag,
command.DefaultPruneTickSeconds,
txpool.DefaultPruneTickSeconds,
"tick seconds for pruning account future transactions in the pool",
)

cmd.Flags().Uint64Var(
&params.rawConfig.TxPool.PromoteOutdateSeconds,
promoteOutdateSecondsFlag,
command.DefaultPromoteOutdateSeconds,
txpool.DefaultPromoteOutdateSeconds,
"account in the pool not promoted for a long time would be pruned",
)

Expand Down
36 changes: 27 additions & 9 deletions txpool/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (m *accountsMap) initOnce(addr types.Address, nonce uint64) *account {
// set the nonce
newAccount.setNonce(nonce)

// set the timestamp for pruning
newAccount.lastPromoted = time.Now()
// set the timestamp for pruning. Reinit account should reset it.
newAccount.updatePromoted()

// update global count
atomic.AddUint64(&m.count, 1)
Expand Down Expand Up @@ -161,23 +161,31 @@ func (m *accountsMap) allTxs(includeEnqueued bool) (
}

func (m *accountsMap) pruneStaleEnqueuedTxs(outdateDuration time.Duration) []*types.Transaction {
pruned := make([]*types.Transaction, 0)
var (
pruned = make([]*types.Transaction, 0)
// use same time for faster comparison
outdateTimeBound = time.Now().Add(-1 * outdateDuration)
)

m.Range(func(_, value interface{}) bool {
account, ok := value.(*account)
if !ok {
// It shouldn't be. We just do some prevention work.
return false
}
// should not do anything, make things faster
if account.enqueued.length() == 0 {
return true
}

account.enqueued.lock(true)
defer account.enqueued.unlock()

if time.Since(account.lastPromoted) >= outdateDuration {
if account.IsOutdated(outdateTimeBound) {
// only lock the account when needed
account.enqueued.lock(true)
pruned = append(
pruned,
account.enqueued.clear()...,
)
account.enqueued.unlock()
}

return true
Expand Down Expand Up @@ -341,10 +349,20 @@ func (a *account) promote() (promoted []*types.Transaction, dropped []*types.Tra
// is higher than the one previously stored.
if nextNonce > currentNonce {
a.setNonce(nextNonce)
// only update the promotion timestamp when it is actually promoted.
a.updatePromoted()
}

// update timestamp for pruning
return
}

// updatePromoted updates promoted timestamp
func (a *account) updatePromoted() {
a.lastPromoted = time.Now()
}

return
// IsOutdated returns whether account was outdated comparing with the outdate time bound,
// the promoted timestamp before the bound is outdated.
func (a *account) IsOutdated(outdateTimeBound time.Time) bool {
return a.lastPromoted.Before(outdateTimeBound)
}
10 changes: 10 additions & 0 deletions txpool/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package txpool

const (
DefaultPruneTickSeconds = 300 // ticker duration for pruning account future transactions
DefaultPromoteOutdateSeconds = 3600 // not promoted account for a long time would be pruned
// txpool transaction max slots. tx <= 32kB would only take 1 slot. tx > 32kB would take
// ceil(tx.size / 32kB) slots.
DefaultMaxSlots = 4096
DefaultMaxAccountDemotions = 10 // account demotion counter limit
)
48 changes: 22 additions & 26 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
)

const (
txSlotSize = 32 * 1024 // 32kB
txMaxSize = 128 * 1024 //128Kb
topicNameV1 = "txpool/0.1"
defaultPruneTickSeconds = 300
defaultPromoteOutdateSeconds = 1800
txSlotSize = 32 * 1024 // 32kB
txMaxSize = 128 * 1024 //128Kb
topicNameV1 = "txpool/0.1"
)

// errors
Expand Down Expand Up @@ -194,14 +192,24 @@ func NewTxPool(
var (
pruneTickSeconds = config.PruneTickSeconds
promoteOutdateSeconds = config.PromoteOutdateSeconds
maxSlot = config.MaxSlots
maxAccountDemotions = config.MaxAccountDemotions
)

if pruneTickSeconds == 0 {
pruneTickSeconds = defaultPruneTickSeconds
pruneTickSeconds = DefaultPruneTickSeconds
}

if promoteOutdateSeconds == 0 {
promoteOutdateSeconds = defaultPromoteOutdateSeconds
promoteOutdateSeconds = DefaultPromoteOutdateSeconds
}

if maxSlot == 0 {
maxSlot = DefaultMaxSlots
}

if maxAccountDemotions == 0 {
maxAccountDemotions = DefaultMaxAccountDemotions
}

pool := &TxPool{
Expand All @@ -212,10 +220,10 @@ func NewTxPool(
accounts: accountsMap{},
executables: newPricedQueue(),
index: lookupMap{all: make(map[types.Hash]*types.Transaction)},
gauge: slotGauge{height: 0, max: config.MaxSlots},
gauge: slotGauge{height: 0, max: maxSlot},
priceLimit: config.PriceLimit,
sealing: config.Sealing,
maxAccountDemotions: config.MaxAccountDemotions,
maxAccountDemotions: maxAccountDemotions,
pruneTick: time.Second * time.Duration(pruneTickSeconds),
promoteOutdateDuration: time.Second * time.Duration(promoteOutdateSeconds),
}
Expand Down Expand Up @@ -256,9 +264,7 @@ func (p *TxPool) Start() {
// set default value of txpool transactions gauge
p.metrics.SetDefaultValue(0)

// p.pruneAccountTicker = time.NewTicker(p.pruneTick)
// case <-p.pruneAccountTicker.C:
// go p.pruneStaleAccounts()
p.pruneAccountTicker = time.NewTicker(p.pruneTick)

go func() {
for {
Expand All @@ -269,6 +275,8 @@ func (p *TxPool) Start() {
go p.handleEnqueueRequest(req)
case req := <-p.promoteReqCh:
go p.handlePromoteRequest(req)
case <-p.pruneAccountTicker.C:
go p.pruneStaleAccounts()
}
}
}()
Expand Down Expand Up @@ -826,24 +834,12 @@ func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) {
// prune pool state
if len(allPrunedPromoted) > 0 {
cleanup(allPrunedPromoted...)
p.eventManager.signalEvent(
proto.EventType_PRUNED_PROMOTED,
toHash(allPrunedPromoted...)...,
)

// update metrics
p.metrics.PendingTxs.Add(float64(-1 * len(allPrunedPromoted)))
p.decreaseQueueGauge(allPrunedPromoted, p.metrics.PendingTxs, proto.EventType_PRUNED_PROMOTED)
}

if len(allPrunedEnqueued) > 0 {
cleanup(allPrunedEnqueued...)
p.eventManager.signalEvent(
proto.EventType_PRUNED_ENQUEUED,
toHash(allPrunedEnqueued...)...,
)

// update metrics
p.metrics.EnqueueTxs.Add(float64(-1 * len(allPrunedEnqueued)))
p.decreaseQueueGauge(allPrunedEnqueued, p.metrics.EnqueueTxs, proto.EventType_PRUNED_ENQUEUED)
}
}

Expand Down
69 changes: 64 additions & 5 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ecdsa"
"crypto/rand"
"math/big"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -94,8 +95,8 @@ func newTestPoolWithSlots(maxSlots uint64, mockStore ...store) (*TxPool, error)
MaxSlots: maxSlots,
Sealing: false,
MaxAccountDemotions: defaultMaxAccountDemotions,
PruneTickSeconds: defaultPruneTickSeconds,
PromoteOutdateSeconds: defaultPromoteOutdateSeconds,
PruneTickSeconds: DefaultPruneTickSeconds,
PromoteOutdateSeconds: DefaultPromoteOutdateSeconds,
},
)
}
Expand Down Expand Up @@ -1276,23 +1277,26 @@ func TestDemote(t *testing.T) {
})
}

func TestEnqueuedPruning(t *testing.T) {
func TestTxpool_PruneStaleAccounts(t *testing.T) {
t.Parallel()

testTable := []struct {
name string
futureTx *types.Transaction
lastPromoted time.Time
expectedTxCount uint64
expectedGauge uint64
}{
{
"prune stale tx",
time.Now().Add(-time.Second * defaultPromoteOutdateSeconds),
newTx(addr1, 3, 1),
time.Now().Add(-time.Second * DefaultPromoteOutdateSeconds),
0,
0,
},
{
"no stale tx to prune",
newTx(addr1, 5, 1),
time.Now().Add(-5 * time.Second),
1,
1,
Expand All @@ -1310,7 +1314,8 @@ func TestEnqueuedPruning(t *testing.T) {
pool.SetSigner(&mockSigner{})

go func() {
err := pool.addTx(local, newTx(addr1, 5, 1))
// add a future nonce tx
err := pool.addTx(local, test.futureTx)
assert.NoError(t, err)
}()
pool.handleEnqueueRequest(<-pool.enqueueReqCh)
Expand All @@ -1331,6 +1336,60 @@ func TestEnqueuedPruning(t *testing.T) {
}
}

func BenchmarkPruneStaleAccounts1KAccounts(b *testing.B) { benchmarkPruneStaleAccounts(b, 1000) }
func BenchmarkPruneStaleAccounts10KAccounts(b *testing.B) { benchmarkPruneStaleAccounts(b, 10000) }
func BenchmarkPruneStaleAccounts100KAccounts(b *testing.B) { benchmarkPruneStaleAccounts(b, 100000) }

func benchmarkPruneStaleAccounts(b *testing.B, accountSize int) {
b.Helper()

pool, err := newTestPoolWithSlots(uint64(accountSize + 1))
assert.NoError(b, err)

pool.SetSigner(&mockSigner{})
pool.pruneTick = time.Second // check on every second

pool.Start()
defer pool.Close()

var (
addresses = make([]types.Address, accountSize)
lastPromotedTime = time.Now()
)

for i := 0; i < accountSize; i++ {
addresses[i] = types.StringToAddress("0x" + strconv.FormatInt(int64(1024+i), 16))
addr := addresses[i]
// add enough future tx
err := pool.addTx(local, newTx(addr, uint64(10+i), 1))
if !assert.NoError(b, err, "add tx failed") {
b.FailNow()
}
// set the account lastPromoted to the same timestamp
if !pool.accounts.exists(addr) {
pool.createAccountOnce(addr)
}

pool.accounts.get(addr).lastPromoted = lastPromotedTime
}

// mark all transactions outdated
pool.promoteOutdateDuration = time.Since(lastPromotedTime) - time.Millisecond

// Reset the benchmark and measure pruning task
b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
// benchmark pruning task
pool.pruneStaleAccounts()
}

promoted, enqueued := pool.GetTxs(true)
assert.Len(b, promoted, 0)
assert.Len(b, enqueued, 0)
}

/* "Integrated" tests */

// The following tests ensure that the pool's inner event loop
Expand Down

0 comments on commit 9e509f5

Please sign in to comment.