Add raw log message to log handler #7207

Closed · wants to merge 8 commits
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -462,6 +462,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits]
- Add support human friendly size for the UDP input. {pull}6886[6886]
- Add Syslog input to ingest RFC3164 Events via TCP and UDP {pull}6842[6842]
- Remove the undefined `username` option from the Redis input and clarify the documentation. {pull}6662[6662]
- Add log.original to each log event. {pull}[]

*Heartbeat*

5 changes: 5 additions & 0 deletions filebeat/_meta/common.reference.p2.yml
@@ -80,6 +80,11 @@ filebeat.inputs:
# This is especially useful for multiline log messages which can get large.
#max_bytes: 10485760


# By default, Filebeat adds the original log line to each event in the log.original field.
# This can be useful for reprocessing the log files.
#original_message: true

### Recursive glob configuration

# Expand "**" patterns into regular glob patterns.
6 changes: 6 additions & 0 deletions filebeat/_meta/fields.common.yml
@@ -108,6 +108,12 @@
description: >
Logging level.

- name: log.original
type: keyword
description: >
The unprocessed original log message. This can be used for reprocessing logs.
index: false

- name: event.created
type: date
description: >
12 changes: 12 additions & 0 deletions filebeat/docs/fields.asciidoc
@@ -2682,6 +2682,18 @@ type: keyword
Logging level.


--

*`log.original`*::
+
--
type: keyword

The unprocessed original log message. This can be used for reprocessing logs.


Field is not indexed.

--

*`event.created`*::
5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
@@ -435,6 +435,11 @@ filebeat.inputs:
# This is especially useful for multiline log messages which can get large.
#max_bytes: 10485760


# By default, Filebeat adds the original log line to each event in the log.original field.
# This can be useful for reprocessing the log files.
#original_message: true

### Recursive glob configuration

# Expand "**" patterns into regular glob patterns.
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

32 changes: 17 additions & 15 deletions filebeat/input/log/config.go
@@ -43,16 +43,17 @@ var (
CleanInactive: 0,

// Input
Enabled: true,
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
CleanRemoved: true,
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
ScanSort: "",
ScanOrder: "asc",
RecursiveGlob: true,
Enabled: true,
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
CleanRemoved: true,
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
ScanSort: "",
ScanOrder: "asc",
RecursiveGlob: true,
OriginalMessage: true,

// Harvester
BufferSize: 16 * humanize.KiByte,
@@ -96,11 +97,12 @@ type config struct {
ScanOrder string `config:"scan.order"`
ScanSort string `config:"scan.sort"`

ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
JSON *readjson.Config `config:"json"`
ExcludeLines []match.Matcher `config:"exclude_lines"`
IncludeLines []match.Matcher `config:"include_lines"`
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *multiline.Config `config:"multiline"`
JSON *readjson.Config `config:"json"`
OriginalMessage bool `config:"original_message"`

// Hidden on purpose, used by the docker input:
DockerJSON *struct {
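
For readers skimming the diff: defaultConfig above is copied and the user's YAML is then unpacked over the copy, which is how original_message defaults to true unless explicitly disabled. A minimal stand-alone sketch of that defaults-then-overlay flow, using encoding/json as a stand-in for the go-ucfg unpacking Beats actually performs (the field set here is illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

// settings mirrors the shape of the input config: defaults first,
// user-provided values overlaid on top.
type settings struct {
	OriginalMessage bool `json:"original_message"`
	MaxBytes        int  `json:"max_bytes"`
}

var defaultSettings = settings{
	OriginalMessage: true, // opt-out flag: on unless the user disables it
	MaxBytes:        10485760,
}

func main() {
	// Start from a copy of the defaults, then overlay the user config.
	cfg := defaultSettings
	userConfig := []byte(`{"original_message": false}`) // JSON standing in for YAML
	if err := json.Unmarshal(userConfig, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.OriginalMessage, cfg.MaxBytes) // false 10485760
}
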
4 changes: 4 additions & 0 deletions filebeat/input/log/harvester.go
@@ -306,6 +306,10 @@ func (h *Harvester) Run() error {
}
fields.DeepUpdate(message.Fields)

if h.config.OriginalMessage {
fields.Put("log.original", string(message.Original))
}

// Check if json fields exist
var jsonFields common.MapStr
if f, ok := fields["json"]; ok {
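
fields.Put with a dotted key builds nested objects, so the hunk above ends up as {"log": {"original": ...}} in the event, gated on the new config flag. A rough sketch of that dotted-key behavior, with a plain map standing in for common.MapStr:

package main

import (
	"fmt"
	"strings"
)

// put mimics the dotted-key behavior of common.MapStr.Put:
// "log.original" becomes {"log": {"original": value}}.
func put(m map[string]interface{}, key string, value interface{}) {
	parts := strings.Split(key, ".")
	for _, p := range parts[:len(parts)-1] {
		child, ok := m[p].(map[string]interface{})
		if !ok {
			child = map[string]interface{}{}
			m[p] = child
		}
		m = child
	}
	m[parts[len(parts)-1]] = value
}

func main() {
	fields := map[string]interface{}{}
	originalMessage := true // mirrors the config flag gating the field
	if originalMessage {
		put(fields, "log.original", `{"hello":"world"}`)
	}
	fmt.Println(fields) // map[log:map[original:{"hello":"world"}]]
}
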
9 changes: 5 additions & 4 deletions filebeat/reader/message.go
@@ -26,10 +26,11 @@ import (
// Message represents a reader event with timestamp, content and actual number
// of bytes read from input before decoding.
type Message struct {
Ts time.Time // timestamp the content was read
Content []byte // actual content read
Bytes int // total number of bytes read to generate the message
Fields common.MapStr // optional fields that can be added by reader
Ts time.Time // timestamp the content was read
Content []byte // actual content read
Original []byte // original content read
Bytes int // total number of bytes read to generate the message
Fields common.MapStr // optional fields that can be added by reader
}

// IsEmpty returns true in case the message is empty
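
The intent behind the new field: downstream readers rewrite Content (decoding, newline stripping, multiline joining) while Original keeps a copy of the content from before the rewrite. A simplified sketch, with a hypothetical transform standing in for a real reader step:

package main

import (
	"bytes"
	"fmt"
	"time"
)

// message is a trimmed-down version of the reader.Message struct above.
type message struct {
	Ts       time.Time
	Content  []byte // rewritten as the event moves through readers
	Original []byte // copy of the content before the rewrite
	Bytes    int
}

// transform stands in for a processing step such as multiline joining:
// it saves the pre-rewrite bytes in Original before changing Content.
func transform(m message) message {
	m.Original = m.Content
	m.Content = bytes.ToUpper(m.Content) // hypothetical rewrite
	return m
}

func main() {
	m := message{Ts: time.Now(), Content: []byte("hello world"), Bytes: 12}
	m = transform(m)
	fmt.Printf("content=%q original=%q\n", m.Content, m.Original)
}
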
1 change: 1 addition & 0 deletions filebeat/reader/multiline/multiline.go
@@ -269,6 +269,7 @@ func (mlr *Reader) clear() {
func (mlr *Reader) finalize() reader.Message {
// Copy message from existing content
msg := mlr.message
msg.Original = mlr.message.Content
mlr.clear()
return msg
}
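
Note the ordering: finalize runs after the buffered lines have already been joined into Content, so for a multiline event log.original carries the assembled message rather than the individual raw lines. A tiny illustration (the "\n" separator is an assumption, not necessarily what the multiline reader uses):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Lines buffered by a multiline reader for one logical event.
	lines := []string{"[log] panic: oops", "  at foo()", "  at bar()"}

	// By finalize time the lines are already joined into Content...
	content := strings.Join(lines, "\n")

	// ...so copying Content into Original captures the joined form.
	original := content
	fmt.Printf("log.original = %q\n", original)
}
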
4 changes: 4 additions & 0 deletions filebeat/reader/readfile/strip_newline.go
@@ -42,6 +42,10 @@ func (p *StripNewline) Next() (reader.Message, error) {
L := message.Content
message.Content = L[:len(L)-lineEndingChars(L)]

// Also strip the line ending from the original message
M := message.Original
message.Original = M[:len(M)-lineEndingChars(M)]

return message, err
}

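
lineEndingChars is not part of this diff; presumably it returns the number of trailing line-ending bytes (0 for none, 1 for "\n", 2 for "\r\n"), which is what makes the slicing above safe on both Unix and Windows files. A plausible implementation under that assumption:

package main

import "fmt"

// lineEndingChars returns the number of trailing line-ending bytes:
// 0 for none, 1 for "\n", 2 for "\r\n".
func lineEndingChars(l []byte) int {
	if len(l) == 0 || l[len(l)-1] != '\n' {
		return 0
	}
	if len(l) >= 2 && l[len(l)-2] == '\r' {
		return 2
	}
	return 1
}

func main() {
	for _, s := range []string{"plain", "unix\n", "windows\r\n"} {
		l := []byte(s)
		fmt.Printf("%q -> %q\n", s, l[:len(l)-lineEndingChars(l)])
	}
}
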
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
@@ -31,6 +31,7 @@ filebeat.{{input_config | default("inputs")}}:
harvester_limit: {{harvester_limit | default(0) }}
symlinks: {{symlinks}}
pipeline: {{pipeline}}
original_message: {{original_message | default("true")}}
{%- if input_processors %}
processors:
{%- for processor in input_processors %}
55 changes: 55 additions & 0 deletions filebeat/tests/system/test_harvester.py
@@ -818,3 +818,58 @@ def test_decode_error(self):

output = self.read_output_json()
assert output[2]["message"] == "hello world2"

def test_original_message_enabled(self):
"""
Test original message enabled for the JSON use case
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
json=dict(
keys_under_root=True,
),
)
os.mkdir(self.working_dir + "/log/")
logfile = self.working_dir + "/log/test.log"
message = '{"hello":"world"}'
with open(logfile, 'a') as file:
file.write(message + "\n")

proc = self.start_beat()
self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=10)
proc.check_kill_and_wait()

output = self.read_output()
assert len(output) == 1

assert output[0]["log.message"] == message

def test_original_message_disabled(self):
"""
Test original message disabled for the JSON use case
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
json=dict(
keys_under_root=True,
),
original_message=False,
)
os.mkdir(self.working_dir + "/log/")
logfile = self.working_dir + "/log/test.log"
message = '{"hello":"world"}'
with open(logfile, 'a') as file:
file.write(message + "\n")

proc = self.start_beat()
self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=10)
proc.check_kill_and_wait()

output = self.read_output()
assert len(output) == 1

assert "log.original" not in output[0]
6 changes: 4 additions & 2 deletions filebeat/tests/system/test_json.py
@@ -42,7 +42,8 @@ def test_docker_logs_filtering(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
json=dict(message_key="log", keys_under_root=True),
exclude_lines=["windows"]
exclude_lines=["windows"],
original_message=False,
)

os.mkdir(self.working_dir + "/log/")
@@ -74,7 +75,8 @@ def test_docker_logs_multiline(self):
multiline=True,
pattern="^\[log\]",
match="after",
negate="true"
negate="true",
original_message=False,
)

os.mkdir(self.working_dir + "/log/")