Skip to content

Commit

Permalink
RasterSourceRDD.tiledLayerRDD within the geometry
Browse files Browse the repository at this point in the history
  • Loading branch information
pomadchin committed Jul 16, 2022
1 parent a4c3308 commit 556cb1b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Add `RasterSourceRDD.tiledLayerRDD` overloads that keep only the keys whose extent intersects a given geometry [#3474](https://github.com/locationtech/geotrellis/pull/3474)

## [3.6.3] - 2022-07-12

### Changed
Expand Down
29 changes: 26 additions & 3 deletions spark/src/main/scala/geotrellis/spark/RasterSourceRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import geotrellis.raster._
import geotrellis.raster.io.geotiff.OverviewStrategy
import geotrellis.raster.resample.NearestNeighbor
import geotrellis.layer._
import geotrellis.vector.Geometry
import geotrellis.util._
import org.apache.spark.rdd._
import org.apache.spark.{Partitioner, SparkContext}
import cats.syntax.option._

import scala.collection.mutable.ArrayBuilder
import scala.reflect.ClassTag
Expand Down Expand Up @@ -199,14 +201,29 @@ object RasterSourceRDD {
sources: RDD[RasterSource],
layout: LayoutDefinition
)(implicit sc: SparkContext): MultibandTileLayerRDD[SpatialKey] =
tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, NearestNeighbor, None, None)
tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, None, NearestNeighbor, None, None)

/**
 * Builds a [[SpatialKey]]-keyed layer from `sources`, restricted to `geometry`.
 *
 * Convenience overload: delegates to the general `tiledLayerRDD` with the spatial key
 * extractor, nearest-neighbour resampling, and no raster summary or partitioner.
 *
 * @param sources  raster sources to tile
 * @param layout   target layout definition
 * @param geometry geometry forwarded (wrapped in `Some`) as the key filter
 */
def tiledLayerRDD(
  sources: RDD[RasterSource],
  layout: LayoutDefinition,
  geometry: Geometry
)(implicit sc: SparkContext): MultibandTileLayerRDD[SpatialKey] = {
  // The general overload takes the filter as an Option; None there means "no filter".
  val spatialFilter: Option[Geometry] = Some(geometry)

  tiledLayerRDD(
    sources,
    layout,
    KeyExtractor.spatialKeyExtractor,
    spatialFilter,
    resampleMethod = NearestNeighbor,
    rasterSummary = None,
    partitioner = None
  )
}

/**
 * Builds a [[SpatialKey]]-keyed layer from `sources` using the supplied resample method.
 *
 * Convenience overload: delegates to the general `tiledLayerRDD` with the spatial key
 * extractor, forwarding `resampleMethod` and passing `None` for the geometry, raster
 * summary and partitioner arguments.
 *
 * @param sources        raster sources to tile
 * @param layout         target layout definition
 * @param resampleMethod resample method forwarded to the general overload
 */
// NOTE(review): the two call lines below appear to be the before/after pair of a diff
// hunk (the second adds the `None` geometry argument) — confirm only the second one
// exists in the actual file.
def tiledLayerRDD(
sources: RDD[RasterSource],
layout: LayoutDefinition,
resampleMethod: ResampleMethod
)(implicit sc: SparkContext): MultibandTileLayerRDD[SpatialKey] =
tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, resampleMethod, None, None)
tiledLayerRDD(sources, layout, KeyExtractor.spatialKeyExtractor, None, resampleMethod, None, None)

/**
 * Builds a [[SpatialKey]]-keyed layer from `sources`, restricted to `geometry`, using
 * the supplied resample method.
 *
 * Convenience overload: delegates to the general `tiledLayerRDD` with the spatial key
 * extractor and no raster summary or partitioner.
 *
 * @param sources        raster sources to tile
 * @param layout         target layout definition
 * @param resampleMethod resample method forwarded to the general overload
 * @param geometry       geometry forwarded (wrapped in `Some`) as the key filter
 */
def tiledLayerRDD(
  sources: RDD[RasterSource],
  layout: LayoutDefinition,
  resampleMethod: ResampleMethod,
  geometry: Geometry
)(implicit sc: SparkContext): MultibandTileLayerRDD[SpatialKey] = {
  // The general overload takes the filter as an Option; None there means "no filter".
  val spatialFilter: Option[Geometry] = Some(geometry)

  tiledLayerRDD(
    sources,
    layout,
    KeyExtractor.spatialKeyExtractor,
    spatialFilter,
    resampleMethod = resampleMethod,
    rasterSummary = None,
    partitioner = None
  )
}

