""" InMemory session manager. - Thread-safe via Lock - TTL-based lazy eviction - Bounded by MAX_SESSIONS """ from __future__ import annotations import os import threading import uuid from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Optional SESSION_TTL_SEC = int(os.environ.get("SESSION_TTL_SEC", "1800")) MAX_SESSIONS = int(os.environ.get("MAX_SESSIONS", "1000")) MAX_TURNS = 50 @dataclass class AccumulatedContext: campaign_name: Optional[str] = None industry: Optional[str] = None cvr: Optional[float] = None ctr: Optional[float] = None cpa: Optional[float] = None image_base64: Optional[str] = None def merge(self, ctx: "AccumulatedContext") -> None: """Merge new values in -- never overwrites with None.""" if ctx.campaign_name is not None: self.campaign_name = ctx.campaign_name if ctx.industry is not None: self.industry = ctx.industry if ctx.cvr is not None: self.cvr = ctx.cvr if ctx.ctr is not None: self.ctr = ctx.ctr if ctx.cpa is not None: self.cpa = ctx.cpa if ctx.image_base64 is not None: self.image_base64 = ctx.image_base64 @dataclass class HistoryEntry: role: str # "user" | "assistant" content: str timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat()) @dataclass class SessionState: session_id: str created_at: datetime = field(default_factory=datetime.utcnow) last_accessed: datetime = field(default_factory=datetime.utcnow) turn_count: int = 0 accumulated_context: AccumulatedContext = field(default_factory=AccumulatedContext) history: list[HistoryEntry] = field(default_factory=list) current_level: str = "level1" def is_expired(self) -> bool: return datetime.utcnow() - self.last_accessed > timedelta(seconds=SESSION_TTL_SEC) def touch(self) -> None: self.last_accessed = datetime.utcnow() class SessionStore: def __init__(self) -> None: self._sessions: dict[str, SessionState] = {} self._lock = threading.Lock() def create(self) -> SessionState: with self._lock: self._evict_expired() if len(self._sessions) >= MAX_SESSIONS: raise RuntimeError("MAX_SESSIONS limit reached") session_id = str(uuid.uuid4()) state = SessionState(session_id=session_id) self._sessions[session_id] = state return state def get(self, session_id: str) -> Optional[SessionState]: with self._lock: state = self._sessions.get(session_id) if state is None: return None if state.is_expired(): del self._sessions[session_id] return None state.touch() return state def save(self, state: SessionState) -> None: with self._lock: self._sessions[state.session_id] = state def _evict_expired(self) -> None: expired = [sid for sid, s in self._sessions.items() if s.is_expired()] for sid in expired: del self._sessions[sid] def count(self) -> int: with self._lock: return len(self._sessions) # Singleton store = SessionStore()