Spaces:
Running
Running
| # app_nn.py | |
| # --------------------------- | |
| # Backend Socrates logic, no Streamlit | |
| # --------------------------- | |
| import json | |
| import os | |
| import time | |
| import concurrent.futures | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime, timezone | |
| from supabase import create_client | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from characters import get_character, CHARACTERS | |
def _t(label: str, t0: float) -> float:
    """Log the time elapsed since ``t0`` and return a fresh checkpoint.

    Parameters
    ----------
    label : str
        Name of the step being timed; echoed in the log line.
    t0 : float
        A previous ``time.perf_counter()`` reading.

    Returns
    -------
    float
        The ``time.perf_counter()`` reading taken by this call, so the caller
        can chain checkpoints (``ck = _t("step", ck)``).
    """
    checkpoint = time.perf_counter()
    elapsed = checkpoint - t0
    print(f"[TIMING] {label}: {elapsed:.2f}s")
    return checkpoint
| from db6_topic_logger import update_topic_log | |
| from stories_from_supabase import ( | |
| log_story_usage, | |
| pick_story_with_fallback, | |
| ) | |
| from config import ( | |
| SUPABASE_URL, | |
| SUPABASE_SERVICE_KEY, | |
| HF_EMBEDDING_MODEL, | |
| OPENAI_CHAT_MODEL, | |
| ) | |
| from translate_query_response import ( | |
| detect_language, | |
| translate_to_english, | |
| translate_from_english, | |
| ) | |
| from Agent_Perception_Evaluation import analyze_perception_from_history | |
| from Agent_Chat_Classification import analyze_message | |
| from Agent_Socratic_Dialogue import run_socratic_turn | |
| from Agent_Pragmatic_Socratic_Dialogue import run_pragmatic_turn, PRAGMATIC_TOPICS | |
| from supabase_socratic import embed_text, search_life_events, search_past_dialogues | |
| from Retrieve import retrieve_all_chunks | |
| from Prompt_building import TOPIC_TO_DBS, PERSONAL_BUCKET | |
| from ask_llm_final_prompt import ask_socrates | |
| from db_5_process_session import ( | |
| on_pre_message_tick, | |
| save_chat_message, | |
| _prepare_chat_history_for_retrieval, | |
| ) | |
| from supabase_ie import load_history_for_display, set_last_message_language, get_initial_language | |
| from geo_utils import get_user_countries_and_languages | |
| from tavily_news import run_tavily_news_pipeline | |
| from push_utils import send_expo_push_notification | |
# --- Opinions catalog — loaded once per character at startup ---
# Maps character id -> parsed JSON content of that character's opinions file.
# Characters without an "opinions_file" entry get no key at all; characters
# whose file cannot be read or parsed get an empty dict.
_ALL_OPINIONS: Dict[str, Dict[str, Any]] = {}
for _char_id, _char_cfg in CHARACTERS.items():
    _opinions_file = _char_cfg.get("opinions_file")
    if not _opinions_file:
        continue
    # The opinions file path is resolved relative to this module's directory.
    _opinions_path = os.path.join(os.path.dirname(__file__), _opinions_file)
    try:
        with open(_opinions_path, encoding="utf-8") as _f:
            _loaded_opinions = json.load(_f)
    except Exception as _e:
        # Best-effort: a broken catalog must not prevent the module from loading.
        print(f"[opinions] Could not load {_opinions_file}: {_e}")
        _loaded_opinions = {}
    _ALL_OPINIONS[_char_id] = _loaded_opinions
# Throttle state for rhetorical_pattern triggers, kept in a plain in-memory dict.
# Key: user_id. Value: len(history_texts) at the time of the last rhetorical
# injection for that user.
_rhetorical_last_history_len: Dict[str, int] = {}
# Minimum growth in history length before another rhetorical injection is
# allowed: 3 dialogue turns = 6 messages.
_MIN_HISTORY_TURNS_BETWEEN_RHETORIC = 6

