Spaces:
Sleeping
Sleeping
| """ | |
| Script to collect building data for a single address and save results in Excel. | |
| Usage: | |
| python -m scripts.collect_building_data --address "Guggenbühlstrasse 140a 8404 Winterthur" | |
| """ | |
| from services.geo_admin_service import GeoAdminService | |
| from services.building_image_service import ImageService | |
| from openai_services.openai_feature_service import OpenAIFeatureService | |
| from openai_services.building_image_schema import BuildingImageExtraction | |
| import os | |
| import argparse | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| import pathlib | |
# Parent of the directory that contains this file (parents[0] is the file's
# own directory, parents[1] is one level above it).
PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
def normalize_key(k: str) -> str:
    """Upper-case a field name while leaving a trailing marker suffix as-is.

    Names ending in ``_confidence`` or ``_unit`` have only the part in front
    of that suffix upper-cased; every other name is upper-cased completely.
    """
    for marker in ("_confidence", "_unit"):
        if k.endswith(marker):
            stem = k[: len(k) - len(marker)]
            return stem.upper() + marker
    return k.upper()
def flatten_extraction(features: BuildingImageExtraction) -> dict:
    """Flatten the dumped extraction model into a single-level dict.

    Each field of ``features.model_dump()`` is written under its
    ``normalize_key`` name. A field whose dumped value is a dict contributes
    two entries: ``<KEY>`` (its value) and ``<KEY>_confidence``. Any other
    field is copied through unchanged.
    """
    flattened = {}
    for field_name, payload in features.model_dump().items():
        column = normalize_key(field_name)

        if not isinstance(payload, dict):
            flattened[column] = payload
            continue

        extracted = payload.get("value")
        if extracted is None:
            # Backwards compatible with older runs, which stored the value
            # under one of the typed keys instead of "value".
            extracted = (
                payload.get("value_enum")
                or payload.get("value_str")
                or payload.get("value_num")
            )

        flattened[column] = extracted
        # accuracy_pct is deliberately not emitted: it only mirrors the
        # confidence value and would be redundant.
        flattened[f"{column}_confidence"] = payload.get("confidence")

    return flattened
def clean_none_values(flat: dict) -> dict:
    """Replace ``None`` feature values with a textual placeholder, in place.

    Keys ending in ``confidence`` are never modified. For every other key
    whose value is ``None``, the companion ``<key>_confidence`` entry decides
    the placeholder: ``"NEIN"`` when it is at least 0.6, ``"UNBEKANNT"``
    otherwise.

    Args:
        flat: Flattened feature dict; it is mutated.

    Returns:
        The same ``flat`` object, for call chaining.
    """
    for key, value in flat.items():
        # Only value fields are filled in; confidence fields are skipped.
        if key.endswith("confidence") or value is not None:
            continue
        # The confidence entry may be absent *or* present with a None value;
        # both count as 0 so the comparison below cannot raise TypeError.
        conf = flat.get(f"{key}_confidence") or 0
        flat[key] = "NEIN" if conf >= 0.6 else "UNBEKANNT"
    return flat
def derive_material_flags(flat: dict) -> dict:
    """Set per-material ``"JA"`` flags from the cladding descriptions.

    ``FASSADE_BEKLEIDUNG`` and ``DACH_BEKLEIDUNG`` are searched
    case-insensitively for material names; each hit sets the matching flag
    key to ``"JA"``. A roof description containing ``ZIEGEL`` is additionally
    replaced by the plain string ``"ZIEGEL"``. Non-string descriptions are
    ignored. The dict is mutated and returned.
    """
    # (substring to look for, flag key to set). Tuple order is the order in
    # which new keys get inserted into ``flat``.
    # NOTE(review): "STAHL" also matches "STAHLBLECH", so a sheet-steel facade
    # sets both flags — confirm this is intended.
    facade_rules = (
        ("HOLZ", "HOLZ"),
        ("STAHLBLECH", "STAHLBLECH"),
        ("STAHL", "STAHL"),
        ("ETERNIT", "ETERNIT"),
        ("STEIN", "STEINPLATTEN"),  # also covers "STEINPLATTEN"
        ("BETON", "BETON"),
    )
    roof_rules = (
        ("ZIEGEL", "DACHZIEGEL"),  # also covers "DACHZIEGEL"
        ("STAHLBLECH", "STAHLBLECH"),
        ("ETERNIT", "ETERNIT"),
    )

    def mark(text, rules):
        for needle, flag in rules:
            if needle in text:
                flat[flag] = "JA"

    facade = flat.get("FASSADE_BEKLEIDUNG")
    if isinstance(facade, str):
        mark(facade.upper(), facade_rules)

    roof = flat.get("DACH_BEKLEIDUNG")
    if isinstance(roof, str):
        roof_text = roof.upper()
        mark(roof_text, roof_rules)
        if "ZIEGEL" in roof_text:
            flat["DACH_BEKLEIDUNG"] = "ZIEGEL"

    return flat
