redhairedshanks1 committed on
Commit
f41653d
Β·
1 Parent(s): bf45da8

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +607 -608
app.py CHANGED
@@ -1,609 +1,608 @@
1
- # app.py - MasterLLM v2.0 with Bedrock Fallback System
2
- """
3
- MasterLLM Pipeline Orchestrator v2.0
4
- - Bedrock (priority) + Gemini (fallback) for pipeline generation
5
- - Bedrock LangChain (priority) + CrewAI (fallback) for execution
6
- - MongoDB session management
7
- - Complete REST API
8
- - Gradio UI with fancy displays
9
- """
10
- import os
11
- import json
12
- import uuid
13
- from datetime import datetime
14
- from typing import List, Optional
15
-
16
- import gradio as gr
17
- from fastapi import FastAPI
18
- from fastapi.middleware.cors import CORSMiddleware
19
- from contextlib import asynccontextmanager
20
- import asyncio
21
-
22
- # Import our new services
23
- from services.pipeline_generator import generate_pipeline, format_pipeline_for_display
24
- from services.pipeline_executor import execute_pipeline_streaming
25
- from services.session_manager import session_manager
26
- from api_routes import router as api_router
27
-
28
-
29
- # ========================
30
- # BACKGROUND CLEANUP TASK
31
- # ========================
32
-
33
async def periodic_cleanup():
    """Purge inactive sessions once per hour, forever.

    Runs as a background asyncio task for the lifetime of the app;
    a failing sweep is logged and the loop keeps going.
    """
    while True:
        # Sleep first so startup is not burdened with an immediate sweep.
        await asyncio.sleep(3600)
        try:
            purged = session_manager.cleanup_old_sessions(max_age_hours=24)
        except Exception as exc:
            print(f"⚠️ Cleanup error: {exc}")
        else:
            if purged > 0:
                print(f"🧹 Cleaned up {purged} inactive sessions")
