Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Commit

Permalink
release/v0.8.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Bowman committed Jan 21, 2023
1 parent 8f3c02e commit 3912a21
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 25 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ chains:
```

## Changelog
### v0.8.2
- Improved efficiency
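- More detailed metrics: new counters for failed txs, historic query requests, ABCI requests and lightblock requests; request count and latency are now labelled by query type
- Don't panic on failed txs; log the error and increment the failed-tx counter instead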
- Don't query client headers that are going to be rejected

### v0.8.0
- Improved error handling
- Add metrics

### v0.6.2
- Fix default chain instantiation on first run
Expand Down
57 changes: 38 additions & 19 deletions pkg/runner/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
stdlog "log"
"math"
"math/rand"
"net/http"
"os"
"sort"
Expand Down Expand Up @@ -43,12 +44,12 @@ import (

type Clients []*lensclient.ChainClient

const VERSION = "icq/v0.8.0"
const VERSION = "icq/v0.8.2"

var (
WaitInterval = time.Second * 6
MaxHistoricQueries = 25
MaxTxMsgs = 15
MaxHistoricQueries = 5
MaxTxMsgs = 5
clients = Clients{}
ctx = context.Background()
sendQueue = map[string]chan sdk.Msg{}
Expand Down Expand Up @@ -150,6 +151,7 @@ func Run(cfg *config.Config, home string) error {
}

bz := c.Codec.Marshaler.MustMarshal(req)
metrics.HistoricQueryRequests.WithLabelValues("historic_requests").Inc()
res, err := c.RPCClient.ABCIQuery(ctx, "/quicksilver.interchainquery.v1.QuerySrvr/Queries", bz)
if err != nil {
if strings.Contains(err.Error(), "Client.Timeout") {
Expand Down Expand Up @@ -204,6 +206,9 @@ func handleHistoricRequests(queries []qstypes.Query, sourceChainId string, logge
return
}

rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(queries), func(i, j int) { queries[i], queries[j] = queries[j], queries[i] })

sort.Slice(queries, func(i, j int) bool {
return queries[i].LastEmission.GT(queries[j].LastEmission)
})
Expand Down Expand Up @@ -252,7 +257,7 @@ func handleEvent(event coretypes.ResultEvent, logger log.Logger, metrics prommet
}
}

func RunGRPCQuery(ctx context.Context, client *lensclient.ChainClient, method string, reqBz []byte, md metadata.MD) (abcitypes.ResponseQuery, metadata.MD, error) {
func RunGRPCQuery(ctx context.Context, client *lensclient.ChainClient, method string, reqBz []byte, md metadata.MD, metrics prommetrics.Metrics) (abcitypes.ResponseQuery, metadata.MD, error) {

// parse height header
height, err := lensclient.GetHeightFromMetadata(md)
Expand All @@ -272,22 +277,27 @@ func RunGRPCQuery(ctx context.Context, client *lensclient.ChainClient, method st
Prove: prove,
}

metrics.ABCIRequests.WithLabelValues("abci_requests", method).Inc()

abciRes, err := client.QueryABCI(ctx, abciReq)
if err != nil {
return abcitypes.ResponseQuery{}, nil, err
}
return abciRes, md, nil
}

func retryLightblock(ctx context.Context, client *lensclient.ChainClient, height int64, maxTime int, logger log.Logger) (*tmtypes.LightBlock, error) {
func retryLightblock(ctx context.Context, client *lensclient.ChainClient, height int64, maxTime int, logger log.Logger, metrics prommetrics.Metrics) (*tmtypes.LightBlock, error) {
interval := 1
_ = logger.Log("msg", "Querying lightblock", "attempt", interval)
lightBlock, err := client.LightProvider.LightBlock(ctx, height)
metrics.LightBlockRequests.WithLabelValues("lightblock_requests").Inc()

if err != nil {
for {
time.Sleep(time.Duration(interval) * time.Second)
_ = logger.Log("msg", "Requerying lightblock", "attempt", interval)
lightBlock, err = client.LightProvider.LightBlock(ctx, height)
metrics.LightBlockRequests.WithLabelValues("lightblock_requests").Inc()
interval = interval + 1
if err == nil {
break
Expand All @@ -300,10 +310,10 @@ func retryLightblock(ctx context.Context, client *lensclient.ChainClient, height
}
func doRequestWithMetrics(query Query, logger log.Logger, metrics prommetrics.Metrics) {
startTime := time.Now()
metrics.Requests.WithLabelValues("requests").Inc()
metrics.Requests.WithLabelValues("requests", query.Type).Inc()
doRequest(query, logger, metrics)
endTime := time.Now()
metrics.RequestsLatency.WithLabelValues("request-latency").Observe(endTime.Sub(startTime).Seconds())
metrics.RequestsLatency.WithLabelValues("request-latency", query.Type).Observe(endTime.Sub(startTime).Seconds())
}

func doRequest(query Query, logger log.Logger, metrics prommetrics.Metrics) {
Expand Down Expand Up @@ -352,7 +362,7 @@ func doRequest(query Query, logger log.Logger, metrics prommetrics.Metrics) {
}

_ = logger.Log("msg", "Handling GetTxsEvents", "id", query.QueryId, "height", query.Height)
res, _, err = RunGRPCQuery(ctx, client, "/"+query.Type, query.Request, inMd)
res, _, err = RunGRPCQuery(ctx, client, "/"+query.Type, query.Request, inMd, metrics)
if err != nil {
_ = logger.Log("msg", "Error: Failed in RunGRPCQuery", "type", query.Type, "id", query.QueryId, "height", query.Height)
panic(fmt.Sprintf("panic(7c): %v", err))
Expand Down Expand Up @@ -399,7 +409,7 @@ func doRequest(query Query, logger log.Logger, metrics prommetrics.Metrics) {
}

clientId := connection.Connection.ClientId
header, err := getHeader(ctx, client, submitClient, clientId, out.Height-1, logger)
header, err := getHeader(ctx, client, submitClient, clientId, out.Height-1, logger, true, metrics)
if err != nil {
_ = logger.Log("msg", fmt.Sprintf("Error: Could not get header %s", err))
return
Expand All @@ -416,7 +426,7 @@ func doRequest(query Query, logger log.Logger, metrics prommetrics.Metrics) {
sendQueue[query.SourceChainId] <- msg
return
default:
res, _, err = RunGRPCQuery(ctx, client, "/"+query.Type, query.Request, inMd)
res, _, err = RunGRPCQuery(ctx, client, "/"+query.Type, query.Request, inMd, metrics)
if err != nil {
_ = logger.Log("msg", "Error: Failed in RunGRPCQuery", "type", query.Type, "id", query.QueryId, "height", query.Height)
panic(fmt.Sprintf("panic(7): %v", err))
Expand Down Expand Up @@ -444,7 +454,7 @@ func submitClientUpdate(client, submitClient *lensclient.ChainClient, query Quer

clientId := connection.Connection.ClientId

header, err := getHeader(ctx, client, submitClient, clientId, height, logger)
header, err := getHeader(ctx, client, submitClient, clientId, height, logger, false, metrics)
if err != nil {
_ = logger.Log("msg", fmt.Sprintf("Error: Could not get header %s", err))
return
Expand All @@ -465,7 +475,7 @@ func submitClientUpdate(client, submitClient *lensclient.ChainClient, query Quer
metrics.SendQueue.WithLabelValues("send-queue").Set(float64(len(sendQueue)))
}

func getHeader(ctx context.Context, client, submitClient *lensclient.ChainClient, clientId string, requestHeight int64, logger log.Logger) (*tmclient.Header, error) {
func getHeader(ctx context.Context, client, submitClient *lensclient.ChainClient, clientId string, requestHeight int64, logger log.Logger, historicOk bool, metrics prommetrics.Metrics) (*tmclient.Header, error) {
submitQuerier := lensquery.Query{Client: submitClient, Options: lensquery.DefaultOptions()}
state, err := submitQuerier.Ibc_ClientState(clientId) // pass in from request
if err != nil {
Expand All @@ -483,13 +493,17 @@ func getHeader(ctx context.Context, client, submitClient *lensclient.ChainClient

}

if !historicOk && clientHeight.RevisionHeight >= uint64(requestHeight+1) {
return nil, fmt.Errorf("trusted height >= request height")
}

_ = logger.Log("msg", "Fetching client update for height", "height", requestHeight+1)
newBlock, err := retryLightblock(ctx, client, int64(requestHeight+1), 5, logger)
newBlock, err := retryLightblock(ctx, client, int64(requestHeight+1), 5, logger, metrics)
if err != nil {
panic(fmt.Sprintf("Error: Could not fetch updated LC from chain - bailing: %v", err))
}

trustedBlock, err := retryLightblock(ctx, client, int64(clientHeight.RevisionHeight)+1, 5, logger)
trustedBlock, err := retryLightblock(ctx, client, int64(clientHeight.RevisionHeight)+1, 5, logger, metrics)
if err != nil {
panic(fmt.Sprintf("Error: Could not fetch updated LC from chain - bailing (2): %v", err))
}
Expand Down Expand Up @@ -557,23 +571,23 @@ func FlushSendQueue(chainId string, logger log.Logger, metrics prommetrics.Metri

for {
if len(toSend) > MaxTxMsgs {
flush(chainId, toSend, logger)
flush(chainId, toSend, logger, metrics)
toSend = []sdk.Msg{}
}
select {
case msg := <-ch:
toSend = append(toSend, msg)
metrics.SendQueue.WithLabelValues("send-queue").Set(float64(len(sendQueue[chainId])))
case <-time.After(WaitInterval):
flush(chainId, toSend, logger)
flush(chainId, toSend, logger, metrics)
metrics.SendQueue.WithLabelValues("send-queue").Set(float64(len(sendQueue[chainId])))
toSend = []sdk.Msg{}
}
}
}

// TODO: refactor me!
func flush(chainId string, toSend []sdk.Msg, logger log.Logger) {
func flush(chainId string, toSend []sdk.Msg, logger log.Logger, metrics prommetrics.Metrics) {
if len(toSend) > 0 {
_ = logger.Log("msg", fmt.Sprintf("Sending batch of %d messages", len(toSend)))
chainClient := clients.GetForChainId(chainId)
Expand Down Expand Up @@ -606,12 +620,17 @@ func flush(chainId string, toSend []sdk.Msg, logger log.Logger) {
_ = logger.Log("msg", "Failed to submit in time, bailing")
return
} else {
panic(fmt.Sprintf("panic(1): %v", err))
//panic(fmt.Sprintf("panic(1): %v", err))
_ = logger.Log("msg", "Failed to submit after retry; nevermind, we'll try again!", "err", err)
metrics.FailedTxs.WithLabelValues("failed_txs").Inc()
}
}

} else {
panic(fmt.Sprintf("panic(2): %v", err))
// The submission failed for some reason, but we should be able to continue here.
// panic(fmt.Sprintf("panic(2): %v", err))
_ = logger.Log("msg", "Failed to submit; nevermind, we'll try again!", "err", err)
metrics.FailedTxs.WithLabelValues("failed_txs").Inc()
}
}
_ = logger.Log("msg", fmt.Sprintf("Sent batch of %d (deduplicated) messages", len(msgs)))
Expand Down
36 changes: 30 additions & 6 deletions prommetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package prommetrics
import "github.com/prometheus/client_golang/prometheus"

type Metrics struct {
Requests prometheus.CounterVec
RequestsLatency prometheus.HistogramVec
HistoricQueries prometheus.GaugeVec
SendQueue prometheus.GaugeVec
Requests prometheus.CounterVec
FailedTxs prometheus.CounterVec
RequestsLatency prometheus.HistogramVec
HistoricQueries prometheus.GaugeVec
SendQueue prometheus.GaugeVec
HistoricQueryRequests prometheus.CounterVec
ABCIRequests prometheus.CounterVec
LightBlockRequests prometheus.CounterVec
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
Expand All @@ -15,13 +19,18 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Namespace: "icq",
Name: "requests",
Help: "number of host requests",
}, []string{"name", "type"}),
FailedTxs: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "icq",
Name: "failed_txs",
Help: "number of failed txs",
}, []string{"name"}),
RequestsLatency: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "icq",
Name: "request_duration_seconds",
Help: "Latency of requests",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"name"}),
}, []string{"name", "type"}),
HistoricQueries: *prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "icq",
Name: "historic_queries",
Expand All @@ -32,7 +41,22 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Name: "send_queue",
Help: "send queue size",
}, []string{"name"}),
HistoricQueryRequests: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "icq",
Name: "historic_reqs",
Help: "number of historic query requests",
}, []string{"name"}),
ABCIRequests: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "icq",
Name: "abci_reqs",
Help: "number of abci requests",
}, []string{"name", "type"}),
LightBlockRequests: *prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "icq",
Name: "lightblock_reqs",
Help: "number of lightblock requests",
}, []string{"name"}),
}
reg.MustRegister(m.Requests, m.RequestsLatency, m.HistoricQueries, m.SendQueue)
reg.MustRegister(m.Requests, m.RequestsLatency, m.HistoricQueries, m.SendQueue, m.FailedTxs, m.HistoricQueryRequests, m.LightBlockRequests, m.ABCIRequests)
return m
}

0 comments on commit 3912a21

Please sign in to comment.