Skip to content

Commit

Permalink
Make backfilling configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Dec 11, 2023
1 parent a3ab420 commit 4896d52
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 65 deletions.
140 changes: 80 additions & 60 deletions node/cmd/guardiand/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ var (
ccqP2pPort *uint
ccqP2pBootstrap *string
ccqAllowedPeers *string
ccqBackfillCache *bool

gatewayRelayerContract *string
gatewayRelayerKeyPath *string
Expand Down Expand Up @@ -383,6 +384,7 @@ func init() {
ccqP2pPort = NodeCmd.Flags().Uint("ccqP2pPort", 8996, "CCQ P2P UDP listener port")
ccqP2pBootstrap = NodeCmd.Flags().String("ccqP2pBootstrap", "", "CCQ P2P bootstrap peers (comma-separated)")
ccqAllowedPeers = NodeCmd.Flags().String("ccqAllowedPeers", "", "CCQ allowed P2P peers (comma-separated)")
ccqBackfillCache = NodeCmd.Flags().Bool("ccqBackfillCache", false, "Should EVM chains backfill CCQ timestamp cache on startup")

gatewayRelayerContract = NodeCmd.Flags().String("gatewayRelayerContract", "", "Address of the smart contract on wormchain to receive relayed VAAs")
gatewayRelayerKeyPath = NodeCmd.Flags().String("gatewayRelayerKeyPath", "", "Path to gateway relayer private key for signing transactions")
Expand Down Expand Up @@ -1100,127 +1102,139 @@ func runNode(cmd *cobra.Command, args []string) {
Rpc: *ethRPC,
Contract: *ethContract,
GuardianSetUpdateChain: true,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(bscRPC) {
wc := &evm.WatcherConfig{
NetworkID: "bsc",
ChainID: vaa.ChainIDBSC,
Rpc: *bscRPC,
Contract: *bscContract,
NetworkID: "bsc",
ChainID: vaa.ChainIDBSC,
Rpc: *bscRPC,
Contract: *bscContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(polygonRPC) {
wc := &evm.WatcherConfig{
NetworkID: "polygon",
ChainID: vaa.ChainIDPolygon,
Rpc: *polygonRPC,
Contract: *polygonContract,
NetworkID: "polygon",
ChainID: vaa.ChainIDPolygon,
Rpc: *polygonRPC,
Contract: *polygonContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(avalancheRPC) {
wc := &evm.WatcherConfig{
NetworkID: "avalanche",
ChainID: vaa.ChainIDAvalanche,
Rpc: *avalancheRPC,
Contract: *avalancheContract,
NetworkID: "avalanche",
ChainID: vaa.ChainIDAvalanche,
Rpc: *avalancheRPC,
Contract: *avalancheContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(oasisRPC) {
wc := &evm.WatcherConfig{
NetworkID: "oasis",
ChainID: vaa.ChainIDOasis,
Rpc: *oasisRPC,
Contract: *oasisContract,
NetworkID: "oasis",
ChainID: vaa.ChainIDOasis,
Rpc: *oasisRPC,
Contract: *oasisContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(auroraRPC) {
wc := &evm.WatcherConfig{
NetworkID: "aurora",
ChainID: vaa.ChainIDAurora,
Rpc: *auroraRPC,
Contract: *auroraContract,
NetworkID: "aurora",
ChainID: vaa.ChainIDAurora,
Rpc: *auroraRPC,
Contract: *auroraContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(fantomRPC) {
wc := &evm.WatcherConfig{
NetworkID: "fantom",
ChainID: vaa.ChainIDFantom,
Rpc: *fantomRPC,
Contract: *fantomContract,
NetworkID: "fantom",
ChainID: vaa.ChainIDFantom,
Rpc: *fantomRPC,
Contract: *fantomContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(karuraRPC) {
wc := &evm.WatcherConfig{
NetworkID: "karura",
ChainID: vaa.ChainIDKarura,
Rpc: *karuraRPC,
Contract: *karuraContract,
NetworkID: "karura",
ChainID: vaa.ChainIDKarura,
Rpc: *karuraRPC,
Contract: *karuraContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(acalaRPC) {
wc := &evm.WatcherConfig{
NetworkID: "acala",
ChainID: vaa.ChainIDAcala,
Rpc: *acalaRPC,
Contract: *acalaContract,
NetworkID: "acala",
ChainID: vaa.ChainIDAcala,
Rpc: *acalaRPC,
Contract: *acalaContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(klaytnRPC) {
wc := &evm.WatcherConfig{
NetworkID: "klaytn",
ChainID: vaa.ChainIDKlaytn,
Rpc: *klaytnRPC,
Contract: *klaytnContract,
NetworkID: "klaytn",
ChainID: vaa.ChainIDKlaytn,
Rpc: *klaytnRPC,
Contract: *klaytnContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(celoRPC) {
wc := &evm.WatcherConfig{
NetworkID: "celo",
ChainID: vaa.ChainIDCelo,
Rpc: *celoRPC,
Contract: *celoContract,
NetworkID: "celo",
ChainID: vaa.ChainIDCelo,
Rpc: *celoRPC,
Contract: *celoContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(moonbeamRPC) {
wc := &evm.WatcherConfig{
NetworkID: "moonbeam",
ChainID: vaa.ChainIDMoonbeam,
Rpc: *moonbeamRPC,
Contract: *moonbeamContract,
NetworkID: "moonbeam",
ChainID: vaa.ChainIDMoonbeam,
Rpc: *moonbeamRPC,
Contract: *moonbeamContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
Expand All @@ -1233,39 +1247,43 @@ func runNode(cmd *cobra.Command, args []string) {
Rpc: *arbitrumRPC,
Contract: *arbitrumContract,
L1FinalizerRequired: "eth",
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(optimismRPC) {
wc := &evm.WatcherConfig{
NetworkID: "optimism",
ChainID: vaa.ChainIDOptimism,
Rpc: *optimismRPC,
Contract: *optimismContract,
NetworkID: "optimism",
ChainID: vaa.ChainIDOptimism,
Rpc: *optimismRPC,
Contract: *optimismContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(baseRPC) {
wc := &evm.WatcherConfig{
NetworkID: "base",
ChainID: vaa.ChainIDBase,
Rpc: *baseRPC,
Contract: *baseContract,
NetworkID: "base",
ChainID: vaa.ChainIDBase,
Rpc: *baseRPC,
Contract: *baseContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(scrollRPC) {
wc := &evm.WatcherConfig{
NetworkID: "scroll",
ChainID: vaa.ChainIDScroll,
Rpc: *scrollRPC,
Contract: *scrollContract,
NetworkID: "scroll",
ChainID: vaa.ChainIDScroll,
Rpc: *scrollRPC,
Contract: *scrollContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
Expand Down Expand Up @@ -1439,17 +1457,19 @@ func runNode(cmd *cobra.Command, args []string) {
Rpc: *neonRPC,
Contract: *neonContract,
L1FinalizerRequired: "solana-finalized",
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
}

if shouldStart(sepoliaRPC) {
wc := &evm.WatcherConfig{
NetworkID: "sepolia",
ChainID: vaa.ChainIDSepolia,
Rpc: *sepoliaRPC,
Contract: *sepoliaContract,
NetworkID: "sepolia",
ChainID: vaa.ChainIDSepolia,
Rpc: *sepoliaRPC,
Contract: *sepoliaContract,
CcqBackfillCache: *ccqBackfillCache,
}

watcherConfigs = append(watcherConfigs, wc)
Expand Down
16 changes: 13 additions & 3 deletions node/pkg/watchers/evm/ccq.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
if !found {
status := query.QueryRetryNeeded
if nextBlockNum == 0 {
w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp beyond the end of the cache, will wait and retry",
w.ccqLogger.Warn("block look up failed in eth_call_by_timestamp query request, timestamp beyond the end of the cache, will wait and retry",
zap.String("requestId", requestId),
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
Expand All @@ -233,8 +233,8 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
zap.Uint64("nextBlockNum", nextBlockNum),
)
status = query.QueryFatalError
} else {
w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp is in a gap in the cache, will request a backfill and retry",
} else if w.ccqBackfillCache {
w.ccqLogger.Warn("block look up failed in eth_call_by_timestamp query request, timestamp is in a gap in the cache, will request a backfill and retry",
zap.String("requestId", requestId),
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
Expand All @@ -243,6 +243,16 @@ func (w *Watcher) ccqHandleEthCallByTimestampQueryRequest(ctx context.Context, q
zap.Uint64("nextBlockNum", nextBlockNum),
)
w.ccqRequestBackfill(req.TargetTimestamp / 1000000)
} else {
w.ccqLogger.Error("block look up failed in eth_call_by_timestamp query request, timestamp is in a gap in the cache, failing request",
zap.String("requestId", requestId),
zap.Uint64("timestamp", req.TargetTimestamp),
zap.String("block", block),
zap.String("nextBlock", nextBlock),
zap.Uint64("blockNum", blockNum),
zap.Uint64("nextBlockNum", nextBlockNum),
)
status = query.QueryFatalError
}
w.ccqSendQueryResponse(queryRequest, status, nil)
return
Expand Down
3 changes: 2 additions & 1 deletion node/pkg/watchers/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type WatcherConfig struct {
WaitForConfirmations bool // (optional)
L1FinalizerRequired watchers.NetworkID // (optional)
l1Finalizer interfaces.L1Finalizer
CcqBackfillCache bool
}

func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
Expand Down Expand Up @@ -55,7 +56,7 @@ func (wc *WatcherConfig) Create(

var devMode bool = (env == common.UnsafeDevNet)

watcher := NewEthWatcher(wc.Rpc, eth_common.HexToAddress(wc.Contract), string(wc.NetworkID), wc.ChainID, msgC, setWriteC, obsvReqC, queryReqC, queryResponseC, devMode)
watcher := NewEthWatcher(wc.Rpc, eth_common.HexToAddress(wc.Contract), string(wc.NetworkID), wc.ChainID, msgC, setWriteC, obsvReqC, queryReqC, queryResponseC, devMode, wc.CcqBackfillCache)
watcher.SetWaitForConfirmations(wc.WaitForConfirmations)
watcher.SetL1Finalizer(wc.l1Finalizer)
return watcher, watcher.Run, nil
Expand Down
5 changes: 4 additions & 1 deletion node/pkg/watchers/evm/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type (
ccqTimestampCache *BlocksByTimestamp
ccqBackfillChannel chan *ccqBackfillRequest
ccqBatchSize int64
ccqBackfillCache bool
ccqLogger *zap.Logger
}

Expand Down Expand Up @@ -171,6 +172,7 @@ func NewEthWatcher(
queryReqC <-chan *query.PerChainQueryInternal,
queryResponseC chan<- *query.PerChainQueryResponseInternal,
unsafeDevMode bool,
ccqBackfillCache bool,
) *Watcher {
return &Watcher{
url: url,
Expand All @@ -188,6 +190,7 @@ func NewEthWatcher(
pending: map[pendingKey]*pendingMessage{},
unsafeDevMode: unsafeDevMode,
ccqMaxBlockNumber: big.NewInt(0).SetUint64(math.MaxUint64),
ccqBackfillCache: ccqBackfillCache,
ccqBackfillChannel: make(chan *ccqBackfillRequest, 50),
}
}
Expand Down Expand Up @@ -464,7 +467,7 @@ func (w *Watcher) Run(parentCtx context.Context) error {
}
})

if w.ccqTimestampCache != nil {
if w.ccqTimestampCache != nil && w.ccqBackfillCache {
if err := supervisor.Run(ctx, "ccq_backfiller", common.WrapWithScissors(w.ccqBackfiller, "ccq_backfiller")); err != nil {
return fmt.Errorf("failed to start ccq_backfiller: %w", err)
}
Expand Down

0 comments on commit 4896d52

Please sign in to comment.