Skip to content

Commit

Permalink
collectors: collect payment and attempt counts
Browse files Browse the repository at this point in the history
collect raw payment and attempt count information from
lnd using the TrackPayments RPC.
  • Loading branch information
calvinrzachman committed Feb 28, 2025
1 parent 98a89e9 commit 0cf9fd4
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 9 deletions.
3 changes: 3 additions & 0 deletions collectors/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var (

// htlcLogger is a logger for lndmon's htlc collector.
htlcLogger = build.NewSubLogger("HTLC", backendLog.Logger)

// paymentLogger is a logger for lndmon's payments monitor.
paymentLogger = build.NewSubLogger("PMNT", backendLog.Logger)
)

// initLogRotator initializes the logging rotator to write logs to logFile and
Expand Down
151 changes: 151 additions & 0 deletions collectors/payments_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package collectors

import (
"context"
"fmt"
"sync"

"github.com/lightninglabs/lndclient"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/prometheus/client_golang/prometheus"
)

// Prometheus metrics maintained by the payments monitor. All three are updated
// by processPaymentUpdate each time it is handed a payment.
var (
// totalPayments is incremented by one for every payment passed to
// processPaymentUpdate.
totalPayments = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "lnd_total_payments",
Help: "Total number of payments initiated",
},
)
// totalHTLCAttempts is increased by the number of HTLC attempts recorded
// on each payment passed to processPaymentUpdate.
totalHTLCAttempts = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "lnd_total_htlc_attempts",
Help: "Total number of HTLC attempts across all payments",
},
)
// paymentAttempts observes the per-payment HTLC attempt count, giving the
// distribution of attempts needed per payment.
paymentAttempts = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "lnd_payment_attempts_per_payment",
Help: "Histogram tracking the number of attempts per payment",
Buckets: prometheus.LinearBuckets(1, 1, 10), // 1 to 10 attempts
},
)
)

// paymentsMonitor listens for payments and updates Prometheus metrics.
type paymentsMonitor struct {
// client is the router RPC client used to open the TrackPayments
// subscription.
client routerrpc.RouterClient

// ctx is the macaroon-authenticated context built in newPaymentsMonitor.
// It serves as the parent of the subscription context created in start.
ctx context.Context

// errChan is the channel on which errors received from the payment
// stream are reported.
errChan chan error

// quit is closed to signal that we need to shutdown.
quit chan struct{}

// wg tracks the goroutines launched by start so that stop can wait for
// them to exit.
wg sync.WaitGroup
}

// newPaymentsMonitor builds a payments monitor backed by a router RPC client
// created from the lnd connection.
//
// A context carrying the router service macaroon is derived up front and kept
// on the monitor so that the subscription opened in start is authenticated.
// An error is returned if that authenticated context cannot be obtained.
func newPaymentsMonitor(lnd *lndclient.LndServices,
	errChan chan error) (*paymentsMonitor, error) {

	// Derive the macaroon-authenticated context for the router service.
	authCtx, err := lnd.WithMacaroonAuthForService(
		context.Background(), lndclient.RouterServiceMac,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to get macaroon-authenticated "+
			"context: %w", err)
	}

	monitor := &paymentsMonitor{
		ctx:     authCtx,
		client:  routerrpc.NewRouterClient(lnd.ClientConn),
		quit:    make(chan struct{}),
		errChan: errChan,
	}

	return monitor, nil
}

// start subscribes to `TrackPayments` and updates Prometheus metrics.
//
// The subscription is opened synchronously, so a failure to subscribe is
// returned to the caller. On success two goroutines are launched, both tracked
// by p.wg:
//
//   - a watcher that cancels the stream context as soon as p.quit is closed,
//     which unblocks a pending stream.Recv call, and
//   - a receiver that feeds every payment read from the stream into
//     processPaymentUpdate.
//
// A receive error that occurs while the monitor is running is logged and
// forwarded on p.errChan. A receive error caused by shutdown is dropped.
func (p *paymentsMonitor) start() error {
	// Use the stored authenticated context.
	ctx, cancel := context.WithCancel(p.ctx)

	paymentLogger.Info("Starting payments monitor...")

	stream, err := p.client.TrackPayments(ctx,
		&routerrpc.TrackPaymentsRequest{
			// NOTE: We only need to know the final result of the
			// payment and all attempts.
			NoInflightUpdates: true,
		})
	if err != nil {
		paymentLogger.Errorf("Failed to subscribe to TrackPayments: %v",
			err)

		cancel()

		return err
	}

	p.wg.Add(2)

	// stream.Recv blocks until the next update arrives, so polling p.quit
	// between calls is not enough to shut down promptly. Cancelling the
	// stream context is what actually interrupts a pending Recv.
	go func() {
		defer p.wg.Done()

		select {
		case <-p.quit:
			cancel()

		// The receiver exited on its own and already cancelled ctx.
		case <-ctx.Done():
		}
	}()

	go func() {
		defer func() {
			cancel()
			p.wg.Done()
		}()

		for {
			payment, err := stream.Recv()
			if err != nil {
				// An error during shutdown is the expected
				// result of cancelling the stream, so don't
				// report it.
				select {
				case <-p.quit:
					return
				default:
				}

				paymentLogger.Errorf("Error receiving "+
					"payment update: %v", err)

				// Don't block forever on the error channel
				// if we are asked to quit before anyone reads
				// from it.
				select {
				case p.errChan <- err:
				case <-p.quit:
				}

				return
			}

			processPaymentUpdate(payment)
		}
	}()

	return nil
}

