Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to track ingesting native histograms #6370

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [ENHANCEMENT] Ingester: Add metrics to track succeeded/failed native histograms. #6370
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
Expand Down
62 changes: 34 additions & 28 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,19 +1122,21 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// Keep track of some stats which are tracked only if the samples will be
// successfully committed
var (
succeededSamplesCount = 0
failedSamplesCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perLabelSetSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
nativeHistogramCount = 0
succeededSamplesCount = 0
failedSamplesCount = 0
succeededHistogramsCount = 0
failedHistogramsCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perLabelSetSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
discardedNativeHistogramCount = 0

updateFirstPartial = func(errFn func() error) {
if firstPartialErr == nil {
Expand Down Expand Up @@ -1215,6 +1217,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// To find out if any sample was added to this series, we keep old value.
oldSucceededSamplesCount := succeededSamplesCount
// To find out if any histogram was added to this series, we keep old value.
oldSucceededHistogramsCount := succeededHistogramsCount

for _, s := range ts.Samples {
var err error
Expand Down Expand Up @@ -1266,19 +1270,19 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

if ref != 0 {
if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil {
succeededSamplesCount++
succeededHistogramsCount++
continue
}
} else {
// Copy the label set because both TSDB and the active series tracker may retain it.
copiedLabels = cortexpb.FromLabelAdaptersToLabelsWithCopy(ts.Labels)
if ref, err = app.AppendHistogram(0, copiedLabels, hp.TimestampMs, h, fh); err == nil {
succeededSamplesCount++
succeededHistogramsCount++
continue
}
}

failedSamplesCount++
failedHistogramsCount++

if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels); !rollback {
continue
Expand All @@ -1290,12 +1294,12 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
return nil, wrapWithUser(err, userID)
}
} else {
nativeHistogramCount += len(ts.Histograms)
discardedNativeHistogramCount += len(ts.Histograms)
}

if i.cfg.ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount {
shouldUpdateSeries := (succeededSamplesCount > oldSucceededSamplesCount) || (succeededHistogramsCount > oldSucceededHistogramsCount)
if i.cfg.ActiveSeriesMetricsEnabled && shouldUpdateSeries {
db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, func(l labels.Labels) labels.Labels {
// we must already have copied the labels if succeededSamplesCount has been incremented.
// we must already have copied the labels if succeededSamplesCount or succeededHistogramsCount has been incremented.
return copiedLabels
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we combine this part with the code above so that we only update series once?

shouldUpdateSeries := (succeededSamplesCount > oldSucceededSamplesCount) || (succeededHistogramsCount > oldSucceededHistogramsCount)
if i.cfg.ActiveSeriesMetricsEnabled && shouldUpdateSeries {...}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we introduce a variable instead, to keep the line from getting too long?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

Expand Down Expand Up @@ -1343,8 +1347,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
}
i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds())

// If only invalid samples are pushed, don't change "last update", as TSDB was not modified.
if succeededSamplesCount > 0 {
// If only invalid samples or histograms are pushed, don't change "last update", as TSDB was not modified.
if succeededSamplesCount > 0 || succeededHistogramsCount > 0 {
db.setLastUpdate(time.Now())
}

Expand All @@ -1353,6 +1357,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// which will be converted into an HTTP 5xx and the client should/will retry.
i.metrics.ingestedSamples.Add(float64(succeededSamplesCount))
i.metrics.ingestedSamplesFail.Add(float64(failedSamplesCount))
i.metrics.ingestedHistograms.Add(float64(succeededHistogramsCount))
i.metrics.ingestedHistogramsFail.Add(float64(failedHistogramsCount))
i.metrics.ingestedExemplars.Add(float64(succeededExemplarsCount))
i.metrics.ingestedExemplarsFail.Add(float64(failedExemplarsCount))

Expand All @@ -1378,20 +1384,20 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
i.validateMetrics.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
}

if !i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms && nativeHistogramCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(nativeHistogramCount))
if !i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms && discardedNativeHistogramCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(discardedNativeHistogramCount))
}

