Spaces:
Sleeping
Sleeping
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
#from langchain.schema import Document as LangchainDocument
from langchain_core.documents import Document as LangchainDocument
import config
from pathlib import Path
import os
import json
from datetime import datetime, timedelta
from supabase import create_client
from config import SUPABASE_URL, SUPABASE_SERVICE_KEY, HF_EMBEDDING_MODEL
from tempfile import TemporaryDirectory
# Module-level singletons, created once at import time and shared by all
# helpers below: the sentence-embedding model and the Supabase client.
embedding_model = HuggingFaceEmbeddings(model_name=HF_EMBEDDING_MODEL)
supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
# NOTE(review): imported after the client is built — presumably to avoid a
# circular import with supabase_ie; confirm before moving this to the top.
from supabase_ie import _load_history, _save_history
# --- LOAD CHAT HISTORY ---
def _prepare_chat_history_for_retrieval(user_id: str = "None", max_turns: int = 10):
    """Flatten the latest short-term session into its most recent messages.

    Returns up to *max_turns* trailing messages from the newest session in
    the user's short-term history, or [] when no session exists yet.
    """
    history = _load_history("chat_history_short", user_id)
    sessions = history["sessions"]
    if not sessions:
        return []
    latest_messages = sessions[-1]["messages"]
    return latest_messages[-max_turns:]
# --- SESSION EXPIRY ---
def close_current_session_if_expired(max_idle_minutes: int = 360, user_id: str = "None"): #TBC
    """Mark the newest short-term session closed once idle beyond the limit.

    Loads the user's short-term history, compares the timestamp of the last
    message in the newest session against ``max_idle_minutes``, and — if the
    session has gone stale — flags it closed and persists the change.
    Always returns the (possibly updated) short-term history dict.
    """
    history = _load_history("chat_history_short", user_id)
    sessions = history["sessions"]
    if not sessions:
        return history
    latest = sessions[-1]
    messages = latest["messages"]
    if not messages:
        return history
    last_seen = datetime.fromisoformat(messages[-1]["time"])
    idle_limit = timedelta(minutes=max_idle_minutes)
    # Timestamps are naive UTC (written via datetime.utcnow().isoformat()).
    if datetime.utcnow() - last_seen > idle_limit:
        latest["status"] = "closed"
        latest["date_closed"] = datetime.utcnow().isoformat()
        _save_history("chat_history_short", history, user_id)
    return history
# --- SAVE CHAT ---
def save_chat_message(message_en: dict, message_user_language: dict, user_id: str = "None"):
    """Append a message to both short-term and total history in Supabase.

    Parameters:
        message_en: English message dict, stored in the short-term history
            used for reasoning/retrieval.
        message_user_language: user-language message dict, stored in the total
            history shown in the UI. A bare string is accepted for backward
            compatibility and wrapped as {"lang": <str>}.
        user_id: owner of the histories.

    Both messages get a UTC timestamp if they arrive without one.
    """
    now = datetime.utcnow().isoformat()
    # Backward-compat shim: some callers pass the language string directly.
    if isinstance(message_user_language, str):
        message_user_language = {"lang": message_user_language}
    # Stamp messages that arrive without a timestamp.
    message_en.setdefault("time", now)
    message_user_language.setdefault("time", now)
    # --- TOTAL (user language for UI) ---
    _append_message("chat_history_total", message_user_language, user_id, now)
    # --- SHORT (English for reasoning) ---
    _append_message("chat_history_short", message_en, user_id, now)
    print(f"[DEBUG][SAVE_CHAT] total_user_language += {message_user_language}")
    print(f"[DEBUG][SAVE_CHAT] short_english += {message_en}")


def _append_message(table: str, message: dict, user_id: str, now: str):
    """Load *table* history, ensure an open session exists, append *message*, save."""
    history = _load_history(table, user_id)
    if not history["sessions"]:
        # Bootstrap the very first session for this user.
        history["sessions"].append({
            "session_id": 1,
            "status": "open",
            "date_opened": now,
            "date_closed": None,
            "messages": [],
        })
    history["sessions"][-1]["messages"].append(message)
    _save_history(table, history, user_id)
# --- SESSION MANAGEMENT HELPERS ---
def on_pre_message_tick(user_id: str, username: str):
    """Call before each new user message: close idle session, then archive.

    Defect fixed: the original wrote ``if closed: rotate_...`` where ``closed``
    was the history dict returned by close_current_session_if_expired() —
    always truthy — so the guard never gated anything. The rotation ran
    unconditionally; this makes that explicit.
    """
    close_current_session_if_expired(user_id=user_id)
    rotate_archive_if_needed(max_sessions=2, user_id=user_id, username=username)
