Skip to content

Commit

Permalink
[sycner] Remove different network peer (#239)
Browse files Browse the repository at this point in the history
# Description

The remote peer was not removed even if it was another (private)
network, and it might bring annoying messages to current node.

The PR fixes the issue, disconnect those different network peers, and
gives a more reasonable way of finding `BestPeer`: compare distance when
peers' block numbers are same.

# Changes include

- [x] Bugfix (non-breaking change that solves an issue)
- [x] New feature (non-breaking change that adds functionality)

## Testing

- [x] I have tested this code with the official test suite
- [x] I have tested this code manually

### Manual tests

1. Deploy a 4-nodes pos network, but only bring up 3 nodes.
2. Modify the last node with different `portland` hard fork height in
`Genesis`.
3. Bring the last node up.

The PR branch node would reject the block and disconnect peers. In
contrast, the base branch node do not disconnect peers, but instead try
to query blocks over and over.
  • Loading branch information
DarianShawn authored Nov 12, 2022
1 parent eeae5b4 commit 7e051a9
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 12 deletions.
4 changes: 2 additions & 2 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,9 @@ func (s *Server) Close() error {
return err
}

// newProtoConnection opens up a new stream on the set protocol to the peer,
// NewProtoConnection opens up a new stream on the set protocol to the peer,
// and returns a reference to the connection
func (s *Server) newProtoConnection(protocol string, peerID peer.ID) (*rawGrpc.ClientConn, error) {
func (s *Server) NewProtoConnection(protocol string, peerID peer.ID) (*rawGrpc.ClientConn, error) {
s.protocolsLock.Lock()
defer s.protocolsLock.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions network/server_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *Server) NewDiscoveryClient(peerID peer.ID) (proto.DiscoveryClient, erro
}

// Create a new stream connection and return it
protoStream, err := s.newProtoConnection(common.DiscProto, peerID)
protoStream, err := s.NewProtoConnection(common.DiscProto, peerID)
if err != nil {
return nil, err
}
Expand All @@ -83,15 +83,15 @@ func (s *Server) NewDiscoveryClient(peerID peer.ID) (proto.DiscoveryClient, erro
// since they are referenced later on,
// if they are not temporary
if !isTemporaryDial {
s.saveProtocolStream(common.DiscProto, protoStream, peerID)
s.SaveProtocolStream(common.DiscProto, protoStream, peerID)
}

return proto.NewDiscoveryClient(protoStream), nil
}

// saveProtocolStream saves the protocol stream to the peer
// SaveProtocolStream saves the protocol stream to the peer
// protocol stream reference [Thread safe]
func (s *Server) saveProtocolStream(
func (s *Server) SaveProtocolStream(
protocol string,
stream *rawGrpc.ClientConn,
peerID peer.ID,
Expand Down
13 changes: 12 additions & 1 deletion network/server_identity.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package network

import (
"math/big"

"github.com/dogechain-lab/dogechain/network/common"
peerEvent "github.com/dogechain-lab/dogechain/network/event"
"github.com/dogechain-lab/dogechain/network/grpc"
"github.com/dogechain-lab/dogechain/network/identity"
"github.com/dogechain-lab/dogechain/network/proto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p-kbucket/keyspace"
rawGrpc "google.golang.org/grpc"
)

// NewIdentityClient returns a new identity service client connection
func (s *Server) NewIdentityClient(peerID peer.ID) (proto.IdentityClient, error) {
// Create a new stream connection and return it
protoStream, err := s.newProtoConnection(common.IdentityProto, peerID)
protoStream, err := s.NewProtoConnection(common.IdentityProto, peerID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -130,3 +134,10 @@ func (s *Server) registerIdentityService(identityService *identity.IdentityServi

s.RegisterProtocol(common.IdentityProto, grpcStream)
}

func (s *Server) GetPeerDistance(peerID peer.ID) *big.Int {
nodeKey := keyspace.Key{Space: keyspace.XORKeySpace, Bytes: kbucket.ConvertPeerID(s.AddrInfo().ID)}
peerKey := keyspace.Key{Space: keyspace.XORKeySpace, Bytes: kbucket.ConvertPeerID(peerID)}

return nodeKey.Distance(peerKey)
}
37 changes: 32 additions & 5 deletions protocol/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)

const (
_syncerName = "syncer"
_syncerV1 = "/syncer/0.1"
)

const (
maxEnqueueSize = 50
popTimeout = 10 * time.Second
Expand Down Expand Up @@ -81,7 +86,7 @@ type Syncer struct {
// NewSyncer creates a new Syncer instance
func NewSyncer(logger hclog.Logger, server *network.Server, blockchain blockchainShim) *Syncer {
s := &Syncer{
logger: logger.Named("syncer"),
logger: logger.Named(_syncerName),
stopCh: make(chan struct{}),
blockchain: blockchain,
server: server,
Expand Down Expand Up @@ -151,8 +156,6 @@ func (s *Syncer) updateStatus(status *Status) {
s.status = status
}

const syncerV1 = "/syncer/0.1"

// enqueueBlock adds the specific block to the peerID queue
func (s *Syncer) enqueueBlock(peerID peer.ID, b *types.Block) {
s.logger.Debug("enqueue block", "peer", peerID, "number", b.Number(), "hash", b.Hash())
Expand Down Expand Up @@ -278,7 +281,7 @@ func (s *Syncer) Start() {
grpcStream := libp2pGrpc.NewGrpcStream()
proto.RegisterV1Server(grpcStream.GrpcServer(), s.serviceV1)
grpcStream.Serve()
s.server.RegisterProtocol(syncerV1, grpcStream)
s.server.RegisterProtocol(_syncerV1, grpcStream)

s.setupPeers()

Expand Down Expand Up @@ -338,6 +341,7 @@ func (s *Syncer) BestPeer() *SyncPeer {
}

peerBlockNumber := syncPeer.Number()
// compare block height
if bestPeer == nil || peerBlockNumber > bestBlockNumber {
bestPeer = syncPeer
bestBlockNumber = peerBlockNumber
Expand All @@ -360,7 +364,7 @@ func (s *Syncer) AddPeer(peerID peer.ID) error {
return nil
}

stream, err := s.server.NewStream(syncerV1, peerID)
stream, err := s.server.NewStream(_syncerV1, peerID)
if err != nil {
return fmt.Errorf("failed to open a stream, err %w", err)
}
Expand Down Expand Up @@ -548,6 +552,12 @@ func (s *Syncer) logSyncPeerPopBlockError(err error, peer *SyncPeer) {
func (s *Syncer) BulkSyncWithPeer(p *SyncPeer, newBlockHandler func(block *types.Block)) error {
// find the common ancestor
ancestor, fork, err := s.findCommonAncestor(p.client, p.status)
// check whether peer network same with us
if isDifferentNetworkError(err) {
s.server.DisconnectFromPeer(p.peer, "Different network")
}

// return error
if err != nil {
// No need to sync with this peer
return err
Expand Down Expand Up @@ -624,6 +634,8 @@ func (s *Syncer) BulkSyncWithPeer(p *SyncPeer, newBlockHandler func(block *types
// Verify and write the data locally
for _, block := range sk.blocks {
if err := s.blockchain.VerifyFinalizedBlock(block); err != nil {
s.server.DisconnectFromPeer(p.peer, "Different network due to hard fork")

return fmt.Errorf("unable to verify block, %w", err)
}

Expand All @@ -649,6 +661,21 @@ func (s *Syncer) BulkSyncWithPeer(p *SyncPeer, newBlockHandler func(block *types
return nil
}

func isDifferentNetworkError(err error) bool {
if err == nil {
return false
}

switch {
case errors.Is(err, ErrMismatchGenesis), // genesis not right
errors.Is(err, ErrCommonAncestorNotFound), // might be data missing
errors.Is(err, ErrForkNotFound): // starting block not found
return true
}

return false
}

func getHeader(clt proto.V1Client, num *uint64, hash *types.Hash) (*types.Header, error) {
req := &proto.GetHeadersRequest{}
if num != nil {
Expand Down

0 comments on commit 7e051a9

Please sign in to comment.