[dbnode] Refactoring dbShard #2848
Merged
20 commits:
f093774  [dbnode] Introduce Aggregator type (linasm)
1b2ef76  Merge branch 'master' into linasm/aggregator-type (linasm)
f756658  Lint (linasm)
92ed0bf  mock gen (linasm)
086a007  Merge branch 'master' into linasm/aggregator-type (linasm)
f276767  [dbnode] Refactoring dbShard (linasm)
58547ab  Merge branch 'master' into linasm/aggregator-type (linasm)
553771e  [dbnode] Refactor wide query path (#2826) (arnikola)
799d9ef  Rename Aggregator to TileAggregator (linasm)
8fc49db  Merge branch 'master' into linasm/aggregator-type (linasm)
26fe4a9  mock gen (linasm)
c816f86  Merge remote-tracking branch 'origin/linasm/aggregator-type' into lin… (linasm)
e01d4d4  Fix (linasm)
d8cc5fd  Formatting (linasm)
00e3304  [dbnode] Introduce Aggregator type (#2840) (linasm)
13c093d  Merge branch 'master' into linasm/refactor-dbShard (linasm)
959e503  Merge branch 'master' into linasm/refactor-dbShard (linasm)
3b47328  Address PR feedback (linasm)
40f9b17  Merge branch 'master' into linasm/refactor-dbShard (linasm)
9488bc5  Change error msg (linasm)
@@ -29,9 +29,6 @@ import (
 	"sync"
 	"time"

-	"github.com/m3db/m3/src/dbnode/encoding"
-	"github.com/m3db/m3/src/dbnode/encoding/tile"
-	"github.com/m3db/m3/src/dbnode/generated/proto/annotation"
 	"github.com/m3db/m3/src/dbnode/generated/proto/pagetoken"
 	"github.com/m3db/m3/src/dbnode/namespace"
 	"github.com/m3db/m3/src/dbnode/persist"
@@ -47,7 +44,6 @@ import (
 	"github.com/m3db/m3/src/dbnode/storage/series/lookup"
 	"github.com/m3db/m3/src/dbnode/tracepoint"
 	"github.com/m3db/m3/src/dbnode/ts"
-	"github.com/m3db/m3/src/dbnode/ts/downsample"
 	"github.com/m3db/m3/src/dbnode/ts/writes"
 	"github.com/m3db/m3/src/dbnode/x/xio"
 	"github.com/m3db/m3/src/m3ninx/doc"
@@ -72,18 +68,16 @@ const (
 )

 var (
-	errShardEntryNotFound                  = errors.New("shard entry not found")
-	errShardNotOpen                        = errors.New("shard is not open")
-	errShardAlreadyTicking                 = errors.New("shard is already ticking")
-	errShardClosingTickTerminated          = errors.New("shard is closing, terminating tick")
-	errShardInvalidPageToken               = errors.New("shard could not unmarshal page token")
-	errNewShardEntryTagsTypeInvalid        = errors.New("new shard entry options error: tags type invalid")
-	errNewShardEntryTagsIterNotAtIndexZero = errors.New("new shard entry options error: tags iter not at index zero")
-	errShardIsNotBootstrapped              = errors.New("shard is not bootstrapped")
-	errShardAlreadyBootstrapped            = errors.New("shard is already bootstrapped")
-	errFlushStateIsNotInitialized          = errors.New("shard flush state is not initialized")
-	errFlushStateAlreadyInitialized        = errors.New("shard flush state is already initialized")
-	errTriedToLoadNilSeries                = errors.New("tried to load nil series into shard")
+	errShardEntryNotFound           = errors.New("shard entry not found")
+	errShardNotOpen                 = errors.New("shard is not open")
+	errShardAlreadyTicking          = errors.New("shard is already ticking")
+	errShardClosingTickTerminated   = errors.New("shard is closing, terminating tick")
+	errShardInvalidPageToken        = errors.New("shard could not unmarshal page token")
+	errNewShardEntryTagsTypeInvalid = errors.New("new shard entry options error: tags type invalid")
+	errShardIsNotBootstrapped       = errors.New("shard is not bootstrapped")
+	errShardAlreadyBootstrapped     = errors.New("shard is already bootstrapped")
+	errFlushStateIsNotInitialized   = errors.New("shard flush state is not initialized")
+	errTriedToLoadNilSeries         = errors.New("tried to load nil series into shard")

 	// ErrDatabaseLoadLimitHit is the error returned when the database load limit
 	// is hit or exceeded.
@@ -188,6 +182,7 @@ type dbShard struct {
 	currRuntimeOptions dbShardRuntimeOptions
 	logger             *zap.Logger
 	metrics            dbShardMetrics
+	tileAggregator     TileAggregator
 	ticking            bool
 	shard              uint32
 	coldWritesEnabled  bool
@@ -328,6 +323,7 @@ func newDatabaseShard(
 		coldWritesEnabled: namespaceMetadata.Options().ColdWritesEnabled(),
 		logger:            opts.InstrumentOptions().Logger(),
 		metrics:           newDatabaseShardMetrics(shard, scope),
+		tileAggregator:    opts.TileAggregator(),
 	}
 	s.insertQueue = newDatabaseShardInsertQueue(s.insertSeriesBatch,
 		s.nowFn, scope, opts.InstrumentOptions().Logger())
@@ -2663,21 +2659,26 @@ func (s *dbShard) Repair(

 func (s *dbShard) AggregateTiles(
 	sourceNsID ident.ID,
-	sourceShardID uint32,
+	targetNs Namespace,
+	shardID uint32,
 	blockReaders []fs.DataFileSetReader,
 	writer fs.StreamingWriter,
 	sourceBlockVolumes []shardBlockVolume,
 	opts AggregateTilesOptions,
-	targetSchemaDescr namespace.SchemaDescr,
 ) (int64, error) {
 	if len(blockReaders) != len(sourceBlockVolumes) {
-		return 0, fmt.Errorf("blockReaders and sourceBlockVolumes length mismatch (%d != %d)", len(blockReaders), len(sourceBlockVolumes))
+		return 0, fmt.Errorf(
+			"blockReaders and sourceBlockVolumes length mismatch (%d != %d)",
+			len(blockReaders),
+			len(sourceBlockVolumes))
 	}

 	openBlockReaders := make([]fs.DataFileSetReader, 0, len(blockReaders))
 	defer func() {
 		for _, reader := range openBlockReaders {
-			reader.Close()
+			if err := reader.Close(); err != nil {
+				s.logger.Error("could not close DataFileSetReader", zap.Error(err))
+			}
 		}
 	}()
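The defer above now logs Close errors instead of discarding them; because AggregateTiles has unnamed return values, an error surfacing inside the deferred cleanup has nowhere else to go. A minimal stand-alone sketch of the same pattern (illustrative names, not the PR's code):

package example

import (
	"io"
	"log"
)

// closeAllLogged closes each reader and logs any Close error rather than
// silently dropping it, mirroring the deferred cleanup in AggregateTiles.
func closeAllLogged(readers []io.Closer) {
	for _, r := range readers {
		if err := r.Close(); err != nil {
			log.Printf("could not close reader: %v", err)
		}
	}
}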
@@ -2687,7 +2688,7 @@ func (s *dbShard) AggregateTiles(
 		openOpts := fs.DataReaderOpenOptions{
 			Identifier: fs.FileSetFileIdentifier{
 				Namespace:   sourceNsID,
-				Shard:       sourceShardID,
+				Shard:       shardID,
 				BlockStart:  sourceBlockVolume.blockStart,
 				VolumeIndex: sourceBlockVolume.latestVolume,
 			},
@@ -2706,46 +2707,15 @@ func (s *dbShard) AggregateTiles(
 				zap.Int("volumeIndex", sourceBlockVolume.latestVolume))
 			return 0, err
 		}
-		if blockReader.Entries() > maxEntries {
-			maxEntries = blockReader.Entries()
+
+		entries := blockReader.Entries()
+		if entries > maxEntries {
+			maxEntries = entries
 		}
+
 		openBlockReaders = append(openBlockReaders, blockReader)
 	}

-	crossBlockReader, err := fs.NewCrossBlockReader(openBlockReaders, s.opts.InstrumentOptions())
-	if err != nil {
-		s.logger.Error("NewCrossBlockReader", zap.Error(err))
-		return 0, err
-	}
-	defer crossBlockReader.Close()
-
-	tileOpts := tile.Options{
-		FrameSize:          opts.Step,
-		Start:              xtime.ToUnixNano(opts.Start),
-		ReaderIteratorPool: s.opts.ReaderIteratorPool(),
-	}
-
-	readerIter, err := tile.NewSeriesBlockIterator(crossBlockReader, tileOpts)
-	if err != nil {
-		s.logger.Error("error when creating new series block iterator", zap.Error(err))
-		return 0, err
-	}
-
-	closed := false
-	defer func() {
-		if !closed {
-			if err := readerIter.Close(); err != nil {
-				// NB: log the error on ungraceful exit.
-				s.logger.Error("could not close read iterator on error", zap.Error(err))
-			}
-		}
-	}()
-
-	encoder := s.opts.EncoderPool().Get()
-	defer encoder.Close()
-	encoder.Reset(opts.Start, 0, targetSchemaDescr)
-
 	latestTargetVolume, err := s.LatestVolume(opts.Start)
 	if err != nil {
 		return 0, err
@@ -2764,54 +2734,12 @@
 		return 0, err
 	}

-	var (
-		annotationPayload annotation.Payload
-		// NB: there is a maximum of 4 datapoints per frame for counters.
-		downsampledValues  = make([]downsample.Value, 0, 4)
-		processedTileCount int64
-		segmentCapacity    int
-		writerData         = make([][]byte, 2)
-		multiErr           xerrors.MultiError
-	)
-
-	for readerIter.Next() {
-		seriesIter, id, encodedTags := readerIter.Current()
-
-		seriesTileCount, err := encodeAggregatedSeries(seriesIter, annotationPayload, downsampledValues, encoder)
-		if err != nil {
-			s.metrics.largeTilesWriteErrors.Inc(1)
-			multiErr = multiErr.Add(err)
-			break
-		}
-
-		if seriesTileCount == 0 {
-			break
-		}
-
-		processedTileCount += seriesTileCount
-		segment := encoder.DiscardReset(opts.Start, segmentCapacity, targetSchemaDescr)
-
-		segmentLen := segment.Len()
-		if segmentLen > segmentCapacity {
-			// Will use the same capacity for the next series.
-			segmentCapacity = segmentLen
-		}
-
-		writerData[0] = segment.Head.Bytes()
-		writerData[1] = segment.Tail.Bytes()
-		checksum := segment.CalculateChecksum()
-
-		if err := writer.WriteAll(id, encodedTags, writerData, checksum); err != nil {
-			s.metrics.largeTilesWriteErrors.Inc(1)
-			multiErr = multiErr.Add(err)
-		} else {
-			s.metrics.largeTilesWrites.Inc(1)
-		}
-
-		segment.Finalize()
-	}
+	var multiErr xerrors.MultiError

-	if err := readerIter.Err(); err != nil {
+	processedTileCount, err := s.tileAggregator.AggregateTiles(
+		opts, targetNs, s.ID(), openBlockReaders, writer)
+	if err != nil {
+		// NB: cannot return on the error here, must finish writing.
 		multiErr = multiErr.Add(err)
 	}

Review thread on the new error handling above:

> nit: add a comment here that we still need to do cleanup etc and should log any errors from that

> 👍
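The sketch below illustrates the accumulate-and-continue pattern discussed in the thread above: an aggregation failure is recorded rather than returned immediately, so that cleanup still runs and its errors are folded into the same error set. Here doAggregate and cleanup are hypothetical stand-ins, and the xerrors alias is assumed to be m3's github.com/m3db/m3/src/x/errors package.

package example

import (
	xerrors "github.com/m3db/m3/src/x/errors"
)

// aggregateWithCleanup records the aggregation error instead of returning
// early, guaranteeing that cleanup runs; errors from both phases are
// combined and surfaced as a single final error.
func aggregateWithCleanup(doAggregate, cleanup func() error) error {
	var multiErr xerrors.MultiError
	if err := doAggregate(); err != nil {
		// NB: cannot return here, cleanup must still happen.
		multiErr = multiErr.Add(err)
	}
	if err := cleanup(); err != nil {
		multiErr = multiErr.Add(err)
	}
	return multiErr.FinalError()
}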
@@ -2833,11 +2761,6 @@
 		}
 	}

-	closed = true
-	if err := readerIter.Close(); err != nil {
-		multiErr = multiErr.Add(err)
-	}
-
 	if err := multiErr.FinalError(); err != nil {
 		return 0, err
 	}
@@ -2849,102 +2772,6 @@
 	return processedTileCount, nil
 }

-func encodeAggregatedSeries(
-	seriesIter tile.SeriesFrameIterator,
-	annotationPayload annotation.Payload,
-	downsampledValues []downsample.Value,
-	encoder encoding.Encoder,
-) (int64, error) {
-	var (
-		prevFrameLastValue = math.NaN()
-		processedTileCount int64
-		handleValueResets  bool
-		firstUnit          xtime.Unit
-		firstAnnotation    ts.Annotation
-		err                error
-	)
-
-	for seriesIter.Next() {
-		frame := seriesIter.Current()
-
-		frameValues := frame.Values()
-		if len(frameValues) == 0 {
-			continue
-		}
-
-		if processedTileCount == 0 {
-			firstUnit, err = frame.Units().Value(0)
-			if err != nil {
-				return 0, err
-			}
-
-			firstAnnotation, err = frame.Annotations().Value(0)
-			if err != nil {
-				return 0, err
-			}
-
-			annotationPayload.Reset()
-			if annotationPayload.Unmarshal(firstAnnotation) == nil {
-				// NB: unmarshall error might be a result of some historical annotation data
-				// which is not compatible with protobuf payload struct. This would generally mean
-				// that metrics type is unknown, so we should ignore the error here.
-				handleValueResets = annotationPayload.HandleValueResets
-			}
-		}
-
-		downsampledValues = downsampledValues[:0]
-		lastIdx := len(frameValues) - 1
-
-		if handleValueResets {
-			// Last value plus possible few more datapoints to preserve counter semantics.
-			downsampledValues = downsample.DownsampleCounterResets(prevFrameLastValue, frameValues, downsampledValues)
-		} else {
-			// Plain last value per frame.
-			downsampledValue := downsample.Value{
-				FrameIndex: lastIdx,
-				Value:      frameValues[lastIdx],
-			}
-			downsampledValues = append(downsampledValues, downsampledValue)
-		}
-
-		if err = encodeDownsampledValues(downsampledValues, frame, firstUnit, firstAnnotation, encoder); err != nil {
-			return 0, err
-		}
-
-		prevFrameLastValue = frameValues[lastIdx]
-		processedTileCount++
-	}
-
-	if err := seriesIter.Err(); err != nil {
-		return 0, err
-	}
-
-	return processedTileCount, nil
-}
-
-func encodeDownsampledValues(
-	downsampledValues []downsample.Value,
-	frame tile.SeriesBlockFrame,
-	unit xtime.Unit,
-	annotation ts.Annotation,
-	encoder encoding.Encoder,
-) error {
-	for _, downsampledValue := range downsampledValues {
-		timestamp := frame.Timestamps()[downsampledValue.FrameIndex]
-		dp := ts.Datapoint{
-			Timestamp:      timestamp,
-			TimestampNanos: xtime.ToUnixNano(timestamp),
-			Value:          downsampledValue.Value,
-		}
-
-		if err := encoder.Encode(dp, unit, annotation); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
 func (s *dbShard) BootstrapState() BootstrapState {
 	s.RLock()
 	bs := s.bootstrapState
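Net effect of the diff: dbShard.AggregateTiles now only validates inputs, opens the source filesets, and manages the target volume, while the tile iteration, downsampling, and encoding previously done by encodeAggregatedSeries and encodeDownsampledValues move behind the injected TileAggregator (introduced in #2840). Judging from the call site, the dependency looks roughly like the following; this is inferred from the diff, and the real interface may differ.

// TileAggregator is sketched from the call
// s.tileAggregator.AggregateTiles(opts, targetNs, s.ID(), openBlockReaders, writer).
// AggregateTilesOptions, Namespace, and the fs types are the ones already
// imported by the file in this diff.
type TileAggregator interface {
	AggregateTiles(
		opts AggregateTilesOptions,
		ns Namespace,
		shardID uint32,
		readers []fs.DataFileSetReader,
		writer fs.StreamingWriter,
	) (processedTileCount int64, err error)
}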
Review comment:

> Should this be done under RLock if we're passing in n?

Reply:

> The method called accepts n as a storage.Namespace interface, which only exposes the public methods of dbNamespace. These public methods take locks when they access mutable fields of dbNamespace (e.g. dbNamespace.Schema()), so this should be fine.
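A condensed sketch of the locking argument in the reply: the aggregator only sees the narrow interface, and every exported accessor takes the namespace's own lock, so the caller needs no lock of its own. Field and type names here are illustrative, not m3's exact definitions.

package example

import "sync"

// Namespace is the narrow interface handed to the caller; it hides all
// mutable state behind exported methods.
type Namespace interface {
	Schema() SchemaDescr
}

// SchemaDescr stands in for the real schema descriptor type.
type SchemaDescr struct{}

// dbNamespace guards its mutable fields with its own RWMutex, so callers
// going through the Namespace interface are safe without external locking.
type dbNamespace struct {
	mu     sync.RWMutex
	schema SchemaDescr
}

func (n *dbNamespace) Schema() SchemaDescr {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.schema
}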