43
-
44
-
45
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup/shutdown logic around the application's lifetime.

    Startup: spawn the hourly session-cleanup background task.
    Shutdown: cancel that task, wait for its cancellation to unwind,
    then close the session manager's resources.
    """
    # Startup
    print("🚀 Starting MasterLLM v2.0...")
    cleanup_task = asyncio.create_task(periodic_cleanup())
    try:
        yield
    finally:
        # Shutdown. The original fired cancel() without awaiting the task,
        # leaving it dangling ("task was destroyed but it is pending");
        # cancel AND await so it fully unwinds before we tear down
        # shared resources.
        cleanup_task.cancel()
        try:
            await cleanup_task
        except asyncio.CancelledError:
            pass
        session_manager.close()
        print("🛑 MasterLLM shut down gracefully")
56
-
57
-
58
- # ========================
59
- # FASTAPI APP
60
- # ========================
61
-
62
# Application object: the REST docs are driven by this metadata, and the
# lifespan handler above owns the background cleanup task.
app = FastAPI(
    title="MasterLLM v2.0 - AI Pipeline Orchestrator",
    description="Bedrock + Gemini fallback system with MongoDB sessions",
    version="2.0.0",
    lifespan=lifespan,
)

# CORS: the browser front-end origin is configurable via FRONTEND_ORIGIN.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[os.getenv("FRONTEND_ORIGIN", "http://localhost:3000")],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# REST endpoints are defined in api_routes.py and mounted here.
app.include_router(api_router)
80
-
81
-
82
- # ========================
83
- # CONVERSATION STATE
84
- # ========================
85
-
86
class ConversationState:
    """String constants naming each stage of the chat state machine.

    Stored in the session document, so the values must stay stable.
    """

    INITIAL = "initial"                      # waiting for a file / first instruction
    PIPELINE_PROPOSED = "pipeline_proposed"  # plan shown, awaiting approve/reject/edit
    PIPELINE_APPROVED = "pipeline_approved"  # user accepted the plan (transient)
    EXECUTING = "executing"                  # pipeline currently running
    COMPLETED = "completed"                  # execution finished successfully
    ERROR = "error"                          # execution failed
93
-
94
-
95
- # ========================
96
- # GRADIO UI HANDLERS
97
- # ========================
98
-
99
def create_new_session():
    """Return the id of a freshly created session (delegates to the manager)."""
    return session_manager.create_session()
102
-
103
-
104
def handle_file_upload(file_path, session_id):
    """Register an uploaded document against the current session.

    Returns a ``(file_path, status_json, session_id)`` triple matching the
    Gradio outputs ``[file_state, upload_status, session_id_state]``.
    """
    # Nothing selected: report the error without touching the session.
    if not file_path:
        payload = {
            "status": "error",
            "message": "No file uploaded"
        }
        return None, json.dumps(payload, indent=2), session_id

    # Lazily create a session on first interaction.
    if not session_id:
        session_id = create_new_session()

    file_name = os.path.basename(file_path)

    # Attach the file and reset the conversation state machine.
    session_manager.update_session(session_id, {
        "current_file": file_path,
        "state": ConversationState.INITIAL
    })

    # Record the upload in the transcript as a system turn.
    session_manager.add_message(
        session_id,
        "system",
        f"File uploaded: {file_name}"
    )

    status = {
        "status": "success",
        "message": f"File '{file_name}' uploaded successfully",
        "file_info": {
            "name": file_name,
            "path": file_path,
            # Guard getsize: Gradio may hand us a path that has vanished.
            "size_bytes": os.path.getsize(file_path) if os.path.exists(file_path) else 0
        },
        "next_action": "💬 Now tell me what you'd like to do with this document"
    }
    return file_path, json.dumps(status, indent=2), session_id
142
-
143
-
144
def chatbot_response_streaming(message: str, history: List, session_id: str, file_path: str = None):
    """
    Handle chat messages with streaming updates
    Uses Bedrock (priority) → Gemini (fallback) for both generation and execution
    """

    def jblock(payload) -> str:
        # Render a payload as a fenced JSON markdown block for the chatbot.
        return f"```json\n{json.dumps(payload, indent=2)}\n```"

    def reply(text: str):
        # Persist the assistant turn, then return the updated chat history.
        # Reads the *current* session_id binding, so it is safe even after
        # the id is replaced below.
        session_manager.add_message(session_id, "assistant", text)
        return history + [[message, text]]

    # Resolve the session, creating a fresh one if the id is stale/missing.
    session = session_manager.get_session(session_id)
    if not session:
        session_id = create_new_session()
        session = session_manager.get_session(session_id)

    # A newly supplied file overrides whatever the session had.
    if file_path:
        session_manager.update_session(session_id, {"current_file": file_path})
        session = session_manager.get_session(session_id)

    # Record the user's turn before doing any work.
    session_manager.add_message(session_id, "user", message)

    current_state = session.get("state", ConversationState.INITIAL)

    # ------------------------------------------------------------------
    # STATE: INITIAL — propose a pipeline for the user's request
    # ------------------------------------------------------------------
    if current_state == ConversationState.INITIAL:
        # A document is required before anything can be planned.
        if not session.get("current_file"):
            yield reply(jblock({
                "status": "error",
                "message": "Please upload a document first",
                "action": "📁 Click 'Upload Document' to begin"
            }))
            return

        try:
            # Interim "working" frame, then generate via Bedrock → Gemini.
            yield history + [[message, "🤖 Generating pipeline with AI...\n⏳ Trying Bedrock first..."]]

            pipeline = generate_pipeline(
                user_input=message,
                file_path=session.get("current_file"),
                prefer_bedrock=True
            )

            # Remember the proposal and move to the confirmation state.
            session_manager.update_session(session_id, {
                "proposed_pipeline": pipeline,
                "state": ConversationState.PIPELINE_PROPOSED
            })

            # Human-friendly rendering followed by the raw JSON.
            yield reply(format_pipeline_for_display(pipeline) + "\n\n" + jblock(pipeline))
            return

        except Exception as e:
            yield reply(jblock({
                "status": "error",
                "message": "Failed to generate pipeline",
                "error": str(e),
                "action": "Please try rephrasing your request"
            }))
            return

    # ------------------------------------------------------------------
    # STATE: PIPELINE_PROPOSED — approve / reject / edit the proposal
    # ------------------------------------------------------------------
    elif current_state == ConversationState.PIPELINE_PROPOSED:
        user_input = message.lower().strip()

        # APPROVE — execute the proposed pipeline, streaming progress.
        if "approve" in user_input or "yes" in user_input:
            session_manager.update_session(session_id, {"state": ConversationState.EXECUTING})

            plan = session.get("proposed_pipeline", {})

            yield history + [[message, jblock({
                "status": "executing",
                "message": "🚀 Starting pipeline execution...",
                "pipeline": plan.get("pipeline_name", "unknown"),
                "executor": "Attempting Bedrock LangChain first",
                "steps": []
            })]]

            steps_completed = []
            final_payload = None
            executor_used = "unknown"

            try:
                # Execute with Bedrock → CrewAI fallback; events stream back.
                for event in execute_pipeline_streaming(
                    pipeline=plan,
                    file_path=session.get("current_file"),
                    session_id=session_id,
                    prefer_bedrock=True
                ):
                    event_type = event.get("type")

                    # Info events (fallback notifications, etc.)
                    if event_type == "info":
                        yield history + [[message, jblock({
                            "status": "info",
                            "message": event.get("message"),
                            "executor": event.get("executor", "unknown")
                        })]]

                    # Per-step progress updates
                    elif event_type == "step":
                        step_info = {
                            "step": event.get("step", 0),
                            "tool": event.get("tool", "processing"),
                            "status": event.get("status", "running"),
                            "executor": event.get("executor", "unknown")
                        }
                        steps_completed.append(step_info)
                        executor_used = event.get("executor", executor_used)

                        yield history + [[message, jblock({
                            "status": "executing",
                            "message": f"📍 Step {event.get('step', 0)}: {event.get('tool', 'processing')}...",
                            "pipeline": plan.get("pipeline_name", ""),
                            "executor": executor_used,
                            "steps_completed": steps_completed
                        })]]

                    # Final result payload
                    elif event_type == "final":
                        final_payload = event.get("data")
                        executor_used = event.get("executor", executor_used)

                    # Executor-reported failure: reset state and stop.
                    elif event_type == "error":
                        session_manager.update_session(session_id, {"state": ConversationState.INITIAL})
                        yield reply(jblock({
                            "status": "failed",
                            "error": event.get("error"),
                            "steps_completed": steps_completed,
                            "executor": event.get("executor", "unknown")
                        }))
                        return

                # Stream finished: persist and summarize the outcome.
                if final_payload:
                    session_manager.update_session(session_id, {
                        "pipeline_result": final_payload,
                        "state": ConversationState.INITIAL
                    })

                    # Save the full execution record to MongoDB.
                    session_manager.save_pipeline_execution(
                        session_id=session_id,
                        pipeline=plan,
                        result=final_payload,
                        file_path=session.get("current_file"),
                        executor=executor_used
                    )

                    final_response = jblock({
                        "status": "completed",
                        "executor": executor_used,
                        "pipeline": plan.get("pipeline_name"),
                        "result": final_payload,
                        "summary": {
                            "total_steps": len(steps_completed),
                            "completed_successfully": len([s for s in steps_completed if s.get("status") == "completed"])
                        }
                    })
                else:
                    # No final payload arrived — report whatever steps we saw.
                    final_response = jblock({'status': 'completed', 'steps': steps_completed, 'executor': executor_used})
                    session_manager.update_session(session_id, {"state": ConversationState.INITIAL})

                session_manager.add_message(session_id, "assistant", final_response)
                yield history + [[message, final_response]]
                return

            except Exception as e:
                session_manager.update_session(session_id, {"state": ConversationState.INITIAL})
                yield reply(jblock({
                    "error": str(e),
                    "status": "failed",
                    "message": "Pipeline execution failed",
                    "steps_completed": steps_completed
                }))
                return

        # REJECT — discard the proposal and return to the initial state.
        elif "reject" in user_input or "no" in user_input:
            session_manager.update_session(session_id, {
                "state": ConversationState.INITIAL,
                "proposed_pipeline": None
            })
            yield reply(jblock({
                "status": "rejected",
                "message": "Pipeline rejected by user",
                "action": "💬 Please provide a new instruction"
            }))
            return

        # EDIT — show the current plan plus example modifications.
        elif "edit" in user_input or "modify" in user_input:
            yield reply(jblock({
                "status": "edit_mode",
                "message": "To modify the plan, describe your changes",
                "current_plan": session.get("proposed_pipeline", {}),
                "examples": [
                    "Add summarization at the end",
                    "Remove table extraction",
                    "Only process pages 1-3",
                    "Translate to French instead of Spanish"
                ],
                "action": "Describe your changes, or say 'approve' to run as-is"
            }))
            return

        # Free-form text — treat anything non-trivial as an edit request.
        else:
            if len(message.strip()) > 5:
                try:
                    original_plan = session.get("proposed_pipeline", {})
                    edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}"

                    # Regenerate the pipeline with the modification folded in.
                    new_pipeline = generate_pipeline(
                        user_input=edit_context,
                        file_path=session.get("current_file"),
                        prefer_bedrock=True
                    )

                    session_manager.update_session(session_id, {
                        "proposed_pipeline": new_pipeline,
                        "state": ConversationState.PIPELINE_PROPOSED
                    })

                    yield reply(format_pipeline_for_display(new_pipeline) + "\n\n" + jblock(new_pipeline))
                    return

                except Exception as e:
                    yield reply(jblock({
                        "status": "edit_failed",
                        "error": str(e),
                        "message": "Could not modify the plan",
                        "action": "Try 'approve' to run as-is, or 'reject' to start over"
                    }))
                    return

            # Short/unrecognized input: ask for an explicit decision.
            yield reply(jblock({
                "status": "waiting_for_confirmation",
                "message": "Please type 'approve', 'reject', or describe changes",
                "hint": "You can also say 'edit' for modification hints"
            }))
            return

    # Any other state: ready for the next instruction (plain JSON, no fence —
    # matches the original behavior for this branch).
    yield reply(json.dumps({"status": "ready", "message": "Ready for your next instruction"}, indent=2))
443
-
444
-
445
- # ========================
446
- # GRADIO UI
447
- # ========================
448
-
449
# ========================
# GRADIO UI
# ========================

with gr.Blocks(
    title="MasterLLM v2.0 - AI Pipeline Orchestrator",
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
    css="""
    .gradio-container {
        max-width: 1400px !important;
    }
    """
) as demo:
    gr.Markdown("""
    # 🤖 MasterLLM v2.0 - AI Pipeline Orchestrator

    **🏆 Bedrock Priority** with Gemini Fallback | **💾 MongoDB Sessions** | **📡 Complete REST API**

    Upload a document, describe what you want, and watch AI orchestrate the perfect pipeline!
    """)

    # Per-browser-tab state: the MongoDB session id and the uploaded file path.
    session_id_state = gr.State(value=create_new_session())
    file_state = gr.State(value=None)

    with gr.Row():
        # Left column: the conversation itself.
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                height=650,
                show_label=False,
                bubble_full_width=False,
                show_copy_button=True,
                avatar_images=(None, "🤖"),
            )

            msg = gr.Textbox(
                placeholder="💬 Type your instruction... (e.g., 'extract text from pages 1-5 and summarize')",
                show_label=False,
                lines=2,
                max_lines=4,
                container=False,
            )

            with gr.Row():
                submit_btn = gr.Button("🚀 Send", variant="primary", scale=2)
                clear_btn = gr.Button("🗑️ Clear Chat", scale=1)

        # Right column: upload, session info, examples, and feature notes.
        with gr.Column(scale=1):
            gr.Markdown("### 📁 Upload Document")
            file_upload = gr.File(
                label="PDF or Image",
                file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp"],
                type="filepath",
            )

            upload_status = gr.Textbox(
                label="📊 Upload Status",
                interactive=False,
                lines=10,
                max_lines=15,
            )

            gr.Markdown("### 🔗 Session Info")
            # NOTE(review): the lambda is evaluated when the UI renders, so this
            # shows the id of the session created at startup — it does not track
            # later session changes.
            session_display = gr.Textbox(
                label="Session ID",
                interactive=False,
                value=lambda: session_id_state.value[:8] + "...",
            )

            gr.Markdown("### 💡 Example Pipelines")
            gr.Examples(
                examples=[
                    "extract text from pages 1-5",
                    "extract text and summarize",
                    "extract text, tables, and translate to Spanish",
                    "get tables from pages 2-4 and summarize",
                    "text-classify-ner from entire document",
                    "describe images and summarize findings",
                    "extract text, detect signatures and stamps",
                ],
                inputs=msg,
            )

            gr.Markdown("""
    ### ℹ️ System Features
    - ✅ **Bedrock** (Claude 3.5 Sonnet) priority
    - ✅ **Gemini** (gemini-2.0-flash) fallback
    - ✅ **MongoDB** session persistence
    - ✅ **Streaming** real-time updates
    - ✅ **Component-level** JSON output
    - ✅ **REST API** for integration

    ### 📊 Pipeline Flow:
    1. **Upload** your document
    2. **Describe** what you want
    3. **Review** AI-generated pipeline
    4. **Approve** to execute
    5. **Watch** streaming updates
    6. **Get** complete JSON results
    """)

    # ---- Event wiring ----

    # File selection → register against the session, show status JSON.
    file_upload.upload(
        fn=handle_file_upload,
        inputs=[file_upload, session_id_state],
        outputs=[file_state, upload_status, session_id_state],
    )

    # Enter in the textbox and the Send button both stream chat turns,
    # then clear the input box.
    msg.submit(
        fn=chatbot_response_streaming,
        inputs=[msg, chatbot, session_id_state, file_state],
        outputs=[chatbot],
    ).then(
        lambda: "",
        outputs=msg,
    )

    submit_btn.click(
        fn=chatbot_response_streaming,
        inputs=[msg, chatbot, session_id_state, file_state],
        outputs=[chatbot],
    ).then(
        lambda: "",
        outputs=msg,
    )

    # Clear wipes the transcript, starts a brand-new session, and resets
    # the file picker, input box, and status panel.
    clear_btn.click(
        fn=lambda: ([], create_new_session(), None, None, "", ""),
        outputs=[chatbot, session_id_state, file_state, file_upload, msg, upload_status],
    )

# Serve the Gradio UI at the web root of the FastAPI app.
app = gr.mount_gradio_app(app, demo, path="/")
584
-
585
-
586
- # ========================
587
- # LAUNCH
588
- # ========================
589
-
590
if __name__ == "__main__":
    import uvicorn

    # Hosting platforms (e.g. HF Spaces) inject PORT; default to 7860.
    port = int(os.getenv("PORT", 7860))
    print(f"""
    ╔════════════════════════════════════════════════════════════╗
    ║                                                            ║
    ║   🚀 MasterLLM v2.0 Starting...                            ║
    ║                                                            ║
    ║   🌐 Gradio UI:   http://localhost:{port}                      ║
    ║   📡 REST API:    http://localhost:{port}/api/v1               ║
    ║   📚 API Docs:    http://localhost:{port}/docs                 ║
    ║                                                            ║
    ║   🏆 Bedrock:  Priority (Claude 3.5 Sonnet)                ║
    ║   🔄 Gemini:   Fallback (gemini-2.0-flash)                 ║
    ║   💾 MongoDB:  Session management                          ║
    ║                                                            ║
    ╚════════════════════════════════════════════════════════════╝
    """)

    uvicorn.run(app, host="0.0.0.0", port=port)
 
1
+ # app.py - MasterLLM v2.0 with Bedrock Fallback System
2
+ """
3
+ MasterLLM Pipeline Orchestrator v2.0
4
+ - Bedrock (priority) + Gemini (fallback) for pipeline generation
5
+ - Bedrock LangChain (priority) + CrewAI (fallback) for execution
6
+ - MongoDB session management
7
+ - Complete REST API
8
+ - Gradio UI with fancy displays
9
+ """
10
+ import os
11
+ import json
12
+ import uuid
13
+ from datetime import datetime
14
+ from typing import List, Optional
15
+
16
+ import gradio as gr
17
+ from fastapi import FastAPI
18
+ from fastapi.middleware.cors import CORSMiddleware
19
+ from contextlib import asynccontextmanager
20
+ import asyncio
21
+
22
+ # Import our new services
23
+ from services.pipeline_generator import generate_pipeline, format_pipeline_for_display
24
+ from services.pipeline_executor import execute_pipeline_streaming
25
+ from services.session_manager import session_manager
26
+ from api_routes import router as api_router
27
+
28
+
29
+ # ========================
30
+ # BACKGROUND CLEANUP TASK
31
+ # ========================
32
+
33
+ async def periodic_cleanup():
34
+ """Cleanup old sessions every hour"""
35
+ while True:
36
+ await asyncio.sleep(3600) # Run every hour
37
+ try:
38
+ removed = session_manager.cleanup_old_sessions(max_age_hours=24)
39
+ if removed > 0:
40
+ print(f"🧹 Cleaned up {removed} inactive sessions")
41
+ except Exception as e:
42
+ print(f"⚠️ Cleanup error: {e}")
43
+
44
+
45
+ @asynccontextmanager
46
+ async def lifespan(app: FastAPI):
47
+ """Manage application lifecycle"""
48
+ # Startup
49
+ print("πŸš€ Starting MasterLLM v2.0...")
50
+ task = asyncio.create_task(periodic_cleanup())
51
+ yield
52
+ # Shutdown
53
+ task.cancel()
54
+ session_manager.close()
55
+ print("πŸ›‘ MasterLLM shut down gracefully")
56
+
57
+
58
+ # ========================
59
+ # FASTAPI APP
60
+ # ========================
61
+
62
+ app = FastAPI(
63
+ title="MasterLLM v2.0 - AI Pipeline Orchestrator",
64
+ description="Bedrock + Gemini fallback system with MongoDB sessions",
65
+ version="2.0.0",
66
+ lifespan=lifespan
67
+ )
68
+
69
+ # CORS Configuration
70
+ app.add_middleware(
71
+ CORSMiddleware,
72
+ allow_origins=[os.getenv("FRONTEND_ORIGIN", "http://localhost:3000")],
73
+ allow_credentials=True,
74
+ allow_methods=["*"],
75
+ allow_headers=["*"],
76
+ )
77
+
78
+ # Mount API routes
79
+ app.include_router(api_router)
80
+
81
+
82
+ # ========================
83
+ # CONVERSATION STATE
84
+ # ========================
85
+
86
+ class ConversationState:
87
+ INITIAL = "initial"
88
+ PIPELINE_PROPOSED = "pipeline_proposed"
89
+ PIPELINE_APPROVED = "pipeline_approved"
90
+ EXECUTING = "executing"
91
+ COMPLETED = "completed"
92
+ ERROR = "error"
93
+
94
+
95
+ # ========================
96
+ # GRADIO UI HANDLERS
97
+ # ========================
98
+
99
+ def create_new_session():
100
+ """Create a new session"""
101
+ return session_manager.create_session()
102
+
103
+
104
+ def handle_file_upload(file_path, session_id):
105
+ """Handle file upload"""
106
+ if not file_path:
107
+ return None, json.dumps({
108
+ "status": "error",
109
+ "message": "No file uploaded"
110
+ }, indent=2), session_id
111
+
112
+ if not session_id:
113
+ session_id = create_new_session()
114
+
115
+ file_name = os.path.basename(file_path)
116
+
117
+ # Update session
118
+ session_manager.update_session(session_id, {
119
+ "current_file": file_path,
120
+ "state": ConversationState.INITIAL
121
+ })
122
+
123
+ # Add system message
124
+ session_manager.add_message(
125
+ session_id,
126
+ "system",
127
+ f"File uploaded: {file_name}"
128
+ )
129
+
130
+ status = {
131
+ "status": "success",
132
+ "message": f"File '{file_name}' uploaded successfully",
133
+ "file_info": {
134
+ "name": file_name,
135
+ "path": file_path,
136
+ "size_bytes": os.path.getsize(file_path) if os.path.exists(file_path) else 0
137
+ },
138
+ "next_action": "πŸ’¬ Now tell me what you'd like to do with this document"
139
+ }
140
+
141
+ return file_path, json.dumps(status, indent=2), session_id
142
+
143
+
144
+ def chatbot_response_streaming(message: str, history: List, session_id: str, file_path: str = None):
145
+ """
146
+ Handle chat messages with streaming updates
147
+ Uses Bedrock (priority) β†’ Gemini (fallback) for both generation and execution
148
+ """
149
+ # Get or create session
150
+ session = session_manager.get_session(session_id)
151
+ if not session:
152
+ session_id = create_new_session()
153
+ session = session_manager.get_session(session_id)
154
+
155
+ # Update file path if provided
156
+ if file_path:
157
+ session_manager.update_session(session_id, {"current_file": file_path})
158
+ session = session_manager.get_session(session_id)
159
+
160
+ # Add user message to session
161
+ session_manager.add_message(session_id, "user", message)
162
+
163
+ current_state = session.get("state", ConversationState.INITIAL)
164
+
165
+ # ========================
166
+ # STATE: INITIAL - Generate Pipeline
167
+ # ========================
168
+ if current_state == ConversationState.INITIAL:
169
+ # Check if file is uploaded
170
+ if not session.get("current_file"):
171
+ response = {
172
+ "status": "error",
173
+ "message": "Please upload a document first",
174
+ "action": "πŸ“ Click 'Upload Document' to begin"
175
+ }
176
+ response_text = f"```json\n{json.dumps(response, indent=2)}\n```"
177
+ session_manager.add_message(session_id, "assistant", response_text)
178
+ yield history + [[message, response_text]]
179
+ return
180
+
181
+ try:
182
+ # Generate pipeline using Bedrock β†’ Gemini fallback
183
+ yield history + [[message, "πŸ€– Generating pipeline with AI...\n⏳ Trying Bedrock first..."]]
184
+
185
+ pipeline = generate_pipeline(
186
+ user_input=message,
187
+ file_path=session.get("current_file"),
188
+ prefer_bedrock=True
189
+ )
190
+
191
+ # Save proposed pipeline to session
192
+ session_manager.update_session(session_id, {
193
+ "proposed_pipeline": pipeline,
194
+ "state": ConversationState.PIPELINE_PROPOSED
195
+ })
196
+
197
+ # Format for display
198
+ formatted_display = format_pipeline_for_display(pipeline)
199
+
200
+ # Create response with both fancy display and JSON
201
+ response_text = formatted_display + f"\n\n```json\n{json.dumps(pipeline, indent=2)}\n```"
202
+
203
+ session_manager.add_message(session_id, "assistant", response_text)
204
+ yield history + [[message, response_text]]
205
+ return
206
+
207
+ except Exception as e:
208
+ error_response = {
209
+ "status": "error",
210
+ "message": "Failed to generate pipeline",
211
+ "error": str(e),
212
+ "action": "Please try rephrasing your request"
213
+ }
214
+ response_text = f"```json\n{json.dumps(error_response, indent=2)}\n```"
215
+ session_manager.add_message(session_id, "assistant", response_text)
216
+ yield history + [[message, response_text]]
217
+ return
218
+
219
+ # ========================
220
+ # STATE: PIPELINE_PROPOSED - Handle Approval/Rejection
221
+ # ========================
222
+ elif current_state == ConversationState.PIPELINE_PROPOSED:
223
+ user_input = message.lower().strip()
224
+
225
+ # APPROVE - Execute the pipeline
226
+ if "approve" in user_input or "yes" in user_input:
227
+ session_manager.update_session(session_id, {"state": ConversationState.EXECUTING})
228
+
229
+ plan = session.get("proposed_pipeline", {})
230
+
231
+ # Initial status
232
+ initial_status = {
233
+ "status": "executing",
234
+ "message": "πŸš€ Starting pipeline execution...",
235
+ "pipeline": plan.get("pipeline_name", "unknown"),
236
+ "executor": "Attempting Bedrock LangChain first",
237
+ "steps": []
238
+ }
239
+ accumulated_response = f"```json\n{json.dumps(initial_status, indent=2)}\n```"
240
+ yield history + [[message, accumulated_response]]
241
+
242
+ steps_completed = []
243
+ final_payload = None
244
+ executor_used = "unknown"
245
+
246
+ try:
247
+ # Execute pipeline with Bedrock β†’ CrewAI fallback
248
+ for event in execute_pipeline_streaming(
249
+ pipeline=plan,
250
+ file_path=session.get("current_file"),
251
+ session_id=session_id,
252
+ prefer_bedrock=True
253
+ ):
254
+ event_type = event.get("type")
255
+
256
+ # Info events (fallback notifications, etc.)
257
+ if event_type == "info":
258
+ info_status = {
259
+ "status": "info",
260
+ "message": event.get("message"),
261
+ "executor": event.get("executor", "unknown")
262
+ }
263
+ accumulated_response = f"```json\n{json.dumps(info_status, indent=2)}\n```"
264
+ yield history + [[message, accumulated_response]]
265
+
266
+ # Step updates
267
+ elif event_type == "step":
268
+ step_info = {
269
+ "step": event.get("step", 0),
270
+ "tool": event.get("tool", "processing"),
271
+ "status": event.get("status", "running"),
272
+ "executor": event.get("executor", "unknown")
273
+ }
274
+ steps_completed.append(step_info)
275
+ executor_used = event.get("executor", executor_used)
276
+
277
+ progress_status = {
278
+ "status": "executing",
279
+ "message": f"πŸ“ Step {event.get('step', 0)}: {event.get('tool', 'processing')}...",
280
+ "pipeline": plan.get("pipeline_name", ""),
281
+ "executor": executor_used,
282
+ "steps_completed": steps_completed
283
+ }
284
+ accumulated_response = f"```json\n{json.dumps(progress_status, indent=2)}\n```"
285
+ yield history + [[message, accumulated_response]]
286
+
287
+ # Final result
288
+ elif event_type == "final":
289
+ final_payload = event.get("data")
290
+ executor_used = event.get("executor", executor_used)
291
+
292
+ # Error
293
+ elif event_type == "error":
294
+ error_result = {
295
+ "status": "failed",
296
+ "error": event.get("error"),
297
+ "steps_completed": steps_completed,
298
+ "executor": event.get("executor", "unknown")
299
+ }
300
+ final_response = f"```json\n{json.dumps(error_result, indent=2)}\n```"
301
+ session_manager.update_session(session_id, {"state": ConversationState.INITIAL})
302
+ session_manager.add_message(session_id, "assistant", final_response)
303
+ yield history + [[message, final_response]]
304
+ return
305
+
306
+ # Process final result
307
+ if final_payload:
308
+ session_manager.update_session(session_id, {
309
+ "pipeline_result": final_payload,
310
+ "state": ConversationState.INITIAL
311
+ })
312
+
313
+ # Save execution to MongoDB
314
+ session_manager.save_pipeline_execution(
315
+ session_id=session_id,
316
+ pipeline=plan,
317
+ result=final_payload,
318
+ file_path=session.get("current_file"),
319
+ executor=executor_used
320
+ )
321
+
322
+ # Format final response
323
+ final_display = {
324
+ "status": "completed",
325
+ "executor": executor_used,
326
+ "pipeline": plan.get("pipeline_name"),
327
+ "result": final_payload,
328
+ "summary": {
329
+ "total_steps": len(steps_completed),
330
+ "completed_successfully": len([s for s in steps_completed if s.get("status") == "completed"])
331
+ }
332
+ }
333
+ final_response = f"```json\n{json.dumps(final_display, indent=2)}\n```"
334
+ else:
335
+ final_response = f"```json\n{json.dumps({'status': 'completed', 'steps': steps_completed, 'executor': executor_used}, indent=2)}\n```"
336
+ session_manager.update_session(session_id, {"state": ConversationState.INITIAL})
337
+
338
+ session_manager.add_message(session_id, "assistant", final_response)
339
+ yield history + [[message, final_response]]
340
+ return
341
+
342
+ except Exception as e:
343
+ error_result = {
344
+ "error": str(e),
345
+ "status": "failed",
346
+ "message": "Pipeline execution failed",
347
+ "steps_completed": steps_completed
348
+ }
349
+ final_response = f"```json\n{json.dumps(error_result, indent=2)}\n```"
350
+ session_manager.update_session(session_id, {"state": ConversationState.INITIAL})
351
+ session_manager.add_message(session_id, "assistant", final_response)
352
+ yield history + [[message, final_response]]
353
+ return
354
+
355
+ # REJECT - Cancel the pipeline
356
+ elif "reject" in user_input or "no" in user_input:
357
+ session_manager.update_session(session_id, {
358
+ "state": ConversationState.INITIAL,
359
+ "proposed_pipeline": None
360
+ })
361
+ response_data = {
362
+ "status": "rejected",
363
+ "message": "Pipeline rejected by user",
364
+ "action": "πŸ’¬ Please provide a new instruction"
365
+ }
366
+ response = f"```json\n{json.dumps(response_data, indent=2)}\n```"
367
+ session_manager.add_message(session_id, "assistant", response)
368
+ yield history + [[message, response]]
369
+ return
370
+
371
+ # EDIT - Request modifications
372
+ elif "edit" in user_input or "modify" in user_input:
373
+ current_pipeline = session.get("proposed_pipeline", {})
374
+ edit_help = {
375
+ "status": "edit_mode",
376
+ "message": "To modify the plan, describe your changes",
377
+ "current_plan": current_pipeline,
378
+ "examples": [
379
+ "Add summarization at the end",
380
+ "Remove table extraction",
381
+ "Only process pages 1-3",
382
+ "Translate to French instead of Spanish"
383
+ ],
384
+ "action": "Describe your changes, or say 'approve' to run as-is"
385
+ }
386
+ response = f"```json\n{json.dumps(edit_help, indent=2)}\n```"
387
+ session_manager.add_message(session_id, "assistant", response)
388
+ yield history + [[message, response]]
389
+ return
390
+
391
+ # Try to modify pipeline based on user input
392
+ else:
393
+ if len(message.strip()) > 5:
394
+ try:
395
+ original_plan = session.get("proposed_pipeline", {})
396
+ edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}"
397
+
398
+ # Generate new pipeline with modification
399
+ new_pipeline = generate_pipeline(
400
+ user_input=edit_context,
401
+ file_path=session.get("current_file"),
402
+ prefer_bedrock=True
403
+ )
404
+
405
+ session_manager.update_session(session_id, {
406
+ "proposed_pipeline": new_pipeline,
407
+ "state": ConversationState.PIPELINE_PROPOSED
408
+ })
409
+
410
+ formatted = format_pipeline_for_display(new_pipeline)
411
+ response = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
412
+ session_manager.add_message(session_id, "assistant", response)
413
+ yield history + [[message, response]]
414
+ return
415
+
416
+ except Exception as e:
417
+ error_response = {
418
+ "status": "edit_failed",
419
+ "error": str(e),
420
+ "message": "Could not modify the plan",
421
+ "action": "Try 'approve' to run as-is, or 'reject' to start over"
422
+ }
423
+ response = f"```json\n{json.dumps(error_response, indent=2)}\n```"
424
+ session_manager.add_message(session_id, "assistant", response)
425
+ yield history + [[message, response]]
426
+ return
427
+
428
+ # Default waiting message
429
+ response_data = {
430
+ "status": "waiting_for_confirmation",
431
+ "message": "Please type 'approve', 'reject', or describe changes",
432
+ "hint": "You can also say 'edit' for modification hints"
433
+ }
434
+ response = f"```json\n{json.dumps(response_data, indent=2)}\n```"
435
+ session_manager.add_message(session_id, "assistant", response)
436
+ yield history + [[message, response]]
437
+ return
438
+
439
+ # Default fallback
440
+ response = json.dumps({"status": "ready", "message": "Ready for your next instruction"}, indent=2)
441
+ session_manager.add_message(session_id, "assistant", response)
442
+ yield history + [[message, response]]
443
+
444
+
445
# ========================
# GRADIO UI
# ========================

with gr.Blocks(
    title="MasterLLM v2.0 - AI Pipeline Orchestrator",
    css="""
    .gradio-container {
        max-width: 1400px !important;
    }
    """
) as demo:
    # Header banner shown above the whole layout.
    gr.Markdown("""
    # πŸ€– MasterLLM v2.0 - AI Pipeline Orchestrator

    **πŸ† Bedrock Priority** with Gemini Fallback | **πŸ’Ύ MongoDB Sessions** | **πŸ“‘ Complete REST API**

    Upload a document, describe what you want, and watch AI orchestrate the perfect pipeline!
    """)

    # Per-browser-tab state: the MongoDB session id and the last uploaded file path.
    session_id_state = gr.State(value=create_new_session())
    file_state = gr.State(value=None)

    with gr.Row():
        # Left column: chat transcript plus the instruction input.
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                height=650,
                show_label=False,
                bubble_full_width=False,
                show_copy_button=True,
                avatar_images=(None, "πŸ€–"),
            )

            msg = gr.Textbox(
                placeholder="πŸ’¬ Type your instruction... (e.g., 'extract text from pages 1-5 and summarize')",
                show_label=False,
                lines=2,
                max_lines=4,
                container=False,
            )

            with gr.Row():
                submit_btn = gr.Button("πŸš€ Send", variant="primary", scale=2)
                clear_btn = gr.Button("πŸ—‘οΈ Clear Chat", scale=1)

        # Right column: upload widget, session info, examples, and feature notes.
        with gr.Column(scale=1):
            gr.Markdown("### πŸ“ Upload Document")
            file_upload = gr.File(
                label="PDF or Image",
                file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp"],
                type="filepath",
            )

            upload_status = gr.Textbox(
                label="πŸ“Š Upload Status",
                interactive=False,
                lines=10,
                max_lines=15,
            )

            gr.Markdown("### πŸ”— Session Info")
            # NOTE(review): the lambda is evaluated when the component renders;
            # this field is not wired into clear_btn's outputs, so it is not
            # refreshed when a new session is created — confirm intended.
            session_display = gr.Textbox(
                label="Session ID",
                interactive=False,
                value=lambda: session_id_state.value[:8] + "...",
            )

            gr.Markdown("### πŸ’‘ Example Pipelines")
            gr.Examples(
                examples=[
                    "extract text from pages 1-5",
                    "extract text and summarize",
                    "extract text, tables, and translate to Spanish",
                    "get tables from pages 2-4 and summarize",
                    "text-classify-ner from entire document",
                    "describe images and summarize findings",
                    "extract text, detect signatures and stamps",
                ],
                inputs=msg,
            )

            gr.Markdown("""
            ### ℹ️ System Features
            - βœ… **Bedrock** (Claude 3.5 Sonnet) priority
            - βœ… **Gemini** (gemini-2.0-flash) fallback
            - βœ… **MongoDB** session persistence
            - βœ… **Streaming** real-time updates
            - βœ… **Component-level** JSON output
            - βœ… **REST API** for integration

            ### πŸ“Š Pipeline Flow:
            1. **Upload** your document
            2. **Describe** what you want
            3. **Review** AI-generated pipeline
            4. **Approve** to execute
            5. **Watch** streaming updates
            6. **Get** complete JSON results
            """)

    # --- Event wiring ---

    # File upload: register the file against the current session.
    file_upload.upload(
        fn=handle_file_upload,
        inputs=[file_upload, session_id_state],
        outputs=[file_state, upload_status, session_id_state],
    )

    # Enter key and Send button both stream the assistant response,
    # then clear the input box.
    msg.submit(
        fn=chatbot_response_streaming,
        inputs=[msg, chatbot, session_id_state, file_state],
        outputs=[chatbot],
    ).then(
        lambda: "",
        outputs=msg,
    )

    submit_btn.click(
        fn=chatbot_response_streaming,
        inputs=[msg, chatbot, session_id_state, file_state],
        outputs=[chatbot],
    ).then(
        lambda: "",
        outputs=msg,
    )

    # Clear: wipe transcript, start a fresh session, and reset all inputs.
    clear_btn.click(
        fn=lambda: ([], create_new_session(), None, None, "", ""),
        outputs=[chatbot, session_id_state, file_state, file_upload, msg, upload_status],
    )

# Serve the Gradio UI at the FastAPI root; REST routes stay under /api/v1.
app = gr.mount_gradio_app(app, demo, path="/")
583
+
584
+
585
# ========================
# LAUNCH
# ========================

if __name__ == "__main__":
    # Imported lazily so the module can be mounted by an external ASGI
    # server without requiring uvicorn at import time.
    import uvicorn

    # Hosting platforms (e.g. HF Spaces) inject PORT; default to 7860.
    port = int(os.getenv("PORT", 7860))
    print(f"""
    ╔════════════════════════════════════════════════════════════╗
    β•‘                                                            β•‘
    β•‘  πŸš€ MasterLLM v2.0 Starting...                             β•‘
    β•‘                                                            β•‘
    β•‘  🌐 Gradio UI:  http://localhost:{port}                    β•‘
    β•‘  πŸ“‘ REST API:   http://localhost:{port}/api/v1             β•‘
    β•‘  πŸ“š API Docs:   http://localhost:{port}/docs               β•‘
    β•‘                                                            β•‘
    β•‘  πŸ† Bedrock: Priority (Claude 3.5 Sonnet)                  β•‘
    β•‘  πŸ”„ Gemini:  Fallback (gemini-2.0-flash)                   β•‘
    β•‘  πŸ’Ύ MongoDB: Session management                            β•‘
    β•‘                                                            β•‘
    β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
    """)

    # Bind on all interfaces so the container/host port mapping works.
    uvicorn.run(app, host="0.0.0.0", port=port)