"""Streamlit entry point: e-mail login, onboarding wizard, then the chat app.

Flow (see ``main``):
1. The user enters an e-mail; a Supabase Auth user is looked up or created.
2. If no usable profile exists, ``onboarding_wizard`` collects language,
   name, countries and interests and stores them in ``user_profiles``.
3. ``run_chat_app`` is started with the stored profile.
"""
import os
import re
import uuid
import time
import html
import base64
import difflib
from pathlib import Path
from typing import Optional

import streamlit as st
from supabase import create_client

from app_nn import run_chat_app
from config import refresh_user_context, SUPABASE_URL, SUPABASE_SERVICE_KEY, OPENAI_API_KEY
from translate_query_response import (
    detect_language_code,
    translate_from_english,
    translate_to_english,
    extract_interests_with_llm,
)
from supabase_ie import create_ui_language_row
from db3_utils import COUNTRY_NAME_TO_ISO2

# Reverse lookup: upper-cased ISO-2 code -> English country name.
ISO2_TO_NAME = {iso.upper(): name for name, iso in COUNTRY_NAME_TO_ISO2.items()}

# ---------------- STREAMLIT CONFIG ----------------
st.set_page_config(page_title="Socrates", page_icon="💬", layout="centered")
# NOTE(review): this injects an empty string; the global CSS/HTML that was
# presumably meant to go here is not visible in this view -- confirm.
st.markdown("", unsafe_allow_html=True)


def _query_param(params, key: str, default: str = "") -> str:
    """Return the first value of query parameter *key* as a string.

    Handles both mapping shapes seen for query parameters: plain string
    values and list-of-strings values. Missing keys, ``None`` and empty
    lists yield *default*.
    """
    value = params.get(key, default)
    if isinstance(value, (list, tuple)):
        value = value[0] if value else default
    return default if value is None else str(value)


# 🧩 Optional: reset directly via URL (?reset=1)
params = st.query_params if hasattr(st, "query_params") else {}
if _query_param(params, "reset", "0") == "1":
    # Drop the flag first: the rerun sees the same URL, so leaving it in place
    # would clear the session and rerun again on every run.
    del params["reset"]
    st.session_state.clear()
    st.rerun()

# Service-role Supabase client shared by every helper below.
sb = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
TABLE_NAME = "user_profiles"
# NOTE(review): every auto-created Auth user gets this same hard-coded
# password -- acceptable only for development; confirm before production use.
DEV_FIXED_PASSWORD = "123456"

# ---------------- WIZARD HELPER ----------------
# Canonical English country names (use as the "ground truth" list)
COUNTRY_CANON = sorted(COUNTRY_NAME_TO_ISO2.keys())
# Lower-cased name -> canonical spelling, for case-insensitive lookups.
NAME_TO_CANON = {name.lower(): name for name in COUNTRY_CANON}

# Minimal aliases for common local country names (extend as needed).
_COUNTRY_ALIASES = {
    "italia": "Italy",
    "españa": "Spain",
    "espana": "Spain",
    "deutschland": "Germany",
    "méxico": "Mexico",
    "mexico": "Mexico",
    "brasil": "Brazil",
    "portugal": "Portugal",
    "nederland": "Netherlands",
    "suomi": "Finland",
    "polska": "Poland",
    "éire": "Ireland",
    "eire": "Ireland",
    "elláda": "Greece",
    "ellada": "Greece",
    "ελλάδα": "Greece",
}


def normalize_country_end(raw: str | None) -> Optional[str]:
    """Return a canonical English country name, or None if unresolved.

    Strategy:
      1) Exact (case-insensitive) match against the canonical English names.
      2) Translate to English, then exact match again.
      3) Tight fuzzy match (handles small typos).

    Args:
        raw: Free text typed by the user; may be None or blank.

    Returns:
        An entry of ``COUNTRY_CANON`` or None.
    """
    if not raw:
        return None
    text = raw.strip()
    if not text:
        return None

    # 1) exact (case-insensitive)
    key = text.lower()
    if key in NAME_TO_CANON:
        return NAME_TO_CANON[key]

    # 2) translate to English, then exact. Translation is best-effort: any
    #    failure simply leaves us with the raw text for the fuzzy step.
    try:
        english = (translate_to_english(text) or "").strip()
    except Exception:
        english = ""
    if english:
        english_key = english.lower()
        if english_key in NAME_TO_CANON:
            return NAME_TO_CANON[english_key]

    # 3) fuzzy on the canonical list. Compare lower-cased strings so that a
    #    lower-case typo ("germny") is not additionally penalised for the
    #    capital letter in the canonical spelling.
    close = difflib.get_close_matches(
        (english or text).lower(), list(NAME_TO_CANON), n=1, cutoff=0.86
    )
    return NAME_TO_CANON[close[0]] if close else None


