Skip to content

Commit

Permalink
ddtrace/tracer: refactor the implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
knusbaum committed Dec 19, 2019
1 parent 7528a1a commit 02b4a67
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 46 deletions.
8 changes: 4 additions & 4 deletions ddtrace/tracer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func TestReportHealthMetrics(t *testing.T) {
exitChan: make(chan struct{}),
payloadChan: make(chan []*span, payloadQueueSize),
stopped: make(chan struct{}),
climit: make(chan struct{}, concurrentConnectionLimit),
prioritySampling: newPrioritySampler(),
}
internal.SetGlobalTracer(trc)
Expand Down Expand Up @@ -316,14 +317,13 @@ func TestTracerMetrics(t *testing.T) {
tracer, _, stop := startTestTracer(withStatsdClient(&tg))

tracer.StartSpan("operation").Finish()
flush := make(chan struct{})
tracer.flushChan <- flush
<-flush
tracer.flushChan <- nil
tg.Wait(5, 100*time.Millisecond)

calls := tg.CallsByName()
counts := tg.Counts()
assert.Equal(1, calls["datadog.tracer.started"])
assert.Equal(1, calls["datadog.tracer.flush_triggered"])
assert.True(calls["datadog.tracer.flush_triggered"] >= 1)
assert.Equal(1, calls["datadog.tracer.flush_duration"])
assert.Equal(1, calls["datadog.tracer.flush_bytes"])
assert.Equal(1, calls["datadog.tracer.flush_traces"])
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestSpanErrorNil(t *testing.T) {

// Prior to a bug fix, this failed when running `go test -race`
func TestSpanModifyWhileFlushing(t *testing.T) {
tracer, _, stop := startTestTracer()
tracer, transport, stop := startTestTracer()
defer stop()

done := make(chan struct{})
Expand All @@ -366,7 +366,7 @@ func TestSpanModifyWhileFlushing(t *testing.T) {
case <-done:
return
default:
tracer.forceFlush()
tracer.forceFlush(transport)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions ddtrace/tracer/spancontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestSpanTracePushOne(t *testing.T) {
assert.Equal(root, trace.spans[0], "the span is the one pushed before")

root.Finish()
tracer.forceFlush()
tracer.forceFlush(transport)

traces := transport.Traces()
assert.Len(traces, 1)
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestSpanTracePushSeveral(t *testing.T) {
for _, span := range trace {
span.Finish()
}
tracer.forceFlush()
tracer.forceFlush(transport)

traces := transport.Traces()
assert.Len(traces, 1)
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestSpanFinishPriority(t *testing.T) {
child.Finish()
root.Finish()

tracer.forceFlush()
tracer.forceFlush(transport)

traces := transport.Traces()
assert.Len(traces, 1)
Expand Down
46 changes: 30 additions & 16 deletions ddtrace/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type tracer struct {
// stopped is a channel that will be closed when the worker has exited.
stopped chan struct{}

// climit limits the number of concurrent outgoing connections
climit chan struct{}

// stopOnce ensures the tracer is stopped exactly once.
stopOnce sync.Once

Expand Down Expand Up @@ -84,6 +87,10 @@ const (
// payloadSizeLimit specifies the maximum allowed size of the payload before
// it will trigger a flush to the transport.
payloadSizeLimit = payloadMaxLimit / 2

// concurrentConnectionLimit specifies the maximum number of concurrent outgoing
// connections allowed.
concurrentConnectionLimit = 100
)

// Start starts the tracer with the given set of options. It will stop and replace
Expand Down Expand Up @@ -165,6 +172,7 @@ func newTracer(opts ...StartOption) *tracer {
exitChan: make(chan struct{}),
payloadChan: make(chan []*span, payloadQueueSize),
stopped: make(chan struct{}),
climit: make(chan struct{}, concurrentConnectionLimit),
prioritySampling: newPrioritySampler(),
pid: strconv.Itoa(os.Getpid()),
}
Expand Down Expand Up @@ -355,23 +363,29 @@ func (t *tracer) flushPayload() {
if t.payload.itemCount() == 0 {
return
}
defer func(start time.Time) {
t.config.statsd.Timing("datadog.tracer.flush_duration", time.Since(start), nil, 1)
}(time.Now())
size, count := t.payload.size(), t.payload.itemCount()
log.Debug("Sending payload: size: %d traces: %d\n", size, count)
rc, err := t.config.transport.send(t.payload)
if err != nil {
t.config.statsd.Count("datadog.tracer.traces_dropped", int64(count), []string{"reason:send_failed"}, 1)
log.Error("lost %d traces: %v", count, err)
} else {
t.config.statsd.Count("datadog.tracer.flush_bytes", int64(size), nil, 1)
t.config.statsd.Count("datadog.tracer.flush_traces", int64(count), nil, 1)
if t.prioritySampling.readRatesJSON(rc) != nil {
t.config.statsd.Incr("datadog.tracer.decode_error", nil, 1)
t.climit <- struct{}{}
t.wg.Add(1)
go func(p *payload) {
defer func(start time.Time) {
<-t.climit
t.wg.Done()
t.config.statsd.Timing("datadog.tracer.flush_duration", time.Since(start), nil, 1)
}(time.Now())
size, count := p.size(), p.itemCount()
log.Debug("Sending payload: size: %d traces: %d\n", size, count)
rc, err := t.config.transport.send(p)
if err != nil {
t.config.statsd.Count("datadog.tracer.traces_dropped", int64(count), []string{"reason:send_failed"}, 1)
log.Error("lost %d traces: %v", count, err)
} else {
t.config.statsd.Count("datadog.tracer.flush_bytes", int64(size), nil, 1)
t.config.statsd.Count("datadog.tracer.flush_traces", int64(count), nil, 1)
if t.prioritySampling.readRatesJSON(rc) != nil {
t.config.statsd.Incr("datadog.tracer.decode_error", nil, 1)
}
}
}
t.payload.reset()
}(t.payload)
t.payload = newPayload()
}

// pushPayload pushes the trace onto the payload. If the payload becomes
Expand Down
62 changes: 41 additions & 21 deletions ddtrace/tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ import (
"github.com/tinylib/msgp/msgp"
)

// forceFlush forces a flush of data (traces and services) to the agent
// synchronously.
func (t *tracer) forceFlush() {
confirm := make(chan struct{})
t.flushChan <- confirm
<-confirm
// forceFlush triggers a flush of the tracer's payload and waits for the transport to
// receive it. The wait will time out after 1 second.
func (t *tracer) forceFlush(transport *dummyTransport) {
t.flushChan <- nil
transport.waitFlush(1 * time.Second)
}

func (t *tracer) newEnvSpan(service, env string) *span {
Expand Down Expand Up @@ -136,7 +135,7 @@ func TestTracerStart(t *testing.T) {
Start()

// ensure at least one worker started and handles requests
internal.GetGlobalTracer().(*tracer).forceFlush()
internal.GetGlobalTracer().(*tracer).pushTrace([]*span{})

Stop()
Stop()
Expand All @@ -147,7 +146,7 @@ func TestTracerStart(t *testing.T) {
t.Run("deadlock/direct", func(t *testing.T) {
tr, _, stop := startTestTracer()
defer stop()
tr.forceFlush() // blocks until worker is started
tr.pushTrace([]*span{}) // blocks until worker is started
select {
case <-tr.stopped:
t.Fatal("stopped channel should be open")
Expand Down Expand Up @@ -480,7 +479,7 @@ func TestTracerPrioritySampler(t *testing.T) {
}))
addr := srv.Listener.Addr().String()

tr, _, stop := startTestTracer(
tr, transport, stop := startTestTracer(
withTransport(newHTTPTransport(addr, defaultRoundTripper)),
)
defer stop()
Expand All @@ -493,7 +492,7 @@ func TestTracerPrioritySampler(t *testing.T) {
assert.EqualValues(s.context.samplingPriority(), s.Metrics[keySamplingPriority])
s.Finish()

tr.forceFlush() // obtain new rates
tr.forceFlush(transport) // obtain new rates

for i, tt := range []struct {
service, env string
Expand Down Expand Up @@ -585,7 +584,7 @@ func TestTracerConcurrent(t *testing.T) {
}()

wg.Wait()
tracer.forceFlush()
tracer.forceFlush(transport)
traces := transport.Traces()
assert.Len(traces, 3)
assert.Len(traces[0], 1)
Expand All @@ -603,7 +602,7 @@ func TestTracerParentFinishBeforeChild(t *testing.T) {
parent := tracer.newRootSpan("pylons.request", "pylons", "/")
parent.Finish()

tracer.forceFlush()
tracer.forceFlush(transport)
traces := transport.Traces()
assert.Len(traces, 1)
assert.Len(traces[0], 1)
Expand All @@ -612,7 +611,7 @@ func TestTracerParentFinishBeforeChild(t *testing.T) {
child := tracer.newChildSpan("redis.command", parent)
child.Finish()

tracer.forceFlush()
tracer.forceFlush(transport)

traces = transport.Traces()
assert.Len(traces, 1)
Expand Down Expand Up @@ -646,7 +645,7 @@ func TestTracerConcurrentMultipleSpans(t *testing.T) {
}()

wg.Wait()
tracer.forceFlush()
tracer.forceFlush(transport)
traces := transport.Traces()
assert.Len(traces, 2)
assert.Len(traces[0], 2)
Expand All @@ -667,13 +666,16 @@ func TestTracerAtomicFlush(t *testing.T) {
span1.Finish()
span2.Finish()

tracer.forceFlush()
// Trigger a flush. Nothing should be flushed, so we can't forceFlush.
c := make(chan struct{})
tracer.flushChan <- c
<-c
traces := transport.Traces()
assert.Len(traces, 0, "nothing should be flushed now as span2 is not finished yet")

root.Finish()

tracer.forceFlush()
tracer.forceFlush(transport)
traces = transport.Traces()
assert.Len(traces, 1)
assert.Len(traces[0], 4, "all spans should show up at once")
Expand Down Expand Up @@ -795,7 +797,7 @@ func TestTracerRace(t *testing.T) {

wg.Wait()

tracer.forceFlush()
tracer.forceFlush(transport)
traces := transport.Traces()
assert.Len(traces, total, "we should have exactly as many traces as expected")
for _, trace := range traces {
Expand Down Expand Up @@ -937,7 +939,7 @@ func TestTracerFlush(t *testing.T) {
root := tracer.StartSpan("root")
tracer.StartSpan("child.direct", ChildOf(root.Context())).Finish()
root.Finish()
tracer.forceFlush()
tracer.forceFlush(transport)

list := transport.Traces()
assert.Len(list, 1)
Expand All @@ -958,7 +960,7 @@ func TestTracerFlush(t *testing.T) {
t.Fatal(err)
}
tracer.StartSpan("child.extracted", ChildOf(sctx)).Finish()
tracer.forceFlush()
tracer.forceFlush(transport)
list := transport.Traces()
assert.Len(list, 1)
assert.Len(list[0], 1)
Expand Down Expand Up @@ -1070,11 +1072,14 @@ func startTestTracer(opts ...StartOption) (*tracer, *dummyTransport, func()) {
// Mock Transport with a real Encoder
type dummyTransport struct {
sync.RWMutex
traces spanLists
traces spanLists
sendsig *sync.Cond
}

func newDummyTransport() *dummyTransport {
return &dummyTransport{traces: spanLists{}}
t := &dummyTransport{traces: spanLists{}}
t.sendsig = sync.NewCond(t)
return t
}

func (t *dummyTransport) send(p *payload) (io.ReadCloser, error) {
Expand All @@ -1085,6 +1090,7 @@ func (t *dummyTransport) send(p *payload) (io.ReadCloser, error) {
t.Lock()
t.traces = append(t.traces, traces...)
t.Unlock()
t.sendsig.Broadcast()
ok := ioutil.NopCloser(strings.NewReader("OK"))
return ok, nil
}
Expand All @@ -1095,6 +1101,20 @@ func decode(p *payload) (spanLists, error) {
return traces, err
}

func (t *dummyTransport) waitFlush(timeout time.Duration) {
c := make(chan struct{})
go func() {
t.Lock()
defer t.Unlock()
defer close(c)
t.sendsig.Wait()
}()
select {
case <-c:
case <-time.After(timeout):
}
}

func encode(traces [][]*span) (*payload, error) {
p := newPayload()
for _, t := range traces {
Expand Down

0 comments on commit 02b4a67

Please sign in to comment.