# --- globals (OK for backend) ---
# Shared Supabase client and embedding model, created once at import time.
supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
embeddings = HuggingFaceEmbeddings(
    model_name=HF_EMBEDDING_MODEL,
    encode_kwargs=dict(normalize_embeddings=True),
)
def _build_time_context(timezone_str: str, user_id: str) -> str:
    """Build a short, prompt-ready note about notable timing of this message.

    Lines are produced only when something is genuinely notable:

    - The user's local time is unusual (before 07:00 or from 23:00 on) AND
      more than 6 hours have passed since their last stored message. The gap
      condition avoids repeating the hour to a character that was already
      chatting with the user a few minutes ago.
    - More than 24 hours have passed since the last stored message.

    Everything here is best-effort: if the history cannot be loaded, a
    timestamp cannot be parsed, or ``timezone_str`` is not a valid zone, the
    corresponding line is simply omitted and the failure is logged.

    Parameters
    ----------
    timezone_str : str
        IANA time zone name used to compute the user's local time.
    user_id : str
        User whose ``chat_history_total`` record supplies the last timestamp.

    Returns
    -------
    str
        Newline-joined notes, or an empty string when nothing is notable.
    """
    from zoneinfo import ZoneInfo

    lines = []

    # ── hours since the last stored message — needed for both checks ──
    delta_h = 0.0
    try:
        # Imported inside the try so a missing/broken dependency degrades to
        # "no gap information" instead of failing the whole chat turn.
        from dateutil import parser as _dp
        from supabase_ie import _load_history as _lh

        total = _lh("chat_history_total", user_id)
        msgs = (total.get("sessions") or [{}])[-1].get("messages", [])

        # Newest message that carries a timestamp under either key.
        last_ts_str = None
        for m in reversed(msgs):
            last_ts_str = m.get("time") or m.get("timestamp")
            if last_ts_str:
                break

        if last_ts_str:
            last_ts = _dp.parse(last_ts_str)
            if last_ts.tzinfo is None:
                # Naive timestamps are treated as UTC.
                last_ts = last_ts.replace(tzinfo=timezone.utc)
            delta_h = (datetime.now(timezone.utc) - last_ts).total_seconds() / 3600
    except Exception as e:
        print(f"[time_context] could not compute gap since last message: {e}")

    # ── unusual hour — only if gap > 6 h (new session at an odd hour) ──
    # If someone was chatting 10 min ago at 2am the character already knows.
    try:
        local_now = datetime.now(ZoneInfo(timezone_str))
        h = local_now.hour
        if delta_h > 6:
            period = None
            if h < 7:
                period = "very early morning" if h >= 5 else "night (very late)"
            elif h >= 23:
                period = "late night"
            if period:
                lines.append(f"User's local time: {local_now.strftime('%H:%M')} ({period})")
    except Exception as e:
        print(f"[time_context] could not compute local time for {timezone_str!r}: {e}")

    # ── gap — only flagged when longer than 24 hours ──
    if delta_h > 48:
        lines.append(f"Time since last message: {int(delta_h / 24)} days")
    elif delta_h > 24:
        lines.append("Time since last message: more than 1 day")

    return "\n".join(lines)
def _persist_chat_turn(
    user_id: str,
    character_id: str,
    user_msg: str,
    user_msg_en: str,
    reply_en: str,
    reply_display: str,
) -> None:
    """Store the user message and the assistant reply of one chat turn."""
    save_chat_message(
        message_en={"role": "user", "content": user_msg_en},
        message_user_language={"role": "user", "content": user_msg},
        user_id=user_id,
    )
    save_chat_message(
        message_en={"role": "assistant", "content": reply_en, "character_id": character_id},
        message_user_language={
            "role": "assistant",
            "content": reply_display,
            "character_id": character_id,
        },
        user_id=user_id,
    )


