Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix NPE when stream name is use to reference attributes in agg query #1497

Merged
merged 6 commits into from
Sep 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public class AggregationExpressionVisitor extends BaseExpressionVisitor {
private List<String> allAttributesList;


AggregationExpressionVisitor(String inputStreamRefId, List<Attribute> inputStreamAttributesList,
List<String> tableAttributesNameList) {
AggregationExpressionVisitor(String inputStreamRefId,
List<Attribute> inputStreamAttributesList, List<String> tableAttributesNameList) {
this.conditionOperands = new Stack<>();
this.inputStreamRefId = inputStreamRefId;
this.tableAttributesNameList = tableAttributesNameList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,23 @@ private static MetaStreamEvent alterMetaStreamEvent(boolean isStoreQuery, MetaSt
List<Attribute> additionalAttributes) {

StreamDefinition alteredStreamDef = new StreamDefinition();
String inputReferenceId = originalMetaStreamEvent.getInputReferenceId();

if (!isStoreQuery) {
for (Attribute attribute : originalMetaStreamEvent.getLastInputDefinition().getAttributeList()) {
alteredStreamDef.attribute(attribute.getName(), attribute.getType());
}
if (inputReferenceId == null) {
alteredStreamDef.setId(originalMetaStreamEvent.getLastInputDefinition().getId());
}
} else {
// If it is store query, no original join stream
alteredStreamDef.setId("storeQueryStream");
}

additionalAttributes.forEach(attribute -> alteredStreamDef.attribute(attribute.getName(), attribute.getType()));

initMetaStreamEvent(originalMetaStreamEvent, alteredStreamDef, originalMetaStreamEvent.getInputReferenceId());
initMetaStreamEvent(originalMetaStreamEvent, alteredStreamDef, inputReferenceId);
return originalMetaStreamEvent;
}

Expand Down Expand Up @@ -297,8 +304,9 @@ public CompiledCondition compileExpression(Expression expression, Within within,

// Create new MatchingMetaInfoHolder containing newMetaStreamEventWithStartEnd and table meta event
String aggReferenceId = matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvent(1).getInputReferenceId();
MetaStreamEvent metaStoreEventForTableLookups = createMetaStoreEvent(tableDefinition,
aggReferenceId);
String referenceName = aggReferenceId == null ? aggregationName : aggReferenceId;

MetaStreamEvent metaStoreEventForTableLookups = createMetaStoreEvent(tableDefinition, referenceName);

// Create new MatchingMetaInfoHolder containing metaStreamEventForTableLookups and table meta event
MatchingMetaInfoHolder metaInfoHolderForTableLookups = createNewStreamTableMetaInfoHolder(
Expand Down Expand Up @@ -392,6 +400,7 @@ public CompiledCondition compileExpression(Expression expression, Within within,

//Check if there is no on conditions
if (!(expression instanceof BoolConstant)) {
// For abstract queryable table
AggregationExpressionBuilder aggregationExpressionBuilder = new AggregationExpressionBuilder(expression);
AggregationExpressionVisitor expressionVisitor = new AggregationExpressionVisitor(
metaStreamEventForTableLookups.getInputReferenceId(),
Expand Down Expand Up @@ -438,11 +447,11 @@ public CompiledCondition compileExpression(Expression expression, Within within,
}
for (Variable queryGroupBy : queryGroupByList) {
String referenceId = queryGroupBy.getStreamId();
if (aggReferenceId == null) {
if (referenceId == null) {
if (tableAttributesNameList.contains(queryGroupBy.getAttributeName())) {
groupByList.add(queryGroupBy);
}
} else if (aggReferenceId.equalsIgnoreCase(referenceId)) {
} else if (referenceId.equalsIgnoreCase(referenceName)) {
groupByList.add(queryGroupBy);
}
}
Expand All @@ -452,9 +461,8 @@ public CompiledCondition compileExpression(Expression expression, Within within,
}
}

if (aggReferenceId != null) {
groupByList.forEach((groupBy) -> groupBy.setStreamId(aggReferenceId));
}
groupByList.forEach((groupBy) -> groupBy.setStreamId(referenceName));

selector.addGroupByList(groupByList);

List<OutputAttribute> selectorList;
Expand All @@ -465,15 +473,14 @@ public CompiledCondition compileExpression(Expression expression, Within within,
} else {
selectorList = defaultSelectorList;
}
if (aggReferenceId != null) {
for (OutputAttribute outputAttribute : selectorList) {
if (outputAttribute.getExpression() instanceof Variable) {
((Variable) outputAttribute.getExpression()).setStreamId(aggReferenceId);
} else {
for (Expression parameter :
((AttributeFunction) outputAttribute.getExpression()).getParameters()) {
((Variable) parameter).setStreamId(aggReferenceId);
}

for (OutputAttribute outputAttribute : selectorList) {
if (outputAttribute.getExpression() instanceof Variable) {
((Variable) outputAttribute.getExpression()).setStreamId(referenceName);
} else {
for (Expression parameter :
((AttributeFunction) outputAttribute.getExpression()).getParameters()) {
((Variable) parameter).setStreamId(referenceName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,4 +1116,90 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
}
}

@Test(dependsOnMethods = {"aggregationFunctionTestcase12"})
public void aggregationFunctionTestcase13() throws InterruptedException {
LOG.info("Use stream name or reference for attributes ");
SiddhiManager siddhiManager = new SiddhiManager();

String stockStream =
"define stream stockStream (symbol string, price float, lastClosingPrice float, volume long , " +
"quantity int, timestamp long);";
String query =
"define aggregation stockAggregation " +
"from stockStream " +
"select symbol, avg(price) as avgPrice, sum(price) as totalPrice, (price * quantity) as " +
"lastTradeValue, count() as count " +
"group by symbol " +
"aggregate by timestamp every sec...year ;" +

"define stream inputStream (symbol string, value int, startTime string, " +
"endTime string, perValue string); " +

"@info(name = 'query1') " +
"from inputStream join stockAggregation " +
"on inputStream.symbol == stockAggregation.symbol " +
"within \"2017-06-01 04:05:**\" " +
"per \"seconds\" " +
"select stockAggregation.symbol, sum(totalPrice) as totalPrice " +
"group by stockAggregation.symbol " +
"order by AGG_TIMESTAMP " +
"insert all events into outputStream; ";

SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(stockStream + query);

try {
siddhiAppRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) {
if (inEvents != null) {
EventPrinter.print(timestamp, inEvents, removeEvents);
for (Event event : inEvents) {
inEventsList.add(event.getData());
inEventCount.incrementAndGet();
}
eventArrived = true;
}
eventArrived = true;
}
});
InputHandler stockStreamInputHandler = siddhiAppRuntime.getInputHandler("stockStream");
InputHandler inputStreamInputHandler = siddhiAppRuntime.getInputHandler("inputStream");
siddhiAppRuntime.start();

// Thursday, June 1, 2017 4:05:50 AM
stockStreamInputHandler.send(new Object[]{"WSO2", 50f, 60f, 90L, 6, 1496289950000L});
stockStreamInputHandler.send(new Object[]{"WSO2", 70f, null, 40L, 10, 1496289950000L});

// Thursday, June 1, 2017 4:05:49 AM
stockStreamInputHandler.send(new Object[]{"WSO2", 60f, 44f, 200L, 56, 1496289949000L});
stockStreamInputHandler.send(new Object[]{"WSO2", 100f, null, 200L, 16, 1496289949000L});

// Thursday, June 1, 2017 4:05:48 AM
stockStreamInputHandler.send(new Object[]{"IBM", 100f, null, 200L, 26, 1496289948000L});
stockStreamInputHandler.send(new Object[]{"IBM", 100f, null, 200L, 96, 1496289948000L});

// Thursday, June 1, 2017 4:05:47 AM
stockStreamInputHandler.send(new Object[]{"IBM", 900f, null, 200L, 60, 1496289947000L});
stockStreamInputHandler.send(new Object[]{"IBM", 500f, null, 200L, 7, 1496289947000L});

// Thursday, June 1, 2017 4:05:46 AM
stockStreamInputHandler.send(new Object[]{"IBM", 400f, null, 200L, 9, 1496289946000L});

Thread.sleep(2000);

inputStreamInputHandler.send(new Object[]{"IBM", 1, "2017-06-01 09:35:51 +05:30",
"2017-06-01 09:35:52 +05:30", "seconds"});
Thread.sleep(100);

Object[] expected = new Object[]{"IBM", 2000.0};
SiddhiTestHelper.waitForEvents(100, 1, inEventCount, 60000);
AssertJUnit.assertTrue("Event arrived", eventArrived);
AssertJUnit.assertEquals("Number of success events", 1, inEventCount.get());
AssertJUnit.assertEquals("In events matched", "IBM", inEventsList.get(0)[0]);
AssertJUnit.assertEquals("In events matched", 2000.0, inEventsList.get(0)[1]);
} finally {
siddhiAppRuntime.shutdown();
}
}

}