Spaces:
Sleeping
Sleeping
| """In-memory agent session store for short-term memory.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from threading import Lock | |
| from typing import Dict, List | |
@dataclass
class SessionMemory:
    """Short-term memory held for a single agent session.

    Attributes:
        short_turns: List of turn records, each a ``dict[str, str]``.
            Every instance starts with its own empty list.
    """

    # ``default_factory`` only takes effect under ``@dataclass``; without the
    # decorator this attribute would be a single class-level ``Field`` object
    # instead of a fresh list per instance.
    short_turns: List[dict[str, str]] = field(default_factory=list)
class AgentSessionStore:
    """Keep one ``SessionMemory`` per session id in an in-process dict.

    Every method takes the instance lock before touching the dict.
    """

    def __init__(self) -> None:
        """Create an empty store together with the lock that guards it."""
        self._sessions: Dict[str, SessionMemory] = {}
        self._lock = Lock()

    def get(self, session_id: str) -> SessionMemory:
        """Return the memory for ``session_id``, creating it on first access.

        Args:
            session_id: Key identifying the session.

        Returns:
            The stored ``SessionMemory`` object itself (not a copy); repeated
            calls with the same id return the same object until it is reset.
        """
        with self._lock:
            memory = self._sessions.get(session_id)
            if memory is None:
                # First request for this id: register a fresh, empty memory.
                memory = SessionMemory()
                self._sessions[session_id] = memory
            return memory

    def reset(self, session_id: str) -> None:
        """Drop the memory stored for ``session_id``; unknown ids are ignored."""
        with self._lock:
            self._sessions.pop(session_id, None)

    def reset_all(self) -> None:
        """Remove every stored session, emptying the dict in place."""
        with self._lock:
            self._sessions.clear()
# Module-level AgentSessionStore instance, created once when this module is
# imported.
SESSION_STORE = AgentSessionStore()