Spaces:
Sleeping
Sleeping
| """ | |
| Context Memory Module | |
| Manages session-based context for conversation continuity | |
| """ | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime, timedelta | |
| import threading | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| class ContextMemory: | |
| def __init__(self, max_sessions: int = 1000, session_timeout_hours: int = 24): | |
| """ | |
| Initialize context memory store | |
| Args: | |
| max_sessions: Maximum number of sessions to store | |
| session_timeout_hours: Hours before session expires | |
| """ | |
| self._store: Dict[str, Dict[str, Any]] = {} | |
| self._timestamps: Dict[str, datetime] = {} | |
| self._lock = threading.Lock() | |
| self._max_sessions = max_sessions | |
| self._session_timeout = timedelta(hours=session_timeout_hours) | |
| def save_context(self, session_id: str, context: Dict[str, Any]) -> bool: | |
| """ | |
| Save context for a session | |
| Args: | |
| session_id: Unique session identifier | |
| context: Context data to store | |
| Returns: | |
| True if saved successfully | |
| """ | |
| with self._lock: | |
| # Clean up old sessions if we're at capacity | |
| if len(self._store) >= self._max_sessions: | |
| self._cleanup_old_sessions() | |
| self._store[session_id] = context | |
| self._timestamps[session_id] = datetime.now() | |
| logger.debug(f"Saved context for session {session_id}") | |
| return True | |
| def get_context(self, session_id: str, default: Any = None) -> Optional[Dict[str, Any]]: | |
| """ | |
| Get context for a session | |
| Args: | |
| session_id: Unique session identifier | |
| default: Default value if session not found | |
| Returns: | |
| Context data or default | |
| """ | |
| with self._lock: | |
| if session_id in self._store: | |
| # Check if session has expired | |
| if self._is_expired(session_id): | |
| self._remove_session(session_id) | |
| return default | |
| # Update timestamp on access | |
| self._timestamps[session_id] = datetime.now() | |
| return self._store[session_id] | |
| return default | |
| def update_context(self, session_id: str, updates: Dict[str, Any]) -> bool: | |
| """ | |
| Update existing context with new data | |
| Args: | |
| session_id: Unique session identifier | |
| updates: Data to merge into existing context | |
| Returns: | |
| True if updated successfully | |
| """ | |
| with self._lock: | |
| if session_id in self._store: | |
| self._store[session_id].update(updates) | |
| self._timestamps[session_id] = datetime.now() | |
| return True | |
| return False | |
| def clear_context(self, session_id: str) -> bool: | |
| """ | |
| Clear context for a session | |
| Args: | |
| session_id: Unique session identifier | |
| Returns: | |
| True if cleared successfully | |
| """ | |
| with self._lock: | |
| return self._remove_session(session_id) | |
| def _is_expired(self, session_id: str) -> bool: | |
| """Check if a session has expired""" | |
| if session_id not in self._timestamps: | |
| return True | |
| age = datetime.now() - self._timestamps[session_id] | |
| return age > self._session_timeout | |
| def _remove_session(self, session_id: str) -> bool: | |
| """Remove a session from storage""" | |
| if session_id in self._store: | |
| del self._store[session_id] | |
| del self._timestamps[session_id] | |
| logger.debug(f"Removed session {session_id}") | |
| return True | |
| return False | |
| def _cleanup_old_sessions(self): | |
| """Remove expired sessions""" | |
| expired = [ | |
| sid for sid in self._store | |
| if self._is_expired(sid) | |
| ] | |
| for sid in expired: | |
| self._remove_session(sid) | |
| # If still at capacity, remove oldest sessions | |
| if len(self._store) >= self._max_sessions: | |
| sorted_sessions = sorted( | |
| self._timestamps.items(), | |
| key=lambda x: x[1] | |
| ) | |
| # Remove oldest 10% | |
| remove_count = max(1, self._max_sessions // 10) | |
| for sid, _ in sorted_sessions[:remove_count]: | |
| self._remove_session(sid) | |
| logger.info(f"Cleanup complete. Active sessions: {len(self._store)}") | |
| def get_stats(self) -> Dict[str, Any]: | |
| """Get memory store statistics""" | |
| with self._lock: | |
| return { | |
| "active_sessions": len(self._store), | |
| "max_sessions": self._max_sessions, | |
| "session_timeout_hours": self._session_timeout.total_seconds() / 3600 | |
| } |