From 9d20f4e6db3dbf93cdab0be7deb5ff73552bdb96 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Fri, 8 Dec 2023 09:42:03 -0600 Subject: [PATCH] Make backfilling configurable --- node/cmd/guardiand/node.go | 193 ++++++++++++--------- node/pkg/watchers/evm/ccq.go | 16 +- node/pkg/watchers/evm/config.go | 3 +- node/pkg/watchers/evm/connectors/poller.go | 6 +- node/pkg/watchers/evm/watcher.go | 6 +- 5 files changed, 131 insertions(+), 93 deletions(-) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index acb24ebabf..999eb62c33 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -225,6 +225,7 @@ var ( ccqP2pPort *uint ccqP2pBootstrap *string ccqAllowedPeers *string + ccqBackfillCache *bool gatewayRelayerContract *string gatewayRelayerKeyPath *string @@ -408,6 +409,7 @@ func init() { ccqP2pPort = NodeCmd.Flags().Uint("ccqP2pPort", 8996, "CCQ P2P UDP listener port") ccqP2pBootstrap = NodeCmd.Flags().String("ccqP2pBootstrap", "", "CCQ P2P bootstrap peers (comma-separated)") ccqAllowedPeers = NodeCmd.Flags().String("ccqAllowedPeers", "", "CCQ allowed P2P peers (comma-separated)") + ccqBackfillCache = NodeCmd.Flags().Bool("ccqBackfillCache", false, "Should EVM chains backfill CCQ timestamp cache on startup") gatewayRelayerContract = NodeCmd.Flags().String("gatewayRelayerContract", "", "Address of the smart contract on wormchain to receive relayed VAAs") gatewayRelayerKeyPath = NodeCmd.Flags().String("gatewayRelayerKeyPath", "", "Path to gateway relayer private key for signing transactions") @@ -1181,6 +1183,7 @@ func runNode(cmd *cobra.Command, args []string) { Rpc: *ethRPC, Contract: *ethContract, GuardianSetUpdateChain: true, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1188,10 +1191,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(bscRPC) { wc := &evm.WatcherConfig{ - NetworkID: "bsc", - ChainID: vaa.ChainIDBSC, - Rpc: *bscRPC, - Contract: *bscContract, + NetworkID: "bsc", + ChainID: vaa.ChainIDBSC, + Rpc: *bscRPC, + Contract: *bscContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1199,10 +1203,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(polygonRPC) { wc := &evm.WatcherConfig{ - NetworkID: "polygon", - ChainID: vaa.ChainIDPolygon, - Rpc: *polygonRPC, - Contract: *polygonContract, + NetworkID: "polygon", + ChainID: vaa.ChainIDPolygon, + Rpc: *polygonRPC, + Contract: *polygonContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1210,10 +1215,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(avalancheRPC) { wc := &evm.WatcherConfig{ - NetworkID: "avalanche", - ChainID: vaa.ChainIDAvalanche, - Rpc: *avalancheRPC, - Contract: *avalancheContract, + NetworkID: "avalanche", + ChainID: vaa.ChainIDAvalanche, + Rpc: *avalancheRPC, + Contract: *avalancheContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1221,10 +1227,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(oasisRPC) { wc := &evm.WatcherConfig{ - NetworkID: "oasis", - ChainID: vaa.ChainIDOasis, - Rpc: *oasisRPC, - Contract: *oasisContract, + NetworkID: "oasis", + ChainID: vaa.ChainIDOasis, + Rpc: *oasisRPC, + Contract: *oasisContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1232,10 +1239,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(auroraRPC) { wc := &evm.WatcherConfig{ - NetworkID: "aurora", - ChainID: vaa.ChainIDAurora, - Rpc: *auroraRPC, - Contract: *auroraContract, + NetworkID: "aurora", + ChainID: vaa.ChainIDAurora, + Rpc: *auroraRPC, + Contract: *auroraContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1243,10 +1251,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(fantomRPC) { wc := &evm.WatcherConfig{ - NetworkID: "fantom", - ChainID: vaa.ChainIDFantom, - Rpc: *fantomRPC, - Contract: *fantomContract, + NetworkID: "fantom", + ChainID: vaa.ChainIDFantom, + Rpc: *fantomRPC, + Contract: *fantomContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1254,10 +1263,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(karuraRPC) { wc := &evm.WatcherConfig{ - NetworkID: "karura", - ChainID: vaa.ChainIDKarura, - Rpc: *karuraRPC, - Contract: *karuraContract, + NetworkID: "karura", + ChainID: vaa.ChainIDKarura, + Rpc: *karuraRPC, + Contract: *karuraContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1265,10 +1275,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(acalaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "acala", - ChainID: vaa.ChainIDAcala, - Rpc: *acalaRPC, - Contract: *acalaContract, + NetworkID: "acala", + ChainID: vaa.ChainIDAcala, + Rpc: *acalaRPC, + Contract: *acalaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1276,10 +1287,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(klaytnRPC) { wc := &evm.WatcherConfig{ - NetworkID: "klaytn", - ChainID: vaa.ChainIDKlaytn, - Rpc: *klaytnRPC, - Contract: *klaytnContract, + NetworkID: "klaytn", + ChainID: vaa.ChainIDKlaytn, + Rpc: *klaytnRPC, + Contract: *klaytnContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1287,10 +1299,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(celoRPC) { wc := &evm.WatcherConfig{ - NetworkID: "celo", - ChainID: vaa.ChainIDCelo, - Rpc: *celoRPC, - Contract: *celoContract, + NetworkID: "celo", + ChainID: vaa.ChainIDCelo, + Rpc: *celoRPC, + Contract: *celoContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1298,10 +1311,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(moonbeamRPC) { wc := &evm.WatcherConfig{ - NetworkID: "moonbeam", - ChainID: vaa.ChainIDMoonbeam, - Rpc: *moonbeamRPC, - Contract: *moonbeamContract, + NetworkID: "moonbeam", + ChainID: vaa.ChainIDMoonbeam, + Rpc: *moonbeamRPC, + Contract: *moonbeamContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1314,6 +1328,7 @@ func runNode(cmd *cobra.Command, args []string) { Rpc: *arbitrumRPC, Contract: *arbitrumContract, L1FinalizerRequired: "eth", + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1321,10 +1336,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(optimismRPC) { wc := &evm.WatcherConfig{ - NetworkID: "optimism", - ChainID: vaa.ChainIDOptimism, - Rpc: *optimismRPC, - Contract: *optimismContract, + NetworkID: "optimism", + ChainID: vaa.ChainIDOptimism, + Rpc: *optimismRPC, + Contract: *optimismContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1332,10 +1348,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(baseRPC) { wc := &evm.WatcherConfig{ - NetworkID: "base", - ChainID: vaa.ChainIDBase, - Rpc: *baseRPC, - Contract: *baseContract, + NetworkID: "base", + ChainID: vaa.ChainIDBase, + Rpc: *baseRPC, + Contract: *baseContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1343,10 +1360,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(scrollRPC) { wc := &evm.WatcherConfig{ - NetworkID: "scroll", - ChainID: vaa.ChainIDScroll, - Rpc: *scrollRPC, - Contract: *scrollContract, + NetworkID: "scroll", + ChainID: vaa.ChainIDScroll, + Rpc: *scrollRPC, + Contract: *scrollContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1354,10 +1372,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(mantleRPC) { wc := &evm.WatcherConfig{ - NetworkID: "mantle", - ChainID: vaa.ChainIDMantle, - Rpc: *mantleRPC, - Contract: *mantleContract, + NetworkID: "mantle", + ChainID: vaa.ChainIDMantle, + Rpc: *mantleRPC, + Contract: *mantleContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1512,10 +1531,11 @@ func runNode(cmd *cobra.Command, args []string) { if *testnetMode || *unsafeDevMode { if shouldStart(sepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "sepolia", - ChainID: vaa.ChainIDSepolia, - Rpc: *sepoliaRPC, - Contract: *sepoliaContract, + NetworkID: "sepolia", + ChainID: vaa.ChainIDSepolia, + Rpc: *sepoliaRPC, + Contract: *sepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1523,10 +1543,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(holeskyRPC) { wc := &evm.WatcherConfig{ - NetworkID: "holesky", - ChainID: vaa.ChainIDHolesky, - Rpc: *holeskyRPC, - Contract: *holeskyContract, + NetworkID: "holesky", + ChainID: vaa.ChainIDHolesky, + Rpc: *holeskyRPC, + Contract: *holeskyContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1534,10 +1555,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(arbitrumSepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "arbitrum_sepolia", - ChainID: vaa.ChainIDArbitrumSepolia, - Rpc: *arbitrumSepoliaRPC, - Contract: *arbitrumSepoliaContract, + NetworkID: "arbitrum_sepolia", + ChainID: vaa.ChainIDArbitrumSepolia, + Rpc: *arbitrumSepoliaRPC, + Contract: *arbitrumSepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1545,10 +1567,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(baseSepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "base_sepolia", - ChainID: vaa.ChainIDBaseSepolia, - Rpc: *baseSepoliaRPC, - Contract: *baseSepoliaContract, + NetworkID: "base_sepolia", + ChainID: vaa.ChainIDBaseSepolia, + Rpc: *baseSepoliaRPC, + Contract: *baseSepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1556,10 +1579,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(optimismSepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "optimism_sepolia", - ChainID: vaa.ChainIDOptimismSepolia, - Rpc: *optimismSepoliaRPC, - Contract: *optimismSepoliaContract, + NetworkID: "optimism_sepolia", + ChainID: vaa.ChainIDOptimismSepolia, + Rpc: *optimismSepoliaRPC, + Contract: *optimismSepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1567,10 +1591,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(polygonSepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "polygon_sepolia", - ChainID: vaa.ChainIDPolygonSepolia, - Rpc: *polygonSepoliaRPC, - Contract: *polygonSepoliaContract, + NetworkID: "polygon_sepolia", + ChainID: vaa.ChainIDPolygonSepolia, + Rpc: *polygonSepoliaRPC, + Contract: *polygonSepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) diff --git a/node/pkg/watchers/evm/ccq.go b/node/pkg/watchers/evm/ccq.go index 64d6e52637..1cac8120f1 100644 --- a/node/pkg/watchers/evm/ccq.go +++ b/node/pkg/watchers/evm/ccq.go @@ -215,7 +215,7 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q if !found { status := query.QueryRetryNeeded if nextBlockNum == 0 { - w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp beyond the end of the cache, will wait and retry", + w.ccqLogger.Warn("block look up failed in eth_call_by_timestamp query request, timestamp beyond the end of the cache, will wait and retry", zap.String("requestId", requestId), zap.Uint64("timestamp", req.TargetTimestamp), zap.String("block", block), @@ -233,8 +233,8 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q zap.Uint64("nextBlockNum", nextBlockNum), ) status = query.QueryFatalError - } else { - w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp is in a gap in the cache, will request a backfill and retry", + } else if w.ccqBackfillCache { + w.ccqLogger.Warn("block look up failed in eth_call_by_timestamp query request, timestamp is in a gap in the cache, will request a backfill and retry", zap.String("requestId", requestId), zap.Uint64("timestamp", req.TargetTimestamp), zap.String("block", block), @@ -243,6 +243,16 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q zap.Uint64("nextBlockNum", nextBlockNum), ) w.ccqRequestBackfill(req.TargetTimestamp / 1000000) + } else { + w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp is in a gap in the cache, failing request", + zap.String("requestId", requestId), + zap.Uint64("timestamp", req.TargetTimestamp), + zap.String("block", block), + zap.String("nextBlock", nextBlock), + zap.Uint64("blockNum", blockNum), + zap.Uint64("nextBlockNum", nextBlockNum), + ) + status = query.QueryFatalError } w.ccqSendQueryResponse(queryRequest, status, nil) return diff --git a/node/pkg/watchers/evm/config.go b/node/pkg/watchers/evm/config.go index 24bdff5059..9fe8dfa8b7 100644 --- a/node/pkg/watchers/evm/config.go +++ b/node/pkg/watchers/evm/config.go @@ -19,6 +19,7 @@ type WatcherConfig struct { GuardianSetUpdateChain bool // if `true`, we will retrieve the GuardianSet from this chain and watch this chain for GuardianSet updates L1FinalizerRequired watchers.NetworkID // (optional) l1Finalizer interfaces.L1Finalizer + CcqBackfillCache bool } func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID { @@ -54,7 +55,7 @@ func (wc *WatcherConfig) Create( var devMode bool = (env == common.UnsafeDevNet) - watcher := NewEthWatcher(wc.Rpc, eth_common.HexToAddress(wc.Contract), string(wc.NetworkID), wc.ChainID, msgC, setWriteC, obsvReqC, queryReqC, queryResponseC, devMode) + watcher := NewEthWatcher(wc.Rpc, eth_common.HexToAddress(wc.Contract), string(wc.NetworkID), wc.ChainID, msgC, setWriteC, obsvReqC, queryReqC, queryResponseC, devMode, wc.CcqBackfillCache) watcher.SetL1Finalizer(wc.l1Finalizer) return watcher, watcher.Run, nil } diff --git a/node/pkg/watchers/evm/connectors/poller.go b/node/pkg/watchers/evm/connectors/poller.go index e6f538fb13..4c77ef987d 100644 --- a/node/pkg/watchers/evm/connectors/poller.go +++ b/node/pkg/watchers/evm/connectors/poller.go @@ -80,7 +80,7 @@ func (b *FinalizerPollConnector) runFromSupervisor(ctx context.Context) error { } func (b *FinalizerPollConnector) run(ctx context.Context, logger *zap.Logger) error { - prevLatest, err := GetLatestBlock(ctx, logger, b.Connector) + prevLatest, err := getLatestBlock(ctx, logger, b.Connector) if err != nil { return err } @@ -121,7 +121,7 @@ func (b *FinalizerPollConnector) run(ctx context.Context, logger *zap.Logger) er // pollBlock poll for the latest block, compares them to the last one, and publishes any new ones. // In the case of an error, it returns the last block that were passed in, otherwise it returns the new block. func (b *FinalizerPollConnector) pollBlock(ctx context.Context, logger *zap.Logger, prevLatest *NewBlock, prevFinalized *NewBlock) (newLatest *NewBlock, newFinalized *NewBlock, err error) { - newLatest, err = GetLatestBlock(ctx, logger, b.Connector) + newLatest, err = getLatestBlock(ctx, logger, b.Connector) if err != nil { err = fmt.Errorf("failed to get latest block: %w", err) newLatest = prevLatest @@ -188,7 +188,7 @@ func (b *FinalizerPollConnector) pollBlock(ctx context.Context, logger *zap.Logg return } -func GetLatestBlock(ctx context.Context, logger *zap.Logger, conn Connector) (*NewBlock, error) { +func getLatestBlock(ctx context.Context, logger *zap.Logger, conn Connector) (*NewBlock, error) { return getBlockByFinality(ctx, logger, conn, Latest) } diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index aed6c3dddf..1011209d31 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -133,6 +133,7 @@ type ( ccqTimestampCache *BlocksByTimestamp ccqBackfillChannel chan *ccqBackfillRequest ccqBatchSize int64 + ccqBackfillCache bool ccqLogger *zap.Logger } @@ -163,6 +164,7 @@ func NewEthWatcher( queryReqC <-chan *query.PerChainQueryInternal, queryResponseC chan<- *query.PerChainQueryResponseInternal, unsafeDevMode bool, + ccqBackfillCache bool, ) *Watcher { return &Watcher{ url: url, @@ -178,7 +180,7 @@ func NewEthWatcher( pending: map[pendingKey]*pendingMessage{}, unsafeDevMode: unsafeDevMode, ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64), - ccqTimestampCache: ccqTimestampCache, + ccqBackfillCache: ccqBackfillCache, ccqBackfillChannel: make(chan *ccqBackfillRequest, 50), } } @@ -424,7 +426,7 @@ func (w *Watcher) Run(parentCtx context.Context) error { } }) - if w.ccqTimestampCache != nil { + if w.ccqTimestampCache != nil && w.ccqBackfillCache { if err := supervisor.Run(ctx, "ccq_backfiller", common.WrapWithScissors(w.ccqBackfiller, "ccq_backfiller")); err != nil { return fmt.Errorf("failed to start ccq_backfiller: %w", err) }