"""Locate boundary shapefiles on disk and load them as EPSG:4326 GeoDataFrames."""

from pathlib import Path
from functools import lru_cache

import geopandas as gpd

from src.utils import VG250_DIR, NE_DIR, BKG_FILES, NE_FILES, AGS_TO_BUNDESLAND


def _find_shapefile(directory: Path, filename: str) -> Path | None:
    """Return the path of *filename* inside *directory*, or ``None``.

    The file is looked up directly in *directory* first and then in each
    immediate subdirectory (visited in sorted order, one level deep only).

    Args:
        directory: Folder expected to contain the shapefile.
        filename: File name to look for.

    Returns:
        The first matching path, or ``None`` when nothing matches or when
        *directory* itself does not exist / is not a directory.
    """
    # A missing data folder simply means "not available"; without this guard
    # ``iterdir()`` below would raise FileNotFoundError / NotADirectoryError.
    if not directory.is_dir():
        return None

    direct = directory / filename
    if direct.exists():
        return direct

    for sub in sorted(directory.iterdir()):
        if not sub.is_dir():
            continue
        candidate = sub / filename
        if candidate.exists():
            return candidate
    return None


def _load_and_reproject(path: Path, keep_cols: list[str]) -> gpd.GeoDataFrame:
    """Read *path*, keep selected columns and return the data in EPSG:4326.

    Args:
        path: File to read with ``geopandas.read_file``.
        keep_cols: Attribute columns to keep; names absent from the file are
            skipped silently. The ``geometry`` column is always kept.

    Returns:
        A copy of the trimmed GeoDataFrame with its CRS set to EPSG:4326.
    """
    gdf = gpd.read_file(path)
    available = [c for c in keep_cols if c in gdf.columns] + ["geometry"]
    gdf = gdf[available].copy()

    if gdf.crs is None:
        # No CRS info — assume WGS84 (unlikely for BKG but safe fallback)
        gdf = gdf.set_crs("EPSG:4326")
    elif gdf.crs.to_epsg() != 4326:
        gdf = gdf.to_crs("EPSG:4326")
    return gdf


def _load_optional(
    directory: Path, filename: str, keep_cols: list[str]
) -> gpd.GeoDataFrame | None:
    """Load *filename* from *directory*, or return ``None`` if it is not found."""
    path = _find_shapefile(directory, filename)
    if path is None:
        return None
    return _load_and_reproject(path, keep_cols)


# NOTE: every ``load_*`` function below is wrapped in ``lru_cache``. The cached
# value is whatever the first call returned — including ``None`` for a missing
# file — and the same GeoDataFrame object is handed to every caller. Use
# ``load_xyz.cache_clear()`` to force a re-read.


@lru_cache(maxsize=None)
def load_bundeslaender() -> gpd.GeoDataFrame | None:
    """Load the ``bundeslaender`` layer from ``VG250_DIR`` (cached).

    Returns:
        GeoDataFrame with the available columns among GEN, AGS, EWZ plus
        geometry, or ``None`` when the shapefile cannot be found.
    """
    return _load_optional(VG250_DIR, BKG_FILES["bundeslaender"], ["GEN", "AGS", "EWZ"])


@lru_cache(maxsize=None)
def load_kreise() -> gpd.GeoDataFrame | None:
    """Load the ``kreise`` layer from ``VG250_DIR`` (cached).

    Returns:
        GeoDataFrame with the available columns among GEN, AGS, BEZ, EWZ plus
        geometry, or ``None`` when the shapefile cannot be found.
    """
    return _load_optional(VG250_DIR, BKG_FILES["kreise"], ["GEN", "AGS", "BEZ", "EWZ"])


@lru_cache(maxsize=None)
def load_gemeinden() -> gpd.GeoDataFrame | None:
    """Load the ``gemeinden`` layer from ``VG250_DIR`` (cached).

    When an ``AGS`` column is present, a ``bundesland`` column is added by
    mapping the first two characters of ``AGS`` through ``AGS_TO_BUNDESLAND``.

    Returns:
        The GeoDataFrame, or ``None`` when the shapefile cannot be found.
    """
    gdf = _load_optional(VG250_DIR, BKG_FILES["gemeinden"], ["GEN", "AGS", "BEZ", "EWZ"])
    if gdf is None:
        return None
    if "AGS" in gdf.columns:
        gdf["bundesland"] = gdf["AGS"].str[:2].map(AGS_TO_BUNDESLAND)
    return gdf


