Spaces:
Paused
Paused
| """ | |
| session.py | |
| ========== | |
| In-memory session store with automatic TTL cleanup. | |
| Sessions are dicts keyed by session_id (UUID from frontend). | |
| A background task purges sessions inactive for SESSION_TTL_MINUTES. | |
| """ | |
| import time | |
| import asyncio | |
| import logging | |
| from typing import Optional | |
| logger = logging.getLogger(__name__) | |
| # ββ Config ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SESSION_TTL_MINUTES = 30 # delete session after this many minutes of inactivity | |
| CLEANUP_INTERVAL_SECS = 600 # run cleanup every 10 minutes | |
| MAX_HISTORY_TURNS = 10 # keep last N user+assistant pairs in memory | |
| SESSION_MSG_LIMIT = 20 # hard cap β after this, agent redirects to email | |
| # ββ Store βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _sessions: dict[str, dict] = {} | |
| # ββ Public API ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_or_create(session_id: str) -> dict: | |
| """Return existing session or create a fresh one.""" | |
| if session_id not in _sessions: | |
| _sessions[session_id] = { | |
| "history": [], # list of {"role": ..., "content": ...} | |
| "message_count": 0, | |
| "last_active": time.time(), | |
| } | |
| return _sessions[session_id] | |
| def append(session_id: str, role: str, content: str) -> None: | |
| """Append a message to session history and update last_active.""" | |
| session = get_or_create(session_id) | |
| session["history"].append({"role": role, "content": content}) | |
| session["message_count"] += 1 | |
| session["last_active"] = time.time() | |
| # Trim to MAX_HISTORY_TURNS pairs (keep most recent) | |
| max_msgs = MAX_HISTORY_TURNS * 2 | |
| if len(session["history"]) > max_msgs: | |
| session["history"] = session["history"][-max_msgs:] | |
| def get_history(session_id: str) -> list[dict]: | |
| """Return conversation history for a session.""" | |
| return _sessions.get(session_id, {}).get("history", []) | |
| def get_message_count(session_id: str) -> int: | |
| """Return how many messages have been sent in this session.""" | |
| return _sessions.get(session_id, {}).get("message_count", 0) | |
| def is_over_limit(session_id: str) -> bool: | |
| return get_message_count(session_id) >= SESSION_MSG_LIMIT | |
| # ββ Background Cleanup ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def cleanup_loop() -> None: | |
| """Async background task β deletes stale sessions every CLEANUP_INTERVAL_SECS.""" | |
| while True: | |
| await asyncio.sleep(CLEANUP_INTERVAL_SECS) | |
| _purge_stale() | |
| def _purge_stale() -> None: | |
| cutoff = time.time() - SESSION_TTL_MINUTES * 60 | |
| stale = [sid for sid, s in _sessions.items() if s["last_active"] < cutoff] | |
| for sid in stale: | |
| del _sessions[sid] | |
| if stale: | |
| logger.info(f"Session cleanup: removed {len(stale)} stale session(s).") | |