Spaces:
Running
Running
Pulastya B commited on
Commit Β·
2f3df85
1
Parent(s): 187c5e0
CRITICAL: Fixed race conditions, session cleanup, SSE leaks, and added localStorage persistence
Browse files- Bug #1: Added thread-local storage + SessionState wrapper to prevent race conditions while keeping heavy components shared
- Bug #3: Implemented TTL-based session cleanup (60min timeout) with LRU eviction
- Bug #4: Added localStorage persistence for chat history - survives page refresh
- Bug #5: Fixed SSE connection leaks with proper cleanup flags and 50ms delay
- Bug #6: Sessions now expire after 60 minutes of inactivity
- Increased session cache from 10 to 50 for better scalability
- Added request count tracking per session for monitoring
- FRRONTEEEND/components/ChatInterface.tsx +157 -81
- src/api/app.py +84 -17
FRRONTEEEND/components/ChatInterface.tsx
CHANGED
|
@@ -39,14 +39,61 @@ const generateLocalSessionId = () => `local_${Date.now()}_${Math.random().toStri
|
|
| 39 |
// Initial session ID - generated once when module loads
|
| 40 |
const INITIAL_SESSION_ID = generateLocalSessionId();
|
| 41 |
|
| 42 |
-
|
| 43 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 44 |
id: INITIAL_SESSION_ID,
|
| 45 |
title: 'New Chat',
|
| 46 |
messages: [],
|
| 47 |
updatedAt: new Date(),
|
| 48 |
-
}]
|
| 49 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 50 |
const [input, setInput] = useState('');
|
| 51 |
const [isTyping, setIsTyping] = useState(false);
|
| 52 |
const [currentStep, setCurrentStep] = useState<string>('');
|
|
@@ -61,6 +108,20 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
|
|
| 61 |
|
| 62 |
const activeSession = sessions.find(s => s.id === activeSessionId) || sessions[0];
|
| 63 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 64 |
useEffect(() => {
|
| 65 |
if (scrollRef.current) {
|
| 66 |
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
|
|
@@ -80,6 +141,7 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
|
|
| 80 |
|
| 81 |
// Track which session the current SSE connection is for
|
| 82 |
const sseSessionRef = useRef<string | null>(null);
|
|
|
|
| 83 |
|
| 84 |
// Connect to SSE when we receive a valid backend UUID
|
| 85 |
useEffect(() => {
|
|
@@ -88,106 +150,120 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
|
|
| 88 |
|
| 89 |
if (!isBackendUUID) {
|
| 90 |
// No backend session yet - close any existing connection
|
| 91 |
-
if (eventSourceRef.current) {
|
| 92 |
console.log('π Closing SSE - no backend session');
|
|
|
|
| 93 |
eventSourceRef.current.close();
|
| 94 |
eventSourceRef.current = null;
|
| 95 |
sseSessionRef.current = null;
|
|
|
|
| 96 |
}
|
| 97 |
return;
|
| 98 |
}
|
| 99 |
|
| 100 |
-
// Check if we're
|
| 101 |
-
if (sseSessionRef.current
|
| 102 |
-
//
|
| 103 |
-
if (eventSourceRef.current) {
|
| 104 |
-
console.log(
|
| 105 |
-
|
| 106 |
-
eventSourceRef.current = null;
|
| 107 |
}
|
| 108 |
-
} else if (eventSourceRef.current && eventSourceRef.current.readyState !== 2) {
|
| 109 |
-
// Same session and connection is still open - reuse it
|
| 110 |
-
console.log('β»οΈ Reusing existing SSE connection');
|
| 111 |
-
return;
|
| 112 |
}
|
| 113 |
|
| 114 |
-
//
|
| 115 |
-
|
| 116 |
-
|
| 117 |
-
|
| 118 |
-
|
| 119 |
-
|
| 120 |
-
|
| 121 |
-
|
| 122 |
-
|
|
|
|
| 123 |
|
| 124 |
-
//
|
| 125 |
-
|
| 126 |
-
|
| 127 |
-
|
| 128 |
-
|
| 129 |
|
| 130 |
-
|
| 131 |
-
|
| 132 |
-
|
| 133 |
-
|
| 134 |
-
|
| 135 |
-
|
| 136 |
-
|
| 137 |
-
|
| 138 |
-
|
| 139 |
-
|
| 140 |
-
|
| 141 |
-
|
| 142 |
-
|
| 143 |
-
|
| 144 |
-
} else if (data.type === 'token_update') {
|
| 145 |
-
// Optional: Display token budget updates
|
| 146 |
-
console.log('π° Token update:', data.message);
|
| 147 |
-
} else if (data.type === 'analysis_complete') {
|
| 148 |
-
console.log('β
Analysis completed', data.result);
|
| 149 |
-
setIsTyping(false);
|
| 150 |
-
|
| 151 |
-
// Create a unique key based on actual workflow content to prevent duplicates
|
| 152 |
-
// Use the last tool executed + summary hash for uniqueness
|
| 153 |
-
const lastTool = data.result?.workflow_history?.[data.result.workflow_history.length - 1]?.tool || 'unknown';
|
| 154 |
-
const summarySnippet = (data.result?.summary || '').substring(0, 50);
|
| 155 |
-
const resultKey = `${activeSessionId}-${lastTool}-${summarySnippet}`;
|
| 156 |
-
|
| 157 |
-
// Only process if we haven't seen this exact result before
|
| 158 |
-
if (!processedAnalysisRef.current.has(resultKey)) {
|
| 159 |
-
console.log('π New analysis result, processing...', resultKey);
|
| 160 |
-
processedAnalysisRef.current.add(resultKey);
|
| 161 |
|
| 162 |
-
//
|
| 163 |
-
if (data.
|
| 164 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 165 |
}
|
| 166 |
-
}
|
| 167 |
-
console.
|
| 168 |
}
|
| 169 |
-
}
|
| 170 |
-
} catch (err) {
|
| 171 |
-
console.error('β Error parsing SSE event:', err, e.data);
|
| 172 |
-
}
|
| 173 |
-
};
|
| 174 |
|
| 175 |
-
|
| 176 |
-
|
| 177 |
-
|
| 178 |
-
|
| 179 |
-
|
| 180 |
-
};
|
| 181 |
-
|
| 182 |
-
eventSourceRef.current = eventSource;
|
| 183 |
|
| 184 |
// Cleanup on unmount or session change
|
| 185 |
return () => {
|
| 186 |
-
|
| 187 |
-
|
|
|
|
|
|
|
| 188 |
eventSourceRef.current.close();
|
| 189 |
eventSourceRef.current = null;
|
| 190 |
sseSessionRef.current = null;
|
|
|
|
| 191 |
}
|
| 192 |
};
|
| 193 |
}, [activeSessionId]);
|
|
|
|
| 39 |
// Initial session ID - generated once when module loads
|
| 40 |
const INITIAL_SESSION_ID = generateLocalSessionId();
|
| 41 |
|
| 42 |
+
// LocalStorage key for persisting sessions
|
| 43 |
+
const SESSIONS_STORAGE_KEY = 'ds_agent_chat_sessions';
|
| 44 |
+
const ACTIVE_SESSION_STORAGE_KEY = 'ds_agent_active_session';
|
| 45 |
+
|
| 46 |
+
// Load sessions from localStorage
|
| 47 |
+
const loadSessionsFromStorage = (): ChatSession[] => {
|
| 48 |
+
try {
|
| 49 |
+
const stored = localStorage.getItem(SESSIONS_STORAGE_KEY);
|
| 50 |
+
if (stored) {
|
| 51 |
+
const parsed = JSON.parse(stored);
|
| 52 |
+
// Convert ISO date strings back to Date objects
|
| 53 |
+
return parsed.map((s: any) => ({
|
| 54 |
+
...s,
|
| 55 |
+
updatedAt: new Date(s.updatedAt),
|
| 56 |
+
messages: s.messages.map((m: any) => ({
|
| 57 |
+
...m,
|
| 58 |
+
timestamp: new Date(m.timestamp)
|
| 59 |
+
}))
|
| 60 |
+
}));
|
| 61 |
+
}
|
| 62 |
+
} catch (err) {
|
| 63 |
+
console.error('Failed to load sessions from localStorage:', err);
|
| 64 |
+
}
|
| 65 |
+
// Return default session if loading fails
|
| 66 |
+
return [{
|
| 67 |
id: INITIAL_SESSION_ID,
|
| 68 |
title: 'New Chat',
|
| 69 |
messages: [],
|
| 70 |
updatedAt: new Date(),
|
| 71 |
+
}];
|
| 72 |
+
};
|
| 73 |
+
|
| 74 |
+
// Save sessions to localStorage
|
| 75 |
+
const saveSessionsToStorage = (sessions: ChatSession[]) => {
|
| 76 |
+
try {
|
| 77 |
+
localStorage.setItem(SESSIONS_STORAGE_KEY, JSON.stringify(sessions));
|
| 78 |
+
} catch (err) {
|
| 79 |
+
console.error('Failed to save sessions to localStorage:', err);
|
| 80 |
+
}
|
| 81 |
+
};
|
| 82 |
+
|
| 83 |
+
export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
|
| 84 |
+
const [sessions, setSessions] = useState<ChatSession[]>(loadSessionsFromStorage);
|
| 85 |
+
const [activeSessionId, setActiveSessionId] = useState<string>(() => {
|
| 86 |
+
// Try to restore last active session
|
| 87 |
+
try {
|
| 88 |
+
const stored = localStorage.getItem(ACTIVE_SESSION_STORAGE_KEY);
|
| 89 |
+
if (stored && sessions.some(s => s.id === stored)) {
|
| 90 |
+
return stored;
|
| 91 |
+
}
|
| 92 |
+
} catch (err) {
|
| 93 |
+
console.error('Failed to load active session:', err);
|
| 94 |
+
}
|
| 95 |
+
return sessions[0]?.id || INITIAL_SESSION_ID;
|
| 96 |
+
});
|
| 97 |
const [input, setInput] = useState('');
|
| 98 |
const [isTyping, setIsTyping] = useState(false);
|
| 99 |
const [currentStep, setCurrentStep] = useState<string>('');
|
|
|
|
| 108 |
|
| 109 |
const activeSession = sessions.find(s => s.id === activeSessionId) || sessions[0];
|
| 110 |
|
| 111 |
+
// Persist sessions to localStorage whenever they change
|
| 112 |
+
useEffect(() => {
|
| 113 |
+
saveSessionsToStorage(sessions);
|
| 114 |
+
}, [sessions]);
|
| 115 |
+
|
| 116 |
+
// Persist active session ID
|
| 117 |
+
useEffect(() => {
|
| 118 |
+
try {
|
| 119 |
+
localStorage.setItem(ACTIVE_SESSION_STORAGE_KEY, activeSessionId);
|
| 120 |
+
} catch (err) {
|
| 121 |
+
console.error('Failed to save active session:', err);
|
| 122 |
+
}
|
| 123 |
+
}, [activeSessionId]);
|
| 124 |
+
|
| 125 |
useEffect(() => {
|
| 126 |
if (scrollRef.current) {
|
| 127 |
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
|
|
|
|
| 141 |
|
| 142 |
// Track which session the current SSE connection is for
|
| 143 |
const sseSessionRef = useRef<string | null>(null);
|
| 144 |
+
const isCleaningUpRef = useRef<boolean>(false); // Prevent race conditions during cleanup
|
| 145 |
|
| 146 |
// Connect to SSE when we receive a valid backend UUID
|
| 147 |
useEffect(() => {
|
|
|
|
| 150 |
|
| 151 |
if (!isBackendUUID) {
|
| 152 |
// No backend session yet - close any existing connection
|
| 153 |
+
if (eventSourceRef.current && !isCleaningUpRef.current) {
|
| 154 |
console.log('π Closing SSE - no backend session');
|
| 155 |
+
isCleaningUpRef.current = true;
|
| 156 |
eventSourceRef.current.close();
|
| 157 |
eventSourceRef.current = null;
|
| 158 |
sseSessionRef.current = null;
|
| 159 |
+
isCleaningUpRef.current = false;
|
| 160 |
}
|
| 161 |
return;
|
| 162 |
}
|
| 163 |
|
| 164 |
+
// Check if we're already connected to the correct session
|
| 165 |
+
if (sseSessionRef.current === activeSessionId) {
|
| 166 |
+
// Same session - check if connection is still alive
|
| 167 |
+
if (eventSourceRef.current && eventSourceRef.current.readyState !== 2) {
|
| 168 |
+
console.log('β»οΈ Reusing existing SSE connection for same session');
|
| 169 |
+
return;
|
|
|
|
| 170 |
}
|
|
|
|
|
|
|
|
|
|
|
|
|
| 171 |
}
|
| 172 |
|
| 173 |
+
// Different session or connection is closed - need new connection
|
| 174 |
+
// First, close any existing connection
|
| 175 |
+
if (eventSourceRef.current && !isCleaningUpRef.current) {
|
| 176 |
+
const oldSession = sseSessionRef.current?.slice(0, 8) || 'unknown';
|
| 177 |
+
console.log(`π Closing SSE for ${oldSession}... before switching to ${activeSessionId.slice(0, 8)}...`);
|
| 178 |
+
isCleaningUpRef.current = true;
|
| 179 |
+
eventSourceRef.current.close();
|
| 180 |
+
eventSourceRef.current = null;
|
| 181 |
+
isCleaningUpRef.current = false;
|
| 182 |
+
}
|
| 183 |
|
| 184 |
+
// Small delay to ensure old connection is fully closed
|
| 185 |
+
const timeoutId = setTimeout(() => {
|
| 186 |
+
// Double-check we're still on the same session (might have switched again)
|
| 187 |
+
if (activeSessionId !== sseSessionRef.current) {
|
| 188 |
+
console.log(`π Opening new SSE connection to session: ${activeSessionId.slice(0, 8)}...`);
|
| 189 |
|
| 190 |
+
const API_URL = window.location.origin;
|
| 191 |
+
const eventSource = new EventSource(`${API_URL}/api/progress/stream/${activeSessionId}`);
|
| 192 |
+
sseSessionRef.current = activeSessionId;
|
| 193 |
+
eventSourceRef.current = eventSource;
|
| 194 |
+
|
| 195 |
+
eventSource.onopen = () => {
|
| 196 |
+
console.log('β
SSE connection established');
|
| 197 |
+
};
|
| 198 |
+
|
| 199 |
+
// Handle all incoming messages
|
| 200 |
+
eventSource.onmessage = (e) => {
|
| 201 |
+
console.log('π¨ SSE received:', e.data);
|
| 202 |
+
try {
|
| 203 |
+
const data = JSON.parse(e.data);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 204 |
|
| 205 |
+
// Handle different event types
|
| 206 |
+
if (data.type === 'connected') {
|
| 207 |
+
console.log('π Connected to progress stream');
|
| 208 |
+
} else if (data.type === 'agent_assigned') {
|
| 209 |
+
// π€ Multi-Agent: Display which specialist agent is handling the task
|
| 210 |
+
const agentMessage = `${data.emoji} **${data.agent}** assigned\n_${data.description}_`;
|
| 211 |
+
setCurrentStep(agentMessage);
|
| 212 |
+
console.log(`π€ Agent assigned: ${data.agent}`);
|
| 213 |
+
} else if (data.type === 'tool_executing') {
|
| 214 |
+
setCurrentStep(data.message || `π§ Executing: ${data.tool}`);
|
| 215 |
+
} else if (data.type === 'tool_completed') {
|
| 216 |
+
setCurrentStep(data.message || `β Completed: ${data.tool}`);
|
| 217 |
+
} else if (data.type === 'tool_failed') {
|
| 218 |
+
setCurrentStep(data.message || `β Failed: ${data.tool}`);
|
| 219 |
+
} else if (data.type === 'token_update') {
|
| 220 |
+
// Optional: Display token budget updates
|
| 221 |
+
console.log('π° Token update:', data.message);
|
| 222 |
+
} else if (data.type === 'analysis_complete') {
|
| 223 |
+
console.log('β
Analysis completed', data.result);
|
| 224 |
+
setIsTyping(false);
|
| 225 |
+
|
| 226 |
+
// Create a unique key based on actual workflow content to prevent duplicates
|
| 227 |
+
// Use the last tool executed + summary hash for uniqueness
|
| 228 |
+
const lastTool = data.result?.workflow_history?.[data.result.workflow_history.length - 1]?.tool || 'unknown';
|
| 229 |
+
const summarySnippet = (data.result?.summary || '').substring(0, 50);
|
| 230 |
+
const resultKey = `${activeSessionId}-${lastTool}-${summarySnippet}`;
|
| 231 |
+
|
| 232 |
+
// Only process if we haven't seen this exact result before
|
| 233 |
+
if (!processedAnalysisRef.current.has(resultKey)) {
|
| 234 |
+
console.log('π New analysis result, processing...', resultKey);
|
| 235 |
+
processedAnalysisRef.current.add(resultKey);
|
| 236 |
+
|
| 237 |
+
// Process the final result with the current session ID
|
| 238 |
+
if (data.result) {
|
| 239 |
+
processAnalysisResult(data.result, activeSessionId);
|
| 240 |
+
}
|
| 241 |
+
} else {
|
| 242 |
+
console.log('βοΈ Skipping duplicate analysis result', resultKey);
|
| 243 |
+
}
|
| 244 |
}
|
| 245 |
+
} catch (err) {
|
| 246 |
+
console.error('β Error parsing SSE event:', err, e.data);
|
| 247 |
}
|
| 248 |
+
};
|
|
|
|
|
|
|
|
|
|
|
|
|
| 249 |
|
| 250 |
+
// Handle errors - DON'T immediately close, just log
|
| 251 |
+
eventSource.onerror = (err) => {
|
| 252 |
+
console.error('β SSE connection error/closed:', err);
|
| 253 |
+
};
|
| 254 |
+
}
|
| 255 |
+
}, 50); // 50ms delay to ensure old connection closes
|
|
|
|
|
|
|
| 256 |
|
| 257 |
// Cleanup on unmount or session change
|
| 258 |
return () => {
|
| 259 |
+
clearTimeout(timeoutId); // Clear timeout if component unmounts
|
| 260 |
+
if (eventSourceRef.current && !isCleaningUpRef.current) {
|
| 261 |
+
console.log('π§Ή Cleaning up SSE connection on unmount/session change');
|
| 262 |
+
isCleaningUpRef.current = true;
|
| 263 |
eventSourceRef.current.close();
|
| 264 |
eventSourceRef.current = null;
|
| 265 |
sseSessionRef.current = null;
|
| 266 |
+
isCleaningUpRef.current = false;
|
| 267 |
}
|
| 268 |
};
|
| 269 |
}, [activeSessionId]);
|
src/api/app.py
CHANGED
|
@@ -151,14 +151,30 @@ class ProgressEventManager:
|
|
| 151 |
# π₯ MULTI-USER SUPPORT: Session state isolation
|
| 152 |
# Heavy components (SBERT, tools, LLM client) are shared via global 'agent'
|
| 153 |
# Only session memory is isolated per user for fast initialization
|
| 154 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 155 |
agent_cache_lock = asyncio.Lock()
|
| 156 |
-
|
|
|
|
| 157 |
logger.info("π₯ Multi-user session isolation initialized (fast mode)")
|
| 158 |
|
| 159 |
# Global agent - Heavy components loaded ONCE at startup
|
| 160 |
# SBERT model, tool functions, LLM client are shared across all users
|
|
|
|
| 161 |
agent: Optional[DataScienceCopilot] = None
|
|
|
|
| 162 |
agent = None
|
| 163 |
|
| 164 |
# Session state isolation (lightweight - just session memory)
|
|
@@ -169,11 +185,13 @@ async def get_agent_for_session(session_id: str) -> DataScienceCopilot:
|
|
| 169 |
"""
|
| 170 |
Get agent with isolated session state.
|
| 171 |
|
| 172 |
-
OPTIMIZATION:
|
| 173 |
-
|
| 174 |
-
Heavy components (SBERT, tools, LLM client) are shared.
|
| 175 |
This reduces per-user initialization from 20s to <1s.
|
| 176 |
|
|
|
|
|
|
|
|
|
|
| 177 |
Args:
|
| 178 |
session_id: Unique session identifier
|
| 179 |
|
|
@@ -193,10 +211,25 @@ async def get_agent_for_session(session_id: str) -> DataScienceCopilot:
|
|
| 193 |
use_compact_prompts=False
|
| 194 |
)
|
| 195 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 196 |
# Check if we have cached session memory for this session
|
| 197 |
if session_id in session_states:
|
| 198 |
-
|
| 199 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 200 |
agent.http_session_key = session_id
|
| 201 |
return agent
|
| 202 |
|
|
@@ -206,23 +239,56 @@ async def get_agent_for_session(session_id: str) -> DataScienceCopilot:
|
|
| 206 |
# Create isolated session memory for this user
|
| 207 |
new_session = SessionMemory(session_id=session_id)
|
| 208 |
|
| 209 |
-
# Cache
|
| 210 |
-
|
| 211 |
-
|
| 212 |
-
|
| 213 |
-
|
| 214 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 215 |
|
| 216 |
-
|
|
|
|
|
|
|
| 217 |
|
| 218 |
-
# Set session on shared agent
|
| 219 |
agent.session = new_session
|
| 220 |
agent.http_session_key = session_id
|
| 221 |
|
| 222 |
-
logger.info(f"β
Session created for {session_id[:8]} (cache: {len(session_states)}/{
|
| 223 |
|
| 224 |
return agent
|
| 225 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 226 |
# π REQUEST QUEUING: Global lock to prevent concurrent workflows
|
| 227 |
# This ensures only one analysis runs at a time, preventing:
|
| 228 |
# - Race conditions on file writes
|
|
@@ -483,7 +549,8 @@ async def run_analysis_async(
|
|
| 483 |
async with agent_cache_lock:
|
| 484 |
# Check session_states cache for this specific session_id
|
| 485 |
if session_id in session_states:
|
| 486 |
-
|
|
|
|
| 487 |
if hasattr(cached_session, 'last_dataset') and cached_session.last_dataset:
|
| 488 |
has_dataset = True
|
| 489 |
logger.info(f"[ASYNC] Follow-up query for session {session_id[:8]}... - using cached dataset")
|
|
|
|
| 151 |
# π₯ MULTI-USER SUPPORT: Session state isolation
|
| 152 |
# Heavy components (SBERT, tools, LLM client) are shared via global 'agent'
|
| 153 |
# Only session memory is isolated per user for fast initialization
|
| 154 |
+
|
| 155 |
+
from dataclasses import dataclass
|
| 156 |
+
from datetime import datetime, timedelta
|
| 157 |
+
import threading
|
| 158 |
+
|
| 159 |
+
@dataclass
|
| 160 |
+
class SessionState:
|
| 161 |
+
"""Wrapper for session with metadata for cleanup"""
|
| 162 |
+
session: Any
|
| 163 |
+
created_at: datetime
|
| 164 |
+
last_accessed: datetime
|
| 165 |
+
request_count: int = 0
|
| 166 |
+
|
| 167 |
+
session_states: Dict[str, SessionState] = {} # session_id -> SessionState
|
| 168 |
agent_cache_lock = asyncio.Lock()
|
| 169 |
+
MAX_CACHED_SESSIONS = 50 # Increased limit for scale
|
| 170 |
+
SESSION_TTL_MINUTES = 60 # Sessions expire after 1 hour of inactivity
|
| 171 |
logger.info("π₯ Multi-user session isolation initialized (fast mode)")
|
| 172 |
|
| 173 |
# Global agent - Heavy components loaded ONCE at startup
|
| 174 |
# SBERT model, tool functions, LLM client are shared across all users
|
| 175 |
+
# CRITICAL: We use threading.local() to ensure thread-safe session isolation
|
| 176 |
agent: Optional[DataScienceCopilot] = None
|
| 177 |
+
agent_thread_local = threading.local() # Thread-local storage for session isolation
|
| 178 |
agent = None
|
| 179 |
|
| 180 |
# Session state isolation (lightweight - just session memory)
|
|
|
|
| 185 |
"""
|
| 186 |
Get agent with isolated session state.
|
| 187 |
|
| 188 |
+
OPTIMIZATION: Heavy components (SBERT, tools, LLM client) are shared.
|
| 189 |
+
Session state is isolated using thread-local storage to prevent race conditions.
|
|
|
|
| 190 |
This reduces per-user initialization from 20s to <1s.
|
| 191 |
|
| 192 |
+
THREAD SAFETY: Uses threading.local() so each request thread gets its own
|
| 193 |
+
agent reference with isolated session, preventing cross-contamination.
|
| 194 |
+
|
| 195 |
Args:
|
| 196 |
session_id: Unique session identifier
|
| 197 |
|
|
|
|
| 211 |
use_compact_prompts=False
|
| 212 |
)
|
| 213 |
|
| 214 |
+
# Clean up expired sessions periodically (every 10th request)
|
| 215 |
+
if len(session_states) > 0 and len(session_states) % 10 == 0:
|
| 216 |
+
cleanup_expired_sessions()
|
| 217 |
+
|
| 218 |
+
now = datetime.now()
|
| 219 |
+
|
| 220 |
# Check if we have cached session memory for this session
|
| 221 |
if session_id in session_states:
|
| 222 |
+
state = session_states[session_id]
|
| 223 |
+
state.last_accessed = now
|
| 224 |
+
state.request_count += 1
|
| 225 |
+
logger.info(f"[β»οΈ] Reusing session {session_id[:8]}... (requests: {state.request_count})")
|
| 226 |
+
|
| 227 |
+
# Store in thread-local storage for isolation
|
| 228 |
+
agent_thread_local.session = state.session
|
| 229 |
+
agent_thread_local.session_id = session_id
|
| 230 |
+
|
| 231 |
+
# Return agent with session set (safe because of workflow_lock)
|
| 232 |
+
agent.session = state.session
|
| 233 |
agent.http_session_key = session_id
|
| 234 |
return agent
|
| 235 |
|
|
|
|
| 239 |
# Create isolated session memory for this user
|
| 240 |
new_session = SessionMemory(session_id=session_id)
|
| 241 |
|
| 242 |
+
# Cache management: Remove expired first, then LRU if still over limit
|
| 243 |
+
if len(session_states) >= MAX_CACHED_SESSIONS:
|
| 244 |
+
expired_count = cleanup_expired_sessions()
|
| 245 |
+
|
| 246 |
+
# If still over limit after cleanup, remove least recently used
|
| 247 |
+
if len(session_states) >= MAX_CACHED_SESSIONS:
|
| 248 |
+
# Sort by last_accessed and remove oldest
|
| 249 |
+
sorted_sessions = sorted(session_states.items(), key=lambda x: x[1].last_accessed)
|
| 250 |
+
oldest_session_id = sorted_sessions[0][0]
|
| 251 |
+
logger.info(f"[ποΈ] Cache full, removing LRU session {oldest_session_id[:8]}...")
|
| 252 |
+
del session_states[oldest_session_id]
|
| 253 |
+
|
| 254 |
+
# Create session state wrapper with metadata
|
| 255 |
+
session_state = SessionState(
|
| 256 |
+
session=new_session,
|
| 257 |
+
created_at=now,
|
| 258 |
+
last_accessed=now,
|
| 259 |
+
request_count=1
|
| 260 |
+
)
|
| 261 |
+
session_states[session_id] = session_state
|
| 262 |
|
| 263 |
+
# Store in thread-local storage
|
| 264 |
+
agent_thread_local.session = new_session
|
| 265 |
+
agent_thread_local.session_id = session_id
|
| 266 |
|
| 267 |
+
# Set session on shared agent (safe with workflow_lock)
|
| 268 |
agent.session = new_session
|
| 269 |
agent.http_session_key = session_id
|
| 270 |
|
| 271 |
+
logger.info(f"β
Session created for {session_id[:8]} (cache: {len(session_states)}/{MAX_CACHED_SESSIONS}) - <1s init")
|
| 272 |
|
| 273 |
return agent
|
| 274 |
|
| 275 |
+
def cleanup_expired_sessions():
|
| 276 |
+
"""Remove expired sessions based on TTL."""
|
| 277 |
+
now = datetime.now()
|
| 278 |
+
expired = []
|
| 279 |
+
|
| 280 |
+
for session_id, state in session_states.items():
|
| 281 |
+
# Check if session has been inactive for too long
|
| 282 |
+
inactive_duration = now - state.last_accessed
|
| 283 |
+
if inactive_duration > timedelta(minutes=SESSION_TTL_MINUTES):
|
| 284 |
+
expired.append(session_id)
|
| 285 |
+
|
| 286 |
+
for session_id in expired:
|
| 287 |
+
logger.info(f"[ποΈ] Removing expired session {session_id[:8]}... (inactive for {SESSION_TTL_MINUTES}min)")
|
| 288 |
+
del session_states[session_id]
|
| 289 |
+
|
| 290 |
+
return len(expired)
|
| 291 |
+
|
| 292 |
# π REQUEST QUEUING: Global lock to prevent concurrent workflows
|
| 293 |
# This ensures only one analysis runs at a time, preventing:
|
| 294 |
# - Race conditions on file writes
|
|
|
|
| 549 |
async with agent_cache_lock:
|
| 550 |
# Check session_states cache for this specific session_id
|
| 551 |
if session_id in session_states:
|
| 552 |
+
state = session_states[session_id]
|
| 553 |
+
cached_session = state.session # Extract SessionMemory from wrapper
|
| 554 |
if hasattr(cached_session, 'last_dataset') and cached_session.last_dataset:
|
| 555 |
has_dataset = True
|
| 556 |
logger.info(f"[ASYNC] Follow-up query for session {session_id[:8]}... - using cached dataset")
|