def _require_env(name: str) -> str:
    """Return environment variable *name*, raising ValueError if unset or empty."""
    value = os.getenv(name)
    if not value:
        raise ValueError(f"{name} not found in .env file")
    return value


def _download_streetview_series(
    image_service,
    *,
    lat,
    lon,
    api_key,
    pitch,
    fov,
    name_prefix,
    label,
    egid,
    outdir,
    count=3,
):
    """Download *count* Street View images and return the path of the last one.

    The request location starts at (lat + 0.00005, lon + 0.00005) and both
    coordinates are reduced by 0.00003 after every download, so that the API
    is asked for slightly different positions each time.
    """
    lat += 0.00005
    lon += 0.00005
    last_path = None
    for index in range(1, count + 1):
        print(f"Downloading {label} image {index}/{count}...")
        last_path = image_service.download_image(
            "https://maps.googleapis.com/maps/api/streetview"
            "?size=640x640"
            "&scale=2"
            f"&location={lat},{lon}"
            "&radius=23"
            f"&pitch={pitch}"
            "&source=outdoor"
            f"&fov={fov}"
            f"&key={api_key}",
            name=f"{name_prefix}_{index}_{egid}",
            outdir=outdir,
        )
        # Shift slightly so the next request targets a different spot.
        lat -= 0.00003
        lon -= 0.00003
    return last_path


