# Socrates_docker / app_main3.py
# Page residue kept for provenance: "alesamodio's picture"
# Last commit ce74a97: "remove uid from app_main3"
import os
import re
import uuid
import time
import streamlit as st
from pathlib import Path
import base64
import difflib
from typing import Optional
from supabase import create_client
from app_nn import run_chat_app
from config import refresh_user_context, SUPABASE_URL, SUPABASE_SERVICE_KEY, OPENAI_API_KEY
from translate_query_response import (
detect_language_code,
translate_from_english,
translate_to_english,
extract_interests_with_llm,
)
from supabase_ie import create_ui_language_row
from db3_utils import COUNTRY_NAME_TO_ISO2
# Reverse lookup: upper-cased ISO-2 code -> English country name.
ISO2_TO_NAME = {iso.upper(): name for name, iso in COUNTRY_NAME_TO_ISO2.items()}

# ---------------- STREAMLIT CONFIG ----------------
st.set_page_config(page_title="Socrates", page_icon="💬", layout="centered")
st.markdown("<style>div.block-container{max-width:700px;margin:auto;}</style>", unsafe_allow_html=True)

# 🧩 Optional: reset directly via URL (?reset=1)
params = st.query_params if hasattr(st, "query_params") else {}
_reset_raw = params.get("reset", "0")
# Accept both shapes: a plain string, or a list of values.
if isinstance(_reset_raw, (list, tuple)):
    _reset_raw = _reset_raw[0] if _reset_raw else "0"
if _reset_raw == "1":
    # Drop the flag before rerunning: it would otherwise still be present on
    # the next run and trigger the reset again, looping forever.
    del params["reset"]
    st.session_state.clear()
    st.rerun()

# Client built with the service key; it is used for the auth admin calls below.
sb = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
TABLE_NAME = "user_profiles"
# NOTE(review): every auth user is created with this same hard-coded password.
# Looks like a development shortcut -- confirm it never reaches production.
DEV_FIXED_PASSWORD = "123456"

# ---------------- WIZARD HELPER ----------------
# Canonical English country names (use as the "ground truth" list)
COUNTRY_CANON = sorted(COUNTRY_NAME_TO_ISO2.keys())
# Lower-cased name -> canonical spelling, for case-insensitive exact lookups.
NAME_TO_CANON = {name.lower(): name for name in COUNTRY_CANON}
def normalize_country_end(raw: str | None) -> Optional[str]:
"""
Return a canonical English country name or None.
Strategy:
1) Exact (case-insensitive) against canonical English names
2) LLM translate-to-English + exact match
3) Tight fuzzy match (handles small typos)
"""
if not raw:
return None
r = raw.strip()
if not r:
return None
# 1) exact (case-insensitive)
k = r.lower()
if k in NAME_TO_CANON:
return NAME_TO_CANON[k]
# 2) translate to English, then exact
try:
en = (translate_to_english(r) or "").strip()
except Exception:
en = ""
if en:
k2 = en.lower()
if k2 in NAME_TO_CANON:
return NAME_TO_CANON[k2]
# 3) fuzzy on the canonical list
cand = difflib.get_close_matches(en or r, COUNTRY_CANON, n=1, cutoff=0.86)
return cand[0] if cand else None
def safe_country_name(raw: str | None) -> Optional[str]:
"""
Return a canonical English country name from free text, or None if not resolvable.
Works with localized names (e.g., 'Italia') and ISO-2 (e.g., 'IT').
"""
if not raw:
return None
val = raw.strip()
# 0) quick ISO-2: "IT" -> "Italy"
if len(val) == 2 and val.upper() in ISO2_TO_NAME:
return ISO2_TO_NAME[val.upper()]
# 1) minimal aliases for common local names (extend as needed)
ALIAS = {
"italia": "Italy",
"españa": "Spain", "espana": "Spain",
"deutschland": "Germany",
"méxico": "Mexico", "mexico": "Mexico",
"brasil": "Brazil",
"portugal": "Portugal",
"nederland": "Netherlands",
"suomi": "Finland",
"polska": "Poland",
"éire": "Ireland", "eire": "Ireland",
"elláda": "Greece", "ellada": "Greece", "ελλάδα": "Greece",
}
key_raw = val.lower()
if key_raw in ALIAS:
return ALIAS[key_raw]
# 2) try translation once, but don't depend on it
try:
val_en = (translate_to_english(val) or "").strip()
except Exception:
val_en = ""
candidates = [c for c in {val_en, val} if c] # unique, non-empty: translated then raw
# 3) accept exact English country names case-insensitively
for cand in candidates:
name_key = cand.lower()
if name_key in NAME_TO_CANON:
return NAME_TO_CANON[name_key]
return None
def _img_data_uri(filename: str) -> str:
    """Encode an image stored next to this module as a base64 ``data:`` URI.

    The MIME subtype is ``jpeg`` for ``.jpg``/``.jpeg`` files and ``png`` for
    every other extension.
    """
    image_path = Path(__file__).parent / filename
    encoded = base64.b64encode(image_path.read_bytes()).decode("utf-8")

    # rudimentary type detection based on the file extension only
    suffix = image_path.suffix.lower().lstrip(".")
    subtype = "png"
    if suffix in ("jpg", "jpeg"):
        subtype = "jpeg"
    return f"data:image/{subtype};base64,{encoded}"
