diff --git a/protocol/client.go b/protocol/client.go index e7bfe730b6..0f56961aae 100644 --- a/protocol/client.go +++ b/protocol/client.go @@ -295,9 +295,6 @@ func (client *syncPeerClient) GetBlocks( return nil, fmt.Errorf("failed to create sync peer client: %w", err) } - ctx, cancel := context.WithTimeout(ctx, defaultTimeoutForStatus) - defer cancel() - rsp, err := clt.GetBlocks(ctx, &proto.GetBlocksRequest{ From: from, To: to, diff --git a/protocol/syncer.go b/protocol/syncer.go index b5f7b74548..8fa12bc2cd 100644 --- a/protocol/syncer.go +++ b/protocol/syncer.go @@ -444,99 +444,78 @@ func (s *noForkSyncer) bulkSyncWithPeer( defer cancel() // sync up to the current known header - for { - // set to - to := from + _blockSyncStep - 1 - if to > target { - // adjust to - to = target - } - - s.logger.Info("sync up to block", "peer", p.ID, "from", from, "to", to) + to := from + _blockSyncStep - 1 + if to > target { + // adjust to + to = target + } - blocks, err := s.syncPeerClient.GetBlocks(ctx, p.ID, from, to) - if err != nil { - if rpcErr, ok := grpcstatus.FromError(err); ok { - switch rpcErr.Code() { - case grpccodes.OK, grpccodes.Canceled, grpccodes.DataLoss: - s.logger.Debug("peer return recoverable error", "id", p.ID, "err", err) - default: // other errors are not acceptable - s.logger.Info("skip peer due to error", "id", p.ID, "err", err) + s.logger.Info("sync up to block", "peer", p.ID, "from", from, "to", to) - result.SkipList[p.ID] = time.Now().Add( - time.Duration(_skipListTTL+common.SecureRandInt(_skipListRandTTLRange)) * time.Second, - ).Unix() - } + blocks, err := s.syncPeerClient.GetBlocks(ctx, p.ID, from, to) + if err != nil { + if rpcErr, ok := grpcstatus.FromError(err); ok { + switch rpcErr.Code() { + case grpccodes.OK, grpccodes.Canceled, grpccodes.DataLoss: + s.logger.Debug("peer return recoverable error", "id", p.ID, "err", err) + default: // other errors are not acceptable + s.logger.Info("skip peer due to error", "id", p.ID, "err", err) + + result.SkipList[p.ID] = time.Now().Add( + time.Duration(_skipListTTL+common.SecureRandInt(_skipListRandTTLRange)) * time.Second, + ).Unix() } - - return result, err } - if len(blocks) > 0 { - s.logger.Info( - "get all blocks", - "peer", p.ID, - "from", blocks[0].Number(), - "to", blocks[len(blocks)-1].Number()) - } - - // write block - for _, block := range blocks { - if err := s.blockchain.VerifyFinalizedBlock(block); err != nil { - // not the same network or bad peer - s.logger.Error("block verifying failed", "peer", p.ID, "err", err) - - result.SkipList[p.ID] = time.Now().Add(time.Hour).Unix() + return result, err + } - // if server is nil, it running in test mode - if s.server != nil { - s.server.ForgetPeer(p.ID, ErrBlockVerifyFailed.Error()) - } + if len(blocks) > 0 { + s.logger.Info( + "get all blocks", + "peer", p.ID, + "from", blocks[0].Number(), + "to", blocks[len(blocks)-1].Number()) + } else { + return result, nil + } - return result, ErrBlockVerifyFailed - } + // write block + for _, block := range blocks { + if err := s.blockchain.VerifyFinalizedBlock(block); err != nil { + // not the same network or bad peer + s.logger.Error("block verifying failed", "peer", p.ID, "err", err) - if err := s.blockchain.WriteBlock(block, WriteBlockSource); err != nil { - return result, fmt.Errorf("failed to write block while bulk syncing: %w", err) - } + result.SkipList[p.ID] = time.Now().Add(time.Hour).Unix() - if newBlockCallback != nil { - // NOTE: result not use for now, should remove? - result.ShouldTerminate = newBlockCallback(block) + // if server is nil, it running in test mode + if s.server != nil { + s.server.ForgetPeer(p.ID, ErrBlockVerifyFailed.Error()) } - result.LastReceivedNumber = block.Number() - - // broadcast latest block to the network - if s.blockBroadcast && blockNearEnough(block.Number(), target) { - startBroadcasting = true // upgrade broadcasting flag - } + return result, ErrBlockVerifyFailed + } - // After switching to broadcast, we don't close it until it catches up or returns an error - if startBroadcasting { - s.logger.Info("broadcast block and status", "height", result.LastReceivedNumber) - s.syncPeerClient.Broadcast(block) - } + if err := s.blockchain.WriteBlock(block, WriteBlockSource); err != nil { + return result, fmt.Errorf("failed to write block while bulk syncing: %w", err) } - // update range - from = result.LastReceivedNumber + 1 - - // Update the target. This entire outer loop is there in order to make sure - // bulk syncing is entirely done as the peer's status can change over time - // if block writes have a significant time impact on the node in question - progression := s.syncProgression.GetProgression() - if progression != nil { - if highestBlock := progression.GetHighestBlock(); highestBlock > target { - target = highestBlock - s.logger.Debug("update syncing target", "target", target) - } + if newBlockCallback != nil { + // NOTE: result not use for now, should remove? + result.ShouldTerminate = newBlockCallback(block) } - if from > target { - s.logger.Info("sync target reached", "target", target) + result.LastReceivedNumber = block.Number() - break + // broadcast latest block to the network + if s.blockBroadcast && blockNearEnough(block.Number(), target) { + startBroadcasting = true // upgrade broadcasting flag + } + + // After switching to broadcast, we don't close it until it catches up or returns an error + if startBroadcasting { + s.logger.Info("broadcast block and status", "height", result.LastReceivedNumber) + s.syncPeerClient.Broadcast(block) } } @@ -625,12 +604,6 @@ func (s *noForkSyncer) putToPeerMap(status *NoForkPeer) { s.logger.Debug("syncingPeer", "id", syncingPeer, "status.ID", status.ID.String()) - // update progression if needed - if status.ID.String() == syncingPeer && status.Number > 0 { - s.logger.Debug("connected peer update status", "id", status.ID, "number", status.Number) - s.syncProgression.UpdateHighestProgression(status.Number) - } - s.peerMap.Put(status) // blockchain if nil, it running in test mode