Spaces:
Runtime error
Runtime error
| """ | |
| Simplified Session Manager (inspired by Udbhav's approach) | |
| Features: | |
| - In-memory session tracking with thread-safe RLock | |
| - Simple filesystem-based storage (one directory per session) | |
| - Persistent session index (JSON) and alias map | |
| - Automatic cleanup of idle sessions (>6 hours) | |
| - Human-readable aliases for sessions | |
| """ | |
| import os | |
| import json | |
| import shutil | |
| import threading | |
| import uuid | |
| import time | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, Optional | |
# Root directory under which every session directory and the JSON state files live.
# NOTE(review): the original comment calls this "HF persistent storage", but the
# path is under /tmp -- confirm it really survives restarts on the deployment target.
BASE_DATA_DIR = Path("/tmp/outputs/sessions")

# JSON file holding the persisted session index.
SESSIONS_INDEX_FILE = BASE_DATA_DIR.joinpath("sessions_index.json")

# JSON file holding the persisted alias map (friendly name -> session ID).
ALIAS_MAP_FILE = BASE_DATA_DIR.joinpath("alias_map.json")
class SimpleSessionManager:
    """
    Thread-safe session manager with persistence and auto-cleanup.

    State is kept in two in-memory dicts guarded by a re-entrant lock:
    ``sessions`` (session_id -> session info dict) and ``alias_map``
    (alias -> session_id). Both are mirrored to JSON files under
    ``BASE_DATA_DIR`` after every mutation. A daemon thread removes
    sessions that have been idle for more than six hours.
    """

    def __init__(self):
        """Create the base directory, load persisted state, start cleanup."""
        self.sessions: Dict[str, dict] = {}
        self.alias_map: Dict[str, str] = {}  # alias -> session_id
        self._lock = threading.RLock()
        self._cleanup_thread = None

        # Ensure directories exist
        BASE_DATA_DIR.mkdir(parents=True, exist_ok=True)
        print(f"[SESSION] Base directory: {BASE_DATA_DIR}")

        # Load persisted data
        self._load_index()
        self._load_alias_map()

        # Start cleanup thread
        self._start_cleanup_thread()

    @staticmethod
    def _read_json_dict(path: Path, label: str) -> Optional[dict]:
        """Read *path* as a JSON object; return None if absent or unusable.

        Loading is best-effort: I/O errors, invalid JSON and a top-level
        value that is not an object are logged and reported as None so the
        manager starts with empty state instead of crashing.
        """
        if not path.exists():
            return None
        try:
            with open(path, 'r', encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
            print(f"[SESSION] Error loading {label}: {e}")
            return None
        if not isinstance(data, dict):
            print(
                f"[SESSION] Error loading {label}: "
                f"expected a JSON object, got {type(data).__name__}"
            )
            return None
        return data

    @staticmethod
    def _write_json_atomic(path: Path, payload: dict, **dump_kwargs) -> None:
        """Serialize *payload* to *path* via a temp file plus ``os.replace``.

        Writing to a sibling temp file first means a crash mid-write cannot
        leave a truncated JSON file behind; readers see either the old or
        the new content.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = path.with_name(path.name + ".tmp")
        with open(tmp_path, 'w', encoding="utf-8") as f:
            json.dump(payload, f, indent=2, **dump_kwargs)
        os.replace(tmp_path, path)

    @staticmethod
    def _parse_timestamp(value) -> Optional[datetime]:
        """Parse an ISO-8601 string into a naive local datetime, or None."""
        if not isinstance(value, str):
            return None
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            return None
        if parsed.tzinfo is not None:
            # Normalize so it can be compared with the naive datetime.now().
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def _load_index(self):
        """Load sessions index from disk."""
        data = self._read_json_dict(SESSIONS_INDEX_FILE, "index")
        if data is None:
            return
        # Drop entries that are not dicts; other methods index into them.
        self.sessions = {
            sid: info for sid, info in data.items() if isinstance(info, dict)
        }
        print(f"[SESSION] Loaded {len(self.sessions)} sessions from index")

    def _load_alias_map(self):
        """Load alias map from disk."""
        data = self._read_json_dict(ALIAS_MAP_FILE, "alias map")
        if data is None:
            return
        # Only string session IDs can be looked up in the sessions dict.
        self.alias_map = {
            alias: sid for alias, sid in data.items() if isinstance(sid, str)
        }
        print(f"[SESSION] Loaded {len(self.alias_map)} aliases")

    def _save_index(self):
        """Save sessions index to disk (best-effort; errors are logged)."""
        try:
            # default=str keeps the dump from failing on non-JSON values.
            self._write_json_atomic(SESSIONS_INDEX_FILE, self.sessions, default=str)
        except (OSError, TypeError, ValueError) as e:
            print(f"[SESSION] Error saving index: {e}")

    def _save_alias_map(self):
        """Save alias map to disk (best-effort; errors are logged)."""
        try:
            self._write_json_atomic(ALIAS_MAP_FILE, self.alias_map)
        except (OSError, TypeError, ValueError) as e:
            print(f"[SESSION] Error saving alias map: {e}")

    def _new_session(self, session_id: Optional[str] = None, alias: Optional[str] = None) -> Dict:
        """Create a new session directory structure.

        Args:
            session_id: ID to use; a fresh UUID4 is generated when None.
            alias: Optional human-readable name stored in the session info.

        Returns:
            Session info dict (not yet registered in ``self.sessions``).
        """
        if session_id is None:
            session_id = str(uuid.uuid4())
        session_root = BASE_DATA_DIR / f"user_{session_id}"

        # Create subdirectories
        subdirs = {
            name: session_root / name for name in ("data", "cache", "results")
        }
        for dir_path in subdirs.values():
            dir_path.mkdir(parents=True, exist_ok=True)

        # One timestamp so created_at and last_accessed start out identical.
        now_iso = datetime.now().isoformat()
        session_info = {
            "session_id": session_id,
            "root": str(session_root),
            "subdirs": {k: str(v) for k, v in subdirs.items()},
            "created_at": now_iso,
            "last_accessed": now_iso,
            "alias": alias,
        }
        print(f"[SESSION] Created new session: {session_id} (alias: {alias})")
        return session_info

    def create_session(self, alias: Optional[str] = None) -> Dict:
        """
        Create a new session with optional alias.

        If the alias already maps to a known session, that session is
        returned (with its last-accessed time refreshed) instead of
        creating a new one.

        Args:
            alias: Human-readable name for the session

        Returns:
            Session info dict
        """
        with self._lock:
            # If alias exists, return existing session
            if alias and alias in self.alias_map:
                session_id = self.alias_map[alias]
                if session_id in self.sessions:
                    print(f"[SESSION] Session '{alias}' already exists: {session_id}")
                    self.sessions[session_id]["last_accessed"] = datetime.now().isoformat()
                    self._save_index()
                    return self.sessions[session_id]

            # Create new session (a stale alias entry is overwritten below)
            session_id = str(uuid.uuid4())
            session_info = self._new_session(session_id, alias)
            self.sessions[session_id] = session_info
            if alias:
                self.alias_map[alias] = session_id
                self._save_alias_map()
            self._save_index()
            return session_info

    def get_session(self, session_id: Optional[str] = None, alias: Optional[str] = None) -> Optional[Dict]:
        """
        Get session by ID or alias.

        When both are given the alias takes precedence. A successful lookup
        refreshes the session's last-accessed time and persists the index.

        Args:
            session_id: Session UUID
            alias: Human-readable session name

        Returns:
            Session info dict or None
        """
        with self._lock:
            # Lookup by alias first
            if alias:
                session_id = self.alias_map.get(alias)
            if not session_id:
                return None
            session_info = self.sessions.get(session_id)
            if session_info:
                session_info["last_accessed"] = datetime.now().isoformat()
                self._save_index()
            return session_info

    def list_sessions(self) -> Dict[str, Dict]:
        """List active sessions keyed by alias.

        Only sessions that have an alias appear in the result; sessions
        created without one are not included.
        """
        with self._lock:
            return {
                alias: self.sessions[sid]
                for alias, sid in self.alias_map.items()
                if sid in self.sessions
            }

    def delete_session(self, session_id: str) -> bool:
        """Delete a session and its files.

        Args:
            session_id: ID of the session to remove.

        Returns:
            True if the session existed and was removed, False otherwise.
        """
        with self._lock:
            session_info = self.sessions.pop(session_id, None)
            if not session_info:
                return False

            # Remove alias mapping
            stale_aliases = [a for a, sid in self.alias_map.items() if sid == session_id]
            for alias in stale_aliases:
                del self.alias_map[alias]

            # Remove directory; a missing "root" (damaged index entry) just
            # skips the filesystem step instead of raising mid-delete.
            root = session_info.get("root")
            if root:
                session_root = Path(root)
                try:
                    if session_root.exists():
                        shutil.rmtree(session_root)
                        print(f"[SESSION] Deleted session directory: {session_root}")
                except OSError as e:
                    print(f"[SESSION] Error deleting {session_root}: {e}")

            self._save_index()
            self._save_alias_map()
            return True

    def _cleanup_old(self, max_age_hours: int = 6):
        """Remove sessions idle longer than max_age_hours.

        Entries whose ``last_accessed`` is missing or unparseable are logged
        and skipped so one bad record cannot abort the whole sweep.
        """
        cutoff = datetime.now() - timedelta(hours=max_age_hours)
        # Scan and delete under one lock acquisition (the RLock is re-entrant)
        # so a session touched between the two steps is not removed.
        with self._lock:
            expired = []
            for session_id, session_info in self.sessions.items():
                last_accessed = self._parse_timestamp(session_info.get("last_accessed"))
                if last_accessed is None:
                    print(f"[SESSION] Skipping session with unreadable last_accessed: {session_id}")
                    continue
                if last_accessed < cutoff:
                    expired.append(session_id)
            for session_id in expired:
                print(f"[SESSION] Cleaning up idle session: {session_id}")
                self.delete_session(session_id)

    def _start_cleanup_thread(self):
        """Start background cleanup thread (no-op if one is already alive)."""
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            return

        def loop():
            while True:
                try:
                    # Run cleanup every hour, remove sessions idle > 6 hours
                    time.sleep(3600)
                    self._cleanup_old(max_age_hours=6)
                except Exception as e:
                    # Thread boundary: log and keep looping so cleanup never dies.
                    print(f"[SESSION] Cleanup error: {e}")

        # Daemon thread so it never blocks interpreter shutdown.
        self._cleanup_thread = threading.Thread(target=loop, daemon=True)
        self._cleanup_thread.start()
        print("[SESSION] Cleanup thread started")
# Global session manager instance, constructed at import time. Building it
# creates BASE_DATA_DIR, loads any persisted index/alias map, and starts the
# background cleanup thread, so importing this module has side effects.
SESSION_MANAGER = SimpleSessionManager()
def get_or_create_session(alias: Optional[str] = None) -> Dict:
    """Return the session registered under *alias*, creating one if needed.

    With no alias (or an empty one) a new session is always created.
    """
    existing = SESSION_MANAGER.get_session(alias=alias) if alias else None
    if existing:
        return existing
    return SESSION_MANAGER.create_session(alias=alias)
def get_session(session_id: str) -> Optional[Dict]:
    """Look up a session by its ID via the global manager; None if unknown."""
    found = SESSION_MANAGER.get_session(session_id=session_id)
    return found
def get_session_by_alias(alias: str) -> Optional[Dict]:
    """Look up a session by its alias via the global manager; None if unknown."""
    found = SESSION_MANAGER.get_session(alias=alias)
    return found
def list_all_sessions() -> Dict[str, Dict]:
    """Return the global manager's session listing (alias -> session info)."""
    listing = SESSION_MANAGER.list_sessions()
    return listing