From bad35803304ec10ab344aa2d1da25dad9426bbbd Mon Sep 17 00:00:00 2001 From: Niveathika Date: Tue, 10 Sep 2019 17:41:42 +0530 Subject: [PATCH 1/5] Fix NPE when stream name is use to reference attributes in agg query --- .../AggregationExpressionVisitor.java | 13 ++- .../core/aggregation/AggregationRuntime.java | 12 ++- ...SelectOptimisationAggregationTestCase.java | 88 +++++++++++++++++++ 3 files changed, 106 insertions(+), 7 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationExpressionVisitor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationExpressionVisitor.java index 423fa93901..a63c6979e7 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationExpressionVisitor.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationExpressionVisitor.java @@ -35,14 +35,16 @@ public class AggregationExpressionVisitor extends BaseExpressionVisitor { private Stack conditionOperands; private String inputStreamRefId; + private boolean isAggregationReferenced; private List tableAttributesNameList; private List allAttributesList; - AggregationExpressionVisitor(String inputStreamRefId, List inputStreamAttributesList, - List tableAttributesNameList) { + AggregationExpressionVisitor(String inputStreamRefId, boolean isAggregationReferenced, + List inputStreamAttributesList, List tableAttributesNameList) { this.conditionOperands = new Stack<>(); this.inputStreamRefId = inputStreamRefId; + this.isAggregationReferenced = isAggregationReferenced; this.tableAttributesNameList = tableAttributesNameList; this.allAttributesList = inputStreamAttributesList.stream() .map(Attribute::getName) @@ -240,7 +242,12 @@ public void addVariableExpression(Expression expression) { if (streamId.equals(inputStreamRefId)) { this.conditionOperands.push(expression); } else if (this.tableAttributesNameList.contains(variable.getAttributeName())) { - this.conditionOperands.push(expression); + if (this.isAggregationReferenced) { + this.conditionOperands.push(expression); + } else { + Variable tableVariable = new Variable(((Variable) expression).getAttributeName()); + this.conditionOperands.push(tableVariable); + } } else { this.conditionOperands.push("true"); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java index 63bcd2710b..d8a08dadf5 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java @@ -178,7 +178,12 @@ private static MetaStreamEvent alterMetaStreamEvent(boolean isStoreQuery, MetaSt additionalAttributes.forEach(attribute -> alteredStreamDef.attribute(attribute.getName(), attribute.getType())); - initMetaStreamEvent(originalMetaStreamEvent, alteredStreamDef, originalMetaStreamEvent.getInputReferenceId()); + String inputReferenceId = originalMetaStreamEvent.getInputReferenceId(); + if (!isStoreQuery && inputReferenceId == null) { + alteredStreamDef.setId(originalMetaStreamEvent.getLastInputDefinition().getId()); + } + + initMetaStreamEvent(originalMetaStreamEvent, alteredStreamDef, inputReferenceId); return originalMetaStreamEvent; } @@ -297,8 +302,7 @@ public CompiledCondition compileExpression(Expression expression, Within within, // Create new MatchingMetaInfoHolder containing newMetaStreamEventWithStartEnd and table meta event String aggReferenceId = matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvent(1).getInputReferenceId(); - MetaStreamEvent metaStoreEventForTableLookups = createMetaStoreEvent(tableDefinition, - aggReferenceId); + MetaStreamEvent metaStoreEventForTableLookups = createMetaStoreEvent(tableDefinition, aggReferenceId); // Create new MatchingMetaInfoHolder containing metaStreamEventForTableLookups and table meta event MatchingMetaInfoHolder metaInfoHolderForTableLookups = createNewStreamTableMetaInfoHolder( @@ -394,7 +398,7 @@ public CompiledCondition compileExpression(Expression expression, Within within, if (!(expression instanceof BoolConstant)) { AggregationExpressionBuilder aggregationExpressionBuilder = new AggregationExpressionBuilder(expression); AggregationExpressionVisitor expressionVisitor = new AggregationExpressionVisitor( - metaStreamEventForTableLookups.getInputReferenceId(), + metaStreamEventForTableLookups.getInputReferenceId(), aggReferenceId != null, metaStreamEventForTableLookups.getLastInputDefinition().getAttributeList(), this.tableAttributesNameList ); diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java index 728197a4c1..c38c3c23bb 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java @@ -1116,4 +1116,92 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { } } + @Test(dependsOnMethods = {"aggregationFunctionTestcase12"}) + public void aggregationFunctionTestcase13() throws InterruptedException { + LOG.info("Use stream name or reference for attributes "); + SiddhiManager siddhiManager = new SiddhiManager(); + + String stockStream = + "define stream stockStream (symbol string, price float, lastClosingPrice float, volume long , " + + "quantity int, timestamp long);"; + String query = + "define aggregation stockAggregation " + + "from stockStream " + + "select symbol, avg(price) as avgPrice, sum(price) as totalPrice, (price * quantity) as " + + "lastTradeValue, count() as count " + + "group by symbol " + + "aggregate by timestamp every sec...year ;" + + + "define stream inputStream (symbol string, value int, startTime string, " + + "endTime string, perValue string); " + + + "@info(name = 'query1') " + + "from inputStream join stockAggregation " + + "on inputStream.symbol == stockAggregation.symbol " + + "within \"2017-06-01 04:05:**\" " + + "per \"seconds\" " + + "select AGG_TIMESTAMP, stockAggregation.avgPrice, totalPrice, lastTradeValue, count " + + "order by AGG_TIMESTAMP " + + "insert all events into outputStream; "; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(stockStream + query); + + try { + siddhiAppRuntime.addCallback("query1", new QueryCallback() { + @Override + public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { + if (inEvents != null) { + EventPrinter.print(timestamp, inEvents, removeEvents); + for (Event event : inEvents) { + inEventsList.add(event.getData()); + inEventCount.incrementAndGet(); + } + eventArrived = true; + } + eventArrived = true; + } + }); + InputHandler stockStreamInputHandler = siddhiAppRuntime.getInputHandler("stockStream"); + InputHandler inputStreamInputHandler = siddhiAppRuntime.getInputHandler("inputStream"); + siddhiAppRuntime.start(); + + // Thursday, June 1, 2017 4:05:50 AM + stockStreamInputHandler.send(new Object[]{"WSO2", 50f, 60f, 90L, 6, 1496289950000L}); + stockStreamInputHandler.send(new Object[]{"WSO2", 70f, null, 40L, 10, 1496289950000L}); + + // Thursday, June 1, 2017 4:05:49 AM + stockStreamInputHandler.send(new Object[]{"WSO2", 60f, 44f, 200L, 56, 1496289949000L}); + stockStreamInputHandler.send(new Object[]{"WSO2", 100f, null, 200L, 16, 1496289949000L}); + + // Thursday, June 1, 2017 4:05:48 AM + stockStreamInputHandler.send(new Object[]{"IBM", 100f, null, 200L, 26, 1496289948000L}); + stockStreamInputHandler.send(new Object[]{"IBM", 100f, null, 200L, 96, 1496289948000L}); + + // Thursday, June 1, 2017 4:05:47 AM + stockStreamInputHandler.send(new Object[]{"IBM", 900f, null, 200L, 60, 1496289947000L}); + stockStreamInputHandler.send(new Object[]{"IBM", 500f, null, 200L, 7, 1496289947000L}); + + // Thursday, June 1, 2017 4:05:46 AM + stockStreamInputHandler.send(new Object[]{"IBM", 400f, null, 200L, 9, 1496289946000L}); + + Thread.sleep(2000); + + inputStreamInputHandler.send(new Object[]{"IBM", 1, "2017-06-01 09:35:51 +05:30", + "2017-06-01 09:35:52 +05:30", "seconds"}); + Thread.sleep(100); + + List expected = Arrays.asList( + new Object[]{1496289946000L, 400.0, 400.0, 3600f, 1L}, + new Object[]{1496289947000L, 700.0, 1400.0, 3500f, 2L}, + new Object[]{1496289948000L, 100.0, 200.0, 9600f, 2L} + ); + SiddhiTestHelper.waitForEvents(100, 3, inEventCount, 60000); + AssertJUnit.assertTrue("Event arrived", eventArrived); + AssertJUnit.assertEquals("Number of success events", 3, inEventCount.get()); + AssertJUnit.assertTrue("In events matched", SiddhiTestHelper.isUnsortedEventsMatch(inEventsList, expected)); + } finally { + siddhiAppRuntime.shutdown(); + } + } + } From 278278aa33fe18725cd1f6a567f62feaf033cb00 Mon Sep 17 00:00:00 2001 From: Niveathika Date: Tue, 10 Sep 2019 20:34:15 +0530 Subject: [PATCH 2/5] Fix NPE when agg name is use to reference attributes in agg query --- .../AggregationExpressionVisitor.java | 11 ++------- .../core/aggregation/AggregationRuntime.java | 24 ++++++++++--------- ...SelectOptimisationAggregationTestCase.java | 5 ++-- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationExpressionVisitor.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationExpressionVisitor.java index a63c6979e7..2304c1b157 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationExpressionVisitor.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationExpressionVisitor.java @@ -35,16 +35,14 @@ public class AggregationExpressionVisitor extends BaseExpressionVisitor { private Stack conditionOperands; private String inputStreamRefId; - private boolean isAggregationReferenced; private List tableAttributesNameList; private List allAttributesList; - AggregationExpressionVisitor(String inputStreamRefId, boolean isAggregationReferenced, + AggregationExpressionVisitor(String inputStreamRefId, List inputStreamAttributesList, List tableAttributesNameList) { this.conditionOperands = new Stack<>(); this.inputStreamRefId = inputStreamRefId; - this.isAggregationReferenced = isAggregationReferenced; this.tableAttributesNameList = tableAttributesNameList; this.allAttributesList = inputStreamAttributesList.stream() .map(Attribute::getName) @@ -242,12 +240,7 @@ public void addVariableExpression(Expression expression) { if (streamId.equals(inputStreamRefId)) { this.conditionOperands.push(expression); } else if (this.tableAttributesNameList.contains(variable.getAttributeName())) { - if (this.isAggregationReferenced) { - this.conditionOperands.push(expression); - } else { - Variable tableVariable = new Variable(((Variable) expression).getAttributeName()); - this.conditionOperands.push(tableVariable); - } + this.conditionOperands.push(expression); } else { this.conditionOperands.push("true"); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java index d8a08dadf5..185df175e1 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java @@ -302,7 +302,9 @@ public CompiledCondition compileExpression(Expression expression, Within within, // Create new MatchingMetaInfoHolder containing newMetaStreamEventWithStartEnd and table meta event String aggReferenceId = matchingMetaInfoHolder.getMetaStateEvent().getMetaStreamEvent(1).getInputReferenceId(); - MetaStreamEvent metaStoreEventForTableLookups = createMetaStoreEvent(tableDefinition, aggReferenceId); + String referenceName = aggReferenceId == null ? aggregationName : aggReferenceId; + + MetaStreamEvent metaStoreEventForTableLookups = createMetaStoreEvent(tableDefinition, referenceName); // Create new MatchingMetaInfoHolder containing metaStreamEventForTableLookups and table meta event MatchingMetaInfoHolder metaInfoHolderForTableLookups = createNewStreamTableMetaInfoHolder( @@ -396,9 +398,10 @@ public CompiledCondition compileExpression(Expression expression, Within within, //Check if there is no on conditions if (!(expression instanceof BoolConstant)) { + // For abstract queryable table AggregationExpressionBuilder aggregationExpressionBuilder = new AggregationExpressionBuilder(expression); AggregationExpressionVisitor expressionVisitor = new AggregationExpressionVisitor( - metaStreamEventForTableLookups.getInputReferenceId(), aggReferenceId != null, + metaStreamEventForTableLookups.getInputReferenceId(), metaStreamEventForTableLookups.getLastInputDefinition().getAttributeList(), this.tableAttributesNameList ); @@ -469,15 +472,14 @@ public CompiledCondition compileExpression(Expression expression, Within within, } else { selectorList = defaultSelectorList; } - if (aggReferenceId != null) { - for (OutputAttribute outputAttribute : selectorList) { - if (outputAttribute.getExpression() instanceof Variable) { - ((Variable) outputAttribute.getExpression()).setStreamId(aggReferenceId); - } else { - for (Expression parameter : - ((AttributeFunction) outputAttribute.getExpression()).getParameters()) { - ((Variable) parameter).setStreamId(aggReferenceId); - } + + for (OutputAttribute outputAttribute : selectorList) { + if (outputAttribute.getExpression() instanceof Variable) { + ((Variable) outputAttribute.getExpression()).setStreamId(referenceName); + } else { + for (Expression parameter : + ((AttributeFunction) outputAttribute.getExpression()).getParameters()) { + ((Variable) parameter).setStreamId(referenceName); } } } diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java index c38c3c23bb..59e4814194 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java @@ -1116,7 +1116,8 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { } } - @Test(dependsOnMethods = {"aggregationFunctionTestcase12"}) + @Test + //(dependsOnMethods = {"aggregationFunctionTestcase12"}) public void aggregationFunctionTestcase13() throws InterruptedException { LOG.info("Use stream name or reference for attributes "); SiddhiManager siddhiManager = new SiddhiManager(); @@ -1140,7 +1141,7 @@ public void aggregationFunctionTestcase13() throws InterruptedException { "on inputStream.symbol == stockAggregation.symbol " + "within \"2017-06-01 04:05:**\" " + "per \"seconds\" " + - "select AGG_TIMESTAMP, stockAggregation.avgPrice, totalPrice, lastTradeValue, count " + + "select AGG_TIMESTAMP, avgPrice, totalPrice, lastTradeValue, count " + "order by AGG_TIMESTAMP " + "insert all events into outputStream; "; From 1110829e15ffb3547870646f71a73969c00b80ae Mon Sep 17 00:00:00 2001 From: Niveathika Date: Tue, 10 Sep 2019 21:00:09 +0530 Subject: [PATCH 3/5] Update test case --- .../core/aggregation/AggregationRuntime.java | 9 ++++---- ...SelectOptimisationAggregationTestCase.java | 23 ++++++++----------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java index 185df175e1..6f842f6cbb 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java @@ -445,11 +445,11 @@ public CompiledCondition compileExpression(Expression expression, Within within, } for (Variable queryGroupBy : queryGroupByList) { String referenceId = queryGroupBy.getStreamId(); - if (aggReferenceId == null) { + if (referenceId == null) { if (tableAttributesNameList.contains(queryGroupBy.getAttributeName())) { groupByList.add(queryGroupBy); } - } else if (aggReferenceId.equalsIgnoreCase(referenceId)) { + } else if (referenceId.equalsIgnoreCase(referenceName)) { groupByList.add(queryGroupBy); } } @@ -459,9 +459,8 @@ public CompiledCondition compileExpression(Expression expression, Within within, } } - if (aggReferenceId != null) { - groupByList.forEach((groupBy) -> groupBy.setStreamId(aggReferenceId)); - } + groupByList.forEach((groupBy) -> groupBy.setStreamId(referenceName)); + selector.addGroupByList(groupByList); List selectorList; diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java index 59e4814194..a60582ec14 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java @@ -1116,8 +1116,7 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { } } - @Test - //(dependsOnMethods = {"aggregationFunctionTestcase12"}) + @Test(dependsOnMethods = {"aggregationFunctionTestcase12"}) public void aggregationFunctionTestcase13() throws InterruptedException { LOG.info("Use stream name or reference for attributes "); SiddhiManager siddhiManager = new SiddhiManager(); @@ -1137,11 +1136,12 @@ public void aggregationFunctionTestcase13() throws InterruptedException { "endTime string, perValue string); " + "@info(name = 'query1') " + - "from inputStream join stockAggregation " + - "on inputStream.symbol == stockAggregation.symbol " + + "from inputStream join stockAggregation as s " + + "on inputStream.symbol == s.symbol " + "within \"2017-06-01 04:05:**\" " + "per \"seconds\" " + - "select AGG_TIMESTAMP, avgPrice, totalPrice, lastTradeValue, count " + + "select s.symbol, sum(totalPrice) as totalPrice " + + "group by s.symbol " + "order by AGG_TIMESTAMP " + "insert all events into outputStream; "; @@ -1191,15 +1191,12 @@ public void receive(long timestamp, Event[] inEvents, Event[] removeEvents) { "2017-06-01 09:35:52 +05:30", "seconds"}); Thread.sleep(100); - List expected = Arrays.asList( - new Object[]{1496289946000L, 400.0, 400.0, 3600f, 1L}, - new Object[]{1496289947000L, 700.0, 1400.0, 3500f, 2L}, - new Object[]{1496289948000L, 100.0, 200.0, 9600f, 2L} - ); - SiddhiTestHelper.waitForEvents(100, 3, inEventCount, 60000); + Object[] expected = new Object[]{"IBM", 2000.0}; + SiddhiTestHelper.waitForEvents(100, 1, inEventCount, 60000); AssertJUnit.assertTrue("Event arrived", eventArrived); - AssertJUnit.assertEquals("Number of success events", 3, inEventCount.get()); - AssertJUnit.assertTrue("In events matched", SiddhiTestHelper.isUnsortedEventsMatch(inEventsList, expected)); + AssertJUnit.assertEquals("Number of success events", 1, inEventCount.get()); + AssertJUnit.assertEquals("In events matched", "IBM", inEventsList.get(0)[0]); + AssertJUnit.assertEquals("In events matched", 2000.0, inEventsList.get(0)[1]); } finally { siddhiAppRuntime.shutdown(); } From 7bf093ef4732464ee7489ccb209e677d8f121df1 Mon Sep 17 00:00:00 2001 From: Niveathika Date: Wed, 11 Sep 2019 17:25:59 +0530 Subject: [PATCH 4/5] Hardcode stream name if is on demand query --- .../siddhi/core/aggregation/AggregationRuntime.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java index 6f842f6cbb..4f445ca51d 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java @@ -169,20 +169,22 @@ private static MetaStreamEvent alterMetaStreamEvent(boolean isStoreQuery, MetaSt List additionalAttributes) { StreamDefinition alteredStreamDef = new StreamDefinition(); + String inputReferenceId = originalMetaStreamEvent.getInputReferenceId(); if (!isStoreQuery) { for (Attribute attribute : originalMetaStreamEvent.getLastInputDefinition().getAttributeList()) { alteredStreamDef.attribute(attribute.getName(), attribute.getType()); } + if (inputReferenceId == null) { + alteredStreamDef.setId(originalMetaStreamEvent.getLastInputDefinition().getId()); + } + } else { + // If it is store query, no original join stream + alteredStreamDef.setId("storeQueryStream"); } additionalAttributes.forEach(attribute -> alteredStreamDef.attribute(attribute.getName(), attribute.getType())); - String inputReferenceId = originalMetaStreamEvent.getInputReferenceId(); - if (!isStoreQuery && inputReferenceId == null) { - alteredStreamDef.setId(originalMetaStreamEvent.getLastInputDefinition().getId()); - } - initMetaStreamEvent(originalMetaStreamEvent, alteredStreamDef, inputReferenceId); return originalMetaStreamEvent; } From 0f8e66752f5f134dfbba89ac548ccb817e388a5e Mon Sep 17 00:00:00 2001 From: Niveathika Date: Thu, 12 Sep 2019 10:44:42 +0530 Subject: [PATCH 5/5] update testcase to use name of aggrgeation --- .../SelectOptimisationAggregationTestCase.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java b/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java index a60582ec14..f0cbe7432e 100644 --- a/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java +++ b/modules/siddhi-core/src/test/java/io/siddhi/core/aggregation/SelectOptimisationAggregationTestCase.java @@ -1136,12 +1136,12 @@ public void aggregationFunctionTestcase13() throws InterruptedException { "endTime string, perValue string); " + "@info(name = 'query1') " + - "from inputStream join stockAggregation as s " + - "on inputStream.symbol == s.symbol " + + "from inputStream join stockAggregation " + + "on inputStream.symbol == stockAggregation.symbol " + "within \"2017-06-01 04:05:**\" " + "per \"seconds\" " + - "select s.symbol, sum(totalPrice) as totalPrice " + - "group by s.symbol " + + "select stockAggregation.symbol, sum(totalPrice) as totalPrice " + + "group by stockAggregation.symbol " + "order by AGG_TIMESTAMP " + "insert all events into outputStream; ";