Pulastya B commited on
Commit
565097f
·
1 Parent(s): fbe4715

Real-time streaming: /run-async with BackgroundTasks, UUID-first, SSE-delivered results

Browse files
FRRONTEEEND/components/ChatInterface.tsx CHANGED
@@ -101,8 +101,13 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
101
  // Optional: Display token budget updates
102
  console.log('💰 Token update:', data.message);
103
  } else if (data.type === 'analysis_complete') {
104
- console.log('✅ Analysis completed');
105
- setIsTyping(false); // This will trigger cleanup
 
 
 
 
 
106
  }
107
  } catch (err) {
108
  console.error('❌ Error parsing SSE event:', err, e.data);
@@ -128,6 +133,74 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
128
  };
129
  }, [activeSessionId]);
130
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
131
  const handleSend = async () => {
132
  if ((!input.trim() && !uploadedFile) || isTyping) return;
133
 
@@ -183,7 +256,7 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
183
  formData.append('use_cache', 'true');
184
  formData.append('max_iterations', '20');
185
 
186
- response = await fetch(`${API_URL}/run`, {
187
  method: 'POST',
188
  body: formData
189
  });
@@ -230,6 +303,14 @@ export const ChatInterface: React.FC<{ onBack: () => void }> = ({ onBack }) => {
230
  setActiveSessionId(data.session_id);
231
  }
232
 
 
 
 
 
 
 
 
 
233
  let assistantContent = '';
234
  let reports: Array<{name: string, path: string}> = [];
235
  let plots: Array<{title: string, url: string, type?: 'image' | 'html'}> = [];
 
101
  // Optional: Display token budget updates
102
  console.log('💰 Token update:', data.message);
103
  } else if (data.type === 'analysis_complete') {
104
+ console.log('✅ Analysis completed', data.result);
105
+ setIsTyping(false);
106
+
107
+ // Process the final result
108
+ if (data.result) {
109
+ processAnalysisResult(data.result);
110
+ }
111
  }
112
  } catch (err) {
113
  console.error('❌ Error parsing SSE event:', err, e.data);
 
133
  };
134
  }, [activeSessionId]);
135
 
136
+ const processAnalysisResult = (result: any) => {
137
+ // Extract and display the analysis result from SSE
138
+ let assistantContent = '✅ Analysis Complete!\n\n';
139
+ let reports: Array<{name: string, path: string}> = [];
140
+ let plots: Array<{title: string, url: string, type?: 'image' | 'html'}> = [];
141
+
142
+ // Extract plots and reports from workflow_history
143
+ if (result.workflow_history) {
144
+ const reportTools = ['generate_ydata_profiling_report', 'generate_plotly_dashboard', 'generate_all_plots'];
145
+ const plotTools = [
146
+ 'generate_interactive_correlation_heatmap',
147
+ 'generate_interactive_scatter',
148
+ 'generate_interactive_histogram',
149
+ 'generate_interactive_box_plots',
150
+ 'generate_interactive_time_series',
151
+ 'generate_eda_plots',
152
+ 'generate_data_quality_plots',
153
+ 'analyze_correlations'
154
+ ];
155
+
156
+ result.workflow_history.forEach((step: any) => {
157
+ if (reportTools.includes(step.tool)) {
158
+ const reportPath = step.result?.output_path || step.result?.report_path || step.arguments?.output_path;
159
+ if (reportPath && (step.result?.success !== false)) {
160
+ reports.push({
161
+ name: step.tool.replace('generate_', '').replace(/_/g, ' ').trim(),
162
+ path: reportPath
163
+ });
164
+ }
165
+ }
166
+
167
+ if (plotTools.includes(step.tool) && step.result?.plots) {
168
+ step.result.plots.forEach((plot: any) => {
169
+ plots.push({
170
+ title: plot.title || plot.type || 'Plot',
171
+ url: plot.url || plot.path,
172
+ type: plot.url?.endsWith('.html') ? 'html' : 'image'
173
+ });
174
+ });
175
+ }
176
+ });
177
+ }
178
+
179
+ if (reports.length > 0) {
180
+ assistantContent += '📊 **Generated Reports:**\n';
181
+ reports.forEach(r => assistantContent += `- ${r.name}\n`);
182
+ assistantContent += '\n';
183
+ }
184
+
185
+ if (plots.length > 0) {
186
+ assistantContent += `📈 **Generated ${plots.length} Visualizations**\n\n`;
187
+ }
188
+
189
+ assistantContent += result.final_answer || 'Analysis complete. Check the generated artifacts.';
190
+
191
+ // Add assistant message with result
192
+ const assistantMessage: Message = {
193
+ id: Date.now().toString(),
194
+ role: 'assistant',
195
+ content: assistantContent,
196
+ timestamp: new Date(),
197
+ reports,
198
+ plots
199
+ };
200
+
201
+ updateSession(activeSessionId, [...activeSession.messages, assistantMessage]);
202
+ };
203
+
204
  const handleSend = async () => {
205
  if ((!input.trim() && !uploadedFile) || isTyping) return;
206
 
 
256
  formData.append('use_cache', 'true');
257
  formData.append('max_iterations', '20');
258
 
259
+ response = await fetch(`${API_URL}/run-async`, {
260
  method: 'POST',
261
  body: formData
262
  });
 
303
  setActiveSessionId(data.session_id);
304
  }
