Skip to content

Commit

Permalink
Add error message to Consume*Result
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Oct 18, 2023
1 parent 159405b commit bf96f31
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
3 changes: 2 additions & 1 deletion input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

// ConsumeLogsResult contains the number of rejected log records and error message for partial success response.
type ConsumeLogsResult struct {
RejectedLogRecords int64
ErrorMessage string
}

// ConsumeLogs calls ConsumeLogsWithResult but ignores the result.
Expand All @@ -60,7 +62,6 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {

// ConsumeLogsWithResult consumes OpenTelemetry log data, converting into
// the Elastic APM log model and sending to the reporter.
// The returned ConsumeLogsResult contains the number of rejected log records.
func (c *Consumer) ConsumeLogsWithResult(ctx context.Context, logs plog.Logs) (ConsumeLogsResult, error) {
if err := c.sem.Acquire(ctx, 1); err != nil {
return ConsumeLogsResult{}, err
Expand Down
8 changes: 7 additions & 1 deletion input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

// ConsumeMetricsResult contains the number of rejected data points and error message for partial success response.
type ConsumeMetricsResult struct {
RejectedDataPoints int64
ErrorMessage string
}

// ConsumeMetrics calls ConsumeMetricsWithResult but ignores the result.
Expand All @@ -61,7 +63,6 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics)

// ConsumeMetricsWithResult consumes OpenTelemetry metrics data, converting into
// the Elastic APM metrics model and sending to the reporter.
// The returned ConsumeMetricsResult contains the number of rejected data points.
func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric.Metrics) (ConsumeMetricsResult, error) {
totalDataPoints := int64(metrics.DataPointCount())
totalMetrics := int64(metrics.MetricCount())
Expand All @@ -82,8 +83,13 @@ func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric
if err := c.config.Processor.ProcessBatch(ctx, batch); err != nil {
return ConsumeMetricsResult{}, err
}
var errMsg string
if remainingDataPoints > 0 {
errMsg = "unsupported data points"
}
return ConsumeMetricsResult{
RejectedDataPoints: remainingDataPoints,
ErrorMessage: errMsg,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ const (
attributeDbElasticsearchClusterName = "db.elasticsearch.cluster.name"
)

// ConsumeTracesResult contains the number of rejected spans and error message for partial success response.
type ConsumeTracesResult struct {
RejectedSpans int64
ErrorMessage string
}

// ConsumeTraces calls ConsumeTracesWithResult but ignores the result.
Expand All @@ -91,7 +93,6 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, traces ptrace.Traces) erro

// ConsumeTracesWithResult consumes OpenTelemetry trace data,
// converting into Elastic APM events and reporting to the Elastic APM schema.
// The returned ConsumeTracesResult contains the number of rejected spans.
func (c *Consumer) ConsumeTracesWithResult(ctx context.Context, traces ptrace.Traces) (ConsumeTracesResult, error) {
if err := c.sem.Acquire(ctx, 1); err != nil {
return ConsumeTracesResult{}, err
Expand Down

0 comments on commit bf96f31

Please sign in to comment.