Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mongodb OP_MSG (2013) #11500

Merged
merged 3 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- `http.response.body` moves to `http.response.body.content`
- Changed Packetbeat fields to align with ECS. {issue}7968[7968]
- Removed trailing dot from domain names reported by the DNS protocol. {pull}9941[9941]
- Add support for mongodb opcode 2013 (OP_MSG). {issue}6191[6191] {pull}8594[8594]

*Winlogbeat*

Expand Down
24 changes: 24 additions & 0 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,30 @@ def log_contains_count(self, msg, logfile=None, ignore_case=False):

return counter

def log_contains_countmap(self, pattern, capture_group, logfile=None):
"""
Returns a map of the number of appearances of each captured group in the log file
"""
counts = {}

if logfile is None:
logfile = self.beat_name + ".log"

try:
with open(os.path.join(self.working_dir, logfile), "r") as f:
for line in f:
res = pattern.search(line)
if res is not None:
capt = res.group(capture_group)
if capt in counts:
counts[capt] += 1
else:
counts[capt] = 1
except IOError:
pass

return counts

def output_lines(self, output_file=None):
""" Count number of lines in a file."""
if output_file is None:
Expand Down
1 change: 1 addition & 0 deletions packetbeat/protos/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func newTransaction(requ, resp *mongodbMessage) *transaction {
trans.params = requ.params
trans.resource = requ.resource
trans.bytesIn = requ.messageLength
trans.documents = requ.documents
}

// fill response
Expand Down
77 changes: 72 additions & 5 deletions packetbeat/protos/mongodb/mongodb_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func mongodbMessageParser(s *stream) (bool, bool) {
mutex.Lock()
defer mutex.Unlock()
if _, reported := unknownOpcodes[opCode]; !reported {
logp.Err("Unknown operation code: %v", opCode)
logp.Err("Unknown operation code: %d (%v)", opCode, opCode)
unknownOpcodes[opCode] = struct{}{}
}
return false, false
Expand All @@ -74,7 +74,7 @@ func mongodbMessageParser(s *stream) (bool, bool) {
s.message.opCode = opCode
s.message.isResponse = false // default is that the message is a request. If not opReplyParse will set this to false
s.message.expectsResponse = false
debugf("opCode = %v", s.message.opCode)
debugf("opCode = %d (%v)", s.message.opCode, s.message.opCode)

// then split depending on operation type
s.message.event = common.MapStr{}
Expand All @@ -83,9 +83,9 @@ func mongodbMessageParser(s *stream) (bool, bool) {
case opReply:
s.message.isResponse = true
return opReplyParse(d, s.message)
case opMsg:
case opMsgLegacy:
s.message.method = "msg"
return opMsgParse(d, s.message)
return opMsgLegacyParse(d, s.message)
case opUpdate:
s.message.method = "update"
return opUpdateParse(d, s.message)
Expand All @@ -105,6 +105,9 @@ func mongodbMessageParser(s *stream) (bool, bool) {
case opKillCursor:
s.message.method = "killCursors"
return opKillCursorsParse(d, s.message)
case opMsg:
s.message.method = "msg"
return opMsgParse(d, s.message)
}

return false, false
Expand Down Expand Up @@ -148,7 +151,7 @@ func opReplyParse(d *decoder, m *mongodbMessage) (bool, bool) {
return true, true
}

func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) {
func opMsgLegacyParse(d *decoder, m *mongodbMessage) (bool, bool) {
var err error
m.event["message"], err = d.readCStr()
if err != nil {
Expand Down Expand Up @@ -303,6 +306,61 @@ func opKillCursorsParse(d *decoder, m *mongodbMessage) (bool, bool) {
return true, true
}

func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) {
// ignore flagbits
_, err := d.readInt32()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}

// read sections
kind, err := d.readByte()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}

switch msgKind(kind) {
case msgKindBody:
document, err := d.readDocument()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
m.documents = []interface{}{document}

case msgKindDocumentSequence:
start := d.i
size, err := d.readInt32()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
cstring, err := d.readCStr()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
m.event["message"] = cstring
var documents []interface{}
for d.i < start+size {
document, err := d.readDocument()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
}
documents = append(documents, document)
}
m.documents = documents

default:
logp.Err("Unknown message kind: %v", kind)
return false, false
}

return true, true
}

// NOTE: The following functions are inspired by the source of the go-mgo/mgo project
// https://github.com/go-mgo/mgo/blob/v2/bson/decode.go

Expand Down Expand Up @@ -335,6 +393,15 @@ func (d *decoder) readCStr() (string, error) {
return string(d.in[start:end]), nil
}

func (d *decoder) readByte() (byte, error) {
i := d.i
d.i++
if d.i > len(d.in) {
return 0, errors.New("Read byte failed")
}
return d.in[i], nil
}

func (d *decoder) readInt32() (int, error) {
b, err := d.readBytes(4)

Expand Down
11 changes: 10 additions & 1 deletion packetbeat/protos/mongodb/mongodb_structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,26 @@ type transaction struct {
documents []interface{}
}

type msgKind byte

const (
msgKindBody msgKind = 0
msgKindDocumentSequence msgKind = 1
)

type opCode int32

const (
opReply opCode = 1
opMsg opCode = 1000
opMsgLegacy opCode = 1000
opUpdate opCode = 2001
opInsert opCode = 2002
opReserved opCode = 2003
opQuery opCode = 2004
opGetMore opCode = 2005
opDelete opCode = 2006
opKillCursor opCode = 2007
opMsg opCode = 2013
)

// List of valid mongodb wire protocol operation codes
Expand All @@ -123,6 +131,7 @@ var opCodeNames = map[opCode]string{
2005: "OP_GET_MORE",
2006: "OP_DELETE",
2007: "OP_KILL_CURSORS",
2013: "OP_MSG",
}

func validOpcode(o opCode) bool {
Expand Down
Binary file not shown.
32 changes: 28 additions & 4 deletions packetbeat/tests/system/test_0025_mongodb_basic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from packetbeat import BaseTest


Expand Down Expand Up @@ -220,14 +221,37 @@ def test_request_after_reply(self):
assert o["type"] == "mongodb"
assert o["event.duration"] >= 0

def test_unknown_opcode_flood(self):
def test_opmsg(self):
"""
Tests that a repeated unknown opcode is reported just once.
Tests parser works with opcode 2013 (OP_MSG).
"""
self.render_config_template(
mongodb_ports=[9991]
)
self.run_packetbeat(pcap="mongodb_op_msg_opcode.pcap",
debug_selectors=["mongodb"])
num_msgs = self.log_contains_count('Unknown operation code: ')
assert num_msgs == 1, "Unknown opcode reported more than once: {0}".format(num_msgs)

objs = self.read_output()
o = objs[0]
assert o["type"] == "mongodb"

count = self.log_contains_count('Unknown operation code: ')
assert count == 0

def test_unknown_opcode_flood(self):
"""
Tests that any repeated unknown opcodes are reported just once.
"""
self.render_config_template(
mongodb_ports=[27017]
)
self.run_packetbeat(pcap="mongodb_invalid_opcode_2269.pcap",
debug_selectors=["mongodb"])

unknown_counts = self.log_contains_countmap(
re.compile(r'Unknown operation code: (\d+)'), 1)

assert len(unknown_counts) > 0
for k, v in unknown_counts.items():
assert v == 1, "Unknown opcode reported more than once: opcode={0}, count={1}".format(
k, v)