From fee73c8c23354dcd46855b5b20dd2b032f7882fb Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Fri, 6 Oct 2023 16:30:25 +0200 Subject: [PATCH 1/3] Setting event timestamp from OTel observed timestamp when no timestamp is provided --- input/otlp/logs.go | 6 ++- input/otlp/logs_test.go | 94 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/input/otlp/logs.go b/input/otlp/logs.go index 71d43395..35e1bf7b 100644 --- a/input/otlp/logs.go +++ b/input/otlp/logs.go @@ -100,7 +100,11 @@ func (c *Consumer) convertLogRecord( ) *modelpb.APMEvent { event := baseEvent.CloneVT() initEventLabels(event) - event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta)) + if record.Timestamp() == 0 { + event.Timestamp = modelpb.FromTime(record.ObservedTimestamp().AsTime().Add(timeDelta)) + } else { + event.Timestamp = modelpb.FromTime(record.Timestamp().AsTime().Add(timeDelta)) + } if event.Event == nil { event.Event = modelpb.EventFromVTPool() } diff --git a/input/otlp/logs_test.go b/input/otlp/logs_test.go index e559f361..f5882754 100644 --- a/input/otlp/logs_test.go +++ b/input/otlp/logs_test.go @@ -451,6 +451,100 @@ func TestConsumerConsumeOTelEventLogs(t *testing.T) { assert.Equal(t, "MyEvent", processed[0].Event.Action) } +func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.UnixMilli(946684800000)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(timestamp) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) + + assert.Len(t, processed, 1) + assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + +func TestConsumerConsumeOTelLogsWithoutTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.UnixMilli(0)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(0) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) + + assert.Len(t, processed, 1) + assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + +func TestConsumerConsumeOTelLogsWithObservedTimestampWithoutTimestamp(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceAttrs := logs.ResourceLogs().At(0).Resource().Attributes() + resourceAttrs.PutStr(semconv.AttributeTelemetrySDKLanguage, "java") + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + observedTimestamp := pcommon.NewTimestampFromTime(time.UnixMilli(946684800000)) + + record1 := newLogRecord("") // no log body + record1.SetTimestamp(0) + record1.SetObservedTimestamp(observedTimestamp) + + record1.CopyTo(scopeLogs.LogRecords().AppendEmpty()) + + var processed modelpb.Batch + var processor modelpb.ProcessBatchFunc = func(_ context.Context, batch *modelpb.Batch) error { + if processed != nil { + panic("already processes batch") + } + processed = batch.Clone() + return nil + } + consumer := otlp.NewConsumer(otlp.ConsumerConfig{ + Processor: processor, + Semaphore: semaphore.NewWeighted(100), + }) + assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) + + assert.Len(t, processed, 1) + assert.Equal(t, int(observedTimestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) +} + func TestConsumerConsumeLogsLabels(t *testing.T) { logs := plog.NewLogs() resourceLogs := logs.ResourceLogs().AppendEmpty() From e7b86bbac669cfc2db0e77dbd47d65ab643f237a Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Wed, 18 Oct 2023 11:49:32 +0200 Subject: [PATCH 2/3] Updating logs tests ConsumeLogs calls --- input/otlp/logs_test.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/input/otlp/logs_test.go b/input/otlp/logs_test.go index 0b28af6e..20f16d4b 100644 --- a/input/otlp/logs_test.go +++ b/input/otlp/logs_test.go @@ -487,7 +487,9 @@ func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) { Processor: processor, Semaphore: semaphore.NewWeighted(100), }) - assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) + result, err := consumer.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) assert.Len(t, processed, 1) assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) @@ -518,7 +520,9 @@ func TestConsumerConsumeOTelLogsWithoutTimestamp(t *testing.T) { Processor: processor, Semaphore: semaphore.NewWeighted(100), }) - assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) + result, err := consumer.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) assert.Len(t, processed, 1) assert.Equal(t, int(timestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) @@ -550,7 +554,9 @@ func TestConsumerConsumeOTelLogsWithObservedTimestampWithoutTimestamp(t *testing Processor: processor, Semaphore: semaphore.NewWeighted(100), }) - assert.NoError(t, consumer.ConsumeLogs(context.Background(), logs)) + result, err := consumer.ConsumeLogs(context.Background(), logs) + assert.NoError(t, err) + assert.Equal(t, otlp.ConsumeLogsResult{}, result) assert.Len(t, processed, 1) assert.Equal(t, int(observedTimestamp.AsTime().UnixNano()), int(processed[0].Timestamp)) From 74d1099de1c87345281238b27190088422a3904a Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Thu, 19 Oct 2023 10:46:56 +0200 Subject: [PATCH 3/3] Updating logs tests with ConsumeLogsWithResult --- input/otlp/logs_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/input/otlp/logs_test.go b/input/otlp/logs_test.go index e92f752c..43844e46 100644 --- a/input/otlp/logs_test.go +++ b/input/otlp/logs_test.go @@ -492,7 +492,7 @@ func TestConsumerConsumeOTelLogsWithTimestamp(t *testing.T) { Processor: processor, Semaphore: semaphore.NewWeighted(100), }) - result, err := consumer.ConsumeLogs(context.Background(), logs) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) assert.NoError(t, err) assert.Equal(t, otlp.ConsumeLogsResult{}, result) @@ -525,7 +525,7 @@ func TestConsumerConsumeOTelLogsWithoutTimestamp(t *testing.T) { Processor: processor, Semaphore: semaphore.NewWeighted(100), }) - result, err := consumer.ConsumeLogs(context.Background(), logs) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) assert.NoError(t, err) assert.Equal(t, otlp.ConsumeLogsResult{}, result) @@ -559,7 +559,7 @@ func TestConsumerConsumeOTelLogsWithObservedTimestampWithoutTimestamp(t *testing Processor: processor, Semaphore: semaphore.NewWeighted(100), }) - result, err := consumer.ConsumeLogs(context.Background(), logs) + result, err := consumer.ConsumeLogsWithResult(context.Background(), logs) assert.NoError(t, err) assert.Equal(t, otlp.ConsumeLogsResult{}, result)