Skip to content

Commit

Permalink
Add metrics for PeersUpdateManager (#1310)
Browse files Browse the repository at this point in the history
With this change we should be able to collect and expose the following histograms:

* `management.updatechannel.create.duration.ms`  with `closed` boolean label
* `management.updatechannel.create.duration.micro` with `closed` boolean label
* `management.updatechannel.close.one.duration.ms`
* `management.updatechannel.close.one.duration.micro`
* `management.updatechannel.close.multiple.duration.ms`
* `management.updatechannel.close.multiple.duration.micro`
* `management.updatechannel.close.multiple.channels`
* `management.updatechannel.send.duration.ms` with `found` and `dropped` boolean labels
* `management.updatechannel.send.duration.micro` with `found` and `dropped` boolean labels
* `management.updatechannel.get.all.duration.ms`
* `management.updatechannel.get.all.duration.micro`
* `management.updatechannel.get.all.peers`
  • Loading branch information
surik committed Dec 14, 2023
1 parent d379c25 commit c46d278
Show file tree
Hide file tree
Showing 16 changed files with 266 additions and 41 deletions.
2 changes: 1 addition & 1 deletion client/cmd/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func startManagement(t *testing.T, config *mgmt.Config) (*grpc.Server, net.Liste
t.Fatal(err)
}

peersUpdateManager := mgmt.NewPeersUpdateManager()
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion client/internal/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ func startManagement(dataDir string) (*grpc.Server, string, error) {
return nil, "", err
}

peersUpdateManager := server.NewPeersUpdateManager()
peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
Expand Down
2 changes: 1 addition & 1 deletion management/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err)
}

peersUpdateManager := mgmt.NewPeersUpdateManager()
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
accountManager, err := mgmt.BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false)
Expand Down
2 changes: 1 addition & 1 deletion management/cmd/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var (
if err != nil {
return fmt.Errorf("failed creating Store: %s: %v", config.Datadir, err)
}
peersUpdateManager := server.NewPeersUpdateManager()
peersUpdateManager := server.NewPeersUpdateManager(appMetrics)

var idpManager idp.Manager
if config.IdpManagerConfig != nil {
Expand Down
2 changes: 1 addition & 1 deletion management/server/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2047,7 +2047,7 @@ func createManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err
}
eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "netbird.cloud", eventStore, false)
return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, false)
}

func createStore(t *testing.T) (Store, error) {
Expand Down
2 changes: 1 addition & 1 deletion management/server/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err
}
eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "netbird.test", eventStore, false)
return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, false)
}

func createDNSStore(t *testing.T) (Store, error) {
Expand Down
2 changes: 1 addition & 1 deletion management/server/management_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func startManagement(t *testing.T, config *Config) (*grpc.Server, string, error)
if err != nil {
return nil, "", err
}
peersUpdateManager := NewPeersUpdateManager()
peersUpdateManager := NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
accountManager, err := BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false)
Expand Down
2 changes: 1 addition & 1 deletion management/server/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) {
if err != nil {
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
}
peersUpdateManager := server.NewPeersUpdateManager()
peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
accountManager, err := server.BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false)
Expand Down
2 changes: 1 addition & 1 deletion management/server/nameserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err
}
eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "", eventStore, false)
return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "", eventStore, false)
}

func createNSStore(t *testing.T) (Store, error) {
Expand Down
2 changes: 1 addition & 1 deletion management/server/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err
}
eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "", eventStore, false)
return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "", eventStore, false)
}

