Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| from functools import lru_cache | |
| import geopandas as gpd | |
| from src.utils import VG250_DIR, NE_DIR, BKG_FILES, NE_FILES, AGS_TO_BUNDESLAND | |
| def _find_shapefile(directory: Path, filename: str) -> Path | None: | |
| direct = directory / filename | |
| if direct.exists(): | |
| return direct | |
| for sub in sorted(directory.iterdir()): | |
| if sub.is_dir(): | |
| candidate = sub / filename | |
| if candidate.exists(): | |
| return candidate | |
| return None | |
def _load_and_reproject(path: Path, keep_cols: list[str]) -> gpd.GeoDataFrame:
    """Read a vector file, trim its columns and return it in EPSG:4326.

    Args:
        path: File to read with ``geopandas.read_file``.
        keep_cols: Attribute columns to keep; names missing from the file are
            skipped. The ``geometry`` column is always kept.

    Returns:
        A copy of the trimmed frame whose CRS is EPSG:4326.
    """
    raw = gpd.read_file(path)

    selected = [name for name in keep_cols if name in raw.columns]
    selected.append("geometry")
    subset = raw[selected].copy()

    crs = subset.crs
    if crs is None:
        # No CRS info — assume WGS84 (unlikely for BKG but safe fallback)
        return subset.set_crs("EPSG:4326")
    if crs and crs.to_epsg() != 4326:
        return subset.to_crs("EPSG:4326")
    return subset
def load_bundeslaender() -> gpd.GeoDataFrame | None:
    """Load the ``bundeslaender`` layer from the VG250 directory.

    Returns:
        The layer in EPSG:4326 with the ``GEN``, ``AGS`` and ``EWZ`` columns
        (where present) plus geometry, or ``None`` when the file listed under
        ``BKG_FILES["bundeslaender"]`` cannot be found.
    """
    shp = _find_shapefile(VG250_DIR, BKG_FILES["bundeslaender"])
    return None if shp is None else _load_and_reproject(shp, ["GEN", "AGS", "EWZ"])
def load_kreise() -> gpd.GeoDataFrame | None:
    """Load the ``kreise`` layer from the VG250 directory.

    Returns:
        The layer in EPSG:4326 with the ``GEN``, ``AGS``, ``BEZ`` and ``EWZ``
        columns (where present) plus geometry, or ``None`` when the file
        listed under ``BKG_FILES["kreise"]`` cannot be found.
    """
    shp = _find_shapefile(VG250_DIR, BKG_FILES["kreise"])
    if shp is not None:
        return _load_and_reproject(shp, ["GEN", "AGS", "BEZ", "EWZ"])
    return None
def load_gemeinden() -> gpd.GeoDataFrame | None:
    """Load the ``gemeinden`` layer and tag each row with its Bundesland.

    Returns:
        The layer in EPSG:4326 with the ``GEN``, ``AGS``, ``BEZ`` and ``EWZ``
        columns (where present) plus geometry, or ``None`` when the file
        listed under ``BKG_FILES["gemeinden"]`` cannot be found. When an
        ``AGS`` column exists, a ``bundesland`` column is added by mapping the
        first two characters of ``AGS`` through ``AGS_TO_BUNDESLAND``.
    """
    shp = _find_shapefile(VG250_DIR, BKG_FILES["gemeinden"])
    if shp is None:
        return None

    gemeinden = _load_and_reproject(shp, ["GEN", "AGS", "BEZ", "EWZ"])
    if "AGS" not in gemeinden.columns:
        return gemeinden

    # The two leading AGS characters are the lookup key for the state name.
    state_key = gemeinden["AGS"].str[:2]
    gemeinden["bundesland"] = state_key.map(AGS_TO_BUNDESLAND)
    return gemeinden
