diff --git a/distribution/docker/peon.sh b/distribution/docker/peon.sh
index 8103f475ccb3..3b4dfc4326b9 100755
--- a/distribution/docker/peon.sh
+++ b/distribution/docker/peon.sh
@@ -161,4 +161,4 @@ fi
 
 # If TASK_JSON is not set, CliPeon will pull the task.json file from deep storage.
 mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;
-exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
+exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon --taskId ${TASK_ID} $@
diff --git a/docs/querying/scan-query.md b/docs/querying/scan-query.md
index 503664633ba3..07decff8b1a7 100644
--- a/docs/querying/scan-query.md
+++ b/docs/querying/scan-query.md
@@ -43,12 +43,12 @@ An example Scan query object is shown below:
   "queryType": "scan",
   "dataSource": "wikipedia",
   "resultFormat": "list",
-  "columns":[],
+  "columns":[ "__time", "isRobot", "page", "added", "isAnonymous", "user", "deleted" ],
   "intervals": [
-    "2013-01-01/2013-01-02"
+    "2016-01-01/2017-01-02"
   ],
   "batchSize":20480,
-  "limit":3
+  "limit":2
 }
 ```
 
@@ -73,86 +73,83 @@ The following are the main parameters for Scan queries:
 The format of the result when resultFormat equals `list`:
 
 ```json
-[{
-    "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
-    "columns" : [
-        "timestamp",
-        "robot",
-        "namespace",
-        "anonymous",
-        "unpatrolled",
-        "page",
-        "language",
-        "newpage",
-        "user",
-        "count",
-        "added",
-        "delta",
-        "variation",
-        "deleted"
-    ],
-    "events" : [ {
-        "timestamp" : "2013-01-01T00:00:00.000Z",
-        "robot" : "1",
-        "namespace" : "article",
-        "anonymous" : "0",
-        "unpatrolled" : "0",
-        "page" : "11._korpus_(NOVJ)",
-        "language" : "sl",
-        "newpage" : "0",
-        "user" : "EmausBot",
-        "count" : 1.0,
-        "added" : 39.0,
-        "delta" : 39.0,
-        "variation" : 39.0,
-        "deleted" : 0.0
-    }, {
-        "timestamp" : "2013-01-01T00:00:00.000Z",
-        "robot" : "0",
-        "namespace" : "article",
-        "anonymous" : "0",
-        "unpatrolled" : "0",
-        "page" : "112_U.S._580",
-        "language" : "en",
-        "newpage" : "1",
-        "user" : "MZMcBride",
-        "count" : 1.0,
-        "added" : 70.0,
-        "delta" : 70.0,
-        "variation" : 70.0,
-        "deleted" : 0.0
-    }, {
-        "timestamp" : "2013-01-01T00:00:00.000Z",
-        "robot" : "0",
-        "namespace" : "article",
-        "anonymous" : "0",
-        "unpatrolled" : "0",
-        "page" : "113_U.S._243",
-        "language" : "en",
-        "newpage" : "1",
-        "user" : "MZMcBride",
-        "count" : 1.0,
-        "added" : 77.0,
-        "delta" : 77.0,
-        "variation" : 77.0,
-        "deleted" : 0.0
-    } ]
+[ {
+  "segmentId" : "wikipedia_2016-06-27T00:00:00.000Z_2016-06-28T00:00:00.000Z_2024-12-17T13:08:03.142Z",
+  "columns" : [ "__time", "isRobot", "page", "added", "isAnonymous", "user", "deleted" ],
+  "events" : [ {
+    "__time" : 1466985611080,
+    "isRobot" : "true",
+    "page" : "Salo Toraut",
+    "added" : 31,
+    "isAnonymous" : "false",
+    "user" : "Lsjbot",
+    "deleted" : 0
+  }, {
+    "__time" : 1466985634959,
+    "isRobot" : "false",
+    "page" : "Bailando 2015",
+    "added" : 2,
+    "isAnonymous" : "true",
+    "user" : "181.230.118.178",
+    "deleted" : 0
+  } ],
+  "rowSignature" : [ {
+    "name" : "__time",
+    "type" : "LONG"
+  }, {
+    "name" : "isRobot",
+    "type" : "STRING"
+  }, {
+    "name" : "page",
+    "type" : "STRING"
+  }, {
+    "name" : "added",
+    "type" : "LONG"
+  }, {
+    "name" : "isAnonymous",
+    "type" : "STRING"
+  }, {
+    "name" : "user",
+    "type" : "STRING"
+  }, {
+    "name" : "deleted",
+    "type" : "LONG"
+  } ]
 } ]
 ```
 
 The format of the result when resultFormat equals `compactedList`:
 
 ```json
-[{
-    "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
-    "columns" : [
-        "timestamp", "robot", "namespace", "anonymous", "unpatrolled", "page", "language", "newpage", "user", "count", "added", "delta", "variation", "deleted"
-    ],
-    "events" : [
-        ["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0],
-        ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
-        ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0]
-    ]
+[ {
+  "segmentId" : "wikipedia_2016-06-27T00:00:00.000Z_2016-06-28T00:00:00.000Z_2024-12-17T13:08:03.142Z",
+  "columns" : [ "__time", "isRobot", "page", "added", "isAnonymous", "user", "deleted" ],
+  "events" : [
+    [ 1466985611080, "true", "Salo Toraut", 31, "false", "Lsjbot", 0 ],
+    [ 1466985634959, "false", "Bailando 2015", 2, "true", "181.230.118.178", 0 ]
+  ],
+  "rowSignature" : [ {
+    "name" : "__time",
+    "type" : "LONG"
+  }, {
+    "name" : "isRobot",
+    "type" : "STRING"
+  }, {
+    "name" : "page",
+    "type" : "STRING"
+  }, {
+    "name" : "added",
+    "type" : "LONG"
+  }, {
+    "name" : "isAnonymous",
+    "type" : "STRING"
+  }, {
+    "name" : "user",
+    "type" : "STRING"
+  }, {
+    "name" : "deleted",
+    "type" : "LONG"
+  } ]
 } ]
 ```
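For context on the updated `compactedList` example above: each entry in `events` is a positional array that lines up with `columns`, while `rowSignature` carries the column types. Below is a minimal Jackson sketch (illustrative only, not part of this patch; the class and method names are made up, and the response for one segment is assumed to already be in a `String`) of turning those positional rows back into column-name/value maps:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactedListReader
{
  // Turns one "compactedList" segment result into a list of column-name -> value maps,
  // pairing each positional value in "events" with the matching entry in "columns".
  public static List<Map<String, JsonNode>> readRows(String segmentResultJson) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final JsonNode segment = mapper.readTree(segmentResultJson);
    final JsonNode columns = segment.get("columns");

    final List<Map<String, JsonNode>> rows = new ArrayList<>();
    for (JsonNode event : segment.get("events")) {
      final Map<String, JsonNode> row = new LinkedHashMap<>();
      for (int i = 0; i < columns.size(); i++) {
        row.put(columns.get(i).asText(), event.get(i));
      }
      rows.add(row);
    }
    return rows;
  }
}
```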
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
index f89b0fafde95..535598bc2d70 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java
@@ -207,6 +207,26 @@ public void killOlderThan(long timestamp) throws IOException
       }
     }
   }
+
+  @Override
+  public void pushTaskPayload(String taskId, File taskPayloadFile) throws IOException
+  {
+    final Path path = getTaskPayloadFileFromId(taskId);
+    log.info("Pushing payload for task[%s] to location[%s]", taskId, path);
+    pushTaskFile(path, taskPayloadFile);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskPayload(String taskId) throws IOException
+  {
+    final Path path = getTaskPayloadFileFromId(taskId);
+    return streamTaskFile(path, 0);
+  }
+
+  private Path getTaskPayloadFileFromId(String taskId)
+  {
+    return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".payload.json"));
+  }
 }
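The two new `TaskLogs` methods above store and fetch a task's payload alongside its logs; the file name is derived from the task id (see `getTaskPayloadFileFromId`). A minimal sketch of the round trip follows. It is illustrative only and not part of this patch: the directory, task id, and payload path are made up, and in practice the `TaskLogs` implementation is injected from the service configuration rather than constructed directly.

```java
import com.google.common.io.ByteStreams;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import org.apache.druid.tasklogs.TaskLogs;
import org.apache.hadoop.conf.Configuration;

import java.io.File;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class TaskPayloadRoundTrip
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical deep-storage directory and task id, for illustration only.
    final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig("/druid/tasklogs"), new Configuration());
    final String taskId = "index_parallel_wikipedia_2024-12-17";

    // Stored under a file derived from the task id (':' replaced by '_', suffix ".payload.json").
    taskLogs.pushTaskPayload(taskId, new File("/tmp/task.json"));

    // streamTaskPayload returns an absent Optional when no payload was pushed for this id.
    try (InputStream in = taskLogs.streamTaskPayload(taskId).get()) {
      System.out.println(new String(ByteStreams.toByteArray(in), StandardCharsets.UTF_8));
    }
  }
}
```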
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
index d5b32aa57eb0..125fbd288721 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java
@@ -86,6 +86,7 @@ public void test_taskStatus() throws Exception
     final File statusFile = new File(tmpDir, "status.json");
     final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
 
+    Files.asCharSink(statusFile, StandardCharsets.UTF_8).write("{}");
     taskLogs.pushTaskStatus("id", statusFile);
 
     Assert.assertEquals(
@@ -94,6 +95,19 @@ public void test_taskStatus() throws Exception
     );
   }
 
+  @Test
+  public void test_taskPayload() throws Exception
+  {
+    final File tmpDir = tempFolder.newFolder();
+    final File logDir = new File(tmpDir, "logs");
+    final File payload = new File(tmpDir, "payload.json");
+    final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
+
+    Files.asCharSink(payload, StandardCharsets.UTF_8).write("{}");
+    taskLogs.pushTaskPayload("id", payload);
+    Assert.assertEquals("{}", StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskPayload("id").get())));
+  }
+
   @Test
   public void testKill() throws Exception
   {
diff --git a/processing/src/main/java/org/apache/druid/io/LimitedOutputStream.java b/processing/src/main/java/org/apache/druid/io/LimitedOutputStream.java
index 6d27abb42739..043bd53d5267 100644
--- a/processing/src/main/java/org/apache/druid/io/LimitedOutputStream.java
+++ b/processing/src/main/java/org/apache/druid/io/LimitedOutputStream.java
@@ -22,13 +22,14 @@
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IOE;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.function.Function;
 
 /**
  * An {@link OutputStream} that limits how many bytes can be written. Throws {@link IOException} if the limit
- * is exceeded.
+ * is exceeded. *Not* thread-safe.
  */
 public class LimitedOutputStream extends OutputStream
 {
@@ -88,6 +89,14 @@ public void close() throws IOException
     out.close();
   }
 
+  public byte[] toByteArray()
+  {
+    if (!(out instanceof ByteArrayOutputStream)) {
+      throw new UnsupportedOperationException(out.getClass().getName() + " does not implement toByteArray()");
+    }
+    return ((ByteArrayOutputStream) out).toByteArray();
+  }
+
   private void plus(final int n) throws IOException
   {
     written += n;
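`LimitedOutputStream` (above) now exposes the buffered bytes directly when it wraps a `ByteArrayOutputStream`, and it already throws an `IOException` once more bytes are written than the configured limit. A small sketch of both behaviors, illustrative only: the limit, the bytes written, and the message text are made up for the example.

```java
import org.apache.druid.io.LimitedOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class LimitedOutputStreamExample
{
  public static void main(String[] args) throws IOException
  {
    try (LimitedOutputStream stream = new LimitedOutputStream(
        new ByteArrayOutputStream(),
        4,
        limit -> "buffer exceeded " + limit + " bytes")) {
      stream.write(new byte[]{'d', 'a', 't', 'a'});     // four bytes: still within the limit
      System.out.println(stream.toByteArray().length);  // 4, read back from the wrapped ByteArrayOutputStream

      try {
        stream.write('!');                              // the fifth byte exceeds the limit
      }
      catch (IOException e) {
        System.out.println(e.getMessage());             // message built by the function passed above
      }
    }
  }
}
```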
diff --git a/processing/src/test/java/org/apache/druid/io/LimitedOutputStreamTest.java b/processing/src/test/java/org/apache/druid/io/LimitedOutputStreamTest.java
index a11b63149710..54757570d6f6 100644
--- a/processing/src/test/java/org/apache/druid/io/LimitedOutputStreamTest.java
+++ b/processing/src/test/java/org/apache/druid/io/LimitedOutputStreamTest.java
@@ -27,6 +27,7 @@
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -65,6 +66,26 @@ public void test_limitThree() throws IOException
     }
   }
 
+  @Test
+  public void test_toByteArray() throws IOException
+  {
+    try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         final LimitedOutputStream stream =
+             new LimitedOutputStream(baos, 3, LimitedOutputStreamTest::makeErrorMessage)) {
+      stream.write('a');
+      stream.write(new byte[]{'b'});
+      stream.write(new byte[]{'c'}, 0, 1);
+
+      MatcherAssert.assertThat(stream.toByteArray(), CoreMatchers.equalTo(new byte[]{'a', 'b', 'c'}));
+    }
+
+    try (final DataOutputStream dos = new DataOutputStream(new ByteArrayOutputStream());
+         final LimitedOutputStream stream =
+             new LimitedOutputStream(dos, 3, LimitedOutputStreamTest::makeErrorMessage)) {
+      Assert.assertThrows(UnsupportedOperationException.class, stream::toByteArray);
+    }
+  }
+
   private static String makeErrorMessage(final long limit)
   {
     return StringUtils.format("Limit[%d] exceeded", limit);
diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
index dedfb0028b77..8cc6348a7c8a 100644
--- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java
@@ -30,6 +30,7 @@
 import org.apache.druid.client.cache.Cache;
 import org.apache.druid.client.cache.Cache.NamedKey;
 import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.io.LimitedOutputStream;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -152,6 +153,8 @@ public void after(boolean isDone, Throwable thrown)
               // The resultset identifier and its length is cached along with the resultset
               resultLevelCachePopulator.populateResults();
               log.debug("Cache population complete for query %s", query.getId());
+            } else { // thrown == null && !resultLevelCachePopulator.isShouldPopulate()
+              log.error("Failed (gracefully) to populate result level cache for query %s", query.getId());
             }
             resultLevelCachePopulator.stopPopulating();
           }
@@ -233,8 +236,8 @@ private ResultLevelCachePopulator createResultLevelCachePopulator(
       try {
         // Save the resultSetId and its length
         resultLevelCachePopulator.cacheObjectStream.write(ByteBuffer.allocate(Integer.BYTES)
                                                                     .putInt(resultSetId.length())
-                                                                    .array());
+                                                              .putInt(resultSetId.length())
+                                                              .array());
         resultLevelCachePopulator.cacheObjectStream.write(StringUtils.toUtf8(resultSetId));
       }
       catch (IOException ioe) {
@@ -255,7 +258,7 @@ private class ResultLevelCachePopulator
     private final Cache.NamedKey key;
     private final CacheConfig cacheConfig;
     @Nullable
-    private ByteArrayOutputStream cacheObjectStream;
+    private LimitedOutputStream cacheObjectStream;
 
     private ResultLevelCachePopulator(
         Cache cache,
@@ -270,7 +273,14 @@ private ResultLevelCachePopulator(
       this.serialiers = mapper.getSerializerProviderInstance();
       this.key = key;
       this.cacheConfig = cacheConfig;
-      this.cacheObjectStream = shouldPopulate ? new ByteArrayOutputStream() : null;
+      this.cacheObjectStream = shouldPopulate ? new LimitedOutputStream(
+          new ByteArrayOutputStream(),
+          cacheConfig.getResultLevelCacheLimit(), limit -> StringUtils.format(
+              "resultLevelCacheLimit[%,d] exceeded. "
+              + "Max ResultLevelCacheLimit for cache exceeded. Result caching failed.",
+              limit
+          )
+      ) : null;
     }
 
     boolean isShouldPopulate()
@@ -289,12 +299,8 @@ private void cacheResultEntry(
     )
     {
       Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream");
-      int cacheLimit = cacheConfig.getResultLevelCacheLimit();
       try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
         JacksonUtils.writeObjectUsingSerializerProvider(gen, serialiers, cacheFn.apply(resultEntry));
-        if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
-          stopPopulating();
-        }
       }
       catch (IOException ex) {
         log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
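With the populator's buffer now being a `LimitedOutputStream` sized by `CacheConfig.getResultLevelCacheLimit()`, an oversized result set surfaces as an `IOException` while the results are serialized; the existing catch block above turns that into "stop populating", and the new `else` branch in `after()` logs that the entry was skipped, while the query itself still returns results. A condensed sketch of that pattern, with assumed names and not the runner's actual code:

```java
import org.apache.druid.io.LimitedOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ResultCachePopulationSketch
{
  // Returns the bytes to cache, or null when the serialized results exceed the configured limit.
  static byte[] trySerializeForCache(String serializedResults, int resultLevelCacheLimit)
  {
    final LimitedOutputStream buffer = new LimitedOutputStream(
        new ByteArrayOutputStream(),
        resultLevelCacheLimit,
        limit -> "resultLevelCacheLimit[" + limit + "] exceeded"
    );
    try {
      buffer.write(serializedResults.getBytes(StandardCharsets.UTF_8));
      return buffer.toByteArray();  // within the limit: hand the buffered bytes to the cache
    }
    catch (IOException e) {
      return null;                  // over the limit: skip caching; the caller still returns results normally
    }
  }
}
```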
diff --git a/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java
index 6245509465c1..3cb4ae528e67 100644
--- a/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java
@@ -39,6 +39,7 @@ public class ResultLevelCachingQueryRunnerTest extends QueryRunnerBasedOnClusteredClientTestBase
 {
   private Cache cache;
+  private static final int DEFAULT_CACHE_ENTRY_MAX_SIZE = Integer.MAX_VALUE;
 
   @Before
   public void setup()
@@ -58,7 +59,7 @@ public void testNotPopulateAndNotUse()
     prepareCluster(10);
     final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner1 = createQueryRunner(
-        newCacheConfig(false, false),
+        newCacheConfig(false, false, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -72,7 +73,7 @@ public void testNotPopulateAndNotUse()
     Assert.assertEquals(0, cache.getStats().getNumMisses());
 
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner2 = createQueryRunner(
-        newCacheConfig(false, false),
+        newCacheConfig(false, false, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -93,7 +94,7 @@ public void testPopulateAndNotUse()
     prepareCluster(10);
     final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner1 = createQueryRunner(
-        newCacheConfig(true, false),
+        newCacheConfig(true, false, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -107,7 +108,7 @@ public void testPopulateAndNotUse()
     Assert.assertEquals(0, cache.getStats().getNumMisses());
 
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner2 = createQueryRunner(
-        newCacheConfig(true, false),
+        newCacheConfig(true, false, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -128,7 +129,7 @@ public void testNotPopulateAndUse()
     prepareCluster(10);
     final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner1 = createQueryRunner(
-        newCacheConfig(false, false),
+        newCacheConfig(false, false, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -142,7 +143,7 @@ public void testNotPopulateAndUse()
     Assert.assertEquals(0, cache.getStats().getNumMisses());
 
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner2 = createQueryRunner(
-        newCacheConfig(false, true),
+        newCacheConfig(false, true, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -163,7 +164,7 @@ public void testPopulateAndUse()
     prepareCluster(10);
     final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner1 = createQueryRunner(
-        newCacheConfig(true, true),
+        newCacheConfig(true, true, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -177,7 +178,7 @@ public void testPopulateAndUse()
     Assert.assertEquals(1, cache.getStats().getNumMisses());
 
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner2 = createQueryRunner(
-        newCacheConfig(true, true),
+        newCacheConfig(true, true, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -192,6 +193,41 @@ public void testPopulateAndUse()
     Assert.assertEquals(1, cache.getStats().getNumMisses());
   }
 
+  @Test
+  public void testNoPopulateIfEntrySizeExceedsMaximum()
+  {
+    prepareCluster(10);
+    final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
+    final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner1 = createQueryRunner(
+        newCacheConfig(true, true, 128),
+        query
+    );
+
+    final Sequence<Result<TimeseriesResultValue>> sequence1 = queryRunner1.run(
+        QueryPlus.wrap(query),
+        responseContext()
+    );
+    final List<Result<TimeseriesResultValue>> results1 = sequence1.toList();
+    Assert.assertEquals(0, cache.getStats().getNumHits());
+    Assert.assertEquals(0, cache.getStats().getNumEntries());
+    Assert.assertEquals(1, cache.getStats().getNumMisses());
+
+    final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner2 = createQueryRunner(
+        newCacheConfig(true, true, DEFAULT_CACHE_ENTRY_MAX_SIZE),
+        query
+    );
+
+    final Sequence<Result<TimeseriesResultValue>> sequence2 = queryRunner2.run(
+        QueryPlus.wrap(query),
+        responseContext()
+    );
+    final List<Result<TimeseriesResultValue>> results2 = sequence2.toList();
+    Assert.assertEquals(results1, results2);
+    Assert.assertEquals(0, cache.getStats().getNumHits());
+    Assert.assertEquals(1, cache.getStats().getNumEntries());
+    Assert.assertEquals(2, cache.getStats().getNumMisses());
+  }
+
   @Test
   public void testPopulateCacheWhenQueryThrowExceptionShouldNotCache()
   {
@@ -206,7 +242,7 @@ public void testPopulateCacheWhenQueryThrowExceptionShouldNotCache()
 
     final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
     final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner = createQueryRunner(
-        newCacheConfig(true, false),
+        newCacheConfig(true, false, DEFAULT_CACHE_ENTRY_MAX_SIZE),
         query
     );
 
@@ -249,7 +285,11 @@ private ResultLevelCachingQueryRunner createQueryRunner(
     );
   }
 
-  private CacheConfig newCacheConfig(boolean populateResultLevelCache, boolean useResultLevelCache)
+  private CacheConfig newCacheConfig(
+      boolean populateResultLevelCache,
+      boolean useResultLevelCache,
+      int resultLevelCacheLimit
+  )
   {
     return new CacheConfig()
     {
@@ -264,6 +304,12 @@ public boolean isUseResultLevelCache()
       {
         return useResultLevelCache;
       }
+
+      @Override
+      public int getResultLevelCacheLimit()
+      {
+        return resultLevelCacheLimit;
+      }
     };
   }
 }