diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index aeca0a2eab..7d24f311ca 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -75,6 +75,11 @@ slf4j-api ${slf4j.version} + + org.locationtech.jts + jts-core + ${jts.version} + com.carrotsearch diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java index 87d39bf16e..a7d192a090 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java @@ -18,7 +18,9 @@ */ package org.apache.parquet.column.statistics; +import org.apache.parquet.column.statistics.geometry.GeometryStatistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; @@ -30,6 +32,7 @@ public class BinaryStatistics extends Statistics { private Binary max; private Binary min; + private GeometryStatistics geometryStatistics = null; /** * @deprecated will be removed in 2.0.0. Use {@link Statistics#createStats(org.apache.parquet.schema.Type)} instead @@ -41,6 +44,10 @@ public BinaryStatistics() { BinaryStatistics(PrimitiveType type) { super(type); + LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation(); + if (logicalType instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) { + geometryStatistics = new GeometryStatistics(); + } } private BinaryStatistics(BinaryStatistics other) { @@ -49,6 +56,9 @@ private BinaryStatistics(BinaryStatistics other) { initializeStats(other.min, other.max); } setNumNulls(other.getNumNulls()); + if (other.geometryStatistics != null) { + geometryStatistics = other.geometryStatistics.copy(); + } } @Override @@ -62,6 +72,9 @@ public void updateStats(Binary value) { } else if (comparator().compare(max, value) < 0) { max = value.copy(); } + if (geometryStatistics != null) { + geometryStatistics.update(value); + } } @Override @@ -72,6 +85,9 @@ public void mergeStatisticsMinMax(Statistics stats) { } else { updateStats(binaryStats.getMin(), binaryStats.getMax()); } + if (geometryStatistics != null) { + geometryStatistics.merge(binaryStats.geometryStatistics); + } } /** @@ -190,4 +206,12 @@ public void setMinMax(Binary min, Binary max) { public BinaryStatistics copy() { return new BinaryStatistics(this); } + + public void setGeometryStatistics(GeometryStatistics geometryStatistics) { + this.geometryStatistics = geometryStatistics; + } + + public GeometryStatistics getGeometryStatistics() { + return geometryStatistics; + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java index 83070d49f1..f18bca9598 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/Statistics.java @@ -20,6 +20,7 @@ import java.util.Arrays; import org.apache.parquet.column.UnknownColumnTypeException; +import org.apache.parquet.column.statistics.geometry.GeometryStatistics; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.Float16; import org.apache.parquet.schema.LogicalTypeAnnotation; @@ -64,6 +65,10 @@ public Builder withNumNulls(long numNulls) { return this; } + public Builder withGeometryStatistics(GeometryStatistics geometryStatistics) { + throw new UnsupportedOperationException("Please use the GeometryBuilder"); + } + public Statistics build() { Statistics stats = createStats(type); if (min != null && max != null) { @@ -178,6 +183,30 @@ public Statistics build() { } } + // Builder for GEOMETRY type to handle GeometryStatistics + private static class GeometryBuilder extends Builder { + + private GeometryStatistics geometryStatistics; + + public GeometryBuilder(PrimitiveType type) { + super(type); + assert type.getPrimitiveTypeName() == PrimitiveTypeName.BINARY; + } + + @Override + public Builder withGeometryStatistics(GeometryStatistics geometryStatistics) { + this.geometryStatistics = geometryStatistics; + return this; + } + + @Override + public Statistics build() { + BinaryStatistics stats = (BinaryStatistics) super.build(); + stats.setGeometryStatistics(geometryStatistics); + return stats; + } + } + private final PrimitiveType type; private final PrimitiveComparator comparator; private boolean hasNonNullValue; @@ -269,6 +298,11 @@ public static Builder getBuilderForReading(PrimitiveType type) { if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation) { return new Float16Builder(type); } + return new Builder(type); + case BINARY: + if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) { + return new GeometryBuilder(type); + } default: return new Builder(type); } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java new file mode 100644 index 0000000000..e4dfcb136c --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/BoundingBox.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import org.apache.parquet.Preconditions; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; + +public class BoundingBox { + + private double xMin = Double.MAX_VALUE; + private double xMax = Double.MIN_VALUE; + private double yMin = Double.MAX_VALUE; + private double yMax = Double.MIN_VALUE; + private double zMin = Double.MAX_VALUE; + private double zMax = Double.MIN_VALUE; + private double mMin = Double.MAX_VALUE; + private double mMax = Double.MIN_VALUE; + + public BoundingBox( + double xMin, double xMax, double yMin, double yMax, double zMin, double zMax, double mMin, double mMax) { + this.xMin = xMin; + this.xMax = xMax; + this.yMin = yMin; + this.yMax = yMax; + this.zMin = zMin; + this.zMax = zMax; + this.mMin = mMin; + this.mMax = mMax; + } + + public BoundingBox() {} + + public double getXMin() { + return xMin; + } + + public double getXMax() { + return xMax; + } + + public double getYMin() { + return yMin; + } + + public double getYMax() { + return yMax; + } + + public double getZMin() { + return zMin; + } + + public double getZMax() { + return zMax; + } + + public double getMMin() { + return mMin; + } + + public double getMMax() { + return mMax; + } + + void update(Geometry geom) { + if (geom == null || geom.isEmpty()) { + return; + } + Coordinate[] coordinates = geom.getCoordinates(); + for (Coordinate coordinate : coordinates) { + update(coordinate.getX(), coordinate.getY(), coordinate.getZ(), coordinate.getM()); + } + } + + public void update(double x, double y, double z, double m) { + xMin = Math.min(xMin, x); + xMax = Math.max(xMax, x); + yMin = Math.min(yMin, y); + yMax = Math.max(yMax, y); + zMin = Math.min(zMin, z); + zMax = Math.max(zMax, z); + mMin = Math.min(mMin, m); + mMax = Math.max(mMax, m); + } + + public void merge(BoundingBox other) { + Preconditions.checkArgument(other != null, "Cannot merge with null bounding box"); + xMin = Math.min(xMin, other.xMin); + xMax = Math.max(xMax, other.xMax); + yMin = Math.min(yMin, other.yMin); + yMax = Math.max(yMax, other.yMax); + zMin = Math.min(zMin, other.zMin); + zMax = Math.max(zMax, other.zMax); + mMin = Math.min(mMin, other.mMin); + mMax = Math.max(mMax, other.mMax); + } + + public void reset() { + xMin = Double.MAX_VALUE; + xMax = Double.MIN_VALUE; + yMin = Double.MAX_VALUE; + yMax = Double.MIN_VALUE; + zMin = Double.MAX_VALUE; + zMax = Double.MIN_VALUE; + mMin = Double.MAX_VALUE; + mMax = Double.MIN_VALUE; + } + + public void abort() { + xMin = Double.NaN; + xMax = Double.NaN; + yMin = Double.NaN; + yMax = Double.NaN; + zMin = Double.NaN; + zMax = Double.NaN; + mMin = Double.NaN; + mMax = Double.NaN; + } + + public BoundingBox copy() { + return new BoundingBox(xMin, xMax, yMin, yMax, zMin, zMax, mMin, mMax); + } + + @Override + public String toString() { + return "BoundingBox{" + "xMin=" + + xMin + ", xMax=" + + xMax + ", yMin=" + + yMin + ", yMax=" + + yMax + ", zMin=" + + zMin + ", zMax=" + + zMax + ", mMin=" + + mMin + ", mMax=" + + mMax + '}'; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/Covering.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/Covering.java new file mode 100644 index 0000000000..96fd412f19 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/Covering.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import java.nio.ByteBuffer; +import org.apache.parquet.Preconditions; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; + +public class Covering { + + protected final LogicalTypeAnnotation.Edges edges; + protected ByteBuffer geometry; + + public Covering(ByteBuffer geometry, LogicalTypeAnnotation.Edges edges) { + Preconditions.checkArgument(geometry != null, "Geometry cannot be null"); + Preconditions.checkArgument(edges != null, "Edges cannot be null"); + this.geometry = geometry; + this.edges = edges; + } + + public ByteBuffer getGeometry() { + return geometry; + } + + public LogicalTypeAnnotation.Edges getEdges() { + return edges; + } + + void update(Geometry geom) { + throw new UnsupportedOperationException( + "Update is not supported for " + this.getClass().getSimpleName()); + } + + public void merge(Covering other) { + throw new UnsupportedOperationException( + "Merge is not supported for " + this.getClass().getSimpleName()); + } + + public void reset() { + throw new UnsupportedOperationException( + "Reset is not supported for " + this.getClass().getSimpleName()); + } + + public void abort() { + throw new UnsupportedOperationException( + "Abort is not supported for " + this.getClass().getSimpleName()); + } + + public Covering copy() { + return new Covering(geometry.duplicate(), edges); + } + + @Override + public String toString() { + String geomText; + try { + geomText = new WKBReader().read(geometry.array()).toText(); + } catch (ParseException e) { + geomText = "Invalid Geometry"; + } + + return "Covering{" + "geometry=" + geomText + ", edges=" + edges + '}'; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/EnvelopeCovering.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/EnvelopeCovering.java new file mode 100644 index 0000000000..c696b3235c --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/EnvelopeCovering.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import java.nio.ByteBuffer; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Envelope; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKBWriter; + +public class EnvelopeCovering extends Covering { + + private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]); + private final WKBReader reader = new WKBReader(); + private final WKBWriter writer = new WKBWriter(); + private final GeometryFactory factory = new GeometryFactory(); + + public EnvelopeCovering() { + super(EMPTY, LogicalTypeAnnotation.Edges.PLANAR); + } + + @Override + void update(Geometry geom) { + if (geometry == null) { + return; + } + try { + if (geometry != EMPTY) { + Envelope envelope = reader.read(geometry.array()).getEnvelopeInternal(); + envelope.expandToInclude(geom.getEnvelopeInternal()); + Geometry polygon = factory.createPolygon(new Coordinate[] { + new Coordinate(envelope.getMinX(), envelope.getMinY()), + new Coordinate(envelope.getMinX(), envelope.getMaxY()), + new Coordinate(envelope.getMaxX(), envelope.getMaxY()), + new Coordinate(envelope.getMaxX(), envelope.getMinY()), + new Coordinate(envelope.getMinX(), envelope.getMinY()) + }); + geometry = ByteBuffer.wrap(writer.write(polygon)); + } else { + geometry = ByteBuffer.wrap(writer.write(geom.getEnvelope())); + } + } catch (ParseException e) { + geometry = null; + } + } + + @Override + public void merge(Covering other) { + if (other instanceof EnvelopeCovering) { + try { + update(reader.read(other.geometry.array())); + } catch (ParseException e) { + geometry = null; + } + } else { + throw new UnsupportedOperationException("Cannot merge " + this.getClass() + " with " + + other.getClass().getSimpleName()); + } + } + + @Override + public void reset() { + geometry = EMPTY; + } + + @Override + public void abort() { + geometry = null; + } + + @Override + public EnvelopeCovering copy() { + EnvelopeCovering copy = new EnvelopeCovering(); + copy.geometry = geometry == null ? null : ByteBuffer.wrap(geometry.array()); + return copy; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryStatistics.java new file mode 100644 index 0000000000..9c5b05d67c --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryStatistics.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import org.apache.parquet.Preconditions; +import org.apache.parquet.io.api.Binary; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; + +public class GeometryStatistics { + + private final BoundingBox boundingBox; + private final Covering covering; + private final GeometryTypes geometryTypes; + private final WKBReader reader = new WKBReader(); + + public GeometryStatistics(BoundingBox boundingBox, Covering covering, GeometryTypes geometryTypes) { + this.boundingBox = boundingBox; + this.covering = covering; + this.geometryTypes = geometryTypes; + } + + public GeometryStatistics() { + this(new BoundingBox(), new EnvelopeCovering(), new GeometryTypes()); + } + + public BoundingBox getBoundingBox() { + return boundingBox; + } + + public Covering getCovering() { + return covering; + } + + public GeometryTypes getGeometryTypes() { + return geometryTypes; + } + + public void update(Binary value) { + if (value == null) { + return; + } + try { + Geometry geom = reader.read(value.getBytes()); + update(geom); + } catch (ParseException e) { + abort(); + } + } + + private void update(Geometry geom) { + boundingBox.update(geom); + covering.update(geom); + geometryTypes.update(geom); + } + + public void merge(GeometryStatistics other) { + Preconditions.checkArgument(other != null, "Cannot merge with null GeometryStatistics"); + boundingBox.merge(other.boundingBox); + covering.merge(other.covering); + geometryTypes.merge(other.geometryTypes); + } + + public void reset() { + boundingBox.reset(); + covering.reset(); + geometryTypes.reset(); + } + + public void abort() { + boundingBox.abort(); + covering.abort(); + geometryTypes.abort(); + } + + public GeometryStatistics copy() { + return new GeometryStatistics(boundingBox.copy(), covering.copy(), geometryTypes.copy()); + } + + @Override + public String toString() { + return "GeometryStatistics{" + "boundingBox=" + + boundingBox + ", covering=" + + covering + ", geometryTypes=" + + geometryTypes + '}'; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java new file mode 100644 index 0000000000..52a26c358c --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/geometry/GeometryTypes.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.statistics.geometry; + +import java.util.HashSet; +import java.util.Set; +import org.apache.parquet.Preconditions; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.Geometry; + +public class GeometryTypes { + + private static final int UNKNOWN_TYPE_ID = -1; + private Set types = new HashSet<>(); + private boolean valid = true; + + public GeometryTypes(Set types) { + this.types = types; + } + + public GeometryTypes() {} + + public Set getTypes() { + return types; + } + + void update(Geometry geometry) { + if (!valid) { + return; + } + int code = getGeometryTypeCode(geometry); + if (code != UNKNOWN_TYPE_ID) { + types.add(code); + } else { + valid = false; + types.clear(); + } + } + + public void merge(GeometryTypes other) { + Preconditions.checkArgument(other != null, "Cannot merge with null GeometryTypes"); + if (!valid) { + return; + } + if (!other.valid) { + valid = false; + types.clear(); + return; + } + types.addAll(other.types); + } + + public void reset() { + types.clear(); + valid = true; + } + + public void abort() { + valid = false; + types.clear(); + } + + public GeometryTypes copy() { + return new GeometryTypes(new HashSet<>(types)); + } + + @Override + public String toString() { + // TODO: Print the geometry types as strings + return "GeometryTypes{" + "types=" + types + '}'; + } + + private int getGeometryTypeId(Geometry geometry) { + switch (geometry.getGeometryType()) { + case Geometry.TYPENAME_POINT: + return 1; + case Geometry.TYPENAME_LINESTRING: + return 2; + case Geometry.TYPENAME_POLYGON: + return 3; + case Geometry.TYPENAME_MULTIPOINT: + return 4; + case Geometry.TYPENAME_MULTILINESTRING: + return 5; + case Geometry.TYPENAME_MULTIPOLYGON: + return 6; + case Geometry.TYPENAME_GEOMETRYCOLLECTION: + return 7; + default: + return UNKNOWN_TYPE_ID; + } + } + + private int getGeometryTypeCode(Geometry geometry) { + int typeId = getGeometryTypeId(geometry); + if (typeId == UNKNOWN_TYPE_ID) { + return UNKNOWN_TYPE_ID; + } + Coordinate coordinate = geometry.getCoordinate(); + if (!Double.isNaN(coordinate.getZ())) { + typeId += 1000; + } + if (!Double.isNaN(coordinate.getM())) { + typeId += 2000; + } + return typeId; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java index 86099717df..3b6a210b4e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndex.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.PrimitiveIterator; +import org.apache.parquet.column.statistics.geometry.GeometryStatistics; import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; @@ -71,4 +72,12 @@ default List getRepetitionLevelHistogram() { default List getDefinitionLevelHistogram() { throw new UnsupportedOperationException("Definition level histogram is not implemented"); } + + /** + * @return the unmodifiable list of the geometry statistics for each page; + * used for converting to the related thrift object + */ + default List getGeometryStatistics() { + throw new UnsupportedOperationException("Geometry statistics is not implemented"); + } } diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index f4fe80ab98..661b6422fa 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -38,8 +38,10 @@ import java.util.Set; import java.util.function.IntPredicate; import org.apache.parquet.column.MinMax; +import org.apache.parquet.column.statistics.BinaryStatistics; import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.column.statistics.geometry.GeometryStatistics; import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Contains; import org.apache.parquet.filter2.predicate.Operators.Eq; @@ -56,6 +58,7 @@ import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveComparator; import org.apache.parquet.schema.PrimitiveStringifier; import org.apache.parquet.schema.PrimitiveType; @@ -105,6 +108,8 @@ int translate(int arrayIndex) { private long[] repLevelHistogram; // might be null private long[] defLevelHistogram; + // might be null + private GeometryStatistics[] geometryStatistics; static String truncate(String str) { if (str.length() <= MAX_VALUE_LENGTH_FOR_TOSTRING) { @@ -182,6 +187,17 @@ public List getDefinitionLevelHistogram() { return LongLists.unmodifiable(LongArrayList.wrap(defLevelHistogram)); } + @Override + public List getGeometryStatistics() { + List geomStats = new ArrayList<>(); + if (geometryStatistics != null) { + for (GeometryStatistics stats : geometryStatistics) { + geomStats.add(stats.copy()); + } + } + return geomStats; + } + @Override public String toString() { try (Formatter formatter = new Formatter()) { @@ -490,6 +506,7 @@ public long getMinMaxSize() { private int nextPageIndex; private LongList repLevelHistogram = new LongArrayList(); private LongList defLevelHistogram = new LongArrayList(); + private List geometryStatistics = new ArrayList<>(); /** * @return a no-op builder that does not collect statistics objects and therefore returns {@code null} at @@ -580,10 +597,46 @@ public static ColumnIndex build( List maxValues, List repLevelHistogram, List defLevelHistogram) { + return build(type, boundaryOrder, nullPages, nullCounts, minValues, maxValues, null, null, null); + } + + /** + * @param type + * the primitive type + * @param boundaryOrder + * the boundary order of the min/max values + * @param nullPages + * the null pages (one boolean value for each page that signifies whether the page consists of nulls + * entirely) + * @param nullCounts + * the number of null values for each page + * @param minValues + * the min values for each page + * @param maxValues + * the max values for each page + * @param repLevelHistogram + * the repetition level histogram for all levels of each page + * @param defLevelHistogram + * the definition level histogram for all levels of each page + * @param geometryStatistics + * the geometry statistics for each page (apply to GEOMETRY logical type only) + * @return the newly created {@link ColumnIndex} object based on the specified arguments + */ + public static ColumnIndex build( + PrimitiveType type, + BoundaryOrder boundaryOrder, + List nullPages, + List nullCounts, + List minValues, + List maxValues, + List repLevelHistogram, + List defLevelHistogram, + List geometryStatistics) { ColumnIndexBuilder builder = createNewBuilder(type, Integer.MAX_VALUE); - builder.fill(nullPages, nullCounts, minValues, maxValues, repLevelHistogram, defLevelHistogram); + builder.fill( + nullPages, nullCounts, minValues, maxValues, repLevelHistogram, defLevelHistogram, geometryStatistics); ColumnIndexBase columnIndex = builder.build(type); columnIndex.boundaryOrder = requireNonNull(boundaryOrder); return columnIndex; @@ -631,6 +684,16 @@ public void add(Statistics stats, SizeStatistics sizeStats) { defLevelHistogram = null; } + if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.GeometryLogicalTypeAnnotation) { + assert stats instanceof BinaryStatistics; + BinaryStatistics binaryStats = (BinaryStatistics) stats; + if (geometryStatistics != null && binaryStats.getGeometryStatistics() != null) { + geometryStatistics.add(binaryStats.getGeometryStatistics()); + } else { + geometryStatistics = null; + } + } + ++nextPageIndex; } @@ -644,7 +707,8 @@ private void fill( List minValues, List maxValues, List repLevelHistogram, - List defLevelHistogram) { + List defLevelHistogram, + List geometryStatistics) { clear(); int pageCount = nullPages.size(); if ((nullCounts != null && nullCounts.size() != pageCount) @@ -691,6 +755,9 @@ private void fill( if (defLevelHistogram != null) { this.defLevelHistogram.addAll(defLevelHistogram); } + if (geometryStatistics != null) { + this.geometryStatistics.addAll(geometryStatistics); + } } /** @@ -727,6 +794,10 @@ private ColumnIndexBase build(PrimitiveType type) { if (defLevelHistogram != null && !defLevelHistogram.isEmpty()) { columnIndex.defLevelHistogram = defLevelHistogram.toLongArray(); } + if (geometryStatistics != null && !geometryStatistics.isEmpty()) { + columnIndex.geometryStatistics = new GeometryStatistics[geometryStatistics.size()]; + geometryStatistics.toArray(columnIndex.geometryStatistics); + } return columnIndex; } @@ -773,6 +844,7 @@ private void clear() { pageIndexes.clear(); repLevelHistogram.clear(); defLevelHistogram.clear(); + geometryStatistics.clear(); } abstract void clearMinMax(); diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java index 05629dd388..a2518fc963 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java @@ -33,6 +33,7 @@ import static org.apache.parquet.schema.PrimitiveStringifier.TIME_STRINGIFIER; import static org.apache.parquet.schema.PrimitiveStringifier.TIME_UTC_STRINGIFIER; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -146,6 +147,21 @@ protected LogicalTypeAnnotation fromString(List params) { protected LogicalTypeAnnotation fromString(List params) { return float16Type(); } + }, + GEOMETRY { + @Override + protected LogicalTypeAnnotation fromString(List params) { + if (params.size() < 2) { + throw new RuntimeException( + "Expecting at least 2 parameters for geometry logical type, got " + params.size()); + } + GeometryEncoding encoding = GeometryEncoding.valueOf(params.get(0)); + Edges edges = Edges.valueOf(params.get(1)); + String crs = params.size() > 2 ? params.get(2) : null; + ByteBuffer metadata = + params.size() > 3 ? ByteBuffer.wrap(params.get(3).getBytes()) : null; + return geometryType(encoding, edges, crs, metadata); + } }; protected abstract LogicalTypeAnnotation fromString(List params); @@ -316,6 +332,11 @@ public static Float16LogicalTypeAnnotation float16Type() { return Float16LogicalTypeAnnotation.INSTANCE; } + public static GeometryLogicalTypeAnnotation geometryType( + GeometryEncoding encoding, Edges edges, String crs, ByteBuffer metadata) { + return new GeometryLogicalTypeAnnotation(encoding, edges, crs, metadata); + } + public static class StringLogicalTypeAnnotation extends LogicalTypeAnnotation { private static final StringLogicalTypeAnnotation INSTANCE = new StringLogicalTypeAnnotation(); @@ -1091,6 +1112,104 @@ public int hashCode() { } } + public enum GeometryEncoding { + WKB + } + + public enum Edges { + PLANAR, + SPHERICAL + } + + public static class GeometryLogicalTypeAnnotation extends LogicalTypeAnnotation { + private final GeometryEncoding encoding; + private final Edges edges; + private final String crs; + private final ByteBuffer metadata; + + private GeometryLogicalTypeAnnotation(GeometryEncoding encoding, Edges edges, String crs, ByteBuffer metadata) { + Preconditions.checkArgument(encoding != null, "Geometry encoding is required"); + Preconditions.checkArgument(edges != null, "Geometry edges is required"); + this.encoding = encoding; + this.edges = edges; + this.crs = crs; + this.metadata = metadata; + } + + @Override + @Deprecated + public OriginalType toOriginalType() { + return null; + } + + @Override + public Optional accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) { + return logicalTypeAnnotationVisitor.visit(this); + } + + @Override + LogicalTypeToken getType() { + return LogicalTypeToken.GEOMETRY; + } + + @Override + protected String typeParametersAsString() { + StringBuilder sb = new StringBuilder(); + sb.append("("); + sb.append(encoding); + sb.append(","); + sb.append(edges); + if (crs != null && !crs.isEmpty()) { + sb.append(","); + sb.append(crs); + } + if (metadata != null) { + sb.append(","); + sb.append(metadata); + } + sb.append(")"); + return sb.toString(); + } + + public GeometryEncoding getEncoding() { + return encoding; + } + + public Edges getEdges() { + return edges; + } + + public String getCrs() { + return crs; + } + + public ByteBuffer getMetadata() { + return metadata; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof GeometryLogicalTypeAnnotation)) { + return false; + } + GeometryLogicalTypeAnnotation other = (GeometryLogicalTypeAnnotation) obj; + return (encoding == other.encoding) && (edges == other.edges) && crs.equals(other.crs); + } + + @Override + public int hashCode() { + return Objects.hash(encoding, crs, edges); + } + + @Override + PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) { + if (encoding == GeometryEncoding.WKB) { + return PrimitiveStringifier.WKB_STRINGIFIER; + } + return super.valueStringifier(primitiveType); + } + } + /** * Implement this interface to visit a logical type annotation in the schema. * The default implementation for each logical type specific visitor method is empty. @@ -1162,5 +1281,9 @@ default Optional visit(MapKeyValueTypeAnnotation mapKeyValueLogicalType) { default Optional visit(Float16LogicalTypeAnnotation float16LogicalType) { return empty(); } + + default Optional visit(GeometryLogicalTypeAnnotation geometryLogicalType) { + return empty(); + } } } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java index c46e94367f..bb5c8a9474 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveStringifier.java @@ -35,6 +35,9 @@ import java.util.concurrent.TimeUnit; import javax.naming.OperationNotSupportedException; import org.apache.parquet.io.api.Binary; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.WKBReader; /** * Class that provides string representations for the primitive values. These string values are to be used for @@ -449,4 +452,20 @@ String stringifyNotNull(Binary value) { return Float16.toFloatString(value); } }; + + static final PrimitiveStringifier WKB_STRINGIFIER = new BinaryStringifierBase("WKB_STRINGIFIER") { + + @Override + String stringifyNotNull(Binary value) { + + Geometry geometry; + try { + WKBReader reader = new WKBReader(); + geometry = reader.read(value.getBytesUnsafe()); + return geometry.toText(); + } catch (ParseException e) { + return BINARY_INVALID; + } + } + }; } diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index e74d7cde02..f08e20333d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -271,6 +271,14 @@ public Optional visit( LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); } + + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + // ColumnOrder is undefined for GEOMETRY logical type. Use the default comparator for + // now. + return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); + } }) .orElseThrow(() -> new ShouldNeverHappenException( "No comparator logic implemented for BINARY logical type: " + logicalType)); diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java index 5bc2f89f47..45985c7a31 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java @@ -571,6 +571,15 @@ public Optional visit( return checkBinaryPrimitiveType(enumLogicalType); } + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + if (geometryLogicalType.getEncoding() != LogicalTypeAnnotation.GeometryEncoding.WKB) { + throw new RuntimeException("Only WKB geometry encoding is supported for now"); + } + return checkBinaryPrimitiveType(geometryLogicalType); + } + private Optional checkFixedPrimitiveType( int l, LogicalTypeAnnotation logicalTypeAnnotation) { Preconditions.checkState( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index e752b4ceea..62062c7967 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -50,6 +51,7 @@ import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.parquet.column.statistics.geometry.GeometryTypes; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.AesGcmEncryptor; @@ -65,6 +67,7 @@ import org.apache.parquet.format.BloomFilterHash; import org.apache.parquet.format.BloomFilterHeader; import org.apache.parquet.format.BoundaryOrder; +import org.apache.parquet.format.BoundingBox; import org.apache.parquet.format.BsonType; import org.apache.parquet.format.ColumnChunk; import org.apache.parquet.format.ColumnCryptoMetaData; @@ -73,17 +76,22 @@ import org.apache.parquet.format.ColumnOrder; import org.apache.parquet.format.CompressionCodec; import org.apache.parquet.format.ConvertedType; +import org.apache.parquet.format.Covering; import org.apache.parquet.format.DataPageHeader; import org.apache.parquet.format.DataPageHeaderV2; import org.apache.parquet.format.DateType; import org.apache.parquet.format.DecimalType; import org.apache.parquet.format.DictionaryPageHeader; +import org.apache.parquet.format.Edges; import org.apache.parquet.format.Encoding; import org.apache.parquet.format.EncryptionWithColumnKey; import org.apache.parquet.format.EnumType; import org.apache.parquet.format.FieldRepetitionType; import org.apache.parquet.format.FileMetaData; import org.apache.parquet.format.Float16Type; +import org.apache.parquet.format.GeometryEncoding; +import org.apache.parquet.format.GeometryStatistics; +import org.apache.parquet.format.GeometryType; import org.apache.parquet.format.IntType; import org.apache.parquet.format.JsonType; import org.apache.parquet.format.KeyValue; @@ -346,6 +354,27 @@ static org.apache.parquet.format.TimeUnit convertUnit(LogicalTypeAnnotation.Time } } + static org.apache.parquet.format.GeometryEncoding convertGeometryEncoding( + LogicalTypeAnnotation.GeometryEncoding encoding) { + switch (encoding) { + case WKB: + return org.apache.parquet.format.GeometryEncoding.WKB; + default: + throw new RuntimeException("Unknown geometry encoding " + encoding); + } + } + + static org.apache.parquet.format.Edges convertEdges(LogicalTypeAnnotation.Edges edges) { + switch (edges) { + case PLANAR: + return org.apache.parquet.format.Edges.PLANAR; + case SPHERICAL: + return org.apache.parquet.format.Edges.SPHERICAL; + default: + throw new RuntimeException("Unknown edges " + edges); + } + } + private static class ConvertedTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor { @Override @@ -519,6 +548,24 @@ public Optional visit(LogicalTypeAnnotation.Float16LogicalTypeAnnot public Optional visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) { return of(LogicalType.UNKNOWN(new NullType())); } + + @Override + public Optional visit(LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + GeometryType geometryType = new GeometryType(); + if (geometryLogicalType.getEncoding() != null) { + geometryType.setEncoding(convertGeometryEncoding(geometryLogicalType.getEncoding())); + } + if (geometryLogicalType.getCrs() != null) { + geometryType.setCrs(geometryLogicalType.getCrs()); + } + if (geometryLogicalType.getEdges() != null) { + geometryType.setEdges(convertEdges(geometryLogicalType.getEdges())); + } + if (geometryLogicalType.getMetadata() != null) { + geometryType.setMetadata(geometryLogicalType.getMetadata()); + } + return of(LogicalType.GEOMETRY(geometryType)); + } } private void addRowGroup( @@ -765,9 +812,52 @@ public static Statistics toParquetStatistics( } } } + if (stats instanceof BinaryStatistics) { + BinaryStatistics binaryStats = (BinaryStatistics) stats; + if (binaryStats.getGeometryStatistics() != null) { + formatStats.setGeometry_stats(toParquetStatistics(binaryStats.getGeometryStatistics())); + } + } + return formatStats; + } + + private static GeometryStatistics toParquetStatistics( + org.apache.parquet.column.statistics.geometry.GeometryStatistics stats) { + GeometryStatistics formatStats = new GeometryStatistics(); + + formatStats.setBbox(toParquetBoundingBox(stats.getBoundingBox())); + + if (stats.getCovering().getGeometry() != null) { + Covering formatCovering = new Covering(); + formatCovering.setGeometry(stats.getCovering().getGeometry()); + formatCovering.setEdges(convertEdges(stats.getCovering().getEdges())); + formatStats.setCovering(formatCovering); + } + + List geometryTypes = new ArrayList<>(stats.getGeometryTypes().getTypes()); + Collections.sort(geometryTypes); + formatStats.setGeometry_types(geometryTypes); + return formatStats; } + private static BoundingBox toParquetBoundingBox(org.apache.parquet.column.statistics.geometry.BoundingBox bbox) { + BoundingBox formatBbox = new BoundingBox(); + formatBbox.setXmin(bbox.getXMin()); + formatBbox.setXmax(bbox.getXMax()); + formatBbox.setYmin(bbox.getYMin()); + formatBbox.setYmax(bbox.getYMax()); + if (bbox.getZMin() <= bbox.getZMax()) { + formatBbox.setZmin(bbox.getZMin()); + formatBbox.setZmax(bbox.getZMax()); + } + if (bbox.getMMin() <= bbox.getMMax()) { + formatBbox.setMmin(bbox.getMMin()); + formatBbox.setMmax(bbox.getMMax()); + } + return formatBbox; + } + private static boolean withinLimit(org.apache.parquet.column.statistics.Statistics stats, int truncateLength) { if (stats.isSmallerThan(MAX_STATS_SIZE)) { return true; @@ -863,6 +953,10 @@ static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInte if (formatStats.isSetNull_count()) { statsBuilder.withNumNulls(formatStats.null_count); } + + if (formatStats.isSetGeometry_stats()) { + statsBuilder.withGeometryStatistics(fromParquetStatistics(formatStats.getGeometry_stats())); + } } return statsBuilder.build(); } @@ -873,6 +967,34 @@ public org.apache.parquet.column.statistics.Statistics fromParquetStatistics( return fromParquetStatisticsInternal(createdBy, statistics, type, expectedOrder); } + static org.apache.parquet.column.statistics.geometry.GeometryStatistics fromParquetStatistics( + GeometryStatistics formatGeomStats) { + org.apache.parquet.column.statistics.geometry.BoundingBox bbox = null; + if (formatGeomStats.isSetBbox()) { + BoundingBox formatBbox = formatGeomStats.getBbox(); + bbox = new org.apache.parquet.column.statistics.geometry.BoundingBox( + formatBbox.getXmin(), + formatBbox.getXmax(), + formatBbox.getYmin(), + formatBbox.getYmax(), + formatBbox.isSetZmin() ? formatBbox.getZmin() : Double.NaN, + formatBbox.isSetZmax() ? formatBbox.getZmax() : Double.NaN, + formatBbox.isSetMmin() ? formatBbox.getMmin() : Double.NaN, + formatBbox.isSetMmax() ? formatBbox.getMmax() : Double.NaN); + } + org.apache.parquet.column.statistics.geometry.Covering covering = null; + if (formatGeomStats.isSetCovering()) { + Covering formatCovering = formatGeomStats.getCovering(); + covering = new org.apache.parquet.column.statistics.geometry.Covering( + ByteBuffer.wrap(formatCovering.getGeometry()), convertEdges(formatCovering.getEdges())); + } + org.apache.parquet.column.statistics.geometry.GeometryTypes geometryTypes = null; + if (formatGeomStats.isSetGeometry_types()) { + geometryTypes = new GeometryTypes(new HashSet<>(formatGeomStats.getGeometry_types())); + } + return new org.apache.parquet.column.statistics.geometry.GeometryStatistics(bbox, covering, geometryTypes); + } + /** * Sort order for page and column statistics. Types are associated with sort * orders (e.g., UTF8 columns should use UNSIGNED) and column stats are @@ -1030,6 +1152,12 @@ public Optional visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { return of(SortOrder.SIGNED); } + + @Override + public Optional visit( + LogicalTypeAnnotation.GeometryLogicalTypeAnnotation geometryLogicalType) { + return of(SortOrder.UNKNOWN); + } }) .orElse(defaultSortOrder(primitive.getPrimitiveTypeName())); } @@ -1173,6 +1301,13 @@ LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) { return LogicalTypeAnnotation.uuidType(); case FLOAT16: return LogicalTypeAnnotation.float16Type(); + case GEOMETRY: + GeometryType geometry = type.getGEOMETRY(); + return LogicalTypeAnnotation.geometryType( + convertGeometryEncoding(geometry.getEncoding()), + convertEdges(geometry.getEdges()), + geometry.getCrs(), + geometry.getMetadata() != null ? ByteBuffer.wrap(geometry.getMetadata()) : null); default: throw new RuntimeException("Unknown logical type " + type); } @@ -1191,6 +1326,32 @@ private LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) { } } + private LogicalTypeAnnotation.GeometryEncoding convertGeometryEncoding(GeometryEncoding encoding) { + if (encoding == null) { + return null; + } + switch (encoding) { + case WKB: + return LogicalTypeAnnotation.GeometryEncoding.WKB; + default: + throw new RuntimeException("Unknown geometry encoding " + encoding); + } + } + + private static LogicalTypeAnnotation.Edges convertEdges(Edges edge) { + if (edge == null) { + return null; + } + switch (edge) { + case PLANAR: + return LogicalTypeAnnotation.Edges.PLANAR; + case SPHERICAL: + return LogicalTypeAnnotation.Edges.SPHERICAL; + default: + throw new RuntimeException("Unknown geometry edge " + edge); + } + } + private static void addKeyValue(FileMetaData fileMetaData, String key, String value) { KeyValue keyValue = new KeyValue(key); keyValue.value = value; @@ -2270,6 +2431,13 @@ public static ColumnIndex toParquetColumnIndex( if (defLevelHistogram != null && !defLevelHistogram.isEmpty()) { parquetColumnIndex.setDefinition_level_histograms(defLevelHistogram); } + if (columnIndex.getGeometryStatistics() != null + && !columnIndex.getGeometryStatistics().isEmpty()) { + columnIndex.getGeometryStatistics().forEach(geomStats -> { + parquetColumnIndex.addToGeometry_stats(toParquetStatistics(geomStats)); + }); + } + return parquetColumnIndex; } @@ -2278,6 +2446,15 @@ public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromPar if (!isMinMaxStatsSupported(type)) { return null; } + List geometryStatistics = null; + if (parquetColumnIndex.isSetGeometry_stats() + && !parquetColumnIndex.getGeometry_stats().isEmpty()) { + geometryStatistics = + new ArrayList<>(parquetColumnIndex.getGeometry_stats().size()); + parquetColumnIndex.getGeometry_stats().stream() + .map(ParquetMetadataConverter::fromParquetStatistics) + .forEach(geometryStatistics::add); + } return ColumnIndexBuilder.build( type, fromParquetBoundaryOrder(parquetColumnIndex.getBoundary_order()), @@ -2286,7 +2463,8 @@ public static org.apache.parquet.internal.column.columnindex.ColumnIndex fromPar parquetColumnIndex.getMin_values(), parquetColumnIndex.getMax_values(), parquetColumnIndex.getRepetition_level_histograms(), - parquetColumnIndex.getDefinition_level_histograms()); + parquetColumnIndex.getDefinition_level_histograms(), + geometryStatistics); } public static OffsetIndex toParquetOffsetIndex( diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java new file mode 100644 index 0000000000..2d65506e47 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestGeometryTypeRoundTrip.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.geometryType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.statistics.BinaryStatistics; +import org.apache.parquet.column.statistics.geometry.GeometryStatistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.io.LocalInputFile; +import org.apache.parquet.io.LocalOutputFile; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation.Edges; +import org.apache.parquet.schema.LogicalTypeAnnotation.GeometryEncoding; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.io.WKBWriter; + +public class TestGeometryTypeRoundTrip { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private Path newTempPath() throws IOException { + File file = temp.newFile(); + Preconditions.checkArgument(file.delete(), "Could not remove temp file"); + return file.toPath(); + } + + @Test + public void testBasicReadWriteGeometryValue() throws IOException { + GeometryFactory geomFactory = new GeometryFactory(); + WKBWriter wkbWriter = new WKBWriter(); + Binary[] points = { + Binary.fromConstantByteArray(wkbWriter.write(geomFactory.createPoint(new Coordinate(1.0, 1.0)))), + Binary.fromConstantByteArray(wkbWriter.write(geomFactory.createPoint(new Coordinate(2.0, 2.0)))) + }; + + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(geometryType(GeometryEncoding.WKB, Edges.PLANAR, "EPSG:4326", null)) + .named("col_geom") + .named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + GroupFactory factory = new SimpleGroupFactory(schema); + Path path = newTempPath(); + try (ParquetWriter writer = ExampleParquetWriter.builder(new LocalOutputFile(path)) + .withConf(conf) + .withDictionaryEncoding(false) + .build()) { + for (Binary value : points) { + writer.write(factory.newGroup().append("col_geom", value)); + } + } + + try (ParquetFileReader reader = ParquetFileReader.open(new LocalInputFile(path))) { + Assert.assertEquals(2, reader.getRecordCount()); + + System.out.println("Footer"); + System.out.println(reader.getFooter().toString()); + System.out.println(); + + ColumnChunkMetaData columnChunkMetaData = + reader.getRowGroups().get(0).getColumns().get(0); + System.out.println("Statistics"); + System.out.println(columnChunkMetaData.getStatistics().toString()); + System.out.println(); + + GeometryStatistics geometryStatistics = + ((BinaryStatistics) columnChunkMetaData.getStatistics()).getGeometryStatistics(); + Assert.assertNotNull(geometryStatistics); + System.out.println("GeometryStatistics"); + System.out.println(geometryStatistics); + System.out.println(); + + ColumnIndex columnIndex = reader.readColumnIndex(columnChunkMetaData); + System.out.println("ColumnIndex"); + System.out.println(columnIndex); + + List pageGeometryStatistics = columnIndex.getGeometryStatistics(); + Assert.assertNotNull(pageGeometryStatistics); + System.out.println("Page GeometryStatistics"); + System.out.println(pageGeometryStatistics); + } + } +} diff --git a/pom.xml b/pom.xml index f7c29f6a72..cd797708b1 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,7 @@ 2.30.0 shaded.parquet 3.3.6 - 2.10.0 + 2.11.0-SNAPSHOT 1.13.1 thrift ${thrift.executable} @@ -98,6 +98,7 @@ 2.0.9 0.16 1.6.0 + 1.19.0 2.3