diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index acb24ebabf..ec0a3aa3d8 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -225,6 +225,7 @@ var ( ccqP2pPort *uint ccqP2pBootstrap *string ccqAllowedPeers *string + ccqBackfillCache *bool gatewayRelayerContract *string gatewayRelayerKeyPath *string @@ -408,6 +409,7 @@ func init() { ccqP2pPort = NodeCmd.Flags().Uint("ccqP2pPort", 8996, "CCQ P2P UDP listener port") ccqP2pBootstrap = NodeCmd.Flags().String("ccqP2pBootstrap", "", "CCQ P2P bootstrap peers (comma-separated)") ccqAllowedPeers = NodeCmd.Flags().String("ccqAllowedPeers", "", "CCQ allowed P2P peers (comma-separated)") + ccqBackfillCache = NodeCmd.Flags().Bool("ccqBackfillCache", true, "Should EVM chains backfill CCQ timestamp cache on startup") gatewayRelayerContract = NodeCmd.Flags().String("gatewayRelayerContract", "", "Address of the smart contract on wormchain to receive relayed VAAs") gatewayRelayerKeyPath = NodeCmd.Flags().String("gatewayRelayerKeyPath", "", "Path to gateway relayer private key for signing transactions") @@ -1181,6 +1183,7 @@ func runNode(cmd *cobra.Command, args []string) { Rpc: *ethRPC, Contract: *ethContract, GuardianSetUpdateChain: true, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1188,10 +1191,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(bscRPC) { wc := &evm.WatcherConfig{ - NetworkID: "bsc", - ChainID: vaa.ChainIDBSC, - Rpc: *bscRPC, - Contract: *bscContract, + NetworkID: "bsc", + ChainID: vaa.ChainIDBSC, + Rpc: *bscRPC, + Contract: *bscContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1199,10 +1203,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(polygonRPC) { wc := &evm.WatcherConfig{ - NetworkID: "polygon", - ChainID: vaa.ChainIDPolygon, - Rpc: *polygonRPC, - Contract: *polygonContract, + NetworkID: "polygon", + ChainID: vaa.ChainIDPolygon, + Rpc: *polygonRPC, + Contract: *polygonContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1210,10 +1215,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(avalancheRPC) { wc := &evm.WatcherConfig{ - NetworkID: "avalanche", - ChainID: vaa.ChainIDAvalanche, - Rpc: *avalancheRPC, - Contract: *avalancheContract, + NetworkID: "avalanche", + ChainID: vaa.ChainIDAvalanche, + Rpc: *avalancheRPC, + Contract: *avalancheContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1221,10 +1227,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(oasisRPC) { wc := &evm.WatcherConfig{ - NetworkID: "oasis", - ChainID: vaa.ChainIDOasis, - Rpc: *oasisRPC, - Contract: *oasisContract, + NetworkID: "oasis", + ChainID: vaa.ChainIDOasis, + Rpc: *oasisRPC, + Contract: *oasisContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1232,10 +1239,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(auroraRPC) { wc := &evm.WatcherConfig{ - NetworkID: "aurora", - ChainID: vaa.ChainIDAurora, - Rpc: *auroraRPC, - Contract: *auroraContract, + NetworkID: "aurora", + ChainID: vaa.ChainIDAurora, + Rpc: *auroraRPC, + Contract: *auroraContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1243,10 +1251,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(fantomRPC) { wc := &evm.WatcherConfig{ - NetworkID: "fantom", - 
ChainID: vaa.ChainIDFantom, - Rpc: *fantomRPC, - Contract: *fantomContract, + NetworkID: "fantom", + ChainID: vaa.ChainIDFantom, + Rpc: *fantomRPC, + Contract: *fantomContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1254,10 +1263,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(karuraRPC) { wc := &evm.WatcherConfig{ - NetworkID: "karura", - ChainID: vaa.ChainIDKarura, - Rpc: *karuraRPC, - Contract: *karuraContract, + NetworkID: "karura", + ChainID: vaa.ChainIDKarura, + Rpc: *karuraRPC, + Contract: *karuraContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1265,10 +1275,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(acalaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "acala", - ChainID: vaa.ChainIDAcala, - Rpc: *acalaRPC, - Contract: *acalaContract, + NetworkID: "acala", + ChainID: vaa.ChainIDAcala, + Rpc: *acalaRPC, + Contract: *acalaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1276,10 +1287,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(klaytnRPC) { wc := &evm.WatcherConfig{ - NetworkID: "klaytn", - ChainID: vaa.ChainIDKlaytn, - Rpc: *klaytnRPC, - Contract: *klaytnContract, + NetworkID: "klaytn", + ChainID: vaa.ChainIDKlaytn, + Rpc: *klaytnRPC, + Contract: *klaytnContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1287,10 +1299,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(celoRPC) { wc := &evm.WatcherConfig{ - NetworkID: "celo", - ChainID: vaa.ChainIDCelo, - Rpc: *celoRPC, - Contract: *celoContract, + NetworkID: "celo", + ChainID: vaa.ChainIDCelo, + Rpc: *celoRPC, + Contract: *celoContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1298,10 +1311,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(moonbeamRPC) { wc := &evm.WatcherConfig{ - NetworkID: "moonbeam", - ChainID: vaa.ChainIDMoonbeam, - Rpc: *moonbeamRPC, - Contract: *moonbeamContract, + NetworkID: "moonbeam", + ChainID: vaa.ChainIDMoonbeam, + Rpc: *moonbeamRPC, + Contract: *moonbeamContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1314,6 +1328,7 @@ func runNode(cmd *cobra.Command, args []string) { Rpc: *arbitrumRPC, Contract: *arbitrumContract, L1FinalizerRequired: "eth", + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1321,10 +1336,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(optimismRPC) { wc := &evm.WatcherConfig{ - NetworkID: "optimism", - ChainID: vaa.ChainIDOptimism, - Rpc: *optimismRPC, - Contract: *optimismContract, + NetworkID: "optimism", + ChainID: vaa.ChainIDOptimism, + Rpc: *optimismRPC, + Contract: *optimismContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1332,10 +1348,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(baseRPC) { wc := &evm.WatcherConfig{ - NetworkID: "base", - ChainID: vaa.ChainIDBase, - Rpc: *baseRPC, - Contract: *baseContract, + NetworkID: "base", + ChainID: vaa.ChainIDBase, + Rpc: *baseRPC, + Contract: *baseContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1343,10 +1360,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(scrollRPC) { wc := &evm.WatcherConfig{ - NetworkID: "scroll", - ChainID: 
vaa.ChainIDScroll, - Rpc: *scrollRPC, - Contract: *scrollContract, + NetworkID: "scroll", + ChainID: vaa.ChainIDScroll, + Rpc: *scrollRPC, + Contract: *scrollContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1354,10 +1372,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(mantleRPC) { wc := &evm.WatcherConfig{ - NetworkID: "mantle", - ChainID: vaa.ChainIDMantle, - Rpc: *mantleRPC, - Contract: *mantleContract, + NetworkID: "mantle", + ChainID: vaa.ChainIDMantle, + Rpc: *mantleRPC, + Contract: *mantleContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1512,10 +1531,11 @@ func runNode(cmd *cobra.Command, args []string) { if *testnetMode || *unsafeDevMode { if shouldStart(sepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "sepolia", - ChainID: vaa.ChainIDSepolia, - Rpc: *sepoliaRPC, - Contract: *sepoliaContract, + NetworkID: "sepolia", + ChainID: vaa.ChainIDSepolia, + Rpc: *sepoliaRPC, + Contract: *sepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1523,10 +1543,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(holeskyRPC) { wc := &evm.WatcherConfig{ - NetworkID: "holesky", - ChainID: vaa.ChainIDHolesky, - Rpc: *holeskyRPC, - Contract: *holeskyContract, + NetworkID: "holesky", + ChainID: vaa.ChainIDHolesky, + Rpc: *holeskyRPC, + Contract: *holeskyContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1534,10 +1555,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(arbitrumSepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "arbitrum_sepolia", - ChainID: vaa.ChainIDArbitrumSepolia, - Rpc: *arbitrumSepoliaRPC, - Contract: *arbitrumSepoliaContract, + NetworkID: "arbitrum_sepolia", + ChainID: vaa.ChainIDArbitrumSepolia, + Rpc: *arbitrumSepoliaRPC, + Contract: *arbitrumSepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1545,10 +1567,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(baseSepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "base_sepolia", - ChainID: vaa.ChainIDBaseSepolia, - Rpc: *baseSepoliaRPC, - Contract: *baseSepoliaContract, + NetworkID: "base_sepolia", + ChainID: vaa.ChainIDBaseSepolia, + Rpc: *baseSepoliaRPC, + Contract: *baseSepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1556,10 +1579,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(optimismSepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "optimism_sepolia", - ChainID: vaa.ChainIDOptimismSepolia, - Rpc: *optimismSepoliaRPC, - Contract: *optimismSepoliaContract, + NetworkID: "optimism_sepolia", + ChainID: vaa.ChainIDOptimismSepolia, + Rpc: *optimismSepoliaRPC, + Contract: *optimismSepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) @@ -1567,10 +1591,11 @@ func runNode(cmd *cobra.Command, args []string) { if shouldStart(polygonSepoliaRPC) { wc := &evm.WatcherConfig{ - NetworkID: "polygon_sepolia", - ChainID: vaa.ChainIDPolygonSepolia, - Rpc: *polygonSepoliaRPC, - Contract: *polygonSepoliaContract, + NetworkID: "polygon_sepolia", + ChainID: vaa.ChainIDPolygonSepolia, + Rpc: *polygonSepoliaRPC, + Contract: *polygonSepoliaContract, + CcqBackfillCache: *ccqBackfillCache, } watcherConfigs = append(watcherConfigs, wc) diff --git a/node/pkg/watchers/evm/ccq.go 
b/node/pkg/watchers/evm/ccq.go index c3aa0bd4d8..1cac8120f1 100644 --- a/node/pkg/watchers/evm/ccq.go +++ b/node/pkg/watchers/evm/ccq.go @@ -26,7 +26,7 @@ func (w *Watcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, status case w.queryResponseC <- queryResponse: w.ccqLogger.Debug("published query response to handler") default: - w.ccqLogger.Error("failed to published query response error to handler") + w.ccqLogger.Error("failed to published query response to handler") } } @@ -213,15 +213,48 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q // Look the timestamp up in the cache. Note that the cache uses native EVM time, which is seconds, but CCQ uses microseconds, so we have to convert. blockNum, nextBlockNum, found := w.ccqTimestampCache.LookUp(req.TargetTimestamp / 1000000) if !found { - w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp not in cache, will retry", - zap.String("requestId", requestId), - zap.Uint64("timestamp", req.TargetTimestamp), - zap.String("block", block), - zap.String("nextBlock", nextBlock), - zap.Uint64("blockNum", blockNum), - zap.Uint64("nextBlockNum", nextBlockNum), - ) - w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil) + status := query.QueryRetryNeeded + if nextBlockNum == 0 { + w.ccqLogger.Warn("block look up failed in eth_call_by_timestamp query request, timestamp beyond the end of the cache, will wait and retry", + zap.String("requestId", requestId), + zap.Uint64("timestamp", req.TargetTimestamp), + zap.String("block", block), + zap.String("nextBlock", nextBlock), + zap.Uint64("blockNum", blockNum), + zap.Uint64("nextBlockNum", nextBlockNum), + ) + } else if blockNum == 0 { + w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp too old, failing request", + zap.String("requestId", requestId), + zap.Uint64("timestamp", req.TargetTimestamp), + zap.String("block", block), + zap.String("nextBlock", nextBlock), + zap.Uint64("blockNum", blockNum), + zap.Uint64("nextBlockNum", nextBlockNum), + ) + status = query.QueryFatalError + } else if w.ccqBackfillCache { + w.ccqLogger.Warn("block look up failed in eth_call_by_timestamp query request, timestamp is in a gap in the cache, will request a backfill and retry", + zap.String("requestId", requestId), + zap.Uint64("timestamp", req.TargetTimestamp), + zap.String("block", block), + zap.String("nextBlock", nextBlock), + zap.Uint64("blockNum", blockNum), + zap.Uint64("nextBlockNum", nextBlockNum), + ) + w.ccqRequestBackfill(req.TargetTimestamp / 1000000) + } else { + w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp is in a gap in the cache, failing request", + zap.String("requestId", requestId), + zap.Uint64("timestamp", req.TargetTimestamp), + zap.String("block", block), + zap.String("nextBlock", nextBlock), + zap.Uint64("blockNum", blockNum), + zap.Uint64("nextBlockNum", nextBlockNum), + ) + status = query.QueryFatalError + } + w.ccqSendQueryResponse(queryRequest, status, nil) return } diff --git a/node/pkg/watchers/evm/ccq_backfill.go b/node/pkg/watchers/evm/ccq_backfill.go new file mode 100644 index 0000000000..bc20cf398f --- /dev/null +++ b/node/pkg/watchers/evm/ccq_backfill.go @@ -0,0 +1,317 @@ +package evm + +import ( + "context" + "fmt" + "time" + + "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors" + + "go.uber.org/zap" + + ethHexUtil 
"github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/rpc" + ethRpc "github.com/ethereum/go-ethereum/rpc" +) + +const CCQ_MAX_BATCH_SIZE = int64(1000) +const CCQ_TIMESTAMP_RANGE_IN_SECONDS = uint64(30 * 60) +const CCQ_BACKFILL_DELAY = 100 * time.Millisecond + +type ( + // ccqBackfillRequest represents a request to backfill the cache. It is the payload on the request channel. + ccqBackfillRequest struct { + timestamp uint64 + } + + // ccqBatchResult is the result of each query in a batch. + ccqBatchResult struct { + result ccqBlockMarshaller + err error + } + + // ccqBlockMarshaller is used to marshal the query results. + ccqBlockMarshaller struct { + Number ethHexUtil.Uint64 + Time ethHexUtil.Uint64 `json:"timestamp"` + // Hash ethCommon.Hash `json:"hash"` + } +) + +// ccqRequestBackfill submits a request to backfill a gap in the timestamp cache. Note that the timestamp passed in should be in seconds, as expected by the timestamp cache. +func (w *Watcher) ccqRequestBackfill(timestamp uint64) { + select { + case w.ccqBackfillChannel <- &ccqBackfillRequest{timestamp: timestamp}: + w.ccqLogger.Debug("published backfill request", zap.Uint64("timestamp", timestamp)) + default: + // This will get retried next interval. + w.ccqLogger.Error("failed to post backfill request, will get retried next interval", zap.Uint64("timestamp", timestamp)) + } +} + +// ccqBackfillStart initializes the timestamp cache by backfilling some history and starting a routine to handle backfill requests +// when a timestamp is not in the cache. This function does not return errors because we don't want to prevent the watcher from +// coming up if we can't backfill the cache. We just disable backfilling and hope for the best. +func (w *Watcher) ccqBackfillStart(ctx context.Context, errC chan error) { + if err := w.ccqBackfillInit(ctx); err != nil { + w.ccqLogger.Error("failed to backfill timestamp cache, disabling backfilling", zap.Error(err)) + w.ccqBackfillCache = false + return + } + + common.RunWithScissors(ctx, errC, "ccq_backfiller", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case evt := <-w.ccqBackfillChannel: + w.ccqPerformBackfill(ctx, evt) + } + } + }) +} + +// ccqBackfillInit determines the maximum batch size to be used for backfilling the cache. It also loads the initial batch of timestamps. +func (w *Watcher) ccqBackfillInit(ctx context.Context) error { + // Get the latest block so we can use that as the starting point in our cache. + latestBlock, err := connectors.GetLatestBlock(ctx, w.ccqLogger, w.ethConn) + if err != nil { + return fmt.Errorf("failed to look up latest block: %w", err) + } + latestBlockNum := latestBlock.Number.Int64() + w.ccqLogger.Info("looked up latest block", zap.Int64("latestBlockNum", latestBlockNum), zap.Uint64("timestamp", latestBlock.Time)) + + var blocks Blocks + if w.ccqBatchSize == 0 { + // Determine the max supported batch size and get the first batch which will start with the latest block and go backwards. 
+ var err error + w.ccqBatchSize, blocks, err = ccqBackFillDetermineMaxBatchSize(ctx, w.ccqLogger, w.ethConn, latestBlockNum, CCQ_BACKFILL_DELAY) + if err != nil { + return fmt.Errorf("failed to determine max batch size: %w", err) + } + } else { + blocks = append(blocks, Block{BlockNum: latestBlock.Number.Uint64(), Timestamp: latestBlock.Time}) + w.ccqLogger.Info("using existing batch size for timestamp cache", zap.Int64("batchSize", w.ccqBatchSize)) + } + + // We want to start with a half hour in our cache. Get batches until we cover that. + cutOffTime := latestBlock.Time - CCQ_TIMESTAMP_RANGE_IN_SECONDS + if latestBlock.Time < CCQ_TIMESTAMP_RANGE_IN_SECONDS { + // In devnet the timestamps are just integers that start at zero on startup. + cutOffTime = 0 + } + + if len(blocks) == 0 { + // This should never happen, but the for loop would panic if it did! + return fmt.Errorf("list of blocks is empty") + } + + // Query for more blocks until we go back the desired length of time. The last block in the array will be the oldest, so query starting one before that. + for blocks[len(blocks)-1].Timestamp > cutOffTime { + newBlocks, err := w.ccqBackfillGetBlocks(ctx, int64(blocks[len(blocks)-1].BlockNum-1), w.ccqBatchSize) + if err != nil { + return fmt.Errorf("failed to get batch starting at %d: %w", blocks[len(blocks)-1].BlockNum-1, err) + } + + if len(newBlocks) == 0 { + w.ccqLogger.Error("failed to read any more blocks, giving up on the backfill") + break + } + + blocks = append(blocks, newBlocks...) + w.ccqLogger.Info("got batch", + zap.Uint64("oldestBlockNum", newBlocks[len(newBlocks)-1].BlockNum), + zap.Uint64("latestBlockNum", newBlocks[0].BlockNum), + zap.Uint64("oldestBlockTimestamp", newBlocks[len(newBlocks)-1].Timestamp), + zap.Uint64("latestBlockTimestamp", newBlocks[0].Timestamp), + zap.Stringer("oldestTime", time.Unix(int64(newBlocks[len(newBlocks)-1].Timestamp), 0)), + zap.Stringer("latestTime", time.Unix(int64(newBlocks[0].Timestamp), 0)), + ) + } + + w.ccqLogger.Info("adding initial batch to timestamp cache", + zap.Int64("batchSize", w.ccqBatchSize), + zap.Int("numBlocks", len(blocks)), + zap.Uint64("oldestBlockNum", blocks[len(blocks)-1].BlockNum), + zap.Uint64("latestBlockNum", blocks[0].BlockNum), + zap.Uint64("oldestBlockTimestamp", blocks[len(blocks)-1].Timestamp), + zap.Uint64("latestBlockTimestamp", blocks[0].Timestamp), + zap.Stringer("oldestTime", time.Unix(int64(blocks[len(blocks)-1].Timestamp), 0)), + zap.Stringer("latestTime", time.Unix(int64(blocks[0].Timestamp), 0)), + ) + + w.ccqTimestampCache.AddBatch(blocks) + + return nil +} + +// ccqBackfillConn is defined to allow for testing of ccqBackFillDetermineMaxBatchSize without mocking a full ethereum connection. +type ccqBackfillConn interface { + RawBatchCallContext(ctx context.Context, b []rpc.BatchElem) error +} + +// ccqBackFillDetermineMaxBatchSize performs a series of batch queries, starting with a size of 1000 and stepping down by halves, and then back up until we circle in on the maximum batch size supported by the RPC. 
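`ccqBackfillInit` pages backwards from the head until the oldest cached timestamp reaches the cutoff (head time minus `CCQ_TIMESTAMP_RANGE_IN_SECONDS`, clamped to zero for devnet). A self-contained sketch of that loop, with a fake fetch function and three-second block times standing in for `ccqBackfillGetBlocks`:

```go
package main

import "fmt"

type block struct {
	num, ts uint64
}

// fetchBatch stands in for ccqBackfillGetBlocks: it returns up to batchSize blocks
// ending at `from` and walking backwards, newest first, with fake 3-second block times.
func fetchBatch(from uint64, batchSize int) []block {
	out := make([]block, 0, batchSize)
	for i := 0; i < batchSize && from > 0; i++ {
		out = append(out, block{num: from, ts: from * 3})
		from--
	}
	return out
}

func main() {
	const rangeSeconds = 30 * 60
	head := block{num: 10_000, ts: 30_000}
	cutoff := head.ts - rangeSeconds // the real code clamps this to zero when the head is younger than the range

	blocks := []block{head}
	// Keep appending older batches until the oldest cached block is at or before the cutoff.
	for blocks[len(blocks)-1].ts > cutoff {
		batch := fetchBatch(blocks[len(blocks)-1].num-1, 500)
		if len(batch) == 0 {
			break
		}
		blocks = append(blocks, batch...)
	}
	fmt.Printf("cached %d blocks, oldest ts %d (cutoff %d)\n",
		len(blocks), blocks[len(blocks)-1].ts, cutoff)
}
```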
+func ccqBackFillDetermineMaxBatchSize(ctx context.Context, logger *zap.Logger, conn ccqBackfillConn, latestBlockNum int64, delay time.Duration) (int64, Blocks, error) { + batchSize := int64(CCQ_MAX_BATCH_SIZE) + var batch []ethRpc.BatchElem + var results []ccqBatchResult + prevFailure := int64(0) + prevSuccess := int64(0) + for { + if latestBlockNum < batchSize { + batchSize = latestBlockNum + } + logger.Info("trying batch size", zap.Int64("batchSize", batchSize)) + batch = make([]ethRpc.BatchElem, batchSize) + results = make([]ccqBatchResult, batchSize) + blockNum := latestBlockNum + for idx := int64(0); idx < batchSize; idx++ { + batch[idx] = ethRpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{ + "0x" + fmt.Sprintf("%x", blockNum), + false, // no full transaction details + }, + Result: &results[idx].result, + Error: results[idx].err, + } + + blockNum-- + } + + timeout, cancel := context.WithTimeout(ctx, 30*time.Second) + err := conn.RawBatchCallContext(timeout, batch) + cancel() + + if err == nil { + logger.Info("batch query worked", zap.Int64("batchSize", batchSize)) + if prevFailure == 0 { + break + } + if batchSize+1 >= prevFailure { + break + } + prevSuccess = batchSize + } else { + logger.Info("batch query failed", zap.Int64("batchSize", batchSize), zap.Error(err)) + prevFailure = batchSize + } + batchSize = (prevFailure + prevSuccess) / 2 + if batchSize == 0 { + return 0, nil, fmt.Errorf("failed to determine batch size: %w", err) + } + + time.Sleep(delay) + } + + // Save the blocks we just retrieved to be used as our starting cache. + blocks := Blocks{} + for _, result := range results { + if result.err != nil { + return 0, nil, fmt.Errorf("failed to get block: %w", result.err) + } + + m := &result.result + + if m.Number != 0 { + blocks = append(blocks, Block{ + BlockNum: uint64(m.Number), + // Hash: m.Hash, + Timestamp: uint64(m.Time), + }) + } + } + + logger.Info("found supported batch size", zap.Int64("batchSize", batchSize), zap.Int("numBlocks", len(blocks))) + return batchSize, blocks, nil +} + +// ccqBackfillGetBlocks gets a range of blocks from the RPC, starting from initialBlockNum and going downward for numBlocks. 
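The probing in `ccqBackFillDetermineMaxBatchSize` is a bisection between the last failing and last succeeding batch size. A standalone sketch of just the convergence logic, where the `try` callback stands in for the batched RPC call and the names are illustrative:

```go
package main

import "fmt"

const maxBatch = int64(1000)

// findMaxBatch returns the largest size <= maxBatch for which try succeeds,
// assuming try(n) succeeds exactly when n is at or below the node's limit.
func findMaxBatch(try func(int64) bool) int64 {
	size := maxBatch
	prevFailure, prevSuccess := int64(0), int64(0)
	for {
		if try(size) {
			if prevFailure == 0 || size+1 >= prevFailure {
				return size // either the cap works outright or we have converged
			}
			prevSuccess = size
		} else {
			prevFailure = size
		}
		size = (prevFailure + prevSuccess) / 2
		if size == 0 {
			return 0 // even a batch of one failed
		}
	}
}

func main() {
	supported := int64(73)
	fmt.Println(findMaxBatch(func(n int64) bool { return n <= supported })) // 73
}
```

For nodes that allow more than `CCQ_MAX_BATCH_SIZE`, the search returns the cap on the first probe, which is the behavior `TestCcqBackFillDetermineMaxBatchSize` checks for `maxBatchSize > CCQ_MAX_BATCH_SIZE`.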
+func (w *Watcher) ccqBackfillGetBlocks(ctx context.Context, initialBlockNum int64, numBlocks int64) (Blocks, error) { + w.ccqLogger.Info("getting batch", zap.Int64("initialBlockNum", initialBlockNum), zap.Int64("numBlocks", numBlocks)) + batch := make([]ethRpc.BatchElem, numBlocks) + results := make([]ccqBatchResult, numBlocks) + blockNum := initialBlockNum + for idx := int64(0); idx < numBlocks; idx++ { + batch[idx] = ethRpc.BatchElem{ + Method: "eth_getBlockByNumber", + Args: []interface{}{ + "0x" + fmt.Sprintf("%x", blockNum), + false, // no full transaction details + }, + Result: &results[idx].result, + Error: results[idx].err, + } + + blockNum-- + } + + timeout, cancel := context.WithTimeout(ctx, 30*time.Second) + err := w.ethConn.RawBatchCallContext(timeout, batch) + cancel() + if err != nil { + w.ccqLogger.Error("failed to get batch of blocks", + zap.Int64("initialBlockNum", initialBlockNum), + zap.Int64("numBlocks", numBlocks), + zap.Int64("finalBlockNum", blockNum), + zap.Error(err), + ) + + return nil, err + } + + blocks := Blocks{} + for _, result := range results { + if result.err != nil { + return nil, fmt.Errorf("failed to get block: %w", err) + } + + m := &result.result + + if m.Number != 0 { + blocks = append(blocks, Block{ + BlockNum: uint64(m.Number), + // Hash: m.Hash, + Timestamp: uint64(m.Time), + }) + } + } + + return blocks, nil +} + +// ccqPerformBackfill handles a request to backfill the timestamp cache. First it does another lookup to confirm that the backfill is still needed. +// If so, it submits a batch query for all of the requested blocks, up to what will fit in a single batch. +func (w *Watcher) ccqPerformBackfill(ctx context.Context, evt *ccqBackfillRequest) { + // Things may have changed since the request was posted to the channel. See if we still need to do the backfill. 
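Each element of a backfill batch is an `eth_getBlockByNumber` call with a hex-encoded block number and `false` (header only), decoded through go-ethereum's `hexutil` integer types. A small sketch of one element and its decode; the `rpc` and `hexutil` packages are the real go-ethereum ones, while the struct, block number, and response body are illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"

	ethHexUtil "github.com/ethereum/go-ethereum/common/hexutil"
	ethRpc "github.com/ethereum/go-ethereum/rpc"
)

type blockHeader struct {
	Number ethHexUtil.Uint64 `json:"number"`
	Time   ethHexUtil.Uint64 `json:"timestamp"`
}

func main() {
	var result blockHeader
	elem := ethRpc.BatchElem{
		Method: "eth_getBlockByNumber",
		Args:   []interface{}{fmt.Sprintf("0x%x", 17_533_044), false}, // false: headers only, no transactions
		Result: &result,
	}
	_ = elem // in the watcher, elements like this are sent via the connector's RawBatchCallContext

	// One element's JSON-RPC result decodes into block number and timestamp like this:
	raw := []byte(`{"number":"0x10b8874","timestamp":"0x6585a2c0"}`)
	if err := json.Unmarshal(raw, &result); err != nil {
		panic(err)
	}
	fmt.Println(uint64(result.Number), uint64(result.Time)) // 17533044 1703256768
}
```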
+ firstBlock, lastBlock, found := w.ccqTimestampCache.LookUp(evt.timestamp) + if found { + w.ccqLogger.Info("received a backfill request which is now in the cache, ignoring it", zap.Uint64("timestamp", evt.timestamp), zap.Uint64("firstBlock", firstBlock), zap.Uint64("lastBlock", lastBlock)) + return + } + + numBlocks := int64(lastBlock - firstBlock - 1) + if numBlocks > w.ccqBatchSize { + numBlocks = w.ccqBatchSize + } + w.ccqLogger.Info("received a backfill request", zap.Uint64("timestamp", evt.timestamp), zap.Uint64("firstBlock", firstBlock), zap.Uint64("lastBlock", lastBlock), zap.Int64("numBlocks", numBlocks)) + blocks, err := w.ccqBackfillGetBlocks(ctx, int64(lastBlock-1), numBlocks) + if err != nil { + w.ccqLogger.Error("failed to get backfill batch", zap.Int64("startingBlock", int64(lastBlock-1)), zap.Int64("numBlocks", numBlocks)) + return + } + + w.ccqLogger.Info("adding backfill batch to timestamp cache", + zap.Int64("batchSize", w.ccqBatchSize), + zap.Int("numBlocks", len(blocks)), + zap.Uint64("oldestBlockNum", blocks[len(blocks)-1].BlockNum), + zap.Uint64("latestBlockNum", blocks[0].BlockNum), + zap.Uint64("oldestBlockTimestamp", blocks[len(blocks)-1].Timestamp), + zap.Uint64("latestBlockTimestamp", blocks[0].Timestamp), + zap.Stringer("oldestTime", time.Unix(int64(blocks[len(blocks)-1].Timestamp), 0)), + zap.Stringer("latestTime", time.Unix(int64(blocks[0].Timestamp), 0)), + ) + + w.ccqTimestampCache.AddBatch(blocks) +} diff --git a/node/pkg/watchers/evm/ccq_backfill_test.go b/node/pkg/watchers/evm/ccq_backfill_test.go new file mode 100644 index 0000000000..d3f44c8f8b --- /dev/null +++ b/node/pkg/watchers/evm/ccq_backfill_test.go @@ -0,0 +1,76 @@ +package evm + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "testing" + "time" + + ethHexUtil "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/rpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.uber.org/zap" +) + +// mockBackfillConn simulates a batch query but fails if the batch size is greater than maxBatchSize. +type mockBackfillConn struct { + maxBatchSize int64 +} + +func (conn *mockBackfillConn) RawBatchCallContext(ctx context.Context, b []rpc.BatchElem) error { + if int64(len(b)) > conn.maxBatchSize { + return fmt.Errorf("batch too large") + } + + for _, b := range b { + blockNum, err := strconv.ParseUint(fmt.Sprintf("%v", b.Args[0])[2:], 16, 64) + if err != nil { + return fmt.Errorf("invalid hex number: %s", b.Args[0]) + } + + result := ccqBlockMarshaller{Number: ethHexUtil.Uint64(blockNum), Time: ethHexUtil.Uint64(blockNum * 10)} + bytes, err := json.Marshal(result) + if err != nil { + return fmt.Errorf("failed to marshal result: %w", err) + } + + err = json.Unmarshal(bytes, b.Result) + if err != nil { + return fmt.Errorf("failed to unmarshal result: %w", err) + } + + b.Error = nil + } + return nil +} + +// TestCcqBackFillDetermineMaxBatchSize verifies that the search for the max allowed block size converges for all values between 1 and CCQ_MAX_BATCH_SIZE + 1 inclusive. +// It also verifies the returned set of blocks. 
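When a timestamp falls in a gap, `ccqPerformBackfill` only fetches the interior blocks between the two cached neighbors, capped at the negotiated batch size and walking downward from just below the newer one. A small arithmetic sketch of that sizing (the helper name is illustrative):

```go
package main

import "fmt"

// gapFetchPlan returns the block to start from (walking downward) and how many
// blocks to request for a gap between cached blocks firstBlock and lastBlock.
func gapFetchPlan(firstBlock, lastBlock uint64, batchSize int64) (start int64, count int64) {
	count = int64(lastBlock - firstBlock - 1)
	if count > batchSize {
		count = batchSize
	}
	return int64(lastBlock - 1), count
}

func main() {
	start, count := gapFetchPlan(1_000, 1_200, 500)
	fmt.Println(start, count) // 1199 199: fetch blocks 1199 down to 1001, exactly the missing interior
}
```

If the gap is wider than one batch, the fetch only narrows it; a retry that still misses will request another backfill.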
+func TestCcqBackFillDetermineMaxBatchSize(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + latestBlockNum := int64(17533044) + + for maxBatchSize := int64(1); maxBatchSize <= CCQ_MAX_BATCH_SIZE+1; maxBatchSize++ { + conn := &mockBackfillConn{maxBatchSize: maxBatchSize} + batchSize, blocks, err := ccqBackFillDetermineMaxBatchSize(ctx, logger, conn, latestBlockNum, time.Microsecond) + require.NoError(t, err) + if maxBatchSize > CCQ_MAX_BATCH_SIZE { // If the node supports more than our max size, we should cap the batch size at our max. + require.Equal(t, CCQ_MAX_BATCH_SIZE, batchSize) + } else { + require.Equal(t, maxBatchSize, batchSize) + } + require.Equal(t, batchSize, int64(len(blocks))) + + blockNum := uint64(latestBlockNum) + for _, block := range blocks { + assert.Equal(t, blockNum, block.BlockNum) + assert.Equal(t, blockNum*10, block.Timestamp) + blockNum-- + } + } +} diff --git a/node/pkg/watchers/evm/config.go b/node/pkg/watchers/evm/config.go index 24bdff5059..9fe8dfa8b7 100644 --- a/node/pkg/watchers/evm/config.go +++ b/node/pkg/watchers/evm/config.go @@ -19,6 +19,7 @@ type WatcherConfig struct { GuardianSetUpdateChain bool // if `true`, we will retrieve the GuardianSet from this chain and watch this chain for GuardianSet updates L1FinalizerRequired watchers.NetworkID // (optional) l1Finalizer interfaces.L1Finalizer + CcqBackfillCache bool } func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID { @@ -54,7 +55,7 @@ func (wc *WatcherConfig) Create( var devMode bool = (env == common.UnsafeDevNet) - watcher := NewEthWatcher(wc.Rpc, eth_common.HexToAddress(wc.Contract), string(wc.NetworkID), wc.ChainID, msgC, setWriteC, obsvReqC, queryReqC, queryResponseC, devMode) + watcher := NewEthWatcher(wc.Rpc, eth_common.HexToAddress(wc.Contract), string(wc.NetworkID), wc.ChainID, msgC, setWriteC, obsvReqC, queryReqC, queryResponseC, devMode, wc.CcqBackfillCache) watcher.SetL1Finalizer(wc.l1Finalizer) return watcher, watcher.Run, nil } diff --git a/node/pkg/watchers/evm/watcher.go b/node/pkg/watchers/evm/watcher.go index 97b2ed8097..1f7b4fd24a 100644 --- a/node/pkg/watchers/evm/watcher.go +++ b/node/pkg/watchers/evm/watcher.go @@ -129,9 +129,12 @@ type ( latestFinalizedBlockNumber uint64 l1Finalizer interfaces.L1Finalizer - ccqMaxBlockNumber *big.Int - ccqTimestampCache *BlocksByTimestamp - ccqLogger *zap.Logger + ccqMaxBlockNumber *big.Int + ccqTimestampCache *BlocksByTimestamp + ccqBackfillChannel chan *ccqBackfillRequest + ccqBatchSize int64 + ccqBackfillCache bool + ccqLogger *zap.Logger } pendingKey struct { @@ -161,27 +164,24 @@ func NewEthWatcher( queryReqC <-chan *query.PerChainQueryInternal, queryResponseC chan<- *query.PerChainQueryResponseInternal, unsafeDevMode bool, + ccqBackfillCache bool, ) *Watcher { - var ccqTimestampCache *BlocksByTimestamp - if query.SupportsTimestampCaching(chainID) { - ccqTimestampCache = NewBlocksByTimestamp(BTS_MAX_BLOCKS) - } - return &Watcher{ - url: url, - contract: contract, - networkName: networkName, - readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID), - chainID: chainID, - msgC: msgC, - setC: setC, - obsvReqC: obsvReqC, - queryReqC: queryReqC, - queryResponseC: queryResponseC, - pending: map[pendingKey]*pendingMessage{}, - unsafeDevMode: unsafeDevMode, - ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64), - ccqTimestampCache: ccqTimestampCache, + url: url, + contract: contract, + networkName: networkName, + readinessSync: common.MustConvertChainIdToReadinessSyncing(chainID), + chainID: chainID, 
+ msgC: msgC, + setC: setC, + obsvReqC: obsvReqC, + queryReqC: queryReqC, + queryResponseC: queryResponseC, + pending: map[pendingKey]*pendingMessage{}, + unsafeDevMode: unsafeDevMode, + ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64), + ccqBackfillCache: ccqBackfillCache, + ccqBackfillChannel: make(chan *ccqBackfillRequest, 50), } } @@ -258,6 +258,10 @@ func (w *Watcher) Run(parentCtx context.Context) error { } } + if query.SupportsTimestampCaching(w.chainID) { + w.ccqTimestampCache = NewBlocksByTimestamp(BTS_MAX_BLOCKS) + } + errC := make(chan error) // Subscribe to new message publications. We don't use a timeout here because the LogPollConnector @@ -417,6 +421,10 @@ func (w *Watcher) Run(parentCtx context.Context) error { } }) + if w.ccqTimestampCache != nil && w.ccqBackfillCache { + w.ccqBackfillStart(ctx, errC) + } + common.RunWithScissors(ctx, errC, "evm_fetch_query_req", func(ctx context.Context) error { for { select { diff --git a/whitepapers/0013_ccq.md b/whitepapers/0013_ccq.md index 2ae02c3909..775fef8b60 100644 --- a/whitepapers/0013_ccq.md +++ b/whitepapers/0013_ccq.md @@ -260,7 +260,11 @@ Currently the supported query types on EVM are `eth_call`, `eth_call_by_timestam The request MUST include the target timestamp. - As of October 2023, the request MUST also include the block hints for the target block and following block. (A future release may make these hints optional.) These hints are used by the guardians to look up the blocks. The resulting block numbers MUST be `1` different and their timestamps MUST be such that the target block is _before_ the target time (inclusive) and the following block is _after_ (exclusive). In other words, + In the initial release of CCQ, the request had to include the block hints for the target block and following block. These hints are used by the guardians to look up the blocks. + + As of December 2023, the block hints are optional, as the guardian maintains a cache of block timestamp to block number. This cache is guaranteed to cover the last 30 minutes of blocks, which is back filled on start up. As new blocks come in, they are added to the cache, so it may well cover more than 30 minutes. (The most recent 10000 blocks are maintained.) If a request is received that does not have the hints and the timestamp is before the oldest entry in the cache, the request is rejected. + + The resulting block numbers MUST be `1` different and their timestamps MUST be such that the target block is _before_ the target time (inclusive) and the following block is _after_ (exclusive). In other words, ``` target_block.timestamp <= target_time < following_block.timestamp