Spaces:
Running
Running
| import os | |
| import re | |
| import uuid | |
| import time | |
| import streamlit as st | |
| from pathlib import Path | |
| import base64 | |
| import difflib | |
| from typing import Optional | |
| from supabase import create_client | |
| from app_nn import run_chat_app | |
| from config import refresh_user_context, SUPABASE_URL, SUPABASE_SERVICE_KEY, OPENAI_API_KEY | |
| from translate_query_response import ( | |
| detect_language_code, | |
| translate_from_english, | |
| translate_to_english, | |
| extract_interests_with_llm, | |
| ) | |
| from supabase_ie import create_ui_language_row | |
| from db3_utils import COUNTRY_NAME_TO_ISO2 | |
# Reverse lookup built from COUNTRY_NAME_TO_ISO2: upper-cased ISO-2 code -> country name.
ISO2_TO_NAME = {iso.upper(): name for name, iso in COUNTRY_NAME_TO_ISO2.items()}

# ---------------- STREAMLIT CONFIG ----------------
st.set_page_config(page_title="Socrates", page_icon="💬", layout="centered")
st.markdown("<style>div.block-container{max-width:700px;margin:auto;}</style>", unsafe_allow_html=True)

# Optional: reset the session directly via URL (?reset=1)
params = st.query_params if hasattr(st, "query_params") else {}
# st.query_params.get() yields a plain string, while older list-style query
# param mappings yield a list of strings; accept both shapes.
_reset_flag = params.get("reset", "0")
if isinstance(_reset_flag, (list, tuple)):
    _reset_flag = _reset_flag[0] if _reset_flag else "0"
if _reset_flag == "1":
    st.session_state.clear()
    # Drop the flag before rerunning; otherwise it is still present on the
    # rerun and the reset fires again, so the page never gets past this point.
    params.pop("reset", None)
    st.rerun()