def socrates_bubble(text: str):
    """Display Socrates' avatar with a styled speech bubble (consistent font).

    Args:
        text: Plain-text message to show. It is HTML-escaped before being
            placed in the bubble, because callers build it from user-typed
            values (e.g. the user's name) and the markup is rendered with
            ``unsafe_allow_html=True``.
    """
    import html  # local import keeps this block self-contained

    avatar_uri = _img_data_uri("Socrates.png")  # embed the PNG as a data URI
    # quote=False: only &, < and > need escaping inside element content.
    safe_text = html.escape(text, quote=False)
    st.markdown(
        f"""
<div style="display:flex; align-items:flex-start; gap:12px; margin-top:0.75rem;">
<img src="{avatar_uri}" width="48" style="border-radius:50%; flex:0 0 48px;"/>
<div style="
background:#f7f7f7;
border-radius:14px;
padding:10px 14px;
box-shadow:0 1px 3px rgba(0,0,0,0.08);
line-height:1.5;
color:#222;
max-width:560px;">
{safe_text}
</div>
</div>
""",
        unsafe_allow_html=True,
    )
# ---------------- AUTH + PROFILE HANDLING ----------------
def upsert_user_profile(user_id, name, living_country, origin_country, interests):
    """Insert or update the ``user_profiles`` row whose ``id`` is *user_id*.

    A comma-separated ``interests`` string is split into a list of trimmed,
    non-empty items first. Falsy field values are written as None. Any
    exception is reported through ``st.error`` instead of being raised.
    """
    try:
        if isinstance(interests, str):
            pieces = (chunk.strip() for chunk in interests.split(","))
            interests = [piece for piece in pieces if piece]

        row = {"id": user_id}
        row["name"] = name or None
        row["living_country"] = living_country or None
        row["origin_country"] = origin_country or None
        # use text[] column or text depending on schema
        row["interests"] = interests or None

        sb.table("user_profiles").upsert(row, on_conflict="id").execute()
    except Exception as e:
        st.error(f"Failed to upsert user profile: {e}")
def _normalize_list_users(resp):
    """
    Normalize admin.list_users() into a plain Python list of user dicts/objects.

    Covers response shapes that expose the users as ``resp["users"]``,
    ``resp.users``, ``resp.data``, or that are already a list.

    Returns:
        The users collection found in *resp*; an empty list when the shape is
        not recognised or the collection is None (so callers can always
        iterate the result).
    """
    if isinstance(resp, dict) and "users" in resp:
        found = resp["users"]
    elif hasattr(resp, "users"):  # some SDKs
        found = resp.users
    elif hasattr(resp, "data"):  # other SDKs
        found = resp.data
    elif isinstance(resp, list):
        found = resp
    else:
        found = None
    # A recognised wrapper may still carry None; never hand that to a for-loop.
    return found if found is not None else []
def _extract_id_email(u):
    """
    Pull ``(user_id, lower-cased email)`` out of one user entry.

    *u* may be a dict or an attribute-style object; in both cases the values
    are read from the entry itself first and from its nested ``user`` second.
    A missing email becomes ``""``; a missing id becomes None.
    """
    if isinstance(u, dict):
        inner = u.get("user") or {}
        found_id = u.get("id") or inner.get("id")
        found_email = u.get("email") or inner.get("email") or ""
        return found_id, found_email.lower()

    # Attribute-style entry: use getattr only, never .get
    inner = getattr(u, "user", None)  # some wrappers nest the 'user'

    def _outer_then_inner(attr):
        outer_value = getattr(u, attr, None)
        return outer_value or (getattr(inner, attr, None) if inner else None)

    found_id = _outer_then_inner("id")
    found_email = _outer_then_inner("email") or ""
    return found_id, found_email.lower()
