Skip to content

Commit

Permalink
allow forensics send msg to stats server (ethereum#95)
Browse files Browse the repository at this point in the history
* allow forensics send msg to stats server

* add test for forensics reporting mechanism
  • Loading branch information
wjrjerome authored May 25, 2022
1 parent ca6a645 commit 7effc71
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 27 deletions.
6 changes: 6 additions & 0 deletions consensus/XDPoS/XDPoS.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/engines/engine_v1"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/engines/engine_v2"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/event"

"github.com/XinFinOrg/XDPoSChain/consensus/clique"
"github.com/XinFinOrg/XDPoSChain/core/state"
Expand Down Expand Up @@ -65,6 +66,11 @@ type XDPoS struct {
EngineV2 *engine_v2.XDPoS_v2
}

// Subscribe to consensus engines forensics events. Currently only exist for engine v2
func (x *XDPoS) SubscribeForensicsEvent(ch chan<- types.ForensicsEvent) event.Subscription {
return x.EngineV2.ForensicsProcessor.SubscribeForensicsEvent(ch)
}

// New creates a XDPoS delegated-proof-of-stake consensus engine with the initial
// signers set to the ones provided by the user.
func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS {
Expand Down
6 changes: 3 additions & 3 deletions consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type XDPoS_v2 struct {
HookReward func(chain consensus.ChainReader, state *state.StateDB, parentState *state.StateDB, header *types.Header) (map[string]interface{}, error)
HookPenalty func(chain consensus.ChainReader, number *big.Int, parentHash common.Hash, candidates []common.Address) ([]common.Address, error)

forensics *Forensics
ForensicsProcessor *Forensics
}

func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *XDPoS_v2 {
Expand Down Expand Up @@ -107,7 +107,7 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *
},
highestVotedRound: types.Round(0),
highestCommitBlock: nil,
forensics: NewForensics(),
ForensicsProcessor: NewForensics(),
}
// Add callback to the timer
timeoutTimer.OnTimeoutFn = engine.OnCountdownTimeout
Expand Down Expand Up @@ -925,7 +925,7 @@ func (x *XDPoS_v2) commitBlocks(blockChainReader consensus.ChainReader, proposed
// Perform forensics related operation
var headerQcToBeCommitted []types.Header
headerQcToBeCommitted = append(headerQcToBeCommitted, *parentBlock, *proposedBlockHeader)
go x.forensics.ForensicsMonitoring(blockChainReader, x, headerQcToBeCommitted, *incomingQc)
go x.ForensicsProcessor.ForensicsMonitoring(blockChainReader, x, headerQcToBeCommitted, *incomingQc)
return true, nil
}
// Everything else, fail to commit
Expand Down
40 changes: 22 additions & 18 deletions consensus/XDPoS/engines/engine_v2/forensics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,32 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
)

const (
NUM_OF_FORENSICS_QC = 3
)

type ForensicsInfo struct {
HashPath []string // HashesTillSmallerRoundQc or HashesTillLargerRoundQc
QuorumCert types.QuorumCert
SignerAddresses []string
}

type ForensicProof struct {
SmallerRoundInfo *ForensicsInfo
LargerRoundInfo *ForensicsInfo
DivergingHash common.Hash
AcrossEpochs bool
}

// Forensics instance. Placeholder for future properties to be added
type Forensics struct {
HighestCommittedQCs []types.QuorumCert
forensicsFeed event.Feed
scope event.SubscriptionScope
}

// Initiate a forensics process
func NewForensics() *Forensics {
return &Forensics{}
}

// SubscribeForensicsEvent registers a subscription of ForensicsEvent and
// starts sending event to the given channel.
func (f *Forensics) SubscribeForensicsEvent(ch chan<- types.ForensicsEvent) event.Subscription {
return f.scope.Track(f.forensicsFeed.Subscribe(ch))
}