@lru_cache(maxsize=None)
def load_world_countries() -> gpd.GeoDataFrame | None:
    """Load the ``countries`` layer from ``NE_DIR`` (cached).

    Returns:
        GeoDataFrame with the available columns among NAME, ISO_A2,
        CONTINENT, POP_EST plus geometry, or ``None`` when the shapefile
        cannot be found.
    """
    return _load_optional(
        NE_DIR, NE_FILES["countries"], ["NAME", "ISO_A2", "CONTINENT", "POP_EST"]
    )


@lru_cache(maxsize=None)
def load_admin1() -> gpd.GeoDataFrame | None:
    """Load the ``admin1`` layer from ``NE_DIR`` (cached).

    Returns:
        GeoDataFrame with the available columns among name, admin, iso_a2,
        type_en plus geometry, or ``None`` when the shapefile cannot be found.
    """
    return _load_optional(
        NE_DIR, NE_FILES["admin1"], ["name", "admin", "iso_a2", "type_en"]
    )


def get_boundary_gdf(scope: str) -> gpd.GeoDataFrame | None:
    """Return the boundary layer registered under *scope*.

    Args:
        scope: One of ``"bundeslaender"``, ``"kreise"``, ``"gemeinden"``,
            ``"countries"`` or ``"admin1"``.

    Returns:
        The loaded GeoDataFrame, or ``None`` when *scope* is unknown, the
        file is missing, or loading raised (the error is printed, not
        propagated).
    """
    loaders = {
        "bundeslaender": load_bundeslaender,
        "kreise": load_kreise,
        "gemeinden": load_gemeinden,
        "countries": load_world_countries,
        "admin1": load_admin1,
    }
    loader = loaders.get(scope)
    if loader is None:
        return None
    # Deliberate best-effort boundary: any load failure is reported on stdout
    # and turned into "no data" for the caller.
    try:
        return loader()
    except Exception as e:
        print(f"[boundary] FEHLER beim Laden von scope={scope}: {e}")
        return None


def filter_gemeinden_by_bundesland(gdf: gpd.GeoDataFrame, bundesland: str) -> gpd.GeoDataFrame:
    """Restrict *gdf* to rows whose ``bundesland`` column equals *bundesland*.

    Args:
        gdf: Frame to filter.
        bundesland: Value to match, or ``"Alle"`` to disable filtering.

    Returns:
        A filtered copy. If *bundesland* is ``"Alle"`` or *gdf* has no
        ``bundesland`` column, *gdf* itself is returned (not a copy).
    """
    if "bundesland" not in gdf.columns or bundesland == "Alle":
        return gdf
    return gdf[gdf["bundesland"] == bundesland].copy()


def filter_admin1_by_country(gdf: gpd.GeoDataFrame, iso2: str) -> gpd.GeoDataFrame:
    """Restrict *gdf* to rows whose ``iso_a2`` matches *iso2*, ignoring case.

    Args:
        gdf: Frame to filter.
        iso2: Country code to match; an empty value disables filtering.

    Returns:
        A filtered copy. If *iso2* is empty or *gdf* has no ``iso_a2``
        column, *gdf* itself is returned (not a copy).
    """
    if "iso_a2" not in gdf.columns or not iso2:
        return gdf
    return gdf[gdf["iso_a2"].str.upper() == iso2.upper()].copy()


def check_boundary_availability() -> dict[str, bool]:
    """Report, per scope, whether its shapefile can be found on disk.

    Returns:
        Mapping of every key in ``BKG_FILES`` and ``NE_FILES`` to ``True`` if
        the file exists under ``VG250_DIR`` / ``NE_DIR`` respectively. If both
        mappings share a key, the ``NE_FILES`` result wins.
    """
    result = {}
    for directory, files in ((VG250_DIR, BKG_FILES), (NE_DIR, NE_FILES)):
        for scope, filename in files.items():
            result[scope] = _find_shapefile(directory, filename) is not None
    return result