"""
Shared state module for GitHub Companion Backend.

This module provides thread-safe shared state for managing analysis sessions
across multiple concurrent users. It uses threading locks to ensure safe
access to the shared dictionary in a multi-threaded environment.
"""

import threading
from typing import Dict, Any


# Module-wide mutex. The accessor functions in this module hold it while they
# read or modify ``analysis_jobs``.
_lock = threading.Lock()

# Session registry: maps a session id to that session's data dictionary.
# Starts empty at import time.
analysis_jobs: Dict[str, Dict[str, Any]] = {}


def get_session(session_id: str) -> Dict[str, Any] | None: |
|
|
"""Thread-safe getter for a session.""" |
|
|
with _lock: |
|
|
return analysis_jobs.get(session_id) |
|
|
|
|
|
|
|
|
def set_session(session_id: str, data: Dict[str, Any]) -> None:
    """Store ``data`` as the session registered under ``session_id``.

    Any existing entry for the same id is replaced. The assignment runs
    while holding the module lock. ``data`` is stored by reference (no copy
    is made).

    Args:
        session_id: Key under which to register the session.
        data: Session data dictionary to store.
    """
    with _lock:
        analysis_jobs[session_id] = data


def update_session(session_id: str, key: str, value: Any) -> None:
    """Set ``key`` to ``value`` inside an existing session's data.

    The membership check and the write happen together under the module
    lock. If ``session_id`` is not registered, the call does nothing: no
    session is created and no error is raised.

    Args:
        session_id: Key of the session to modify.
        key: Field name within the session's data dictionary.
        value: New value for that field.
    """
    with _lock:
        # Deliberately a silent no-op for unknown sessions.
        if session_id in analysis_jobs:
            analysis_jobs[session_id][key] = value


def delete_session(session_id: str) -> bool:
    """Remove the session registered under ``session_id``.

    The removal runs while holding the module lock.

    Args:
        session_id: Key of the session to remove.

    Returns:
        ``True`` if a session was present and has been removed, ``False``
        if no session with that id existed.
    """
    with _lock:
        # EAFP: a single dict operation instead of a membership test
        # followed by a separate delete.
        try:
            del analysis_jobs[session_id]
        except KeyError:
            return False
        return True


def session_exists(session_id: str) -> bool:
    """Report whether a session is registered under ``session_id``.

    The membership test runs while holding the module lock. The answer
    reflects the registry at the moment of the check only; another thread
    may add or remove the session immediately afterwards.

    Args:
        session_id: Key of the session to look for.

    Returns:
        ``True`` if the id is present in the registry, otherwise ``False``.
    """
    with _lock:
        return session_id in analysis_jobs