# Hugging Face page residue (not part of the script): uploader "kuhs",
# commit message "Upload folder using huggingface_hub", revision 628a672 (verified).
"""
Script to collect building data for a single address and save results in Excel.
Usage:
python -m scripts.collect_building_data --address "Guggenbühlstrasse 140a 8404 Winterthur"
"""
from services.geo_admin_service import GeoAdminService
from services.building_image_service import ImageService
from openai_services.openai_feature_service import OpenAIFeatureService
from openai_services.building_image_schema import BuildingImageExtraction
import os
import argparse
import pandas as pd
from dotenv import load_dotenv
import pathlib
# Directory one level above the folder that contains this file; presumably the
# project root, since the script is run as `python -m scripts.collect_building_data`.
# NOTE(review): not referenced anywhere in the visible part of this file.
PROJECT_ROOT = pathlib.Path(__file__).resolve().parents[1]
def normalize_key(k: str) -> str:
    """Upper-case a field name while keeping a trailing marker suffix as is.

    A key ending in ``_confidence`` or ``_unit`` has only the part before that
    suffix upper-cased; every other key is upper-cased completely.
    """
    for suffix in ("_confidence", "_unit"):
        if k.endswith(suffix):
            stem = k[: len(k) - len(suffix)]
            return f"{stem.upper()}{suffix}"
    return k.upper()
def flatten_extraction(features: BuildingImageExtraction) -> dict:
    """Flatten the dumped extraction result into a single-level dict.

    Every entry of ``features.model_dump()`` is stored under its normalized
    key (see ``normalize_key``). Entries whose value is itself a dict are
    split into two columns: ``<KEY>`` with the extracted value and
    ``<KEY>_confidence`` with the reported confidence. All other entries are
    copied unchanged.

    Args:
        features: Extraction result exposing ``model_dump()``.

    Returns:
        A new flat dict of column name to value.
    """
    flattened = {}
    for field_name, payload in features.model_dump().items():
        column = normalize_key(field_name)

        if not isinstance(payload, dict):
            flattened[column] = payload
            continue

        extracted = payload.get("value")
        if extracted is None:
            # Backward compatible with older runs that stored the value in a
            # type-specific field instead of "value".
            extracted = (
                payload.get("value_enum")
                or payload.get("value_str")
                or payload.get("value_num")
            )

        # "accuracy_pct" is deliberately not emitted: it only mirrors the
        # confidence value and would be redundant.
        flattened[column] = extracted
        flattened[f"{column}_confidence"] = payload.get("confidence")
    return flattened
def clean_none_values(flat: dict) -> dict:
    """Replace ``None`` feature values with a textual placeholder, in place.

    Keys ending in ``confidence`` are never touched. For every other key whose
    value is ``None`` the matching ``<key>_confidence`` entry decides the
    placeholder: ``"NEIN"`` when the confidence is at least 0.6, otherwise
    ``"UNBEKANNT"``. A missing confidence entry, or one that is itself
    ``None``, counts as 0.

    Args:
        flat: Flat feature dict; modified in place.

    Returns:
        The same dict object that was passed in.
    """
    # Only values of existing keys are reassigned, so the dict never changes
    # size while it is being iterated.
    for key, value in flat.items():
        if key.endswith("confidence") or value is not None:
            continue

        conf = flat.get(f"{key}_confidence")
        if conf is None:
            # The key may exist with an explicit None (no confidence reported);
            # comparing None with a float would raise TypeError.
            conf = 0

        flat[key] = "NEIN" if conf >= 0.6 else "UNBEKANNT"
    return flat
