Skip to content

Commit

Permalink
Fix raw input stats for ScanFilterAndProject
Browse files Browse the repository at this point in the history
Previously, LazyBlocks loaded from a PageSource would update the
processed input stat, but would not inherently trigger the raw input
bytes to be recomputed. This wasn't necessarily a problem, but could
result in raw input data being left uncounted until the next call to
getOutput (which might never occur).
  • Loading branch information
pettyjamesm authored and mbasmanova committed Jan 7, 2020
1 parent 176e8f8 commit 0191168
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.spi.UpdatablePageSource;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.LazyBlock;
import com.facebook.presto.spi.block.LazyBlockLoader;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.split.EmptySplit;
Expand Down Expand Up @@ -247,13 +248,7 @@ private Page processColumnSource()
CursorProcessorOutput output = cursorProcessor.process(operatorContext.getSession().toConnectorSession(), yieldSignal, cursor, pageBuilder);
pageSourceMemoryContext.setBytes(cursor.getSystemMemoryUsage());

long bytesProcessed = cursor.getCompletedBytes() - completedBytes;
long elapsedNanos = cursor.getReadTimeNanos() - readTimeNanos;
operatorContext.recordRawInputWithTiming(bytesProcessed, output.getProcessedRows(), elapsedNanos);
// TODO: derive better values for cursors
operatorContext.recordProcessedInput(bytesProcessed, output.getProcessedRows());
completedBytes = cursor.getCompletedBytes();
readTimeNanos = cursor.getReadTimeNanos();
recordCursorInputStats(output.getProcessedRows());
if (output.isNoMoreRows()) {
finishing = true;
mergingOutput.finish();
Expand All @@ -280,16 +275,8 @@ private Page processPageSource()
pageSourceMemoryContext.setBytes(pageSource.getSystemMemoryUsage());

if (page != null) {
page = recordProcessedInput(page);

// update operator stats
long endCompletedBytes = pageSource.getCompletedBytes();
long endCompletedPositions = pageSource.getCompletedPositions();
long endReadTimeNanos = pageSource.getReadTimeNanos();
operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos);
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
page = recordProcessedInput(page);

Iterator<Optional<Page>> output = pageProcessor.process(operatorContext.getSession().toConnectorSession(), yieldSignal, pageProcessorMemoryContext, page);
mergingOutput.addInput(output);
Expand All @@ -305,26 +292,72 @@ private Page processPageSource()
return result;
}

private final class RecordingLazyBlockLoader
implements LazyBlockLoader<LazyBlock>
{
private LazyBlock delegateLazyBlock;

private RecordingLazyBlockLoader(LazyBlock delegateLazyBlock)
{
this.delegateLazyBlock = requireNonNull(delegateLazyBlock, "delegateLazyBlock is null");
}

@Override
public void load(LazyBlock block)
{
checkState(delegateLazyBlock != null, "delegateLazyBlock already loaded");
Block loadedBlock = delegateLazyBlock.getLoadedBlock();
delegateLazyBlock = null;
// Position count already recorded for lazy blocks, input bytes are not
operatorContext.recordProcessedInput(loadedBlock.getSizeInBytes(), 0);
recordPageSourceRawInputStats();
block.setBlock(loadedBlock);
}
}

private void recordCursorInputStats(long positionCount)
{
checkState(cursor != null, "cursor is null");
long endCompletedBytes = cursor.getCompletedBytes();
long endReadTimeNanos = cursor.getReadTimeNanos();
long inputBytes = endCompletedBytes - completedBytes;
operatorContext.recordProcessedInput(inputBytes, positionCount);
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, endReadTimeNanos - readTimeNanos);
completedBytes = endCompletedBytes;
readTimeNanos = endReadTimeNanos;
}

private void recordPageSourceRawInputStats()
{
checkState(pageSource != null, "pageSource is null");
long endCompletedBytes = pageSource.getCompletedBytes();
long endCompletedPositions = pageSource.getCompletedPositions();
long endReadTimeNanos = pageSource.getReadTimeNanos();
operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos);
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
}

