# pragmatic-agent / runtime / session_store.py
# m97j's picture
# First codes update-edit
# ed5a1e9
# app/runtime/session_store.py
from datetime import datetime, timedelta
from typing import TypedDict
from uuid import uuid4
import threading
import logging
import gradio as gr
from huggingface_hub import whoami
from config import HF_TOKEN
from infrastructure.hf_dataset_client import SessionManager
from modules.conversation.history_controller import SessionHistoryController
from runtime.request_limit import get_req_count
from ui.i18n.translations import translations
# ---------------------------------------------------------------------------
# Global in-memory store of active sessions, keyed by Hugging Face user ID.
# Each user has:
# - "session_manager": SessionManager instance (handles persistence to HF Hub)
# - "session_history_controllers": dict mapping session_id -> SessionHistoryController
#
# Example structure:
# {
#     "hf_user123": {
#         "session_manager": SessionManager(...),
#         "session_history_controllers": {
#             "uuid-session-1": SessionHistoryController(...),
#             "uuid-session-2": SessionHistoryController(...)
#         }
#     },
#     "hf_user456": {
#         "session_manager": SessionManager(...),
#         "session_history_controllers": { ... }
#     }
# }
#
# Concurrency:
# - The Spaces environment can serve multiple requests at the same time (multi-threading),
#   so locks are used to prevent race conditions on the shared data structures (SESSION_STORES).
# - A global lock (RLock) and per-user locks are used together to keep critical sections fine-grained.
# ---------------------------------------------------------------------------
class StoreDict(TypedDict, total=True):
    """Shape of one user's entry in ``SESSION_STORES``.

    Both keys are required (``total=True`` is the TypedDict default, spelled
    out here for clarity).
    """

    # Persistence handler for this user (constructed in init_user_store).
    session_manager: SessionManager
    # In-memory conversation controllers, keyed by session id.
    session_history_controllers: dict[str, SessionHistoryController]
# Concurrency locks.
# SESSION_LOCK is the global re-entrant lock; USER_LOCKS maps a Hugging Face
# user ID to that user's own re-entrant lock (filled lazily by _get_user_lock).
SESSION_LOCK = threading.RLock()
USER_LOCKS: dict[str, threading.RLock] = dict()

# In-memory registry of per-user stores, keyed by Hugging Face user ID.
SESSION_STORES: dict[str, StoreDict] = dict()
def _get_user_lock(hf_user: str) -> threading.RLock:
    """Return the re-entrant lock registered for *hf_user*, creating it on first use.

    The lookup and the lazy creation both happen while holding SESSION_LOCK,
    so two threads asking for the same user cannot each register a lock.
    """
    with SESSION_LOCK:
        existing = USER_LOCKS.get(hf_user)
        if existing is not None:
            return existing

        # First request for this user: register a fresh lock.
        created = threading.RLock()
        USER_LOCKS[hf_user] = created
        return created
def init_user_store(hf_user: str, hf_token: str = HF_TOKEN):
    """Return the store for *hf_user*, creating it if it does not exist yet.

    A newly created store holds a ``SessionManager`` built from *hf_token*
    and an empty mapping of session controllers. An existing store is
    returned untouched (its manager is not rebuilt).
    """
    with _get_user_lock(hf_user):
        if hf_user in SESSION_STORES:
            return SESSION_STORES[hf_user]

        fresh_store = {
            "session_manager": SessionManager(hf_token),
            "session_history_controllers": {},
        }
        SESSION_STORES[hf_user] = fresh_store
        return fresh_store
