Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[index] Prevent duplicated writes to commit log #1375

Merged
merged 19 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dbnode/generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@
//go:generate sh -c "mockgen -package=retention -destination=$GOPATH/src/$PACKAGE/src/dbnode/retention/retention_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/retention/types.go"
//go:generate sh -c "mockgen -package=namespace -destination=$GOPATH/src/$PACKAGE/src/dbnode/storage/namespace/namespace_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/storage/namespace/types.go"
//go:generate sh -c "mockgen -package=runtime -destination=$GOPATH/src/$PACKAGE/src/dbnode/runtime/runtime_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/runtime/types.go"
//go:generate sh -c "mockgen -package=ts -destination=$GOPATH/src/$PACKAGE/src/dbnode/ts/write_batch_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/ts/types.go"

package mocks
6 changes: 6 additions & 0 deletions src/dbnode/persist/fs/commitlog/commit_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,12 @@ func (l *commitLog) write() {
continue
}

if writeBatch.SkipWrite {
// This entry should not be written to the commitlog as it is a duplicate
// datapoint.
continue
}

write := writeBatch.Write
err := l.writerState.writer.Write(write.Series,
write.Datapoint, write.Unit, write.Annotation)
Expand Down
75 changes: 74 additions & 1 deletion src/dbnode/persist/fs/commitlog/commit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package commitlog

import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (w testWrite) assert(
require.True(t, w.series.Tags.Equal(series.Tags))

require.True(t, w.t.Equal(datapoint.Timestamp))
require.Equal(t, datapoint.Value, datapoint.Value)
require.Equal(t, w.v, datapoint.Value)
require.Equal(t, w.u, unit)
require.Equal(t, w.a, annotation)
}
Expand Down Expand Up @@ -919,3 +920,75 @@ var (
testTags2 = ident.NewTags(testTag2)
testTags3 = ident.NewTags(testTag3)
)

func TestCommitLogBatchWriteDoesNotAddErroredOrSkippedSeries(t *testing.T) {
opts, scope := newTestOptions(t, overrides{
strategy: StrategyWriteWait,
})

defer cleanup(t, opts)
commitLog := newTestCommitLog(t, opts)
finalized := 0
finalizeFn := func(_ ts.WriteBatch) {
finalized++
}

writes := ts.NewWriteBatch(4, ident.StringID("ns"), finalizeFn)

clock := mclock.NewMock()
alignedStart := clock.Now().Truncate(time.Hour)
for i := 0; i < 4; i++ {
tt := alignedStart.Add(time.Minute * time.Duration(i))
writes.Add(i, ident.StringID(fmt.Sprint(i)), tt, float64(i)*10.5, xtime.Second, nil)
}

writes.SetSkipWrite(0)
writes.SetOutcome(1, testSeries(1, "foo.bar", testTags1, 127), nil)
writes.SetOutcome(2, testSeries(2, "err.err", testTags2, 255), errors.New("oops"))
writes.SetOutcome(3, testSeries(3, "biz.qux", testTags3, 511), nil)

// Call write batch sync
wg := sync.WaitGroup{}

getAllWrites := func() int {
result := int64(0)
success, ok := snapshotCounterValue(scope, "commitlog.writes.success")
if ok {
result += success.Value()
}
errors, ok := snapshotCounterValue(scope, "commitlog.writes.errors")
if ok {
result += errors.Value()
}
return int(result)
}

ctx := context.NewContext()
defer ctx.Close()

wg.Add(1)
go func() {
defer wg.Done()
err := commitLog.WriteBatch(ctx, writes)
require.NoError(t, err)
}()

// Wait for all writes to enqueue
for getAllWrites() != 2 {
time.Sleep(time.Microsecond)
}

wg.Wait()

// Close the commit log and consequently flush
require.NoError(t, commitLog.Close())

// Assert writes occurred by reading the commit log
expected := []testWrite{
{testSeries(1, "foo.bar", testTags1, 127), alignedStart.Add(time.Minute), 10.5, xtime.Second, nil, nil},
{testSeries(3, "biz.qux", testTags3, 511), alignedStart.Add(time.Minute * 3), 31.5, xtime.Second, nil, nil},
}

assertCommitLogWritesByIterating(t, commitLog, expected)
require.Equal(t, 1, finalized)
}
37 changes: 21 additions & 16 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,12 +548,12 @@ func (d *db) Write(
return err
}

series, err := n.Write(ctx, id, timestamp, value, unit, annotation)
series, wasWritten, err := n.Write(ctx, id, timestamp, value, unit, annotation)
if err != nil {
return err
}

if !n.Options().WritesToCommitLog() {
if !n.Options().WritesToCommitLog() || !wasWritten {
return nil
}

Expand Down Expand Up @@ -585,12 +585,12 @@ func (d *db) WriteTagged(
return err
}

series, err := n.WriteTagged(ctx, id, tags, timestamp, value, unit, annotation)
series, wasWritten, err := n.WriteTagged(ctx, id, tags, timestamp, value, unit, annotation)
if err != nil {
return err
}

if !n.Options().WritesToCommitLog() {
if !n.Options().WritesToCommitLog() || !wasWritten {
return nil
}

Expand Down Expand Up @@ -664,12 +664,13 @@ func (d *db) writeBatch(
iter := writes.Iter()
for i, write := range iter {
var (
series ts.Series
err error
series ts.Series
wasWritten bool
err error
)

if tagged {
series, err = n.WriteTagged(
series, wasWritten, err = n.WriteTagged(
ctx,
write.Write.Series.ID,
write.TagIter,
Expand All @@ -679,7 +680,7 @@ func (d *db) writeBatch(
write.Write.Annotation,
)
} else {
series, err = n.Write(
series, wasWritten, err = n.Write(
ctx,
write.Write.Series.ID,
write.Write.Datapoint.Timestamp,
Expand All @@ -694,17 +695,21 @@ func (d *db) writeBatch(
errHandler.HandleError(write.OriginalIndex, err)
}

// Need to set the outcome in the success case so the commitlog gets the updated
// series object which contains identifiers (like the series ID) whose lifecycle
// live longer than the span of this request, making them safe for use by the async
// commitlog. Need to set the outcome in the error case so that the commitlog knows
// to skip this entry.
// Need to set the outcome in the success case so the commitlog gets the
// updated series object which contains identifiers (like the series ID)
// whose lifecycle lives longer than the span of this request, making them
// safe for use by the async commitlog. Need to set the outcome in the
// error case so that the commitlog knows to skip this entry.
writes.SetOutcome(i, series, err)
if !wasWritten || err != nil {
// This series has no additional information that needs to be written to
// the commit log; set this series to skip writing to the commit log.
writes.SetSkipWrite(i)
}
}

if !n.Options().WritesToCommitLog() {
// Finalize here because we can't rely on the commitlog to do it since we're not
// using it.
// Finalize here because we can't rely on the commitlog to do it since
// we're not using it.
writes.Finalize()
return nil
}
Expand Down
Loading