Skip to content

Commit

Permalink
Avoid breaking otlp consumer interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Oct 18, 2023
1 parent 9c35a63 commit 159405b
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 29 deletions.
11 changes: 9 additions & 2 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,17 @@ type ConsumeLogsResult struct {
RejectedLogRecords int64
}

// ConsumeLogs consumes OpenTelemetry log data, converting into
// ConsumeLogs calls ConsumeLogsWithResult but ignores the result.
// It exists to satisfy the go.opentelemetry.io/collector/consumer.Logs interface.
func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) error {
_, err := c.ConsumeLogsWithResult(ctx, logs)
return err
}

// ConsumeLogsWithResult consumes OpenTelemetry log data, converting into
// the Elastic APM log model and sending to the reporter.
// The returned ConsumeLogsResult contains the number of rejected log records.
func (c *Consumer) ConsumeLogs(ctx context.Context, logs plog.Logs) (ConsumeLogsResult, error) {
func (c *Consumer) ConsumeLogsWithResult(ctx context.Context, logs plog.Logs) (ConsumeLogsResult, error) {
if err := c.sem.Acquire(ctx, 1); err != nil {
return ConsumeLogsResult{}, err
}
Expand Down
21 changes: 13 additions & 8 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
Expand All @@ -52,6 +53,10 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestConsumer_ConsumeLogs_Interface(t *testing.T) {
var _ consumer.Logs = otlp.NewConsumer(otlp.ConsumerConfig{})
}

func TestConsumerConsumeLogs(t *testing.T) {
t.Run("empty", func(t *testing.T) {
var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error {
Expand All @@ -64,7 +69,7 @@ func TestConsumerConsumeLogs(t *testing.T) {
Semaphore: semaphore.NewWeighted(100),
})
logs := plog.NewLogs()
result, err := consumer.ConsumeLogs(context.Background(), logs)
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)
})
Expand Down Expand Up @@ -194,7 +199,7 @@ func TestConsumerConsumeLogs(t *testing.T) {
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeLogs(context.Background(), logs)
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

Expand Down Expand Up @@ -235,18 +240,18 @@ func TestConsumeLogsSemaphore(t *testing.T) {
startCh := make(chan struct{})
go func() {
close(startCh)
_, err := consumer.ConsumeLogs(context.Background(), logs)
_, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
}()

<-startCh
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeLogs(ctx, logs)
_, err := consumer.ConsumeLogsWithResult(ctx, logs)
assert.Equal(t, err.Error(), "context deadline exceeded")
close(doneCh)

_, err = consumer.ConsumeLogs(context.Background(), logs)
_, err = consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
}

Expand Down Expand Up @@ -300,7 +305,7 @@ Caused by: LowLevelException
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeLogs(context.Background(), logs)
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

Expand Down Expand Up @@ -452,7 +457,7 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) {
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeLogs(context.Background(), logs)
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

Expand Down Expand Up @@ -497,7 +502,7 @@ func TestConsumerConsumeLogsLabels(t *testing.T) {
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeLogs(context.Background(), logs)
result, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeLogsResult{}, result)

Expand Down
11 changes: 9 additions & 2 deletions input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,17 @@ type ConsumeMetricsResult struct {
RejectedDataPoints int64
}

// ConsumeMetrics consumes OpenTelemetry metrics data, converting into
// ConsumeMetrics calls ConsumeMetricsWithResult but ignores the result.
// It exists to satisfy the go.opentelemetry.io/collector/consumer.Metrics interface.
func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) error {
_, err := c.ConsumeMetricsWithResult(ctx, metrics)
return err
}

// ConsumeMetricsWithResult consumes OpenTelemetry metrics data, converting into
// the Elastic APM metrics model and sending to the reporter.
// The returned ConsumeMetricsResult contains the number of rejected data points.
func (c *Consumer) ConsumeMetrics(ctx context.Context, metrics pmetric.Metrics) (ConsumeMetricsResult, error) {
func (c *Consumer) ConsumeMetricsWithResult(ctx context.Context, metrics pmetric.Metrics) (ConsumeMetricsResult, error) {
totalDataPoints := int64(metrics.DataPointCount())
totalMetrics := int64(metrics.MetricCount())
if err := c.sem.Acquire(ctx, 1); err != nil {
Expand Down
13 changes: 9 additions & 4 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"golang.org/x/sync/semaphore"
Expand All @@ -55,6 +56,10 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestConsumer_ConsumeMetrics_Interface(t *testing.T) {
var _ consumer.Metrics = otlp.NewConsumer(otlp.ConsumerConfig{})
}

func TestConsumeMetrics(t *testing.T) {
metrics := pmetric.NewMetrics()
resourceMetrics := metrics.ResourceMetrics().AppendEmpty()
Expand Down Expand Up @@ -238,18 +243,18 @@ func TestConsumeMetricsSemaphore(t *testing.T) {
startCh := make(chan struct{})
go func() {
close(startCh)
_, err := consumer.ConsumeMetrics(context.Background(), metrics)
_, err := consumer.ConsumeMetricsWithResult(context.Background(), metrics)
assert.NoError(t, err)
}()

<-startCh
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeMetrics(ctx, metrics)
_, err := consumer.ConsumeMetricsWithResult(ctx, metrics)
assert.Equal(t, err.Error(), "context deadline exceeded")
close(doneCh)

_, err = consumer.ConsumeMetrics(context.Background(), metrics)
_, err = consumer.ConsumeMetricsWithResult(context.Background(), metrics)
assert.NoError(t, err)
}

Expand Down Expand Up @@ -747,7 +752,7 @@ func transformMetrics(t *testing.T, metrics pmetric.Metrics) ([]*modelpb.APMEven
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeMetrics(context.Background(), metrics)
result, err := consumer.ConsumeMetricsWithResult(context.Background(), metrics)
require.Len(t, batches, 1)
return *batches[0], consumer.Stats(), result, err
}
Expand Down
11 changes: 9 additions & 2 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,17 @@ type ConsumeTracesResult struct {
RejectedSpans int64
}

// ConsumeTraces consumes OpenTelemetry trace data,
// ConsumeTraces calls ConsumeTracesWithResult but ignores the result.
// It exists to satisfy the go.opentelemetry.io/collector/consumer.Traces interface.
func (c *Consumer) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
_, err := c.ConsumeTracesWithResult(ctx, traces)
return err
}

// ConsumeTracesWithResult consumes OpenTelemetry trace data,
// converting into Elastic APM events and reporting to the Elastic APM schema.
// The returned ConsumeTracesResult contains the number of rejected spans.
func (c *Consumer) ConsumeTraces(ctx context.Context, traces ptrace.Traces) (ConsumeTracesResult, error) {
func (c *Consumer) ConsumeTracesWithResult(ctx context.Context, traces ptrace.Traces) (ConsumeTracesResult, error) {
if err := c.sem.Acquire(ctx, 1); err != nil {
return ConsumeTracesResult{}, err
}
Expand Down
27 changes: 16 additions & 11 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.elastic.co/fastjson"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.5.0"
Expand All @@ -62,6 +63,10 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestConsumer_ConsumeTraces_Interface(t *testing.T) {
var _ consumer.Traces = otlp.NewConsumer(otlp.ConsumerConfig{})
}

func TestConsumer_ConsumeTraces_Empty(t *testing.T) {
var processor modelpb.ProcessBatchFunc = func(ctx context.Context, batch *modelpb.Batch) error {
assert.Empty(t, batch)
Expand All @@ -73,7 +78,7 @@ func TestConsumer_ConsumeTraces_Empty(t *testing.T) {
Semaphore: semaphore.NewWeighted(100),
})
traces := ptrace.NewTraces()
result, err := consumer.ConsumeTraces(context.Background(), traces)
result, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
assert.NoError(t, err)
assert.Equal(t, otlp.ConsumeTracesResult{}, result)
}
Expand Down Expand Up @@ -943,18 +948,18 @@ func TestConsumeTracesSemaphore(t *testing.T) {
startCh := make(chan struct{})
go func() {
close(startCh)
_, err := consumer.ConsumeTraces(context.Background(), traces)
_, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
assert.NoError(t, err)
}()

<-startCh
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeTraces(ctx, traces)
_, err := consumer.ConsumeTracesWithResult(ctx, traces)
assert.Equal(t, err.Error(), "context deadline exceeded")
close(doneCh)

_, err = consumer.ConsumeTraces(context.Background(), traces)
_, err = consumer.ConsumeTracesWithResult(context.Background(), traces)
assert.NoError(t, err)
}

Expand Down Expand Up @@ -1004,7 +1009,7 @@ func TestConsumer_JaegerMetadata(t *testing.T) {
jaegerBatch.Process = tc.process
traces, err := jaegertranslator.ProtoToTraces([]*jaegermodel.Batch{jaegerBatch})
require.NoError(t, err)
result, err := consumer.ConsumeTraces(context.Background(), traces)
result, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
require.NoError(t, err)
require.Equal(t, otlp.ConsumeTracesResult{}, result)

Expand Down Expand Up @@ -1074,7 +1079,7 @@ func TestConsumer_JaegerSampleRate(t *testing.T) {
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeTraces(context.Background(), traces)
result, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
require.NoError(t, err)
require.Equal(t, otlp.ConsumeTracesResult{}, result)
require.Len(t, batches, 1)
Expand Down Expand Up @@ -1110,7 +1115,7 @@ func TestConsumer_JaegerTraceID(t *testing.T) {
}},
}})
require.NoError(t, err)
result, err := consumer.ConsumeTraces(context.Background(), traces)
result, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
require.NoError(t, err)
require.Equal(t, otlp.ConsumeTracesResult{}, result)

Expand Down Expand Up @@ -1240,7 +1245,7 @@ func TestConsumer_JaegerTransaction(t *testing.T) {
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeTraces(context.Background(), traces)
result, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
require.NoError(t, err)
require.Equal(t, otlp.ConsumeTracesResult{}, result)

Expand Down Expand Up @@ -1362,7 +1367,7 @@ func TestConsumer_JaegerSpan(t *testing.T) {
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeTraces(context.Background(), traces)
result, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
require.NoError(t, err)
require.Equal(t, otlp.ConsumeTracesResult{}, result)

Expand Down Expand Up @@ -1397,7 +1402,7 @@ func TestJaegerServiceVersion(t *testing.T) {
Processor: recorder,
Semaphore: semaphore.NewWeighted(100),
})
result, err := consumer.ConsumeTraces(context.Background(), traces)
result, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
require.NoError(t, err)
require.Equal(t, otlp.ConsumeTracesResult{}, result)

Expand Down Expand Up @@ -1808,7 +1813,7 @@ func transformTraces(t *testing.T, traces ptrace.Traces) *modelpb.Batch {
Processor: processor,
Semaphore: semaphore.NewWeighted(100),
})
_, err := consumer.ConsumeTraces(context.Background(), traces)
_, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
require.NoError(t, err)
return &processed
}
Expand Down

0 comments on commit 159405b

Please sign in to comment.