File size: 3,590 Bytes
0d4a0ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from pathlib import Path
from functools import lru_cache
import geopandas as gpd

from src.utils import VG250_DIR, NE_DIR, BKG_FILES, NE_FILES, AGS_TO_BUNDESLAND


def _find_shapefile(directory: Path, filename: str) -> Path | None:
    direct = directory / filename
    if direct.exists():
        return direct
    for sub in sorted(directory.iterdir()):
        if sub.is_dir():
            candidate = sub / filename
            if candidate.exists():
                return candidate
    return None


def _load_and_reproject(path: Path, keep_cols: list[str]) -> gpd.GeoDataFrame:
    """Read a vector file, trim it to selected columns and return it in EPSG:4326.

    Args:
        path: File to read with ``gpd.read_file``.
        keep_cols: Attribute columns to retain; names missing from the file
            are skipped silently. The ``geometry`` column is always kept.

    Returns:
        A copy of the trimmed frame whose CRS is EPSG:4326.
    """
    source = gpd.read_file(path)

    wanted = [name for name in keep_cols if name in source.columns]
    wanted.append("geometry")
    frame = source[wanted].copy()

    crs = frame.crs
    if crs is None:
        # No CRS metadata at all: label the coordinates as WGS84 without
        # transforming them (fallback; not expected for the BKG files).
        frame = frame.set_crs("EPSG:4326")
    elif crs and crs.to_epsg() != 4326:
        frame = frame.to_crs("EPSG:4326")
    return frame


@lru_cache(maxsize=None)
def load_bundeslaender() -> gpd.GeoDataFrame | None:
    """Load the ``bundeslaender`` layer found under ``VG250_DIR``.

    The columns GEN, AGS and EWZ are kept when present. The result, including
    a ``None`` for a missing file, is memoized by ``lru_cache``, so every
    caller receives the same object.

    Returns:
        The reprojected GeoDataFrame, or ``None`` if the file was not found.
    """
    shapefile = _find_shapefile(VG250_DIR, BKG_FILES["bundeslaender"])
    return (
        None
        if shapefile is None
        else _load_and_reproject(shapefile, ["GEN", "AGS", "EWZ"])
    )


@lru_cache(maxsize=None)
def load_kreise() -> gpd.GeoDataFrame | None:
    """Load the ``kreise`` layer found under ``VG250_DIR``.

    The columns GEN, AGS, BEZ and EWZ are kept when present. The result,
    including a ``None`` for a missing file, is memoized by ``lru_cache``, so
    every caller receives the same object.

    Returns:
        The reprojected GeoDataFrame, or ``None`` if the file was not found.
    """
    shapefile = _find_shapefile(VG250_DIR, BKG_FILES["kreise"])
    return (
        None
        if shapefile is None
        else _load_and_reproject(shapefile, ["GEN", "AGS", "BEZ", "EWZ"])
    )


@lru_cache(maxsize=None)
def load_gemeinden() -> gpd.GeoDataFrame | None:
    """Load the ``gemeinden`` layer found under ``VG250_DIR``.

    The columns GEN, AGS, BEZ and EWZ are kept when present. If an ``AGS``
    column exists, a ``bundesland`` column is added by looking up the first
    two characters of each AGS value in ``AGS_TO_BUNDESLAND``. The result,
    including a ``None`` for a missing file, is memoized by ``lru_cache``.

    Returns:
        The reprojected GeoDataFrame, or ``None`` if the file was not found.
    """
    shapefile = _find_shapefile(VG250_DIR, BKG_FILES["gemeinden"])
    if shapefile is None:
        return None

    municipalities = _load_and_reproject(shapefile, ["GEN", "AGS", "BEZ", "EWZ"])
    if "AGS" not in municipalities.columns:
        return municipalities

    # The two leading AGS characters are the lookup key for the state name.
    state_prefix = municipalities["AGS"].str[:2]
    municipalities["bundesland"] = state_prefix.map(AGS_TO_BUNDESLAND)
    return municipalities