def _extract_created_id(created):
    """
    Normalize the id out of admin.create_user(...) result (dict or object).

    The id is looked up on the result itself first and on its nested ``user``
    second, for both dict and attribute-style results.

    Returns:
        The id, or None when neither location holds a truthy one.
    """
    if isinstance(created, dict):
        return created.get("id") or (created.get("user") or {}).get("id")

    direct_id = getattr(created, "id", None)
    if direct_id:
        return direct_id
    # An `id` attribute that exists but is empty must not hide the nested one.
    nested_user = getattr(created, "user", None)
    return getattr(nested_user, "id", None) or None
def create_or_get_user(email: str) -> str | None:
"""
If a Supabase Auth user with this email exists -> return its id.
Otherwise create it (deterministic UUID) and return the id.
Uses the SERVICE ROLE client (sb) via sb.auth.admin.* calls.
"""
try:
email_lower = email.strip().lower()
# 1) List users and search by email (works across SDK shapes)
lu = sb.auth.admin.list_users()
users = _normalize_list_users(lu)
user_id = None
for u in users:
cand_id, cand_email = _extract_id_email(u)
if cand_email == email_lower and cand_id:
user_id = cand_id
break
if user_id:
print(f"✅ Existing Auth user: {email_lower} -> {user_id}")
return user_id
# 2) Not found → create with deterministic UUID
new_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, email_lower))
print(f"🧩 Creating new Auth user {email_lower} with UUID {new_uuid}")
created = sb.auth.admin.create_user({
"email": email_lower,
"password": DEV_FIXED_PASSWORD,
"email_confirm": True,
"id": new_uuid,
})
user_id = _extract_created_id(created) or new_uuid
print(f"✅ Created Auth user: {email_lower} -> {user_id}")
return user_id
except Exception as e:
st.error(f"Error creating or fetching user: {e}")
print("⚠️ create_or_get_user() error:", e)
return None
# ---------------- WIZARD ----------------
def _wizard_prev_next(prev_step: int, next_label: str) -> bool:
    """Render the two-column Previous / next buttons; return True if next was clicked.

    Clicking Previous stores *prev_step* as the current step and reruns.
    The button captions are intentionally left untranslated.
    """
    c1, c2 = st.columns([1, 1])
    if c1.button("← Previous", use_container_width=True):
        st.session_state.wizard_step = prev_step
        st.rerun()
    return c2.button(next_label, use_container_width=True)


def _wizard_display_name(data: dict) -> str:
    """Return the name typed so far, or "my friend" when it is blank."""
    return data["name"] if data["name"].strip() else "my friend"


def _wizard_step_language(data: dict) -> None:
    """Step 1: ask for the user's language and store its detected code as ``ui_lang``."""
    # Socrates question (left in English; it's a meta prompt about language)
    socrates_bubble("Language, Lingua, Langue, Idioma, Sprache, 语言, Språk, 言語, ... ?")
    # Label in English here is fine; the user types their language name
    data["language"] = st.text_input("Language", value=data["language"])
    if not st.button("Next →", use_container_width=True):
        return
    lang_raw = (data["language"] or "").strip()
    if not lang_raw:
        st.warning("Please enter your language.")
        st.stop()
    detected = detect_language_code(lang_raw)
    # persist in session for the next steps, defaulting to English
    st.session_state["ui_lang"] = detected if detected else "en"
    st.session_state.wizard_step = 2
    st.rerun()


def _wizard_step_name(data: dict, ui_lang: str) -> None:
    """Step 2: ask for the user's name; it is required to move on."""
    socrates_message = "Before our dialogue begins, by what name shall I address you, my friend?"
    socrates_bubble(translate_from_english(socrates_message, ui_lang))
    name_label = translate_from_english("Your name", ui_lang)
    data["name"] = st.text_input(name_label, value=data["name"])
    st.session_state["name"] = data["name"]
    if not _wizard_prev_next(1, "Next →"):
        return
    if data["name"].strip():
        st.session_state.wizard_step = 3
        st.rerun()
    else:
        warning = "Could you tell me your name, so that our dialogue may truly begin?"
        st.warning(translate_from_english(warning, ui_lang))