// stop cancels the payments monitor subscription.
//
// It closes the quit channel to signal shutdown and then blocks until every
// goroutine tracked by the wait group has exited.
func (p *paymentsMonitor) stop() {
paymentLogger.Info("Stopping payments monitor...")

// Signal shutdown first, then wait for the goroutines to wind down.
close(p.quit)
p.wg.Wait()
}

// collectors returns all of the Prometheus collectors that the payments
// monitor uses: the payment counter, the HTLC attempt counter and the
// attempts-per-payment histogram.
func (p *paymentsMonitor) collectors() []prometheus.Collector {
	metrics := make([]prometheus.Collector, 0, 3)
	metrics = append(
		metrics, totalPayments, totalHTLCAttempts, paymentAttempts,
	)

	return metrics
}

// processPaymentUpdate folds a single payment into the Prometheus metrics: it
// bumps the payment counter, adds the payment's HTLC attempt count to the
// attempt counter and records that count in the attempts histogram.
//
// NOTE: It is expected that this receives the *final* payment update with the
// complete list of all htlc attempts made for this payment.
func processPaymentUpdate(payment *lnrpc.Payment) {
	totalPayments.Inc()

	// Every entry in Htlcs is one attempt made for this payment.
	numAttempts := len(payment.Htlcs)
	observed := float64(numAttempts)

	totalHTLCAttempts.Add(observed)
	paymentAttempts.Observe(observed)

	paymentLogger.Debugf("Payment %s updated: %d attempts",
		payment.PaymentHash, numAttempts)
}
47 changes: 40 additions & 7 deletions collectors/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type PrometheusExporter struct {

monitoringCfg *MonitoringConfig

htlcMonitor *htlcMonitor
htlcMonitor *htlcMonitor
paymentsMonitor *paymentsMonitor

// collectors is the exporter's active set of collectors.
collectors []prometheus.Collector
Expand Down Expand Up @@ -72,6 +73,9 @@ type MonitoringConfig struct {
// DisableHtlc disables collection of HTLCs metrics
DisableHtlc bool

// DisablePayments disables collection of payment metrics
DisablePayments bool

// ProgramStartTime stores a best-effort estimate of when lnd/lndmon was
// started.
ProgramStartTime time.Time
Expand Down Expand Up @@ -100,6 +104,15 @@ func NewPrometheusExporter(cfg *PrometheusConfig, lnd *lndclient.LndServices,

htlcMonitor := newHtlcMonitor(lnd.Router, errChan)

// Create payments monitor.
paymentsMonitor, err := newPaymentsMonitor(lnd, errChan)
if err != nil {
paymentLogger.Errorf("Failed to initialize payment monitor: %v",
err)

return nil
}

chanCollector := NewChannelsCollector(
lnd.Client, errChan, quitChan, monitoringCfg,
)
Expand All @@ -117,19 +130,26 @@ func NewPrometheusExporter(cfg *PrometheusConfig, lnd *lndclient.LndServices,
collectors = append(collectors, htlcMonitor.collectors()...)
}

if !monitoringCfg.DisablePayments {
collectors = append(
collectors, paymentsMonitor.collectors()...,
)
}

if !monitoringCfg.DisableGraph {
collectors = append(
collectors, NewGraphCollector(lnd.Client, errChan),
)
}

return &PrometheusExporter{
cfg: cfg,
lnd: lnd,
monitoringCfg: monitoringCfg,
collectors: collectors,
htlcMonitor: htlcMonitor,
errChan: errChan,
cfg: cfg,
lnd: lnd,
monitoringCfg: monitoringCfg,
collectors: collectors,
htlcMonitor: htlcMonitor,
paymentsMonitor: paymentsMonitor,
errChan: errChan,
}
}

Expand Down Expand Up @@ -165,6 +185,15 @@ func (p *PrometheusExporter) Start() error {
}
}

// Start the payment monitor goroutine. This will subscribe to receive
// updates for all payments made by lnd and update our payment-related
// metrics.
if !p.monitoringCfg.DisablePayments {
if err := p.paymentsMonitor.start(); err != nil {
return err
}
}

// Finally, we'll launch the HTTP server that Prometheus will use to
// scrape our metrics.
go func() {
Expand Down Expand Up @@ -199,6 +228,10 @@ func (p *PrometheusExporter) Stop() {
if !p.monitoringCfg.DisableHtlc {
p.htlcMonitor.stop()
}

if !p.monitoringCfg.DisablePayments {
p.paymentsMonitor.stop()
}
}

// Errors returns an error channel that any failures experienced by its
Expand Down
3 changes: 3 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type config struct {

// DisableHtlc disables the collection of HTLCs metrics.
DisableHtlc bool `long:"disablehtlc" description:"Do not collect HTLCs metrics"`

// DisablePayments disables the collection of payments metrics.
DisablePayments bool `long:"disablepayments" description:"Do not collect payments metrics"`
}

var defaultConfig = config{
Expand Down
5 changes: 3 additions & 2 deletions lndmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func start() error {
defer lnd.Close()

monitoringCfg := collectors.MonitoringConfig{
DisableGraph: cfg.DisableGraph,
DisableHtlc: cfg.DisableHtlc,
DisableGraph: cfg.DisableGraph,
DisableHtlc: cfg.DisableHtlc,
DisablePayments: cfg.DisablePayments,
}
if cfg.PrimaryNode != "" {
primaryNode, err := route.NewVertexFromStr(cfg.PrimaryNode)
Expand Down

0 comments on commit 0cf9fd4

Please sign in to comment.