# Socrates_docker / db_5_process_session.py
# History: "remove fet_user_id and get_username" (commit 0b170f9, alesamodio)
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
#from langchain.schema import Document as LangchainDocument
from langchain_core.documents import Document as LangchainDocument
import config
from pathlib import Path
import os
import json
from datetime import datetime, timedelta
from supabase import create_client
from config import SUPABASE_URL, SUPABASE_SERVICE_KEY, HF_EMBEDDING_MODEL
from tempfile import TemporaryDirectory
embedding_model = HuggingFaceEmbeddings(model_name=HF_EMBEDDING_MODEL)
supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
from supabase_ie import _load_history, _save_history
# --- LOAD CHAT HISTORY ---
def _prepare_chat_history_for_retrieval(user_id: str = "None", max_turns: int = 10):
    """Return up to *max_turns* recent messages from the current short-term session.

    Used to build prompt context; returns [] when no session exists.
    """
    history = _load_history("chat_history_short", user_id)
    sessions = history["sessions"]
    if not sessions:
        return []
    messages = sessions[-1]["messages"]
    return messages[-max_turns:]
# --- SESSION EXPIRY ---
def close_current_session_if_expired(max_idle_minutes: int = 360, user_id: str = "None"): #TBC
    """Mark the latest short-term session as closed when idle beyond the limit.

    Returns the (possibly updated) short-term history dict; persists it only
    when a session was actually closed.
    """
    history = _load_history("chat_history_short", user_id)
    sessions = history["sessions"]
    if not sessions:
        return history
    latest = sessions[-1]
    if not latest["messages"]:
        return history
    last_seen = datetime.fromisoformat(latest["messages"][-1]["time"])
    idle_limit = timedelta(minutes=max_idle_minutes)
    # Timestamps are naive UTC throughout this module (datetime.utcnow()).
    if datetime.utcnow() - last_seen > idle_limit:
        latest["status"] = "closed"
        latest["date_closed"] = datetime.utcnow().isoformat()
        _save_history("chat_history_short", history, user_id)
    return history
# --- SAVE CHAT ---
def _append_to_history(history_name: str, message: dict, user_id: str, now: str):
    """Append one message to the named history, opening session 1 if none exists."""
    history = _load_history(history_name, user_id)
    if not history["sessions"]:
        history["sessions"].append({
            "session_id": 1,
            "status": "open",
            "date_opened": now,
            "date_closed": None,
            "messages": []
        })
    history["sessions"][-1]["messages"].append(message)
    _save_history(history_name, history, user_id)


def save_chat_message(message_en: dict, message_user_language: dict, user_id: str = "None"):
    """Append a message to both short-term and total history in Supabase.

    - "chat_history_total" stores the user-language message (for the UI).
    - "chat_history_short" stores the English message (for reasoning).
    Both inputs are stamped with a UTC ISO time if they lack one (mutates the
    caller's dicts, as before).
    """
    now = datetime.utcnow().isoformat()
    # Tolerate a bare language string for the user-language message.
    if isinstance(message_user_language, str):
        message_user_language = {"lang": message_user_language}
    # Ensure timestamp is present
    if "time" not in message_en:
        message_en["time"] = now
    if "time" not in message_user_language:
        message_user_language["time"] = now
    # --- TOTAL (user language for UI) ---
    _append_to_history("chat_history_total", message_user_language, user_id, now)
    # --- SHORT (English for reasoning) ---
    _append_to_history("chat_history_short", message_en, user_id, now)
    print(f"[DEBUG][SAVE_CHAT] total_user_language += {message_user_language}")
    print(f"[DEBUG][SAVE_CHAT] short_english += {message_en}")
# --- SESSION MANAGEMENT HELPERS ---
def on_pre_message_tick(user_id: str, username: str):
    """Call before a new user message: close the session if expired, then
    rotate/archive old sessions.

    NOTE(review): close_current_session_if_expired always returns the (truthy)
    history dict, so the former ``if closed:`` guard never skipped rotation —
    it has been removed; behavior is unchanged.
    """
    close_current_session_if_expired(user_id=user_id)
    rotate_archive_if_needed(max_sessions=2, user_id=user_id, username=username)