def load_world_countries() -> gpd.GeoDataFrame | None:
    """Load the ``countries`` layer from the NE directory.

    Returns:
        The layer in EPSG:4326 with the ``NAME``, ``ISO_A2``, ``CONTINENT``
        and ``POP_EST`` columns (where present) plus geometry, or ``None``
        when the file listed under ``NE_FILES["countries"]`` cannot be found.
    """
    columns = ["NAME", "ISO_A2", "CONTINENT", "POP_EST"]
    shp = _find_shapefile(NE_DIR, NE_FILES["countries"])
    return None if shp is None else _load_and_reproject(shp, columns)
def load_admin1() -> gpd.GeoDataFrame | None:
    """Load the ``admin1`` layer from the NE directory.

    Returns:
        The layer in EPSG:4326 with the ``name``, ``admin``, ``iso_a2`` and
        ``type_en`` columns (where present) plus geometry, or ``None`` when
        the file listed under ``NE_FILES["admin1"]`` cannot be found.
    """
    columns = ["name", "admin", "iso_a2", "type_en"]
    shp = _find_shapefile(NE_DIR, NE_FILES["admin1"])
    if shp is not None:
        return _load_and_reproject(shp, columns)
    return None
def get_boundary_gdf(scope: str) -> gpd.GeoDataFrame | None:
    """Load the boundary layer registered for ``scope``.

    Args:
        scope: One of ``"bundeslaender"``, ``"kreise"``, ``"gemeinden"``,
            ``"countries"`` or ``"admin1"``.

    Returns:
        Whatever the matching loader returns, or ``None`` when ``scope`` is
        unknown or the loader raises. A failure is reported on stdout rather
        than propagated.
    """
    dispatch = {
        "bundeslaender": load_bundeslaender,
        "kreise": load_kreise,
        "gemeinden": load_gemeinden,
        "countries": load_world_countries,
        "admin1": load_admin1,
    }
    if scope not in dispatch:
        return None

    try:
        boundaries = dispatch[scope]()
    except Exception as e:
        # Best-effort boundary: report the problem and signal "no data".
        print(f"[boundary] FEHLER beim Laden von scope={scope}: {e}")
        return None
    return boundaries
def filter_gemeinden_by_bundesland(gdf: gpd.GeoDataFrame, bundesland: str) -> gpd.GeoDataFrame:
    """Restrict ``gdf`` to the rows of one Bundesland.

    Args:
        gdf: Frame to filter.
        bundesland: Value to match against the ``bundesland`` column; the
            special value ``"Alle"`` disables filtering.

    Returns:
        A copy containing only the matching rows. ``gdf`` itself is returned
        unchanged when it has no ``bundesland`` column or ``bundesland`` is
        ``"Alle"``.
    """
    if "bundesland" in gdf.columns and bundesland != "Alle":
        in_state = gdf["bundesland"] == bundesland
        return gdf[in_state].copy()
    return gdf
def filter_admin1_by_country(gdf: gpd.GeoDataFrame, iso2: str) -> gpd.GeoDataFrame:
    """Restrict ``gdf`` to the rows whose ``iso_a2`` equals ``iso2``.

    The comparison ignores case on both sides.

    Args:
        gdf: Frame to filter.
        iso2: Country code to match; an empty value disables filtering.

    Returns:
        A copy containing only the matching rows. ``gdf`` itself is returned
        unchanged when it has no ``iso_a2`` column or ``iso2`` is empty.
    """
    if "iso_a2" in gdf.columns and iso2:
        codes = gdf["iso_a2"].str.upper()
        wanted = iso2.upper()
        return gdf[codes == wanted].copy()
    return gdf
def check_boundary_availability() -> dict[str, bool]:
    """Report which configured boundary files can be found on disk.

    Returns:
        A mapping from each scope in ``BKG_FILES`` (searched under
        ``VG250_DIR``) and ``NE_FILES`` (searched under ``NE_DIR``) to
        ``True`` when its file was found. If a scope appears in both tables,
        the ``NE_FILES`` result wins.
    """
    sources = ((VG250_DIR, BKG_FILES), (NE_DIR, NE_FILES))

    availability: dict[str, bool] = {}
    for base_dir, files in sources:
        availability.update(
            (scope, _find_shapefile(base_dir, name) is not None)
            for scope, name in files.items()
        )
    return availability