From cf0937356eb136da079cdc72ed8fd9cd5b64c61b Mon Sep 17 00:00:00 2001 From: Vikas Gupta Date: Wed, 14 Jun 2023 19:35:53 +0530 Subject: [PATCH 1/2] filter methods in zframe --- .../main/java/zingg/common/client/ZFrame.java | 7 ++++++ .../java/zingg/spark/client/SparkFrame.java | 18 ++++++++++++++- .../java/zingg/client/TestSparkFrame.java | 22 +++++++++++++++++++ .../java/zingg/client/TestSparkFrameBase.java | 17 ++++++++++++++ 4 files changed, 63 insertions(+), 1 deletion(-) diff --git a/common/client/src/main/java/zingg/common/client/ZFrame.java b/common/client/src/main/java/zingg/common/client/ZFrame.java index ccc7417fb..f9a279c6d 100644 --- a/common/client/src/main/java/zingg/common/client/ZFrame.java +++ b/common/client/src/main/java/zingg/common/client/ZFrame.java @@ -138,5 +138,12 @@ public interface ZFrame { public FieldData[] fields(); public Object getMaxVal(String colName); + + public ZFrame filterInCond(String colName,ZFrame innerDF, String innerDFCol); + + public ZFrame filterNotNullCond(String colName); + + public ZFrame filterNullCond(String colName); + } \ No newline at end of file diff --git a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java index f159b452a..4bdd03112 100644 --- a/spark/client/src/main/java/zingg/spark/client/SparkFrame.java +++ b/spark/client/src/main/java/zingg/spark/client/SparkFrame.java @@ -354,5 +354,21 @@ public Object getMaxVal(String colName) { Row r = df.agg(functions.max(colName)).head(); return r.get(0); } - + + @Override + public ZFrame, Row, Column> filterInCond(String colName,ZFrame, Row, Column> innerDF, String innerDFCol) { + ZFrame, Row, Column> innerDF2 = innerDF.select(innerDF.col(innerDFCol).alias(colName)); + return this.joinOnCol(innerDF2, colName); + } + + @Override + public ZFrame, Row, Column> filterNotNullCond(String colName) { + return this.filter(df.col(colName).isNotNull()); + } + + @Override + public ZFrame, Row, Column> filterNullCond(String colName) { + return this.filter(df.col(colName).isNull()); + } + } \ No newline at end of file diff --git a/spark/client/src/test/java/zingg/client/TestSparkFrame.java b/spark/client/src/test/java/zingg/client/TestSparkFrame.java index 3b060596b..aedef7765 100644 --- a/spark/client/src/test/java/zingg/client/TestSparkFrame.java +++ b/spark/client/src/test/java/zingg/client/TestSparkFrame.java @@ -290,5 +290,27 @@ public void testRightJoinMultiCol(){ assertEquals(10,joinedData.count()); } + @Test + public void testFilterInCond(){ + SparkFrame inpData = getInputData(); + SparkFrame clusterData = getClusterDataWithNull(); + ZFrame, Row, Column> filteredData = inpData.filterInCond(ColName.ID_COL, clusterData, ColName.COL_PREFIX+ ColName.ID_COL); + assertEquals(5,filteredData.count()); + } + + @Test + public void testFilterNotNullCond(){ + SparkFrame clusterData = getClusterDataWithNull(); + ZFrame, Row, Column> filteredData = clusterData.filterNotNullCond(ColName.SOURCE_COL); + assertEquals(3,filteredData.count()); + } + + @Test + public void testFilterNullCond(){ + SparkFrame clusterData = getClusterDataWithNull(); + ZFrame, Row, Column> filteredData = clusterData.filterNullCond(ColName.SOURCE_COL); + assertEquals(2,filteredData.count()); + } + } \ No newline at end of file diff --git a/spark/client/src/test/java/zingg/client/TestSparkFrameBase.java b/spark/client/src/test/java/zingg/client/TestSparkFrameBase.java index 30d3b4b0a..6bd4ec3f0 100644 --- a/spark/client/src/test/java/zingg/client/TestSparkFrameBase.java +++ b/spark/client/src/test/java/zingg/client/TestSparkFrameBase.java @@ -185,6 +185,23 @@ protected SparkFrame getClusterData() { return df; } + protected SparkFrame getClusterDataWithNull() { + Row[] rows = { + RowFactory.create( 1,100,1001,"b"), + RowFactory.create( 2,100,1002,"a"), + RowFactory.create( 3,100,2001,null), + RowFactory.create( 4,900,2002,"c"), + RowFactory.create( 5,111,9002,null) + }; + StructType schema = new StructType(new StructField[] { + new StructField(ColName.COL_PREFIX+ ColName.ID_COL, DataTypes.IntegerType, false, Metadata.empty()), + new StructField(ColName.CLUSTER_COLUMN, DataTypes.IntegerType, false, Metadata.empty()), + new StructField(ColName.SCORE_COL, DataTypes.IntegerType, false, Metadata.empty()), + new StructField(ColName.SOURCE_COL, DataTypes.StringType, true, Metadata.empty())}); + SparkFrame df = new SparkFrame(spark.createDataFrame(Arrays.asList(rows), schema)); + return df; + } + protected void assertTrueCheckingExceptOutput(ZFrame, Row, Column> sf1, ZFrame, Row, Column> sf2, String message) { assertTrue(sf1.except(sf2).isEmpty(), message); } From 454c55f9809014a1353f7a267cabf3a30d1b7f27 Mon Sep 17 00:00:00 2001 From: Vikas Gupta Date: Wed, 14 Jun 2023 22:39:15 +0530 Subject: [PATCH 2/2] constant for left join --- common/client/src/main/java/zingg/common/client/ZFrame.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/client/src/main/java/zingg/common/client/ZFrame.java b/common/client/src/main/java/zingg/common/client/ZFrame.java index f9a279c6d..b3976de89 100644 --- a/common/client/src/main/java/zingg/common/client/ZFrame.java +++ b/common/client/src/main/java/zingg/common/client/ZFrame.java @@ -6,7 +6,8 @@ public interface ZFrame { public static final String RIGHT_JOIN = "right"; - + public static final String LEFT_JOIN = "left"; + public ZFrame cache(); public ZFrame as(String s); public String[] columns();