Skip to content

Commit

Permalink
Make NewRecorder a proper singleton
Browse files Browse the repository at this point in the history
`NewRecorder` cannot be called twice safely because it would result in the same view being registered twice, which always returns an error.

The tests work around this by themselves manually unregistering the metrics that `NewRecorder` registers, so an alternative here might be to have `NewRecorder` also return a `context.CancelFunc` to unregister things, or to have `NewRecorder` take `ctx` so that it can unregister the metrics on context-cancellation.

This change simply opts for making `NewRecorder` a proper singleton with a `sync.Once` and private globals for the `Recorder` and the possible error, so multiple calls to `NewRecorder` return the same singleton `Recorder` (or its error).

I've updated the tests to clear this new singleton state (as well as continuing to manually unregister the metrics).
  • Loading branch information
mattmoor authored and tekton-robot committed Sep 2, 2021
1 parent 9e200a2 commit a07c0f7
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 134 deletions.
114 changes: 63 additions & 51 deletions pkg/pipelinerunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
Expand Down Expand Up @@ -69,66 +70,77 @@ type Recorder struct {
ReportingPeriod time.Duration
}

// We cannot register the view multiple times, so NewRecorder lazily
// initializes this singleton and returns the same recorder across any
// subsequent invocations.
var (
// once guards the initialization closure that NewRecorder hands to once.Do.
once sync.Once
// r is the Recorder assigned inside that closure and returned by NewRecorder.
r *Recorder
// recorderErr is the error value NewRecorder returns alongside r.
recorderErr error
)

// NewRecorder creates a new metrics recorder instance
// to log the PipelineRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,
once.Do(func() {
r = &Recorder{
initialized: true,

// Default to 30s intervals.
ReportingPeriod: 30 * time.Second,
}

pipeline, err := tag.NewKey("pipeline")
if err != nil {
return nil, err
}
r.pipeline = pipeline
// Default to 30s intervals.
ReportingPeriod: 30 * time.Second,
}

pipelineRun, err := tag.NewKey("pipelinerun")
if err != nil {
return nil, err
}
r.pipelineRun = pipelineRun
pipeline, recorderErr := tag.NewKey("pipeline")
if recorderErr != nil {
return
}
r.pipeline = pipeline

namespace, err := tag.NewKey("namespace")
if err != nil {
return nil, err
}
r.namespace = namespace
pipelineRun, recorderErr := tag.NewKey("pipelinerun")
if recorderErr != nil {
return
}
r.pipelineRun = pipelineRun

status, err := tag.NewKey("status")
if err != nil {
return nil, err
}
r.status = status

err = view.Register(
&view.View{
Description: prDuration.Description(),
Measure: prDuration,
Aggregation: prDistributions,
TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status},
},
&view.View{
Description: prCount.Description(),
Measure: prCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningPRsCount.Description(),
Measure: runningPRsCount,
Aggregation: view.LastValue(),
},
)
namespace, recorderErr := tag.NewKey("namespace")
if recorderErr != nil {
return
}
r.namespace = namespace

if err != nil {
r.initialized = false
return r, err
}
status, recorderErr := tag.NewKey("status")
if recorderErr != nil {
return
}
r.status = status

recorderErr = view.Register(
&view.View{
Description: prDuration.Description(),
Measure: prDuration,
Aggregation: prDistributions,
TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status},
},
&view.View{
Description: prCount.Description(),
Measure: prCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningPRsCount.Description(),
Measure: runningPRsCount,
Aggregation: view.LastValue(),
},
)

if recorderErr != nil {
r.initialized = false
return
}
})

return r, nil
return r, recorderErr
}