def safe_country_name(raw: str | None) -> Optional[str]:
    """Return a canonical English country name from free text, or None.

    Works with ISO-2 codes (e.g. 'IT'), a small table of localized names
    (e.g. 'Italia'), and exact English names, optionally after translation.
    No fuzzy matching is performed here.
    """
    if not raw:
        return None
    val = raw.strip()
    if not val:
        # Nothing to resolve; avoid a pointless translation call.
        return None

    # 0) quick ISO-2: "IT" -> "Italy"
    if len(val) == 2 and val.upper() in ISO2_TO_NAME:
        return ISO2_TO_NAME[val.upper()]

    # 1) known local-name aliases
    alias = _COUNTRY_ALIASES.get(val.lower())
    if alias is not None:
        return alias

    # 2) try translation once, but don't depend on it
    try:
        val_en = (translate_to_english(val) or "").strip()
    except Exception:
        val_en = ""

    # Unique, non-empty, in a guaranteed order: translated first, then raw.
    candidates = [c for c in dict.fromkeys((val_en, val)) if c]

    # 3) accept exact English country names case-insensitively
    for cand in candidates:
        name_key = cand.lower()
        if name_key in NAME_TO_CANON:
            return NAME_TO_CANON[name_key]
    return None


def _img_data_uri(filename: str) -> str:
    """Return a 'data:image/...;base64,...' URI for a local PNG/JPG file.

    The file is looked up next to this module. Any extension other than
    jpg/jpeg is labelled as PNG.
    """
    path = Path(__file__).parent / filename
    b64 = base64.b64encode(path.read_bytes()).decode("utf-8")
    # rudimentary type detection
    ext = path.suffix.lower().lstrip(".")
    mime = "jpeg" if ext in ("jpg", "jpeg") else "png"
    return f"data:image/{mime};base64,{b64}"


def socrates_bubble(text: str):
    """Display Socrates' avatar with a styled speech bubble (consistent font).

    *text* is HTML-escaped before rendering because it can contain
    user-typed content (e.g. the name entered in the wizard) and the
    markdown is rendered with ``unsafe_allow_html=True``.
    """
    # NOTE(review): the avatar URI is computed but not referenced by the
    # template visible here; the bubble markup appears to be missing from
    # this view -- confirm against the original file.
    avatar_uri = _img_data_uri("Socrates.png")  # <-- embed the PNG reliably
    safe_text = html.escape(str(text), quote=False)
    st.markdown(
        f"""
{safe_text}
""",
        unsafe_allow_html=True,
    )


# ---------------- AUTH + PROFILE HANDLING ----------------
def upsert_user_profile(user_id, name, living_country, origin_country, interests):
    """Insert or update the profile row for *user_id*.

    Args:
        user_id: Primary key of the ``user_profiles`` row.
        name: Display name; empty values are stored as NULL.
        living_country: Country of residence; empty values become NULL.
        origin_country: Country of origin; empty values become NULL.
        interests: List of interests, or a comma-separated string that is
            split into a list; empty values become NULL.

    Returns:
        True when the upsert call succeeded, False when it raised (the error
        is reported to the user via ``st.error``).
    """
    try:
        if isinstance(interests, str):
            interests = [s.strip() for s in interests.split(",") if s.strip()]
        payload = {
            "id": user_id,
            "name": name or None,
            "living_country": living_country or None,
            "origin_country": origin_country or None,
            "interests": interests or None,  # use text[] column or text depending on schema
        }
        sb.table(TABLE_NAME).upsert(payload, on_conflict="id").execute()
    except Exception as e:
        st.error(f"Failed to upsert user profile: {e}")
        return False
    return True


def _normalize_list_users(resp):
    """Normalize admin.list_users() into a plain list of user dicts/objects.

    Covers SDK variants that return .users, .data, dict{"users": [...]},
    or a list. Anything else yields an empty list.
    """
    if isinstance(resp, dict) and "users" in resp:
        return resp["users"]
    if hasattr(resp, "users"):  # some SDKs
        return resp.users
    if hasattr(resp, "data"):  # other SDKs
        return resp.data
    if isinstance(resp, list):
        return resp
    return []


