Skip to content

Commit

Permalink
Do not expire from the shard until no longer indexed any block starts
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed May 18, 2021
1 parent 2b167e7 commit 10cd227
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
9 changes: 7 additions & 2 deletions src/dbnode/storage/index/mutable_segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,13 @@ func (m *mutableSegments) backgroundCompactWithTask(

latestEntry, ok := entry.RelookupAndIncrementReaderWriterCount()
if !ok {
// Entry nolonger valid in shard.
return false
// Should not happen since shard will not expire until
// no more block starts are indexed.
// We do not GC this series if shard is missing since
// we open up a race condition where the entry is not
// in the shard yet and we GC it since we can't find it
// due to an asynchronous insert.
return true
}

result := latestEntry.RemoveIndexedForBlockStarts(sealedBlocks)
Expand Down
16 changes: 16 additions & 0 deletions src/dbnode/storage/series/lookup/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ func (entry *Entry) IndexedForBlockStart(indexBlockStart xtime.UnixNano) bool {
return isIndexed
}

func (entry *Entry) IndexedOrAttemptedAny() bool {
entry.reverseIndex.RLock()
isIndexed := entry.reverseIndex.indexedOrAttemptedAnyWithRLock()
entry.reverseIndex.RUnlock()
return isIndexed
}

// NeedsIndexUpdate returns a bool to indicate if the Entry needs to be indexed
// for the provided blockStart. It only allows a single index attempt at a time
// for a single entry.
Expand Down Expand Up @@ -343,6 +350,15 @@ func (s *entryIndexState) indexedWithRLock(t xtime.UnixNano) bool {
return false
}

func (s *entryIndexState) indexedOrAttemptedAnyWithRLock() bool {
for _, state := range s.states {
if state.success || state.attempt {
return true
}
}
return false
}

func (s *entryIndexState) indexedOrAttemptedWithRLock(t xtime.UnixNano) bool {
v, ok := s.states[t]
if ok {
Expand Down
7 changes: 6 additions & 1 deletion src/dbnode/storage/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,12 @@ func (s *dbShard) purgeExpiredSeries(expiredEntries []*lookup.Entry) {
}
// If there have been datapoints written to the series since its
// last empty check, we don't remove it.
if !series.IsEmpty() {
// Also check if still indexed, if so will be GC'd soon from
// index and shouldn't evict here since we need to let the index
// know this series definitely should be GC'd (it's ambiguous
// if the series is missing from the shard, do not know whether
// race to insert or whether it actually expired or not).
if !series.IsEmpty() || entry.IndexedOrAttemptedAny() {
continue
}
// NB(xichen): if we get here, we are guaranteed that there can be
Expand Down

0 comments on commit 10cd227

Please sign in to comment.