// DurationAndCount logs the duration of PipelineRun execution and
Expand Down
6 changes: 6 additions & 0 deletions pkg/pipelinerunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pipelinerunmetrics

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -213,4 +214,9 @@ func TestRecordRunningPipelineRunsCount(t *testing.T) {

// unregisterMetrics unregisters the PipelineRun metric views by name and then
// clears the package-level recorder state so that NewRecorder builds a fresh
// Recorder on its next call.
func unregisterMetrics() {
	metricstest.Unregister(
		"pipelinerun_duration_seconds",
		"pipelinerun_count",
		"running_pipelineruns_count",
	)

	// Allow the recorder singleton to be recreated: drop the cached recorder
	// and error, and re-arm the sync.Once.
	r, recorderErr = nil, nil
	once = sync.Once{}
}
179 changes: 96 additions & 83 deletions pkg/taskrunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package taskrunmetrics
import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -79,102 +80,114 @@ type Recorder struct {
ReportingPeriod time.Duration
}

// We cannot register the view multiple times, so NewRecorder lazily
// initializes this singleton and returns the same recorder across any
// subsequent invocations.
var (
// once guards the initialization closure that NewRecorder hands to once.Do.
once sync.Once
// r is the Recorder assigned inside that closure and returned by NewRecorder.
r *Recorder
// recorderErr is the error value NewRecorder returns alongside r.
recorderErr error
)

// NewRecorder creates a new metrics recorder instance
// to log the TaskRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,
// Views cannot be registered multiple times, so recorder should be a singleton.
once.Do(func() {
r = &Recorder{
initialized: true,

// Default to reporting metrics every 30s.
ReportingPeriod: 30 * time.Second,
}
// Default to reporting metrics every 30s.
ReportingPeriod: 30 * time.Second,
}

task, err := tag.NewKey("task")
if err != nil {
return nil, err
}
r.task = task
task, recorderErr := tag.NewKey("task")
if recorderErr != nil {
return
}
r.task = task

taskRun, err := tag.NewKey("taskrun")
if err != nil {
return nil, err
}
r.taskRun = taskRun
taskRun, recorderErr := tag.NewKey("taskrun")
if recorderErr != nil {
return
}
r.taskRun = taskRun

namespace, err := tag.NewKey("namespace")
if err != nil {
return nil, err
}
r.namespace = namespace
namespace, recorderErr := tag.NewKey("namespace")
if recorderErr != nil {
return
}
r.namespace = namespace

status, err := tag.NewKey("status")
if err != nil {
return nil, err
}
r.status = status
status, recorderErr := tag.NewKey("status")
if recorderErr != nil {
return
}
r.status = status

pipeline, err := tag.NewKey("pipeline")
if err != nil {
return nil, err
}
r.pipeline = pipeline
pipeline, recorderErr := tag.NewKey("pipeline")
if recorderErr != nil {
return
}
r.pipeline = pipeline

pipelineRun, err := tag.NewKey("pipelinerun")
if err != nil {
return nil, err
}
r.pipelineRun = pipelineRun
pipelineRun, recorderErr := tag.NewKey("pipelinerun")
if recorderErr != nil {
return
}
r.pipelineRun = pipelineRun

pod, err := tag.NewKey("pod")
if err != nil {
return nil, err
}
r.pod = pod

err = view.Register(
&view.View{
Description: trDuration.Description(),
Measure: trDuration,
Aggregation: trDistribution,
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status},
},
&view.View{
Description: prTRDuration.Description(),
Measure: prTRDuration,
Aggregation: prTRLatencyDistribution,
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun},
},
&view.View{
Description: trCount.Description(),
Measure: trCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningTRsCount.Description(),
Measure: runningTRsCount,
Aggregation: view.LastValue(),
},
&view.View{
Description: podLatency.Description(),
Measure: podLatency,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.pod},
},
&view.View{
Description: cloudEvents.Description(),
Measure: cloudEvents,
Aggregation: view.Sum(),
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun},
},
)
pod, recorderErr := tag.NewKey("pod")
if recorderErr != nil {
return
}
r.pod = pod

recorderErr = view.Register(
&view.View{
Description: trDuration.Description(),
Measure: trDuration,
Aggregation: trDistribution,
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status},
},
&view.View{
Description: prTRDuration.Description(),
Measure: prTRDuration,
Aggregation: prTRLatencyDistribution,
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun},
},
&view.View{
Description: trCount.Description(),
Measure: trCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningTRsCount.Description(),
Measure: runningTRsCount,
Aggregation: view.LastValue(),
},
&view.View{
Description: podLatency.Description(),
Measure: podLatency,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.pod},
},
&view.View{
Description: cloudEvents.Description(),
Measure: cloudEvents,
Aggregation: view.Sum(),
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun},
},
)

if err != nil {
r.initialized = false
return r, err
}
if recorderErr != nil {
r.initialized = false
return
}
})

return r, nil
return r, recorderErr
}

// DurationAndCount logs the duration of TaskRun execution and
Expand Down
6 changes: 6 additions & 0 deletions pkg/taskrunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package taskrunmetrics

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -453,4 +454,9 @@ func TestRecordCloudEvents(t *testing.T) {

// unregisterMetrics unregisters the TaskRun metric views by name and then
// clears the package-level recorder state so that NewRecorder builds a fresh
// Recorder on its next call.
func unregisterMetrics() {
	metricstest.Unregister(
		"taskrun_duration_seconds",
		"pipelinerun_taskrun_duration_seconds",
		"taskrun_count",
		"running_taskruns_count",
		"taskruns_pod_latency",
		"cloudevent_count",
	)

	// Allow the recorder singleton to be recreated: drop the cached recorder
	// and error, and re-arm the sync.Once.
	r, recorderErr = nil, nil
	once = sync.Once{}
}

0 comments on commit a07c0f7

Please sign in to comment.