[improve][ml] Filter out deleted entries before reading entries from the ledger. #21739

Merged · 19 commits · Jan 19, 2024
Changes from 8 commits
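
In short, this change filters out positions the cursor already considers deleted (plus any caller-supplied skip condition) before a read is issued, then groups the surviving entry ids into contiguous ranges so each range needs only a single read against the ledger. Below is a minimal, self-contained sketch of that grouping step, mirroring the `toRanges` helper added in this diff; the class name is made up, and a plain long[] pair stands in for Guava's Range<Long>.

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

public class RangeGroupingSketch {
    // Group sorted entry ids into contiguous [start, end] ranges, one read call per range.
    // Assumes entryIds is non-empty, as in the patch (the caller checks for emptiness first).
    static Set<long[]> toRanges(SortedSet<Long> entryIds) {
        Set<long[]> ranges = new LinkedHashSet<>();
        long start = entryIds.first();
        long end = start;
        for (long entryId : entryIds) {
            if (entryId - end > 1) {
                // Gap found: close the current range and start a new one.
                ranges.add(new long[]{start, end});
                start = entryId;
            }
            end = entryId;
        }
        ranges.add(new long[]{start, end});
        return ranges;
    }

    public static void main(String[] args) {
        // Entry ids 4..6 and 9 are assumed deleted and have already been filtered out.
        SortedSet<Long> ids = new TreeSet<>(List.of(1L, 2L, 3L, 7L, 8L, 10L));
        toRanges(ids).forEach(r -> System.out.println("[" + r[0] + ", " + r[1] + "]"));
        // Prints [1, 3], [7, 8] and [10, 10]: three reads instead of one 1..10 read
        // that would also fetch the deleted entries.
    }
}
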
@@ -27,6 +27,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
@@ -35,6 +37,7 @@
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -46,6 +49,8 @@
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -59,6 +64,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
@@ -2088,44 +2094,132 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
return;
}

long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
Predicate<PositionImpl> skipCond = opReadEntry.skipCondition;
if (skipCond == null) {
long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
}
asyncReadEntry(
ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry, opReadEntry.ctx);
return;
}

// Filter out and skip unnecessary read entries
if (opReadEntry.skipCondition != null) {
long firstValidEntry = -1L;
long lastValidEntry = -1L;
long entryId = firstEntry;
for (; entryId <= lastEntry; entryId++) {
if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
if (firstValidEntry != -1L) {
break;
}
} else {
if (firstValidEntry == -1L) {
firstValidEntry = entryId;
}
// Collect as many of the entries we need from the current ledger as possible into a single `entryIds` set.
long entryId = firstEntry;
int count = 0;
SortedSet<Long> entryIds = new TreeSet<>();
int entriesToRead = opReadEntry.getNumberOfEntriesToRead();
while (entryId <= lastEntryInLedger && count < entriesToRead) {
PositionImpl position = PositionImpl.get(ledger.getId(), entryId);
if (!skipCond.test(position)) {
entryIds.add(entryId);
count++;
}
entryId++;
}
asyncReadEntry(ledger, entryIds, opReadEntry, opReadEntry.ctx);
}

lastValidEntry = entryId;

private void asyncReadEntry(ReadHandle ledger, SortedSet<Long> entryIds, OpReadEntry opReadEntry, Object ctx) {
if (entryIds.isEmpty()) {
// If entryIds is empty, the cursor's `readPosition` should not be moved.
// OpReadEntry#internalReadEntriesComplete advances the cursor's `readPosition` to the
// position after `lastEntry`, so pass the position before `readPosition` here to
// offset that advance.
PositionImpl previous = this.getPreviousPosition(opReadEntry.readPosition);
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), ctx, previous);
return;
}

Set<Range<Long>> ranges = toRanges(entryIds);
ReadEntriesCallback callback = new BatchReadEntriesCallback(entryIds, opReadEntry);
for (Range<Long> range : ranges) {
long start = range.lowerEndpoint();
long end = range.upperEndpoint();
// TODO: should the `lastReadCallback` timeout check be handled here?
asyncReadEntry(ledger, start, end, opReadEntry.cursor.isCacheReadEntry(), callback, ctx);
}
}

// Parse entryIds into ranges.
@VisibleForTesting
public static Set<Range<Long>> toRanges(SortedSet<Long> entryIds) {
RangeSet<Long> set = TreeRangeSet.create();
long start = entryIds.first();
long end = start;
for (long entryId : entryIds) {
if (entryId - end > 1) {
set.add(Range.closed(start, end));
start = entryId;
end = start;
} else {
end = entryId;
}
}
set.add(Range.closed(start, end));
return set.asRanges();
}

@VisibleForTesting
public static class BatchReadEntriesCallback implements ReadEntriesCallback {
private final Set<Long> entryIdSet;
private final SortedSet<Entry> entrySet;
private final OpReadEntry callback;
private final AtomicBoolean failed = new AtomicBoolean(false);

@VisibleForTesting
public BatchReadEntriesCallback(Set<Long> entryIdSet, OpReadEntry callback) {
this.entryIdSet = entryIdSet;
this.entrySet = new TreeSet<>(Comparator.comparing(Entry::getEntryId));
this.callback = callback;
}

@Override
public synchronized void readEntriesComplete(List<Entry> entries0, Object ctx) {
for (Entry entry : entries0) {
entrySet.add(entry);
if (entrySet.size() != entryIdSet.size()) {
return;
}
callback.readEntriesComplete(entrySet.stream().toList(), ctx);
}
}

// If all messages in [firstEntry...lastEntry] are filtered out,
// then manually call internalReadEntriesComplete to advance the read position.
if (firstValidEntry == -1L) {
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
PositionImpl.get(ledger.getId(), lastEntry));
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
// Fail at most once
if (!failed.compareAndSet(false, true)) {
return;
}

firstEntry = firstValidEntry;
lastEntry = lastValidEntry;
// If some entries were read successfully, let the read operation succeed with them where possible.
List<Entry> entries = filterEntries();
if (entries.isEmpty()) {
callback.readEntriesFailed(exception, ctx);
} else {
callback.readEntriesComplete(entries, ctx);
}
}

if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
private List<Entry> filterEntries() {
if (entrySet.isEmpty()) {
return Collections.emptyList();
}
// Only return the leading contiguous run of successfully read entries so the cursor's `readPosition` can be advanced correctly.
List<Entry> entries = new ArrayList<>();
for (long entryId : entryIdSet) {
if (entrySet.isEmpty()) {
// Defensive stop: a failed range may leave fewer read entries than expected ids.
break;
}
Entry entry = entrySet.first();
if (entry.getEntryId() == entryId) {
entries.add(entry);
entrySet.remove(entry);
} else {
break;
}
}
return entries;
}
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
}

protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
@@ -2142,20 +2236,19 @@ protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntr
}
}

protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
Object ctx) {
@VisibleForTesting
public void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean shouldCacheEntries,
ReadEntriesCallback callback, Object ctx) {
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
callback, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
readCallback, readOpCount);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,
ctx);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, shouldCacheEntries, callback, ctx);
}
}
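
One detail of the partial-failure handling above (BatchReadEntriesCallback#readEntriesFailed and filterEntries): only the leading contiguous prefix of the requested entries is handed back, so the cursor's read position never jumps over a hole. A small illustrative sketch of that rule follows; the names are made up and plain entry ids stand in for Entry objects.

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

public class PrefixFilterSketch {
    // Return only the leading run of expected ids that were actually read, stopping at the first hole.
    static List<Long> leadingContiguousPrefix(SortedSet<Long> expectedIds, SortedSet<Long> readIds) {
        List<Long> deliverable = new ArrayList<>();
        for (long expected : expectedIds) {
            if (readIds.isEmpty() || readIds.first() != expected) {
                break; // first missing id: nothing after it can be delivered safely
            }
            deliverable.add(readIds.first());
            readIds.remove(readIds.first());
        }
        return deliverable;
    }

    public static void main(String[] args) {
        SortedSet<Long> expected = new TreeSet<>(List.of(1L, 2L, 3L, 4L, 5L));
        SortedSet<Long> read = new TreeSet<>(List.of(1L, 2L, 4L, 5L)); // the range containing entry 3 failed
        System.out.println(leadingContiguousPrefix(expected, read));   // prints [1, 2]
    }
}
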

@@ -30,6 +30,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -60,7 +61,18 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
maxPosition = PositionImpl.LATEST;
}
op.maxPosition = maxPosition;
op.skipCondition = skipCondition;

Predicate<PositionImpl> skipCondition0 = cursor instanceof ReadOnlyCursor ? null : cursor::isMessageDeleted;
if (skipCondition == null) {
op.skipCondition = skipCondition0;
} else {
if (skipCondition0 == null) {
op.skipCondition = skipCondition;
} else {
op.skipCondition = skipCondition.or(skipCondition0);
}
}

op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
return op;
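
The OpReadEntry.create change above combines the cursor's own deleted-position check with any caller-supplied skip condition, so a position is skipped when either predicate matches (a read-only cursor contributes no deletion check). A small illustrative sketch of that composition, with Long standing in for PositionImpl and hard-coded predicates as assumptions:

import java.util.function.Predicate;

public class SkipConditionSketch {
    public static void main(String[] args) {
        // Assumed inputs: positions 3 and 4 are deleted, and the caller also wants to skip 7.
        Predicate<Long> isMessageDeleted = pos -> pos == 3L || pos == 4L;
        Predicate<Long> callerCondition = pos -> pos == 7L;

        // Same composition as in the patch: skip if either predicate matches.
        Predicate<Long> skipCondition = callerCondition.or(isMessageDeleted);

        for (long pos = 1; pos <= 8; pos++) {
            System.out.println("entry " + pos + " skipped=" + skipCondition.test(pos));
        }
        // Entries 3, 4 and 7 are reported as skipped; all other entries are read.
    }
}
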