Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[StatsD receiver] Add timing/histogram for statsD receiver as OTLP summary #3261

Merged
merged 1 commit into from
May 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/moricho/tparallel v0.2.1/go.mod h1:fXEIZxG2vdfl0ZF8b42f5a78EhjjD5mX8qUplsoSU4k=
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
Expand Down
20 changes: 7 additions & 13 deletions receiver/statsdreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ The Following settings are optional:

- `timer_histogram_mapping:`(default value is below): Specify what OTLP type to convert received timing/histogram data to.

// TODO: can add regex support for `match` later.

`"match"`, we only support `"*"` now.

`"statsd_type"` specifies received Statsd data type. Possible values for this setting are `"timing"`, `"timer"` and `"histogram"`.

`"observer_type"` specifies OTLP data type to convert to. The only supported target data type currently is `"gauge"`, which does not perform any aggregation.
Support for `"summary"` data type is planned to be added in the future.
`"observer_type"` specifies OTLP data type to convert to. We support `"gauge"` and `"summary"`. For `"gauge"`, it does not perform any aggregation.
For `"summary`, the statsD receiver will aggregate to one OTLP summary metric for one metric description(the same metric name with the same tags). It will send percentile 0, 10, 50, 90, 95, 100 to the downstream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the quantiles configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The quantiles are fixed.

TODO: Add a new option to use a smoothed summary like Promethetheus: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/3261

Example:

Expand All @@ -42,11 +40,9 @@ receivers:
aggregation_interval: 70s
enable_metric_type: true
timer_histogram_mapping:
- match: "*"
statsd_type: "histogram"
- statsd_type: "histogram"
observer_type: "gauge"
- match: "*"
statsd_type: "timing"
- statsd_type: "timing"
observer_type: "gauge"
```

Expand Down Expand Up @@ -120,11 +116,9 @@ receivers:
aggregation_interval: 60s # default
enable_metric_type: false # default
timer_histogram_mapping:
- match: "*"
statsd_type: "histogram"
- statsd_type: "histogram"
observer_type: "gauge"
- match: "*"
statsd_type: "timing"
- statsd_type: "timing"
observer_type: "gauge"

exporters:
Expand Down
11 changes: 1 addition & 10 deletions receiver/statsdreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,15 @@ type Config struct {
func (c *Config) validate() error {

var errors []error
supportMatch := []string{"*"}
supportedStatsdType := []string{"timing", "timer", "histogram"}
supportedObserverType := []string{"gauge"}
supportedObserverType := []string{"gauge", "summary"}

if c.AggregationInterval <= 0 {
errors = append(errors, fmt.Errorf("aggregation_interval must be a positive duration"))
}

var TimerHistogramMappingMissingObjectName bool
for _, eachMap := range c.TimerHistogramMapping {
if eachMap.Match == "" {
TimerHistogramMappingMissingObjectName = true
break
}

if !protocol.Contains(supportMatch, eachMap.Match) {
errors = append(errors, fmt.Errorf("match is not supported: %s", eachMap.Match))
}

if eachMap.StatsdType == "" {
TimerHistogramMappingMissingObjectName = true
Expand Down
33 changes: 6 additions & 27 deletions receiver/statsdreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestLoadConfig(t *testing.T) {
Transport: "custom_transport",
},
AggregationInterval: 70 * time.Second,
TimerHistogramMapping: []protocol.TimerHistogramMapping{{Match: "*", StatsdType: "histogram", ObserverType: "gauge"}, {Match: "*", StatsdType: "timing", ObserverType: "gauge"}},
TimerHistogramMapping: []protocol.TimerHistogramMapping{{StatsdType: "histogram", ObserverType: "gauge"}, {StatsdType: "timing", ObserverType: "gauge"}},
}, r1)
}

Expand All @@ -73,7 +73,6 @@ func TestValidate(t *testing.T) {
const (
negativeAggregationIntervalErr = "aggregation_interval must be a positive duration"
noObjectNameErr = "must specify object name for all TimerHistogramMappings"
matchNotSupportErr = "match is not supported: %s"
statsdTypeNotSupportErr = "statsd_type is not supported: %s"
observerTypeNotSupportErr = "observer_type is not supported: %s"
)
Expand All @@ -83,28 +82,18 @@ func TestValidate(t *testing.T) {
name: "negativeAggregationInterval",
cfg: &Config{
AggregationInterval: -1,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: negativeAggregationIntervalErr,
},
{
name: "emptyMatch",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: noObjectNameErr,
expectedErr: negativeAggregationIntervalErr,
},
{
name: "emptyStatsdType",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", ObserverType: "gauge"},
{ObserverType: "gauge"},
},
},
expectedErr: noObjectNameErr,
Expand All @@ -114,27 +103,17 @@ func TestValidate(t *testing.T) {
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing"},
{StatsdType: "timing"},
},
},
expectedErr: noObjectNameErr,
},
{
name: "MatchNotSupport",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "aaa", StatsdType: "timing", ObserverType: "gauge"},
},
},
expectedErr: fmt.Sprintf(matchNotSupportErr, "aaa"),
},
{
name: "StatsdTypeNotSupport",
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "abc", ObserverType: "gauge"},
{StatsdType: "abc", ObserverType: "gauge"},
},
},
expectedErr: fmt.Sprintf(statsdTypeNotSupportErr, "abc"),
Expand All @@ -144,7 +123,7 @@ func TestValidate(t *testing.T) {
cfg: &Config{
AggregationInterval: 10,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timer", ObserverType: "gauge1"},
{StatsdType: "timer", ObserverType: "gauge1"},
},
},
expectedErr: fmt.Sprintf(observerTypeNotSupportErr, "gauge1"),
Expand Down
6 changes: 5 additions & 1 deletion receiver/statsdreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (
defaultEnableMetricType = false
)

var (
defaultTimerHistogramMapping = []protocol.TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}}
)

// NewFactory creates a factory for the StatsD receiver.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
Expand All @@ -57,7 +61,7 @@ func createDefaultConfig() config.Receiver {
},
AggregationInterval: defaultAggregationInterval,
EnableMetricType: defaultEnableMetricType,
TimerHistogramMapping: []protocol.TimerHistogramMapping{{Match: "*", StatsdType: "timer", ObserverType: "gauge"}, {Match: "*", StatsdType: "histogram", ObserverType: "gauge"}},
TimerHistogramMapping: defaultTimerHistogramMapping,
}
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestCreateReceiverWithConfigErr(t *testing.T) {
cfg := &Config{
AggregationInterval: -1,
TimerHistogramMapping: []protocol.TimerHistogramMapping{
{Match: "*", StatsdType: "timing", ObserverType: "gauge"},
{StatsdType: "timing", ObserverType: "gauge"},
},
}
receiver, err := createMetricsReceiver(
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/hashicorp/go-immutable-radix v1.2.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/onsi/ginkgo v1.14.1 // indirect
github.com/onsi/gomega v1.10.2 // indirect
github.com/pelletier/go-toml v1.8.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,7 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
Expand Down
29 changes: 29 additions & 0 deletions receiver/statsdreceiver/protocol/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package protocol
import (
"time"

"github.com/montanaflynn/stats"
"go.opentelemetry.io/collector/consumer/pdata"
)

Expand Down Expand Up @@ -58,3 +59,31 @@ func buildGaugeMetric(parsedMetric statsDMetric, timeNow time.Time) pdata.Instru

return ilm
}

func buildSummaryMetric(summaryMetric summaryMetric) pdata.InstrumentationLibraryMetrics {
dp := pdata.NewSummaryDataPoint()
dp.SetCount(uint64(len(summaryMetric.summaryPoints)))
sum, _ := stats.Sum(summaryMetric.summaryPoints)
dp.SetSum(sum)
dp.SetTimestamp(pdata.TimestampFromTime(summaryMetric.timeNow))

quantile := []float64{0, 10, 50, 90, 95, 100}
for _, v := range quantile {
eachQuantile := pdata.NewValueAtQuantile()
eachQuantile.SetQuantile(v)
eachQuantileValue, _ := stats.PercentileNearestRank(summaryMetric.summaryPoints, v)
eachQuantile.SetValue(eachQuantileValue)
dp.QuantileValues().Append(eachQuantile)
}

nm := pdata.NewMetric()
nm.SetName(summaryMetric.name)
nm.SetDataType(pdata.MetricDataTypeSummary)
nm.Summary().DataPoints().Append(dp)

ilm := pdata.NewInstrumentationLibraryMetrics()
ilm.Metrics().Append(nm)

return ilm

}
34 changes: 34 additions & 0 deletions receiver/statsdreceiver/protocol/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,37 @@ func TestBuildGaugeMetric(t *testing.T) {
dp.LabelsMap().Insert("mykey2", "myvalue2")
assert.Equal(t, metric, expectedMetrics)
}

func TestBuildSummaryMetric(t *testing.T) {
timeNow := time.Now()

oneSummaryMetric := summaryMetric{
name: "testSummary",
summaryPoints: []float64{1, 2, 4, 6, 5, 3},
labelKeys: []string{"mykey", "mykey2"},
labelValues: []string{"myvalue", "myvalue2"},
timeNow: timeNow,
}

metric := buildSummaryMetric(oneSummaryMetric)
expectedMetric := pdata.NewInstrumentationLibraryMetrics()
expectedMetric.Metrics().Resize(1)
expectedMetric.Metrics().At(0).SetName("testSummary")
expectedMetric.Metrics().At(0).SetDataType(pdata.MetricDataTypeSummary)
expectedMetric.Metrics().At(0).Summary().DataPoints().Resize(1)
expectedMetric.Metrics().At(0).Summary().DataPoints().At(0).SetSum(21)
expectedMetric.Metrics().At(0).Summary().DataPoints().At(0).SetCount(6)
expectedMetric.Metrics().At(0).Summary().DataPoints().At(0).SetTimestamp(pdata.TimestampFromTime(timeNow))
quantile := []float64{0, 10, 50, 90, 95, 100}
value := []float64{1, 1, 3, 6, 6, 6}
for int, v := range quantile {
eachQuantile := pdata.NewValueAtQuantile()
eachQuantile.SetQuantile(v)
eachQuantileValue := value[int]
eachQuantile.SetValue(eachQuantileValue)
expectedMetric.Metrics().At(0).Summary().DataPoints().At(0).QuantileValues().Append(eachQuantile)
}

assert.Equal(t, metric, expectedMetric)

}
58 changes: 56 additions & 2 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (
)

type TimerHistogramMapping struct {
Match string `mapstructure:"match"`
StatsdType string `mapstructure:"statsd_type"`
ObserverType string `mapstructure:"observer_type"`
}
Expand All @@ -52,12 +51,21 @@ type TimerHistogramMapping struct {
type StatsDParser struct {
gauges map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
counters map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
summaries map[statsDMetricdescription]summaryMetric
timersAndDistributions []pdata.InstrumentationLibraryMetrics
enableMetricType bool
observeTimer string
observeHistogram string
}

type summaryMetric struct {
name string
summaryPoints []float64
labelKeys []string
labelValues []string
timeNow time.Time
}

type statsDMetric struct {
description statsDMetricdescription
value string
Expand All @@ -80,6 +88,8 @@ func (p *StatsDParser) Initialize(enableMetricType bool, sendTimerHistogram []Ti
p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.timersAndDistributions = make([]pdata.InstrumentationLibraryMetrics, 0)
p.summaries = make(map[statsDMetricdescription]summaryMetric)

p.enableMetricType = enableMetricType
for _, eachMap := range sendTimerHistogram {
switch eachMap.StatsdType {
Expand Down Expand Up @@ -109,10 +119,14 @@ func (p *StatsDParser) GetMetrics() pdata.Metrics {
rm.InstrumentationLibraryMetrics().Append(metric)
}

for _, summaryMetric := range p.summaries {
metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Append(buildSummaryMetric(summaryMetric))
}

p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
p.timersAndDistributions = make([]pdata.InstrumentationLibraryMetrics, 0)

p.summaries = make(map[statsDMetricdescription]summaryMetric)
return metrics
}

Expand Down Expand Up @@ -155,12 +169,52 @@ func (p *StatsDParser) Aggregate(line string) error {
switch p.observeHistogram {
case "gauge":
p.timersAndDistributions = append(p.timersAndDistributions, buildGaugeMetric(parsedMetric, timeNowFunc()))
case "summary":
eachSummaryMetric, ok := p.summaries[parsedMetric.description]
if !ok {
p.summaries[parsedMetric.description] = summaryMetric{
name: parsedMetric.description.name,
summaryPoints: []float64{parsedMetric.floatvalue},
labelKeys: parsedMetric.labelKeys,
labelValues: parsedMetric.labelValues,
timeNow: timeNowFunc(),
}
} else {
points := eachSummaryMetric.summaryPoints
p.summaries[parsedMetric.description] = summaryMetric{
name: parsedMetric.description.name,
summaryPoints: append(points, parsedMetric.floatvalue),
labelKeys: parsedMetric.labelKeys,
labelValues: parsedMetric.labelValues,
timeNow: timeNowFunc(),
}
}
}

case statsdTiming:
switch p.observeTimer {
case "gauge":
p.timersAndDistributions = append(p.timersAndDistributions, buildGaugeMetric(parsedMetric, timeNowFunc()))
case "summary":
eachSummaryMetric, ok := p.summaries[parsedMetric.description]
if !ok {
p.summaries[parsedMetric.description] = summaryMetric{
name: parsedMetric.description.name,
summaryPoints: []float64{parsedMetric.floatvalue},
labelKeys: parsedMetric.labelKeys,
labelValues: parsedMetric.labelValues,
timeNow: timeNowFunc(),
}
} else {
points := eachSummaryMetric.summaryPoints
p.summaries[parsedMetric.description] = summaryMetric{
name: parsedMetric.description.name,
summaryPoints: append(points, parsedMetric.floatvalue),
labelKeys: parsedMetric.labelKeys,
labelValues: parsedMetric.labelValues,
timeNow: timeNowFunc(),
}
}
}
}

Expand Down
Loading