func createRouterStore(t *testing.T) (Store, error) {
Expand Down
60 changes: 44 additions & 16 deletions management/server/telemetry/app_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ const defaultEndpoint = "/metrics"

// MockAppMetrics mocks the AppMetrics interface
type MockAppMetrics struct {
GetMeterFunc func() metric2.Meter
CloseFunc func() error
ExposeFunc func(port int, endpoint string) error
IDPMetricsFunc func() *IDPMetrics
HTTPMiddlewareFunc func() *HTTPMiddleware
GRPCMetricsFunc func() *GRPCMetrics
StoreMetricsFunc func() *StoreMetrics
GetMeterFunc func() metric2.Meter
CloseFunc func() error
ExposeFunc func(port int, endpoint string) error
IDPMetricsFunc func() *IDPMetrics
HTTPMiddlewareFunc func() *HTTPMiddleware
GRPCMetricsFunc func() *GRPCMetrics
StoreMetricsFunc func() *StoreMetrics
UpdateChannelMetricsFunc func() *UpdateChannelMetrics
}

// GetMeter mocks the GetMeter function of the AppMetrics interface
Expand Down Expand Up @@ -85,6 +86,14 @@ func (mock *MockAppMetrics) StoreMetrics() *StoreMetrics {
return nil
}

// UpdateChannelMetrics mocks the MockAppMetrics function of the UpdateChannelMetrics interface
func (mock *MockAppMetrics) UpdateChannelMetrics() *UpdateChannelMetrics {
if mock.UpdateChannelMetricsFunc != nil {
return mock.UpdateChannelMetricsFunc()
}
return nil
}

// AppMetrics is metrics interface
type AppMetrics interface {
GetMeter() metric2.Meter
Expand All @@ -94,18 +103,20 @@ type AppMetrics interface {
HTTPMiddleware() *HTTPMiddleware
GRPCMetrics() *GRPCMetrics
StoreMetrics() *StoreMetrics
UpdateChannelMetrics() *UpdateChannelMetrics
}

// defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/
type defaultAppMetrics struct {
// Meter can be used by different application parts to create counters and measure things
Meter metric2.Meter
listener net.Listener
ctx context.Context
idpMetrics *IDPMetrics
httpMiddleware *HTTPMiddleware
grpcMetrics *GRPCMetrics
storeMetrics *StoreMetrics
Meter metric2.Meter
listener net.Listener
ctx context.Context
idpMetrics *IDPMetrics
httpMiddleware *HTTPMiddleware
grpcMetrics *GRPCMetrics
storeMetrics *StoreMetrics
updateChannelMetrics *UpdateChannelMetrics
}

// IDPMetrics returns metrics for the idp package
Expand All @@ -128,6 +139,11 @@ func (appMetrics *defaultAppMetrics) StoreMetrics() *StoreMetrics {
return appMetrics.storeMetrics
}

// UpdateChannelMetrics returns metrics for the updatechannel
func (appMetrics *defaultAppMetrics) UpdateChannelMetrics() *UpdateChannelMetrics {
return appMetrics.updateChannelMetrics
}

// Close stop application metrics HTTP handler and closes listener.
func (appMetrics *defaultAppMetrics) Close() error {
if appMetrics.listener == nil {
Expand Down Expand Up @@ -199,6 +215,18 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
return nil, err
}

return &defaultAppMetrics{Meter: meter, ctx: ctx, idpMetrics: idpMetrics, httpMiddleware: middleware,
grpcMetrics: grpcMetrics, storeMetrics: storeMetrics}, nil
updateChannelMetrics, err := NewUpdateChannelMetrics(ctx, meter)
if err != nil {
return nil, err
}

return &defaultAppMetrics{
Meter: meter,
ctx: ctx,
idpMetrics: idpMetrics,
httpMiddleware: middleware,
grpcMetrics: grpcMetrics,
storeMetrics: storeMetrics,
updateChannelMetrics: updateChannelMetrics,
}, nil
}
2 changes: 1 addition & 1 deletion management/server/telemetry/store_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)

// StoreMetrics represents all metrics related to the FileStore
// StoreMetrics represents all metrics related to the Store
type StoreMetrics struct {
globalLockAcquisitionDurationMicro syncint64.Histogram
globalLockAcquisitionDurationMs syncint64.Histogram
Expand Down
141 changes: 141 additions & 0 deletions management/server/telemetry/updatechannel_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package telemetry

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)

// UpdateChannelMetrics represents all metrics related to the UpdateChannel
type UpdateChannelMetrics struct {
createChannelDurationMs syncint64.Histogram
createChannelDurationMicro syncint64.Histogram
closeChannelDurationMs syncint64.Histogram
closeChannelDurationMicro syncint64.Histogram
closeChannelsDurationMs syncint64.Histogram
closeChannelsDurationMicro syncint64.Histogram
closeChannels syncint64.Histogram
sendUpdateDurationMs syncint64.Histogram
sendUpdateDurationMicro syncint64.Histogram
getAllConnectedPeersDurationMs syncint64.Histogram
getAllConnectedPeersDurationMicro syncint64.Histogram
getAllConnectedPeers syncint64.Histogram
ctx context.Context
}

// NewUpdateChannelMetrics creates an instance of UpdateChannel
func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) {
createChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.ms")
if err != nil {
return nil, err
}

createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro")
if err != nil {
return nil, err
}

closeChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.ms")
if err != nil {
return nil, err
}

closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro")
if err != nil {
return nil, err
}

closeChannelsDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.ms")
if err != nil {
return nil, err
}

closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro")
if err != nil {
return nil, err
}

closeChannels, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.channels")
if err != nil {
return nil, err
}

sendUpdateDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.ms")
if err != nil {
return nil, err
}

sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro")
if err != nil {
return nil, err
}

getAllConnectedPeersDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.ms")
if err != nil {
return nil, err
}

getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro")
if err != nil {
return nil, err
}

getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers")
if err != nil {
return nil, err
}

return &UpdateChannelMetrics{
createChannelDurationMs: createChannelDurationMs,
createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMs: closeChannelDurationMs,
closeChannelDurationMicro: closeChannelDurationMicro,
closeChannelsDurationMs: closeChannelsDurationMs,
closeChannelsDurationMicro: closeChannelsDurationMicro,
closeChannels: closeChannels,
sendUpdateDurationMs: sendUpdateDurationMs,
sendUpdateDurationMicro: sendUpdateDurationMicro,
getAllConnectedPeersDurationMs: getAllConnectedPeersDurationMs,
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
getAllConnectedPeers: getAllConnectedPeers,
ctx: ctx,
}, nil
}