305
 
306
+ // For async endpoint, result comes via SSE analysis_complete event
307
+ // For now, just wait for SSE to deliver the result
308
+ if (data.status === 'started') {
309
+ console.log('🚀 Analysis started, waiting for SSE events...');
310
+ return; // Don't process result here, will come via SSE
311
+ }
312
+
313
+ // Legacy sync endpoint handling (if data.result exists)
314
  let assistantContent = '';
315
  let reports: Array<{name: string, path: string}> = [];
316
  let plots: Array<{title: string, url: string, type?: 'image' | 'html'}> = [];
src/api/app.py CHANGED
@@ -16,7 +16,7 @@ from dotenv import load_dotenv
16
  # Load environment variables from .env file
17
  load_dotenv()
18
 
19
- from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request
20
  from fastapi.responses import JSONResponse, FileResponse, StreamingResponse
21
  from fastapi.staticfiles import StaticFiles
22
  from fastapi.middleware.cors import CORSMiddleware
@@ -278,6 +278,90 @@ class AnalysisRequest(BaseModel):
278
  max_iterations: int = 20
279
 
280
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
281
  @app.post("/run")
282
  async def run_analysis(
283
  file: Optional[UploadFile] = File(None, description="Dataset file (CSV or Parquet) - optional for follow-up requests"),
 
16
  # Load environment variables from .env file
17
  load_dotenv()
18
 
19
+ from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request, BackgroundTasks
20
  from fastapi.responses import JSONResponse, FileResponse, StreamingResponse
21
  from fastapi.staticfiles import StaticFiles
22
  from fastapi.middleware.cors import CORSMiddleware
 
278
  max_iterations: int = 20
279
 
280
 
281
+ def run_analysis_background(file_path: str, task_description: str, target_col: Optional[str],
282
+ use_cache: bool, max_iterations: int, session_id: str):
283
+ """Background task to run analysis and emit events."""
284
+ try:
285
+ logger.info(f"[BACKGROUND] Starting analysis for session {session_id}")
286
+
287
+ result = agent.analyze(
288
+ file_path=file_path,
289
+ task_description=task_description,
290
+ target_col=target_col,
291
+ use_cache=use_cache,
292
+ max_iterations=max_iterations
293
+ )
294
+
295
+ logger.info(f"[BACKGROUND] Analysis completed for session {session_id}")
296
+
297
+ # Send completion event
298
+ progress_manager.emit(session_id, {
299
+ "type": "analysis_complete",
300
+ "status": result.get("status"),
301
+ "message": "✅ Analysis completed successfully!",
302
+ "result": result
303
+ })
304
+
305
+ except Exception as e:
306
+ logger.error(f"[BACKGROUND] Analysis failed for session {session_id}: {e}")
307
+ progress_manager.emit(session_id, {
308
+ "type": "analysis_failed",
309
+ "error": str(e),
310
+ "message": f"❌ Analysis failed: {str(e)}"
311
+ })
312
+
313
+
314
+ @app.post("/run-async")
315
+ async def run_analysis_async(
316
+ background_tasks: BackgroundTasks,
317
+ file: Optional[UploadFile] = File(None),
318
+ task_description: str = Form(...),
319
+ target_col: Optional[str] = Form(None),
320
+ use_cache: bool = Form(True),
321
+ max_iterations: int = Form(20)
322
+ ) -> JSONResponse:
323
+ """
324
+ Start analysis in background and return session UUID immediately.
325
+ Frontend can connect SSE with this UUID to receive real-time updates.
326
+ """
327
+ if agent is None:
328
+ raise HTTPException(status_code=503, detail="Agent not initialized")
329
+
330
+ # Get session UUID immediately
331
+ session_id = agent.session.session_id if hasattr(agent, 'session') and agent.session else "default"
332
+ logger.info(f"[ASYNC] Created session: {session_id}")
333
+
334
+ # Handle file upload
335
+ temp_file_path = None
336
+ if file:
337
+ temp_dir = Path("/tmp") / "data_science_agent"
338
+ temp_dir.mkdir(parents=True, exist_ok=True)
339
+ temp_file_path = temp_dir / file.filename
340
+
341
+ with open(temp_file_path, "wb") as buffer:
342
+ shutil.copyfileobj(file.file, buffer)
343
+
344
+ logger.info(f"[ASYNC] File saved: {file.filename}")
345
+
346
+ # Start background analysis
347
+ background_tasks.add_task(
348
+ run_analysis_background,
349
+ file_path=str(temp_file_path) if temp_file_path else "",
350
+ task_description=task_description,
351
+ target_col=target_col,
352
+ use_cache=use_cache,
353
+ max_iterations=max_iterations,
354
+ session_id=session_id
355
+ )
356
+
357
+ # Return UUID immediately so frontend can connect SSE
358
+ return JSONResponse(content={
359
+ "session_id": session_id,
360
+ "status": "started",
361
+ "message": "Analysis started in background"
362
+ })
363
+
364
+
365
  @app.post("/run")
366
  async def run_analysis(
367
  file: Optional[UploadFile] = File(None, description="Dataset file (CSV or Parquet) - optional for follow-up requests"),