def run_chat_app(
    user_id: str,
    username: str,
    profile: Dict[str, Any],
    ui_lang: str,
    user_msg: str,
    character_id: str = "socrates",
    timezone: str = "UTC",
    voice_loop: bool = False,
) -> Dict[str, Any]:
    """
    Run a single chat turn for the selected character.

    The turn goes through: language detection, translation to English,
    memory lookup, perception + classification, optional news fetch, RAG
    retrieval, and then one of three reply pipelines tried in this order:
    pragmatic Socratic -> philosophical Socratic -> standard LLM answer.

    Parameters
    ----------
    user_id : str
        Supabase auth user id.
    username : str
        Short user name or nickname.
    profile : dict
        User profile (from user_profiles table). Not read by this function.
    ui_lang : str
        Preferred UI language (e.g. "en", "it"). Not read by this function;
        the reply language is derived from the stored and detected language.
    user_msg : str
        The user's message in their own language.
    character_id : str
        Character whose configuration and opinions catalog are used.
    timezone : str
        Time zone name forwarded to ``_build_time_context``.
        NOTE: inside this function the name shadows ``datetime.timezone``;
        do not use ``timezone.utc`` here.
    voice_loop : bool
        When True, memory search, RAG retrieval and story picking are
        skipped to reduce latency.

    Returns
    -------
    dict with keys:
        - reply: str           # reply in user language
        - reply_en: str        # English reply on the pragmatic/Socratic routes;
                               # on the standard route the LLM is asked to answer
                               # directly in the user language, so this equals
                               # ``reply``
        - user_lang_code: str
        - analysis: dict       # classification info
        - emotion: Any         # emotion analysis (if available)
        - news_fetch: dict | None   # absent for an empty message
        - pragmatic: dict      # only when the pragmatic route answered
        - socratic: dict       # only when the Socratic route answered
    """
    user_msg = (user_msg or "").strip()
    if not user_msg:
        return {
            "reply": "",
            "reply_en": "",
            "user_lang_code": "en",
            "analysis": {},
            "emotion": None,
        }

    # ---- load character config + opinions catalog for this character ----
    char = get_character(character_id)
    _opinions_catalog: Dict[str, Any] = _ALL_OPINIONS.get(character_id, {})

    _t0 = time.perf_counter()
    _ck = _t0  # checkpoint

    # ---- time context (local time + gap since last message) ----
    time_context = _build_time_context(timezone, user_id)

    # ---- language detection (fast — local + one DB read) ----
    # Short messages are unreliable for detection, so they keep the stored
    # language when one exists.
    initial_lang = get_initial_language(user_id)
    detected_lang = detect_language(user_msg)
    if len(user_msg.split()) <= 5 or detected_lang == initial_lang:
        user_lang_code = initial_lang or detected_lang
    else:
        user_lang_code = detected_lang
    set_last_message_language(user_id=user_id, code_language=user_lang_code)
    _ck = _t("language detection", _ck)

    # ---- translate + housekeeping in parallel ----
    # translate_to_english is an LLM call (3-5s).
    # housekeeping is two sequential Supabase calls (4-6s).
    # Neither depends on the other, so run them concurrently.
    def _housekeeping_and_history() -> list:
        on_pre_message_tick(user_id=user_id, username=username)
        return _prepare_chat_history_for_retrieval(user_id=user_id)

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as _ex:
        _f_translate = _ex.submit(translate_to_english, user_msg, OPENAI_CHAT_MODEL, user_lang_code)
        _f_housekeeping = _ex.submit(_housekeeping_and_history)
        user_msg_en = _f_translate.result()
        history_texts = _f_housekeeping.result()
    _ck = _t("translate + housekeeping (parallel)", _ck)

    # ---- semantic memory fetch (once per turn, before routing) ----
    # Embed the user message and search both memory tables.
    #   life_events    — passed to all three pipelines (standard, socratic, pragmatic)
    #   past_dialogues — passed to the Socratic and standard pipelines
    # In voice_loop mode both are skipped to reduce latency.
    if not voice_loop:
        _user_embedding = embed_text(user_msg_en)
        _ck = _t("embed_text", _ck)
        life_events = search_life_events(user_id, _user_embedding, user_msg=user_msg_en) if _user_embedding else []
        _ck = _t("search_life_events", _ck)
        past_dialogues = search_past_dialogues(user_id, _user_embedding) if _user_embedding else []
        _ck = _t("search_past_dialogues", _ck)
    else:
        life_events = []
        past_dialogues = []

    # ---- active pragmatic dialogue lookup (best-effort, done once) ----
    # The result is used twice: to tune perception below, and to decide
    # whether to route to the pragmatic pipeline later on. A failed lookup is
    # treated as "no active dialogue" instead of failing the turn.
    from supabase_pragmatic_socratic import get_active_pragmatic_dialogue

    try:
        _active_dialogue = get_active_pragmatic_dialogue(user_id)
    except Exception as e:
        print(f"[pragmatic] active dialogue lookup failed (continuing): {e}")
        _active_dialogue = None

    # ---- perception analysis (best-effort) ----
    # If the user is mid-step in a structured pragmatic dialogue, pass that context
    # so the perception LLM raises its bar for disengagement signals.
    _perception_context: Optional[str] = None
    if _active_dialogue and _active_dialogue.get("status") == "iterating":
        _perception_context = (
            "The user is currently in the middle of a structured step-by-step reasoning dialogue "
            "(Socratic pragmatic dialogue). They are responding to a specific reasoning step. "
            "In this context:\n"
            "- Expressions of uncertainty like 'I don't know', 'tell me', 'non so', 'dimmelo', "
            "'just tell me', 'you tell me' are CONTENT REQUESTS for explanation — NOT disengagement.\n"
            "- Short or terse replies are normal in a focused task flow, not signs of frustration.\n"
            "- Only flag intent=closure or disengagement if the signal is very explicit "
            "(e.g. 'let's stop', 'I'm done', 'I don't want to continue').\n"
            "- Raise the threshold for 'frustrated' and 'overwhelmed' significantly."
        )

    # ---- perception + classification in parallel ----
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as _ex:
        _f_emotion = _ex.submit(
            analyze_perception_from_history,
            user_msg_en, user_id, _perception_context
        )
        _f_analysis = _ex.submit(analyze_message, user_id, user_msg_en)
        try:
            emotion_result = _f_emotion.result()
        except Exception as e:
            print(f"[perception] analysis failed (continuing without it): {e}")
            emotion_result = None
        try:
            analysis = _f_analysis.result()
        except Exception as e:
            print(f"[classifier] analysis failed (continuing with defaults): {e}")
            analysis = {}
    # Everything below calls analysis.get(...); normalise a None / non-dict result.
    if not isinstance(analysis, dict):
        analysis = {}
    _ck = _t("perception + classification (parallel)", _ck)

    story = analysis.get("topic_for_story")

    # ---- Socratic opinion clash / rhetorical pattern ----
    _socratic_trigger = analysis.get("socratic_trigger", "none") or "none"
    _trigger_subtype = analysis.get("trigger_subtype", "") or ""
    _socratic_alignment = analysis.get("socratic_alignment", "none") or "none"
    opinion_data: Optional[Dict[str, Any]] = None
    opinion_first_turn: bool = False
    rhetorical_hint: Optional[str] = None
    if _socratic_trigger == "opinion_clash" and _socratic_alignment != "aligned":
        _entry = _opinions_catalog.get(_trigger_subtype)
        if _entry:
            opinion_data = _entry
            opinion_first_turn = True
    elif _socratic_trigger == "rhetorical_pattern":
        # Rate-limit: only inject again once the history has grown enough
        # since the last injection for this user.
        _hist_len = len(history_texts)
        _last = _rhetorical_last_history_len.get(user_id, -_MIN_HISTORY_TURNS_BETWEEN_RHETORIC)
        if (_hist_len - _last) >= _MIN_HISTORY_TURNS_BETWEEN_RHETORIC:
            _entry = _opinions_catalog.get(_trigger_subtype) if _trigger_subtype else None
            if _entry:
                rhetorical_hint = _entry.get("entry_hook", "")
                _rhetorical_last_history_len[user_id] = _hist_len

    # ---- fetch news before retrieval so chunks are available for LLM ----
    print(f"[DEBUG][classifier] topic={analysis.get('topic')} | needs_news_fetch={analysis.get('needs_news_fetch')} | news_topic={analysis.get('news_topic')} | news_question={analysis.get('news_question')}")
    news_chunks: list = []
    if analysis.get("topic") == "news" and analysis.get("needs_news_fetch"):
        # NOTE(review): the result of this lookup is not used anywhere below;
        # confirm whether the news pipeline should receive it or the call can go.
        try:
            user_countries, user_langs = get_user_countries_and_languages(user_id)
        except Exception:
            user_countries, user_langs = ([], ["en"])
        try:
            news_result = run_tavily_news_pipeline(
                news_question=analysis.get("news_question", user_msg_en),
                news_topic=analysis.get("news_topic", []),
                user_id=user_id,
            )
            news_chunks = news_result.get("chunks", [])
            print(f"[DEBUG][tavily] status={news_result.get('status')} | articles={news_result.get('articles')} | chunks={len(news_chunks)}")
            for i, c in enumerate(news_chunks):
                print(f"[DEBUG][tavily] chunk[{i}] title={c.get('metadata',{}).get('title')} | date={c.get('metadata',{}).get('date')} | score={c.get('score')}")
        except Exception as e:
            print(f"⚠️ [news] Tavily pipeline failed: {e}")
    _ck = _t("news_fetch (conditional)", _ck)

    # ---- retrieve contextual chunks (RAG) — done once before routing ----
    # Chunks are passed to all three pipelines (standard, socratic, pragmatic).
    # Skipped in voice_loop mode to reduce latency (~1.5s saved).
    if not voice_loop:
        try:
            chunks = retrieve_all_chunks(
                query=user_msg_en,
                model=embeddings,
                user_id=user_id,
                username=username,
                k=10,
                topic=analysis.get("topic"),
                topic_to_dbs=TOPIC_TO_DBS,
            )
        except Exception as e:
            print(f"[chat] retrieval error (continuing without chunks): {e}")
            chunks = []
        _ck = _t("retrieve_all_chunks (RAG)", _ck)
    else:
        chunks = []

    # Prepend news chunks so the LLM sees them first
    if news_chunks:
        chunks = news_chunks + chunks

    # ---- Pragmatic Socratic route (structured stepwise reasoning) ----
    # Covers: personal, historical, advice, meta, knowledge, creative
    # when the user's question calls for a structured, practical explanation.
    # "philosophical" is excluded — routes directly to the Socratic pipeline.
    # Runs BEFORE the Socratic route.
    # Also route to pragmatic if there is already an active dialogue, regardless of
    # the current message's classifier topic (the user may send a short follow-up
    # that gets misclassified as "dialogic" while mid-session).
    _has_active_dialogue = _active_dialogue is not None
    if char["use_pragmatic_route"] and (analysis.get("topic") in PRAGMATIC_TOPICS or _has_active_dialogue):
        try:
            # When the classifier already decided this is a guided/procedural question,
            # bypass the internal eligibility gate so the pragmatic pipeline fires directly.
            _force_eligible = analysis.get("response_mode") == "guided"
            pragmatic_result = run_pragmatic_turn(
                user_id=user_id,
                user_msg_en=user_msg_en,
                history=history_texts,
                topic=analysis.get("topic", "advice"),
                emotion=emotion_result,
                life_events=life_events,
                retrieved_chunks=chunks,
                force_eligible=_force_eligible,
                character_id=character_id,
            )
            if pragmatic_result is not None:
                final_reply_en = pragmatic_result.get("reply_en") or ""
                # When the dialogue closes (stage=closed) there may be no reply;
                # let the standard pipeline handle the final message in that case.
                if final_reply_en:
                    final_reply_display = translate_from_english(final_reply_en, user_lang_code)
                    _persist_chat_turn(
                        user_id, character_id, user_msg, user_msg_en,
                        final_reply_en, final_reply_display,
                    )
                    # NOTE(review): this early return skips the proactive_state
                    # upsert, story-usage logging and topic-log update done at
                    # the end of the standard pipeline — confirm that is intended.
                    return {
                        "reply": final_reply_display,
                        "reply_en": final_reply_en,
                        "user_lang_code": user_lang_code,
                        "analysis": analysis,
                        "emotion": emotion_result,
                        "news_fetch": None,
                        "pragmatic": pragmatic_result,
                    }
        except Exception as e:
            print(f"[pragmatic] pipeline error, falling back: {e}")
    _ck = _t("pragmatic route", _ck)

    # ---- pick story (if any) ----
    # Must run before the Socratic route so socratic_story_dic is defined when passed in.
    # Skipped in voice_loop mode to reduce latency.
    socratic_story_dic = None
    if char["use_stories"] and not voice_loop:
        try:
            socratic_story_dic = pick_story_with_fallback(user_id, story, character_id=character_id)
        except Exception as e:
            print(f"[story] pick failed (continuing without story): {e}")
    if socratic_story_dic and emotion_result and analysis.get("topic") in PERSONAL_BUCKET | {"philosophical"}:
        emotion_result["category"] = "storyteller"

    # ---- Philosophical Socratic route (values-based, reflective exploration) ----
    # Stage A inside run_socratic_turn decides eligibility for personal topics.
    # opinion_clash bypasses Stage A and enters directly with the opinion entry_hook.
    if char["use_socratic_route"] and (analysis.get("topic") in char["dialogue_topics"] or opinion_data is not None):
        try:
            socratic_result = run_socratic_turn(
                user_id=user_id,
                user_msg_en=user_msg_en,
                history=history_texts,
                emotion=emotion_result,
                life_events=life_events,
                past_dialogues=past_dialogues,
                retrieved_chunks=chunks,
                socratic_story=socratic_story_dic,
                opinion_data=opinion_data,
                opinion_first_turn=opinion_first_turn,
                character_id=character_id,
            )
            # eligible is False -> Stage A ruled it out; fall through to the
            # standard pipeline. Anything else means the route answered.
            if socratic_result.get("eligible") is not False:
                final_reply_en = socratic_result["reply_en"]
                final_reply_display = translate_from_english(final_reply_en, user_lang_code)
                _persist_chat_turn(
                    user_id, character_id, user_msg, user_msg_en,
                    final_reply_en, final_reply_display,
                )
                # NOTE(review): as on the pragmatic route, this early return
                # skips the end-of-turn bookkeeping (proactive_state, story
                # usage, topic log) — confirm that is intended.
                return {
                    "reply": final_reply_display,
                    "reply_en": final_reply_en,
                    "user_lang_code": user_lang_code,
                    "analysis": analysis,
                    "emotion": emotion_result,
                    "news_fetch": None,
                    "socratic": socratic_result,
                }
        except Exception as e:
            print(f"[socratic] pipeline error, falling back to standard: {e}")
    _ck = _t("socratic route", _ck)

    # ---- pivot retrieval for closing messages ----
    # When the conversation is winding down, the closing word ("ok", "thanks") is a
    # poor retrieval query and the topic route may not include db3/db5/db6.
    # Use the last substantive user message as the query instead.
    _is_closing_turn = (
        (emotion_result or {}).get("intent") == "closure"
        or len(user_msg_en.strip().split()) <= 3
    )
    if _is_closing_turn:
        pivot_query = None
        for msg in reversed(history_texts):
            if msg.get("role") == "user" and len((msg.get("content") or "").split()) > 5:
                pivot_query = msg["content"]
                break
        if pivot_query:
            # Best-effort, like the main retrieval: a failure here must not
            # throw away the turn after all upstream work has been done.
            try:
                pivot_chunks = retrieve_all_chunks(
                    query=pivot_query,
                    model=embeddings,
                    user_id=user_id,
                    username=username,
                    k=3,
                    topic="pivot",
                    topic_to_dbs={"pivot": ["db5"]},
                ) or []
            except Exception as e:
                print(f"[chat] pivot retrieval error (continuing without pivot chunks): {e}")
                pivot_chunks = []
            seen_contents = {c.get("content") for c in chunks}
            for c in pivot_chunks:
                content = c.get("content")
                if content not in seen_contents:
                    chunks.append(c)
                    # Track it so duplicates within pivot_chunks are skipped too.
                    seen_contents.add(content)

    # ---- Reconcile response_mode with perception signals ----
    response_mode = analysis.get("response_mode", "dialogic")
    if emotion_result:
        p_intent = emotion_result.get("intent", "")
        p_receptivity = emotion_result.get("receptivity", "")
        if p_intent == "memory_check":
            response_mode = "factual"
        elif p_intent == "guidance" and p_receptivity != "overwhelmed":
            # Don't downgrade "guided" to "supportive": "guided" means a procedural
            # state machine was already chosen for this question upstream.
            if response_mode != "guided":
                response_mode = "supportive"

    # ---- final answer from LLM ----
    final_reply_en = ask_socrates(
        user_input=user_msg_en,
        retrieved_chunks=chunks,
        relevant_missing=analysis.get("relevant_missing"),
        topic=analysis.get("topic"),
        socratic_story=socratic_story_dic,
        response_mode=response_mode,
        emotion=emotion_result,
        user_id=user_id,
        life_events=life_events,
        past_dialogues=past_dialogues,
        news_temporal_context=analysis.get("news_temporal_context", ""),
        rhetorical_hint=rhetorical_hint,
        reply_language=user_lang_code,
        character_id=character_id,
        time_context=time_context,
    )
    _ck = _t("ask_socrates (final LLM)", _ck)

    # LLM already replied in user_lang_code — no post-translation needed
    final_reply_display = final_reply_en

    # ---- persist messages ----
    _persist_chat_turn(
        user_id, character_id, user_msg, user_msg_en,
        final_reply_en, final_reply_display,
    )

    # ---- update proactive_state with last active character ----
    # proactive_pipeline.py reads this to know which persona to use
    try:
        supabase.table("proactive_state").upsert(
            {"user_id": user_id, "character_id": character_id},
            on_conflict="user_id",
        ).execute()
    except Exception as _e:
        print(f"[proactive_state] character_id update failed: {_e}")

    # ---- log story usage (always, regardless of topic) ----
    story_id = (socratic_story_dic or {}).get("id")
    if socratic_story_dic and story_id:
        try:
            log_story_usage(
                user_id=user_id,
                story_id=story_id,
                assistant_message=final_reply_en,
            )
        except Exception as e:
            print(f"[story] log_story_usage failed: {e}")

    # Update topic log from this session (errors are logged, never raised)
    try:
        update_topic_log(user_id=user_id)
    except Exception as e:
        print(f"[topic_log] update failed: {e}")

    _t(f"TOTAL turn (user_id={user_id})", _t0)

    return {
        "reply": final_reply_display,
        "reply_en": final_reply_en,
        "user_lang_code": user_lang_code,
        "analysis": analysis,
        "emotion": emotion_result,
        "news_fetch": {"status": "ok", "articles": len(news_chunks)} if news_chunks else None,
    }