/**
* When tiling, more than one MultibandTile may fall into the group that corresponds to a single key.
Expand All @@ -223,6 +240,7 @@ object RasterSourceRDD {
sources: RDD[RasterSource],
layout: LayoutDefinition,
keyExtractor: KeyExtractor.Aux[K, M],
geometry: Option[Geometry] = None,
resampleMethod: ResampleMethod = NearestNeighbor,
rasterSummary: Option[RasterSummary[M]] = None,
partitioner: Option[Partitioner] = None,
Expand All @@ -239,7 +257,12 @@ object RasterSourceRDD {
}

// Expand every tiled source into its (key, region) pairs, optionally filtered by geometry.
// NOTE(review): the one-line flatMap directly below appears to be the pre-change line of
// a diff hunk and the multi-line flatMap its replacement — confirm only the latter exists
// in the actual file.
val rasterRegionRDD: RDD[(K, RasterRegion)] =
tiledLayoutSourceRDD.flatMap { _.keyedRasterRegions() }
tiledLayoutSourceRDD.flatMap { source =>
val keyedRasterRegions = source.keyedRasterRegions()
// No geometry supplied: keep every region. Otherwise keep only the regions whose
// key extent (computed through layerMetadata) intersects the geometry.
geometry.fold(keyedRasterRegions) { geom => keyedRasterRegions.filter {
case (key, _) => layerMetadata.keyToExtent(key).intersects(geom)
} }
}

// The number of partitions estimated by RasterSummary can sometimes be much
// lower than what the user set. Therefore, we assume that the larger value
Expand Down
32 changes: 32 additions & 0 deletions spark/src/test/scala/geotrellis/spark/RasterSourceRDDSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import geotrellis.raster.geotiff._
import geotrellis.raster.io.geotiff._
import geotrellis.spark.store.hadoop._
import geotrellis.store.hadoop._
import geotrellis.store.cog._
import geotrellis.vector.Geometry

import spire.syntax.cfor._
import cats.implicits._
Expand Down Expand Up @@ -179,6 +181,36 @@ class RasterSourceRDDSpec extends AnyFunSpec with TestEnvironment with RasterMat

assertRDDLayersEqual(reprojectedExpectedRDD, tiledSource)
}

it("should reproduce tileToLayout when given an RDD[RasterSource] within the extent") {
  val inputRDD: RDD[RasterSource] = sc.parallelize(Seq(rasterSource))

  // Copy the fixture values into locals before they are used inside RDD closures;
  // referencing them directly causes a serialization error.
  val tileLayout = layout
  val destCRS = targetCRS
  val keyTransform = tileLayout.mapTransform

  // A 2x2 square of keys: exactly the keys the geometry filter is expected to keep.
  val squareKeys =
    List(
      SpatialKey(2303, 3223),
      SpatialKey(2303, 3224),
      SpatialKey(2304, 3223),
      SpatialKey(2304, 3224)
    ).sorted

  // Union of the key extents, buffered by the layout so that the resulting extent
  // stays within the given keys, then converted to a polygon.
  val squareExtent =
    squareKeys
      .map(key => keyTransform.keyToExtent(key))
      .reduce((left, right) => left combine right)
  val queryPolygon = squareExtent.bufferByLayout(tileLayout).toPolygon()

  // Reference layer: the expected RDD narrowed down with the same intersection rule.
  val expectedLayer = reprojectedExpectedRDD.withContext { rdd =>
    rdd.filter { case (key, _) => keyTransform.keyToExtent(key).intersects(queryPolygon) }
  }

  val reprojectedInputRDD: RDD[RasterSource] =
    inputRDD.map { source => source.reprojectToGrid(destCRS, tileLayout) }

  val tiledLayer: MultibandTileLayerRDD[SpatialKey] =
    RasterSourceRDD.tiledLayerRDD(reprojectedInputRDD, tileLayout, queryPolygon)

  val actualKeys = tiledLayer.map { case (key, _) => key }.collect().toSet

  actualKeys shouldBe squareKeys.toSet

  assertRDDLayersEqual(expectedLayer, tiledLayer)
}
}

describe("RasterSourceRDD.read") {
Expand Down

0 comments on commit 556cb1b

Please sign in to comment.