""" Shared state module for GitHub Companion Backend. This module provides thread-safe shared state for managing analysis sessions across multiple concurrent users. It uses threading locks to ensure safe access to the shared dictionary in a multi-threaded environment. """ import threading from typing import Dict, Any # Thread-safe lock for accessing analysis_jobs _lock = threading.Lock() # Global dictionary to store analysis job states # Each session_id maps to a dictionary containing: # - status: The current job status # - repo_path: Path to cloned repository # - rag_chain: The LangChain retrieval chain # - vectorstore: ChromaDB vectorstore # - chat_history: List of chat messages # - result: Analysis result data analysis_jobs: Dict[str, Dict[str, Any]] = {} def get_session(session_id: str) -> Dict[str, Any] | None: """Thread-safe getter for a session.""" with _lock: return analysis_jobs.get(session_id) def set_session(session_id: str, data: Dict[str, Any]) -> None: """Thread-safe setter for a session.""" with _lock: analysis_jobs[session_id] = data def update_session(session_id: str, key: str, value: Any) -> None: """Thread-safe update for a specific key in a session.""" with _lock: if session_id in analysis_jobs: analysis_jobs[session_id][key] = value def delete_session(session_id: str) -> bool: """Thread-safe deletion of a session. Returns True if deleted.""" with _lock: if session_id in analysis_jobs: del analysis_jobs[session_id] return True return False def session_exists(session_id: str) -> bool: """Check if a session exists.""" with _lock: return session_id in analysis_jobs