def collect_building_data(address: str) -> pd.DataFrame:
    """Collect data and images for one address and write the result to Excel.

    Steps, in order:
      1. Load ``.env`` and read the four required settings.
      2. Look the address up through ``GeoAdminService``.
      3. Download Street View, orthophoto and cadastral images through
         ``ImageService`` and draw markers on the map images.
      4. Extract features from five of the images through
         ``OpenAIFeatureService``, flatten and post-process them.
      5. Build a one-row DataFrame and save it to ``COLLECTED_DATA_PATH`` and
         to ``features.xlsx`` inside the building's image directory.

    Args:
        address: Free-text address passed to the geo lookup.

    Returns:
        The one-row DataFrame that was written to disk.

    Raises:
        ValueError: If ``API_KEY_GOOGLE_MAPS``, ``OPENAI_API_KEY``,
            ``COLLECTED_DATA_PATH`` or ``OUTPUT_IMAGES_PATH`` is missing.
    """
    load_dotenv()

    # ---------------------------------------------------------
    # ENV CHECK
    # ---------------------------------------------------------
    google_api_key = _require_env("API_KEY_GOOGLE_MAPS")
    openai_api_key = _require_env("OPENAI_API_KEY")
    collected_data_path = _require_env("COLLECTED_DATA_PATH")
    image_output_dir = _require_env("OUTPUT_IMAGES_PATH")

    # ---------------------------------------------------------
    # GEO ADMIN SERVICE
    # ---------------------------------------------------------
    # The lookup is done exactly once; its result is reused everywhere below.
    geo_service = GeoAdminService()
    result = geo_service.collect_building_data(address)
    print("✅ Address found:", result.get("label", ""))
    print(address)
    print("Feature ID:", result.get("EGID"))
    print(result)

    lat = result["lat"]
    lon = result["lon"]
    x = result["x"]
    y = result["y"]
    feature_id = result["feature_id"]
    egid = result["EGID"]
    # All images of one building share a directory named after its EGID.
    building_dir = f"{image_output_dir}/{egid}"
    print("Coordinates:", lat, lon)

    # ---------------------------------------------------------
    # IMAGE SERVICE
    # ---------------------------------------------------------
    image_service = ImageService()

    # NOTE(review): three images are downloaded per series but only the last
    # path of each series is passed to feature extraction below, as in the
    # previous implementation — confirm whether all should be used.
    street_path = _download_streetview_series(
        image_service,
        lat=lat,
        lon=lon,
        api_key=google_api_key,
        pitch=25,
        fov=110,
        name_prefix="streetview",
        label="Street View",
        egid=egid,
        outdir=building_dir,
    )
    street_path_zoomed = _download_streetview_series(
        image_service,
        lat=lat,
        lon=lon,
        api_key=google_api_key,
        pitch=30,
        fov=70,
        name_prefix="streetview_zoomed",
        label="zoomed Street View",
        egid=egid,
        outdir=building_dir,
    )

    e95, n95 = image_service.ensure_lv95_xy(x, y)
    print(f"LV95 coordinates for WMS: E={e95}, N={n95}")

    # Orthophoto (50 m) + orthophoto (20 m) + cadastral plan (50 m)
    ortho_url = image_service.build_wms_url(
        e95, n95, layer="ch.swisstopo.swissimage",
        meters=50, width=1024, height=1024, image_format="image/jpeg",
    )
    ortho_zoomed_url = image_service.build_wms_url(
        e95, n95, layer="ch.swisstopo.swissimage",
        meters=20, width=1024, height=1024, image_format="image/jpeg",
    )
    plain_url = image_service.build_wms_url(
        e95, n95, layer="ch.swisstopo-vd.amtliche-vermessung",
        meters=50, width=1024, height=1024, image_format="image/png",
    )

    ortho_path = image_service.download_image(
        ortho_url,
        name=f"swissimage_{feature_id}",
        outdir=building_dir,
    )
    print("Orthofoto saved:", ortho_path)
    ortho_zoomed_path = image_service.download_image(
        ortho_zoomed_url,
        name=f"swissimage_zoomed_{feature_id}",
        outdir=building_dir,
    )
    print("Orthofotos saved:", ortho_zoomed_path)
    plain_path = image_service.download_image(
        plain_url,
        name=f"cadastral_{feature_id}",
        outdir=building_dir,
    )
    print("Cadastral map saved:", plain_path)

    # Mark the building and store the results in the "marked" sub-folder.
    marked_dir = f"{building_dir}/marked"
    os.makedirs(marked_dir, exist_ok=True)
    marked_ortho_path = image_service.draw_marker(
        ortho_path,
        out_path=f"{marked_dir}/{egid}.jpeg",
    )
    marked_zoomed_ortho_path = image_service.draw_marker(
        ortho_zoomed_path,
        out_path=f"{marked_dir}/zoomed_{egid}.jpeg",
    )
    marked_plain_path = image_service.draw_marker(
        plain_path,
        out_path=f"{marked_dir}/cadastral_{egid}.jpeg",
    )

    # Browser URL for the ZH map viewer, stored alongside the other results.
    result["ZH_MAP_URL"] = (
        f"https://geo.zh.ch/maps?x={int(e95)}&y={int(n95)}"
        "&scale=900&basemap=areavbackgroundzh"
    )

    # ---------------------------------------------------------
    # FEATURE EXTRACTION
    # ---------------------------------------------------------
    print("Extracting features with OpenAI...")
    feature_service = OpenAIFeatureService(api_key=openai_api_key, model="gpt-4o")
    features = feature_service.extract_features(
        image_paths=[
            street_path,
            street_path_zoomed,
            marked_ortho_path,
            marked_plain_path,
            marked_zoomed_ortho_path,
        ]
    )
    flat = flatten_extraction(features)
    flat = {k: (v.upper() if isinstance(v, str) else v) for k, v in flat.items()}
    flat = derive_material_flags(flat)
    flat = clean_none_values(flat)
    print("Extracted features:", flat)
    result.update(flat)

    # ---------------------------------------------------------
    # DATAFRAME
    # ---------------------------------------------------------
    df = pd.DataFrame([result])
    address_parts = GeoAdminService.parse_user_address(result.get("ADDRESS", ""))
    df["STRASSE"] = address_parts.get("street", "").title()
    df["HAUSNR"] = address_parts.get("nr", "")
    df["HAUSNRZUSATZ"] = address_parts.get("suffix", "")
    df["ORT"] = address_parts.get("city", "").title()
    df["PLZ"] = address_parts.get("plz", "")

    def _built_after_1990(row) -> bool:
        # row.get() rather than row[...]: BAUJAHR may be absent at this point
        # (missing columns are only added by the reindex further down).
        year = row.get("BAUJAHR")
        return bool(pd.notna(year) and year > 1990)

    # Buildings from after 1990 get a fixed window class with full confidence.
    df["FENSTER"] = df.apply(
        lambda row: "AB 1990" if _built_after_1990(row) else row.get("FENSTER"),
        axis=1,
    )
    df["FENSTER_confidence"] = df.apply(
        lambda row: 1.0 if _built_after_1990(row) else row.get("FENSTER_confidence", 0.0),
        axis=1,
    )

    base_cols = [
        "EGID", "GSW_STATUS", "STRASSE", "HAUSNR", "HAUSNRZUSATZ",
        "PLZ", "ORT", "BAUJAHR", "HAUPTNUTZUNG", "NUTZUNG", "lat", "lon", "ZH_MAP_URL",
    ]
    feature_cols = list(flat.keys())
    drop_cols = {"ADDRESS", "x", "y", "feature_id"}
    # Columns that must exist in the output; reindex adds any that are missing.
    required_cols = [
        "EGID", "GSW_STATUS", "HAUPTNUTZUNG", "NUTZUNG", "BAUJAHR",
        "TRAGWERK_FASSADE", "FASSADE_DAEMMUNG", "FASSADE_BEKLEIDUNG", "KONSTRUKTION_DECKE",
        "BODENAUFBAU", "KONSTRUKTION_DACH", "DACH_BEKLEIDUNG", "PHOTOVOLTAIK", "PV_FLAECHE",
        "FENSTER", "FENSTERANZAHL", "DAEMMUNGSFLAECHE", "STAHL", "STAHL_LM", "STAHLBLECH",
        "STAHLBLECH_FLAECHE", "ETERNIT", "ETERNIT_FLAECHE", "STEINPLATTEN",
        "STEINPLATTEN_FLAECHE", "DACHZIEGEL", "DACHZIEGEL_FLAECHE", "BETON", "BETON_FLAECHE",
        "HOLZ", "HOLZ_LM", "HOLZ_FLAECHE", "STRASSE", "HAUSNR", "HAUSNRZUSATZ", "PLZ", "ORT",
    ]
    cols = [col for col in base_cols + feature_cols if col not in drop_cols]
    # dict.fromkeys de-duplicates while keeping first-seen order.
    cols = list(dict.fromkeys(cols + required_cols))
    df = df.reindex(columns=cols)

    # ---------------------------------------------------------
    # SAVE
    # ---------------------------------------------------------
    output_file = collected_data_path
    # Create the directory the configured output file actually lives in,
    # instead of a hard-coded "data" folder.
    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    features_file = f"{building_dir}/features.xlsx"
    df.to_excel(output_file, index=False)
    df.to_excel(features_file, index=False)
    print(f"📁 Results saved to {output_file} und {features_file}")
    return df
def main():
    """Read ``--address`` from the command line, run the collection and print the table."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--address", type=str, required=True)
    options = cli.parse_args()
    table = collect_building_data(options.address)
    print(table)


if __name__ == "__main__":
    main()