Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small changes in object selection #72

Merged
merged 2 commits into from
May 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions hydromt_fiat/workflows/exposure_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,23 +558,27 @@ def get_max_potential_damage_columns(self):
def get_damage_function_columns(self):
return [c for c in self.exposure_db.columns if "Damage Function:" in c]

def get_buildings(
def select_objects(
self,
type: Optional[str] = None,
non_building_names: Optional[list[str]] = None,
return_gdf: bool = False,
) -> gpd.GeoDataFrame:
buildings = self.exposure_db
objects = self.exposure_db

if non_building_names:
buildings = buildings.loc[
~buildings["Primary Object Type"].isin(non_building_names), :
objects = objects.loc[
~objects["Primary Object Type"].isin(non_building_names), :
]

if type:
if str(type).lower() != "all":
buildings = buildings.loc[buildings["Primary Object Type"] == type, :]
objects = objects.loc[objects["Primary Object Type"] == type, :]

return buildings
if return_gdf:
objects = self.df_to_gdf(objects, crs=self.crs)

return objects

def get_object_ids(
self,
Expand All @@ -593,22 +597,26 @@ def get_object_ids(
list[Any]
list of ids
"""
buildings = self.get_buildings(
type=property_type,
non_building_names=non_building_names,
)

if (selection_type == "aggregation_area") or (selection_type == "all"):
buildings = self.select_objects(
type=property_type,
non_building_names=non_building_names,
)
if selection_type == "all":
ids = buildings["Object ID"]
elif selection_type == "aggregation_area":
label = aggregation[0].name # Always use first aggregation area type
ids = buildings.loc[
buildings[f"Aggregation Label: {label}"] == aggregation_area_name,
buildings[f"Aggregation Label: {aggregation}"]
== aggregation_area_name,
"Object ID",
]
elif selection_type == "polygon":
assert polygon_file is not None
buildings = self.select_objects(
type=property_type,
non_building_names=non_building_names,
return_gdf=True,
)
polygon = gpd.read_file(polygon_file)
ids = gpd.sjoin(buildings, polygon)["Object ID"]
elif selection_type == "list":
Expand All @@ -617,6 +625,7 @@ def get_object_ids(
return ids

def get_geoms_from_xy(self):
# TODO see if and how this can be merged with the df_to_gdf function
exposure_geoms = gpd.GeoDataFrame(
{
"Object ID": self.exposure_db["Object ID"],
Expand All @@ -627,6 +636,14 @@ def get_geoms_from_xy(self):
)
self.exposure_geoms = exposure_geoms

@staticmethod
def df_to_gdf(df: pd.DataFrame, crs: str) -> gpd.GeoDataFrame:
gdf = gpd.GeoDataFrame(
df, geometry=gpd.points_from_xy(df["X Coordinate"], df["Y Coordinate"]),
crs = crs
)
return gdf

def check_required_columns(self):
for col in self._REQUIRED_COLUMNS:
try:
Expand Down