From 81c5eee7e856d35c2e3307200217df3488229ed8 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 27 Feb 2025 16:52:13 +0800 Subject: [PATCH] [receiver/awsfirehose] Add support for encoding extensions (#37262) #### Description Add support for using encoding extensions for unmarshalling records transmitted via Amazon Data Firehose. The "record_type" config is now deprecated and has been replaced by "encoding". This new config setting supports all of the existing encodings (cwlogs, cwmetrics, otlp_v1) as well as support for loading additional encodings via extensions. #### Link to tracking issue Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37113 #### Testing Should be a non-functional change, so mostly relying on existing unit tests to catch issues. Tests have been added for new extension functionality. Manually tested creating a Firehose delivery stream and using the `text_encoding` extension: 1. Ran collector with the following config: ```yaml receivers: awsfirehose: endpoint: localhost:1234 encoding: text_encoding exporters: debug: verbosity: detailed extensions: text_encoding: service: extensions: [text_encoding] pipelines: logs: receivers: [awsfirehose] processors: [] exporters: [debug] ``` 2. Exposed to the internet with ngrok 3. Created a Firehose delivery stream pointed at ngrok HTTPS endpoint 4. Used AWS CLI to send a record: `aws firehose put-record --delivery-stream-name=axwtest --record Data=$(echo -n abc | base64)` 5. 
Observed log record being exported by the debug exporter: ``` 2025-02-11T14:09:17.090+0800 info Logs {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 1} 2025-02-11T14:09:17.090+0800 info ResourceLog #0 Resource SchemaURL: ScopeLogs #0 ScopeLogs SchemaURL: InstrumentationScope LogRecord #0 ObservedTimestamp: 2025-02-11 06:09:17.090506322 +0000 UTC Timestamp: 1970-01-01 00:00:00 +0000 UTC SeverityText: SeverityNumber: Unspecified(0) Body: Str(abc) Trace ID: Span ID: Flags: 0 ``` #### Documentation Updated README. --------- Co-authored-by: Anthony Mirabella Co-authored-by: Pablo Baeyens --- .chloggen/firehose-encoding-extension.yaml | 27 +++++++++ receiver/awsfirehosereceiver/README.md | 19 ++++--- receiver/awsfirehosereceiver/config.go | 25 +++++++-- receiver/awsfirehosereceiver/config_test.go | 45 +++++++++------ receiver/awsfirehosereceiver/factory.go | 52 ++--------------- receiver/awsfirehosereceiver/factory_test.go | 8 --- receiver/awsfirehosereceiver/logs_receiver.go | 49 +++++++++++----- .../awsfirehosereceiver/logs_receiver_test.go | 56 ++++++++++++++----- .../awsfirehosereceiver/metrics_receiver.go | 53 ++++++++++++------ .../metrics_receiver_test.go | 54 +++++++++++++----- receiver/awsfirehosereceiver/receiver.go | 39 +++++++++++-- receiver/awsfirehosereceiver/receiver_test.go | 54 +++++++++++++++++- .../testdata/invalid_config.yaml | 3 +- 13 files changed, 329 insertions(+), 155 deletions(-) create mode 100644 .chloggen/firehose-encoding-extension.yaml diff --git a/.chloggen/firehose-encoding-extension.yaml b/.chloggen/firehose-encoding-extension.yaml new file mode 100644 index 000000000000..6afdac5262d6 --- /dev/null +++ b/.chloggen/firehose-encoding-extension.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. 
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsfirehosereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for encoding extensions + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37113] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Adds `encoding` config setting, and deprecates the `record_type` setting. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/awsfirehosereceiver/README.md b/receiver/awsfirehosereceiver/README.md index 2aaaf88a280d..f20aed81790c 100644 --- a/receiver/awsfirehosereceiver/README.md +++ b/receiver/awsfirehosereceiver/README.md @@ -45,25 +45,28 @@ See [documentation](https://github.com/open-telemetry/opentelemetry-collector/bl A `cert_file` and `key_file` are required. -### record_type: -The type of record being received from the delivery stream. Each unmarshaler handles a specific type, so the field allows the receiver to use the correct one. 
+### encoding: + +The ID of an [encoding extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding) for decoding logs or metrics. +This configuration also supports the built-in encodings listed in the [Encodings](#encodings) section. +If no encoding is specified, then the receiver will default to a signal-specific encoding: `cwmetrics` for metrics, and `cwlogs` for logs. -default: `cwmetrics` +### record_type: -See the [Record Types](#record-types) section for all available options. +Deprecated, use `encoding` instead. `record_type` will be removed in a future release; it is an alias for `encoding`. ### access_key (Optional): The access key to be checked on each request received. This can be set when creating or updating the delivery stream. See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http) for details. -## Record Types +## Encodings ### cwmetrics -The record type for the CloudWatch metric stream. Expects the format for the records to be JSON. +The encoding for the CloudWatch metric stream. Expects the format for the records to be JSON. See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details. ### cwlogs -The record type for the CloudWatch log stream. Expects the format for the records to be JSON. +The encoding for the CloudWatch log stream. Expects the format for the records to be JSON. For example: ```json @@ -84,5 +87,5 @@ For example: ``` ### otlp_v1 -The OTLP v1 format as produced by CloudWatch metric streams. +The OTLP v1 encoding as produced by CloudWatch metric streams. See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details. 
diff --git a/receiver/awsfirehosereceiver/config.go b/receiver/awsfirehosereceiver/config.go index 784e0af2501b..0d3ca03b5510 100644 --- a/receiver/awsfirehosereceiver/config.go +++ b/receiver/awsfirehosereceiver/config.go @@ -8,15 +8,24 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" + "go.uber.org/zap" ) +var errRecordTypeEncodingSet = errors.New("record_type must not be set when encoding is set") + type Config struct { // ServerConfig is used to set up the Firehose delivery // endpoint. The Firehose delivery stream expects an HTTPS // endpoint, so TLSSettings must be used to enable that. confighttp.ServerConfig `mapstructure:",squash"` - // RecordType is the key used to determine which unmarshaler to use - // when receiving the requests. + // Encoding identifies the encoding of records received from + // Firehose. Defaults to telemetry-specific encodings: "cwlogs" + // for logs, and "cwmetrics" for metrics. + Encoding string `mapstructure:"encoding"` + // RecordType is an alias for Encoding for backwards compatibility. + // It is an error to specify both encoding and record_type. + // + // Deprecated: [v0.121.0] use Encoding instead. RecordType string `mapstructure:"record_type"` // AccessKey is checked against the one received with each request. // This can be set when creating or updating the Firehose delivery @@ -30,10 +39,14 @@ func (c *Config) Validate() error { if c.Endpoint == "" { return errors.New("must specify endpoint") } - // If a record type is specified, it must be valid. - // An empty string is acceptable, however, because it will use a telemetry-type-specific default. 
- if c.RecordType != "" { - return validateRecordType(c.RecordType) + if c.RecordType != "" && c.Encoding != "" { + return errRecordTypeEncodingSet } return nil } + +func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) { + if cfg.RecordType != "" { + logger.Warn("record_type is deprecated, and will be removed in a future version. Use encoding instead.") + } +} diff --git a/receiver/awsfirehosereceiver/config_test.go b/receiver/awsfirehosereceiver/config_test.go index eb9add020831..1c8c1ad2de8c 100644 --- a/receiver/awsfirehosereceiver/config_test.go +++ b/receiver/awsfirehosereceiver/config_test.go @@ -20,7 +20,7 @@ import ( func TestLoadConfig(t *testing.T) { for _, configType := range []string{ - "cwmetrics", "cwlogs", "otlp_v1", "invalid", + "cwmetrics", "cwlogs", "otlp_v1", } { t.Run(configType, func(t *testing.T) { fileName := configType + "_config.yaml" @@ -35,24 +35,35 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, sub.Unmarshal(cfg)) err = xconfmap.Validate(cfg) - if configType == "invalid" { - assert.Error(t, err) - } else { - assert.NoError(t, err) - require.Equal(t, &Config{ - RecordType: configType, - AccessKey: "some_access_key", - ServerConfig: confighttp.ServerConfig{ - Endpoint: "0.0.0.0:4433", - TLSSetting: &configtls.ServerConfig{ - Config: configtls.Config{ - CertFile: "server.crt", - KeyFile: "server.key", - }, + assert.NoError(t, err) + require.Equal(t, &Config{ + RecordType: configType, + AccessKey: "some_access_key", + ServerConfig: confighttp.ServerConfig{ + Endpoint: "0.0.0.0:4433", + TLSSetting: &configtls.ServerConfig{ + Config: configtls.Config{ + CertFile: "server.crt", + KeyFile: "server.key", }, }, - }, cfg) - } + }, + }, cfg) }) } } + +func TestLoadConfigInvalid(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "invalid_config.yaml")) + require.NoError(t, err) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(component.NewIDWithName(metadata.Type, 
"").String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + + err = xconfmap.Validate(cfg) + assert.ErrorIs(t, err, errRecordTypeEncodingSet) +} diff --git a/receiver/awsfirehosereceiver/factory.go b/receiver/awsfirehosereceiver/factory.go index 12801e364fde..0e049d930d12 100644 --- a/receiver/awsfirehosereceiver/factory.go +++ b/receiver/awsfirehosereceiver/factory.go @@ -5,35 +5,19 @@ package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-c import ( "context" - "errors" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream" ) const ( defaultEndpoint = "localhost:4433" ) -var ( - errUnrecognizedRecordType = errors.New("unrecognized record type") - availableRecordTypes = map[string]bool{ - cwmetricstream.TypeStr: true, - cwlog.TypeStr: true, - otlpmetricstream.TypeStr: true, - } -) - // NewFactory creates a receiver factory for awsfirehose. Currently, only // available in metrics pipelines. func NewFactory() receiver.Factory { @@ -44,34 +28,6 @@ func NewFactory() receiver.Factory { receiver.WithLogs(createLogsReceiver, metadata.LogsStability)) } -// validateRecordType checks the available record types for the -// passed in one and returns an error if not found. 
-func validateRecordType(recordType string) error { - if _, ok := availableRecordTypes[recordType]; !ok { - return errUnrecognizedRecordType - } - return nil -} - -// defaultMetricsUnmarshalers creates a map of the available metrics -// unmarshalers. -func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]pmetric.Unmarshaler { - cwmsu := cwmetricstream.NewUnmarshaler(logger) - otlpv1msu := otlpmetricstream.NewUnmarshaler(logger) - return map[string]pmetric.Unmarshaler{ - cwmsu.Type(): cwmsu, - otlpv1msu.Type(): otlpv1msu, - } -} - -// defaultLogsUnmarshalers creates a map of the available logs unmarshalers. -func defaultLogsUnmarshalers(logger *zap.Logger) map[string]plog.Unmarshaler { - u := cwlog.NewUnmarshaler(logger) - return map[string]plog.Unmarshaler{ - u.Type(): u, - } -} - // createDefaultConfig creates a default config with the endpoint set // to port 8443 and the record type set to the CloudWatch metric stream. func createDefaultConfig() component.Config { @@ -89,7 +45,9 @@ func createMetricsReceiver( cfg component.Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { - return newMetricsReceiver(cfg.(*Config), set, defaultMetricsUnmarshalers(set.Logger), nextConsumer) + c := cfg.(*Config) + handleDeprecatedConfig(c, set.Logger) + return newMetricsReceiver(c, set, nextConsumer) } // createMetricsReceiver implements the CreateMetricsReceiver function type. 
@@ -99,5 +57,7 @@ func createLogsReceiver( cfg component.Config, nextConsumer consumer.Logs, ) (receiver.Logs, error) { - return newLogsReceiver(cfg.(*Config), set, defaultLogsUnmarshalers(set.Logger), nextConsumer) + c := cfg.(*Config) + handleDeprecatedConfig(c, set.Logger) + return newLogsReceiver(c, set, nextConsumer) } diff --git a/receiver/awsfirehosereceiver/factory_test.go b/receiver/awsfirehosereceiver/factory_test.go index 498981c48632..0cd7888db417 100644 --- a/receiver/awsfirehosereceiver/factory_test.go +++ b/receiver/awsfirehosereceiver/factory_test.go @@ -13,7 +13,6 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream" ) func TestValidConfig(t *testing.T) { @@ -42,10 +41,3 @@ func TestCreateLogsReceiver(t *testing.T) { require.NoError(t, err) require.NotNil(t, r) } - -func TestValidateRecordType(t *testing.T) { - require.NoError(t, validateRecordType(defaultMetricsRecordType)) - require.NoError(t, validateRecordType(defaultLogsRecordType)) - require.NoError(t, validateRecordType(otlpmetricstream.TypeStr)) - require.Error(t, validateRecordType("nop")) -} diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go index 98e5c49f9bcc..0aa8ced9b579 100644 --- a/receiver/awsfirehosereceiver/logs_receiver.go +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -10,6 +10,7 @@ import ( "io" "net/http" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/plog" @@ -18,11 +19,14 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" ) -const defaultLogsRecordType = 
cwlog.TypeStr +const defaultLogsEncoding = cwlog.TypeStr // logsConsumer implements the firehoseConsumer // to use a logs consumer and unmarshaler. type logsConsumer struct { + config *Config + settings receiver.Settings + // consumer passes the translated logs on to the // next consumer. consumer consumer.Logs @@ -38,21 +42,12 @@ var _ firehoseConsumer = (*logsConsumer)(nil) func newLogsReceiver( config *Config, set receiver.Settings, - unmarshalers map[string]plog.Unmarshaler, nextConsumer consumer.Logs, ) (receiver.Logs, error) { - recordType := config.RecordType - if recordType == "" { - recordType = defaultLogsRecordType - } - configuredUnmarshaler := unmarshalers[recordType] - if configuredUnmarshaler == nil { - return nil, fmt.Errorf("%w: recordType = %s", errUnrecognizedRecordType, recordType) - } - c := &logsConsumer{ - consumer: nextConsumer, - unmarshaler: configuredUnmarshaler, + config: config, + settings: set, + consumer: nextConsumer, } return &firehoseReceiver{ settings: set, @@ -61,8 +56,32 @@ func newLogsReceiver( }, nil } -// Consume uses the configured unmarshaler to unmarshal each record -// into a plog.Logs and pass it to the next consumer, one record at a time. +// Start sets the consumer's log unmarshaler to either a built-in +// unmarshaler or one loaded from an encoding extension. 
+func (c *logsConsumer) Start(_ context.Context, host component.Host) error { + encoding := c.config.Encoding + if encoding == "" { + encoding = c.config.RecordType + if encoding == "" { + encoding = defaultLogsEncoding + } + } + if encoding == cwlog.TypeStr { + // TODO: make cwlogs an encoding extension + c.unmarshaler = cwlog.NewUnmarshaler(c.settings.Logger) + } else { + unmarshaler, err := loadEncodingExtension[plog.Unmarshaler](host, encoding, "logs") + if err != nil { + return fmt.Errorf("failed to load encoding extension: %w", err) + } + c.unmarshaler = unmarshaler + } + return nil +} + +// Consume uses the configured unmarshaler to deserialize each record, +// with each resulting plog.Logs being sent to the next consumer as +// they are unmarshalled. func (c *logsConsumer) Consume(ctx context.Context, nextRecord nextRecordFunc, commonAttributes map[string]string) (int, error) { for { record, err := nextRecord() diff --git a/receiver/awsfirehosereceiver/logs_receiver_test.go b/receiver/awsfirehosereceiver/logs_receiver_test.go index fa5f07349f25..0bc6b5187e9b 100644 --- a/receiver/awsfirehosereceiver/logs_receiver_test.go +++ b/receiver/awsfirehosereceiver/logs_receiver_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -40,46 +41,71 @@ func (rc *logsRecordConsumer) Capabilities() consumer.Capabilities { } func TestLogsReceiver_Start(t *testing.T) { - unmarshalers := map[string]plog.Unmarshaler{ - "cwlogs": &cwlog.Unmarshaler{}, - "otlp_logs": &plog.ProtoUnmarshaler{}, - } - testCases := map[string]struct { + encoding string recordType string wantUnmarshalerType plog.Unmarshaler wantErr string }{ - "WithDefaultRecordType": { + "WithDefaultEncoding": { wantUnmarshalerType: &cwlog.Unmarshaler{}, }, - 
"WithSpecifiedRecordType": { + "WithBuiltinEncoding": { + encoding: "cwlogs", + wantUnmarshalerType: &cwlog.Unmarshaler{}, + }, + "WithExtensionEncoding": { + encoding: "otlp_logs", + wantUnmarshalerType: plogUnmarshalerExtension{}, + }, + "WithDeprecatedRecordType": { recordType: "otlp_logs", - wantUnmarshalerType: &plog.ProtoUnmarshaler{}, + wantUnmarshalerType: plogUnmarshalerExtension{}, }, - "WithUnknownRecordType": { - recordType: "invalid", - wantErr: errUnrecognizedRecordType.Error() + ": recordType = invalid", + "WithUnknownEncoding": { + encoding: "invalid", + wantErr: "failed to start consumer: failed to load encoding extension: unknown encoding extension \"invalid\"", + }, + "WithNonLogUnmarshalerExtension": { + encoding: "otlp_metrics", + wantErr: `failed to start consumer: failed to load encoding extension: extension "otlp_metrics" is not a logs unmarshaler`, }, } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { cfg := createDefaultConfig().(*Config) + cfg.Encoding = testCase.encoding cfg.RecordType = testCase.recordType got, err := newLogsReceiver( cfg, receivertest.NewNopSettings(metadata.Type), - unmarshalers, consumertest.NewNop(), ) + require.NoError(t, err) + require.NotNil(t, got) + require.IsType(t, &firehoseReceiver{}, got) + t.Cleanup(func() { + require.NoError(t, got.Shutdown(context.Background())) + }) + + host := hostWithExtensions{ + extensions: map[component.ID]component.Component{ + component.MustNewID("otlp_logs"): plogUnmarshalerExtension{}, + component.MustNewID("otlp_metrics"): pmetricUnmarshalerExtension{}, + }, + } + + err = got.Start(context.Background(), host) if testCase.wantErr != "" { require.EqualError(t, err, testCase.wantErr) - require.Nil(t, got) } else { require.NoError(t, err) - require.NotNil(t, got) - require.IsType(t, &firehoseReceiver{}, got) } + + assert.IsType(t, + testCase.wantUnmarshalerType, + got.(*firehoseReceiver).consumer.(*logsConsumer).unmarshaler, + ) }) } } diff --git 
a/receiver/awsfirehosereceiver/metrics_receiver.go b/receiver/awsfirehosereceiver/metrics_receiver.go index 05e356c4873e..98b0e4773489 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver.go +++ b/receiver/awsfirehosereceiver/metrics_receiver.go @@ -10,19 +10,23 @@ import ( "io" "net/http" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream" ) -const defaultMetricsRecordType = cwmetricstream.TypeStr +const defaultMetricsEncoding = cwmetricstream.TypeStr // The metricsConsumer implements the firehoseConsumer // to use a metrics consumer and unmarshaler. type metricsConsumer struct { + config *Config + settings receiver.Settings // consumer passes the translated metrics on to the // next consumer. 
consumer consumer.Metrics @@ -38,21 +42,12 @@ var _ firehoseConsumer = (*metricsConsumer)(nil) func newMetricsReceiver( config *Config, set receiver.Settings, - unmarshalers map[string]pmetric.Unmarshaler, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { - recordType := config.RecordType - if recordType == "" { - recordType = defaultMetricsRecordType - } - configuredUnmarshaler := unmarshalers[recordType] - if configuredUnmarshaler == nil { - return nil, fmt.Errorf("%w: recordType = %s", errUnrecognizedRecordType, recordType) - } - c := &metricsConsumer{ - consumer: nextConsumer, - unmarshaler: configuredUnmarshaler, + config: config, + settings: set, + consumer: nextConsumer, } return &firehoseReceiver{ settings: set, @@ -61,10 +56,34 @@ func newMetricsReceiver( }, nil } -// Consume uses the configured unmarshaler to deserialize the records into a -// single pmetric.Metrics. If there are common attributes available, then it will -// attach those to each of the pcommon.Resources. It will send the final result -// to the next consumer. 
+func (c *metricsConsumer) Start(_ context.Context, host component.Host) error { + encoding := c.config.Encoding + if encoding == "" { + encoding = c.config.RecordType + if encoding == "" { + encoding = defaultMetricsEncoding + } + } + switch encoding { + case cwmetricstream.TypeStr: + // TODO: make cwmetrics an encoding extension + c.unmarshaler = cwmetricstream.NewUnmarshaler(c.settings.Logger) + case otlpmetricstream.TypeStr: + // TODO: make otlp_v1 an encoding extension + c.unmarshaler = otlpmetricstream.NewUnmarshaler(c.settings.Logger) + default: + unmarshaler, err := loadEncodingExtension[pmetric.Unmarshaler](host, encoding, "metrics") + if err != nil { + return fmt.Errorf("failed to load encoding extension: %w", err) + } + c.unmarshaler = unmarshaler + } + return nil +} + +// Consume uses the configured unmarshaler to deserialize each record, +// with each resulting pmetric.Metrics being sent to the next consumer +// as they are unmarshalled. func (c *metricsConsumer) Consume(ctx context.Context, nextRecord nextRecordFunc, commonAttributes map[string]string) (int, error) { for { record, err := nextRecord() diff --git a/receiver/awsfirehosereceiver/metrics_receiver_test.go b/receiver/awsfirehosereceiver/metrics_receiver_test.go index 5e625dc901a1..1d3d067822d4 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver_test.go +++ b/receiver/awsfirehosereceiver/metrics_receiver_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -39,45 +40,70 @@ func (rc *metricsRecordConsumer) Capabilities() consumer.Capabilities { } func TestMetricsReceiver_Start(t *testing.T) { - unmarshalers := map[string]pmetric.Unmarshaler{ - "cwmetrics": &cwmetricstream.Unmarshaler{}, - "otlp_metrics": &pmetric.ProtoUnmarshaler{}, - } 
- testCases := map[string]struct { + encoding string recordType string wantUnmarshalerType pmetric.Unmarshaler wantErr string }{ - "WithDefaultRecordType": { + "WithDefaultEncoding": { wantUnmarshalerType: &cwmetricstream.Unmarshaler{}, }, - "WithSpecifiedRecordType": { + "WithBuiltinEncoding": { + encoding: "cwmetrics", + wantUnmarshalerType: &cwmetricstream.Unmarshaler{}, + }, + "WithExtensionEncoding": { + encoding: "otlp_metrics", + wantUnmarshalerType: pmetricUnmarshalerExtension{}, + }, + "WithDeprecatedRecordType": { recordType: "otlp_metrics", - wantUnmarshalerType: &pmetric.ProtoUnmarshaler{}, + wantUnmarshalerType: pmetricUnmarshalerExtension{}, }, - "WithUnknownRecordType": { - recordType: "invalid", - wantErr: errUnrecognizedRecordType.Error() + ": recordType = invalid", + "WithUnknownEncoding": { + encoding: "invalid", + wantErr: `failed to start consumer: failed to load encoding extension: unknown encoding extension "invalid"`, + }, + "WithNonLogUnmarshalerExtension": { + encoding: "otlp_logs", + wantErr: `failed to start consumer: failed to load encoding extension: extension "otlp_logs" is not a metrics unmarshaler`, }, } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { cfg := createDefaultConfig().(*Config) + cfg.Encoding = testCase.encoding cfg.RecordType = testCase.recordType got, err := newMetricsReceiver( cfg, receivertest.NewNopSettings(metadata.Type), - unmarshalers, consumertest.NewNop(), ) + require.NoError(t, err) + require.NotNil(t, got) + t.Cleanup(func() { + require.NoError(t, got.Shutdown(context.Background())) + }) + + host := hostWithExtensions{ + extensions: map[component.ID]component.Component{ + component.MustNewID("otlp_logs"): plogUnmarshalerExtension{}, + component.MustNewID("otlp_metrics"): pmetricUnmarshalerExtension{}, + }, + } + + err = got.Start(context.Background(), host) if testCase.wantErr != "" { require.EqualError(t, err, testCase.wantErr) - require.Nil(t, got) } else { require.NoError(t, err) - 
require.NotNil(t, got) } + + assert.IsType(t, + testCase.wantUnmarshalerType, + got.(*firehoseReceiver).consumer.(*metricsConsumer).unmarshaler, + ) }) } } diff --git a/receiver/awsfirehosereceiver/receiver.go b/receiver/awsfirehosereceiver/receiver.go index c2002dece303..330374327e71 100644 --- a/receiver/awsfirehosereceiver/receiver.go +++ b/receiver/awsfirehosereceiver/receiver.go @@ -32,7 +32,6 @@ const ( ) var ( - errMissingHost = errors.New("nil host") errInvalidAccessKey = errors.New("invalid firehose access key") errInHeaderMissingRequestID = errors.New("missing request id in header") errInBodyMissingRequestID = errors.New("missing request id in body") @@ -41,6 +40,8 @@ var ( // The firehoseConsumer is responsible for using the unmarshaler and the consumer. type firehoseConsumer interface { + Start(context.Context, component.Host) error + // Consume unmarshals and consumes the records returned by f. Consume(ctx context.Context, f nextRecordFunc, commonAttributes map[string]string) (int, error) } @@ -116,20 +117,20 @@ var ( // Start spins up the receiver's HTTP server and makes the receiver start // its processing. 
func (fmr *firehoseReceiver) Start(ctx context.Context, host component.Host) error { - if host == nil { - return errMissingHost + if err := fmr.consumer.Start(ctx, host); err != nil { + return fmt.Errorf("failed to start consumer: %w", err) } var err error fmr.server, err = fmr.config.ServerConfig.ToServer(ctx, host, fmr.settings.TelemetrySettings, fmr) if err != nil { - return err + return fmt.Errorf("failed to initialize HTTP server: %w", err) } var listener net.Listener listener, err = fmr.config.ServerConfig.ToListener(ctx) if err != nil { - return err + return fmt.Errorf("failed to start listening for HTTP requests: %w", err) } fmr.shutdownWG.Add(1) go func() { @@ -276,3 +277,31 @@ func (fmr *firehoseReceiver) sendResponse(w http.ResponseWriter, requestID strin fmr.settings.Logger.Error("Failed to send response", zap.Error(err)) } } + +// loadEncodingExtension tries to load an available extension for the given encoding. +func loadEncodingExtension[T any](host component.Host, encoding, signalType string) (T, error) { + var zero T + extensionID, err := encodingToComponentID(encoding) + if err != nil { + return zero, err + } + encodingExtension, ok := host.GetExtensions()[*extensionID] + if !ok { + return zero, fmt.Errorf("unknown encoding extension %q", encoding) + } + unmarshaler, ok := encodingExtension.(T) + if !ok { + return zero, fmt.Errorf("extension %q is not a %s unmarshaler", encoding, signalType) + } + return unmarshaler, nil +} + +// encodingToComponentID converts an encoding string to a component ID using the given encoding as type. 
+func encodingToComponentID(encoding string) (*component.ID, error) { + componentType, err := component.NewType(encoding) + if err != nil { + return nil, fmt.Errorf("invalid component type: %w", err) + } + id := component.NewID(componentType) + return &id, nil +} diff --git a/receiver/awsfirehosereceiver/receiver_test.go b/receiver/awsfirehosereceiver/receiver_test.go index 80730349225b..b0838671fad6 100644 --- a/receiver/awsfirehosereceiver/receiver_test.go +++ b/receiver/awsfirehosereceiver/receiver_test.go @@ -22,6 +22,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata" @@ -43,12 +45,20 @@ func newNopFirehoseConsumer(statusCode int, err error) *nopFirehoseConsumer { return &nopFirehoseConsumer{statusCode, err} } +func (*nopFirehoseConsumer) Start(context.Context, component.Host) error { + return nil +} + func (nfc *nopFirehoseConsumer) Consume(context.Context, nextRecordFunc, map[string]string) (int, error) { return nfc.statusCode, nfc.err } type firehoseConsumerFunc func(context.Context, nextRecordFunc, map[string]string) (int, error) +func (firehoseConsumerFunc) Start(context.Context, component.Host) error { + return nil +} + func (f firehoseConsumerFunc) Consume(ctx context.Context, next nextRecordFunc, common map[string]string) (int, error) { return f(ctx, next, common) } @@ -58,9 +68,6 @@ func TestStart(t *testing.T) { host component.Host wantErr error }{ - "WithoutHost": { - wantErr: errMissingHost, - }, "WithHost": { host: componenttest.NewNopHost(), }, @@ -291,3 +298,44 @@ func newNextRecordFunc(records [][]byte) nextRecordFunc { return next, nil } } + +type hostWithExtensions struct { + 
component.Host + extensions map[component.ID]component.Component +} + +func (h hostWithExtensions) GetExtensions() map[component.ID]component.Component { + return h.extensions +} + +type plogUnmarshalerExtension struct { + logs plog.Logs +} + +func (e plogUnmarshalerExtension) Start(context.Context, component.Host) error { + return nil +} + +func (e plogUnmarshalerExtension) Shutdown(context.Context) error { + return nil +} + +func (e plogUnmarshalerExtension) UnmarshalLogs([]byte) (plog.Logs, error) { + return e.logs, nil +} + +type pmetricUnmarshalerExtension struct { + metrics pmetric.Metrics +} + +func (e pmetricUnmarshalerExtension) Start(context.Context, component.Host) error { + return nil +} + +func (e pmetricUnmarshalerExtension) Shutdown(context.Context) error { + return nil +} + +func (e pmetricUnmarshalerExtension) UnmarshalMetrics([]byte) (pmetric.Metrics, error) { + return e.metrics, nil +} diff --git a/receiver/awsfirehosereceiver/testdata/invalid_config.yaml b/receiver/awsfirehosereceiver/testdata/invalid_config.yaml index 17ad1316b7cd..dc51bf24290d 100644 --- a/receiver/awsfirehosereceiver/testdata/invalid_config.yaml +++ b/receiver/awsfirehosereceiver/testdata/invalid_config.yaml @@ -1,3 +1,4 @@ awsfirehose: endpoint: 0.0.0.0:4433 - record_type: invalid \ No newline at end of file + encoding: can't have + record_type: both