def _extract_id_email(u):
    """Return (user_id, email_lower) for a dict user or a User-like object.

    Never calls .get on objects; uses getattr for objects. A missing e-mail
    is returned as the empty string.
    """
    # dict response
    if isinstance(u, dict):
        nested = u.get("user") or {}
        user_id = u.get("id") or nested.get("id")
        email = u.get("email") or nested.get("email") or ""
        return user_id, email.lower()

    # object response
    user_obj = getattr(u, "user", None)  # some wrappers nest the 'user'
    user_id = getattr(u, "id", None) or (getattr(user_obj, "id", None) if user_obj else None)
    email = getattr(u, "email", None) or (getattr(user_obj, "email", None) if user_obj else None) or ""
    return user_id, email.lower()


def _extract_created_id(created):
    """Normalize the id out of admin.create_user(...) result (dict or object).

    Returns None when no id can be found.
    """
    if isinstance(created, dict):
        return created.get("id") or (created.get("user") or {}).get("id")
    # Prefer a top-level id, but fall through to the nested user when the
    # top-level attribute exists and is empty.
    direct_id = getattr(created, "id", None)
    if direct_id:
        return direct_id
    nested_user = getattr(created, "user", None)
    return getattr(nested_user, "id", None) or None


def create_or_get_user(email: str) -> str | None:
    """Return the Supabase Auth user id for *email*, creating the user if needed.

    If a Supabase Auth user with this email exists -> return its id.
    Otherwise create it (deterministic UUID derived from the e-mail) and
    return the id. Uses the SERVICE ROLE client (sb) via sb.auth.admin.* calls.

    Returns:
        The user id, or None when *email* is blank or any call failed (the
        failure is reported via ``st.error``).
    """
    email_lower = (email or "").strip().lower()
    if not email_lower:
        # A blank e-mail would otherwise match the first Auth user that has
        # no e-mail address at all.
        return None
    try:
        # 1) List users and search by email (works across SDK shapes)
        # NOTE(review): list_users() is called without paging arguments, so
        # presumably only the first page is searched -- confirm for projects
        # with many users.
        listed = sb.auth.admin.list_users()
        for u in _normalize_list_users(listed):
            cand_id, cand_email = _extract_id_email(u)
            if cand_email == email_lower and cand_id:
                print(f"✅ Existing Auth user: {email_lower} -> {cand_id}")
                return cand_id

        # 2) Not found → create with deterministic UUID
        new_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, email_lower))
        print(f"🧩 Creating new Auth user {email_lower} with UUID {new_uuid}")
        created = sb.auth.admin.create_user({
            "email": email_lower,
            "password": DEV_FIXED_PASSWORD,
            "email_confirm": True,
            "id": new_uuid,
        })
        user_id = _extract_created_id(created) or new_uuid
        print(f"✅ Created Auth user: {email_lower} -> {user_id}")
        return user_id
    except Exception as e:
        st.error(f"Error creating or fetching user: {e}")
        print("⚠️ create_or_get_user() error:", e)
        return None


