diff --git a/go.mod b/go.mod index c399dd2a8411..a7755c488002 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( github.com/garyburd/redigo v1.0.1-0.20160525165706-b8dc90050f24 github.com/go-ole/go-ole v1.2.5-0.20190920104607-14974a1cf647 // indirect github.com/go-sourcemap/sourcemap v2.1.2+incompatible // indirect - github.com/go-sql-driver/mysql v1.4.1 + github.com/go-sql-driver/mysql v1.5.0 github.com/gocarina/gocsv v0.0.0-20170324095351-ffef3ffc77be github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e github.com/godror/godror v0.10.4 @@ -142,6 +142,8 @@ require ( github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect + go.elastic.co/apm v1.7.2 + go.elastic.co/apm/module/apmhttp v1.7.2 go.uber.org/atomic v1.3.1 go.uber.org/multierr v1.1.1-0.20170829224307-fb7d312c2c04 go.uber.org/zap v1.7.1 diff --git a/go.sum b/go.sum index 26dcb9910c13..fc90e7a9059f 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5 h1:nkZ9axP+MvUFCu8JRN/MCY+DmTfs6lY7hE0QnJbxSdI= github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5/go.mod h1:T7PbCXFs94rrTttyxjbyT5+/1V8T2TYDejxUfHJjw1Y= +github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI= +github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/aws/aws-lambda-go v1.6.0 h1:T+u/g79zPKw1oJM7xYhvpq7i4Sjc0iVsXZUaqRVVSOg= @@ -171,6 +173,8 @@ github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+ github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea h1:n2Ltr3SrfQlf/9nOna1DoGKxLx3qTSI8Ttl6Xrqp6mw= github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/cucumber/godog v0.8.1 h1:lVb+X41I4YDreE+ibZ50bdXmySxgRviYFgKY6Aw4XE8= +github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA= github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -237,6 +241,7 @@ github.com/elastic/go-seccomp-bpf v1.1.0 h1:jUzzDc6LyCtdolZdvL/26dad6rZ9vsc7xZ2e github.com/elastic/go-seccomp-bpf v1.1.0/go.mod h1:l+89Vy5BzjVcaX8USZRMOwmwwDScE+vxCFzzvQwN7T8= github.com/elastic/go-structform v0.0.6 h1:wqeK4LwD2NNDOoRGTImE24S6pkCDVr8+oUSIkmChzLk= github.com/elastic/go-structform v0.0.6/go.mod h1:QrMyP3oM9Sjk92EVGLgRaL2lKt0Qx7ZNDRWDxB6khVs= +github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.3.0 h1:eb2XFGTMlSwG/yyU9Y8jVAYLIzU2sFzWXwo2gmetyrE= github.com/elastic/go-sysinfo v1.3.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-txfile v0.0.7 h1:Yn28gclW7X0Qy09nSMSsx0uOAvAGMsp6XHydbiLVe2s= @@ -290,8 +295,8 @@ github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dp github.com/go-sourcemap/sourcemap v2.1.2+incompatible h1:0b/xya7BKGhXuqFESKM4oIiRo9WOt2ebz7KxfreD6ug= github.com/go-sourcemap/sourcemap v2.1.2+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= -github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gocarina/gocsv v0.0.0-20170324095351-ffef3ffc77be h1:zXHeEEJ231bTf/IXqvCfeaqjLpXsq42ybLoT4ROSR6Y= @@ -593,6 +598,8 @@ github.com/sanathkr/go-yaml v0.0.0-20170819195128-ed9d249f429b/go.mod h1:8458kAa github.com/sanathkr/yaml v0.0.0-20170819201035-0056894fa522/go.mod h1:tQTYKOQgxoH3v6dEmdHiz4JG+nbxWwM5fgPQUpSZqVQ= github.com/sanathkr/yaml v1.0.1-0.20170819201035-0056894fa522 h1:39BJIaZIhIBmXATIhdlTBlTQpAiGXHnz17CrO7vF2Ss= github.com/sanathkr/yaml v1.0.1-0.20170819201035-0056894fa522/go.mod h1:tQTYKOQgxoH3v6dEmdHiz4JG+nbxWwM5fgPQUpSZqVQ= +github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis= +github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.19.11+incompatible h1:lJHR0foqAjI4exXqWsU3DbH7bX1xvdhGdnXTIARA9W4= github.com/shirou/gopsutil v2.19.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= @@ -666,6 +673,12 @@ github.com/xeipuuv/gojsonschema v0.0.0-20181112162635-ac52e6811b56 h1:yhqBHs09Sm github.com/xeipuuv/gojsonschema v0.0.0-20181112162635-ac52e6811b56/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 h1:0gYLpmzecnaDCoeWxSfEJ7J1b6B/67+NV++4HKQXx+Y= github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7/go.mod h1:aEV29XrmTYFr3CiRxZeGHpkvbwq+prZduBqMaascyCU= +go.elastic.co/apm v1.7.2 h1:0nwzVIPp4PDBXSYYtN19+1W5V+sj+C25UjqxDVoKcA8= +go.elastic.co/apm v1.7.2/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0= +go.elastic.co/apm/module/apmhttp v1.7.2 h1:2mRh7SwBuEVLmJlX+hsMdcSg9xaielCLElaPn/+i34w= +go.elastic.co/apm/module/apmhttp v1.7.2/go.mod h1:sTFWiWejnhSdZv6+dMgxGec2Nxe/ZKfHfz/xtRM+cRY= +go.elastic.co/fastjson v1.0.0 h1:ooXV/ABvf+tBul26jcVViPT3sBir0PvXgibYB1IQQzg= +go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHtchs= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index c4f4665af1b4..8dab6501cb2a 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -20,12 +20,15 @@ package beat import ( "time" + "go.elastic.co/apm" + "github.com/elastic/beats/v7/libbeat/common" ) type Pipeline interface { PipelineConnector SetACKHandler(PipelineACKHandler) error + SetTracer(*apm.Tracer) error } // PipelineConnector creates a publishing Client. This is typically backed by a Pipeline. @@ -78,6 +81,8 @@ type ClientConfig struct { // ACKLastEvent reports the last ACKed event out of a batch of ACKed events only. // Only the events 'Private' field will be reported. ACKLastEvent func(interface{}) + + Tracer *apm.Tracer } // CloseRef allows users to close the client asynchronously. diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index 86b518eeea1c..ae7ea92f8baa 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -19,6 +19,7 @@ package eslegclient import ( "bytes" + "context" "encoding/json" "errors" "io" @@ -26,6 +27,9 @@ import ( "net/http" "strings" + "go.elastic.co/apm" + "go.elastic.co/apm/module/apmhttp" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -59,6 +63,7 @@ type BulkResult json.RawMessage // Bulk performs many index/delete operations in a single API call. // Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html func (conn *Connection) Bulk( + ctx context.Context, index, docType string, params map[string]string, body []interface{}, ) (int, BulkResult, error) { @@ -69,13 +74,16 @@ func (conn *Connection) Bulk( enc := conn.Encoder enc.Reset() if err := bulkEncode(conn.log, enc, body); err != nil { + apm.CaptureError(ctx, err).Send() return 0, nil, err } requ, err := newBulkRequest(conn.URL, index, docType, params, enc) if err != nil { + apm.CaptureError(ctx, err).Send() return 0, nil, err } + requ.requ = apmhttp.RequestWithContext(ctx, requ.requ) return conn.sendBulkRequest(requ) } diff --git a/libbeat/esleg/eslegclient/bulkapi_mock_test.go b/libbeat/esleg/eslegclient/bulkapi_mock_test.go index 1fbd53d94253..ded3e53c95c0 100644 --- a/libbeat/esleg/eslegclient/bulkapi_mock_test.go +++ b/libbeat/esleg/eslegclient/bulkapi_mock_test.go @@ -20,6 +20,7 @@ package eslegclient import ( + "context" "fmt" "net/http" "os" @@ -60,7 +61,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) { params := map[string]string{ "refresh": "true", } - _, _, err := client.Bulk(index, "type1", params, body) + _, _, err := client.Bulk(context.Background(), index, "type1", params, body) if err != nil { t.Errorf("Bulk() returns error: %s", err) } @@ -96,7 +97,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) { params := map[string]string{ "refresh": "true", } - _, _, err := client.Bulk(index, "type1", params, body) + _, _, err := client.Bulk(context.Background(), index, "type1", params, body) if err == nil { t.Errorf("Bulk() should return error.") } @@ -136,7 +137,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) { params := map[string]string{ "refresh": "true", } - _, _, err := client.Bulk(index, "type1", params, body) + _, _, err := client.Bulk(context.Background(), index, "type1", params, body) if err == nil { t.Errorf("Bulk() should return error.") } diff --git a/libbeat/esleg/eslegclient/connection.go b/libbeat/esleg/eslegclient/connection.go index b591307c444b..ed8bc109b17f 100644 --- a/libbeat/esleg/eslegclient/connection.go +++ b/libbeat/esleg/eslegclient/connection.go @@ -26,6 +26,8 @@ import ( "net/url" "time" + "go.elastic.co/apm/module/apmhttp" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transport" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" @@ -119,12 +121,12 @@ func NewConnection(s ConnectionSettings) (*Connection, error) { return &Connection{ ConnectionSettings: s, HTTP: &http.Client{ - Transport: &http.Transport{ + Transport: apmhttp.WrapRoundTripper(&http.Transport{ Dial: dialer.Dial, DialTLS: tlsDialer.Dial, TLSClientConfig: s.TLS.ToConfig(), Proxy: proxy, - }, + }), Timeout: s.Timeout, }, Encoder: encoder, diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 9e8469ab547a..468324eec481 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -18,6 +18,7 @@ package elasticsearch import ( + "context" "encoding/json" "fmt" "net/http" @@ -103,7 +104,7 @@ func (c *publishClient) Close() error { return c.es.Close() } -func (c *publishClient) Publish(batch publisher.Batch) error { +func (c *publishClient) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() var failed []publisher.Event var reason error @@ -141,7 +142,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error { case report.FormatXPackMonitoringBulk: err = c.publishXPackBulk(params, event, typ) case report.FormatBulk: - err = c.publishBulk(event, typ) + err = c.publishBulk(ctx, event, typ) } if err != nil { @@ -186,7 +187,7 @@ func (c *publishClient) publishXPackBulk(params map[string]string, event publish return err } -func (c *publishClient) publishBulk(event publisher.Event, typ string) error { +func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, typ string) error { meta := common.MapStr{ "_index": getMonitoringIndexName(), "_routing": nil, @@ -233,7 +234,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error { // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param. - _, result, err := c.es.Bulk(getMonitoringIndexName(), "", nil, bulk[:]) + _, result, err := c.es.Bulk(ctx, getMonitoringIndexName(), "", nil, bulk[:]) if err != nil { return err } diff --git a/libbeat/outputs/backoff.go b/libbeat/outputs/backoff.go index 256b8029b099..5c1ece2e5db4 100644 --- a/libbeat/outputs/backoff.go +++ b/libbeat/outputs/backoff.go @@ -18,6 +18,7 @@ package outputs import ( + "context" "errors" "time" @@ -56,8 +57,8 @@ func (b *backoffClient) Close() error { return err } -func (b *backoffClient) Publish(batch publisher.Batch) error { - err := b.client.Publish(batch) +func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) error { + err := b.client.Publish(ctx, batch) if err != nil { b.client.Close() } diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index 79aee6957d62..bbce8f449a9a 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -19,6 +19,7 @@ package console import ( "bufio" + "context" "fmt" "os" "runtime" @@ -102,7 +103,7 @@ func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*co } func (c *console) Close() error { return nil } -func (c *console) Publish(batch publisher.Batch) error { +func (c *console) Publish(_ context.Context, batch publisher.Batch) error { st := c.observer events := batch.Events() st.NewBatch(len(events)) diff --git a/libbeat/outputs/console/console_test.go b/libbeat/outputs/console/console_test.go index 29201beee54c..a8e85601a89a 100644 --- a/libbeat/outputs/console/console_test.go +++ b/libbeat/outputs/console/console_test.go @@ -21,6 +21,7 @@ package console import ( "bytes" + "context" "io" "os" "testing" @@ -130,7 +131,7 @@ func run(codec codec.Codec, batches ...publisher.Batch) (string, error) { return withStdout(func() { c, _ := newConsole("test", outputs.NewNilObserver(), codec) for _, b := range batches { - c.Publish(b) + c.Publish(context.Background(), b) } }) } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 2969c0f057bc..8de8e04d261a 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -18,12 +18,15 @@ package elasticsearch import ( + "context" "encoding/base64" "errors" "fmt" "net/http" "time" + "go.elastic.co/apm" + "github.com/elastic/beats/v7/libbeat/testing" "github.com/elastic/beats/v7/libbeat/beat" @@ -169,9 +172,9 @@ func (client *Client) Clone() *Client { return c } -func (client *Client) Publish(batch publisher.Batch) error { +func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() - rest, err := client.publishEvents(events) + rest, err := client.publishEvents(ctx, events) if len(rest) == 0 { batch.ACK() } else { @@ -183,9 +186,9 @@ func (client *Client) Publish(batch publisher.Batch) error { // PublishEvents sends all events to elasticsearch. On error a slice with all // events not published or confirmed to be processed by elasticsearch will be // returned. The input slice backing memory will be reused by return the value. -func (client *Client) publishEvents( - data []publisher.Event, -) ([]publisher.Event, error) { +func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) ([]publisher.Event, error) { + span, ctx := apm.StartSpan(ctx, "publishEvents", "output") + defer span.End() begin := time.Now() st := client.observer @@ -200,8 +203,10 @@ func (client *Client) publishEvents( // encode events into bulk request buffer, dropping failed elements from // events slice origCount := len(data) + span.Context.SetLabel("events_original", origCount) data, bulkItems := bulkEncodePublishRequest(client.log, client.conn.GetVersion(), client.index, client.pipeline, data) newCount := len(data) + span.Context.SetLabel("events_encoded", newCount) if st != nil && origCount > newCount { st.Dropped(origCount - newCount) } @@ -209,14 +214,18 @@ func (client *Client) publishEvents( return nil, nil } - status, result, sendErr := client.conn.Bulk("", "", nil, bulkItems) + status, result, sendErr := client.conn.Bulk(ctx, "", "", nil, bulkItems) if sendErr != nil { - client.log.Errorf("Failed to perform any bulk index operations: %s", sendErr) + err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr)) + err.Send() + client.log.Error(err) return data, sendErr } + pubCount := len(data) + span.Context.SetLabel("events_published", pubCount) client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.", - len(data), + pubCount, time.Now().Sub(begin)) // check response for transient errors diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 1e01b757da05..4ff5d3664c6d 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -58,7 +58,7 @@ func TestClientPublishEvent(t *testing.T) { }, }) - err := output.Publish(batch) + err := output.Publish(context.Background(), batch) if err != nil { t.Fatal(err) } @@ -96,7 +96,7 @@ func TestClientPublishEventWithPipeline(t *testing.T) { } publish := func(event beat.Event) { - err := output.Publish(outest.NewBatch(event)) + err := output.Publish(context.Background(), outest.NewBatch(event)) if err != nil { t.Fatal(err) } @@ -177,7 +177,7 @@ func TestClientBulkPublishEventsWithPipeline(t *testing.T) { } publish := func(events ...beat.Event) { - err := output.Publish(outest.NewBatch(events...)) + err := output.Publish(context.Background(), outest.NewBatch(events...)) if err != nil { t.Fatal(err) } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index d69849dabab5..5219d052987a 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -20,6 +20,7 @@ package elasticsearch import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -242,7 +243,7 @@ func TestClientWithHeaders(t *testing.T) { }} batch := outest.NewBatch(event, event, event) - err = client.Publish(batch) + err = client.Publish(context.Background(), batch) assert.NoError(t, err) assert.Equal(t, 2, requestCount) } diff --git a/libbeat/outputs/failover.go b/libbeat/outputs/failover.go index b388a58a61f0..f64720a7895c 100644 --- a/libbeat/outputs/failover.go +++ b/libbeat/outputs/failover.go @@ -18,6 +18,7 @@ package outputs import ( + "context" "errors" "fmt" "math/rand" @@ -91,12 +92,12 @@ func (f *failoverClient) Close() error { return f.clients[f.active].Close() } -func (f *failoverClient) Publish(batch publisher.Batch) error { +func (f *failoverClient) Publish(ctx context.Context, batch publisher.Batch) error { if f.active < 0 { batch.Retry() return errNoActiveConnection } - return f.clients[f.active].Publish(batch) + return f.clients[f.active].Publish(ctx, batch) } func (f *failoverClient) Test(d testing.Driver) { diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index c3f5d3c5e4e9..2c2f52162942 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -18,6 +18,7 @@ package fileout import ( + "context" "os" "path/filepath" @@ -109,9 +110,7 @@ func (out *fileOutput) Close() error { return out.rotator.Close() } -func (out *fileOutput) Publish( - batch publisher.Batch, -) error { +func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error { defer batch.ACK() st := out.observer diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 99e41bd1a8b3..c785f9e729f2 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -18,6 +18,7 @@ package kafka import ( + "context" "errors" "fmt" "strings" @@ -126,7 +127,7 @@ func (c *client) Close() error { return nil } -func (c *client) Publish(batch publisher.Batch) error { +func (c *client) Publish(_ context.Context, batch publisher.Batch) error { events := batch.Events() c.observer.NewBatch(len(events)) diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 58d03d1c1e70..af46aa65c731 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -20,6 +20,7 @@ package kafka import ( + "context" "encoding/json" "fmt" "math/rand" @@ -220,7 +221,7 @@ func TestKafkaPublish(t *testing.T) { } wg.Add(1) - output.Publish(batch) + output.Publish(context.Background(), batch) } // wait for all published batches to be ACKed diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index bcbbbdbc4282..f196d137b88e 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -18,6 +18,7 @@ package logstash import ( + "context" "errors" "net" "sync" @@ -134,7 +135,7 @@ func (c *asyncClient) Close() error { return c.Client.Close() } -func (c *asyncClient) Publish(batch publisher.Batch) error { +func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { st := c.observer events := batch.Events() st.NewBatch(len(events)) diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go index 5e6e416a0b40..04d97d8c40b1 100644 --- a/libbeat/outputs/logstash/async_test.go +++ b/libbeat/outputs/logstash/async_test.go @@ -20,6 +20,7 @@ package logstash import ( + "context" "sync" "testing" "time" @@ -85,7 +86,7 @@ func newAsyncTestDriver(client outputs.NetworkClient) *testAsyncDriver { case driverCmdClose: driver.client.Close() case driverCmdPublish: - err := driver.client.Publish(cmd.batch) + err := driver.client.Publish(context.Background(), cmd.batch) driver.returns = append(driver.returns, testClientReturn{cmd.batch, err}) } } diff --git a/libbeat/outputs/logstash/logstash_test.go b/libbeat/outputs/logstash/logstash_test.go index 06d15567ec5f..7b8adeb8f439 100644 --- a/libbeat/outputs/logstash/logstash_test.go +++ b/libbeat/outputs/logstash/logstash_test.go @@ -18,6 +18,7 @@ package logstash import ( + "context" "fmt" "os" "testing" @@ -126,7 +127,7 @@ func testConnectionType( batch.OnSignal = func(_ outest.BatchSignal) { close(sig) } - err = output.Publish(batch) + err = output.Publish(context.Background(), batch) t.Log("wait signal") <-sig diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index d13740d37f86..22e133db906c 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -18,6 +18,7 @@ package logstash import ( + "context" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -101,7 +102,7 @@ func (c *syncClient) reconnect() error { return c.Client.Connect() } -func (c *syncClient) Publish(batch publisher.Batch) error { +func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { events := batch.Events() st := c.observer diff --git a/libbeat/outputs/logstash/sync_test.go b/libbeat/outputs/logstash/sync_test.go index af90cfa130dd..3ba9e6822327 100644 --- a/libbeat/outputs/logstash/sync_test.go +++ b/libbeat/outputs/logstash/sync_test.go @@ -20,6 +20,7 @@ package logstash import ( + "context" "sync" "testing" "time" @@ -99,7 +100,7 @@ func newClientTestDriver(client outputs.NetworkClient) *testSyncDriver { case driverCmdClose: driver.client.Close() case driverCmdPublish: - err := driver.client.Publish(cmd.batch) + err := driver.client.Publish(context.Background(), cmd.batch) driver.returns = append(driver.returns, testClientReturn{cmd.batch, err}) } } diff --git a/libbeat/outputs/outputs.go b/libbeat/outputs/outputs.go index c6808321ce76..9e431c152f75 100644 --- a/libbeat/outputs/outputs.go +++ b/libbeat/outputs/outputs.go @@ -21,6 +21,8 @@ package outputs import ( + "context" + "github.com/elastic/beats/v7/libbeat/publisher" ) @@ -34,7 +36,7 @@ type Client interface { // Using Retry/Cancelled a client can return a batch of unprocessed events to // the publisher pipeline. The publisher pipeline (if configured by the output // factory) will take care of retrying/dropping events. - Publish(publisher.Batch) error + Publish(context.Context, publisher.Batch) error // String identifies the client type and endpoint. String() string diff --git a/libbeat/outputs/redis/backoff.go b/libbeat/outputs/redis/backoff.go index 30107df90fa5..41f448ca318e 100644 --- a/libbeat/outputs/redis/backoff.go +++ b/libbeat/outputs/redis/backoff.go @@ -18,6 +18,7 @@ package redis import ( + "context" "time" "github.com/garyburd/redigo/redis" @@ -78,8 +79,8 @@ func (b *backoffClient) Close() error { return err } -func (b *backoffClient) Publish(batch publisher.Batch) error { - err := b.client.Publish(batch) +func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) error { + err := b.client.Publish(ctx, batch) if err != nil { b.client.Close() b.updateFailReason(err) diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index fbaa40f4d3e3..70e316cba3f2 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -18,6 +18,7 @@ package redis import ( + "context" "errors" "regexp" "strconv" @@ -134,7 +135,7 @@ func (c *client) Close() error { return c.Client.Close() } -func (c *client) Publish(batch publisher.Batch) error { +func (c *client) Publish(_ context.Context, batch publisher.Batch) error { if c == nil { panic("no client") } diff --git a/libbeat/outputs/redis/redis_integration_test.go b/libbeat/outputs/redis/redis_integration_test.go index 66c3375246a1..25189fa90080 100644 --- a/libbeat/outputs/redis/redis_integration_test.go +++ b/libbeat/outputs/redis/redis_integration_test.go @@ -20,6 +20,7 @@ package redis import ( + "context" "encoding/json" "fmt" "os" @@ -348,7 +349,7 @@ func sendTestEvents(out outputs.Client, batches, N int) error { } batch := outest.NewBatch(events...) - err := out.Publish(batch) + err := out.Publish(context.Background(), batch) if err != nil { return err } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 05bd65338a98..46e6425c7dd2 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -18,6 +18,8 @@ package pipeline import ( + "go.elastic.co/apm" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" @@ -39,6 +41,8 @@ type outputController struct { retryer *retryer consumer *eventConsumer out *outputGroup + + getTracer func() *apm.Tracer } // outputGroup configures a group of load balanced outputs with shared work queue. @@ -63,12 +67,14 @@ func newOutputController( monitors Monitors, observer outputObserver, b queue.Queue, + traceGetter func() *apm.Tracer, ) *outputController { c := &outputController{ - beat: beat, - monitors: monitors, - observer: observer, - queue: b, + beat: beat, + monitors: monitors, + observer: observer, + queue: b, + getTracer: traceGetter, } ctx := &batchContext{} @@ -103,7 +109,7 @@ func (c *outputController) Set(outGrp outputs.Group) { queue := makeWorkQueue() worker := make([]outputWorker, len(clients)) for i, client := range clients { - worker[i] = makeClientWorker(c.observer, queue, client) + worker[i] = makeClientWorker(c.observer, queue, client, c.getTracer) } grp := &outputGroup{ workQueue: queue, diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go index f32785a8d22b..cf1b276db91d 100644 --- a/libbeat/publisher/pipeline/nilpipeline.go +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -17,7 +17,9 @@ package pipeline -import "github.com/elastic/beats/v7/libbeat/beat" +import ( + "github.com/elastic/beats/v7/libbeat/beat" +) type nilPipeline struct{} diff --git a/libbeat/publisher/pipeline/output.go b/libbeat/publisher/pipeline/output.go index 07b27f4d3f36..2563bd6e258f 100644 --- a/libbeat/publisher/pipeline/output.go +++ b/libbeat/publisher/pipeline/output.go @@ -18,6 +18,11 @@ package pipeline import ( + "context" + "fmt" + + "go.elastic.co/apm" + "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" @@ -41,15 +46,18 @@ type netClientWorker struct { batchSize int batchSizer func() int logger *logp.Logger + + getTracer func() *apm.Tracer } -func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker { +func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client, traceGetter func() *apm.Tracer) outputWorker { if nc, ok := client.(outputs.NetworkClient); ok { c := &netClientWorker{ - observer: observer, - qu: qu, - client: nc, - logger: logp.NewLogger("publisher_pipeline_output"), + observer: observer, + qu: qu, + client: nc, + logger: logp.NewLogger("publisher_pipeline_output"), + getTracer: traceGetter, } go c.run() return c @@ -69,7 +77,7 @@ func (w *clientWorker) run() { for batch := range w.qu { w.observer.outBatchSend(len(batch.events)) - if err := w.client.Publish(batch); err != nil { + if err := w.client.Publish(context.TODO(), batch); err != nil { break } } @@ -114,6 +122,7 @@ func (w *netClientWorker) run() { } // send loop + tracer := w.getTracer() for batch := range w.qu { if w.closed.Load() { if batch != nil { @@ -122,10 +131,21 @@ func (w *netClientWorker) run() { return } - err := w.client.Publish(batch) - if err != nil { - w.logger.Errorf("Failed to publish events: %v", err) - // on error return to connect loop + if err := func() error { + tx := tracer.StartTransaction("publish", "output") + defer tx.End() + tx.Context.SetLabel("worker", "netclient") + ctx := apm.ContextWithTransaction(context.Background(), tx) + err := w.client.Publish(ctx, batch) + if err != nil { + err = fmt.Errorf("failed to publish events: %w", err) + apm.CaptureError(ctx, err).Send() + w.logger.Error(err) + // on error return to connect loop + return err + } + return nil + }(); err != nil { break } } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 7f462b91152f..d6ad875ce619 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -26,6 +26,8 @@ import ( "sync" "time" + "go.elastic.co/apm" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/atomic" @@ -82,6 +84,7 @@ type Pipeline struct { sigNewClient chan *client processors processing.Supporter + tracer *apm.Tracer } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -154,6 +157,11 @@ func New( monitors.Logger = logp.NewLogger("publish") } + tracer, err := apm.NewTracer(beat.Beat, beat.Version) + if err != nil { + panic(err) + } + p := &Pipeline{ beatInfo: beat, monitors: monitors, @@ -161,6 +169,7 @@ func New( waitCloseMode: settings.WaitCloseMode, waitCloseTimeout: settings.WaitClose, processors: settings.Processors, + tracer: tracer, } p.ackBuilder = &pipelineEmptyACK{p} p.ackActive = atomic.MakeBool(true) @@ -195,12 +204,21 @@ func New( } p.eventSema = newSema(maxEvents) - p.output = newOutputController(beat, monitors, p.observer, p.queue) + p.output = newOutputController(beat, monitors, p.observer, p.queue, p.getTracer) p.output.Set(out) return p, nil } +func (p *Pipeline) getTracer() *apm.Tracer { + return p.tracer +} + +func (p *Pipeline) SetTracer(tracer *apm.Tracer) error { + p.tracer = tracer + return nil +} + // SetACKHandler sets a global ACK handler on all events published to the pipeline. // SetACKHandler must be called before any connection is made. func (p *Pipeline) SetACKHandler(handler beat.PipelineACKHandler) error { diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index 692d62f98ab3..00afb1ac74e9 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -18,6 +18,7 @@ package stress import ( + "context" "math/rand" "time" @@ -70,7 +71,7 @@ func makeTestOutput(_ outputs.IndexManager, beat beat.Info, observer outputs.Obs func (*testOutput) Close() error { return nil } -func (t *testOutput) Publish(batch publisher.Batch) error { +func (t *testOutput) Publish(_ context.Context, batch publisher.Batch) error { config := &t.config n := len(batch.Events()) diff --git a/libbeat/publisher/testing/testing.go b/libbeat/publisher/testing/testing.go index 401aa833c9d5..fafa11372dc2 100644 --- a/libbeat/publisher/testing/testing.go +++ b/libbeat/publisher/testing/testing.go @@ -19,6 +19,10 @@ package testing // ChanClient implements Client interface, forwarding published events to some import ( + "errors" + + "go.elastic.co/apm" + "github.com/elastic/beats/v7/libbeat/beat" ) @@ -48,6 +52,10 @@ func (pub *TestPublisher) SetACKHandler(_ beat.PipelineACKHandler) error { panic("Not supported") } +func (pub *TestPublisher) SetTracer(_ *apm.Tracer) error { + return errors.New("not supported") +} + func NewChanClient(bufSize int) *ChanClient { return NewChanClientWith(make(chan beat.Event, bufSize)) } diff --git a/x-pack/functionbeat/function/core/sync_client_test.go b/x-pack/functionbeat/function/core/sync_client_test.go index 623729525d06..8f9abdc0e680 100644 --- a/x-pack/functionbeat/function/core/sync_client_test.go +++ b/x-pack/functionbeat/function/core/sync_client_test.go @@ -7,6 +7,8 @@ package core import ( "testing" + "go.elastic.co/apm" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/beat" @@ -53,6 +55,10 @@ func (d *dummyPipeline) SetACKHandler(ackhandler beat.PipelineACKHandler) error return nil } +func (d *dummyPipeline) SetTracer(_ *apm.Tracer) error { + return nil +} + func TestSyncClient(t *testing.T) { receiver := func(c *dummyClient, sc *SyncClient) { select {