# Service-role Supabase client shared by every helper below.
sb = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
TABLE_NAME = "user_profiles"
# NOTE(review): every Auth user is created with this same hard-coded password.
# Acceptable for development only; replace before exposing the app publicly.
DEV_FIXED_PASSWORD = "123456"
# ---------------- WIZARD HELPER ----------------
# Ground-truth list of English country names, in sorted order.
COUNTRY_CANON = sorted(COUNTRY_NAME_TO_ISO2)
# Lower-cased name -> canonical spelling, used for case-insensitive exact lookups.
NAME_TO_CANON = {canon.lower(): canon for canon in COUNTRY_CANON}
| def normalize_country_end(raw: str | None) -> Optional[str]: | |
| """ | |
| Return a canonical English country name or None. | |
| Strategy: | |
| 1) Exact (case-insensitive) against canonical English names | |
| 2) LLM translate-to-English + exact match | |
| 3) Tight fuzzy match (handles small typos) | |
| """ | |
| if not raw: | |
| return None | |
| r = raw.strip() | |
| if not r: | |
| return None | |
| # 1) exact (case-insensitive) | |
| k = r.lower() | |
| if k in NAME_TO_CANON: | |
| return NAME_TO_CANON[k] | |
| # 2) translate to English, then exact | |
| try: | |
| en = (translate_to_english(r) or "").strip() | |
| except Exception: | |
| en = "" | |
| if en: | |
| k2 = en.lower() | |
| if k2 in NAME_TO_CANON: | |
| return NAME_TO_CANON[k2] | |
| # 3) fuzzy on the canonical list | |
| cand = difflib.get_close_matches(en or r, COUNTRY_CANON, n=1, cutoff=0.86) | |
| return cand[0] if cand else None | |
| def safe_country_name(raw: str | None) -> Optional[str]: | |
| """ | |
| Return a canonical English country name from free text, or None if not resolvable. | |
| Works with localized names (e.g., 'Italia') and ISO-2 (e.g., 'IT'). | |
| """ | |
| if not raw: | |
| return None | |
| val = raw.strip() | |
| # 0) quick ISO-2: "IT" -> "Italy" | |
| if len(val) == 2 and val.upper() in ISO2_TO_NAME: | |
| return ISO2_TO_NAME[val.upper()] | |
| # 1) minimal aliases for common local names (extend as needed) | |
| ALIAS = { | |
| "italia": "Italy", | |
| "españa": "Spain", "espana": "Spain", | |
| "deutschland": "Germany", | |
| "méxico": "Mexico", "mexico": "Mexico", | |
| "brasil": "Brazil", | |
| "portugal": "Portugal", | |
| "nederland": "Netherlands", | |
| "suomi": "Finland", | |
| "polska": "Poland", | |
| "éire": "Ireland", "eire": "Ireland", | |
| "elláda": "Greece", "ellada": "Greece", "ελλάδα": "Greece", | |
| } | |
| key_raw = val.lower() | |
| if key_raw in ALIAS: | |
| return ALIAS[key_raw] | |
| # 2) try translation once, but don't depend on it | |
| try: | |
| val_en = (translate_to_english(val) or "").strip() | |
| except Exception: | |
| val_en = "" | |
| candidates = [c for c in {val_en, val} if c] # unique, non-empty: translated then raw | |
| # 3) accept exact English country names case-insensitively | |
| for cand in candidates: | |
| name_key = cand.lower() | |
| if name_key in NAME_TO_CANON: | |
| return NAME_TO_CANON[name_key] | |
| return None | |
def _img_data_uri(filename: str) -> str:
    """
    Build a base64 `data:image/...` URI for an image file.

    `filename` is joined onto the directory that contains this module. The
    image subtype is chosen from the file extension only: 'jpg'/'jpeg' map to
    'jpeg', and every other extension is labelled 'png'.
    """
    image_path = Path(__file__).parent / filename
    encoded = base64.b64encode(image_path.read_bytes()).decode("utf-8")

    # Rudimentary type detection based on the extension alone.
    suffix = image_path.suffix.lower().lstrip(".")
    if suffix in ("jpg", "jpeg"):
        subtype = "jpeg"
    else:
        subtype = "png"
    return "data:image/" + subtype + ";base64," + encoded
def socrates_bubble(text: str):
    """
    Display Socrates' avatar next to a styled speech bubble containing `text`.

    The bubble is emitted as raw HTML through st.markdown(unsafe_allow_html=True).
    Messages passed in by the wizard embed the name the user typed, so `text`
    is HTML-escaped before interpolation to prevent markup injection.

    Args:
        text: Plain-text message to show inside the bubble.
    """
    import html  # local import keeps this block self-contained

    avatar_uri = _img_data_uri("Socrates.png")  # embed the PNG as a data URI
    # quote=False: the text lands in element content, not inside an attribute,
    # so only &, < and > need escaping.
    safe_text = html.escape(str(text), quote=False)
    st.markdown(
        f"""
<div style="display:flex; align-items:flex-start; gap:12px; margin-top:0.75rem;">
<img src="{avatar_uri}" width="48" style="border-radius:50%; flex:0 0 48px;"/>
<div style="
background:#f7f7f7;
border-radius:14px;
padding:10px 14px;
box-shadow:0 1px 3px rgba(0,0,0,0.08);
line-height:1.5;
color:#222;
max-width:560px;">
{safe_text}
</div>
</div>
""",
        unsafe_allow_html=True,
    )
| # ---------------- AUTH + PROFILE HANDLING ---------------- | |
def upsert_user_profile(user_id, name, living_country, origin_country, interests):
    """
    Upsert one row of the "user_profiles" table, keyed on `id`.

    A comma-separated `interests` string is split into a list of trimmed,
    non-empty items first. Falsy field values are stored as None. Any
    exception is reported through st.error instead of being raised.
    """
    try:
        if isinstance(interests, str):
            trimmed = (piece.strip() for piece in interests.split(","))
            interests = [piece for piece in trimmed if piece]

        row = {
            "id": user_id,
            "name": name if name else None,
            "living_country": living_country if living_country else None,
            "origin_country": origin_country if origin_country else None,
            # use text[] column or text depending on schema
            "interests": interests if interests else None,
        }
        profiles = sb.table("user_profiles")
        profiles.upsert(row, on_conflict="id").execute()
    except Exception as e:
        st.error(f"Failed to upsert user profile: {e}")