def fetch_user_and_sessions(hf_token: gr.OAuthToken | None, lang="en"):
    """Resolve the user identity, ensure a store exists, and build the initial UI state.

    Args:
        hf_token: OAuth token of the logged-in user, or ``None`` when nobody
            is logged in.
        lang: Key into ``translations`` selecting the status-message language.

    Returns:
        A 4-tuple of Gradio updates. The order must match the UI bindings:
        (sessions dropdown, status label, hidden user, hidden session id).
    """
    if hf_token is None:
        # No token -> guest-like blank state on the UI.
        return gr.update(choices=[], value=None), gr.update(), gr.update(), gr.update()

    # whoami() takes the raw token string. gr.OAuthToken keeps it in `.token`;
    # huggingface_hub only treats `str` values as an explicit credential, so
    # passing the wrapper object would not identify the logged-in user.
    # A plain string (no `.token` attribute) is passed through unchanged.
    raw_token = getattr(hf_token, "token", hf_token)
    user_info = whoami(raw_token)
    hf_user = user_info["name"] if user_info else "_guest"

    # NOTE(review): the token is handed to SessionManager exactly as received;
    # confirm whether SessionManager expects the OAuthToken object or its string.
    store = get_account_store(hf_user)
    if store is None:
        store = init_user_store(hf_user, hf_token)
    else:
        # Returning user: refresh the token held by the existing manager.
        store["session_manager"].hf_token = hf_token
    mgr = store["session_manager"]

    # Prefer what the manager already reports; otherwise list sessions for the
    # user (may be empty on first login).
    sessions = mgr.get_sessions() or mgr.list_sessions(hf_user)

    # Create a fresh session id (no sidebar update yet; the title is generated
    # when the first message is processed).
    session_update = create_session(hf_user)

    # Remaining request quota for this user (guest or logged-in).
    req_remains = get_req_count(hf_user)
    status_msg = translations[lang]["status"].format(login_status=hf_user, remains=req_remains)

    return (
        gr.update(choices=sessions, value=None),
        gr.update(value=status_msg),
        gr.update(value=hf_user),
        session_update,  # already gr.update(value=session_id)
    )
def generate_session_id(history_controllers: dict[str, SessionHistoryController]) -> str:
    """Return a UUID4 string that is not already a key of *history_controllers*."""
    candidate = str(uuid4())
    # Re-draw on the (extremely unlikely) collision with an existing session id.
    while candidate in history_controllers:
        candidate = str(uuid4())
    return candidate
def create_session(hf_user: str):
    """Register a new, empty session for *hf_user* and return it as a Gradio update.

    Only the in-memory controller is created here. In the UI the "new session"
    button merely resets the chat screen; the title is created/registered when
    the first message is processed.

    Returns:
        ``gr.update(value=session_id)``, or ``gr.update(value="")`` when
        *hf_user* is empty.
    """
    if not hf_user:
        return gr.update(value="")

    with _get_user_lock(hf_user):
        account = get_account_store(hf_user)
        if account is None:
            # Defensive: the store was never initialized for this user.
            account = init_user_store(hf_user)

        controllers = account["session_history_controllers"]
        new_id = generate_session_id(controllers)
        controllers[new_id] = SessionHistoryController()

    return gr.update(value=new_id)
def load_session(hf_user: str, session_id: str):
    """Load a session's history, creating its controller only when it is not cached.

    Returns:
        A pair of Gradio updates: (chat history, session id). When either
        argument is empty, or the user has no store, the history is reset to
        ``[]`` and the second update is a no-op.
    """
    if not (hf_user and session_id):
        return gr.update(value=[]), gr.update()

    with _get_user_lock(hf_user):
        account = get_account_store(hf_user)
        if account is None:
            return gr.update(value=[]), gr.update()

        history_ctr = get_session_controller(hf_user, session_id)
        if history_ctr is None:
            # Not in memory yet: download the records and cache a controller for them.
            downloaded = account["session_manager"].download_session(hf_user, session_id)
            history_ctr = SessionHistoryController(history=downloaded)
            account["session_history_controllers"][session_id] = history_ctr

        chat_update = gr.update(value=history_ctr.get_full_history())
        return chat_update, gr.update(value=session_id)
def get_account_store(hf_user: str):
    """Return the store registered for *hf_user*, or ``None`` if there is none."""
    # The shared dict is read under the global lock for consistency.
    with SESSION_LOCK:
        account = SESSION_STORES.get(hf_user)
    return account