// Distributor counts samples, metadata and histograms, so for consistency ingester does the same.
i.ingestionRate.Add(int64(succeededSamplesCount + ingestedMetadata))
i.ingestionRate.Add(int64(succeededSamplesCount + succeededHistogramsCount + ingestedMetadata))

switch req.Source {
case cortexpb.RULE:
db.ingestedRuleSamples.Add(int64(succeededSamplesCount))
db.ingestedRuleSamples.Add(int64(succeededSamplesCount + succeededHistogramsCount))
case cortexpb.API:
fallthrough
default:
db.ingestedAPISamples.Add(int64(succeededSamplesCount))
db.ingestedAPISamples.Add(int64(succeededSamplesCount + succeededHistogramsCount))
}

if firstPartialErr != nil {
Expand All @@ -1400,7 +1406,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if errors.As(firstPartialErr, &ve) {
code = ve.code
}
level.Debug(logutil.WithContext(ctx, i.logger)).Log("msg", "partial failures to push", "totalSamples", succeededSamplesCount+failedSamplesCount, "failedSamples", failedSamplesCount, "firstPartialErr", firstPartialErr)
level.Debug(logutil.WithContext(ctx, i.logger)).Log("msg", "partial failures to push", "totalSamples", succeededSamplesCount+failedSamplesCount, "failedSamples", failedSamplesCount, "totalHistograms", succeededHistogramsCount+failedHistogramsCount, "failedHistograms", failedHistogramsCount, "firstPartialErr", firstPartialErr)
return &cortexpb.WriteResponse{}, httpgrpc.Errorf(code, wrapWithUser(firstPartialErr, userID).Error())
}

