Merge pull request #933 from opendatacube/expand_time_resolutions
Enhanced time resolution handling (subday and overlapping summary periods)
SpacemanPaul authored Mar 17, 2023
2 parents e2e86a5 + 64e294f commit c5ecdea
Showing 25 changed files with 780 additions and 516 deletions.
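The central change is replacing the old string-valued `time_resolution` options with a `TimeRes` enum: `solar` (the old `raw`), `subday` (new), and `summary` (the old `day`, `month` and `year`). A minimal sketch of a layer opting in to the new values, assuming the usual `ows_cfg.py` layer layout — the layer name and config fragment here are illustrative, not part of this commit:

```python
# Hypothetical ows_cfg.py layer fragment -- names are illustrative only.
# TimeRes.parse() (see ows_configuration.py below) coerces deprecated
# values with a warning: "raw" -> "solar"; "day"/"month"/"year" -> "summary".
layer_cfg = {
    "name": "annual_geomedian",                  # hypothetical layer name
    "product_name": "ga_ls8c_nbart_gm_cyear_3",  # annual summary product
    # Summary products are searched by their start date, so summary
    # periods that overlap in time can still be requested unambiguously.
    "time_resolution": "summary",
    # ... bands, styling, etc.
}
```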
9 changes: 9 additions & 0 deletions check-code-all.sh
@@ -19,21 +19,30 @@ datacube product add https://raw.githubusercontent.com/GeoscienceAustralia/dea-c
datacube product add https://raw.githubusercontent.com/GeoscienceAustralia/dea-config/master/products/sea_ocean_coast/geodata_coast_100k/geodata_coast_100k.odc-product.yaml
datacube product add https://raw.githubusercontent.com/GeoscienceAustralia/dea-config/master/products/inland_water/c3_wo/ga_ls_wo_3.odc-product.yaml

+# Geomedian for summary product testing
+
+datacube product add https://raw.githubusercontent.com/GeoscienceAustralia/dea-config/master/products/baseline_satellite_data/geomedian-au/ga_ls8c_nbart_gm_cyear_3.odc-product.yaml

# S2 multiproduct datasets
datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/baseline/ga_s2bm_ard_3/52/LGM/2017/07/19/20170719T030622/ga_s2bm_ard_3-2-1_52LGM_2017-07-19_final.odc-metadata.yaml --confirm-ignore-lineage
datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/baseline/ga_s2bm_ard_3/52/LGM/2017/07/29/20170729T081630/ga_s2bm_ard_3-2-1_52LGM_2017-07-29_final.odc-metadata.yaml --confirm-ignore-lineage
datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/baseline/ga_s2bm_ard_3/52/LGM/2017/08/08/20170818T192649/ga_s2bm_ard_3-2-1_52LGM_2017-08-08_final.odc-metadata.yaml --confirm-ignore-lineage
datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/baseline/ga_s2am_ard_3/52/LGM/2017/07/14/20170714T082022/ga_s2am_ard_3-2-1_52LGM_2017-07-14_final.odc-metadata.yaml --confirm-ignore-lineage
datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/baseline/ga_s2am_ard_3/52/LGM/2017/07/24/20170724T030641/ga_s2am_ard_3-2-1_52LGM_2017-07-24_final.odc-metadata.yaml --confirm-ignore-lineage
datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/baseline/ga_s2am_ard_3/52/LGM/2017/08/03/20170921T103758/ga_s2am_ard_3-2-1_52LGM_2017-08-03_final.odc-metadata.yaml --confirm-ignore-lineage

# flag masking datasets
datacube dataset add https://data.dea.ga.gov.au/projects/geodata_coast_100k/v2004/x_15/y_-40/COAST_100K_15_-40.yaml
datacube dataset add https://data.dea.ga.gov.au/projects/geodata_coast_100k/v2004/x_8/y_-21/COAST_100K_8_-21.yaml

datacube dataset add https://data.dea.ga.gov.au/derivative/ga_ls_wo_3/1-6-0/094/077/2018/02/08/ga_ls_wo_3_094077_2018-02-08_final.odc-metadata.yaml --confirm-ignore-lineage
datacube dataset add https://data.dea.ga.gov.au/derivative/ga_ls_fc_3/2-5-1/094/077/2018/02/08/ga_ls_fc_3_094077_2018-02-08_final.odc-metadata.yaml --confirm-ignore-lineage

+# Geomedian datasets
+datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/derivative/ga_ls8c_nbart_gm_cyear_3/3-0-0/x17/y37/2019--P1Y/ga_ls8c_nbart_gm_cyear_3_x17y37_2019--P1Y_final.odc-metadata.yaml --confirm-ignore-lineage
+datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/derivative/ga_ls8c_nbart_gm_cyear_3/3-0-0/x17/y37/2020--P1Y/ga_ls8c_nbart_gm_cyear_3_x17y37_2020--P1Y_final.odc-metadata.yaml --confirm-ignore-lineage
+datacube dataset add https://dea-public-data.s3.ap-southeast-2.amazonaws.com/derivative/ga_ls8c_nbart_gm_cyear_3/3-0-0/x17/y37/2021--P1Y/ga_ls8c_nbart_gm_cyear_3_x17y37_2021--P1Y_final.odc-metadata.yaml --confirm-ignore-lineage

# create materialised view for ranges extents
datacube-ows-update --schema --role $DB_USERNAME
datacube-ows-update
234 changes: 113 additions & 121 deletions datacube_ows/data.py

Large diffs are not rendered by default.

50 changes: 39 additions & 11 deletions datacube_ows/mv_index.py
@@ -8,12 +8,16 @@
from enum import Enum
from typing import Any, Iterable, Optional, Tuple, Union, cast

+import pytz
 from datacube.utils.geometry import Geometry as ODCGeom
 from geoalchemy2 import Geometry
 from psycopg2.extras import DateTimeTZRange
-from sqlalchemy import SMALLINT, Column, MetaData, Table, or_, select, text
+from sqlalchemy import (SMALLINT, Column, MetaData, Table, and_, or_, select,
+                        text)
 from sqlalchemy.dialects.postgresql import TSTZRANGE, UUID
-from sqlalchemy.sql.functions import count
+from sqlalchemy.sql.functions import count, func
+
+from datacube_ows.utils import default_to_utc

def get_sqlalc_engine(index: "datacube.index.Index") -> "sqlalchemy.engine.base.Engine":
@@ -60,10 +64,14 @@ def sel(self, stv: Table) -> Iterable["sqlalchemy.sql.elements.ClauseElement"]:
return [text("ST_AsGeoJSON(ST_Union(spatial_extent))")]
assert False

+TimeSearchTerm = Union[
+    Tuple[datetime.datetime, datetime.datetime],
+    datetime.datetime,
+]

 def mv_search(index: "datacube.index.Index",
               sel: MVSelectOpts = MVSelectOpts.IDS,
-              times: Optional[Iterable[Tuple[datetime.datetime, datetime.datetime]]] = None,
+              times: Optional[Iterable[TimeSearchTerm]] = None,
               geom: Optional[ODCGeom] = None,
               products: Optional[Iterable["datacube.model.DatasetType"]] = None) -> Union[
                   Iterable[Iterable[Any]],
@@ -90,16 +98,36 @@ def mv_search(index: "datacube.index.Index",
raise Exception("Must filter by product/layer")
prod_ids = [p.id for p in products]

-    s = select(sel.sel(stv)).where(stv.c.dataset_type_ref.in_(prod_ids))
+    s = select(*sel.sel(stv)).where(stv.c.dataset_type_ref.in_(prod_ids))
     if times is not None:
-        s = s.where(
-            or_(
-                *[
-                    stv.c.temporal_extent.op("&&")(DateTimeTZRange(*t))
-                    for t in times
-                ]
-            )
-        )
+        or_clauses = []
+        for t in times:
+            if isinstance(t, datetime.datetime):
+                t = datetime.datetime(t.year, t.month, t.day, t.hour, t.minute, t.second)
+                t = default_to_utc(t)
+                if not t.tzinfo:
+                    t = t.replace(tzinfo=pytz.utc)
+                tmax = t + datetime.timedelta(seconds=1)
+                or_clauses.append(
+                    and_(
+                        func.lower(stv.c.temporal_extent) >= t,
+                        func.lower(stv.c.temporal_extent) < tmax,
+                    )
+                )
+            elif isinstance(t, datetime.date):
+                t = datetime.datetime(t.year, t.month, t.day, tzinfo=pytz.utc)
+                tmax = t + datetime.timedelta(days=1)
+                or_clauses.append(
+                    and_(
+                        func.lower(stv.c.temporal_extent) >= t,
+                        func.lower(stv.c.temporal_extent) < tmax,
+                    )
+                )
+            else:
+                or_clauses.append(
+                    stv.c.temporal_extent.op("&&")(DateTimeTZRange(*t))
+                )
+        s = s.where(or_(*or_clauses))
orig_crs = None
if geom is not None:
orig_crs = geom.crs
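To make the new search semantics concrete, here is a minimal sketch of the three shapes of `TimeSearchTerm` that `mv_search` now accepts, and how each is translated into SQL by the code above. The `dc` and `product` objects are placeholders; only the time handling reflects this diff:

```python
import datetime

import pytz

# Placeholders for illustration: an open Datacube index and one product.
# from datacube import Datacube
# dc = Datacube()
# product = dc.index.products.get_by_name("ga_s2bm_ard_3")

# 1. Full datetime (subday layers): compared against the *start* of
#    temporal_extent over a one-second window [t, t + 1s).
t_subday = datetime.datetime(2017, 7, 19, 3, 6, 22, tzinfo=pytz.utc)

# 2. Bare date (summary layers): expanded to a one-day window on the start
#    of temporal_extent, so overlapping summary periods are resolved by
#    their start date.
t_summary = datetime.date(2019, 1, 1)

# 3. Explicit (start, end) tuple (solar-date layers): PostgreSQL range
#    overlap via the && operator, as before.
t_range = (
    datetime.datetime(2018, 2, 8, tzinfo=pytz.utc),
    datetime.datetime(2018, 2, 9, tzinfo=pytz.utc),
)

# ids = mv_search(dc.index, MVSelectOpts.IDS, times=[t_subday], products=[product])
```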
136 changes: 78 additions & 58 deletions datacube_ows/ows_configuration.py
@@ -17,8 +17,9 @@
import math
import os
from collections.abc import Mapping
+from enum import Enum
 from importlib import import_module
-from typing import Optional
+from typing import Optional, Sequence

import numpy
from babel.messages.catalog import Catalog
@@ -35,15 +36,13 @@
load_json_obj)
from datacube_ows.cube_pool import ODCInitException, cube, get_cube
 from datacube_ows.ogc_utils import (ConfigException, FunctionWrapper,
-                                    create_geobox, day_summary_date_range,
-                                    local_solar_date_range, month_date_range,
-                                    year_date_range)
+                                    create_geobox, local_solar_date_range)
from datacube_ows.resource_limits import (OWSResourceManagementRules,
parse_cache_age)
from datacube_ows.styles import StyleDef
from datacube_ows.tile_matrix_sets import TileMatrixSet
-from datacube_ows.utils import (group_by_mosaic, group_by_solar,
-                                group_by_statistical)
+from datacube_ows.utils import (group_by_begin_datetime, group_by_mosaic,
+                                group_by_solar)

_LOG = logging.getLogger(__name__)

@@ -320,12 +319,65 @@ def make_ready(self, dc, *args, **kwargs):
super().make_ready(dc, *args, **kwargs)


TIMERES_RAW = "raw"
TIMERES_DAY = "day"
TIMERES_MON = "month"
TIMERES_YR = "year"
class TimeRes(Enum):
SUBDAY = "subday"
SOLAR = "solar"
SUMMARY = "summary"

TIMERES_VALS = [TIMERES_RAW, TIMERES_DAY, TIMERES_MON, TIMERES_YR]
def is_subday(self) -> bool:
return self == self.SUBDAY

def is_solar(self) -> bool:
return self == self.SOLAR

def is_summary(self) -> bool:
return not self.is_solar() and not self.is_subday()

def allow_mosaic(self) -> bool:
return not self.is_subday()

@classmethod
def parse(cls, cfg: Optional[str]) -> Optional["TimeRes"]:
if cfg is None:
cfg = "solar"
elif cfg == "raw":
_LOG.warning("The 'raw' time resolution type is deprecated. Please use 'solar'.")
cfg = "solar"
elif cfg in ("day", "month", "year"):
_LOG.warning("The '%s' time resolution type is deprecated. Please use 'summary'.", cfg)
cfg = "summary"
try:
return cls(cfg)
except ValueError:
return None

def search_times(self, t, geobox=None):
if self.is_solar():
if geobox is None:
raise ValueError("Solar time resolution search_times requires a geobox.")
times = local_solar_date_range(geobox, t)
elif self.is_subday():
# For subday products, return a single start datetime instead of a range.
# mv_index will expand this to a one-second search range.
# This prevents users from having to always use the full ISO timestamp in queries.
times = t
else:
# For summary products, return a single start date instead of a range.
# mv_index will expand this to a one-day search range
# This allows data with overlapping time periods to be resolved by start date.
times = t

return times

def dataset_groupby(self, product_names: Optional[Sequence[str]] = None, is_mosaic=False):
if self.is_subday():
return group_by_begin_datetime(product_names, truncate_dates=False)
elif is_mosaic:
return group_by_mosaic(product_names)
elif self.is_solar():
return group_by_solar(product_names)
else:
return group_by_begin_datetime(product_names)

DEF_TIME_LATEST = "latest"
DEF_TIME_EARLIEST = "earliest"
@@ -367,15 +419,14 @@ def __init__(self, cfg, global_cfg, parent_layer=None, **kwargs):
             self.user_band_math = cfg.get("user_band_math", False)
         else:
             self.user_band_math = False
-
+        self.time_resolution = TimeRes.parse(cfg.get("time_resolution"))
+        if not self.time_resolution:
+            raise ConfigException(f"Invalid time resolution value {cfg['time_resolution']} in named layer {self.name}")
         self.mosaic_date_func: Optional[FunctionWrapper] = None
         if "mosaic_date_func" in cfg:
             self.mosaic_date_func = FunctionWrapper(self, cfg["mosaic_date_func"])
-
-        self.time_resolution = cfg.get("time_resolution", TIMERES_RAW)
-        if self.time_resolution not in TIMERES_VALS:
-            raise ConfigException(
-                "Invalid time resolution value %s in named layer %s" % (self.time_resolution, self.name))
+        if self.mosaic_date_func and not self.time_resolution.allow_mosaic():
+            raise ConfigException(f"Mosaic date function not supported for {self.time_resolution} time resolution.")
         self.default_time_rule = cfg.get("default_time", DEF_TIME_LATEST)
         if self.default_time_rule not in (DEF_TIME_LATEST, DEF_TIME_EARLIEST):
             try:
@@ -860,44 +911,18 @@ def extract_bboxes(self):
def layer_count(self):
return 1

-    @property
-    def is_raw_time_res(self):
-        return self.time_resolution == TIMERES_RAW
-
-    @property
-    def is_day_time_res(self):
-        return self.time_resolution == TIMERES_DAY
-
-    @property
-    def is_month_time_res(self):
-        return self.time_resolution == TIMERES_MON
-
-    @property
-    def is_year_time_res(self):
-        return self.time_resolution == TIMERES_YR
-
     def search_times(self, t, geobox=None):
-        if self.is_month_time_res:
-            return month_date_range(t)
-        elif self.is_year_time_res:
-            return year_date_range(t)
-        elif self.is_day_time_res:
-            return day_summary_date_range(t)
-        else:
-            if not geobox:
-                bbox = self.ranges["bboxes"][self.native_CRS]
-                geobox = create_geobox(
-                    self.native_CRS,
-                    bbox["left"], bbox["bottom"], bbox["right"], bbox["top"],
-                    1, 1
-                )
-            return local_solar_date_range(geobox, t)
+        if not geobox:
+            bbox = self.ranges["bboxes"][self.native_CRS]
+            geobox = create_geobox(
+                self.native_CRS,
+                bbox["left"], bbox["bottom"], bbox["right"], bbox["top"],
+                1, 1
+            )
+        return self.time_resolution.search_times(t, geobox)

     def dataset_groupby(self):
-        if self.is_raw_time_res:
-            return "solar_day"
-        else:
-            return group_by_statistical()
+        return self.time_resolution.dataset_groupby(is_mosaic=self.mosaic_date_func is not None)

def __str__(self):
return "Named OWSLayer: %s" % self.name
@@ -997,12 +1022,7 @@ def parse_pq_names(self, cfg):
}

def dataset_groupby(self):
-        if self.mosaic_date_func:
-            return group_by_mosaic(self.product_names)
-        if self.is_raw_time_res:
-            return group_by_solar(self.product_names)
-        else:
-            return group_by_statistical(self.product_names)
+        return self.time_resolution.dataset_groupby(self.product_names, is_mosaic=self.mosaic_date_func is not None)


def parse_ows_layer(cfg, global_cfg, parent_layer=None, sibling=0):
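A quick sketch of how the new enum behaves, based only on the `parse` and `dataset_groupby` logic shown above (illustrative use, not a documented API):

```python
from datacube_ows.ows_configuration import TimeRes

# Deprecated spellings are coerced (with a logged warning); unknown
# values yield None, which the layer __init__ turns into a ConfigException.
assert TimeRes.parse(None) is TimeRes.SOLAR          # default
assert TimeRes.parse("raw") is TimeRes.SOLAR         # deprecated alias
assert TimeRes.parse("month") is TimeRes.SUMMARY     # deprecated alias
assert TimeRes.parse("fortnight") is None            # invalid value

# Grouping: subday keeps full timestamps; mosaics take precedence for the
# other resolutions; summary and solar each pick their own grouper.
grouper = TimeRes.SUMMARY.dataset_groupby(["ga_ls8c_nbart_gm_cyear_3"])
```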