def _normalize_list_users(resp):
    """
    Reduce an admin.list_users() response to its collection of users.

    Recognised shapes, checked in this order: a dict with a "users" key, an
    object with a `.users` attribute, an object with a `.data` attribute, or
    a plain list. Anything else yields an empty list.
    """
    if isinstance(resp, dict) and "users" in resp:
        return resp["users"]

    # Attribute-style responses: `.users` takes precedence over `.data`.
    for attr in ("users", "data"):
        if hasattr(resp, attr):
            return getattr(resp, attr)

    return resp if isinstance(resp, list) else []
def _extract_id_email(u):
    """
    Return (user_id, lower-cased email) for a dict user or a User-like object.

    Top-level `id` / `email` win; when they are missing or falsy, the nested
    `user` entry (dict key or attribute) is consulted instead. A missing email
    becomes "". Objects are only read with getattr, never with .get.
    """
    if isinstance(u, dict):
        # dict response
        nested = u.get("user") or {}
        ident = u.get("id") or nested.get("id")
        address = u.get("email") or nested.get("email") or ""
        return ident, address.lower()

    # object response; some wrappers nest the actual user under `.user`
    nested = getattr(u, "user", None)

    top_id = getattr(u, "id", None)
    if top_id:
        ident = top_id
    elif nested:
        ident = getattr(nested, "id", None)
    else:
        ident = None

    top_email = getattr(u, "email", None)
    if top_email:
        address = top_email
    elif nested:
        address = getattr(nested, "email", None) or ""
    else:
        address = ""

    return ident, address.lower()
def _extract_created_id(created):
    """
    Pull the new user's id out of an admin.create_user(...) result.

    Handles both dict and object results. In either shape a top-level `id`
    is preferred; when it is missing or falsy, the id of the nested `user`
    is used instead.

    Returns:
        The id, or None when neither location provides one.
    """
    if isinstance(created, dict):
        return created.get("id") or (created.get("user") or {}).get("id")

    # Read with getattr + truthiness (not hasattr) so an `id` attribute that
    # exists but is None still falls through to the nested user, matching the
    # dict branch above.
    direct_id = getattr(created, "id", None)
    if direct_id:
        return direct_id
    nested_user = getattr(created, "user", None)
    return getattr(nested_user, "id", None) or None
