From 874e01f87e2511c346936783e6f30cf78b828960 Mon Sep 17 00:00:00 2001 From: Christoph Wurm Date: Thu, 23 May 2019 10:25:48 -0700 Subject: [PATCH] [Auditbeat] Socket: Add network.transport and network.community_id (#12231) Adds `network.transport` (always `tcp` at the moment) and `network.community_id` to the `socket` dataset. --- CHANGELOG.next.asciidoc | 1 + .../module/system/socket/_meta/data.json | 2 + .../auditbeat/module/system/socket/socket.go | 52 ++++++++++++++++++- .../module/system/socket/socket_test.go | 17 ++++++ .../auditbeat/tests/system/test_metricsets.py | 2 +- 5 files changed, 72 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 92a2b5e107e0..8e1e8670c45d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -167,6 +167,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Package: Enable suse. {pull}11634[11634] - Add support to the system package dataset for the SUSE OS family. {pull}11634[11634] - Process: Add file hash of process executable. {pull}11722[11722] +- Socket: Add network.transport and network.community_id. {pull}12231[12231] *Filebeat* diff --git a/x-pack/auditbeat/module/system/socket/_meta/data.json b/x-pack/auditbeat/module/system/socket/_meta/data.json index 60c87c66d3f0..927e8f5b4ede 100644 --- a/x-pack/auditbeat/module/system/socket/_meta/data.json +++ b/x-pack/auditbeat/module/system/socket/_meta/data.json @@ -12,7 +12,9 @@ }, "message": "Inbound socket (10.0.2.2:55270 -\u003e 10.0.2.15:22) CLOSED by process sshd (PID: 22799) and user root (UID: 0)", "network": { + "community_id": "1:IXrg9Y06W7zrkqBlE30jpC/mzjo=", "direction": "inbound", + "transport": "tcp", "type": "ipv4" }, "process": { diff --git a/x-pack/auditbeat/module/system/socket/socket.go b/x-pack/auditbeat/module/system/socket/socket.go index ba8f658a5276..079df951abda 100644 --- a/x-pack/auditbeat/module/system/socket/socket.go +++ b/x-pack/auditbeat/module/system/socket/socket.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/common/flowhash" "github.com/elastic/beats/libbeat/logp" sock "github.com/elastic/beats/metricbeat/helper/socket" "github.com/elastic/beats/metricbeat/mb" @@ -39,6 +40,22 @@ const ( eventTypeEvent = "event" ) +type ipProtocol uint8 + +const ( + // TODO: Unify IP protocol constants in Beats + tcp ipProtocol = 6 +) + +func (proto ipProtocol) String() string { + switch proto { + case tcp: + return "tcp" + default: + return "" + } +} + type eventAction uint8 const ( @@ -85,6 +102,7 @@ type MetricSet struct { // Socket represents information about a socket. type Socket struct { Family linux.AddressFamily + Protocol ipProtocol LocalIP net.IP LocalPort int RemoteIP net.IP @@ -102,6 +120,7 @@ type Socket struct { func newSocket(diag *linux.InetDiagMsg) *Socket { return &Socket{ Family: linux.AddressFamily(diag.Family), + Protocol: tcp, LocalIP: diag.SrcIP(), LocalPort: diag.SrcPort(), RemoteIP: diag.DstIP(), @@ -126,14 +145,20 @@ func (s Socket) Hash() uint64 { func (s Socket) toMapStr() common.MapStr { mapstr := common.MapStr{ "network": common.MapStr{ - "type": s.Family.String(), "direction": s.Direction.String(), + "transport": s.Protocol.String(), + "type": s.Family.String(), }, "user": common.MapStr{ "id": s.UID, }, } + communityID := s.communityID() + if communityID != "" { + mapstr.Put("network.community_id", communityID) + } + if s.Username != "" { mapstr.Put("user.name", s.Username) } @@ -190,6 +215,31 @@ func (s Socket) entityID(hostID string) string { return h.Sum() } +// communityID calculates the community ID of this socket. +func (s Socket) communityID() string { + var flow flowhash.Flow + + switch s.Direction { + case sock.Inbound: + flow.SourceIP = s.RemoteIP + flow.SourcePort = uint16(s.RemotePort) + flow.DestinationIP = s.LocalIP + flow.DestinationPort = uint16(s.LocalPort) + case sock.Outbound: + flow.SourceIP = s.LocalIP + flow.SourcePort = uint16(s.LocalPort) + flow.DestinationIP = s.RemoteIP + flow.DestinationPort = uint16(s.RemotePort) + default: + // Listening socket, not a flow + return "" + } + + flow.Protocol = uint8(s.Protocol) + + return flowhash.CommunityID.Hash(flow) +} + // New constructs a new MetricSet. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { cfgwarn.Beta("The %v/%v dataset is beta", moduleName, metricsetName) diff --git a/x-pack/auditbeat/module/system/socket/socket_test.go b/x-pack/auditbeat/module/system/socket/socket_test.go index 5b28d85e61b3..790132cd4df3 100644 --- a/x-pack/auditbeat/module/system/socket/socket_test.go +++ b/x-pack/auditbeat/module/system/socket/socket_test.go @@ -49,9 +49,18 @@ func TestData(t *testing.T) { mbtest.WriteEventToDataJSON(t, fullEvent, "") } +func TestSocket(t *testing.T) { + s := testSocket() + + assert.Equal(t, uint64(0xee1186910755e9b1), s.Hash()) + assert.Equal(t, "fIj66YRoGyoe8dML", s.entityID("fa8a1edd06864f47ba4cad5d0f5ca134")) + assert.Equal(t, "1:IXrg9Y06W7zrkqBlE30jpC/mzjo=", s.communityID()) +} + func testSocket() *Socket { return &Socket{ Family: linux.AF_INET, + Protocol: tcp, LocalIP: net.IPv4(10, 0, 2, 15), LocalPort: 22, RemoteIP: net.IPv4(10, 0, 2, 2), @@ -107,7 +116,13 @@ func TestOutbound(t *testing.T) { checkFieldValue(t, event.RootFields, "process.name", "socket.test") checkFieldValue(t, event.RootFields, "user.id", os.Geteuid()) checkFieldValue(t, event.RootFields, "network.direction", sock.Outbound.String()) + checkFieldValue(t, event.RootFields, "network.transport", "tcp") checkFieldValue(t, event.RootFields, "destination.port", 80) + + communityID, err := event.RootFields.GetValue("network.community_id") + if assert.NoError(t, err) { + assert.NotEmpty(t, communityID) + } } func TestListening(t *testing.T) { @@ -153,6 +168,7 @@ func TestListening(t *testing.T) { checkFieldValue(t, event.RootFields, "process.name", "socket.test") checkFieldValue(t, event.RootFields, "user.id", os.Geteuid()) checkFieldValue(t, event.RootFields, "network.direction", sock.Listening.String()) + checkFieldValue(t, event.RootFields, "network.transport", "tcp") } func TestLocalhost(t *testing.T) { @@ -203,6 +219,7 @@ func TestLocalhost(t *testing.T) { checkFieldValue(t, event.RootFields, "process.name", "socket.test") checkFieldValue(t, event.RootFields, "user.id", os.Geteuid()) checkFieldValue(t, event.RootFields, "network.direction", sock.Listening.String()) + checkFieldValue(t, event.RootFields, "network.transport", "tcp") checkFieldValue(t, event.RootFields, "destination.ip", "127.0.0.1") } diff --git a/x-pack/auditbeat/tests/system/test_metricsets.py b/x-pack/auditbeat/tests/system/test_metricsets.py index f44d92a6656f..816cd6664da8 100644 --- a/x-pack/auditbeat/tests/system/test_metricsets.py +++ b/x-pack/auditbeat/tests/system/test_metricsets.py @@ -79,7 +79,7 @@ def test_metricset_socket(self): socket metricset collects information about open sockets on a system. """ - fields = ["socket.entity_id", "destination.port"] + fields = ["socket.entity_id", "destination.port", "network.direction", "network.transport"] # errors_allowed=True - The socket metricset fills the `error` field if the process enrichment fails # (e.g. process has exited). This should not fail the test.