# ---------------- WIZARD ----------------
def onboarding_wizard(user_id):
    """Render the current step of the five-step onboarding wizard.

    Steps: 1 language, 2 name, 3 living country, 4 country of origin,
    5 interests. Progress lives in ``st.session_state.wizard_step`` and the
    answers in ``st.session_state.form_data``. On "Finish" the profile is
    saved for *user_id*, the wizard state is removed and
    ``onboarding_complete`` is set.
    """
    st.markdown("---")

    if "wizard_step" not in st.session_state:
        st.session_state.wizard_step = 1
    if "form_data" not in st.session_state:
        st.session_state.form_data = {
            "language": "",
            "name": "",
            "living_country": "",
            "origin_country": "",
            "interests": "",
        }

    step = st.session_state.wizard_step
    data = st.session_state.form_data

    # Step 1 - Language
    if step == 1:
        # Socrates question (leave this English; it's a meta prompt about language)
        socrates_bubble("Language, Lingua, Langue, Idioma, Sprache, 语言, Språk, 言語, ... ?")
        # Label in English here is fine; the user types their language name
        data["language"] = st.text_input("Language", value=data["language"])
        if st.button("Next →", use_container_width=True):
            lang_raw = (data["language"] or "").strip()
            if not lang_raw:
                st.warning("Please enter your language.")
                st.stop()
            # Persist the UI language for the next steps (default to English)
            detected = detect_language_code(lang_raw)
            st.session_state["ui_lang"] = detected if detected else "en"
            st.session_state.wizard_step = 2
            st.rerun()

    # Step 2 - Name
    elif step == 2:
        ui_lang = st.session_state.get("ui_lang", "en")
        socrates_message = "Before our dialogue begins, by what name shall I address you, my friend?"
        socrates_bubble(translate_from_english(socrates_message, ui_lang))
        name_label = translate_from_english("Your name", ui_lang)
        data["name"] = st.text_input(name_label, value=data["name"])
        st.session_state["name"] = data["name"]
        c1, c2 = st.columns([1, 1])
        # we don't change the language of the Previous and Next buttons
        if c1.button("← Previous", use_container_width=True):
            st.session_state.wizard_step = 1
            st.rerun()
        if c2.button("Next →", use_container_width=True):
            if data["name"].strip():
                st.session_state.wizard_step = 3
                st.rerun()
            else:
                warning = "Could you tell me your name, so that our dialogue may truly begin?"
                warning_lan = translate_from_english(warning, ui_lang)
                st.warning(warning_lan)

    # Step 3 - Living country (new dedicated step)
    elif step == 3:
        usergivenname = data['name'] if data['name'].strip() else "my friend"
        ui_lang = st.session_state.get("ui_lang", "en")
        msg = f"Thank you, {usergivenname}. In which country do you dwell?"
        socrates_bubble(translate_from_english(msg, ui_lang))
        living_label = translate_from_english("Country you live in", ui_lang)
        data["living_country"] = st.text_input(
            living_label, value=data["living_country"], key="living_country_input"
        )
        c1, c2 = st.columns([1, 1])
        if c1.button("← Previous", use_container_width=True):
            st.session_state.wizard_step = 2
            st.rerun()
        if c2.button("Next →", use_container_width=True):
            # No validation here; collect raw input and continue
            st.session_state.wizard_step = 4
            st.rerun()

    # Step 4 - Origin Countries
    elif step == 4:
        usergivenname = data['name'] if data['name'].strip() else "my friend"
        ui_lang = st.session_state.get("ui_lang", "en")
        socrates_message = f"Thank you, {usergivenname}. Now, if you don't mind, from what land your forebears come?"
        socrates_message_lan = translate_from_english(socrates_message, ui_lang)
        socrates_bubble(socrates_message_lan)
        origin_label = translate_from_english("Country of origin", ui_lang)
        data["origin_country"] = st.text_input(
            origin_label, value=data["origin_country"], key="origin_country_input"
        )
        c1, c2 = st.columns([1, 1])
        if c1.button("← Previous", use_container_width=True):
            st.session_state.wizard_step = 3
            st.rerun()
        if c2.button("Next →", use_container_width=True):
            # No validation here; collect raw input and continue
            st.session_state.wizard_step = 5
            st.rerun()

    # Step 5 - Interests
    elif step == 5:
        ui_lang = st.session_state.get("ui_lang", "en")
        socrates_message = "One last curiosity, if you permit me, what pursuits most stir your soul and occupy your thought?"
        socrates_message_lan = translate_from_english(socrates_message, ui_lang)
        socrates_bubble(socrates_message_lan)
        interests_label = translate_from_english("Your main interests (comma separated)", ui_lang)
        data["interests"] = st.text_input(interests_label, value=data["interests"])
        c1, c2 = st.columns([1, 1])
        if c1.button("← Previous", use_container_width=True):
            st.session_state.wizard_step = 4
            st.rerun()
        if c2.button("Finish", use_container_width=True):
            if data["name"].strip():
                # Interests (whitespace-only input counts as "left blank")
                interests_raw = (data["interests"] or "").strip()
                interest_list_en = (
                    extract_interests_with_llm(interests_raw, ui_lang) if interests_raw else None
                )

                # ✅ Resolve countries ONCE at the end (LLM + exact + fuzzy). Save only English names.
                living_raw = (data.get("living_country") or "").strip()
                origin_raw = (data.get("origin_country") or "").strip()
                living_en = normalize_country_end(living_raw)
                origin_en = normalize_country_end(origin_raw)

                # If unresolved but the user typed something, inform (do NOT block)
                missing = []
                if living_raw and not living_en:
                    missing.append("living country")
                if origin_raw and not origin_en:
                    missing.append("country of origin")
                if missing:
                    st.info(translate_from_english(
                        "I couldn’t confidently recognize your "
                        + " and ".join(missing)
                        + ". I’ll skip saving it for now. You can add it later in chat by simply saying "
                        "“My current country is …” or “My origin country is …”.",
                        ui_lang
                    ))
                    # Optional: one-time chat reminder after onboarding
                    st.session_state["post_onboarding_notice"] = (
                        "If you’d like, tell me your "
                        + " and ".join(missing)
                        + " in chat anytime. For example: “My current country is Italy.”"
                    )

                # Save clean English names (None if unresolved/left blank;
                # normalize_country_end already returns None for blank input)
                saved = upsert_user_profile(
                    user_id,
                    data["name"].strip(),
                    living_en,
                    origin_en,
                    interest_list_en,
                )
                if not saved:
                    # The error was already shown by upsert_user_profile; stay
                    # on this step so it remains visible and the user can retry
                    # instead of entering the chat without a saved profile.
                    return

                # wrap up
                st.session_state.pop("wizard_step", None)
                st.session_state.pop("form_data", None)
                st.session_state["onboarding_complete"] = True
                # Short pause so any info message above is visible before the rerun.
                time.sleep(1.0)
                st.rerun()
            else:
                warning = "Name is required before finishing."
                warning_lan = translate_from_english(warning, ui_lang)
                st.warning(warning_lan)


