from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document as LangchainDocument
import config
from pathlib import Path
import os
import json
from datetime import datetime, timedelta
from supabase import create_client
from config import SUPABASE_URL, SUPABASE_SERVICE_KEY, HF_EMBEDDING_MODEL
from tempfile import TemporaryDirectory

# Module-level singletons: the embedding model is expensive to construct
# (downloads/loads HF weights), so build it once and reuse it everywhere.
embedding_model = HuggingFaceEmbeddings(model_name=HF_EMBEDDING_MODEL)
supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)

from supabase_ie import _load_history, _save_history


# --- LOAD CHAT HISTORY ---
def _prepare_chat_history_for_retrieval(user_id: str = "None", max_turns: int = 10):
    """Return up to *max_turns* recent messages from the current short-term session.

    Args:
        user_id: Owner of the history (the literal string "None" is the default
            sentinel used throughout this module).
        max_turns: Maximum number of trailing messages to return.

    Returns:
        A (possibly empty) list of message dicts for prompt context.
    """
    short = _load_history("chat_history_short", user_id)
    if not short["sessions"]:
        return []
    current = short["sessions"][-1]["messages"]
    return current[-max_turns:]


# --- SESSION EXPIRY ---
def close_current_session_if_expired(max_idle_minutes: int = 360, user_id: str = "None"):
    """Mark the latest short-term session as closed if it has been idle too long.

    Args:
        max_idle_minutes: Idle threshold; default 6 hours.
        user_id: Owner of the history.

    Returns:
        The (possibly updated) short-term history dict.

    NOTE(review): this always returns a truthy dict, so callers testing
    `if close_current_session_if_expired(...):` will always take the branch —
    confirm whether a boolean "was closed" result was intended.
    """
    short = _load_history("chat_history_short", user_id)
    if not short["sessions"]:
        return short
    current = short["sessions"][-1]
    if not current["messages"]:
        return short

    # NOTE(review): datetime.utcnow() is deprecated (Python 3.12+) but stored
    # timestamps are naive UTC, so switching to timezone-aware datetimes here
    # would break comparisons with existing data — migrate both together.
    last_msg_time = datetime.fromisoformat(current["messages"][-1]["time"])
    if datetime.utcnow() - last_msg_time > timedelta(minutes=max_idle_minutes):
        current["status"] = "closed"
        current["date_closed"] = datetime.utcnow().isoformat()
        _save_history("chat_history_short", short, user_id)
    return short


# --- SAVE CHAT ---
def save_chat_message(message_en: dict, message_user_language: dict, user_id: str = "None"):
    """Append a message to both the short-term and total histories in Supabase.

    The "total" history stores the user-language message (for the UI); the
    "short" history stores the English message (for reasoning/retrieval).
    A first session is created lazily if none exists. Missing "time" fields
    are filled with the current UTC timestamp.

    Args:
        message_en: English message dict (role/content/...).
        message_user_language: User-language message dict; a bare string is
            tolerated and wrapped as {"lang": <string>}.
        user_id: Owner of the history.
    """
    now = datetime.utcnow().isoformat()

    if isinstance(message_user_language, str):
        message_user_language = {"lang": message_user_language}

    # Ensure a timestamp is present on both variants.
    if "time" not in message_en:
        message_en["time"] = now
    if "time" not in message_user_language:
        message_user_language["time"] = now

    def _append(history_name: str, message: dict) -> None:
        # Load, lazily open a first session, append, persist.
        history = _load_history(history_name, user_id)
        if not history["sessions"]:
            history["sessions"].append({
                "session_id": 1,
                "status": "open",
                "date_opened": now,
                "date_closed": None,
                "messages": []
            })
        history["sessions"][-1]["messages"].append(message)
        _save_history(history_name, history, user_id)

    # --- TOTAL (user language for UI) ---
    _append("chat_history_total", message_user_language)
    # --- SHORT (English for reasoning) ---
    _append("chat_history_short", message_en)

    print(f"[DEBUG][SAVE_CHAT] total_user_language += {message_user_language}")
    print(f"[DEBUG][SAVE_CHAT] short_english += {message_en}")


# --- SESSION MANAGEMENT HELPERS ---
def on_pre_message_tick(user_id: str, username: str):
    """Run before each new user message: expire idle sessions, then archive.

    Args:
        user_id: Owner of the history.
        username: Used to locate the per-user FAISS folder in storage.
    """
    closed = close_current_session_if_expired(user_id=user_id)
    # NOTE(review): `closed` is the whole history dict (always truthy), so
    # rotation currently runs on every tick — see note on
    # close_current_session_if_expired.
    if closed:
        rotate_archive_if_needed(max_sessions=2, user_id=user_id, username=username)