func (f *Forensics) ForensicsMonitoring(chain consensus.ChainReader, engine *XDPoS_v2, headerQcToBeCommitted []types.Header, incomingQC types.QuorumCert) error {
f.ProcessForensics(chain, engine, incomingQC)
return f.SetCommittedQCs(headerQcToBeCommitted, incomingQC)
Expand Down Expand Up @@ -132,7 +128,7 @@ func (f *Forensics) SendForensicProof(chain consensus.ChainReader, engine *XDPoS
lowerRoundQC := firstQc
higherRoundQC := secondQc

if (secondQc.ProposedBlockInfo.Round - firstQc.ProposedBlockInfo.Round) < 0 {
if secondQc.ProposedBlockInfo.Round < firstQc.ProposedBlockInfo.Round {
lowerRoundQC = secondQc
higherRoundQC = firstQc
}
Expand All @@ -146,28 +142,36 @@ func (f *Forensics) SendForensicProof(chain consensus.ChainReader, engine *XDPoS

// Check if two QCs are across epoch, this is used as a indicator for the "prone to attack" scenario
lowerRoundQcEpochSwitchInfo, err := engine.getEpochSwitchInfo(chain, nil, lowerRoundQC.ProposedBlockInfo.Hash)
if err != nil {
log.Error("[SendForensicProof] Errir while trying to find lowerRoundQcEpochSwitchInfo", "lowerRoundQC.ProposedBlockInfo.Hash", lowerRoundQC.ProposedBlockInfo.Hash, "err", err)
return err
}
higherRoundQcEpochSwitchInfo, err := engine.getEpochSwitchInfo(chain, nil, higherRoundQC.ProposedBlockInfo.Hash)
if err != nil {
log.Error("[SendForensicProof] Errir while trying to find higherRoundQcEpochSwitchInfo", "higherRoundQC.ProposedBlockInfo.Hash", higherRoundQC.ProposedBlockInfo.Hash, "err", err)
return err
}
accrossEpoches := false
if lowerRoundQcEpochSwitchInfo.EpochSwitchBlockInfo.Hash != higherRoundQcEpochSwitchInfo.EpochSwitchBlockInfo.Hash {
accrossEpoches = true
}

forensicsProof := &ForensicProof{
forensicsProof := &types.ForensicProof{
DivergingHash: ancestorHash,
AcrossEpochs: accrossEpoches,
SmallerRoundInfo: &ForensicsInfo{
SmallerRoundInfo: &types.ForensicsInfo{
HashPath: ancestorToLowerRoundPath,
QuorumCert: lowerRoundQC,
SignerAddresses: f.getQcSignerAddresses(lowerRoundQC),
},
LargerRoundInfo: &ForensicsInfo{
LargerRoundInfo: &types.ForensicsInfo{
HashPath: ancestorToHigherRoundPath,
QuorumCert: higherRoundQC,
SignerAddresses: f.getQcSignerAddresses(higherRoundQC),
},
}
// TODO: send to dedicated channel which will redirect to stats server
log.Info("Forensics proof report generated, sending to the stats server", forensicsProof)
go f.forensicsFeed.Send(types.ForensicsEvent{ForensicsProof: forensicsProof})
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/XDPoS/engines/engine_v2/testing_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ func (x *XDPoS_v2) AuthorizeFaker(signer common.Address) {
}

func (x *XDPoS_v2) GetForensicsFaker() *Forensics {
return x.forensics
return x.ForensicsProcessor
}
107 changes: 105 additions & 2 deletions consensus/tests/engine_v2_tests/forensics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,33 @@ func TestForensicsMonitoringNotOnSameChainButHaveSameRoundQC(t *testing.T) {
parentOfForkedHeader := blockchain.GetBlockByHash(currentForkBlock.ParentHash()).Header()
grandParentOfForkedHeader := blockchain.GetBlockByHash(parentOfForkedHeader.ParentHash).Header()
forkedHeaders = append(forkedHeaders, *grandParentOfForkedHeader, *parentOfForkedHeader)

// Set up forensics events trigger
forensicsEventCh := make(chan types.ForensicsEvent)
forensics.SubscribeForensicsEvent(forensicsEventCh)

err = forensics.ForensicsMonitoring(blockchain, blockchain.Engine().(*XDPoS.XDPoS).EngineV2, forkedHeaders, *incomingQC)
assert.Nil(t, err)
// TODO: Check SendForensicProof triggered

// Check SendForensicProof triggered
for {
select {
case forensics := <-forensicsEventCh:
assert.NotNil(t, forensics.ForensicsProof)
assert.False(t, forensics.ForensicsProof.AcrossEpochs)
assert.Equal(t, types.Round(13), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, uint64(913), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64())
assert.Equal(t, 9, len(forensics.ForensicsProof.SmallerRoundInfo.HashPath))
assert.Equal(t, 4, len(forensics.ForensicsProof.SmallerRoundInfo.SignerAddresses))
assert.Equal(t, types.Round(13), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, uint64(912), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64())
assert.Equal(t, 8, len(forensics.ForensicsProof.LargerRoundInfo.HashPath))
assert.Equal(t, 4, len(forensics.ForensicsProof.LargerRoundInfo.SignerAddresses))
return
case <-time.After(5 * time.Second):
t.FailNow()
}
}
}

func TestForensicsMonitoringNotOnSameChainDoNotHaveSameRoundQC(t *testing.T) {
Expand Down Expand Up @@ -190,7 +214,86 @@ func TestForensicsMonitoringNotOnSameChainDoNotHaveSameRoundQC(t *testing.T) {
grandParentOfForkedHeader := blockchain.GetBlockByHash(parentOfForkedHeader.ParentHash).Header()
forkedHeaders = append(forkedHeaders, *grandParentOfForkedHeader, *parentOfForkedHeader)

// Set up forensics events trigger
forensicsEventCh := make(chan types.ForensicsEvent)
forensics.SubscribeForensicsEvent(forensicsEventCh)

err = forensics.ForensicsMonitoring(blockchain, blockchain.Engine().(*XDPoS.XDPoS).EngineV2, forkedHeaders, *incomingQC)
assert.Nil(t, err)
// TODO: Check SendForensicProof triggered
// Check SendForensicProof triggered
for {
select {
case forensics := <-forensicsEventCh:
assert.NotNil(t, forensics.ForensicsProof)
assert.False(t, forensics.ForensicsProof.AcrossEpochs)
assert.Equal(t, types.Round(14), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, uint64(914), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64())
assert.Equal(t, 10, len(forensics.ForensicsProof.SmallerRoundInfo.HashPath))
assert.Equal(t, 4, len(forensics.ForensicsProof.SmallerRoundInfo.SignerAddresses))
assert.Equal(t, types.Round(16), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, uint64(906), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64())
assert.Equal(t, 2, len(forensics.ForensicsProof.LargerRoundInfo.HashPath))
assert.Equal(t, 2, len(forensics.ForensicsProof.LargerRoundInfo.SignerAddresses))
return
case <-time.After(5 * time.Second):
t.FailNow()
}
}
}

// "prone to attack" test where the "across epoch" field is true
func TestForensicsAcrossEpoch(t *testing.T) {
var numOfForks = new(int)
*numOfForks = 10
var forkRoundDifference = new(int)
*forkRoundDifference = 10
var forkedChainSignersKey []*ecdsa.PrivateKey
forkedChainSignersKey = append(forkedChainSignersKey, acc1Key)
blockchain, _, _, _, _, currentForkBlock := PrepareXDCTestBlockChainForV2Engine(t, 1801, params.TestXDPoSMockChainConfig, &ForkedBlockOptions{numOfForkedBlocks: numOfForks, forkedRoundDifference: forkRoundDifference, signersKey: forkedChainSignersKey})
forensics := blockchain.Engine().(*XDPoS.XDPoS).EngineV2.GetForensicsFaker()

// Now, let's try set committed blocks, where the highestedCommitted blocks are 1799, 1800 and 1801
var headers []types.Header
var decodedBlock1801ExtraField types.ExtraFields_v2
err := utils.DecodeBytesExtraFields(blockchain.GetHeaderByNumber(1801).Extra, &decodedBlock1801ExtraField)
assert.Nil(t, err)
err = forensics.SetCommittedQCs(append(headers, *blockchain.GetHeaderByNumber(1799), *blockchain.GetHeaderByNumber(1800)), *decodedBlock1801ExtraField.QuorumCert)
assert.Nil(t, err)

var decodedExtraField types.ExtraFields_v2
// Decode the QC from forking chain
err = utils.DecodeBytesExtraFields(currentForkBlock.Header().Extra, &decodedExtraField)
assert.Nil(t, err)

incomingQC := decodedExtraField.QuorumCert
var forkedHeaders []types.Header
parentOfForkedHeader := blockchain.GetBlockByHash(currentForkBlock.ParentHash()).Header()
grandParentOfForkedHeader := blockchain.GetBlockByHash(parentOfForkedHeader.ParentHash).Header()
forkedHeaders = append(forkedHeaders, *grandParentOfForkedHeader, *parentOfForkedHeader)

// Set up forensics events trigger
forensicsEventCh := make(chan types.ForensicsEvent)
forensics.SubscribeForensicsEvent(forensicsEventCh)

err = forensics.ForensicsMonitoring(blockchain, blockchain.Engine().(*XDPoS.XDPoS).EngineV2, forkedHeaders, *incomingQC)
assert.Nil(t, err)
// Check SendForensicProof triggered
for {
select {
case forensics := <-forensicsEventCh:
assert.NotNil(t, forensics.ForensicsProof)
assert.True(t, forensics.ForensicsProof.AcrossEpochs)
assert.Equal(t, types.Round(900), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, uint64(1800), forensics.ForensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64())
assert.Equal(t, 10, len(forensics.ForensicsProof.SmallerRoundInfo.HashPath))
assert.Equal(t, 4, len(forensics.ForensicsProof.SmallerRoundInfo.SignerAddresses))
assert.Equal(t, types.Round(902), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, uint64(1792), forensics.ForensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Number.Uint64())
assert.Equal(t, 2, len(forensics.ForensicsProof.LargerRoundInfo.HashPath))
assert.Equal(t, 2, len(forensics.ForensicsProof.LargerRoundInfo.SignerAddresses))
return
case <-time.After(5 * time.Second):
t.FailNow()
}
}
}
20 changes: 20 additions & 0 deletions core/types/forensics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package types

import "github.com/XinFinOrg/XDPoSChain/common"

type ForensicsInfo struct {
HashPath []string // HashesTillSmallerRoundQc or HashesTillLargerRoundQc
QuorumCert QuorumCert
SignerAddresses []string
}

type ForensicProof struct {
SmallerRoundInfo *ForensicsInfo
LargerRoundInfo *ForensicsInfo
DivergingHash common.Hash
AcrossEpochs bool
}

type ForensicsEvent struct {
ForensicsProof *ForensicProof
}
50 changes: 47 additions & 3 deletions ethstats/ethstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/eth"
Expand All @@ -56,6 +57,12 @@ const (
chainHeadChanSize = 10
)

type consensusEngine interface {
// SubscribeForensicsEvent should return an event subscription of
// ForensicsEvent and send events to the given channel.
SubscribeForensicsEvent(chan<- types.ForensicsEvent) event.Subscription
}

type txPool interface {
// SubscribeTxPreEvent should return an event subscription of
// TxPreEvent and send events to the given channel.
Expand Down Expand Up @@ -140,9 +147,11 @@ func (s *Service) loop() {
// Subscribe to chain events to execute updates on
var blockchain blockChain
var txpool txPool
var engine consensusEngine
if s.eth != nil {
blockchain = s.eth.BlockChain()
txpool = s.eth.TxPool()
engine = s.eth.Engine().(*XDPoS.XDPoS)
} else {
blockchain = s.les.BlockChain()
txpool = s.les.TxPool()
Expand All @@ -156,18 +165,31 @@ func (s *Service) loop() {
txSub := txpool.SubscribeTxPreEvent(txEventCh)
defer txSub.Unsubscribe()

// Forensics events
forensicsEventCh := make(chan types.ForensicsEvent)
if engine != nil {
forensicsSub := engine.SubscribeForensicsEvent(forensicsEventCh)
defer forensicsSub.Unsubscribe()
}

// Start a goroutine that exhausts the subsciptions to avoid events piling up
var (
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
txCh = make(chan struct{}, 1)
quitCh = make(chan struct{})
headCh = make(chan *types.Block, 1)
txCh = make(chan struct{}, 1)
forensicsCh = make(chan *types.ForensicProof, 1)
)
go func() {
var lastTx mclock.AbsTime

HandleLoop:
for {
select {
case forensics := <-forensicsEventCh:
select {
case forensicsCh <- forensics.ForensicsProof:
default:
}
// Notify of chain head events, but drop if too frequent
case head := <-chainHeadCh:
select {
Expand Down Expand Up @@ -268,6 +290,10 @@ func (s *Service) loop() {
if err = s.reportPending(conn); err != nil {
log.Warn("Transaction stats report failed", "err", err)
}
case forensicsReport := <-forensicsCh:
if err = s.reportForensics(conn, forensicsReport); err != nil {
log.Error("Forensics proof stats report failed", "err", err)
}
}
}
// Make sure the connection is closed
Expand Down Expand Up @@ -519,6 +545,24 @@ func (s *Service) reportBlock(conn *websocket.Conn, block *types.Block) error {
return websocket.JSON.Send(conn, report)
}

// reportForensics forward the forensics repors it to the stats server.
func (s *Service) reportForensics(conn *websocket.Conn, forensicsProof *types.ForensicProof) error {
log.Info(
"Sending Forensics report to ethstats",
"SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Hash", forensicsProof.SmallerRoundInfo.QuorumCert.ProposedBlockInfo.Hash,
"LargerRoundInfo.QuorumCert.ProposedBlockInfo.Hash", forensicsProof.LargerRoundInfo.QuorumCert.ProposedBlockInfo.Hash,
)

stats := map[string]interface{}{
"id": s.node,
"forensicsProof": forensicsProof,
}
report := map[string][]interface{}{
"emit": {"forensics", stats},
}
return websocket.JSON.Send(conn, report)
}

// assembleBlockStats retrieves any required metadata to report a single block
// and assembles the block stats. If block is nil, the current head is processed.
func (s *Service) assembleBlockStats(block *types.Block) *blockStats {
Expand Down

0 comments on commit 7effc71

Please sign in to comment.