def _wizard_step_country(data: dict, ui_lang: str, *, field: str, question: str,
                         label: str, widget_key: str, prev_step: int, next_step: int) -> None:
    """Steps 3 and 4: collect one free-text country answer into ``data[field]``.

    No validation happens here; the raw text is resolved when the wizard finishes.
    """
    socrates_bubble(translate_from_english(question, ui_lang))
    translated_label = translate_from_english(label, ui_lang)
    data[field] = st.text_input(translated_label, value=data[field], key=widget_key)
    if _wizard_prev_next(prev_step, "Next →"):
        st.session_state.wizard_step = next_step
        st.rerun()


def _wizard_step_interests(user_id, data: dict, ui_lang: str) -> None:
    """Step 5: collect interests and, on Finish, resolve and save the whole profile."""
    socrates_message = "One last curiosity, if you permit me, what pursuits most stir your soul and occupy your thought?"
    socrates_bubble(translate_from_english(socrates_message, ui_lang))
    interests_label = translate_from_english("Your main interests (comma separated)", ui_lang)
    data["interests"] = st.text_input(interests_label, value=data["interests"])
    if not _wizard_prev_next(4, "Finish"):
        return

    if not data["name"].strip():
        warning = "Name is required before finishing."
        st.warning(translate_from_english(warning, ui_lang))
        return

    # Interests: whitespace-only input counts as "left blank"
    has_interests = bool((data["interests"] or "").strip())
    interest_list_en = extract_interests_with_llm(data["interests"], ui_lang) if has_interests else None

    # Resolve countries ONCE at the end (LLM + exact + fuzzy). Save only English names.
    living_raw = (data.get("living_country") or "").strip()
    origin_raw = (data.get("origin_country") or "").strip()
    living_en = normalize_country_end(living_raw)
    origin_en = normalize_country_end(origin_raw)

    # If unresolved but the user really typed something, inform (do NOT block).
    # Blank answers are not "unrecognized"; they were simply skipped.
    missing = []
    if living_raw and not living_en:
        missing.append("living country")
    if origin_raw and not origin_en:
        missing.append("country of origin")
    if missing:
        st.info(translate_from_english(
            "I couldn’t confidently recognize your " + " and ".join(missing) +
            ". I’ll skip saving it for now. You can add it later in chat by simply saying "
            "“My current country is …” or “My origin country is …”.",
            ui_lang
        ))
        # Optional: one-time chat reminder after onboarding
        st.session_state["post_onboarding_notice"] = (
            "If you’d like, tell me your " + " and ".join(missing) +
            " in chat anytime. For example: “My current country is Italy.”"
        )

    # Save clean English names (None if unresolved or left blank)
    upsert_user_profile(
        user_id,
        data["name"].strip(),
        living_en,
        origin_en,
        interest_list_en,
    )

    # wrap up: drop wizard state and hand over to the chat on the next run
    st.session_state.pop("wizard_step", None)
    st.session_state.pop("form_data", None)
    st.session_state["onboarding_complete"] = True
    time.sleep(1.0)
    st.rerun()


def onboarding_wizard(user_id):
    """Render the current step of the five-step onboarding wizard.

    Steps: 1 language, 2 name, 3 country of residence, 4 country of origin,
    5 interests. Progress (``wizard_step``) and answers (``form_data``) live
    in ``st.session_state``; each navigation click updates the step and
    reruns the script. Finishing saves the profile for *user_id* and sets
    ``onboarding_complete``.
    """
    st.markdown("---")
    if "wizard_step" not in st.session_state:
        st.session_state.wizard_step = 1
        st.session_state.form_data = {
            "language": "",
            "name": "",
            "living_country": "",
            "origin_country": "",
            "interests": "",
        }
    step = st.session_state.wizard_step
    data = st.session_state.form_data
    # UI language chosen in step 1 (English until then)
    ui_lang = st.session_state.get("ui_lang", "en")

    if step == 1:
        _wizard_step_language(data)
    elif step == 2:
        _wizard_step_name(data, ui_lang)
    elif step == 3:
        _wizard_step_country(
            data, ui_lang,
            field="living_country",
            question=f"Thank you, {_wizard_display_name(data)}. In which country do you dwell?",
            label="Country you live in",
            widget_key="living_country_input",
            prev_step=2,
            next_step=4,
        )
    elif step == 4:
        _wizard_step_country(
            data, ui_lang,
            field="origin_country",
            question=f"Thank you, {_wizard_display_name(data)}. Now, if you don't mind, from what land your forebears come?",
            label="Country of origin",
            widget_key="origin_country_input",
            prev_step=3,
            next_step=5,
        )
    elif step == 5:
        _wizard_step_interests(user_id, data, ui_lang)
