Skip to content

Commit

Permalink
Remove unnecessary functions in PrometheusRemoteWriteExporter (#3261)
Browse files Browse the repository at this point in the history
* remove unnecessary functions

* rename functions

* update
  • Loading branch information
sincejune authored Jun 7, 2021
1 parent 4171b70 commit db28270
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 84 deletions.
133 changes: 53 additions & 80 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,56 @@ func (prwe *PRWExporter) PushMetrics(ctx context.Context, md pdata.Metrics) erro

// handle individual metric based on type
switch metric.DataType() {
case pdata.MetricDataTypeDoubleSum, pdata.MetricDataTypeIntSum, pdata.MetricDataTypeDoubleGauge, pdata.MetricDataTypeIntGauge:
if err := prwe.handleScalarMetric(tsMap, resource, metric); err != nil {
case pdata.MetricDataTypeDoubleGauge:
dataPoints := metric.DoubleGauge().DataPoints()
if err := prwe.addDoubleDataPointSlice(dataPoints, tsMap, resource, metric); err != nil {
dropped++
errs = append(errs, consumererror.Permanent(err))
errs = append(errs, err)
}
case pdata.MetricDataTypeHistogram, pdata.MetricDataTypeIntHistogram:
if err := prwe.handleHistogramMetric(tsMap, resource, metric); err != nil {
case pdata.MetricDataTypeIntGauge:
dataPoints := metric.IntGauge().DataPoints()
if err := prwe.addIntDataPointSlice(dataPoints, tsMap, resource, metric); err != nil {
dropped++
errs = append(errs, consumererror.Permanent(err))
errs = append(errs, err)
}
case pdata.MetricDataTypeDoubleSum:
dataPoints := metric.DoubleSum().DataPoints()
if err := prwe.addDoubleDataPointSlice(dataPoints, tsMap, resource, metric); err != nil {
dropped++
errs = append(errs, err)
}
case pdata.MetricDataTypeIntSum:
dataPoints := metric.IntSum().DataPoints()
if err := prwe.addIntDataPointSlice(dataPoints, tsMap, resource, metric); err != nil {
dropped++
errs = append(errs, err)
}
case pdata.MetricDataTypeIntHistogram:
dataPoints := metric.IntHistogram().DataPoints()
if dataPoints.Len() == 0 {
dropped++
errs = append(errs, consumererror.Permanent(fmt.Errorf("empty data points. %s is dropped", metric.Name())))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleIntHistogramDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case pdata.MetricDataTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
if dataPoints.Len() == 0 {
dropped++
errs = append(errs, consumererror.Permanent(fmt.Errorf("empty data points. %s is dropped", metric.Name())))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleHistogramDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case pdata.MetricDataTypeSummary:
if err := prwe.handleSummaryMetric(tsMap, resource, metric); err != nil {
dataPoints := metric.Summary().DataPoints()
if dataPoints.Len() == 0 {
dropped++
errs = append(errs, consumererror.Permanent(err))
errs = append(errs, consumererror.Permanent(fmt.Errorf("empty data points. %s is dropped", metric.Name())))
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleSummaryDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
default:
dropped++
Expand Down Expand Up @@ -185,85 +221,22 @@ func validateAndSanitizeExternalLabels(externalLabels map[string]string) (map[st
return sanitizedLabels, nil
}

// handleScalarMetric processes data points in a single OTLP scalar metric by adding the each point as a Sample into
// its corresponding TimeSeries in tsMap.
// tsMap and metric cannot be nil, and metric must have a non-nil descriptor
func (prwe *PRWExporter) handleScalarMetric(tsMap map[string]*prompb.TimeSeries, resource pdata.Resource, metric pdata.Metric) error {
switch metric.DataType() {
// int points
case pdata.MetricDataTypeDoubleGauge:
dataPoints := metric.DoubleGauge().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}

for i := 0; i < dataPoints.Len(); i++ {
addSingleDoubleDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case pdata.MetricDataTypeIntGauge:
dataPoints := metric.IntGauge().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for i := 0; i < dataPoints.Len(); i++ {
addSingleIntDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case pdata.MetricDataTypeDoubleSum:
dataPoints := metric.DoubleSum().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for i := 0; i < dataPoints.Len(); i++ {
addSingleDoubleDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)

}
case pdata.MetricDataTypeIntSum:
dataPoints := metric.IntSum().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for i := 0; i < dataPoints.Len(); i++ {
addSingleIntDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
func (prwe *PRWExporter) addIntDataPointSlice(dataPoints pdata.IntDataPointSlice, tsMap map[string]*prompb.TimeSeries, resource pdata.Resource, metric pdata.Metric) error {
if dataPoints.Len() == 0 {
return consumererror.Permanent(fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
return nil
}

// handleHistogramMetric processes data points in a single OTLP histogram metric by mapping the sum, count and each
// bucket of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (prwe *PRWExporter) handleHistogramMetric(tsMap map[string]*prompb.TimeSeries, resource pdata.Resource, metric pdata.Metric) error {
switch metric.DataType() {
case pdata.MetricDataTypeIntHistogram:
dataPoints := metric.IntHistogram().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for i := 0; i < dataPoints.Len(); i++ {
addSingleIntHistogramDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
case pdata.MetricDataTypeHistogram:
dataPoints := metric.Histogram().DataPoints()
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
}
for i := 0; i < dataPoints.Len(); i++ {
addSingleDoubleHistogramDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
for x := 0; x < dataPoints.Len(); x++ {
addSingleIntDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
return nil
}

// handleSummaryMetric processes data points in a single OTLP summary metric by mapping the sum, count and each
// quantile of every data point as a Sample, and adding each Sample to its corresponding TimeSeries.
// tsMap and metric cannot be nil.
func (prwe *PRWExporter) handleSummaryMetric(tsMap map[string]*prompb.TimeSeries, resource pdata.Resource, metric pdata.Metric) error {
dataPoints := metric.Summary().DataPoints()
func (prwe *PRWExporter) addDoubleDataPointSlice(dataPoints pdata.DoubleDataPointSlice, tsMap map[string]*prompb.TimeSeries, resource pdata.Resource, metric pdata.Metric) error {
if dataPoints.Len() == 0 {
return fmt.Errorf("empty data points. %s is dropped", metric.Name())
return consumererror.Permanent(fmt.Errorf("empty data points. %s is dropped", metric.Name()))
}
for i := 0; i < dataPoints.Len(); i++ {
addSingleDoubleSummaryDataPoint(dataPoints.At(i), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
for x := 0; x < dataPoints.Len(); x++ {
addSingleDoubleDataPoint(dataPoints.At(x), resource, metric, prwe.namespace, tsMap, prwe.externalLabels)
}
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,9 @@ func addSingleIntHistogramDataPoint(pt pdata.IntHistogramDataPoint, resource pda
addSample(tsMap, infBucket, infLabels, metric)
}

// addSingleDoubleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It
// addSingleHistogramDataPoint converts pt to 2 + min(len(ExplicitBounds), len(BucketCount)) + 1 samples. It
// ignore extra buckets if len(ExplicitBounds) > len(BucketCounts)
func addSingleDoubleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Resource, metric pdata.Metric, namespace string,
func addSingleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pdata.Resource, metric pdata.Metric, namespace string,
tsMap map[string]*prompb.TimeSeries, externalLabels map[string]string) {
time := convertTimeStamp(pt.Timestamp())
// sum, count, and buckets of the histogram should append suffix to baseName
Expand Down Expand Up @@ -398,8 +398,8 @@ func addSingleDoubleHistogramDataPoint(pt pdata.HistogramDataPoint, resource pda
addSample(tsMap, infBucket, infLabels, metric)
}

// addSingleDoubleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples.
func addSingleDoubleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resource, metric pdata.Metric, namespace string,
// addSingleSummaryDataPoint converts pt to len(QuantileValues) + 2 samples.
func addSingleSummaryDataPoint(pt pdata.SummaryDataPoint, resource pdata.Resource, metric pdata.Metric, namespace string,
tsMap map[string]*prompb.TimeSeries, externalLabels map[string]string) {
time := convertTimeStamp(pt.Timestamp())
// sum and count of the summary should append suffix to baseName
Expand Down

0 comments on commit db28270

Please sign in to comment.