import json import os import pickle import gradio as gr import pandas as pd from openai import OpenAI MODEL_PATH = "random_forest_regression.pkl" DATA_PATH = "bfs_municipality_and_tax_data.csv" OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "") OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini") with open(MODEL_PATH, "rb") as f: model = pickle.load(f) df_bfs_data = pd.read_csv(DATA_PATH, sep=",", encoding="utf-8") df_bfs_data["tax_income"] = ( df_bfs_data["tax_income"].astype(str).str.replace("'", "", regex=False).astype(float) ) town_to_row = { str(row["bfs_name"]).lower(): row for _, row in df_bfs_data.iterrows() } valid_towns = list(df_bfs_data["bfs_name"].sort_values().unique()) def match_town(user_town: str): if not user_town or not user_town.strip(): return None key = user_town.strip().lower() if key in town_to_row: return str(town_to_row[key]["bfs_name"]) for canonical_lower, row in town_to_row.items(): if key in canonical_lower or canonical_lower in key: return str(row["bfs_name"]) return None def call_llm_json(system_prompt: str, user_prompt: str) -> str: if not OPENAI_API_KEY: raise ValueError("OPENAI_API_KEY ist nicht gesetzt.") if not OPENAI_MODEL: raise ValueError("OPENAI_MODEL ist nicht gesetzt.") client = OpenAI(api_key=OPENAI_API_KEY) response = client.chat.completions.create( model=OPENAI_MODEL, max_tokens=512, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], ) return response.choices[0].message.content def parse_json_response(raw: str, required_keys: tuple[str, ...]) -> dict: cleaned = (raw or "").strip() if not cleaned: raise ValueError("LLM hat eine leere Antwort zurückgegeben.") if cleaned.startswith("```"): lines = cleaned.splitlines() inner = [line for line in lines[1:] if line.strip() != "```"] cleaned = "\n".join(inner) try: parsed = json.loads(cleaned) except json.JSONDecodeError as exc: raise ValueError(f"Kein gültiges JSON erhalten: {cleaned[:300]}") from exc missing = [k for k in required_keys if k not in parsed] if missing: raise ValueError(f"JSON fehlen Pflichtfelder: {', '.join(missing)}.") return parsed def extract_preferences(user_text: str) -> dict: system_prompt = ( "Du bist ein Datenextraktionssystem für Schweizer Wohnungsanfragen. " "Extrahiere aus dem Benutzertext genau drei Werte und gib sie als JSON zurück:\n" '- "rooms": Anzahl Zimmer als Dezimalzahl (z.B. 3.5)\n' '- "area_m2": Wohnfläche in m² als Zahl\n' '- "town": Name der Gemeinde oder Stadt als String\n' "Gib NUR gültiges JSON zurück, kein Markdown, keine Erklärung.\n" 'Beispiel: {"rooms": 3.5, "area_m2": 85, "town": "Winterthur"}\n' "Falls ein Wert fehlt oder unklar ist, setze ihn auf null." ) raw = call_llm_json(system_prompt=system_prompt, user_prompt=user_text) return parse_json_response(raw, required_keys=("rooms", "area_m2", "town")) def predict_apartment_price(rooms: float, area_m2: float, town: str) -> float: canonical = match_town(town) if canonical is None: raise ValueError( f"Gemeinde '{town}' nicht gefunden. " "Bitte einen gültigen Ort im Kanton Zürich angeben." ) row = town_to_row[canonical.lower()] features = pd.DataFrame([{ "rooms": rooms, "area_m2": area_m2, "pop": float(row["pop"]), "pop_dens": float(row["pop_dens"]), "frg_pct": float(row["frg_pct"]), "emp": float(row["emp"]), "tax_income": float(row["tax_income"]), }]) return float(model.predict(features)[0]) def generate_explanation(preferences: dict, prediction: float) -> str: system_prompt = ( "Du bist ein freundlicher Schweizer Wohnungsberater. " "Erkläre die Mietpreisschätzung auf Deutsch in 2–3 Sätzen. " "Erwähne die Wohnungsmerkmale (Zimmer, Fläche, Ort), den geschätzten Mietpreis " "und einen Hinweis auf Unsicherheit (z.B. Zustand, Mikrolage). " "Berechne keinen neuen Preis selbst – verwende nur den angegebenen Modellwert. " 'Gib NUR gültiges JSON zurück im Format: {"answer": "Deine Erklärung"}' ) user_prompt = ( f"Wohnungswunsch: {preferences.get('rooms')} Zimmer, " f"{preferences.get('area_m2')} m², {preferences.get('town')}. " f"Modellschätzung: CHF {prediction:,.0f} pro Monat." ) raw = call_llm_json(system_prompt=system_prompt, user_prompt=user_prompt) result = parse_json_response(raw, required_keys=("answer",)) return result["answer"] def run_pipeline(user_text: str): if not user_text or not user_text.strip(): return {}, 0.0, "Bitte beschreibe deinen Wohnungswunsch auf Deutsch." try: preferences = extract_preferences(user_text) except Exception as e: return {}, 0.0, f"Fehler bei der Texterkennung: {e}" rooms = preferences.get("rooms") area_m2 = preferences.get("area_m2") town = preferences.get("town") if rooms is None or area_m2 is None or town is None: missing = [k for k, v in {"Zimmer": rooms, "Fläche": area_m2, "Ort": town}.items() if v is None] return preferences, 0.0, ( f"Fehlende Angaben: {', '.join(missing)}. " "Bitte Anfrage präzisieren, z.B.: «3.5 Zimmer, 85 m², Winterthur»." ) try: prediction = predict_apartment_price(float(rooms), float(area_m2), str(town)) except Exception as e: return preferences, 0.0, f"Fehler bei der Preisschätzung: {e}" try: explanation = generate_explanation(preferences, prediction) except Exception as e: explanation = f"Geschätzte Monatsmiete: CHF {prediction:,.0f}. (Erklärung nicht verfügbar: {e})" return preferences, prediction, explanation with gr.Blocks(title="Wohnungspreisschätzer – Kanton Zürich") as demo: gr.Markdown( """ # Wohnungspreisschätzer – Kanton Zürich Beschreibe deinen Wohnungswunsch auf **Deutsch** und erhalte eine Mietpreisschätzung. """ ) user_input = gr.Textbox( label="Wohnungswunsch", lines=3, placeholder="Ich suche eine 3.5-Zimmer-Wohnung mit etwa 85 m² in Winterthur.", ) btn = gr.Button("Schätzen", variant="primary") extracted = gr.JSON(label="Extrahierte Eingaben") price = gr.Number(label="Geschätzte Monatsmiete (CHF)") answer = gr.Textbox(label="Antwort", lines=5) btn.click(fn=run_pipeline, inputs=[user_input], outputs=[extracted, price, answer]) gr.Examples( examples=[ ["Ich suche eine 3.5-Zimmer-Wohnung mit etwa 85 m² in Winterthur."], ["Haben Sie etwas für mich? 4 Zimmer, 100 m², am liebsten in Zürich."], ["Suche kleine 2-Zimmer-Wohnung mit 50 m² in Uster."], ], inputs=[user_input], ) demo.launch()