# --- ARCHIVE ---
def rotate_archive_if_needed(max_sessions: int = 2, user_id: str = "None", username: str = "None"):
    """Trim short-term history to *max_sessions*, archiving old closed sessions.

    Sessions popped off the front that are closed get summarised and merged
    into the user's FAISS store ("db5"); open sessions are silently dropped.

    Args:
        max_sessions: Number of sessions to keep in short-term history.
        user_id: Owner of the history.
        username: Used to locate the per-user FAISS folder in storage.

    Returns:
        The updated short-term history dict.
    """
    short = _load_history("chat_history_short", user_id)
    while len(short["sessions"]) > max_sessions:
        old_session = short["sessions"].pop(0)
        if old_session["status"] == "closed":
            # Build a lightweight summary and archive it into FAISS.
            # (Session id/date metadata travels inside the summary dict.)
            summary_dict = _summarise_batch_with_llm([old_session])
            _faiss_from_summary_and_merge(summary_dict, db_name="db5", username=username)
    _save_history("chat_history_short", short, user_id)
    return short


# --- SUMMARISER HELPERS ---
def _summarise_batch_with_llm(batch_sessions: list[dict]) -> dict:
    """Summarise sessions by collecting their recent user prompts (no LLM call).

    Despite the name, this is a cheap extractive summary suitable for FAISS
    storage: the last 20 non-empty user prompts across the batch.

    Args:
        batch_sessions: Sessions to summarise.

    Returns:
        Dict with "summary" (str), "date" (YYYY-MM-DD), and "session_ids".
    """
    # Guard: the original code indexed batch_sessions[0] and would raise
    # IndexError on an empty batch.
    if not batch_sessions:
        return {
            "summary": "Empty batch: no sessions to summarise.",
            "date": datetime.now().strftime("%Y-%m-%d"),
            "session_ids": [],
        }

    lines = [
        txt
        for s in batch_sessions
        for m in s.get("messages", [])
        if m.get("role") == "user" and (txt := m.get("content", "").strip())
    ]
    # Keep only the last 20 user prompts.
    highlights = lines[-20:]

    summary_text = (
        f"Batch of {len(batch_sessions)} sessions "
        f"(IDs {batch_sessions[0]['session_id']}–{batch_sessions[-1]['session_id']}).\n"
        f"Recent user prompts:\n- " + "\n- ".join(highlights)
    )
    return {
        "summary": summary_text,
        "date": datetime.now().strftime("%Y-%m-%d"),
        "session_ids": [s["session_id"] for s in batch_sessions]
    }


def _faiss_from_summary_and_merge(summary: dict, db_name="db5", username: str = "None"):
    """Add a summary dict to the user's FAISS store held in Supabase storage.

    Storage layout: users/user_{username}/{db_name}/index.faiss + index.pkl
    in the "vector_dbs" bucket. If an index already exists it is downloaded,
    merged with the new document, and re-uploaded (upsert).

    Args:
        summary: Dict with "summary", "date", "session_ids" keys
            (as produced by _summarise_batch_with_llm).
        db_name: Storage sub-folder for this vector DB.
        username: Per-user folder component.
    """
    user_folder = f"user_{username}"

    with TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir)

        # 1. Create a new single-document FAISS index from the summary.
        #    Reuse the module-level embedding model instead of reloading it.
        doc = LangchainDocument(
            page_content=summary["summary"],
            metadata={
                "date": summary["date"],
                "session_ids": summary["session_ids"],
                "user": user_folder
            }
        )
        new_db = FAISS.from_documents([doc], embedding_model)
        new_db.save_local(str(tmp_path))

        # 2. Try to download the existing FAISS DB from Supabase.
        existing_path = tmp_path / "existing"
        existing_path.mkdir(exist_ok=True)
        files = ["index.faiss", "index.pkl"]
        has_existing = True
        for f in files:
            try:
                res = supabase.storage.from_("vector_dbs").download(
                    f"users/{user_folder}/{db_name}/{f}"
                )
                with open(existing_path / f, "wb") as out:
                    out.write(res)
            except Exception:
                # Missing either file means no usable existing index;
                # stop downloading the rest.
                has_existing = False
                break

        # 3. Merge the new index into the existing one if it was found.
        #    allow_dangerous_deserialization: required by langchain for pickle
        #    indexes — safe here because we only load files we wrote ourselves.
        if has_existing:
            base = FAISS.load_local(
                str(existing_path), embedding_model,
                allow_dangerous_deserialization=True
            )
            incr = FAISS.load_local(
                str(tmp_path), embedding_model,
                allow_dangerous_deserialization=True
            )
            base.merge_from(incr)
            base.save_local(str(tmp_path))

        # 4. Upload the (merged) FAISS files back to Supabase, overwriting.
        for f in files:
            with open(tmp_path / f, "rb") as fh:
                supabase.storage.from_("vector_dbs").upload(
                    f"users/{user_folder}/{db_name}/{f}",
                    fh,
                    {"upsert": "true"}
                )