diff --git a/src/dbnode/generated/mocks/generate.go b/src/dbnode/generated/mocks/generate.go
index 0dc98bc96b..5f0a7bff07 100644
--- a/src/dbnode/generated/mocks/generate.go
+++ b/src/dbnode/generated/mocks/generate.go
@@ -20,7 +20,7 @@
 
 // mockgen rules for generating mocks for exported interfaces (reflection mode)
-//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
+//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
 //go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
 //go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
 //go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
diff --git a/src/dbnode/persist/fs/cross_block_reader.go b/src/dbnode/persist/fs/cross_block_reader.go
new file mode 100644
index 0000000000..7642653861
--- /dev/null
+++ b/src/dbnode/persist/fs/cross_block_reader.go
@@ -0,0 +1,274 @@
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package fs
+
+import (
+	"bytes"
+	"container/heap"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	"github.com/m3db/m3/src/x/checked"
+	xerrors "github.com/m3db/m3/src/x/errors"
+	"github.com/m3db/m3/src/x/ident"
+	"github.com/m3db/m3/src/x/instrument"
+
+	"go.uber.org/zap"
+)
+
+var (
+	errReaderNotOrderedByIndex = errors.New("crossBlockReader can only use DataFileSetReaders ordered by index")
+	errEmptyReader             = errors.New("trying to read from empty reader")
+	_ heap.Interface            = (*minHeap)(nil)
+)
+
+type crossBlockReader struct {
+	dataFileSetReaders []DataFileSetReader
+	id                 ident.ID
+	tags               ident.TagIterator
+	records            []BlockRecord
+	started            bool
+	minHeap            minHeap
+	err                error
+	iOpts              instrument.Options
+}
+
+// NewCrossBlockReader constructs a new CrossBlockReader based on given DataFileSetReaders.
+// DataFileSetReaders must be configured to return the data in the order of index, and must be
+// provided in a slice sorted by block start time.
+// Callers are responsible for closing the DataFileSetReaders.
+func NewCrossBlockReader(dataFileSetReaders []DataFileSetReader, iOpts instrument.Options) (CrossBlockReader, error) {
+	var previousStart time.Time
+	for _, dataFileSetReader := range dataFileSetReaders {
+		if !dataFileSetReader.OrderedByIndex() {
+			return nil, errReaderNotOrderedByIndex
+		}
+		currentStart := dataFileSetReader.Range().Start
+		if !currentStart.After(previousStart) {
+			return nil, fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", previousStart, currentStart)
+		}
+		previousStart = currentStart
+	}
+
+	return &crossBlockReader{
+		dataFileSetReaders: append(make([]DataFileSetReader, 0, len(dataFileSetReaders)), dataFileSetReaders...),
+		records:            make([]BlockRecord, 0, len(dataFileSetReaders)),
+		iOpts:              iOpts,
+	}, nil
+}
+
+func (r *crossBlockReader) Next() bool {
+	if r.err != nil {
+		return false
+	}
+
+	var emptyRecord BlockRecord
+	if !r.started {
+		if r.err = r.start(); r.err != nil {
+			return false
+		}
+	} else {
+		// use empty var in inner loop with "for i := range" to have compiler use memclr optimization
+		// see: https://codereview.appspot.com/137880043
+		for i := range r.records {
+			r.records[i] = emptyRecord
+		}
+	}
+
+	if len(r.minHeap) == 0 {
+		return false
+	}
+
+	firstEntry, err := r.readOne()
+	if err != nil {
+		r.err = err
+		return false
+	}
+
+	r.id = firstEntry.id
+	r.tags = firstEntry.tags
+
+	r.records = r.records[:0]
+	r.records = append(r.records, BlockRecord{firstEntry.data, firstEntry.checksum})
+
+	for len(r.minHeap) > 0 && r.minHeap[0].id.Equal(firstEntry.id) {
+		nextEntry, err := r.readOne()
+		if err != nil {
+			// Close the resources that were already read but not returned to the consumer:
+			r.id.Finalize()
+			r.tags.Close()
+			for _, record := range r.records {
+				record.Data.DecRef()
+				record.Data.Finalize()
+			}
+			for i := range r.records {
+				r.records[i] = emptyRecord
+			}
+			r.records = r.records[:0]
+			r.err = err
+			return false
+		}
+
+		// id and tags are not needed for subsequent blocks because they are the same as in the first block
+		nextEntry.id.Finalize()
+		nextEntry.tags.Close()
+
+		r.records = append(r.records, BlockRecord{nextEntry.data, nextEntry.checksum})
+	}
+
+	return true
+}
+
+func (r *crossBlockReader) Current() (ident.ID, ident.TagIterator, []BlockRecord) {
+	return r.id, r.tags, r.records
+}
+
+func (r *crossBlockReader) readOne() (*minHeapEntry, error) {
+	if len(r.minHeap) == 0 {
+		return nil, errEmptyReader
+	}
+
+	entry := heap.Pop(&r.minHeap).(*minHeapEntry)
+	if r.dataFileSetReaders[entry.dataFileSetReaderIndex] != nil {
+		nextEntry, err := r.readFromDataFileSet(entry.dataFileSetReaderIndex)
+		if err == io.EOF {
+			// will no longer read from this one
+			r.dataFileSetReaders[entry.dataFileSetReaderIndex] = nil
+		} else if err != nil {
+			return nil, err
+		} else if bytes.Equal(nextEntry.id.Bytes(), entry.id.Bytes()) {
+			err := fmt.Errorf("duplicate id %s on block starting at %s",
+				entry.id, r.dataFileSetReaders[entry.dataFileSetReaderIndex].Range().Start)
+
+			instrument.EmitAndLogInvariantViolation(r.iOpts, func(l *zap.Logger) {
+				l.Error(err.Error())
+			})
+
+			return nil, err
+		} else {
+			heap.Push(&r.minHeap, nextEntry)
+		}
+	}
+
+	return entry, nil
+}
+
+func (r *crossBlockReader) start() error {
+	r.started = true
+	r.minHeap = make([]*minHeapEntry, 0, len(r.dataFileSetReaders))
+
+	for i := range r.dataFileSetReaders {
+		entry, err := r.readFromDataFileSet(i)
+		if err == io.EOF {
+			continue
+		}
+		if err != nil {
+			return err
+		}
+		r.minHeap = append(r.minHeap, entry)
+	}
+
+	heap.Init(&r.minHeap)
+
+	return nil
+}
+
+func (r *crossBlockReader) readFromDataFileSet(index int) (*minHeapEntry, error) {
+	id, tags, data, checksum, err := r.dataFileSetReaders[index].Read()
+
+	if err == io.EOF {
+		return nil, err
+	}
+
+	if err != nil {
+		multiErr := xerrors.NewMultiError().
+			Add(err).
+			Add(r.Close())
+		return nil, multiErr.FinalError()
+	}
+
+	return &minHeapEntry{
+		dataFileSetReaderIndex: index,
+		id:                     id,
+		tags:                   tags,
+		data:                   data,
+		checksum:               checksum,
+	}, nil
+}
+
+func (r *crossBlockReader) Err() error {
+	return r.err
+}
+
+func (r *crossBlockReader) Close() error {
+	// Close the resources that were buffered in minHeap:
+	for i, entry := range r.minHeap {
+		entry.id.Finalize()
+		entry.tags.Close()
+		entry.data.DecRef()
+		entry.data.Finalize()
+		r.minHeap[i] = nil
+	}
+
+	r.minHeap = r.minHeap[:0]
+	return nil
+}
+
+type minHeapEntry struct {
+	dataFileSetReaderIndex int
+	id                     ident.ID
+	tags                   ident.TagIterator
+	data                   checked.Bytes
+	checksum               uint32
+}
+
+type minHeap []*minHeapEntry
+
+func (h minHeap) Len() int {
+	return len(h)
+}
+
+func (h minHeap) Less(i, j int) bool {
+	idsCmp := bytes.Compare(h[i].id.Bytes(), h[j].id.Bytes())
+	if idsCmp == 0 {
+		return h[i].dataFileSetReaderIndex < h[j].dataFileSetReaderIndex
+	}
+	return idsCmp < 0
+}
+
+func (h minHeap) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+}
+
+func (h *minHeap) Push(x interface{}) {
+	*h = append(*h, x.(*minHeapEntry))
+}
+
+func (h *minHeap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	old[n-1] = nil
+	*h = old[0 : n-1]
+	return x
+}
diff --git a/src/dbnode/persist/fs/cross_block_reader_test.go b/src/dbnode/persist/fs/cross_block_reader_test.go
new file mode 100644
index 0000000000..819740fee6
--- /dev/null
+++ b/src/dbnode/persist/fs/cross_block_reader_test.go
@@ -0,0 +1,221 @@
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package fs
+
+import (
+	"errors"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/m3db/m3/src/x/ident"
+	"github.com/m3db/m3/src/x/instrument"
+	xtest "github.com/m3db/m3/src/x/test"
+	xtime "github.com/m3db/m3/src/x/time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+var expectedError = errors.New("expected error")
+
+func TestCrossBlockReaderRejectMisconfiguredInputs(t *testing.T) {
+	ctrl := xtest.NewController(t)
+	defer ctrl.Finish()
+
+	dfsReader := NewMockDataFileSetReader(ctrl)
+	dfsReader.EXPECT().OrderedByIndex().Return(false)
+
+	_, err := NewCrossBlockReader([]DataFileSetReader{dfsReader}, instrument.NewTestOptions(t))
+
+	assert.Equal(t, errReaderNotOrderedByIndex, err)
+}
+
+func TestCrossBlockReaderRejectMisorderedInputs(t *testing.T) {
+	ctrl := xtest.NewController(t)
+	defer ctrl.Finish()
+
+	now := time.Now().Truncate(time.Hour)
+	dfsReader1 := NewMockDataFileSetReader(ctrl)
+	dfsReader1.EXPECT().OrderedByIndex().Return(true)
+	dfsReader1.EXPECT().Range().Return(xtime.Range{Start: now})
+
+	later := now.Add(time.Hour)
+	dfsReader2 := NewMockDataFileSetReader(ctrl)
+	dfsReader2.EXPECT().OrderedByIndex().Return(true)
+	dfsReader2.EXPECT().Range().Return(xtime.Range{Start: later})
+
+	_, err := NewCrossBlockReader([]DataFileSetReader{dfsReader2, dfsReader1}, instrument.NewTestOptions(t))
+
+	expectedErr := fmt.Errorf("dataFileSetReaders are not ordered by time (%s followed by %s)", later, now)
+
+	assert.Equal(t, expectedErr, err)
+}
+
+func TestCrossBlockReader(t *testing.T) {
+	tests := []struct {
+		name           string
+		blockSeriesIDs [][]string
+		expectedIDs    []string
+	}{
+		{
+			name:           "no readers",
+			blockSeriesIDs: [][]string{},
+			expectedIDs:    []string{},
+		},
+		{
+			name:           "empty readers",
+			blockSeriesIDs: [][]string{{}, {}, {}},
+			expectedIDs:    []string{},
+		},
+		{
+			name:           "one reader, one series",
+			blockSeriesIDs: [][]string{{"id1"}},
+			expectedIDs:    []string{"id1"},
+		},
+		{
+			name:           "one reader, many series",
+			blockSeriesIDs: [][]string{{"id1", "id2", "id3"}},
+			expectedIDs:    []string{"id1", "id2", "id3"},
+		},
+		{
+			name:           "many readers with same series",
+			blockSeriesIDs: [][]string{{"id1"}, {"id1"}, {"id1"}},
+			expectedIDs:    []string{"id1"},
+		},
+		{
+			name:           "many readers with different series",
+			blockSeriesIDs: [][]string{{"id1"}, {"id2"}, {"id3"}},
+			expectedIDs:    []string{"id1", "id2", "id3"},
+		},
+		{
+			name:           "many readers with unordered series",
+			blockSeriesIDs: [][]string{{"id3"}, {"id1"}, {"id2"}},
+			expectedIDs:    []string{"id1", "id2", "id3"},
+		},
+		{
+			name:           "complex case",
+			blockSeriesIDs: [][]string{{"id2", "id3", "id5"}, {"id1", "id2", "id4"}, {"id1", "id4"}},
+			expectedIDs:    []string{"id1", "id2", "id3", "id4", "id5"},
+		},
+		{
+			name:           "duplicate ids within a reader",
+			blockSeriesIDs: [][]string{{"id1", "id2"}, {"id2", "id2"}},
+			expectedIDs:    []string{"id1"},
+		},
+		{
+			name:           "immediate reader error",
+			blockSeriesIDs: [][]string{{"error"}},
+			expectedIDs:    []string{},
+		},
+		{
+			name:           "reader error later",
+			blockSeriesIDs: [][]string{{"id1", "id2"}, {"id1", "error"}},
+			expectedIDs:    []string{},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			testCrossBlockReader(t, tt.blockSeriesIDs, tt.expectedIDs)
+		})
+	}
+}
+
+func testCrossBlockReader(t *testing.T, blockSeriesIds [][]string, expectedIDs []string) {
+	ctrl := xtest.NewController(t)
+	defer ctrl.Finish()
+
+	now := time.Now().Truncate(time.Hour)
+	var dfsReaders []DataFileSetReader
+	expectedBlockCount := 0
+
+	for blockIndex, ids := range blockSeriesIds {
+		dfsReader := NewMockDataFileSetReader(ctrl)
+		dfsReader.EXPECT().OrderedByIndex().Return(true)
+		dfsReader.EXPECT().Range().Return(xtime.Range{Start: now.Add(time.Hour * time.Duration(blockIndex))}).AnyTimes()
+
+		blockHasError := false
+		for j, id := range ids {
+			tags := ident.NewTags(ident.StringTag("foo", strconv.Itoa(j)))
+			data := checkedBytes([]byte{byte(j)})
+			checksum := uint32(blockIndex) // somewhat hacky - using checksum to propagate block index value for assertions
+			if id == "error" {
+				dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), expectedError)
+				blockHasError = true
+			} else {
+				dfsReader.EXPECT().Read().Return(ident.StringID(id), ident.NewTagsIterator(tags), data, checksum, nil)
+			}
+		}
+
+		if !blockHasError {
+			dfsReader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF).MaxTimes(1)
+		}
+
+		dfsReaders = append(dfsReaders, dfsReader)
+		expectedBlockCount += len(ids)
+	}
+
+	cbReader, err := NewCrossBlockReader(dfsReaders, instrument.NewTestOptions(t))
+	require.NoError(t, err)
+	defer cbReader.Close()
+
+	blockCount := 0
+	seriesCount := 0
+	for cbReader.Next() {
+		id, tags, records := cbReader.Current()
+
+		strId := id.String()
+		id.Finalize()
+		assert.Equal(t, expectedIDs[seriesCount], strId)
+
+		assert.NotNil(t, tags)
+		tags.Close()
+
+		previousBlockIndex := -1
+		for _, record := range records {
+			blockIndex := int(record.DataChecksum) // see the comment above
+			assert.True(t, blockIndex > previousBlockIndex, "same id blocks must be read in temporal order")
+			previousBlockIndex = blockIndex
+			assert.NotNil(t, record.Data)
+			record.Data.DecRef()
+			record.Data.Finalize()
+		}
+
+		blockCount += len(records)
+		seriesCount++
+	}
+
+	assert.Equal(t, len(expectedIDs), seriesCount, "count of series read")
+
+	err = cbReader.Err()
+	if err == nil || (err.Error() != expectedError.Error() && !strings.HasPrefix(err.Error(), "duplicate id")) {
+		require.NoError(t, cbReader.Err())
+		assert.Equal(t, expectedBlockCount, blockCount, "count of blocks read")
+	}
+
+	for _, dfsReader := range dfsReaders {
+		assert.NotNil(t, dfsReader)
+	}
+}
diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go
index 084b8e2afc..583e801116 100644
--- a/src/dbnode/persist/fs/fs_mock.go
+++ b/src/dbnode/persist/fs/fs_mock.go
@@ -1,5 +1,5 @@
 // Code generated by MockGen. DO NOT EDIT.
-// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith) +// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader) // Copyright (c) 2020 Uber Technologies, Inc. // @@ -229,6 +229,20 @@ func (mr *MockDataFileSetReaderMockRecorder) Open(arg0 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockDataFileSetReader)(nil).Open), arg0) } +// OrderedByIndex mocks base method +func (m *MockDataFileSetReader) OrderedByIndex() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OrderedByIndex") + ret0, _ := ret[0].(bool) + return ret0 +} + +// OrderedByIndex indicates an expected call of OrderedByIndex +func (mr *MockDataFileSetReaderMockRecorder) OrderedByIndex() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OrderedByIndex", reflect.TypeOf((*MockDataFileSetReader)(nil).OrderedByIndex)) +} + // Range mocks base method func (m *MockDataFileSetReader) Range() time0.Range { m.ctrl.T.Helper() @@ -1262,3 +1276,84 @@ func (mr *MockMergeWithMockRecorder) Read(arg0, arg1, arg2, arg3 interface{}) *g mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockMergeWith)(nil).Read), arg0, arg1, arg2, arg3) } + +// MockCrossBlockReader is a mock of CrossBlockReader interface +type MockCrossBlockReader struct { + ctrl *gomock.Controller + recorder *MockCrossBlockReaderMockRecorder +} + +// MockCrossBlockReaderMockRecorder is the mock recorder for MockCrossBlockReader +type MockCrossBlockReaderMockRecorder struct { + mock *MockCrossBlockReader +} + +// NewMockCrossBlockReader creates a new mock instance +func NewMockCrossBlockReader(ctrl *gomock.Controller) *MockCrossBlockReader { + mock := &MockCrossBlockReader{ctrl: ctrl} + mock.recorder = &MockCrossBlockReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockCrossBlockReader) EXPECT() *MockCrossBlockReaderMockRecorder { + return m.recorder +} + +// Close mocks base method +func (m *MockCrossBlockReader) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockCrossBlockReaderMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockCrossBlockReader)(nil).Close)) +} + +// Current mocks base method +func (m *MockCrossBlockReader) Current() (ident.ID, ident.TagIterator, []BlockRecord) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Current") + ret0, _ := ret[0].(ident.ID) + ret1, _ := ret[1].(ident.TagIterator) + ret2, _ := ret[2].([]BlockRecord) + return ret0, ret1, ret2 +} + +// Current indicates an expected call of Current +func (mr *MockCrossBlockReaderMockRecorder) Current() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Current", 
reflect.TypeOf((*MockCrossBlockReader)(nil).Current)) +} + +// Err mocks base method +func (m *MockCrossBlockReader) Err() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Err") + ret0, _ := ret[0].(error) + return ret0 +} + +// Err indicates an expected call of Err +func (mr *MockCrossBlockReaderMockRecorder) Err() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Err", reflect.TypeOf((*MockCrossBlockReader)(nil).Err)) +} + +// Next mocks base method +func (m *MockCrossBlockReader) Next() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].(bool) + return ret0 +} + +// Next indicates an expected call of Next +func (mr *MockCrossBlockReaderMockRecorder) Next() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockCrossBlockReader)(nil).Next)) +} diff --git a/src/dbnode/persist/fs/index_read.go b/src/dbnode/persist/fs/index_read.go index 88712236dc..773c7b5291 100644 --- a/src/dbnode/persist/fs/index_read.go +++ b/src/dbnode/persist/fs/index_read.go @@ -207,7 +207,7 @@ func (r *indexReader) ReadSegmentFileSet() ( ) success := false defer func() { - // Do not close opened files if read finishes sucessfully. + // Do not close opened files if read finishes successfully. if success { return } diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 9805f06170..bae654234c 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -50,6 +50,8 @@ var ( // errReadNotExpectedSize returned when the size of the next read does not match size specified by the index errReadNotExpectedSize = errors.New("next read not expected size") + + errUnexpectedSortByOffset = errors.New("should not sort index by offsets when doing reads sorted by Id") ) const ( @@ -99,6 +101,8 @@ type reader struct { shard uint32 volume int open bool + + orderedByIndex bool } // NewReader returns a new reader and expects all files to exist. 
Will read the @@ -151,6 +155,8 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { dataFilepath string ) + r.orderedByIndex = opts.OrderedByIndex + switch opts.FileSetType { case persist.FileSetSnapshotType: shardDir = ShardSnapshotsDirPath(r.filePathPrefix, namespace, shard) @@ -263,7 +269,9 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { r.Close() return err } - if err := r.readIndexAndSortByOffsetAsc(); err != nil { + if opts.OrderedByIndex { + r.decoder.Reset(r.indexDecoderStream) + } else if err := r.readIndexAndSortByOffsetAsc(); err != nil { r.Close() return err } @@ -282,7 +290,7 @@ func (r *reader) Status() DataFileSetReaderStatus { Shard: r.shard, Volume: r.volume, BlockStart: r.start, - BlockSize: time.Duration(r.blockSize), + BlockSize: r.blockSize, } } @@ -329,6 +337,10 @@ func (r *reader) readInfo(size int) error { } func (r *reader) readIndexAndSortByOffsetAsc() error { + if r.orderedByIndex { + return errUnexpectedSortByOffset + } + r.decoder.Reset(r.indexDecoderStream) for i := 0; i < r.entries; i++ { entry, err := r.decoder.DecodeIndexEntry(nil) @@ -344,6 +356,50 @@ func (r *reader) readIndexAndSortByOffsetAsc() error { } func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { + if r.orderedByIndex { + return r.readInIndexedOrder() + } + return r.readInStoredOrder() +} + +func (r *reader) readInIndexedOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) { + if r.entriesRead >= r.entries { + return nil, nil, nil, 0, io.EOF + } + + entry, err := r.decoder.DecodeIndexEntry(nil) + if err != nil { + return nil, nil, nil, 0, err + } + + var data checked.Bytes + if r.bytesPool != nil { + data = r.bytesPool.Get(int(entry.Size)) + data.IncRef() + defer data.DecRef() + } else { + data = checked.NewBytes(make([]byte, 0, entry.Size), nil) + data.IncRef() + defer data.DecRef() + } + + data.AppendAll(r.dataMmap.Bytes[entry.Offset : entry.Offset+entry.Size]) + + // NB(r): _must_ check the checksum against known checksum as the data + // file might not have been verified if we haven't read through the file yet. 
+	if entry.DataChecksum != int64(digest.Checksum(data.Bytes())) {
+		return nil, nil, nil, 0, errSeekChecksumMismatch
+	}
+
+	id := r.entryClonedID(entry.ID)
+	tags := r.entryClonedEncodedTagsIter(entry.EncodedTags)
+
+	r.entriesRead++
+
+	return id, tags, data, uint32(entry.DataChecksum), nil
+}
+
+func (r *reader) readInStoredOrder() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
 	if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries {
 		// Have not read the index yet, this is required when reading
 		// data as we need each index entry in order by by the offset ascending
@@ -386,6 +442,32 @@
 }
 
 func (r *reader) ReadMetadata() (ident.ID, ident.TagIterator, int, uint32, error) {
+	if r.orderedByIndex {
+		return r.readMetadataInIndexedOrder()
+	}
+	return r.readMetadataInStoredOrder()
+}
+
+func (r *reader) readMetadataInIndexedOrder() (ident.ID, ident.TagIterator, int, uint32, error) {
+	if r.entriesRead >= r.entries {
+		return nil, nil, 0, 0, io.EOF
+	}
+
+	entry, err := r.decoder.DecodeIndexEntry(nil)
+	if err != nil {
+		return nil, nil, 0, 0, err
+	}
+
+	id := r.entryClonedID(entry.ID)
+	tags := r.entryClonedEncodedTagsIter(entry.EncodedTags)
+	length := int(entry.Size)
+	checksum := uint32(entry.DataChecksum)
+
+	r.metadataRead++
+	return id, tags, length, checksum, nil
+}
+
+func (r *reader) readMetadataInStoredOrder() (ident.ID, ident.TagIterator, int, uint32, error) {
 	if r.metadataRead >= r.entries {
 		return nil, nil, 0, 0, io.EOF
 	}
@@ -486,6 +568,10 @@ func (r *reader) MetadataRead() int {
 	return r.metadataRead
 }
 
+func (r *reader) OrderedByIndex() bool {
+	return r.orderedByIndex
+}
+
 func (r *reader) Close() error {
 	// Close and prepare resources that are to be reused
 	multiErr := xerrors.NewMultiError()
diff --git a/src/dbnode/persist/fs/read_test.go b/src/dbnode/persist/fs/read_test.go
index 5f6c3ca6aa..15eb299eaa 100644
--- a/src/dbnode/persist/fs/read_test.go
+++ b/src/dbnode/persist/fs/read_test.go
@@ -144,7 +144,7 @@ func TestReadEmptyIndexUnreadData(t *testing.T) {
 	assert.NoError(t, err)
 
 	_, _, _, _, err = r.Read()
-	assert.Error(t, err)
+	assert.Equal(t, io.EOF, err)
 
 	assert.NoError(t, r.Close())
 }
@@ -311,7 +311,7 @@ func testReadOpen(t *testing.T, fileData map[string][]byte) {
 		BlockSize: testBlockSize,
 		Identifier: FileSetFileIdentifier{
 			Namespace:  testNs1ID,
-			Shard:      uint32(shard),
+			Shard:      shard,
 			BlockStart: start,
 		},
 	}
@@ -350,11 +350,11 @@ func TestReadOpenDigestOfDigestMismatch(t *testing.T) {
 	testReadOpen(
 		t,
 		map[string][]byte{
-			infoFileSuffix:       []byte{0x1},
-			indexFileSuffix:      []byte{0x2},
-			dataFileSuffix:       []byte{0x3},
-			digestFileSuffix:     []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
-			checkpointFileSuffix: []byte{0x12, 0x0, 0x7a, 0x0},
+			infoFileSuffix:       {0x1},
+			indexFileSuffix:      {0x2},
+			dataFileSuffix:       {0x3},
+			digestFileSuffix:     {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
+			checkpointFileSuffix: {0x12, 0x0, 0x7a, 0x0},
 		},
 	)
 }
@@ -363,11 +363,11 @@ func TestReadOpenInfoDigestMismatch(t *testing.T) {
 	testReadOpen(
 		t,
 		map[string][]byte{
-			infoFileSuffix:       []byte{0xa},
-			indexFileSuffix:      []byte{0x2},
-			dataFileSuffix:       []byte{0x3},
-			digestFileSuffix:     []byte{0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
-			checkpointFileSuffix: []byte{0x13, 0x0, 0x7a, 0x0},
+			infoFileSuffix:       {0xa},
+			indexFileSuffix:      {0x2},
+			dataFileSuffix:       {0x3},
+			digestFileSuffix:     {0x2, 0x0, 0x2, 0x0, 0x3, 0x0, 0x3, 0x0, 0x4, 0x0, 0x4, 0x0},
+			checkpointFileSuffix: {0x13, 0x0, 0x7a, 0x0},
 		},
 	)
 }
@@ -388,8 +388,8 @@
 		t,
 		map[string][]byte{
 			infoFileSuffix:       b,
-			indexFileSuffix:      []byte{0xa},
-			dataFileSuffix:       []byte{0x3},
+			indexFileSuffix:      {0xa},
+			dataFileSuffix:       {0x3},
 			digestFileSuffix:     digestOfDigest,
 			checkpointFileSuffix: buf,
 		},
diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go
index 521f9df19f..630167e29e 100644
--- a/src/dbnode/persist/fs/read_write_test.go
+++ b/src/dbnode/persist/fs/read_write_test.go
@@ -75,6 +75,20 @@ func (e testEntry) Tags() ident.Tags {
 	return tags
 }
 
+type testEntries []testEntry
+
+func (e testEntries) Less(i, j int) bool {
+	return e[i].id < e[j].id
+}
+
+func (e testEntries) Len() int {
+	return len(e)
+}
+
+func (e testEntries) Swap(i, j int) {
+	e[i], e[j] = e[j], e[i]
+}
+
 func newTestWriter(t *testing.T, filePathPrefix string) DataFileSetWriter {
 	writer, err := NewWriter(testDefaultOpts.
 		SetFilePathPrefix(filePathPrefix).
@@ -158,20 +172,37 @@ var readTestTypes = []readTestType{
 	readTestTypeMetadata,
 }
 
+func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
+	readTestDataWithOrderOpt(t, r, shard, timestamp, entries, false)
+
+	sortedEntries := append(make(testEntries, 0, len(entries)), entries...)
+	sort.Sort(sortedEntries)
+
+	readTestDataWithOrderOpt(t, r, shard, timestamp, sortedEntries, true)
+}
+
 // readTestData will test reading back the data matches what was written,
 // note that this test also tests reuse of the reader since it first reads
 // all the data then closes it, reopens and reads through again but just
 // reading the metadata the second time.
 // If it starts to fail during the pass that reads just the metadata it could
 // be a newly introduced reader reuse bug.
-func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp time.Time, entries []testEntry) {
+func readTestDataWithOrderOpt(
+	t *testing.T,
+	r DataFileSetReader,
+	shard uint32,
+	timestamp time.Time,
+	entries []testEntry,
+	orderByIndex bool,
+) {
 	for _, underTest := range readTestTypes {
 		rOpenOpts := DataReaderOpenOptions{
 			Identifier: FileSetFileIdentifier{
 				Namespace:  testNs1ID,
-				Shard:      0,
+				Shard:      shard,
 				BlockStart: timestamp,
 			},
+			OrderedByIndex: orderByIndex,
 		}
 		err := r.Open(rOpenOpts)
 		require.NoError(t, err)
@@ -220,6 +251,7 @@ func readTestData(t *testing.T, r DataFileSetReader, shard uint32, timestamp tim
 			tags.Close()
 			data.DecRef()
 			data.Finalize()
+
 		case readTestTypeMetadata:
 			id, tags, length, checksum, err := r.ReadMetadata()
 			require.NoError(t, err)
diff --git a/src/dbnode/persist/fs/seek_manager.go b/src/dbnode/persist/fs/seek_manager.go
index 0e8db86594..049eb11c73 100644
--- a/src/dbnode/persist/fs/seek_manager.go
+++ b/src/dbnode/persist/fs/seek_manager.go
@@ -467,7 +467,7 @@ func (m *seekerManager) markBorrowedSeekerAsReturned(seekers *seekersAndBloom, s
 // 4. Every call to Return() for an "inactive" seeker will check if it's the last borrowed inactive seeker,
 // and if so, will close all the inactive seekers and call wg.Done() which will notify the goroutine
 // running the UpdateOpenlease() function that all inactive seekers have been returned and closed at
-// which point the function will return sucessfully.
+// which point the function will return successfully.
 func (m *seekerManager) UpdateOpenLease(
 	descriptor block.LeaseDescriptor,
 	state block.LeaseState,
diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go
index 7c15b6d56e..32c1ce2a39 100644
--- a/src/dbnode/persist/fs/types.go
+++ b/src/dbnode/persist/fs/types.go
@@ -120,8 +120,10 @@ type DataFileSetReaderStatus struct {
 
 // DataReaderOpenOptions is options struct for the reader open method.
 type DataReaderOpenOptions struct {
-	Identifier  FileSetFileIdentifier
-	FileSetType persist.FileSetType
+	Identifier     FileSetFileIdentifier
+	FileSetType    persist.FileSetType
+	// OrderedByIndex enforces reading of series in the order of index (which is by series Id).
+	OrderedByIndex bool
 }
 
 // DataFileSetReader provides an unsynchronized reader for a TSDB file set
@@ -170,6 +172,10 @@ type DataFileSetReader interface {
 
 	// MetadataRead returns the position of metadata read into the volume
 	MetadataRead() int
+
+	// OrderedByIndex returns true if the reader reads the data in the order of index.
+	// If false, the reader reads the data in the same order as it is stored in the data file.
+	OrderedByIndex() bool
 }
 
 // DataFileSetSeeker provides an out of order reader for a TSDB file set
@@ -598,3 +604,28 @@ type Segments interface {
 	AbsoluteFilePaths() []string
 	BlockStart() time.Time
 }
+
+// BlockRecord wraps together M3TSZ data bytes with their checksum.
+type BlockRecord struct {
+	Data         checked.Bytes
+	DataChecksum uint32
+}
+
+// CrossBlockReader allows reading data (encoded bytes) from multiple DataFileSetReaders of the same shard,
+// ordered by series id first, and block start time next.
+type CrossBlockReader interface {
+	io.Closer
+
+	// Next advances to the next data record and returns true, or returns false if no more data exists.
+	Next() bool
+
+	// Err returns the last error encountered (if any).
+	Err() error
+
+	// Current returns distinct series id and tags, plus a slice with data and checksums from all blocks corresponding
+	// to that series (in temporal order).
+	// Note: make sure to finalize the ID, close the Tags and finalize the Data when done with
+	// them so they can be returned to their respective pools. Also, the []BlockRecord slice and the
+	// underlying data are invalidated on each call to Next().
+	Current() (ident.ID, ident.TagIterator, []BlockRecord)
+}
diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go
index 47de4c9d35..0f145ba574 100644
--- a/src/dbnode/storage/namespace.go
+++ b/src/dbnode/storage/namespace.go
@@ -1229,7 +1229,7 @@ func (n *dbNamespace) ColdFlush(flushPersist persist.FlushPreparer) error {
 	// We go through this error checking process to allow for partially successful flushes.
 	indexColdFlushError := onColdFlushNs.Done()
 	if indexColdFlushError == nil && onColdFlushDone != nil {
-		// Only evict rotated cold mutable index segments if the index cold flush was sucessful
+		// Only evict rotated cold mutable index segments if the index cold flush was successful
 		// or we will lose queryability of data that's still in mem.
 		indexColdFlushError = onColdFlushDone()
 	}
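
Reviewer notes. The sketches below are illustrative only and are not part of the diff. First, how per-block DataFileSetReaders satisfying CrossBlockReader's preconditions might be constructed. This assumes fs.NewReader tolerates a nil bytes pool (readInIndexedOrder above explicitly handles that case); openOrderedReaders is a hypothetical helper, not a function from this PR.

package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
)

// openOrderedReaders (hypothetical) opens one reader per block, each in index order.
// blockStarts must already be sorted ascending, because NewCrossBlockReader rejects
// reader slices that are not ordered by block start time.
func openOrderedReaders(
	ns ident.ID,
	shard uint32,
	blockStarts []time.Time,
	opts fs.Options,
) ([]fs.DataFileSetReader, error) {
	readers := make([]fs.DataFileSetReader, 0, len(blockStarts))
	for _, start := range blockStarts {
		r, err := fs.NewReader(nil, opts) // nil bytes pool: data is allocated dynamically
		if err != nil {
			return nil, err
		}
		openOpts := fs.DataReaderOpenOptions{
			Identifier: fs.FileSetFileIdentifier{
				Namespace:  ns,
				Shard:      shard,
				BlockStart: start,
			},
			// New in this PR: decode entries straight from the index file so the
			// reader yields series in ID order rather than data-file offset order.
			OrderedByIndex: true,
		}
		if err := r.Open(openOpts); err != nil {
			return nil, err
		}
		readers = append(readers, r)
	}
	return readers, nil
}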
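Second, a consumption sketch that mirrors the ownership rules documented on CrossBlockReader.Current() and exercised by the test above: the caller finalizes the ID, closes the tag iterator, and releases each record's bytes before the next call to Next() invalidates the slice. It continues the example above, additionally importing github.com/m3db/m3/src/x/instrument; processRecord is a hypothetical stand-in for real consumer logic.

// consumeShard iterates every series of a shard across the given block readers,
// in series-ID order, releasing pooled resources as it goes.
func consumeShard(readers []fs.DataFileSetReader, iOpts instrument.Options) error {
	cbReader, err := fs.NewCrossBlockReader(readers, iOpts)
	if err != nil {
		return err // e.g. errReaderNotOrderedByIndex, or misordered block starts
	}
	defer cbReader.Close()

	for cbReader.Next() {
		id, tags, records := cbReader.Current()

		// records hold this series' blocks in temporal order, one BlockRecord per
		// fileset containing the series. The slice and its data are invalidated by
		// the next call to Next(), so they must be consumed and released here.
		for _, record := range records {
			processRecord(id, record.Data.Bytes(), record.DataChecksum) // hypothetical
			record.Data.DecRef()
			record.Data.Finalize()
		}

		// The caller owns id and tags and must return them to their pools.
		id.Finalize()
		tags.Close()
	}
	return cbReader.Err()
}

// processRecord is a placeholder for real consumer logic (e.g. a merge step).
func processRecord(id ident.ID, data []byte, checksum uint32) {
	_, _, _ = id, data, checksum
}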
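Finally, a self-contained illustration of the merge order the unexported minHeap encodes: entries compare by series-ID bytes first, and ties on ID break on reader index, which equals temporal order because the constructor requires readers sorted by block start time. This is standalone demo code, not code from the PR.

package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

// entry is a simplified stand-in for minHeapEntry.
type entry struct {
	id     []byte
	reader int // index into the time-ordered reader slice
}

type entryHeap []entry

func (h entryHeap) Len() int      { return len(h) }
func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h entryHeap) Less(i, j int) bool {
	// Same ordering as crossBlockReader's minHeap: ID bytes, then reader index.
	if c := bytes.Compare(h[i].id, h[j].id); c != 0 {
		return c < 0
	}
	return h[i].reader < h[j].reader
}
func (h *entryHeap) Push(x interface{}) { *h = append(*h, x.(entry)) }
func (h *entryHeap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	h := &entryHeap{
		{[]byte("id2"), 0},
		{[]byte("id1"), 2},
		{[]byte("id1"), 0},
		{[]byte("id1"), 1},
	}
	heap.Init(h)
	for h.Len() > 0 {
		e := heap.Pop(h).(entry)
		fmt.Printf("%s from reader %d\n", e.id, e.reader)
	}
	// Output:
	// id1 from reader 0
	// id1 from reader 1
	// id1 from reader 2
	// id2 from reader 0
}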