def derive_material_flags(flat: dict) -> dict:
    """Set ``"JA"`` material flags from the facade and roof cladding texts.

    The cladding texts are read from ``FASSADE_BEKLEIDUNG`` and
    ``DACH_BEKLEIDUNG``; non-string values are ignored. Matching is a
    case-insensitive substring test. When the roof text mentions tiles
    (``ZIEGEL``), ``DACH_BEKLEIDUNG`` itself is normalized to ``"ZIEGEL"``.

    Args:
        flat: Flat feature dict; modified in place.

    Returns:
        The same dict object that was passed in.
    """
    # (flag to set, substrings that trigger it), evaluated in this order.
    # NOTE(review): "STAHL" is a substring of "STAHLBLECH", so sheet-steel
    # facades also get the STAHL flag; confirm this is intended.
    facade_rules = (
        ("HOLZ", ("HOLZ",)),
        ("STAHLBLECH", ("STAHLBLECH",)),
        ("STAHL", ("STAHL",)),
        ("ETERNIT", ("ETERNIT",)),
        ("STEINPLATTEN", ("STEINPLATTEN", "STEIN")),
        ("BETON", ("BETON",)),
    )
    roof_rules = (
        ("DACHZIEGEL", ("DACHZIEGEL", "ZIEGEL")),
        ("STAHLBLECH", ("STAHLBLECH",)),
        ("ETERNIT", ("ETERNIT",)),
    )

    def _flag_materials(source_key, rules):
        """Apply *rules* to the text under *source_key*; return the upper-cased text or None."""
        text = flat.get(source_key)
        if not isinstance(text, str):
            return None
        text = text.upper()
        for flag, needles in rules:
            if any(needle in text for needle in needles):
                flat[flag] = "JA"
        return text

    _flag_materials("FASSADE_BEKLEIDUNG", facade_rules)

    roof_text = _flag_materials("DACH_BEKLEIDUNG", roof_rules)
    if roof_text is not None and ("DACHZIEGEL" in roof_text or "ZIEGEL" in roof_text):
        flat["DACH_BEKLEIDUNG"] = "ZIEGEL"
    return flat
def _require_env(name: str) -> str:
    """Return the non-empty value of environment variable *name*.

    Raises:
        ValueError: If the variable is unset or empty.
    """
    value = os.getenv(name)
    if not value:
        raise ValueError(f"{name} not found in .env file")
    return value


def _download_street_views(image_service, api_key, lat, lon, egid, outdir,
                           *, pitch, fov, name_prefix, label):
    """Download three Street View images and return the path of the last one.

    The request location starts slightly offset from (*lat*, *lon*) and is
    shifted a little after every request so that Google returns different
    images.
    """
    lat += 0.00005
    lon += 0.00005
    last_path = None
    for shot in range(1, 4):
        print(f"Downloading {label} image {shot}/3...")
        last_path = image_service.download_image(
            "https://maps.googleapis.com/maps/api/streetview"
            "?size=640x640"
            "&scale=2"
            f"&location={lat},{lon}"
            "&radius=23"
            f"&pitch={pitch}"
            "&source=outdoor"
            f"&fov={fov}"
            f"&key={api_key}",
            name=f"{name_prefix}_{shot}_{egid}",
            outdir=outdir,
        )
        lat -= 0.00003
        lon -= 0.00003
    return last_path


def _year_is_after(year, threshold: int) -> bool:
    """Return True when *year* can be read as a number greater than *threshold*."""
    try:
        return float(year) > threshold
    except (TypeError, ValueError):
        # None, NaN-like markers or non-numeric text: the year is unknown.
        return False


