File size: 3,358 Bytes
7df55e6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""
session.py
==========
In-memory session store with automatic TTL cleanup.
Sessions are dicts keyed by session_id (UUID from frontend).
A background task purges sessions inactive for SESSION_TTL_MINUTES.
"""

import time
import asyncio
import logging
from typing import Optional

logger = logging.getLogger(__name__)

# ── Config ────────────────────────────────────────────────────────────────────
SESSION_TTL_MINUTES   = 30   # delete session after this many minutes of inactivity
CLEANUP_INTERVAL_SECS = 600  # run cleanup every 10 minutes
MAX_HISTORY_TURNS     = 10   # keep last N user+assistant pairs in memory
SESSION_MSG_LIMIT     = 20   # hard cap β€” after this, agent redirects to email

# ── Store ─────────────────────────────────────────────────────────────────────
_sessions: dict[str, dict] = {}


# ── Public API ────────────────────────────────────────────────────────────────

def get_or_create(session_id: str) -> dict:
    """Return existing session or create a fresh one."""
    if session_id not in _sessions:
        _sessions[session_id] = {
            "history":       [],   # list of {"role": ..., "content": ...}
            "message_count": 0,
            "last_active":   time.time(),
        }
    return _sessions[session_id]


def append(session_id: str, role: str, content: str) -> None:
    """Append a message to session history and update last_active."""
    session = get_or_create(session_id)
    session["history"].append({"role": role, "content": content})
    session["message_count"] += 1
    session["last_active"] = time.time()

    # Trim to MAX_HISTORY_TURNS pairs (keep most recent)
    max_msgs = MAX_HISTORY_TURNS * 2
    if len(session["history"]) > max_msgs:
        session["history"] = session["history"][-max_msgs:]


def get_history(session_id: str) -> list[dict]:
    """Return conversation history for a session."""
    return _sessions.get(session_id, {}).get("history", [])


def get_message_count(session_id: str) -> int:
    """Return how many messages have been sent in this session."""
    return _sessions.get(session_id, {}).get("message_count", 0)


def is_over_limit(session_id: str) -> bool:
    return get_message_count(session_id) >= SESSION_MSG_LIMIT


# ── Background Cleanup ────────────────────────────────────────────────────────

async def cleanup_loop() -> None:
    """Async background task β€” deletes stale sessions every CLEANUP_INTERVAL_SECS."""
    while True:
        await asyncio.sleep(CLEANUP_INTERVAL_SECS)
        _purge_stale()


def _purge_stale() -> None:
    cutoff = time.time() - SESSION_TTL_MINUTES * 60
    stale  = [sid for sid, s in _sessions.items() if s["last_active"] < cutoff]
    for sid in stale:
        del _sessions[sid]
    if stale:
        logger.info(f"Session cleanup: removed {len(stale)} stale session(s).")