def generate_proactive_nudge(
    user_id: str,
    username: str,
    profile: Optional[Dict[str, Any]] = None,
    idle_seconds: int = 60,
    character_id: str = "socrates",
) -> Optional[Dict[str, Any]]:
    """
    Generate, store and push a short proactive follow-up when the user went quiet.

    A nudge is produced only when all of the following hold:

    1. The last message in the short-term history is from the assistant.
    2. The message before it is not also from the assistant (two assistant
       messages at the tail mean a nudge was already sent).
    3. The last message carries a parseable timestamp that is at least
       ``idle_seconds`` old.
    4. The history contains a user message longer than 5 words to use as the
       retrieval query and as the thread to pick up.
    5. The LLM returns a non-empty text.

    Parameters
    ----------
    user_id : str
        User whose history is inspected and who receives the nudge.
    username : str
        Forwarded to the chunk retrieval.
    profile : dict, optional
        Accepted for interface compatibility; currently not used.
    idle_seconds : int
        Minimum silence, in seconds, since the assistant's last message.
    character_id : str
        Character whose ``proactive_persona`` opens the prompt and which is
        recorded on the stored message.

    Returns
    -------
    dict or None
        ``{reply, reply_en, user_lang_code}`` when a nudge was sent, otherwise
        ``None``.
    """
    from ask_llm_final_prompt import query_llm_rewrite_only

    history = _prepare_chat_history_for_retrieval(user_id=user_id)
    if not history:
        return None

    # Condition 1: last message must be from the assistant (user hasn't replied)
    last_msg = history[-1]
    if last_msg.get("role") != "assistant":
        return None

    # Condition 2: don't nudge again if a nudge was already sent (≥2 consecutive
    # assistant messages at the tail means we already poked the user). The last
    # message is known to be the assistant's, so checking the one before suffices.
    if len(history) >= 2 and history[-2].get("role") == "assistant":
        return None

    # Condition 3: at least idle_seconds have passed
    last_time_str = last_msg.get("time") or last_msg.get("timestamp")
    if not last_time_str:
        return None
    try:
        last_time = datetime.fromisoformat(last_time_str.replace("Z", "+00:00"))
        if last_time.tzinfo is None:
            # Naive timestamps are treated as UTC.
            last_time = last_time.replace(tzinfo=timezone.utc)
        elapsed = (datetime.now(timezone.utc) - last_time).total_seconds()
    except Exception as e:
        print(f"[nudge] time parse error: {e}")
        return None
    if elapsed < idle_seconds:
        return None

    # Find last substantive user message to use as retrieval query
    pivot_query = None
    for msg in reversed(history):
        if msg.get("role") == "user" and len((msg.get("content") or "").split()) > 5:
            pivot_query = msg["content"]
            break
    if not pivot_query:
        return None

    # Retrieve pivot chunks from db5 (best-effort — nudge works without them)
    pivot_chunks = []
    try:
        pivot_chunks = retrieve_all_chunks(
            query=pivot_query,
            model=embeddings,
            user_id=user_id,
            username=username,
            k=3,
            topic="pivot",
            topic_to_dbs={"pivot": ["db5"]},
        ) or []
    except Exception as e:
        print(f"[nudge] retrieval error (continuing without chunks): {e}")

    # Tolerate chunks without a "content" key instead of raising KeyError.
    context_parts = [c.get("content") for c in pivot_chunks[:3] if c.get("content")]
    context_str = "\n\n".join(context_parts)

    # Fall back to English when no language is stored for the user.
    # NOTE(review): assumes "en" is an acceptable default for
    # translate_from_english — confirm.
    user_lang_code = get_initial_language(user_id) or "en"

    nudge_persona = get_character(character_id).get(
        "proactive_persona",
        "You are Socrates — warm, sardonic, genuinely curious.",
    )
    context_block = f"Relevant context from memory:\n{context_str}" if context_str else ""

    # Built line by line so the prompt text does not depend on source indentation.
    prompt = "\n".join(
        [
            f"{nudge_persona} The user went quiet after your last reply.",
            "Last thing the user said (before going quiet):",
            f'"""{pivot_query}"""',
            context_block,
            "Write a SHORT proactive follow-up (20–50 words). Pick ONE angle:",
            '- If context is available: reference something specific from it. E.g. "By the way — did you ever sort out [X]?"',
            "- If no context: pick up an unresolved thread from what the user last said and gently nudge them back.",
            "Warm, natural Socratic voice. ONE question only. No preamble.",
        ]
    )

    try:
        nudge_en = query_llm_rewrite_only(prompt)
    except Exception as e:
        print(f"[nudge] LLM error: {e}")
        return None
    # Never store or push a blank nudge.
    if not nudge_en or not nudge_en.strip():
        print("[nudge] LLM returned an empty nudge; skipping")
        return None

    nudge_display = translate_from_english(nudge_en, user_lang_code)
    save_chat_message(
        message_en={"role": "assistant", "content": nudge_en, "character_id": character_id},
        message_user_language={"role": "assistant", "content": nudge_display, "character_id": character_id},
        user_id=user_id,
    )

    # Send push notification so the user is alerted even when the app is in background
    try:
        send_expo_push_notification(user_id=user_id, body=nudge_display)
    except Exception as e:
        print(f"[nudge] push error: {e}")

    return {
        "reply": nudge_display,
        "reply_en": nudge_en,
        "user_lang_code": user_lang_code,
    }