diff --git a/README.md b/README.md
index 4951122..668577d 100644
--- a/README.md
+++ b/README.md
@@ -45,6 +45,15 @@ chains:
 ```
 
 ## Changelog
+### v0.8.2
+- Improved efficiency
+- More detailed metrics
+- Don't panic on failed txs
+- Don't query client headers that are going to be rejected
+
+### v0.8.0
+- Improved error handling
+- Add metrics
 
 ### v0.6.2
 - Fix default chain instantiation on first run
diff --git a/pkg/runner/run.go b/pkg/runner/run.go
index 7d0c9ba..437d514 100644
--- a/pkg/runner/run.go
+++ b/pkg/runner/run.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	stdlog "log"
 	"math"
+	"math/rand"
 	"net/http"
 	"os"
 	"sort"
@@ -43,12 +44,12 @@ import (
 
 type Clients []*lensclient.ChainClient
 
-const VERSION = "icq/v0.8.0"
+const VERSION = "icq/v0.8.2"
 
 var (
 	WaitInterval       = time.Second * 6
-	MaxHistoricQueries = 25
-	MaxTxMsgs          = 15
+	MaxHistoricQueries = 5
+	MaxTxMsgs          = 5
 	clients            = Clients{}
 	ctx                = context.Background()
 	sendQueue          = map[string]chan sdk.Msg{}
@@ -150,6 +151,7 @@ func Run(cfg *config.Config, home string) error {
 	}
 
 	bz := c.Codec.Marshaler.MustMarshal(req)
+	metrics.HistoricQueryRequests.WithLabelValues("historic_requests").Inc()
 	res, err := c.RPCClient.ABCIQuery(ctx, "/quicksilver.interchainquery.v1.QuerySrvr/Queries", bz)
 	if err != nil {
 		if strings.Contains(err.Error(), "Client.Timeout") {
@@ -204,6 +206,9 @@ func handleHistoricRequests(queries []qstypes.Query, sourceChainId string, logge
 		return
 	}
 
+	rand.Seed(time.Now().UnixNano())
+	rand.Shuffle(len(queries), func(i, j int) { queries[i], queries[j] = queries[j], queries[i] })
+
 	sort.Slice(queries, func(i, j int) bool {
 		return queries[i].LastEmission.GT(queries[j].LastEmission)
 	})
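A note on the shuffle-then-sort above: sort.Slice is not a stable sort, so shuffling first randomizes the relative order of queries whose LastEmission values compare equal. Combined with the lower MaxHistoricQueries cap, this likely spreads attention across tied queries instead of replaying the same few on every pass. A minimal, self-contained sketch of the pattern (the query struct and values here are illustrative, not the repo's qstypes.Query):

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
	"time"
)

// Illustrative stand-in for qstypes.Query; only the field the sort uses.
type query struct {
	ID           string
	LastEmission int64
}

func main() {
	rand.Seed(time.Now().UnixNano()) // needed before Go 1.20 for varying output

	queries := []query{{"a", 10}, {"b", 20}, {"c", 20}, {"d", 5}}

	// Shuffle first: sort.Slice is unstable, so the tie between "b" and "c"
	// resolves in a random order after sorting.
	rand.Shuffle(len(queries), func(i, j int) { queries[i], queries[j] = queries[j], queries[i] })
	sort.Slice(queries, func(i, j int) bool { return queries[i].LastEmission > queries[j].LastEmission })

	fmt.Println(queries) // either [{b 20} {c 20} ...] or [{c 20} {b 20} ...]
}
```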
@@ -252,7 +257,7 @@ func handleEvent(event coretypes.ResultEvent, logger log.Logger, metrics prommet
 		}
 	}
 }
 
-func RunGRPCQuery(ctx context.Context, client *lensclient.ChainClient, method string, reqBz []byte, md metadata.MD) (abcitypes.ResponseQuery, metadata.MD, error) {
+func RunGRPCQuery(ctx context.Context, client *lensclient.ChainClient, method string, reqBz []byte, md metadata.MD, metrics prommetrics.Metrics) (abcitypes.ResponseQuery, metadata.MD, error) {
 	// parse height header
 	height, err := lensclient.GetHeightFromMetadata(md)
@@ -272,6 +277,8 @@ func RunGRPCQuery(ctx context.Context, client *lensclient.ChainClient, method st
 		Prove:  prove,
 	}
 
+	metrics.ABCIRequests.WithLabelValues("abci_requests", method).Inc()
+
 	abciRes, err := client.QueryABCI(ctx, abciReq)
 	if err != nil {
 		return abcitypes.ResponseQuery{}, nil, err
@@ -279,15 +286,18 @@
 	return abciRes, md, nil
 }
 
-func retryLightblock(ctx context.Context, client *lensclient.ChainClient, height int64, maxTime int, logger log.Logger) (*tmtypes.LightBlock, error) {
+func retryLightblock(ctx context.Context, client *lensclient.ChainClient, height int64, maxTime int, logger log.Logger, metrics prommetrics.Metrics) (*tmtypes.LightBlock, error) {
 	interval := 1
 	_ = logger.Log("msg", "Querying lightblock", "attempt", interval)
 	lightBlock, err := client.LightProvider.LightBlock(ctx, height)
+	metrics.LightBlockRequests.WithLabelValues("lightblock_requests").Inc()
+
 	if err != nil {
 		for {
 			time.Sleep(time.Duration(interval) * time.Second)
 			_ = logger.Log("msg", "Requerying lightblock", "attempt", interval)
 			lightBlock, err = client.LightProvider.LightBlock(ctx, height)
+			metrics.LightBlockRequests.WithLabelValues("lightblock_requests").Inc()
 			interval = interval + 1
 			if err == nil {
 				break
@@ -300,10 +310,10 @@ func retryLightblock(ctx context.Context, client *lensclient.ChainClient, height
 }
 
 func doRequestWithMetrics(query Query, logger log.Logger, metrics prommetrics.Metrics) {
 	startTime := time.Now()
-	metrics.Requests.WithLabelValues("requests").Inc()
+	metrics.Requests.WithLabelValues("requests", query.Type).Inc()
 	doRequest(query, logger, metrics)
 	endTime := time.Now()
-	metrics.RequestsLatency.WithLabelValues("request-latency").Observe(endTime.Sub(startTime).Seconds())
+	metrics.RequestsLatency.WithLabelValues("request-latency", query.Type).Observe(endTime.Sub(startTime).Seconds())
 }
 
 func doRequest(query Query, logger log.Logger, metrics prommetrics.Metrics) {
@@ -352,7 +362,7 @@ func doRequest(query Query, logger log.Logger, metrics prommetrics.Metrics) {
 	}
 
 	_ = logger.Log("msg", "Handling GetTxsEvents", "id", query.QueryId, "height", query.Height)
-	res, _, err = RunGRPCQuery(ctx, client, "/"+query.Type, query.Request, inMd)
+	res, _, err = RunGRPCQuery(ctx, client, "/"+query.Type, query.Request, inMd, metrics)
 	if err != nil {
 		_ = logger.Log("msg", "Error: Failed in RunGRPCQuery", "type", query.Type, "id", query.QueryId, "height", query.Height)
 		panic(fmt.Sprintf("panic(7c): %v", err))
@@ -399,7 +409,7 @@ func doRequest(query Query, logger log.Logger, metrics prommetrics.Metrics) {
 		}
 
 		clientId := connection.Connection.ClientId
-		header, err := getHeader(ctx, client, submitClient, clientId, out.Height-1, logger)
+		header, err := getHeader(ctx, client, submitClient, clientId, out.Height-1, logger, true, metrics)
 		if err != nil {
 			_ = logger.Log("msg", fmt.Sprintf("Error: Could not get header %s", err))
 			return
@@ -416,7 +426,7 @@
 		sendQueue[query.SourceChainId] <- msg
 		return
 	default:
-		res, _, err = RunGRPCQuery(ctx, client, "/"+query.Type, query.Request, inMd)
+		res, _, err = RunGRPCQuery(ctx, client, "/"+query.Type, query.Request, inMd, metrics)
 		if err != nil {
 			_ = logger.Log("msg", "Error: Failed in RunGRPCQuery", "type", query.Type, "id", query.QueryId, "height", query.Height)
 			panic(fmt.Sprintf("panic(7): %v", err))
@@ -444,7 +454,7 @@ func submitClientUpdate(client, submitClient *lensclient.ChainClient, query Quer
 
 	clientId := connection.Connection.ClientId
 
-	header, err := getHeader(ctx, client, submitClient, clientId, height, logger)
+	header, err := getHeader(ctx, client, submitClient, clientId, height, logger, false, metrics)
 	if err != nil {
 		_ = logger.Log("msg", fmt.Sprintf("Error: Could not get header %s", err))
 		return
@@ -465,7 +475,7 @@ func submitClientUpdate(client, submitClient *lensclient.ChainClient, query Quer
 	metrics.SendQueue.WithLabelValues("send-queue").Set(float64(len(sendQueue)))
 }
 
-func getHeader(ctx context.Context, client, submitClient *lensclient.ChainClient, clientId string, requestHeight int64, logger log.Logger) (*tmclient.Header, error) {
+func getHeader(ctx context.Context, client, submitClient *lensclient.ChainClient, clientId string, requestHeight int64, logger log.Logger, historicOk bool, metrics prommetrics.Metrics) (*tmclient.Header, error) {
 	submitQuerier := lensquery.Query{Client: submitClient, Options: lensquery.DefaultOptions()}
 	state, err := submitQuerier.Ibc_ClientState(clientId) // pass in from request
 	if err != nil {
@@ -483,13 +493,17 @@
 	}
 
+	if !historicOk && clientHeight.RevisionHeight >= uint64(requestHeight+1) {
+		return nil, fmt.Errorf("trusted height >= request height")
+	}
+
 	_ = logger.Log("msg", "Fetching client update for height", "height", requestHeight+1)
-	newBlock, err := retryLightblock(ctx, client, int64(requestHeight+1), 5, logger)
+	newBlock, err := retryLightblock(ctx, client, int64(requestHeight+1), 5, logger, metrics)
 	if err != nil {
 		panic(fmt.Sprintf("Error: Could not fetch updated LC from chain - bailing: %v", err))
 	}
-	trustedBlock, err := retryLightblock(ctx, client, int64(clientHeight.RevisionHeight)+1, 5, logger)
+	trustedBlock, err := retryLightblock(ctx, client, int64(clientHeight.RevisionHeight)+1, 5, logger, metrics)
 	if err != nil {
 		panic(fmt.Sprintf("Error: Could not fetch updated LC from chain - bailing (2): %v", err))
 	}
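Two details above are easy to miss. retryLightblock increments LightBlockRequests once per attempt, so icq_lightblock_reqs measures pressure on the light provider, not headers successfully fetched. And the new historicOk guard in getHeader refuses to build a client update whose trusted height already reaches the requested height, since such an update would be rejected on-chain anyway (the changelog's "Don't query client headers that are going to be rejected"). A hedged sketch of the per-attempt counting shape, using a generic helper rather than the repo's concrete types:

```go
package retry

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// retryWithMetrics is a sketch, not the repo's code: it mirrors the shape of
// retryLightblock, incrementing the counter on every attempt and backing off
// linearly between failures.
func retryWithMetrics[T any](ctx context.Context, attempts int, counter prometheus.Counter, fn func(context.Context) (T, error)) (T, error) {
	var out T
	var err error
	for i := 1; i <= attempts; i++ {
		counter.Inc() // counts attempts, not successes
		if out, err = fn(ctx); err == nil {
			return out, nil
		}
		time.Sleep(time.Duration(i) * time.Second) // linear backoff, as in retryLightblock
	}
	return out, err
}
```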
@@ -557,7 +571,7 @@ func FlushSendQueue(chainId string, logger log.Logger, metrics prommetrics.Metri
 
 	for {
 		if len(toSend) > MaxTxMsgs {
-			flush(chainId, toSend, logger)
+			flush(chainId, toSend, logger, metrics)
 			toSend = []sdk.Msg{}
 		}
 		select {
@@ -565,7 +579,7 @@ func FlushSendQueue(chainId string, logger log.Logger, metrics prommetrics.Metri
 			toSend = append(toSend, msg)
 			metrics.SendQueue.WithLabelValues("send-queue").Set(float64(len(sendQueue[chainId])))
 		case <-time.After(WaitInterval):
-			flush(chainId, toSend, logger)
+			flush(chainId, toSend, logger, metrics)
 			metrics.SendQueue.WithLabelValues("send-queue").Set(float64(len(sendQueue[chainId])))
 			toSend = []sdk.Msg{}
 		}
@@ -573,7 +587,7 @@ func FlushSendQueue(chainId string, logger log.Logger, metrics prommetrics.Metri
 }
 
 // TODO: refactor me!
-func flush(chainId string, toSend []sdk.Msg, logger log.Logger) {
+func flush(chainId string, toSend []sdk.Msg, logger log.Logger, metrics prommetrics.Metrics) {
 	if len(toSend) > 0 {
 		_ = logger.Log("msg", fmt.Sprintf("Sending batch of %d messages", len(toSend)))
 		chainClient := clients.GetForChainId(chainId)
@@ -606,12 +620,17 @@ func flush(chainId string, toSend []sdk.Msg, logger log.Logger) {
 				_ = logger.Log("msg", "Failed to submit in time, bailing")
 				return
 			} else {
-				panic(fmt.Sprintf("panic(1): %v", err))
+				// panic(fmt.Sprintf("panic(1): %v", err))
+				_ = logger.Log("msg", "Failed to submit after retry; never mind, we'll try again!", "err", err)
+				metrics.FailedTxs.WithLabelValues("failed_txs").Inc()
 			}
 		}
 	} else {
-		panic(fmt.Sprintf("panic(2): %v", err))
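With the panics gone, a failed submission now just logs and bumps FailedTxs (defined in prommetrics/metrics.go below), so operators can alert on it instead of watching the process die. A quick sanity check of that wiring; the prommetrics import path is an assumption, substitute the module path from go.mod:

```go
package prommetrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"

	// Assumed import path for this repo's prommetrics package; adjust to
	// the module path in go.mod.
	"github.com/ingenuity-build/interchain-queries/prommetrics"
)

func TestFailedTxsCounts(t *testing.T) {
	m := prommetrics.NewMetrics(prometheus.NewRegistry())

	// Mirror what flush does when a submission fails.
	m.FailedTxs.WithLabelValues("failed_txs").Inc()

	if got := testutil.ToFloat64(m.FailedTxs.WithLabelValues("failed_txs")); got != 1 {
		t.Fatalf("want failed_txs=1, got %v", got)
	}
}
```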
+		// The submission failed for some reason, but we should be able to continue here.
+		// panic(fmt.Sprintf("panic(2): %v", err))
+		_ = logger.Log("msg", "Failed to submit; never mind, we'll try again!", "err", err)
+		metrics.FailedTxs.WithLabelValues("failed_txs").Inc()
 	}
 	_ = logger.Log("msg", fmt.Sprintf("Sent batch of %d (deduplicated) messages", len(msgs)))
diff --git a/prommetrics/metrics.go b/prommetrics/metrics.go
index 62114d7..b5f6203 100644
--- a/prommetrics/metrics.go
+++ b/prommetrics/metrics.go
@@ -3,10 +3,14 @@ package prommetrics
 import "github.com/prometheus/client_golang/prometheus"
 
 type Metrics struct {
-	Requests        prometheus.CounterVec
-	RequestsLatency prometheus.HistogramVec
-	HistoricQueries prometheus.GaugeVec
-	SendQueue       prometheus.GaugeVec
+	Requests              prometheus.CounterVec
+	FailedTxs             prometheus.CounterVec
+	RequestsLatency       prometheus.HistogramVec
+	HistoricQueries       prometheus.GaugeVec
+	SendQueue             prometheus.GaugeVec
+	HistoricQueryRequests prometheus.CounterVec
+	ABCIRequests          prometheus.CounterVec
+	LightBlockRequests    prometheus.CounterVec
 }
 
 func NewMetrics(reg prometheus.Registerer) *Metrics {
@@ -15,13 +19,18 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
 			Namespace: "icq",
 			Name:      "requests",
 			Help:      "number of host requests",
-		}, []string{"name"}),
+		}, []string{"name", "type"}),
+		FailedTxs: *prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "icq",
+			Name:      "failed_txs",
+			Help:      "number of failed txs",
+		}, []string{"name"}),
 		RequestsLatency: *prometheus.NewHistogramVec(prometheus.HistogramOpts{
 			Namespace: "icq",
 			Name:      "request_duration_seconds",
 			Help:      "Latency of requests",
 			Buckets:   prometheus.ExponentialBuckets(0.001, 2, 15),
-		}, []string{"name"}),
+		}, []string{"name", "type"}),
 		HistoricQueries: *prometheus.NewGaugeVec(prometheus.GaugeOpts{
 			Namespace: "icq",
 			Name:      "historic_queries",
 			Help:      "number of historic queries in queue",
 		}, []string{"name"}),
@@ -32,7 +41,22 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
 			Name:      "send_queue",
 			Help:      "send queue size",
 		}, []string{"name"}),
+		HistoricQueryRequests: *prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "icq",
+			Name:      "historic_reqs",
+			Help:      "number of historic query requests",
+		}, []string{"name"}),
+		ABCIRequests: *prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "icq",
+			Name:      "abci_reqs",
+			Help:      "number of abci requests",
+		}, []string{"name", "type"}),
+		LightBlockRequests: *prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: "icq",
+			Name:      "lightblock_reqs",
+			Help:      "number of lightblock requests",
+		}, []string{"name"}),
 	}
-	reg.MustRegister(m.Requests, m.RequestsLatency, m.HistoricQueries, m.SendQueue)
+	reg.MustRegister(m.Requests, m.RequestsLatency, m.HistoricQueries, m.SendQueue, m.FailedTxs, m.HistoricQueryRequests, m.LightBlockRequests, m.ABCIRequests)
 	return m
 }
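NewMetrics registers all eight collectors on the caller's Registerer, so exposing the new series is just a matter of serving that registry. A minimal sketch of the wiring (again, the prommetrics import path is an assumption):

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	// Assumed import path; adjust to the module path in go.mod.
	"github.com/ingenuity-build/interchain-queries/prommetrics"
)

func main() {
	reg := prometheus.NewRegistry()
	m := prommetrics.NewMetrics(reg) // MustRegister wires up all eight collectors

	// Example increment; normally run.go does this on each ABCI query.
	m.ABCIRequests.WithLabelValues("abci_requests", "/cosmos.bank.v1beta1.Query/Balance").Inc()

	// Serve only this registry, so /metrics exposes exactly the icq_* series.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":2112", nil)
}
```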