Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mongodb OP_MSG (2013) #8594

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packetbeat/protos/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func newTransaction(requ, resp *mongodbMessage) *transaction {
trans.params = requ.params
trans.resource = requ.resource
trans.bytesIn = requ.messageLength
trans.documents = requ.documents
}

// fill response
Expand Down
77 changes: 72 additions & 5 deletions packetbeat/protos/mongodb/mongodb_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func mongodbMessageParser(s *stream) (bool, bool) {
opCode := opCode(code)

if !validOpcode(opCode) {
logp.Err("Unknown operation code: %v", opCode)
logp.Err("Unknown operation code: %d (%v)", opCode, opCode)
return false, false
}

s.message.opCode = opCode
s.message.isResponse = false // default is that the message is a request. If not opReplyParse will set this to false
s.message.expectsResponse = false
debugf("opCode = %v", s.message.opCode)
debugf("opCode = %d (%v)", s.message.opCode, s.message.opCode)

// then split depending on operation type
s.message.event = common.MapStr{}
Expand All @@ -72,9 +72,9 @@ func mongodbMessageParser(s *stream) (bool, bool) {
case opReply:
s.message.isResponse = true
return opReplyParse(d, s.message)
case opMsg:
case opMsgLegacy:
s.message.method = "msg"
return opMsgParse(d, s.message)
return opMsgLegacyParse(d, s.message)
case opUpdate:
s.message.method = "update"
return opUpdateParse(d, s.message)
Expand All @@ -94,6 +94,9 @@ func mongodbMessageParser(s *stream) (bool, bool) {
case opKillCursor:
s.message.method = "killCursors"
return opKillCursorsParse(d, s.message)
case opMsg:
s.message.method = "msg"
return opMsgParse(d, s.message)
}

return false, false
Expand Down Expand Up @@ -137,7 +140,7 @@ func opReplyParse(d *decoder, m *mongodbMessage) (bool, bool) {
return true, true
}

func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) {
func opMsgLegacyParse(d *decoder, m *mongodbMessage) (bool, bool) {
var err error
m.event["message"], err = d.readCStr()
if err != nil {
Expand Down Expand Up @@ -292,6 +295,61 @@ func opKillCursorsParse(d *decoder, m *mongodbMessage) (bool, bool) {
return true, true
}

func opMsgParse(d *decoder, m *mongodbMessage) (bool, bool) {
// ignore flagbits
_, err := d.readInt32()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}

// read sections
kind, err := d.readByte()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}

switch msgKind(kind) {
case msgKindBody:
document, err := d.readDocument()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
m.documents = []interface{}{document}

case msgKindDocumentSequence:
start := d.i
size, err := d.readInt32()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
cstring, err := d.readCStr()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
return false, false
}
m.event["message"] = cstring
var documents []interface{}
for d.i < start+size {
document, err := d.readDocument()
if err != nil {
logp.Err("An error occurred while parsing OP_MSG message: %s", err)
}
documents = append(documents, document)
}
m.documents = documents

default:
logp.Err("Unknown message kind: %v", kind)
return false, false
}

return true, true
}

// NOTE: The following functions are inspired by the source of the go-mgo/mgo project
// https://github.com/go-mgo/mgo/blob/v2/bson/decode.go

Expand Down Expand Up @@ -324,6 +382,15 @@ func (d *decoder) readCStr() (string, error) {
return string(d.in[start:end]), nil
}

func (d *decoder) readByte() (byte, error) {
i := d.i
d.i++
if d.i > len(d.in) {
return 0, errors.New("Read byte failed")
}
return d.in[i], nil
}

func (d *decoder) readInt32() (int, error) {
b, err := d.readBytes(4)

Expand Down
11 changes: 10 additions & 1 deletion packetbeat/protos/mongodb/mongodb_structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,26 @@ type transaction struct {
documents []interface{}
}

type msgKind byte

const (
msgKindBody msgKind = 0
msgKindDocumentSequence msgKind = 1
)

type opCode int32

const (
opReply opCode = 1
opMsg opCode = 1000
opMsgLegacy opCode = 1000
opUpdate opCode = 2001
opInsert opCode = 2002
opReserved opCode = 2003
opQuery opCode = 2004
opGetMore opCode = 2005
opDelete opCode = 2006
opKillCursor opCode = 2007
opMsg opCode = 2013
)

// List of valid mongodb wire protocol operation codes
Expand All @@ -123,6 +131,7 @@ var opCodeNames = map[opCode]string{
2005: "OP_GET_MORE",
2006: "OP_DELETE",
2007: "OP_KILL_CURSORS",
2013: "OP_MSG",
}

func validOpcode(o opCode) bool {
Expand Down