Resolve conflicts from Github
GWphua committed Feb 28, 2025
2 parents f9c13ee + f15ba08 commit e37d3ef
Showing 8 changed files with 210 additions and 97 deletions.
2 changes: 1 addition & 1 deletion distribution/docker/peon.sh
@@ -161,4 +161,4 @@ fi
# If TASK_JSON is not set, CliPeon will pull the task.json file from deep storage.
mkdir -p ${TASK_DIR}; [ -n "$TASK_JSON" ] && echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;

-exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
+exec bin/run-java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon --taskId ${TASK_ID} $@
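With this change the peon is told its task ID explicitly on the command line, so the launcher must export `TASK_ID` into the container environment alongside the existing `TASK_DIR`/`TASK_JSON` variables — presumably so `CliPeon` can identify its task even when `TASK_JSON` is unset and task.json has to be pulled from deep storage.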
151 changes: 74 additions & 77 deletions docs/querying/scan-query.md
@@ -43,12 +43,12 @@ An example Scan query object is shown below:
"queryType": "scan",
"dataSource": "wikipedia",
"resultFormat": "list",
"columns":[],
"columns":[ "__time", "isRobot", "page","added", "isAnonymous", "user", "deleted" ],
"intervals": [
"2013-01-01/2013-01-02"
"2016-01-01/2017-01-02"
],
"batchSize":20480,
"limit":3
"limit":2
}
```
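For context (not part of this diff), a query object like the one above is POSTed to Druid's native query endpoint. A minimal sketch, assuming the quickstart router at `localhost:8888` and the query saved as `scan-query.json` (both assumptions, not taken from this commit):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class ScanQuerySketch
{
  public static void main(String[] args) throws Exception
  {
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8888/druid/v2")) // native query endpoint
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofFile(Path.of("scan-query.json"))) // the JSON above
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body()); // rows in the requested resultFormat
  }
}
```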

@@ -73,86 +73,83 @@ The following are the main parameters for Scan queries:
The format of the result when resultFormat equals `list`:

```json
-[{
-    "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
-    "columns" : [
-        "timestamp",
-        "robot",
-        "namespace",
-        "anonymous",
-        "unpatrolled",
-        "page",
-        "language",
-        "newpage",
-        "user",
-        "count",
-        "added",
-        "delta",
-        "variation",
-        "deleted"
-    ],
-    "events" : [ {
-        "timestamp" : "2013-01-01T00:00:00.000Z",
-        "robot" : "1",
-        "namespace" : "article",
-        "anonymous" : "0",
-        "unpatrolled" : "0",
-        "page" : "11._korpus_(NOVJ)",
-        "language" : "sl",
-        "newpage" : "0",
-        "user" : "EmausBot",
-        "count" : 1.0,
-        "added" : 39.0,
-        "delta" : 39.0,
-        "variation" : 39.0,
-        "deleted" : 0.0
-    }, {
-        "timestamp" : "2013-01-01T00:00:00.000Z",
-        "robot" : "0",
-        "namespace" : "article",
-        "anonymous" : "0",
-        "unpatrolled" : "0",
-        "page" : "112_U.S._580",
-        "language" : "en",
-        "newpage" : "1",
-        "user" : "MZMcBride",
-        "count" : 1.0,
-        "added" : 70.0,
-        "delta" : 70.0,
-        "variation" : 70.0,
-        "deleted" : 0.0
-    }, {
-        "timestamp" : "2013-01-01T00:00:00.000Z",
-        "robot" : "0",
-        "namespace" : "article",
-        "anonymous" : "0",
-        "unpatrolled" : "0",
-        "page" : "113_U.S._243",
-        "language" : "en",
-        "newpage" : "1",
-        "user" : "MZMcBride",
-        "count" : 1.0,
-        "added" : 77.0,
-        "delta" : 77.0,
-        "variation" : 77.0,
-        "deleted" : 0.0
-    } ]
-} ]
+[ {
+    "segmentId" : "wikipedia_2016-06-27T00:00:00.000Z_2016-06-28T00:00:00.000Z_2024-12-17T13:08:03.142Z",
+    "columns" : [ "__time", "isRobot", "page", "added", "isAnonymous", "user", "deleted" ],
+    "events" : [ {
+        "__time" : 1466985611080,
+        "isRobot" : "true",
+        "page" : "Salo Toraut",
+        "added" : 31,
+        "isAnonymous" : "false",
+        "user" : "Lsjbot",
+        "deleted" : 0
+    }, {
+        "__time" : 1466985634959,
+        "isRobot" : "false",
+        "page" : "Bailando 2015",
+        "added" : 2,
+        "isAnonymous" : "true",
+        "user" : "181.230.118.178",
+        "deleted" : 0
+    } ],
+    "rowSignature" : [ {
+        "name" : "__time",
+        "type" : "LONG"
+    }, {
+        "name" : "isRobot",
+        "type" : "STRING"
+    }, {
+        "name" : "page",
+        "type" : "STRING"
+    }, {
+        "name" : "added",
+        "type" : "LONG"
+    }, {
+        "name" : "isAnonymous",
+        "type" : "STRING"
+    }, {
+        "name" : "user",
+        "type" : "STRING"
+    }, {
+        "name" : "deleted",
+        "type" : "LONG"
+    } ]
+} ]
```
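Note the `rowSignature` array in the updated example: it reports the name and native Druid type of each returned column alongside the events themselves.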

The format of the result when resultFormat equals `compactedList`:

```json
-[{
-    "segmentId" : "wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9",
-    "columns" : [
-        "timestamp", "robot", "namespace", "anonymous", "unpatrolled", "page", "language", "newpage", "user", "count", "added", "delta", "variation", "deleted"
-    ],
-    "events" : [
-        ["2013-01-01T00:00:00.000Z", "1", "article", "0", "0", "11._korpus_(NOVJ)", "sl", "0", "EmausBot", 1.0, 39.0, 39.0, 39.0, 0.0],
-        ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "112_U.S._580", "en", "1", "MZMcBride", 1.0, 70.0, 70.0, 70.0, 0.0],
-        ["2013-01-01T00:00:00.000Z", "0", "article", "0", "0", "113_U.S._243", "en", "1", "MZMcBride", 1.0, 77.0, 77.0, 77.0, 0.0]
-    ]
-} ]
+[ {
+    "segmentId" : "wikipedia_2016-06-27T00:00:00.000Z_2016-06-28T00:00:00.000Z_2024-12-17T13:08:03.142Z",
+    "columns" : [ "__time", "isRobot", "page", "added", "isAnonymous", "user", "deleted" ],
+    "events" : [
+        [ 1466985611080, "true", "Salo Toraut", 31, "false", "Lsjbot", 0 ],
+        [ 1466985634959, "false", "Bailando 2015", 2, "true", "181.230.118.178", 0 ]
+    ],
+    "rowSignature" : [ {
+        "name" : "__time",
+        "type" : "LONG"
+    }, {
+        "name" : "isRobot",
+        "type" : "STRING"
+    }, {
+        "name" : "page",
+        "type" : "STRING"
+    }, {
+        "name" : "added",
+        "type" : "LONG"
+    }, {
+        "name" : "isAnonymous",
+        "type" : "STRING"
+    }, {
+        "name" : "user",
+        "type" : "STRING"
+    }, {
+        "name" : "deleted",
+        "type" : "LONG"
+    } ]
+} ]
```
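Because `compactedList` events are positional arrays rather than keyed objects, a client has to pair each value with the corresponding entry in `columns`. A minimal Jackson sketch of that pairing (hypothetical class name and inlined response literal, not part of this diff):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CompactedListSketch
{
  public static void main(String[] args) throws Exception
  {
    // Stand-in for the HTTP response body; in practice this is the JSON above.
    String body = "[{\"columns\":[\"__time\",\"page\"],\"events\":[[1466985611080,\"Salo Toraut\"]]}]";
    JsonNode results = new ObjectMapper().readTree(body);
    for (JsonNode segment : results) {
      JsonNode columns = segment.get("columns");
      for (JsonNode event : segment.get("events")) {
        // Zip the positional event array with the columns header.
        for (int i = 0; i < columns.size(); i++) {
          System.out.println(columns.get(i).asText() + " = " + event.get(i));
        }
      }
    }
  }
}
```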

HdfsTaskLogs.java

@@ -207,6 +207,26 @@ public void killOlderThan(long timestamp) throws IOException
}
}
}

+  @Override
+  public void pushTaskPayload(String taskId, File taskPayloadFile) throws IOException
+  {
+    final Path path = getTaskPayloadFileFromId(taskId);
+    log.info("Pushing payload for task[%s] to location[%s]", taskId, path);
+    pushTaskFile(path, taskPayloadFile);
+  }
+
+  @Override
+  public Optional<InputStream> streamTaskPayload(String taskId) throws IOException
+  {
+    final Path path = getTaskPayloadFileFromId(taskId);
+    return streamTaskFile(path, 0);
+  }
+
+  private Path getTaskPayloadFileFromId(String taskId)
+  {
+    return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".payload.json"));
+  }
}
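For reference, `getTaskPayloadFileFromId` sanitizes the ID the same way the existing log-path code does, since task IDs embed ISO timestamps and ':' is not a legal character in HDFS path components. A small sketch of the resulting mapping (directory and task ID are hypothetical; real values come from `HdfsTaskLogsConfig` and the task):

```java
public class PayloadPathSketch
{
  public static void main(String[] args)
  {
    String directory = "/druid/tasklogs";
    String taskId = "index_parallel_wikipedia_2025-02-28T10:15:30.123Z";
    // Same transformation as getTaskPayloadFileFromId above.
    System.out.println(directory + "/" + taskId.replace(':', '_') + ".payload.json");
    // -> /druid/tasklogs/index_parallel_wikipedia_2025-02-28T10_15_30.123Z.payload.json
  }
}
```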


HdfsTaskLogsTest.java

@@ -86,6 +86,7 @@ public void test_taskStatus() throws Exception
final File statusFile = new File(tmpDir, "status.json");
final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());


Files.asCharSink(statusFile, StandardCharsets.UTF_8).write("{}");
taskLogs.pushTaskStatus("id", statusFile);
Assert.assertEquals(
@@ -94,6 +95,19 @@
);
}

+  @Test
+  public void test_taskPayload() throws Exception
+  {
+    final File tmpDir = tempFolder.newFolder();
+    final File logDir = new File(tmpDir, "logs");
+    final File payload = new File(tmpDir, "payload.json");
+    final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration());
+
+    Files.asCharSink(payload, StandardCharsets.UTF_8).write("{}");
+    taskLogs.pushTaskPayload("id", payload);
+    Assert.assertEquals("{}", StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskPayload("id").get())));
+  }

@Test
public void testKill() throws Exception
{
LimitedOutputStream.java

@@ -22,13 +22,14 @@
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IOE;

+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;

/**
* An {@link OutputStream} that limits how many bytes can be written. Throws {@link IOException} if the limit
- * is exceeded.
+ * is exceeded. *Not* thread-safe.
*/
public class LimitedOutputStream extends OutputStream
{
@@ -88,6 +89,14 @@ public void close() throws IOException
out.close();
}

+  public byte[] toByteArray()
+  {
+    if (!(out instanceof ByteArrayOutputStream)) {
+      throw new UnsupportedOperationException(out.getClass().getName() + " does not implement toByteArray()");
+    }
+    return ((ByteArrayOutputStream) out).toByteArray();
+  }

private void plus(final int n) throws IOException
{
written += n;
LimitedOutputStreamTest.java

@@ -27,6 +27,7 @@
import org.junit.internal.matchers.ThrowableMessageMatcher;

import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

@@ -65,6 +66,26 @@ public void test_limitThree() throws IOException
}
}

+  @Test
+  public void test_toByteArray() throws IOException
+  {
+    try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         final LimitedOutputStream stream =
+             new LimitedOutputStream(baos, 3, LimitedOutputStreamTest::makeErrorMessage)) {
+      stream.write('a');
+      stream.write(new byte[]{'b'});
+      stream.write(new byte[]{'c'}, 0, 1);
+
+      MatcherAssert.assertThat(stream.toByteArray(), CoreMatchers.equalTo(new byte[]{'a', 'b', 'c'}));
+    }
+
+    try (final DataOutputStream dos = new DataOutputStream(new ByteArrayOutputStream());
+         final LimitedOutputStream stream =
+             new LimitedOutputStream(dos, 3, LimitedOutputStreamTest::makeErrorMessage)) {
+      Assert.assertThrows(UnsupportedOperationException.class, stream::toByteArray);
+    }
+  }

private static String makeErrorMessage(final long limit)
{
return StringUtils.format("Limit[%d] exceeded", limit);
ResultLevelCachingQueryRunner.java

@@ -30,6 +30,7 @@
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.Cache.NamedKey;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.io.LimitedOutputStream;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -152,6 +153,8 @@ public void after(boolean isDone, Throwable thrown)
// The resultset identifier and its length is cached along with the resultset
resultLevelCachePopulator.populateResults();
log.debug("Cache population complete for query %s", query.getId());
+      } else { // thrown == null && !resultLevelCachePopulator.isShouldPopulate()
+        log.error("Failed (gracefully) to populate result level cache for query %s", query.getId());
}
resultLevelCachePopulator.stopPopulating();
}
@@ -233,8 +236,8 @@ private ResultLevelCachePopulator createResultLevelCachePopulator(
try {
// Save the resultSetId and its length
resultLevelCachePopulator.cacheObjectStream.write(ByteBuffer.allocate(Integer.BYTES)
          .putInt(resultSetId.length())
          .array());
resultLevelCachePopulator.cacheObjectStream.write(StringUtils.toUtf8(resultSetId));
}
catch (IOException ioe) {
Expand All @@ -255,7 +258,7 @@ private class ResultLevelCachePopulator
private final Cache.NamedKey key;
private final CacheConfig cacheConfig;
@Nullable
-  private ByteArrayOutputStream cacheObjectStream;
+  private LimitedOutputStream cacheObjectStream;

private ResultLevelCachePopulator(
Cache cache,
@@ -270,7 +273,14 @@ private ResultLevelCachePopulator(
this.serialiers = mapper.getSerializerProviderInstance();
this.key = key;
this.cacheConfig = cacheConfig;
-    this.cacheObjectStream = shouldPopulate ? new ByteArrayOutputStream() : null;
+    this.cacheObjectStream = shouldPopulate ? new LimitedOutputStream(
+        new ByteArrayOutputStream(),
+        cacheConfig.getResultLevelCacheLimit(),
+        limit -> StringUtils.format(
+            "resultLevelCacheLimit[%,d] exceeded. "
+            + "Max ResultLevelCacheLimit for cache exceeded. Result caching failed.",
+            limit
+        )
+    ) : null;
}

boolean isShouldPopulate()
@@ -289,12 +299,8 @@ private void cacheResultEntry(
)
{
Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream");
-    int cacheLimit = cacheConfig.getResultLevelCacheLimit();
try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
JacksonUtils.writeObjectUsingSerializerProvider(gen, serialiers, cacheFn.apply(resultEntry));
-      if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
-        stopPopulating();
-      }
}
catch (IOException ex) {
log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
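With the manual `cacheLimit` check above removed, enforcement now lives inside `LimitedOutputStream`: the write that crosses the limit throws an `IOException` carrying the message built by the function passed to the constructor, and the existing `catch` block abandons caching. A minimal sketch of that failure mode (constructor shape as in `LimitedOutputStreamTest`; the 8-byte limit is hypothetical):

```java
import org.apache.druid.io.LimitedOutputStream;
import org.apache.druid.java.util.common.StringUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class CacheLimitSketch
{
  public static void main(String[] args)
  {
    try (LimitedOutputStream out = new LimitedOutputStream(
        new ByteArrayOutputStream(),
        8, // hypothetical; the populator passes cacheConfig.getResultLevelCacheLimit()
        limit -> StringUtils.format("resultLevelCacheLimit[%,d] exceeded", limit))) {
      out.write(new byte[16]); // crossing the limit throws IOException
    }
    catch (IOException e) {
      // ResultLevelCachingQueryRunner lands here and gives up on caching.
      System.out.println(e.getMessage());
    }
  }
}
```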