def username_from_email(email: str | None) -> str:
"""Local-part only, lowercased, keep [a-z0-9_-]."""
if not email:
return "user"
local = email.split("@", 1)[0].lower()
safe = re.sub(r"[^a-z0-9_-]+", "-", local).strip("-")
return safe or "user"
# ---------------- MAIN APP FLOW ----------------
def _handle_login_submit(email):
    """Process one click on the login form's Enter button.

    Resolves (or creates) the auth user, loads its profile row, stores the
    identity in ``st.session_state`` and then either starts the onboarding
    wizard or opens the chat directly.
    """
    email_norm = (email or "").strip().lower()

    # If the user types a different email, hard-reset the session
    prev_email = st.session_state.get("email")
    if prev_email and prev_email.strip().lower() != email_norm:
        for key in list(st.session_state.keys()):
            del st.session_state[key]
        st.rerun()
        return  # not reached: st.rerun() stops this run

    user_id = create_or_get_user(email)
    if not user_id:
        st.error("Could not create or fetch user.")
        return
    username = username_from_email(email_norm)

    # 🔎 Check existing profile FIRST (do not upsert yet). The query runs
    # before anything is stored in the session: a failure here must not leave
    # a user_id behind with neither the wizard nor the chat flag set, which
    # would match none of main()'s branches on later runs.
    try:
        prof = sb.table("user_profiles").select("*").eq("id", user_id).execute()
    except Exception as e:
        st.error(f"Could not load user profile: {e}")
        return

    st.session_state["user_id"] = user_id
    st.session_state["email"] = email_norm
    st.session_state["username"] = username

    if not prof.data:
        # Create a minimal row so the wizard can complete it later
        try:
            sb.table("user_profiles").insert({"id": user_id, "username": username}).execute()
        except Exception as e:
            st.warning(f"Couldn't create minimal user_profiles row: {e}")
        st.session_state["wizard_start"] = True
        st.rerun()
        return  # not reached

    profile = prof.data[0]
    # The wizard runs only when none of these fields has a value yet
    needs_onboarding = not any([
        profile.get("name"),
        profile.get("living_country"),
        profile.get("origin_country"),
        profile.get("interests"),
    ])
    if needs_onboarding:
        st.session_state["wizard_start"] = True
        st.rerun()
        return  # not reached

    refresh_user_context()
    st.session_state["onboarding_complete"] = True
    ui_lang = st.session_state.get("ui_lang", "en")
    # NOTE(review): this is the only call to create_ui_language_row in this
    # file; users who come through the wizard never reach it -- confirm intent.
    create_ui_language_row(user_id, ui_lang)
    run_chat_app(user_id=user_id, username=username, profile=profile, ui_lang=ui_lang)


def main():
    """Route the current run to the login form, the onboarding wizard, or the chat.

    The choice is driven by three ``st.session_state`` keys: ``user_id``,
    ``wizard_start`` and ``onboarding_complete``.
    """
    user_id = st.session_state.get("user_id")
    wizard = st.session_state.get("wizard_start")
    onboarding_done = st.session_state.get("onboarding_complete")

    # 1️⃣ LOGIN STEP
    if not wizard and not onboarding_done and not user_id:
        st.header("💬 Socrates Login")
        email = st.text_input("Email")
        if st.button("Enter", use_container_width=True):
            _handle_login_submit(email)
        return

    # 2️⃣ ONBOARDING WIZARD
    if wizard and user_id and not onboarding_done:
        onboarding_wizard(user_id)
        return

    # 3️⃣ AFTER ONBOARDING → FETCH PROFILE AND START CHAT
    if onboarding_done and user_id:
        ui_lang = st.session_state.get("ui_lang", "en")
        prof = sb.table("user_profiles").select("*").eq("id", user_id).execute()
        profile = prof.data[0] if prof.data else {}
        username = st.session_state.get("username") or username_from_email(st.session_state.get("email", "")) or user_id
        run_chat_app(user_id=user_id, username=username, profile=profile, ui_lang=ui_lang)
        return


if __name__ == "__main__":
    main()