Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument beat output client #17274

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ require (
github.com/garyburd/redigo v1.0.1-0.20160525165706-b8dc90050f24
github.com/go-ole/go-ole v1.2.5-0.20190920104607-14974a1cf647 // indirect
github.com/go-sourcemap/sourcemap v2.1.2+incompatible // indirect
github.com/go-sql-driver/mysql v1.4.1
github.com/go-sql-driver/mysql v1.5.0
github.com/gocarina/gocsv v0.0.0-20170324095351-ffef3ffc77be
github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e
github.com/godror/godror v0.10.4
Expand Down Expand Up @@ -142,6 +142,8 @@ require (
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
go.elastic.co/apm v1.7.2
go.elastic.co/apm/module/apmhttp v1.7.2
go.uber.org/atomic v1.3.1
go.uber.org/multierr v1.1.1-0.20170829224307-fb7d312c2c04
go.uber.org/zap v1.7.1
Expand Down
17 changes: 15 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5 h1:nkZ9axP+MvUFCu8JRN/MCY+DmTfs6lY7hE0QnJbxSdI=
github.com/antlr/antlr4 v0.0.0-20200225173536-225249fdaef5/go.mod h1:T7PbCXFs94rrTttyxjbyT5+/1V8T2TYDejxUfHJjw1Y=
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-lambda-go v1.6.0 h1:T+u/g79zPKw1oJM7xYhvpq7i4Sjc0iVsXZUaqRVVSOg=
Expand Down Expand Up @@ -171,6 +173,8 @@ github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea h1:n2Ltr3SrfQlf/9nOna1DoGKxLx3qTSI8Ttl6Xrqp6mw=
github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/cucumber/godog v0.8.1 h1:lVb+X41I4YDreE+ibZ50bdXmySxgRviYFgKY6Aw4XE8=
github.com/cucumber/godog v0.8.1/go.mod h1:vSh3r/lM+psC1BPXvdkSEuNjmXfpVqrMGYAElF6hxnA=
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -237,6 +241,7 @@ github.com/elastic/go-seccomp-bpf v1.1.0 h1:jUzzDc6LyCtdolZdvL/26dad6rZ9vsc7xZ2e
github.com/elastic/go-seccomp-bpf v1.1.0/go.mod h1:l+89Vy5BzjVcaX8USZRMOwmwwDScE+vxCFzzvQwN7T8=
github.com/elastic/go-structform v0.0.6 h1:wqeK4LwD2NNDOoRGTImE24S6pkCDVr8+oUSIkmChzLk=
github.com/elastic/go-structform v0.0.6/go.mod h1:QrMyP3oM9Sjk92EVGLgRaL2lKt0Qx7ZNDRWDxB6khVs=
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-sysinfo v1.3.0 h1:eb2XFGTMlSwG/yyU9Y8jVAYLIzU2sFzWXwo2gmetyrE=
github.com/elastic/go-sysinfo v1.3.0/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
github.com/elastic/go-txfile v0.0.7 h1:Yn28gclW7X0Qy09nSMSsx0uOAvAGMsp6XHydbiLVe2s=
Expand Down Expand Up @@ -290,8 +295,8 @@ github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dp
github.com/go-sourcemap/sourcemap v2.1.2+incompatible h1:0b/xya7BKGhXuqFESKM4oIiRo9WOt2ebz7KxfreD6ug=
github.com/go-sourcemap/sourcemap v2.1.2+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gocarina/gocsv v0.0.0-20170324095351-ffef3ffc77be h1:zXHeEEJ231bTf/IXqvCfeaqjLpXsq42ybLoT4ROSR6Y=
Expand Down Expand Up @@ -593,6 +598,8 @@ github.com/sanathkr/go-yaml v0.0.0-20170819195128-ed9d249f429b/go.mod h1:8458kAa
github.com/sanathkr/yaml v0.0.0-20170819201035-0056894fa522/go.mod h1:tQTYKOQgxoH3v6dEmdHiz4JG+nbxWwM5fgPQUpSZqVQ=
github.com/sanathkr/yaml v1.0.1-0.20170819201035-0056894fa522 h1:39BJIaZIhIBmXATIhdlTBlTQpAiGXHnz17CrO7vF2Ss=
github.com/sanathkr/yaml v1.0.1-0.20170819201035-0056894fa522/go.mod h1:tQTYKOQgxoH3v6dEmdHiz4JG+nbxWwM5fgPQUpSZqVQ=
github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis=
github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.19.11+incompatible h1:lJHR0foqAjI4exXqWsU3DbH7bX1xvdhGdnXTIARA9W4=
github.com/shirou/gopsutil v2.19.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down Expand Up @@ -666,6 +673,12 @@ github.com/xeipuuv/gojsonschema v0.0.0-20181112162635-ac52e6811b56 h1:yhqBHs09Sm
github.com/xeipuuv/gojsonschema v0.0.0-20181112162635-ac52e6811b56/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 h1:0gYLpmzecnaDCoeWxSfEJ7J1b6B/67+NV++4HKQXx+Y=
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7/go.mod h1:aEV29XrmTYFr3CiRxZeGHpkvbwq+prZduBqMaascyCU=
go.elastic.co/apm v1.7.2 h1:0nwzVIPp4PDBXSYYtN19+1W5V+sj+C25UjqxDVoKcA8=
go.elastic.co/apm v1.7.2/go.mod h1:tCw6CkOJgkWnzEthFN9HUP1uL3Gjc/Ur6m7gRPLaoH0=
go.elastic.co/apm/module/apmhttp v1.7.2 h1:2mRh7SwBuEVLmJlX+hsMdcSg9xaielCLElaPn/+i34w=
go.elastic.co/apm/module/apmhttp v1.7.2/go.mod h1:sTFWiWejnhSdZv6+dMgxGec2Nxe/ZKfHfz/xtRM+cRY=
go.elastic.co/fastjson v1.0.0 h1:ooXV/ABvf+tBul26jcVViPT3sBir0PvXgibYB1IQQzg=
go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHtchs=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
Expand Down
5 changes: 5 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ package beat
import (
"time"

"go.elastic.co/apm"

"github.com/elastic/beats/v7/libbeat/common"
)

type Pipeline interface {
PipelineConnector
SetACKHandler(PipelineACKHandler) error
SetTracer(*apm.Tracer) error
}

// PipelineConnector creates a publishing Client. This is typically backed by a Pipeline.
Expand Down Expand Up @@ -78,6 +81,8 @@ type ClientConfig struct {
// ACKLastEvent reports the last ACKed event out of a batch of ACKed events only.
// Only the events 'Private' field will be reported.
ACKLastEvent func(interface{})

Tracer *apm.Tracer
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need another local Tracer?

}

// CloseRef allows users to close the client asynchronously.
Expand Down
8 changes: 8 additions & 0 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ package eslegclient

import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"io/ioutil"
"net/http"
"strings"

"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)
Expand Down Expand Up @@ -59,6 +63,7 @@ type BulkResult json.RawMessage
// Bulk performs many index/delete operations in a single API call.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
func (conn *Connection) Bulk(
ctx context.Context,
index, docType string,
params map[string]string, body []interface{},
) (int, BulkResult, error) {
Expand All @@ -69,13 +74,16 @@ func (conn *Connection) Bulk(
enc := conn.Encoder
enc.Reset()
if err := bulkEncode(conn.log, enc, body); err != nil {
apm.CaptureError(ctx, err).Send()
return 0, nil, err
}

requ, err := newBulkRequest(conn.URL, index, docType, params, enc)
if err != nil {
apm.CaptureError(ctx, err).Send()
return 0, nil, err
}
requ.requ = apmhttp.RequestWithContext(ctx, requ.requ)

return conn.sendBulkRequest(requ)
}
Expand Down
7 changes: 4 additions & 3 deletions libbeat/esleg/eslegclient/bulkapi_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package eslegclient

import (
"context"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -60,7 +61,7 @@ func TestOneHostSuccessResp_Bulk(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
_, _, err := client.Bulk(index, "type1", params, body)
_, _, err := client.Bulk(context.Background(), index, "type1", params, body)
if err != nil {
t.Errorf("Bulk() returns error: %s", err)
}
Expand Down Expand Up @@ -96,7 +97,7 @@ func TestOneHost500Resp_Bulk(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
_, _, err := client.Bulk(index, "type1", params, body)
_, _, err := client.Bulk(context.Background(), index, "type1", params, body)
if err == nil {
t.Errorf("Bulk() should return error.")
}
Expand Down Expand Up @@ -136,7 +137,7 @@ func TestOneHost503Resp_Bulk(t *testing.T) {
params := map[string]string{
"refresh": "true",
}
_, _, err := client.Bulk(index, "type1", params, body)
_, _, err := client.Bulk(context.Background(), index, "type1", params, body)
if err == nil {
t.Errorf("Bulk() should return error.")
}
Expand Down
6 changes: 4 additions & 2 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"net/url"
"time"

"go.elastic.co/apm/module/apmhttp"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
Expand Down Expand Up @@ -119,12 +121,12 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
return &Connection{
ConnectionSettings: s,
HTTP: &http.Client{
Transport: &http.Transport{
Transport: apmhttp.WrapRoundTripper(&http.Transport{
Dial: dialer.Dial,
DialTLS: tlsDialer.Dial,
TLSClientConfig: s.TLS.ToConfig(),
Proxy: proxy,
},
}),
Timeout: s.Timeout,
},
Encoder: encoder,
Expand Down
9 changes: 5 additions & 4 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package elasticsearch

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand Down Expand Up @@ -103,7 +104,7 @@ func (c *publishClient) Close() error {
return c.es.Close()
}

func (c *publishClient) Publish(batch publisher.Batch) error {
func (c *publishClient) Publish(ctx context.Context, batch publisher.Batch) error {
events := batch.Events()
var failed []publisher.Event
var reason error
Expand Down Expand Up @@ -141,7 +142,7 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
case report.FormatXPackMonitoringBulk:
err = c.publishXPackBulk(params, event, typ)
case report.FormatBulk:
err = c.publishBulk(event, typ)
err = c.publishBulk(ctx, event, typ)
}

if err != nil {
Expand Down Expand Up @@ -186,7 +187,7 @@ func (c *publishClient) publishXPackBulk(params map[string]string, event publish
return err
}

func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, typ string) error {
meta := common.MapStr{
"_index": getMonitoringIndexName(),
"_routing": nil,
Expand Down Expand Up @@ -233,7 +234,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, result, err := c.es.Bulk(getMonitoringIndexName(), "", nil, bulk[:])
_, result, err := c.es.Bulk(ctx, getMonitoringIndexName(), "", nil, bulk[:])
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions libbeat/outputs/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package outputs

import (
"context"
"errors"
"time"

Expand Down Expand Up @@ -56,8 +57,8 @@ func (b *backoffClient) Close() error {
return err
}

func (b *backoffClient) Publish(batch publisher.Batch) error {
err := b.client.Publish(batch)
func (b *backoffClient) Publish(ctx context.Context, batch publisher.Batch) error {
err := b.client.Publish(ctx, batch)
if err != nil {
b.client.Close()
}
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package console

import (
"bufio"
"context"
"fmt"
"os"
"runtime"
Expand Down Expand Up @@ -102,7 +103,7 @@ func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*co
}

func (c *console) Close() error { return nil }
func (c *console) Publish(batch publisher.Batch) error {
func (c *console) Publish(_ context.Context, batch publisher.Batch) error {
st := c.observer
events := batch.Events()
st.NewBatch(len(events))
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/console/console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package console

import (
"bytes"
"context"
"io"
"os"
"testing"
Expand Down Expand Up @@ -130,7 +131,7 @@ func run(codec codec.Codec, batches ...publisher.Batch) (string, error) {
return withStdout(func() {
c, _ := newConsole("test", outputs.NewNilObserver(), codec)
for _, b := range batches {
c.Publish(b)
c.Publish(context.Background(), b)
}
})
}
Expand Down
25 changes: 17 additions & 8 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package elasticsearch

import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"time"

"go.elastic.co/apm"

"github.com/elastic/beats/v7/libbeat/testing"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -169,9 +172,9 @@ func (client *Client) Clone() *Client {
return c
}

func (client *Client) Publish(batch publisher.Batch) error {
func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error {
events := batch.Events()
rest, err := client.publishEvents(events)
rest, err := client.publishEvents(ctx, events)
if len(rest) == 0 {
batch.ACK()
} else {
Expand All @@ -183,9 +186,9 @@ func (client *Client) Publish(batch publisher.Batch) error {
// PublishEvents sends all events to elasticsearch. On error a slice with all
// events not published or confirmed to be processed by elasticsearch will be
// returned. The input slice backing memory will be reused by return the value.
func (client *Client) publishEvents(
data []publisher.Event,
) ([]publisher.Event, error) {
func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) ([]publisher.Event, error) {
span, ctx := apm.StartSpan(ctx, "publishEvents", "output")
defer span.End()
begin := time.Now()
st := client.observer

Expand All @@ -200,23 +203,29 @@ func (client *Client) publishEvents(
// encode events into bulk request buffer, dropping failed elements from
// events slice
origCount := len(data)
span.Context.SetLabel("events_original", origCount)
data, bulkItems := bulkEncodePublishRequest(client.log, client.conn.GetVersion(), client.index, client.pipeline, data)
newCount := len(data)
span.Context.SetLabel("events_encoded", newCount)
if st != nil && origCount > newCount {
st.Dropped(origCount - newCount)
}
if newCount == 0 {
return nil, nil
}

status, result, sendErr := client.conn.Bulk("", "", nil, bulkItems)
status, result, sendErr := client.conn.Bulk(ctx, "", "", nil, bulkItems)
if sendErr != nil {
client.log.Errorf("Failed to perform any bulk index operations: %s", sendErr)
err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr))
err.Send()
client.log.Error(err)
return data, sendErr
}
pubCount := len(data)
span.Context.SetLabel("events_published", pubCount)

client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.",
len(data),
pubCount,
time.Now().Sub(begin))

// check response for transient errors
Expand Down
Loading