private Page recordProcessedInput(Page page)
{
operatorContext.recordProcessedInput(0, page.getPositionCount());
// account processed bytes from lazy blocks only when they are loaded
long blockSizeSum = 0L;
Block[] blocks = new Block[page.getChannelCount()];
for (int i = 0; i < page.getChannelCount(); ++i) {
for (int i = 0; i < blocks.length; ++i) {
Block block = page.getBlock(i);
// account processed bytes from lazy blocks only when they are loaded
if (block instanceof LazyBlock) {
LazyBlock delegateLazyBlock = (LazyBlock) block;
blocks[i] = new LazyBlock(page.getPositionCount(), lazyBlock -> {
Block loadedBlock = delegateLazyBlock.getLoadedBlock();
operatorContext.recordProcessedInput(loadedBlock.getSizeInBytes(), 0L);
lazyBlock.setBlock(loadedBlock);
});
blocks[i] = new LazyBlock(page.getPositionCount(), new RecordingLazyBlockLoader((LazyBlock) block));
}
else {
operatorContext.recordProcessedInput(block.getSizeInBytes(), 0L);
blockSizeSum += block.getSizeInBytes();
blocks[i] = block;
}
}
// stats update
operatorContext.recordProcessedInput(blockSizeSum, page.getPositionCount());
recordPageSourceRawInputStats();

return new Page(page.getPositionCount(), blocks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.LazyBlock;
import com.facebook.presto.spi.block.LazyBlockLoader;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.relation.RowExpression;
Expand All @@ -44,9 +45,11 @@
import com.facebook.presto.testing.TestingSplit;
import com.facebook.presto.testing.TestingTransactionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import io.airlift.units.DataSize;
import org.testng.annotations.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand All @@ -71,8 +74,10 @@
import static com.facebook.presto.sql.relational.Expressions.field;
import static com.facebook.presto.testing.TestingTaskContext.createTaskContext;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -222,6 +227,54 @@ public void testPageSourceLazyLoad()
assertEquals(actual, expected);
}

@Test
public void testPageSourceLazyBlock()
{
// Tests that a page containing a LazyBlock is loaded and its bytes are counted by the operator.
DriverContext driverContext = newDriverContext();
List<RowExpression> projections = ImmutableList.of(field(0, BIGINT));
Supplier<CursorProcessor> cursorProcessor = expressionCompiler.compileCursorProcessor(driverContext.getSession().getSqlFunctionProperties(), Optional.empty(), projections, "key");
Supplier<PageProcessor> pageProcessor = expressionCompiler.compilePageProcessor(driverContext.getSession().getSqlFunctionProperties(), Optional.empty(), projections);

// This Block will be wrapped in a LazyBlock inside the operator on call to getNextPage().
Block inputBlock = BlockAssertions.createLongSequenceBlock(0, 10);

CountingLazyPageSource pageSource = new CountingLazyPageSource(ImmutableList.of(new Page(inputBlock)));

ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory factory = new ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory(
0,
new PlanNodeId("test"),
new PlanNodeId("0"),
(session, split, table, columns) -> pageSource,
cursorProcessor,
pageProcessor,
TESTING_TABLE_HANDLE,
ImmutableList.of(),
ImmutableList.of(BIGINT),
new DataSize(0, BYTE),
0);

SourceOperator operator = factory.createOperator(driverContext);
operator.addSplit(new Split(new ConnectorId("test"), TestingTransactionHandle.create(), TestingSplit.createLocalSplit()));
operator.noMoreSplits();

MaterializedResult expected = toMaterializedResult(driverContext.getSession(), ImmutableList.of(BIGINT), ImmutableList.of(new Page(inputBlock)));
Page expectedPage = expected.toPage();
MaterializedResult actual = toMaterializedResult(driverContext.getSession(), ImmutableList.of(BIGINT), toPages(operator));
Page actualPage = actual.toPage();

// Assert expected page and actual page are equal.
assertPageEquals(actual.getTypes(), actualPage, expectedPage);

// PageSource counting isn't flawed, assert on the test implementation
assertEquals(pageSource.getCompletedBytes(), expectedPage.getSizeInBytes());
assertEquals(pageSource.getCompletedPositions(), expectedPage.getPositionCount());

// Assert operator stats match the expected values
assertEquals(operator.getOperatorContext().getOperatorStats().getRawInputDataSize().toBytes(), expectedPage.getSizeInBytes());
assertEquals(operator.getOperatorContext().getOperatorStats().getInputPositions(), expected.getRowCount());
}

@Test
public void testRecordCursorSource()
{
Expand Down Expand Up @@ -467,4 +520,97 @@ public Page getNextPage()
return page;
}
}

private static class CountingLazyPageSource
implements ConnectorPageSource
{
private Iterator<Page> pages;
private long completedBytes;
private long completedPositions;

public CountingLazyPageSource(List<Page> pages)
{
this.pages = requireNonNull(pages, "pages is null").iterator();
}

@Override
public void close()
{
pages = Iterators.forArray();
}

@Override
public long getReadTimeNanos()
{
return 0;
}

@Override
public long getSystemMemoryUsage()
{
return 0;
}

@Override
public boolean isFinished()
{
return !pages.hasNext();
}

@Override
public Page getNextPage()
{
if (isFinished()) {
return null;
}

Page page = pages.next();
int channelCount = page.getChannelCount();
Block[] blocks = new Block[channelCount];

for (int i = 0; i < channelCount; ++i) {
Block block = page.getBlock(i);
// Wrap current Block in a LazyBlock.
blocks[i] = new LazyBlock(block.getPositionCount(), new CountingLazyBlockLoader(block));
}
completedPositions += page.getPositionCount();

return new Page(page.getPositionCount(), blocks);
}

@Override
public long getCompletedBytes()
{
return completedBytes;
}

@Override
public long getCompletedPositions()
{
return completedPositions;
}

private final class CountingLazyBlockLoader
implements LazyBlockLoader<LazyBlock>
{
private Block loaderBlock;

public CountingLazyBlockLoader(Block block)
{
loaderBlock = block;
}

@Override
public final void load(LazyBlock lazyBlock)
{
checkState(loaderBlock != null, "loaderBlock already loaded");

Block loadedBlock = loaderBlock.getLoadedBlock();
// Increment completed/read bytes for the page source.
completedBytes += loadedBlock.getSizeInBytes();
loaderBlock = null;
lazyBlock.setBlock(loadedBlock);
}
}
}
}

0 comments on commit 0191168

Please sign in to comment.