Skip to content

Commit

Permalink
New processor: copy_fields (#11303)
Browse files Browse the repository at this point in the history
A new processor is introduced as part of support for keeping orignal messages. Options and naming follows the convention of other processors.

### `copy_fields`

This processor copies one field to another. Example configuration is below:

```yaml
processors:
- copy_fields:
    fields:
      - from: message
        to: event.original
    fail_on_error: false
    ignore_missing: true
```
  • Loading branch information
kvch authored Mar 22, 2019
1 parent 09a8fa8 commit 4b9f945
Show file tree
Hide file tree
Showing 17 changed files with 385 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add output test to kafka output {pull}10834[10834]
- Add ip fields to default_field in Elasticsearch template. {pull}11035[11035]
- Gracefully shut down on SIGHUP {pull}10704[10704]
- Add `script` processor that supports using Javascript to process events. {pull}10850[10850] {pull}11260[11260]
- New processor: `copy_fields`. {pull}11303[11303]
- Add `error.message` to events when `fail_on_error` is set in `rename` and `copy_fields` processors. {pull}11303[11303]

*Auditbeat*

Expand Down
10 changes: 10 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,16 @@ auditbeat.modules:
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
10 changes: 10 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,16 @@ filebeat.inputs:
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
10 changes: 10 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,16 @@ heartbeat.scheduler:
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
10 changes: 10 additions & 0 deletions journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,16 @@ setup.template.settings:
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
10 changes: 10 additions & 0 deletions libbeat/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@
# max_depth: 1
# target: ""
# overwrite_keys: false
#
# The following example copies the value of message to message_copied
#
#processors:
#- copy_fields:
# fields:
# - from: message
# to: message_copied
# fail_on_error: true
# ignore_missing: false

#============================= Elastic Cloud ==================================

Expand Down
108 changes: 108 additions & 0 deletions libbeat/processors/actions/copy_fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
)

type copyFields struct {
config copyFieldsConfig
}

type copyFieldsConfig struct {
Fields []fromTo `config:"fields"`
IgnoreMissing bool `config:"ignore_missing"`
FailOnError bool `config:"fail_on_error"`
}

func init() {
processors.RegisterPlugin("copy_fields",
configChecked(newCopyFields,
requireFields("fields"),
),
)
}

func newCopyFields(c *common.Config) (processors.Processor, error) {
config := copyFieldsConfig{
IgnoreMissing: false,
FailOnError: true,
}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("failed to unpack the configuration of copy processor: %s", err)
}

f := &copyFields{
config: config,
}
return f, nil
}

func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) {
var backup common.MapStr
if f.config.FailOnError {
backup = event.Fields.Clone()
}

for _, field := range f.config.Fields {
err := f.copyField(field.From, field.To, event.Fields)
if err != nil && f.config.FailOnError {
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err)
logp.Debug("copy_fields", errMsg.Error())
event.Fields = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
}

return event, nil
}

func (f *copyFields) copyField(from string, to string, fields common.MapStr) error {
exists, _ := fields.HasKey(to)
if exists {
return fmt.Errorf("target field %s already exists, drop or rename this field first", to)
}

value, err := fields.GetValue(from)
if err != nil {
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
return nil
}
return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err)
}

_, err = fields.Put(to, value)
if err != nil {
return fmt.Errorf("could not copy value to %s: %v, %+v", to, value, err)
}
return nil
}

func (f *copyFields) String() string {
return "copy_fields=" + fmt.Sprintf("%+v", f.config.Fields)
}
145 changes: 145 additions & 0 deletions libbeat/processors/actions/copy_fields_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestCopyFields(t *testing.T) {

var tests = map[string]struct {
FromTo fromTo
Input common.MapStr
Expected common.MapStr
}{
"copy string from message to message_copied": {
FromTo: fromTo{
From: "message",
To: "message_copied",
},
Input: common.MapStr{
"message": "please copy this line",
},
Expected: common.MapStr{
"message": "please copy this line",
"message_copied": "please copy this line",
},
},
"copy string from nested key nested.message to top level field message_copied": {
FromTo: fromTo{
From: "nested.message",
To: "message_copied",
},
Input: common.MapStr{
"nested": common.MapStr{
"message": "please copy this line",
},
},
Expected: common.MapStr{
"nested": common.MapStr{
"message": "please copy this line",
},
"message_copied": "please copy this line",
},
},
"copy string from fieldname with dot to message_copied": {
FromTo: fromTo{
From: "dotted.message",
To: "message_copied",
},
Input: common.MapStr{
"dotted.message": "please copy this line",
},
Expected: common.MapStr{
"dotted.message": "please copy this line",
"message_copied": "please copy this line",
},
},
"copy number from fieldname with dot to dotted message.copied": {
FromTo: fromTo{
From: "message.original",
To: "message.copied",
},
Input: common.MapStr{
"message.original": 42,
},
Expected: common.MapStr{
"message.original": 42,
"message": common.MapStr{
"copied": 42,
},
},
},
"copy number from hierarchical message.original to top level message which fails": {
FromTo: fromTo{
From: "message.original",
To: "message",
},
Input: common.MapStr{
"message": common.MapStr{
"original": 42,
},
},
Expected: common.MapStr{
"message": common.MapStr{
"original": 42,
},
},
},
"copy number from hierarchical message.original to top level message": {
FromTo: fromTo{
From: "message.original",
To: "message",
},
Input: common.MapStr{
"message.original": 42,
},
Expected: common.MapStr{
"message.original": 42,
"message": 42,
},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
p := copyFields{
copyFieldsConfig{
Fields: []fromTo{
test.FromTo,
},
},
}

event := &beat.Event{
Fields: test.Input,
}

newEvent, err := p.Run(event)
assert.NoError(t, err)

assert.Equal(t, test.Expected, newEvent.Fields)
})
}
}
6 changes: 4 additions & 2 deletions libbeat/processors/actions/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ func (f *renameFields) Run(event *beat.Event) (*beat.Event, error) {
for _, field := range f.config.Fields {
err := f.renameField(field.From, field.To, event.Fields)
if err != nil && f.config.FailOnError {
logp.Debug("rename", "Failed to rename fields, revert to old event: %s", err)
errMsg := fmt.Errorf("Failed to rename fields in processor: %s", err)
logp.Debug("rename", errMsg.Error())
event.Fields = backup
event.PutValue("error.message", errMsg.Error())
return event, err
}
}
Expand Down Expand Up @@ -108,7 +110,7 @@ func (f *renameFields) renameField(from string, to string, fields common.MapStr)

_, err = fields.Put(to, value)
if err != nil {
return fmt.Errorf("could not put value: %s: %v, %+v", to, value, err)
return fmt.Errorf("could not put value: %s: %v, %v", to, value, err)
}
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions libbeat/processors/actions/rename_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func TestRenameRun(t *testing.T) {
Output: common.MapStr{
"a": 2,
"b": "q",
"error": common.MapStr{
"message": "Failed to rename fields in processor: target field b already exists, drop or rename this field first",
},
},
error: true,
FailOnError: true,
Expand Down Expand Up @@ -188,6 +191,9 @@ func TestRenameRun(t *testing.T) {
Output: common.MapStr{
"a": 9,
"c": 10,
"error": common.MapStr{
"message": "Failed to rename fields in processor: could not put value: a.c: 10, expected map but type is int",
},
},
error: true,
IgnoreMissing: false,
Expand Down
Loading

0 comments on commit 4b9f945

Please sign in to comment.