@lru_cache(maxsize=None)
def load_world_countries() -> gpd.GeoDataFrame | None:
    """Load the ``countries`` layer found under ``NE_DIR``.

    The columns NAME, ISO_A2, CONTINENT and POP_EST are kept when present.
    The result, including a ``None`` for a missing file, is memoized by
    ``lru_cache``, so every caller receives the same object.

    Returns:
        The reprojected GeoDataFrame, or ``None`` if the file was not found.
    """
    shapefile = _find_shapefile(NE_DIR, NE_FILES["countries"])
    return (
        None
        if shapefile is None
        else _load_and_reproject(
            shapefile, ["NAME", "ISO_A2", "CONTINENT", "POP_EST"]
        )
    )


@lru_cache(maxsize=None)
def load_admin1() -> gpd.GeoDataFrame | None:
    """Load the ``admin1`` layer found under ``NE_DIR``.

    The columns name, admin, iso_a2 and type_en are kept when present. The
    result, including a ``None`` for a missing file, is memoized by
    ``lru_cache``, so every caller receives the same object.

    Returns:
        The reprojected GeoDataFrame, or ``None`` if the file was not found.
    """
    shapefile = _find_shapefile(NE_DIR, NE_FILES["admin1"])
    return (
        None
        if shapefile is None
        else _load_and_reproject(shapefile, ["name", "admin", "iso_a2", "type_en"])
    )


def get_boundary_gdf(scope: str) -> gpd.GeoDataFrame | None:
    """Return the boundary layer registered for *scope*.

    Args:
        scope: One of ``"bundeslaender"``, ``"kreise"``, ``"gemeinden"``,
            ``"countries"`` or ``"admin1"``.

    Returns:
        Whatever the matching loader returns, or ``None`` when *scope* is
        unknown or the loader raised. A loader failure is reported on stdout
        rather than propagated.
    """
    dispatch = {
        "bundeslaender": load_bundeslaender,
        "kreise": load_kreise,
        "gemeinden": load_gemeinden,
        "countries": load_world_countries,
        "admin1": load_admin1,
    }
    if scope not in dispatch:
        return None

    try:
        boundaries = dispatch[scope]()
    except Exception as e:
        # Best-effort boundary: any loading problem degrades to "no data".
        print(f"[boundary] FEHLER beim Laden von scope={scope}: {e}")
        return None
    else:
        return boundaries


def filter_gemeinden_by_bundesland(gdf: gpd.GeoDataFrame, bundesland: str) -> gpd.GeoDataFrame:
    """Restrict *gdf* to the rows whose ``bundesland`` equals *bundesland*.

    Args:
        gdf: Frame to filter.
        bundesland: Value to match; ``"Alle"`` disables filtering.

    Returns:
        A filtered copy, or the input object itself (not a copy) when the
        frame has no ``bundesland`` column or *bundesland* is ``"Alle"``.
    """
    if "bundesland" in gdf.columns and bundesland != "Alle":
        in_state = gdf["bundesland"] == bundesland
        return gdf[in_state].copy()
    return gdf


def filter_admin1_by_country(gdf: gpd.GeoDataFrame, iso2: str) -> gpd.GeoDataFrame:
    """Restrict *gdf* to the rows whose ``iso_a2`` matches *iso2*.

    The comparison ignores case on both sides.

    Args:
        gdf: Frame to filter.
        iso2: Code to match; an empty value disables filtering.

    Returns:
        A filtered copy, or the input object itself (not a copy) when the
        frame has no ``iso_a2`` column or *iso2* is empty.
    """
    if "iso_a2" in gdf.columns and iso2:
        codes = gdf["iso_a2"].str.upper()
        wanted = iso2.upper()
        return gdf[codes == wanted].copy()
    return gdf


def check_boundary_availability() -> dict[str, bool]:
    """Report which configured boundary files can be found on disk.

    Returns:
        A mapping from every scope key in ``BKG_FILES`` and ``NE_FILES`` to
        ``True`` when its file is found under ``VG250_DIR`` / ``NE_DIR``
        respectively. BKG scopes come first; a scope key present in both
        tables takes the ``NE_FILES`` result.
    """
    sources = ((VG250_DIR, BKG_FILES), (NE_DIR, NE_FILES))
    return {
        scope: _find_shapefile(base_dir, filename) is not None
        for base_dir, files in sources
        for scope, filename in files.items()
    }