| def create_or_get_user(email: str) -> str | None: | |
| """ | |
| If a Supabase Auth user with this email exists -> return its id. | |
| Otherwise create it (deterministic UUID) and return the id. | |
| Uses the SERVICE ROLE client (sb) via sb.auth.admin.* calls. | |
| """ | |
| try: | |
| email_lower = email.strip().lower() | |
| # 1) List users and search by email (works across SDK shapes) | |
| lu = sb.auth.admin.list_users() | |
| users = _normalize_list_users(lu) | |
| user_id = None | |
| for u in users: | |
| cand_id, cand_email = _extract_id_email(u) | |
| if cand_email == email_lower and cand_id: | |
| user_id = cand_id | |
| break | |
| if user_id: | |
| print(f"✅ Existing Auth user: {email_lower} -> {user_id}") | |
| return user_id | |
| # 2) Not found → create with deterministic UUID | |
| new_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, email_lower)) | |
| print(f"🧩 Creating new Auth user {email_lower} with UUID {new_uuid}") | |
| created = sb.auth.admin.create_user({ | |
| "email": email_lower, | |
| "password": DEV_FIXED_PASSWORD, | |
| "email_confirm": True, | |
| "id": new_uuid, | |
| }) | |
| user_id = _extract_created_id(created) or new_uuid | |
| print(f"✅ Created Auth user: {email_lower} -> {user_id}") | |
| return user_id | |
| except Exception as e: | |
| st.error(f"Error creating or fetching user: {e}") | |
| print("⚠️ create_or_get_user() error:", e) | |
| return None | |
| # ---------------- WIZARD ---------------- | |
def onboarding_wizard(user_id) -> None:
    """
    Render the current step of the five-step onboarding wizard.

    Steps: 1) UI language, 2) name, 3) country of residence, 4) country of
    origin, 5) interests. The active step is kept in
    st.session_state.wizard_step and the answers in st.session_state.form_data;
    each button press updates the step and calls st.rerun().

    On "Finish" (step 5) the countries are normalised with
    normalize_country_end(), the profile is written with upsert_user_profile(),
    the wizard state is removed from the session and
    st.session_state["onboarding_complete"] is set to True.

    Args:
        user_id: Id passed through to upsert_user_profile() as the row id.
    """
    st.markdown("---")
    # First render: start at step 1 with an empty form.
    if "wizard_step" not in st.session_state:
        st.session_state.wizard_step = 1
        st.session_state.form_data = {
            "language": "",
            "name": "",
            "living_country": "",
            "origin_country": "",
            "interests": "",
        }
    step = st.session_state.wizard_step
    data = st.session_state.form_data

    # Step 1 - Language
    if step == 1:
        # Socrates question (leave this English; it's a meta prompt about language)
        socrates_bubble("Language, Lingua, Langue, Idioma, Sprache, 语言, Språk, 言語, ... ?")
        # Label in English here is fine; the user types their language name
        data["language"] = st.text_input("Language", value=data["language"])
        if st.button("Next →", use_container_width=True):
            lang_raw = (data["language"] or "").strip()
            if not lang_raw:
                st.warning("Please enter your language.")
                st.stop()
            # Persist the detected language code for the following steps,
            # falling back to English when detection returns nothing.
            detected = detect_language_code(lang_raw)
            st.session_state["ui_lang"] = detected if detected else "en"
            st.session_state.wizard_step = 2
            st.rerun()

    # Step 2 - Name
    if step == 2:
        ui_lang = st.session_state.get("ui_lang", "en")
        socrates_message = "Before our dialogue begins, by what name shall I address you, my friend?"
        socrates_bubble(translate_from_english(socrates_message, ui_lang))
        name_label = translate_from_english("Your name", ui_lang)
        data["name"] = st.text_input(name_label, value=data["name"])
        # The name is also mirrored into the session under "name".
        st.session_state["name"] = data["name"]
        c1, c2 = st.columns([1, 1])
        # The Previous and Next button captions are deliberately not translated.
        if c1.button("← Previous", use_container_width=True):
            st.session_state.wizard_step = 1
            st.rerun()
        if c2.button("Next →", use_container_width=True):
            if data["name"].strip():
                st.session_state.wizard_step = 3
                st.rerun()
            else:
                # A non-blank name is required to leave this step.
                warning = "Could you tell me your name, so that our dialogue may truly begin?"
                warning_lan = translate_from_english(warning, ui_lang)
                st.warning(warning_lan)

    # Step 3 - Living country (dedicated step)
    elif step == 3:
        usergivenname = data['name'] if data['name'].strip() else "my friend"
        ui_lang = st.session_state.get("ui_lang", "en")
        msg = f"Thank you, {usergivenname}. In which country do you dwell?"
        socrates_bubble(translate_from_english(msg, ui_lang))
        living_label = translate_from_english("Country you live in", ui_lang)
        data["living_country"] = st.text_input(living_label, value=data["living_country"],key="living_country_input")
        c1, c2 = st.columns([1, 1])
        if c1.button("← Previous", use_container_width=True):
            st.session_state.wizard_step = 2
            st.rerun()
        if c2.button("Next →", use_container_width=True):
            # No validation here; collect raw input and continue
            st.session_state.wizard_step = 4
            st.rerun()

    # Step 4 - Country of origin
    elif step == 4:
        usergivenname = data['name'] if data['name'].strip() else "my friend"
        ui_lang = st.session_state.get("ui_lang", "en")
        socrates_message = f"Thank you, {usergivenname}. Now, if you don't mind, from what land your forebears come?"
        socrates_message_lan = translate_from_english(socrates_message, ui_lang)
        socrates_bubble(socrates_message_lan)
        origin_label = translate_from_english("Country of origin", ui_lang)
        data["origin_country"] = st.text_input(origin_label, value=data["origin_country"],key="origin_country_input")
        c1, c2 = st.columns([1, 1])
        if c1.button("← Previous", use_container_width=True):
            st.session_state.wizard_step = 3
            st.rerun()
        if c2.button("Next →", use_container_width=True):
            # No validation here; collect raw input and continue
            st.session_state.wizard_step = 5
            st.rerun()

    # Step 5 - Interests, then save everything
    elif step == 5:
        ui_lang = st.session_state.get("ui_lang", "en")
        socrates_message = "One last curiosity, if you permit me, what pursuits most stir your soul and occupy your thought?"
        socrates_message_lan = translate_from_english(socrates_message, ui_lang)
        socrates_bubble(socrates_message_lan)
        interests_label = translate_from_english("Your main interests (comma separated)", ui_lang)
        data["interests"] = st.text_input(interests_label, value=data["interests"])
        c1, c2 = st.columns([1, 1])
        if c1.button("← Previous", use_container_width=True):
            st.session_state.wizard_step = 4
            st.rerun()
        if c2.button("Finish", use_container_width=True):
            if data["name"].strip():
                # Interests: only sent to the LLM extractor when something was typed.
                interest_list_en = extract_interests_with_llm(data["interests"], ui_lang) if data["interests"] else None
                # Resolve countries ONCE at the end (translate + exact + fuzzy).
                # Only the resolved English names are saved.
                living_en = normalize_country_end(data.get("living_country"))
                origin_en = normalize_country_end(data.get("origin_country"))
                # If unresolved but the user typed something, inform (do NOT block)
                missing = []
                if data.get("living_country") and not living_en:
                    missing.append("living country")
                if data.get("origin_country") and not origin_en:
                    missing.append("country of origin")
                if missing:
                    st.info(translate_from_english(
                        "I couldn’t confidently recognize your " + " and ".join(missing) +
                        ". I’ll skip saving it for now. You can add it later in chat by simply saying "
                        "“My current country is …” or “My origin country is …”.",
                        ui_lang
                    ))
                    # Optional: one-time chat reminder after onboarding.
                    # This notice is stored in English (it is not translated here).
                    st.session_state["post_onboarding_notice"] = (
                        "If you’d like, tell me your " + " and ".join(missing) +
                        " in chat anytime. For example: “My current country is Italy.”"
                    )
                # Save clean English names (or None if unresolved/left blank)
                upsert_user_profile(
                    user_id,
                    data["name"].strip(),
                    living_en if data.get("living_country") else None,
                    origin_en if data.get("origin_country") else None,
                    interest_list_en,
                )
                # Wrap up: drop the wizard state and flag onboarding as done.
                st.session_state.pop("wizard_step", None)
                st.session_state.pop("form_data", None)
                st.session_state["onboarding_complete"] = True
                # Short pause before the rerun.
                # NOTE(review): presumably so the info message above stays visible briefly — confirm.
                time.sleep(1.0)
                st.rerun()
            else:
                warning = "Name is required before finishing."
                warning_lan = translate_from_english(warning, ui_lang)
                st.warning(warning_lan)
