Skip to content

Commit

Permalink
Code review rework
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Dec 7, 2023
1 parent 23b1831 commit 39d21ae
Showing 1 changed file with 32 additions and 26 deletions.
58 changes: 32 additions & 26 deletions node/pkg/watchers/evm/ccq_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,36 @@ import (
)

const CCQ_MAX_BATCH_SIZE = int64(1000)
const CCQ_TIMESTAMP_RANGE_IN_MS = uint64(30 * 60)
const CCQ_TIMESTAMP_RANGE_IN_SECONDS = uint64(30 * 60)

type ccqBackfillRequest struct {
timestamp uint64
}
type (
// ccqBackfillRequest represents a request to backfill the cache. It is the payload on the request channel.
ccqBackfillRequest struct {
timestamp uint64
}

// ccqBatchResult is the result of each query in a batch.
ccqBatchResult struct {
result ccqBlockMarshaller
err error
}

// ccqRequestBackfill submits a request to backfill a gap in the timestamp cache.
// ccqBlockMarshaller is used to marshal the query results.
ccqBlockMarshaller struct {
Number ethHexUtil.Uint64
Time ethHexUtil.Uint64 `json:"timestamp"`
// Hash ethCommon.Hash `json:"hash"`
}
)

// ccqRequestBackfill submits a request to backfill a gap in the timestamp cache. Note that the timestamp passed in should be in seconds, as expected by the timestamp cache.
func (w *Watcher) ccqRequestBackfill(timestamp uint64) {
select {
case w.ccqBackfillChannel <- &ccqBackfillRequest{timestamp: timestamp}:
w.ccqLogger.Debug("published backfill request", zap.Uint64("timestamp", timestamp))
default:
// This will get retried next interval.
w.ccqLogger.Error("failed to published query response error to handler", zap.Uint64("timestamp", timestamp))
w.ccqLogger.Error("failed to post backfill request, will get retried next interval", zap.Uint64("timestamp", timestamp))
}
}

Expand All @@ -48,19 +64,6 @@ func (w *Watcher) ccqBackfiller(ctx context.Context) error {
}
}

type (
ccqBatchResult struct {
result ccqBlockMarshaller
err error
}

ccqBlockMarshaller struct {
Number ethHexUtil.Uint64
Time ethHexUtil.Uint64 `json:"timestamp"`
// Hash ethCommon.Hash `json:"hash"`
}
)

// ccqBackfillInit determines the maximum batch size to be used for backfilling the cache. It also loads the initial batch of timestamps.
func (w *Watcher) ccqBackfillInit(ctx context.Context) error {
// Get the latest block so we can use that as the starting point in our cache.
Expand All @@ -84,16 +87,19 @@ func (w *Watcher) ccqBackfillInit(ctx context.Context) error {
w.ccqLogger.Info("using existing batch size for timestamp cache", zap.Int64("batchSize", w.ccqBatchSize))
}

if len(blocks) == 0 {
return fmt.Errorf("list of blocks is empty")
}

// We want to start with a half hour in our cache. Get batches until we cover that.
cutOffTime := latestBlock.Time - CCQ_TIMESTAMP_RANGE_IN_MS
if latestBlock.Time < CCQ_TIMESTAMP_RANGE_IN_MS {
cutOffTime := latestBlock.Time - CCQ_TIMESTAMP_RANGE_IN_SECONDS
if latestBlock.Time < CCQ_TIMESTAMP_RANGE_IN_SECONDS {
// In devnet the timestamps are just integers that start at zero on startup.
cutOffTime = 0
}

if len(blocks) == 0 {
// This should never happen, but the for loop would panic if it did!
return fmt.Errorf("list of blocks is empty")
}

// Query for more blocks until we go back the desired length of time. The last block in the array will be the oldest, so query starting one before that.
for blocks[len(blocks)-1].Timestamp > cutOffTime {
newBlocks, err := w.ccqBackfillGetBlocks(ctx, int64(blocks[len(blocks)-1].BlockNum-1), w.ccqBatchSize)
if err != nil {
Expand Down Expand Up @@ -213,7 +219,7 @@ func ccqBackFillDetermineMaxBatchSize(ctx context.Context, logger *zap.Logger, c
return batchSize, blocks, nil
}

// ccqBackfillGetBlocks gets a range of blocks from the RPC.
// ccqBackfillGetBlocks gets a range of blocks from the RPC, starting from initialBlockNum and going downward for numBlocks.
func (w *Watcher) ccqBackfillGetBlocks(ctx context.Context, initialBlockNum int64, numBlocks int64) (Blocks, error) {
w.ccqLogger.Info("getting batch", zap.Int64("initialBlockNum", initialBlockNum), zap.Int64("numBlocks", numBlocks))
batch := make([]ethRpc.BatchElem, numBlocks)
Expand Down

0 comments on commit 39d21ae

Please sign in to comment.