def get_session_controller(hf_user: str, session_id: str):
    """Return the in-memory controller for (*hf_user*, *session_id*), or ``None``.

    ``None`` is returned both when the user has no store and when the session
    is not currently held in memory.
    """
    with _get_user_lock(hf_user):
        account = SESSION_STORES.get(hf_user)
        return account["session_history_controllers"].get(session_id) if account else None
def get_session_manager(hf_user: str):
    """Return the ``SessionManager`` of *hf_user*, or ``None`` if the user has no store."""
    with _get_user_lock(hf_user):
        account = SESSION_STORES.get(hf_user)
        return account["session_manager"] if account else None
def list_sessions(hf_user: str):
    """Return the session list reported by the user's manager, or ``[]`` without a store."""
    with _get_user_lock(hf_user):
        account = SESSION_STORES.get(hf_user)
        return account["session_manager"].list_sessions(hf_user) if account else []
def finalize_session(hf_user: str, session_id: str):
    """End a session: persist it to the HF Dataset, then drop it from memory.

    The whole operation runs under the per-user lock. A failed upload is
    logged (with traceback) and the session is still removed from memory,
    keeping the original best-effort behavior. When the user's last session
    is removed, the user's entry in ``SESSION_STORES`` is removed as well.

    Args:
        hf_user: Hugging Face user ID owning the session.
        session_id: ID of the session to persist and evict.
    """
    with _get_user_lock(hf_user):
        store = SESSION_STORES.get(hf_user)
        if not store:
            return

        controllers = store["session_history_controllers"]
        history_ctr = controllers.get(session_id)
        if not history_ctr:
            return

        title_raw = history_ctr.get_session_title()
        # get_last_request_time() is expected to always return a datetime (set at creation).
        timestamp = history_ctr.get_last_request_time().strftime("%Y%m%d%H%M%S")
        manager = store["session_manager"]

        try:
            history = history_ctr.get_full_history()
            manager.push_session(hf_user, session_id, history, title_raw, timestamp)
            # Always upload a backup as well.
            manager.push_session(hf_user, session_id, history, title_raw, timestamp, backup=True)
        except Exception as e:
            # Best-effort persistence: record the failure (with traceback) and carry on.
            logging.exception("Error saving session %s for user %s: %s", session_id, hf_user, e)

        # Protected deletion (still inside the per-user lock).
        del controllers[session_id]

        # If no more sessions remain for this user, clean up the store entry too.
        if not controllers:
            with SESSION_LOCK:
                SESSION_STORES.pop(hf_user, None)
            # The per-user lock is intentionally NOT removed from USER_LOCKS.
            # Another thread may be blocked on this very lock object; dropping
            # the registry entry would let a later _get_user_lock() call create
            # a second lock for the same user, so two threads could run the
            # "per-user" critical section at the same time.
def get_expired_sessions(max_idle: timedelta = timedelta(hours=2)):
    """Retrieve expired sessions (inactive for at least *max_idle*; 2 hours by default).

    Args:
        max_idle: Minimum time since a session's last request for it to count
            as expired.

    Returns:
        A list of tuples ``(hf_user, [expired_session_ids])``. Users without
        any expired session are omitted.
    """
    now = datetime.now()
    results = []

    # Snapshot the user IDs under the global lock; the per-user work below
    # runs without holding it.
    with SESSION_LOCK:
        users = list(SESSION_STORES)

    for hf_user in users:
        with _get_user_lock(hf_user):
            store = SESSION_STORES.get(hf_user)
            if not store:
                # The user's store was removed after the snapshot was taken.
                continue
            expired_sessions = [
                session_id
                for session_id, history_ctr in store["session_history_controllers"].items()
                # get_last_request_time() is expected to be set at controller init.
                if now - history_ctr.get_last_request_time() >= max_idle
            ]
        if expired_sessions:
            results.append((hf_user, expired_sessions))

    return results