| def username_from_email(email: str | None) -> str: | |
| """Local-part only, lowercased, keep [a-z0-9_-].""" | |
| if not email: | |
| return "user" | |
| local = email.split("@", 1)[0].lower() | |
| safe = re.sub(r"[^a-z0-9_-]+", "-", local).strip("-") | |
| return safe or "user" | |
| # ---------------- MAIN APP FLOW ---------------- | |
def main() -> None:
    """
    Top-level page flow, driven by three session-state flags.

    1) Login: shown while none of "user_id", "wizard_start" and
       "onboarding_complete" is set. Resolves the Auth user for the typed
       email, then either starts the chat (profile already has data) or
       flags the wizard.
    2) Onboarding wizard: shown while "wizard_start" is set and onboarding is
       not complete.
    3) Chat: once "onboarding_complete" is set, the profile row is fetched
       and run_chat_app() is called.
    """
    user_id = st.session_state.get("user_id")
    wizard = st.session_state.get("wizard_start")
    onboarding_done = st.session_state.get("onboarding_complete")

    # 1) LOGIN STEP
    if not wizard and not onboarding_done and not user_id:
        st.header("💬 Socrates Login")
        email = st.text_input("Email")
        if st.button("Enter", use_container_width=True):
            # If the user types a different email, hard-reset the session
            prev_email = st.session_state.get("email")
            if prev_email and (prev_email.strip().lower() != (email or "").strip().lower()):
                for k in list(st.session_state.keys()):
                    del st.session_state[k]
                st.rerun()
            user_id = create_or_get_user(email)
            if not user_id:
                st.error("Could not create or fetch user.")
                return
            email_norm = (email or "").strip().lower()
            username = username_from_email(email_norm)
            st.session_state["user_id"] = user_id
            st.session_state["email"] = email_norm
            st.session_state["username"] = username

            # Check for an existing profile FIRST (do not upsert yet)
            prof = sb.table("user_profiles").select("*").eq("id", user_id).execute()
            if prof.data:
                profile = prof.data[0]
                # The wizard is required only when ALL of these fields are
                # empty; a single filled field is enough to skip it.
                needs_onboarding = not any([
                    profile.get("name"),
                    profile.get("living_country"),
                    profile.get("origin_country"),
                    profile.get("interests"),
                ])
                if needs_onboarding:
                    st.session_state["wizard_start"] = True
                    st.rerun()
                else:
                    refresh_user_context()
                    st.session_state["onboarding_complete"] = True
                    # "ui_lang" is only written by the wizard, so for a
                    # returning user in a fresh session this is "en".
                    ui_lang = st.session_state.get("ui_lang", "en")
                    create_ui_language_row(user_id, ui_lang)
                    run_chat_app(user_id=user_id, username=username, profile=profile, ui_lang=ui_lang)
                    return
            else:
                # Create a minimal row so the wizard can complete it later
                try:
                    sb.table("user_profiles").insert({"id": user_id, "username": username}).execute()
                except Exception as e:
                    # Non-fatal: the wizard is started even if the insert fails.
                    st.warning(f"Couldn't create minimal user_profiles row: {e}")
                st.session_state["wizard_start"] = True
                st.rerun()

    # 2) ONBOARDING WIZARD
    if wizard and user_id and not onboarding_done:
        onboarding_wizard(user_id)
        return

    ui_lang = st.session_state.get("ui_lang", "en")

    # 3) AFTER ONBOARDING -> FETCH PROFILE AND START CHAT
    if onboarding_done and user_id:
        ui_lang = st.session_state.get("ui_lang", "en")
        prof = sb.table("user_profiles").select("*").eq("id", user_id).execute()
        # An empty dict is passed to the chat app when no profile row exists.
        profile = prof.data[0] if prof.data else {}
        # Username: session value first, otherwise derived from the stored email.
        username = st.session_state.get("username") or username_from_email(st.session_state.get("email", "")) or user_id
        st.session_state["user_id"] = user_id
        run_chat_app(user_id=user_id, username=username, profile=profile, ui_lang=ui_lang)
        return


if __name__ == "__main__":
    main()