File size: 7,160 Bytes
791b742
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import json
import os
import pickle

import gradio as gr
import pandas as pd
from openai import OpenAI

MODEL_PATH = "random_forest_regression.pkl"
DATA_PATH = "bfs_municipality_and_tax_data.csv"

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")

with open(MODEL_PATH, "rb") as f:
    model = pickle.load(f)

df_bfs_data = pd.read_csv(DATA_PATH, sep=",", encoding="utf-8")
df_bfs_data["tax_income"] = (
    df_bfs_data["tax_income"].astype(str).str.replace("'", "", regex=False).astype(float)
)

town_to_row = {
    str(row["bfs_name"]).lower(): row
    for _, row in df_bfs_data.iterrows()
}
valid_towns = list(df_bfs_data["bfs_name"].sort_values().unique())


def match_town(user_town: str):
    if not user_town or not user_town.strip():
        return None
    key = user_town.strip().lower()
    if key in town_to_row:
        return str(town_to_row[key]["bfs_name"])
    for canonical_lower, row in town_to_row.items():
        if key in canonical_lower or canonical_lower in key:
            return str(row["bfs_name"])
    return None


def call_llm_json(system_prompt: str, user_prompt: str) -> str:
    if not OPENAI_API_KEY:
        raise ValueError("OPENAI_API_KEY ist nicht gesetzt.")
    if not OPENAI_MODEL:
        raise ValueError("OPENAI_MODEL ist nicht gesetzt.")
    client = OpenAI(api_key=OPENAI_API_KEY)
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        max_tokens=512,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return response.choices[0].message.content


def parse_json_response(raw: str, required_keys: tuple[str, ...]) -> dict:
    cleaned = (raw or "").strip()
    if not cleaned:
        raise ValueError("LLM hat eine leere Antwort zurückgegeben.")
    if cleaned.startswith("```"):
        lines = cleaned.splitlines()
        inner = [line for line in lines[1:] if line.strip() != "```"]
        cleaned = "\n".join(inner)
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Kein gültiges JSON erhalten: {cleaned[:300]}") from exc
    missing = [k for k in required_keys if k not in parsed]
    if missing:
        raise ValueError(f"JSON fehlen Pflichtfelder: {', '.join(missing)}.")
    return parsed


def extract_preferences(user_text: str) -> dict:
    system_prompt = (
        "Du bist ein Datenextraktionssystem für Schweizer Wohnungsanfragen. "
        "Extrahiere aus dem Benutzertext genau drei Werte und gib sie als JSON zurück:\n"
        '- "rooms": Anzahl Zimmer als Dezimalzahl (z.B. 3.5)\n'
        '- "area_m2": Wohnfläche in m² als Zahl\n'
        '- "town": Name der Gemeinde oder Stadt als String\n'
        "Gib NUR gültiges JSON zurück, kein Markdown, keine Erklärung.\n"
        'Beispiel: {"rooms": 3.5, "area_m2": 85, "town": "Winterthur"}\n'
        "Falls ein Wert fehlt oder unklar ist, setze ihn auf null."
    )
    raw = call_llm_json(system_prompt=system_prompt, user_prompt=user_text)
    return parse_json_response(raw, required_keys=("rooms", "area_m2", "town"))


def predict_apartment_price(rooms: float, area_m2: float, town: str) -> float:
    canonical = match_town(town)
    if canonical is None:
        raise ValueError(
            f"Gemeinde '{town}' nicht gefunden. "
            "Bitte einen gültigen Ort im Kanton Zürich angeben."
        )
    row = town_to_row[canonical.lower()]
    features = pd.DataFrame([{
        "rooms": rooms,
        "area_m2": area_m2,
        "pop": float(row["pop"]),
        "pop_dens": float(row["pop_dens"]),
        "frg_pct": float(row["frg_pct"]),
        "emp": float(row["emp"]),
        "tax_income": float(row["tax_income"]),
    }])
    return float(model.predict(features)[0])


def generate_explanation(preferences: dict, prediction: float) -> str:
    system_prompt = (
        "Du bist ein freundlicher Schweizer Wohnungsberater. "
        "Erkläre die Mietpreisschätzung auf Deutsch in 2–3 Sätzen. "
        "Erwähne die Wohnungsmerkmale (Zimmer, Fläche, Ort), den geschätzten Mietpreis "
        "und einen Hinweis auf Unsicherheit (z.B. Zustand, Mikrolage). "
        "Berechne keinen neuen Preis selbst – verwende nur den angegebenen Modellwert. "
        'Gib NUR gültiges JSON zurück im Format: {"answer": "Deine Erklärung"}'
    )
    user_prompt = (
        f"Wohnungswunsch: {preferences.get('rooms')} Zimmer, "
        f"{preferences.get('area_m2')} m², {preferences.get('town')}. "
        f"Modellschätzung: CHF {prediction:,.0f} pro Monat."
    )
    raw = call_llm_json(system_prompt=system_prompt, user_prompt=user_prompt)
    result = parse_json_response(raw, required_keys=("answer",))
    return result["answer"]


def run_pipeline(user_text: str):
    if not user_text or not user_text.strip():
        return {}, 0.0, "Bitte beschreibe deinen Wohnungswunsch auf Deutsch."

    try:
        preferences = extract_preferences(user_text)
    except Exception as e:
        return {}, 0.0, f"Fehler bei der Texterkennung: {e}"

    rooms = preferences.get("rooms")
    area_m2 = preferences.get("area_m2")
    town = preferences.get("town")

    if rooms is None or area_m2 is None or town is None:
        missing = [k for k, v in {"Zimmer": rooms, "Fläche": area_m2, "Ort": town}.items() if v is None]
        return preferences, 0.0, (
            f"Fehlende Angaben: {', '.join(missing)}. "
            "Bitte Anfrage präzisieren, z.B.: «3.5 Zimmer, 85 m², Winterthur»."
        )

    try:
        prediction = predict_apartment_price(float(rooms), float(area_m2), str(town))
    except Exception as e:
        return preferences, 0.0, f"Fehler bei der Preisschätzung: {e}"

    try:
        explanation = generate_explanation(preferences, prediction)
    except Exception as e:
        explanation = f"Geschätzte Monatsmiete: CHF {prediction:,.0f}. (Erklärung nicht verfügbar: {e})"

    return preferences, prediction, explanation


with gr.Blocks(title="Wohnungspreisschätzer – Kanton Zürich") as demo:
    gr.Markdown(
        """
        # Wohnungspreisschätzer – Kanton Zürich
        Beschreibe deinen Wohnungswunsch auf **Deutsch** und erhalte eine Mietpreisschätzung.
        """
    )
    user_input = gr.Textbox(
        label="Wohnungswunsch",
        lines=3,
        placeholder="Ich suche eine 3.5-Zimmer-Wohnung mit etwa 85 m² in Winterthur.",
    )
    btn = gr.Button("Schätzen", variant="primary")
    extracted = gr.JSON(label="Extrahierte Eingaben")
    price = gr.Number(label="Geschätzte Monatsmiete (CHF)")
    answer = gr.Textbox(label="Antwort", lines=5)

    btn.click(fn=run_pipeline, inputs=[user_input], outputs=[extracted, price, answer])

    gr.Examples(
        examples=[
            ["Ich suche eine 3.5-Zimmer-Wohnung mit etwa 85 m² in Winterthur."],
            ["Haben Sie etwas für mich? 4 Zimmer, 100 m², am liebsten in Zürich."],
            ["Suche kleine 2-Zimmer-Wohnung mit 50 m² in Uster."],
        ],
        inputs=[user_input],
    )

demo.launch()