Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate megabundle as soon as it's received #112

Merged
merged 7 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,14 @@ type TxPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
mevBundles []types.MevBundle
megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
mevBundles []types.MevBundle
megabundles map[common.Address]types.MevBundle // One megabundle per each trusted relay
NewMegabundleHooks []func(common.Address, *types.MevBundle)
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price

chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
Expand Down Expand Up @@ -630,13 +631,20 @@ func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactio
return errors.New("megabundle from non-trusted address")
}

pool.megabundles[relayAddr] = types.MevBundle{
megabundle := types.MevBundle{
Txs: txs,
BlockNumber: blockNumber,
MinTimestamp: minTimestamp,
MaxTimestamp: maxTimestamp,
RevertingTxHashes: revertingTxHashes,
}

pool.megabundles[relayAddr] = megabundle

for _, hook := range pool.NewMegabundleHooks {
go hook(relayAddr, &megabundle)
}

return nil
}

Expand Down
29 changes: 22 additions & 7 deletions miner/multi_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,31 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons
}))
}

relayWorkerMap := make(map[common.Address]*worker)

for i := 0; i < len(config.TrustedRelays); i++ {
workers = append(workers,
newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{
isFlashbots: true,
isMegabundleWorker: true,
queue: queue,
relayAddr: config.TrustedRelays[i],
}))
relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, merger, &flashbotsData{
isFlashbots: true,
isMegabundleWorker: true,
queue: queue,
relayAddr: config.TrustedRelays[i],
})
workers = append(workers, relayWorker)
relayWorkerMap[config.TrustedRelays[i]] = relayWorker
}

eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) {
worker, found := relayWorkerMap[relayAddr]
if !found {
return
}

select {
case worker.newMegabundleCh <- megabundle:
default:
}
})

log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers))
return &multiWorker{
regularWorker: regularWorker,
Expand Down
60 changes: 44 additions & 16 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type worker struct {
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust
newMegabundleCh chan *types.MevBundle

wg sync.WaitGroup

Expand Down Expand Up @@ -240,15 +241,17 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
newWorkCh: make(chan *newWorkReq, 1),
taskCh: taskCh,
resultCh: make(chan *types.Block, resultQueueSize),
exitCh: exitCh,
startCh: make(chan struct{}, 1),
newMegabundleCh: make(chan *types.MevBundle),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
flashbots: flashbots,
}

// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
// Subscribe events for blockchain
Expand Down Expand Up @@ -391,26 +394,38 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
interrupt *int32
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of mining.
runningInterrupt *int32 // Running task interrupt
queuedInterrupt *int32 // Queued task interrupt
minRecommit = recommit // minimal resubmit interval specified by user.
timestamp int64 // timestamp for each round of mining.
)

timer := time.NewTimer(0)
defer timer.Stop()
<-timer.C // discard the initial tick

// commit aborts in-flight transaction execution with given signal and resubmits a new one.
// commit aborts in-flight transaction execution with highest seen signal and resubmits a new one
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
}
interrupt = new(int32)
select {
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
case <-w.exitCh:
return
case queuedRequest := <-w.newWorkCh:
// Previously queued request wasn't started yet, update the request and resubmit
queuedRequest.noempty = queuedRequest.noempty || noempty
queuedRequest.timestamp = timestamp
w.newWorkCh <- queuedRequest // guaranteed to be nonblocking
default:
// Previously queued request has already started, cycle interrupt pointer and submit new work
runningInterrupt = queuedInterrupt
queuedInterrupt = new(int32)

w.newWorkCh <- &newWorkReq{interrupt: queuedInterrupt, noempty: noempty, timestamp: timestamp} // guaranteed to be nonblocking
}

if runningInterrupt != nil && s > atomic.LoadInt32(runningInterrupt) {
atomic.StoreInt32(runningInterrupt, s)
}

timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
Expand All @@ -437,6 +452,11 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
timestamp = time.Now().Unix()
commit(false, commitInterruptNewHead)

case <-w.newMegabundleCh:
if w.isRunning() {
commit(true, commitInterruptNone)
}

case <-timer.C:
// If mining is running resubmit a new work cycle periodically to pull in
// higher priced transactions. Disable this overhead for pending blocks.
Expand Down Expand Up @@ -500,7 +520,10 @@ func (w *worker) mainLoop() {
for {
select {
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
// Don't start if the work has already been interrupted
if req.interrupt == nil || atomic.LoadInt32(req.interrupt) == commitInterruptNone {
w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
}

case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks
Expand Down Expand Up @@ -761,10 +784,10 @@ func (w *worker) generateEnv(parent *types.Block, header *types.Header) (*enviro
// makeCurrent creates a new environment for the current cycle.
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
env, err := w.generateEnv(parent, header)
env.state.StartPrefetcher("miner")
if err != nil {
return err
}
env.state.StartPrefetcher("miner")

// Swap out the old work with the new one, terminating any leftover prefetcher
// processes in the mean time and starting a new one.
Expand Down Expand Up @@ -869,7 +892,6 @@ func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, i
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
Expand All @@ -881,8 +903,11 @@ func (w *worker) commitBundle(txs types.Transactions, coinbase common.Address, i
ratio: ratio,
inc: true,
}
return false
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead

// Discard the work as new head is present
return true
}
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
Expand Down Expand Up @@ -982,7 +1007,6 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
Expand All @@ -994,8 +1018,11 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin
ratio: ratio,
inc: true,
}
return false
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead

// Discard the work as new head is present
return true
}
// If we don't have enough gas for any further transactions then we're done
if w.current.gasPool.Gas() < params.TxGas {
Expand Down Expand Up @@ -1223,6 +1250,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
if err != nil {
return // no valid megabundle for this relay, nothing to do
}

// Flashbots bundle merging duplicates work by simulating TXes and then committing them once more.
// Megabundles API focuses on speed and runs everything in one cycle.
coinbaseBalanceBefore := w.current.state.GetBalance(w.coinbase)
Expand Down