Spaces:
Sleeping
Sleeping
| # app/runtime/session_store.py | |
| from datetime import datetime, timedelta | |
| from typing import TypedDict | |
| from uuid import uuid4 | |
| import threading | |
| import logging | |
| import gradio as gr | |
| from huggingface_hub import whoami | |
| from config import HF_TOKEN | |
| from infrastructure.hf_dataset_client import SessionManager | |
| from modules.conversation.history_controller import SessionHistoryController | |
| from runtime.request_limit import get_req_count | |
| from ui.i18n.translations import translations | |
| # --------------------------------------------------------------------------- | |
| # Global in-memory store of active sessions, keyed by Hugging Face user ID. | |
| # Each user has: | |
| # - "session_manager": SessionManager instance (handles persistence to HF Hub) | |
| # - "session_history_controllers": dict mapping session_id -> SessionHistoryController | |
| # | |
| # Example structure: | |
| # { | |
| # "hf_user123": { | |
| # "session_manager": SessionManager(...), | |
| # "session_history_controllers": { | |
| # "uuid-session-1": SessionHistoryController(...), | |
| # "uuid-session-2": SessionHistoryController(...) | |
| # } | |
| # }, | |
| # "hf_user456": { | |
| # "session_manager": SessionManager(...), | |
| # "session_history_controllers": { ... } | |
| # } | |
| # } | |
| # | |
| # Concurrency: | |
| # - Since the Spaces environment can handle multiple requests (multi-threading), | |
| # locks are applied to prevent potential race conditions when accessing shared data structures (SESSION_STORES). | |
| # - A global lock (RLock) and per-user locks are used in parallel to enable fine-grained critical section management. | |
| # --------------------------------------------------------------------------- | |
| class StoreDict(TypedDict): | |
| session_manager: SessionManager | |
| session_history_controllers: dict[str, SessionHistoryController] | |
| SESSION_STORES: dict[str, StoreDict] = {} | |
| # Concurrency locks | |
| SESSION_LOCK = threading.RLock() | |
| USER_LOCKS: dict[str, threading.RLock] = {} | |
| def _get_user_lock(hf_user: str) -> threading.RLock: | |
| # Lazy-init per-user lock under global lock to avoid races on lock creation. | |
| with SESSION_LOCK: | |
| lock = USER_LOCKS.get(hf_user) | |
| if lock is None: | |
| lock = threading.RLock() | |
| USER_LOCKS[hf_user] = lock | |
| return lock | |
| def init_user_store(hf_user: str, hf_token: str = HF_TOKEN): | |
| """Initialize user-specific store upon login. | |
| Creates a SessionManager and an empty dict of SessionHistoryControllers.""" | |
| user_lock = _get_user_lock(hf_user) | |
| with user_lock: | |
| if hf_user not in SESSION_STORES: | |
| SESSION_STORES[hf_user] = { | |
| "session_manager": SessionManager(hf_token), | |
| "session_history_controllers": {} | |
| } | |
| return SESSION_STORES[hf_user] | |
| def fetch_user_and_sessions(hf_token: gr.OAuthToken | None, lang="en"): | |
| """Resolve user identity, ensure store exists, hydrate sessions dropdown, and prepare initial status.""" | |
| if hf_token is None: | |
| # No token → guest-like blank state on UI | |
| return gr.update(choices=[], value=None), gr.update(), gr.update(), gr.update() | |
| user_info = whoami(hf_token) | |
| hf_user = user_info["name"] if user_info else "_guest" | |
| store = get_account_store(hf_user) | |
| if store is None: | |
| store = init_user_store(hf_user, hf_token) | |
| else: | |
| store["session_manager"].hf_token = hf_token | |
| mgr = store["session_manager"] | |
| # List sessions from HF Hub (may return [] on first login) | |
| sessions = mgr.get_sessions() or mgr.list_sessions(hf_user) | |
| # Create a fresh session id (no sidebar update yet; title will be generated on first message) | |
| session_update = create_session(hf_user) | |
| # get_req_count uses hf_user to compute remaining quota (guest or logged-in) | |
| req_remains = get_req_count(hf_user) | |
| # status message configuration | |
| status_msg = translations[lang]["status"].format(login_status=hf_user, remains=req_remains) | |
| # Return order must match your UI bindings: (dropdown choices, status label, hidden_user, hidden_session_id) | |
| return ( | |
| gr.update(choices=sessions, value=None), | |
| gr.update(value=status_msg), | |
| gr.update(value=hf_user), | |
| session_update # already gr.update(value=session_id) | |
| ) | |
| def generate_session_id(history_controllers: dict[str, SessionHistoryController]) -> str: | |
| """Generate unique session ID.""" | |
| while True: | |
| session_id = str(uuid4()) | |
| if session_id not in history_controllers: | |
| return session_id | |
| def create_session(hf_user: str): | |
| """Create a new session and return a Gradio update object with the new session_id. | |
| In the UI, when you click the new session button, the chat screen is only initialized, | |
| and the title is created/registered when processing the first message.""" | |
| if not hf_user: | |
| return gr.update(value="") | |
| user_lock = _get_user_lock(hf_user) | |
| with user_lock: | |
| store = get_account_store(hf_user) | |
| if store is None: | |
| # In case it was not initialized (defensive) | |
| store = init_user_store(hf_user) | |
| history_controllers = store["session_history_controllers"] | |
| session_id = generate_session_id(history_controllers) | |
| history_ctr = SessionHistoryController() | |
| store["session_history_controllers"][session_id] = history_ctr | |
| return gr.update(value=session_id) | |
| def load_session(hf_user: str, session_id: str): | |
| """Session loading: Create a SessionHistoryController and inject data only when needed.""" | |
| if not hf_user or not session_id: | |
| return gr.update(value=[]), gr.update() | |
| user_lock = _get_user_lock(hf_user) | |
| with user_lock: | |
| store = get_account_store(hf_user) | |
| if store is None: | |
| return gr.update(value=[]), gr.update() | |
| controller = get_session_controller(hf_user, session_id) | |
| if controller is None: | |
| records = store["session_manager"].download_session(hf_user, session_id) | |
| controller = SessionHistoryController(history=records) | |
| store["session_history_controllers"][session_id] = controller | |
| return gr.update(value=controller.get_full_history()), gr.update(value=session_id) | |
| def get_account_store(hf_user: str): | |
| """View session store by user.""" | |
| # Reading shared dict: protect with global lock to be consistent. | |
| with SESSION_LOCK: | |
| return SESSION_STORES.get(hf_user) | |
| def get_session_controller(hf_user: str, session_id: str): | |
| """Session object lookup.""" | |
| user_lock = _get_user_lock(hf_user) | |
| with user_lock: | |
| store = SESSION_STORES.get(hf_user) | |
| if not store: | |
| return None | |
| return store["session_history_controllers"].get(session_id) | |
| def get_session_manager(hf_user: str): | |
| """Session manager query.""" | |
| user_lock = _get_user_lock(hf_user) | |
| with user_lock: | |
| store = SESSION_STORES.get(hf_user) | |
| if not store: | |
| return None | |
| return store["session_manager"] | |
| def list_sessions(hf_user: str): | |
| """View user session list.""" | |
| user_lock = _get_user_lock(hf_user) | |
| with user_lock: | |
| store = SESSION_STORES.get(hf_user) | |
| if not store: | |
| return [] | |
| return store["session_manager"].list_sessions(hf_user) | |
| def finalize_session(hf_user: str, session_id: str): | |
| """Session End: Persist to HF Dataset and delete from memory. | |
| Concurrency safety: Protects session controller access/deletion with per-user locks.""" | |
| user_lock = _get_user_lock(hf_user) | |
| with user_lock: | |
| store = SESSION_STORES.get(hf_user) | |
| if not store: | |
| return | |
| history_ctr = store["session_history_controllers"].get(session_id) | |
| if not history_ctr: | |
| return | |
| title_raw = history_ctr.get_session_title() | |
| timestamp_dt = history_ctr.get_last_request_time() # Always returns a valid datetime (init at creation) | |
| timestamp = timestamp_dt.strftime("%Y%m%d%H%M%S") | |
| try: | |
| store["session_manager"].push_session( | |
| hf_user, | |
| session_id, | |
| history_ctr.get_full_history(), | |
| title_raw, | |
| timestamp, | |
| ) | |
| # always upload a backup as well | |
| store["session_manager"].push_session( | |
| hf_user, | |
| session_id, | |
| history_ctr.get_full_history(), | |
| title_raw, | |
| timestamp, | |
| backup=True, | |
| ) | |
| except Exception as e: | |
| logging.error(f"Error saving session {session_id} for user {hf_user}: {e}") | |
| # Protected deletion | |
| del store["session_history_controllers"][session_id] | |
| # If no more sessions for this user, clean up the store entry too | |
| if not store["session_history_controllers"]: | |
| # Remove user-specific lock together to avoid stale locks | |
| with SESSION_LOCK: | |
| del SESSION_STORES[hf_user] | |
| # best-effort: remove the user lock | |
| USER_LOCKS.pop(hf_user, None) | |
| def get_expired_sessions(): | |
| """Retrieve expired sessions (inactive for >= 2 hours). | |
| Returns a list of tuples: (hf_user, [expired_session_ids]).""" | |
| now = datetime.now() | |
| results = [] | |
| # Iterate users safely under global lock to snapshot keys | |
| with SESSION_LOCK: | |
| users = list(SESSION_STORES.keys()) | |
| for hf_user in users: | |
| user_lock = _get_user_lock(hf_user) | |
| with user_lock: | |
| store = SESSION_STORES.get(hf_user) | |
| if not store: | |
| continue | |
| expired_sessions = [ | |
| session_id | |
| for session_id, history_ctr in store["session_history_controllers"].items() | |
| # Note: history_ctr.get_last_request_time() is always set at init | |
| if now - history_ctr.get_last_request_time() >= timedelta(hours=2) | |
| ] | |
| if expired_sessions: | |
| results.append((hf_user, expired_sessions)) | |
| return results | |