Expand Down
46 changes: 38 additions & 8 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,13 @@ func TestIngester_Push(t *testing.T) {
cortex_ingester_ingested_samples_total 1
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 2
cortex_ingester_ingested_samples_failures_total 1
# HELP cortex_ingester_ingested_native_histograms_total The total number of native histograms ingested.
# TYPE cortex_ingester_ingested_native_histograms_total counter
cortex_ingester_ingested_native_histograms_total 0
# HELP cortex_ingester_ingested_native_histograms_failures_total The total number of native histograms that errored on ingestion.
# TYPE cortex_ingester_ingested_native_histograms_failures_total counter
cortex_ingester_ingested_native_histograms_failures_total 1
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
Expand Down Expand Up @@ -1032,7 +1038,13 @@ func TestIngester_Push(t *testing.T) {
cortex_ingester_ingested_samples_total 1
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 2
cortex_ingester_ingested_samples_failures_total 1
# HELP cortex_ingester_ingested_native_histograms_total The total number of native histograms ingested.
# TYPE cortex_ingester_ingested_native_histograms_total counter
cortex_ingester_ingested_native_histograms_total 1
# HELP cortex_ingester_ingested_native_histograms_failures_total The total number of native histograms that errored on ingestion.
# TYPE cortex_ingester_ingested_native_histograms_failures_total counter
cortex_ingester_ingested_native_histograms_failures_total 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
Expand Down Expand Up @@ -1290,12 +1302,18 @@ func TestIngester_Push(t *testing.T) {
"cortex_ingester_active_series",
},
expectedMetrics: `
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 1
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 0
# HELP cortex_ingester_ingested_native_histograms_total The total number of native histograms ingested.
# TYPE cortex_ingester_ingested_native_histograms_total counter
cortex_ingester_ingested_native_histograms_total 1
# HELP cortex_ingester_ingested_native_histograms_failures_total The total number of native histograms that errored on ingestion.
# TYPE cortex_ingester_ingested_native_histograms_failures_total counter
cortex_ingester_ingested_native_histograms_failures_total 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
Expand Down Expand Up @@ -1342,10 +1360,16 @@ func TestIngester_Push(t *testing.T) {
expectedMetrics: `
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 1
cortex_ingester_ingested_samples_total 0
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_ingested_native_histograms_total The total number of native histograms ingested.
# TYPE cortex_ingester_ingested_native_histograms_total counter
cortex_ingester_ingested_native_histograms_total 1
# HELP cortex_ingester_ingested_native_histograms_failures_total The total number of native histograms that errored on ingestion.
# TYPE cortex_ingester_ingested_native_histograms_failures_total counter
cortex_ingester_ingested_native_histograms_failures_total 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
Expand Down Expand Up @@ -1393,10 +1417,16 @@ func TestIngester_Push(t *testing.T) {
expectedMetrics: `
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 2
cortex_ingester_ingested_samples_total 1
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
# TYPE cortex_ingester_ingested_samples_failures_total counter
cortex_ingester_ingested_samples_failures_total 0
# HELP cortex_ingester_ingested_native_histograms_total The total number of native histograms ingested.
# TYPE cortex_ingester_ingested_native_histograms_total counter
cortex_ingester_ingested_native_histograms_total 1
# HELP cortex_ingester_ingested_native_histograms_failures_total The total number of native histograms that errored on ingestion.
# TYPE cortex_ingester_ingested_native_histograms_failures_total counter
cortex_ingester_ingested_native_histograms_failures_total 0
# HELP cortex_ingester_memory_users The current number of users in memory.
# TYPE cortex_ingester_memory_users gauge
cortex_ingester_memory_users 1
Expand Down Expand Up @@ -1684,7 +1714,7 @@ func TestIngester_PushNativeHistogramErrors(t *testing.T) {
_, err = i.Push(ctx, req)
assert.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErr(tc.expectedErr, model.Time(10), metricLabelAdapters), userID).Error()), err)

require.Equal(t, testutil.ToFloat64(i.metrics.ingestedSamplesFail), float64(1))
require.Equal(t, testutil.ToFloat64(i.metrics.ingestedHistogramsFail), float64(1))
})
}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ const (

type ingesterMetrics struct {
ingestedSamples prometheus.Counter
ingestedHistograms prometheus.Counter
ingestedExemplars prometheus.Counter
ingestedMetadata prometheus.Counter
ingestedSamplesFail prometheus.Counter
ingestedHistogramsFail prometheus.Counter
ingestedExemplarsFail prometheus.Counter
ingestedMetadataFail prometheus.Counter
queries prometheus.Counter
Expand Down Expand Up @@ -80,6 +82,10 @@ func newIngesterMetrics(r prometheus.Registerer,
Name: "cortex_ingester_ingested_samples_total",
Help: "The total number of samples ingested.",
}),
ingestedHistograms: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_ingested_native_histograms_total",
Help: "The total number of native histograms ingested.",
}),
ingestedExemplars: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_ingested_exemplars_total",
Help: "The total number of exemplars ingested.",
Expand All @@ -92,6 +98,10 @@ func newIngesterMetrics(r prometheus.Registerer,
Name: "cortex_ingester_ingested_samples_failures_total",
Help: "The total number of samples that errored on ingestion.",
}),
ingestedHistogramsFail: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_ingested_native_histograms_failures_total",
Help: "The total number of native histograms that errored on ingestion.",
}),
ingestedExemplarsFail: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_ingested_exemplars_failures_total",
Help: "The total number of exemplars that errored on ingestion.",
Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ func TestIngesterMetrics(t *testing.T) {
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
# TYPE cortex_ingester_ingested_samples_total counter
cortex_ingester_ingested_samples_total 0
# HELP cortex_ingester_ingested_native_histograms_total The total number of native histograms ingested.
# TYPE cortex_ingester_ingested_native_histograms_total counter
cortex_ingester_ingested_native_histograms_total 0
# HELP cortex_ingester_ingested_native_histograms_failures_total The total number of native histograms that errored on ingestion.
# TYPE cortex_ingester_ingested_native_histograms_failures_total counter
cortex_ingester_ingested_native_histograms_failures_total 0
# HELP cortex_ingester_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that ingester is using to limit access.
# TYPE cortex_ingester_ingestion_rate_samples_per_second gauge
cortex_ingester_ingestion_rate_samples_per_second 0
Expand Down
Loading