# --- ARCHIVE ---
def rotate_archive_if_needed(max_sessions: int = 2, user_id: str = "None", username: str = "None"):
    """Keep only the most recent *max_sessions* sessions in short-term history.

    Overflowing sessions that are closed get summarised and archived into the
    user's FAISS store; overflowing sessions still marked open are dropped
    without archiving (pre-existing behavior — confirm this is intended).
    Returns the trimmed short-term history dict.
    """
    short = _load_history("chat_history_short", user_id)
    trimmed = False
    while len(short["sessions"]) > max_sessions:
        old_session = short["sessions"].pop(0)
        trimmed = True
        if old_session["status"] == "closed":
            # Build a lightweight summary and archive it; session id / dates /
            # user are carried inside the summary dict's metadata.
            summary_dict = _summarise_batch_with_llm([old_session])
            _faiss_from_summary_and_merge(summary_dict, db_name="db5", username=username)
    # Persist once after trimming instead of on every loop iteration.
    if trimmed:
        _save_history("chat_history_short", short, user_id)
    return short
# --- SUMMARISER HELPERS ---
def _summarise_batch_with_llm(batch_sessions: list[dict]) -> dict:
"""
Lightweight summariser for sessions: just collect recent user prompts.
Returns a dict with summary text (string), date, and session_ids.
Suitable for FAISS storage without an LLM call.
"""
lines = []
for s in batch_sessions:
for m in s.get("messages", []):
if m.get("role") == "user":
txt = m.get("content", "").strip()
if txt:
lines.append(txt)
# Take only the last 20 user prompts
highlights = lines[-20:]
summary_text = (
f"Batch of {len(batch_sessions)} sessions "
f"(IDs {batch_sessions[0]['session_id']}{batch_sessions[-1]['session_id']}).\n"
f"Recent user prompts:\n- " + "\n- ".join(highlights)
)
return {
"summary": summary_text,
"date": datetime.now().strftime("%Y-%m-%d"),
"session_ids": [s["session_id"] for s in batch_sessions]
}
def _faiss_from_summary_and_merge(summary: dict, db_name="db5", username: str = "None"):
    """
    Add a summary dict to the user's FAISS store in Supabase.
    Path: users/user_<name>/{db_name}/index.faiss, index.pkl

    Builds a one-document index from summary["summary"], merges it with any
    existing index downloaded from the "vector_dbs" bucket, and uploads the
    result back (upsert).
    """
    user_folder = f"user_{username}"
    # Reuse the module-level embedding model instead of re-instantiating one
    # per call — loading the HF model is expensive and the config is identical.
    embeddings = embedding_model
    with TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir)
        # 1. Create new FAISS index from summary
        doc = LangchainDocument(
            page_content=summary["summary"],
            metadata={
                "date": summary["date"],
                "session_ids": summary["session_ids"],
                "user": user_folder
            }
        )
        new_db = FAISS.from_documents([doc], embeddings)
        new_db.save_local(str(tmp_path))
        # 2. Try to download existing FAISS DB from Supabase
        existing_path = tmp_path / "existing"
        existing_path.mkdir(exist_ok=True)
        files = ["index.faiss", "index.pkl"]
        has_existing = True
        for f in files:
            try:
                res = supabase.storage.from_("vector_dbs").download(f"users/{user_folder}/{db_name}/{f}")
                with open(existing_path / f, "wb") as out:
                    out.write(res)
            except Exception:
                # Any missing/unreadable file means there is no usable index;
                # stop downloading the rest.
                has_existing = False
                break
        # 3. Merge if existing DB was found
        if has_existing:
            # NOTE: pickle deserialization of our own previously-uploaded index.
            base = FAISS.load_local(str(existing_path), embeddings, allow_dangerous_deserialization=True)
            incr = FAISS.load_local(str(tmp_path), embeddings, allow_dangerous_deserialization=True)
            base.merge_from(incr)
            base.save_local(str(tmp_path))
        # 4. Upload merged FAISS back to Supabase
        for f in files:
            with open(tmp_path / f, "rb") as fh:
                supabase.storage.from_("vector_dbs").upload(
                    f"users/{user_folder}/{db_name}/{f}", fh, {"upsert": "true"}
                )