def username_from_email(email: str | None) -> str:
    """Return the e-mail's local part, lowercased, keeping only [a-z0-9_-].

    Runs of other characters collapse to a single '-', and leading/trailing
    '-' are stripped. Falls back to "user" when nothing usable remains.
    """
    if not email:
        return "user"
    local = email.split("@", 1)[0].lower()
    safe = re.sub(r"[^a-z0-9_-]+", "-", local).strip("-")
    return safe or "user"


# ---------------- MAIN APP FLOW ----------------
def main():
    """Drive the page: login form, onboarding wizard, or the chat app.

    The branch is chosen from three session-state keys: ``user_id``,
    ``wizard_start`` and ``onboarding_complete``.
    """
    user_id = st.session_state.get("user_id")
    wizard = st.session_state.get("wizard_start")
    onboarding_done = st.session_state.get("onboarding_complete")

    # 1️⃣ LOGIN STEP
    if not wizard and not onboarding_done and not user_id:
        st.header("💬 Socrates Login")
        email = st.text_input("Email")
        if st.button("Enter", use_container_width=True):
            email_norm = (email or "").strip().lower()

            # If the user types a different email, hard-reset the session
            prev_email = st.session_state.get("email")
            if prev_email and prev_email.strip().lower() != email_norm:
                for k in list(st.session_state.keys()):
                    del st.session_state[k]
                st.rerun()

            user_id = create_or_get_user(email)
            if not user_id:
                st.error("Could not create or fetch user.")
                return

            username = username_from_email(email_norm)
            st.session_state["user_id"] = user_id
            st.session_state["email"] = email_norm
            st.session_state["username"] = username

            # 🔎 Check existing profile FIRST (do not upsert yet)
            prof = sb.table(TABLE_NAME).select("*").eq("id", user_id).execute()
            if prof.data:
                profile = prof.data[0]
                # The wizard is required only when none of the key fields is filled in
                needs_onboarding = not any([
                    profile.get("name"),
                    profile.get("living_country"),
                    profile.get("origin_country"),
                    profile.get("interests"),
                ])
                if needs_onboarding:
                    st.session_state["wizard_start"] = True
                    st.rerun()
                else:
                    refresh_user_context()
                    st.session_state["onboarding_complete"] = True
                    ui_lang = st.session_state.get("ui_lang", "en")
                    create_ui_language_row(user_id, ui_lang)
                    run_chat_app(user_id=user_id, username=username, profile=profile, ui_lang=ui_lang)
                    return
            else:
                # Create a minimal row so the wizard can complete it later
                try:
                    sb.table(TABLE_NAME).insert({"id": user_id, "username": username}).execute()
                except Exception as e:
                    st.warning(f"Couldn't create minimal user_profiles row: {e}")
                st.session_state["wizard_start"] = True
                st.rerun()

    # 2️⃣ ONBOARDING WIZARD
    if wizard and user_id and not onboarding_done:
        onboarding_wizard(user_id)
        return

    # 3️⃣ AFTER ONBOARDING → FETCH PROFILE AND START CHAT
    if onboarding_done and user_id:
        ui_lang = st.session_state.get("ui_lang", "en")
        prof = sb.table(TABLE_NAME).select("*").eq("id", user_id).execute()
        profile = prof.data[0] if prof.data else {}
        # ✅ Store name + UUID also here (first time after onboarding)
        username = (
            st.session_state.get("username")
            or username_from_email(st.session_state.get("email", ""))
            or user_id
        )
        st.session_state["user_id"] = user_id
        run_chat_app(user_id=user_id, username=username, profile=profile, ui_lang=ui_lang)
        return


if __name__ == "__main__":
    main()