def collect_building_data(address: str) -> pd.DataFrame:
    """Collect register data, images and image-derived features for one address.

    Steps: look up the address via ``GeoAdminService``, download Street View,
    orthophoto and cadastral images, mark the building on the aerial images,
    let ``OpenAIFeatureService`` extract features from the images, and write
    the combined single-row table to two Excel files.

    Args:
        address: Free-text address to look up.

    Returns:
        A one-row DataFrame with the register data and the extracted features.

    Raises:
        ValueError: If one of ``API_KEY_GOOGLE_MAPS``, ``OPENAI_API_KEY``,
            ``COLLECTED_DATA_PATH`` or ``OUTPUT_IMAGES_PATH`` is not set.
        KeyError: If the lookup result lacks ``lat``, ``lon``, ``x``, ``y``,
            ``feature_id`` or ``EGID``.
    """
    load_dotenv()

    # ---------------------------------------------------------
    # ENV CHECK
    # ---------------------------------------------------------
    google_api_key = _require_env("API_KEY_GOOGLE_MAPS")
    openai_api_key = _require_env("OPENAI_API_KEY")
    collected_data_path = _require_env("COLLECTED_DATA_PATH")
    image_output_dir = _require_env("OUTPUT_IMAGES_PATH")

    # ---------------------------------------------------------
    # GEO ADMIN SERVICE
    # ---------------------------------------------------------
    # One lookup is enough; the result is reused for every later step.
    geo_service = GeoAdminService()
    result = geo_service.collect_building_data(address)
    print("✅ Address found:", result.get("label", ""))
    print(address)
    # NOTE(review): the value printed under this label is the EGID.
    print("Feature ID:", result.get("EGID"))
    print(result)

    # ---------------------------------------------------------
    # IMAGE SERVICE
    # ---------------------------------------------------------
    image_service = ImageService()
    lat = result["lat"]
    lon = result["lon"]
    x = result["x"]
    y = result["y"]
    feature_id = result["feature_id"]
    egid = result["EGID"]
    building_dir = f"{image_output_dir}/{egid}"
    print("Coordinates:", lat, lon)

    # NOTE(review): only the last image of each Street View series is passed
    # on to the feature extraction below; confirm that this is intended.
    street_path = _download_street_views(
        image_service, google_api_key, lat, lon, egid, building_dir,
        pitch=25, fov=110, name_prefix="streetview", label="Street View",
    )
    street_path_zoomed = _download_street_views(
        image_service, google_api_key, lat, lon, egid, building_dir,
        pitch=30, fov=70, name_prefix="streetview_zoomed", label="zoomed Street View",
    )

    e95, n95 = image_service.ensure_lv95_xy(x, y)
    print(f"LV95 coordinates for WMS: E={e95}, N={n95}")

    # Orthophoto (50 m), zoomed orthophoto (20 m) and cadastral plan (50 m).
    ortho_url = image_service.build_wms_url(
        e95, n95, layer="ch.swisstopo.swissimage",
        meters=50, width=1024, height=1024, image_format="image/jpeg",
    )
    ortho_zoomed_url = image_service.build_wms_url(
        e95, n95, layer="ch.swisstopo.swissimage",
        meters=20, width=1024, height=1024, image_format="image/jpeg",
    )
    plain_url = image_service.build_wms_url(
        e95, n95, layer="ch.swisstopo-vd.amtliche-vermessung",
        meters=50, width=1024, height=1024, image_format="image/png",
    )

    # All downloads of one building share the same output directory.
    ortho_path = image_service.download_image(
        ortho_url, name=f"swissimage_{feature_id}", outdir=building_dir,
    )
    print("Orthofoto saved:", ortho_path)
    ortho_zoomed_path = image_service.download_image(
        ortho_zoomed_url, name=f"swissimage_zoomed_{feature_id}", outdir=building_dir,
    )
    print("Orthofotos saved:", ortho_zoomed_path)
    plain_path = image_service.download_image(
        plain_url, name=f"cadastral_{feature_id}", outdir=building_dir,
    )
    print("Cadastral map saved:", plain_path)

    # Mark the building on each downloaded image file and store the results
    # in the "marked" sub-folder.
    os.makedirs(f"{building_dir}/marked", exist_ok=True)
    marked_ortho_path = image_service.draw_marker(
        ortho_path, out_path=f"{building_dir}/marked/{egid}.jpeg",
    )
    marked_zoomed_ortho_path = image_service.draw_marker(
        ortho_zoomed_path, out_path=f"{building_dir}/marked/zoomed_{egid}.jpeg",
    )
    marked_plain_path = image_service.draw_marker(
        plain_path, out_path=f"{building_dir}/marked/cadastral_{egid}.jpeg",
    )

    # Link to the canton of Zurich map browser, stored with the result.
    result["ZH_MAP_URL"] = (
        f"https://geo.zh.ch/maps?x={int(e95)}&y={int(n95)}&scale=900&basemap=areavbackgroundzh"
    )

    # ---------------------------------------------------------
    # DATAFRAME
    # ---------------------------------------------------------
    print("Extracting features with OpenAI...")
    feature_service = OpenAIFeatureService(api_key=openai_api_key, model="gpt-4o")
    features = feature_service.extract_features(
        image_paths=[
            street_path,
            street_path_zoomed,
            marked_ortho_path,
            marked_plain_path,
            marked_zoomed_ortho_path,
        ]
    )
    flat = flatten_extraction(features)
    flat = {k: (v.upper() if isinstance(v, str) else v) for k, v in flat.items()}
    flat = derive_material_flags(flat)
    flat = clean_none_values(flat)
    print("Extracted features:", flat)
    result.update(flat)

    df = pd.DataFrame([result])
    address_parts = GeoAdminService.parse_user_address(result.get("ADDRESS", ""))
    df["STRASSE"] = (address_parts.get("street") or "").title()
    df["HAUSNR"] = address_parts.get("nr", "")
    df["HAUSNRZUSATZ"] = address_parts.get("suffix", "")
    df["ORT"] = (address_parts.get("city") or "").title()
    df["PLZ"] = address_parts.get("plz", "")

    # A construction year after 1990 overrides the window feature with full
    # confidence. The frame has exactly one row, so scalars can be assigned.
    # Missing BAUJAHR/FENSTER entries no longer raise KeyError.
    if _year_is_after(result.get("BAUJAHR"), 1990):
        df["FENSTER"] = "AB 1990"
        df["FENSTER_confidence"] = 1.0
    elif "FENSTER_confidence" not in df.columns:
        df["FENSTER_confidence"] = 0.0

    base_cols = [
        "EGID", "GSW_STATUS", "STRASSE", "HAUSNR", "HAUSNRZUSATZ",
        "PLZ", "ORT", "BAUJAHR", "HAUPTNUTZUNG", "NUTZUNG", "lat", "lon", "ZH_MAP_URL"
    ]
    feature_cols = list(flat.keys())
    drop_cols = {"ADDRESS", "x", "y", "feature_id"}
    # Columns that must exist in the output; missing ones are added empty.
    required_cols = [
        "EGID", "GSW_STATUS", "HAUPTNUTZUNG", "NUTZUNG", "BAUJAHR",
        "TRAGWERK_FASSADE", "FASSADE_DAEMMUNG", "FASSADE_BEKLEIDUNG", "KONSTRUKTION_DECKE",
        "BODENAUFBAU", "KONSTRUKTION_DACH", "DACH_BEKLEIDUNG", "PHOTOVOLTAIK", "PV_FLAECHE",
        "FENSTER", "FENSTERANZAHL", "DAEMMUNGSFLAECHE", "STAHL", "STAHL_LM", "STAHLBLECH",
        "STAHLBLECH_FLAECHE", "ETERNIT", "ETERNIT_FLAECHE", "STEINPLATTEN",
        "STEINPLATTEN_FLAECHE", "DACHZIEGEL", "DACHZIEGEL_FLAECHE", "BETON", "BETON_FLAECHE",
        "HOLZ", "HOLZ_LM", "HOLZ_FLAECHE", "STRASSE", "HAUSNR", "HAUSNRZUSATZ", "PLZ", "ORT"
    ]
    cols = [col for col in base_cols + feature_cols if col not in drop_cols]
    # dict.fromkeys removes duplicates while keeping the first occurrence order.
    cols = list(dict.fromkeys(cols + required_cols))
    df = df.reindex(columns=cols)

    # ---------------------------------------------------------
    # SAVE
    # ---------------------------------------------------------
    output_file = collected_data_path
    # Create the directory the configured file actually lives in, rather than
    # a hard-coded "data" folder in the current working directory.
    output_parent = os.path.dirname(output_file)
    if output_parent:
        os.makedirs(output_parent, exist_ok=True)
    df.to_excel(output_file, index=False)
    df.to_excel(f"{building_dir}/features.xlsx", index=False)
    print(f"📁 Results saved to {output_file} und {building_dir}/features.xlsx")
    return df
def main():
    """Read ``--address`` from the command line, collect the data and print the table."""
    cli = argparse.ArgumentParser()
    cli.add_argument("--address", type=str, required=True)
    options = cli.parse_args()
    print(collect_building_data(options.address))


# Run the collection only when the module is executed as a script.
if __name__ == "__main__":
    main()