# --- ARCHIVE ---
def rotate_archive_if_needed(max_sessions: int = 2, user_id: str = "None", username: str = "None"):
    """Trim short-term history to *max_sessions*, archiving closed sessions.

    Oldest sessions are popped first. Closed sessions are summarised (no LLM
    call) and merged into the per-user FAISS store ("db5"); the summary's date
    and session ids are carried into FAISS metadata by
    _faiss_from_summary_and_merge.
    NOTE(review): sessions still marked "open" are popped and dropped without
    archiving — confirm that silent data loss is intended.
    Returns the trimmed short-term history dict.
    """
    short = _load_history("chat_history_short", user_id)
    while len(short["sessions"]) > max_sessions:
        old_session = short["sessions"].pop(0)
        if old_session["status"] == "closed":
            # Lightweight summary for vector archiving.
            summary_dict = _summarise_batch_with_llm([old_session])
            _faiss_from_summary_and_merge(summary_dict, db_name="db5", username=username)
    _save_history("chat_history_short", short, user_id)
    return short
| # --- SUMMARISER HELPERS --- | |
| def _summarise_batch_with_llm(batch_sessions: list[dict]) -> dict: | |
| """ | |
| Lightweight summariser for sessions: just collect recent user prompts. | |
| Returns a dict with summary text (string), date, and session_ids. | |
| Suitable for FAISS storage without an LLM call. | |
| """ | |
| lines = [] | |
| for s in batch_sessions: | |
| for m in s.get("messages", []): | |
| if m.get("role") == "user": | |
| txt = m.get("content", "").strip() | |
| if txt: | |
| lines.append(txt) | |
| # Take only the last 20 user prompts | |
| highlights = lines[-20:] | |
| summary_text = ( | |
| f"Batch of {len(batch_sessions)} sessions " | |
| f"(IDs {batch_sessions[0]['session_id']}–{batch_sessions[-1]['session_id']}).\n" | |
| f"Recent user prompts:\n- " + "\n- ".join(highlights) | |
| ) | |
| return { | |
| "summary": summary_text, | |
| "date": datetime.now().strftime("%Y-%m-%d"), | |
| "session_ids": [s["session_id"] for s in batch_sessions] | |
| } | |
def _faiss_from_summary_and_merge(summary: dict, db_name="db5", username: str = "None"):
    """
    Add a summary dict to the per-user FAISS store persisted in Supabase.

    Storage path: users/user_<name>/{db_name}/index.faiss, index.pkl

    Improvements: reuses the module-level ``embedding_model`` instead of
    re-instantiating HuggingFaceEmbeddings on every call (the model load is
    expensive), and stops downloading after the first missing file since a
    partial index is unusable anyway.
    """
    user_folder = f"user_{username}"
    with TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir)
        # 1. Build a one-document FAISS index from the summary.
        doc = LangchainDocument(
            page_content=summary["summary"],
            metadata={
                "date": summary["date"],
                "session_ids": summary["session_ids"],
                "user": user_folder,
            },
        )
        new_db = FAISS.from_documents([doc], embedding_model)
        new_db.save_local(str(tmp_path))
        # 2. Try to download the existing FAISS DB from Supabase.
        existing_path = tmp_path / "existing"
        existing_path.mkdir(exist_ok=True)
        files = ["index.faiss", "index.pkl"]
        has_existing = True
        for f in files:
            try:
                res = supabase.storage.from_("vector_dbs").download(
                    f"users/{user_folder}/{db_name}/{f}"
                )
                with open(existing_path / f, "wb") as out:
                    out.write(res)
            except Exception:
                # Either file missing means there is no usable prior index.
                has_existing = False
                break
        # 3. Merge the new index into the existing one, if any.
        if has_existing:
            base = FAISS.load_local(str(existing_path), embedding_model, allow_dangerous_deserialization=True)
            incr = FAISS.load_local(str(tmp_path), embedding_model, allow_dangerous_deserialization=True)
            base.merge_from(incr)
            base.save_local(str(tmp_path))
        # 4. Upload the (merged) index back to Supabase, overwriting any prior files.
        for f in files:
            with open(tmp_path / f, "rb") as fh:
                supabase.storage.from_("vector_dbs").upload(
                    f"users/{user_folder}/{db_name}/{f}", fh, {"upsert": "true"}
                )