// CountCreateChannelDuration counts the duration of the CreateChannel method,
// closed indicates if existing channel was closed before creation of a new one
func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) {
metrics.createChannelDurationMs.Record(metrics.ctx, duration.Milliseconds(), attribute.Bool("closed", closed))
metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed))
}

// CountCloseChannelDuration counts the duration of the CloseChannel method
func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) {
metrics.closeChannelDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}

// CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed
func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) {
metrics.closeChannelsDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.closeChannels.Record(metrics.ctx, int64(channels))
}

// CountSendUpdateDuration counts the duration of the SendUpdate method
// found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload
func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) {
attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)}
metrics.sendUpdateDurationMs.Record(metrics.ctx, duration.Milliseconds(), attrs...)
metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...)
}

// CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned
func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) {
metrics.getAllConnectedPeersDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers))
}
6 changes: 3 additions & 3 deletions management/server/turncredentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var TurnTestHost = &Host{
func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
secret := "some_secret"
peersManager := NewPeersUpdateManager()
peersManager := NewPeersUpdateManager(nil)

tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{
CredentialsTTL: ttl,
Expand All @@ -44,7 +44,7 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
ttl := util.Duration{Duration: 2 * time.Second}
secret := "some_secret"
peersManager := NewPeersUpdateManager()
peersManager := NewPeersUpdateManager(nil)
peer := "some_peer"
updateChannel := peersManager.CreateChannel(peer)

Expand Down Expand Up @@ -93,7 +93,7 @@ loop:
func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
secret := "some_secret"
peersManager := NewPeersUpdateManager()
peersManager := NewPeersUpdateManager(nil)
peer := "some_peer"

tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{
Expand Down
Loading

0 comments on commit c46d278

Please sign in to comment.