Spaces:
Sleeping
Sleeping
| # API Routes - Complete REST API for MasterLLM | |
| # File: api_routes.py | |
| from fastapi import APIRouter, HTTPException, UploadFile, File, Form | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from typing import Optional, List, Dict, Any | |
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime | |
| # Import our services | |
| from services.pipeline_generator import generate_pipeline, format_pipeline_for_display | |
| from services.pipeline_executor import execute_pipeline_streaming, execute_pipeline | |
| from services.session_manager import session_manager | |
# Router shared by every handler in this module; all paths are mounted under
# the "/api/v1" prefix and grouped under the "MasterLLM API" tag.
# NOTE(review): no @router.<method>(...) decorators are visible on the handlers
# in this view of the file -- confirm how they are registered on this router.
router = APIRouter(
    prefix="/api/v1",
    tags=["MasterLLM API"],
)
| # ======================== | |
| # REQUEST/RESPONSE MODELS | |
| # ======================== | |
class PipelineRequest(BaseModel):
    """Request body consumed by ``generate_pipeline_api``."""

    # Request text, forwarded to ``generate_pipeline`` as ``user_input``.
    user_input: str
    # Optional file path, forwarded to ``generate_pipeline`` as ``file_path``.
    file_path: Optional[str] = None
    # When truthy, the generated pipeline is also stored on this session.
    session_id: Optional[str] = None
    # Forwarded unchanged to ``generate_pipeline`` as ``prefer_bedrock``.
    prefer_bedrock: bool = True
class ExecuteRequest(BaseModel):
    """Request body shared by the streaming and non-streaming execute handlers."""

    # Pipeline definition, handed to the executor as-is.
    pipeline: Dict[str, Any]
    # Required file path, forwarded to the executor as ``file_path``.
    file_path: str
    # When truthy, the execution is recorded against this session.
    session_id: Optional[str] = None
    # Forwarded unchanged to the executor as ``prefer_bedrock``.
    prefer_bedrock: bool = True
class SessionCreate(BaseModel):
    """Request body consumed by ``create_session``; both fields are optional."""

    # Passed to ``session_manager.create_session`` as ``user_id``.
    user_id: Optional[str] = None
    # Passed to ``session_manager.create_session`` as ``metadata``.
    metadata: Optional[Dict[str, Any]] = None
class MessageAdd(BaseModel):
    """Request body consumed by ``add_message``."""

    # Passed to ``session_manager.add_message`` as ``role``.
    role: str
    # Passed to ``session_manager.add_message`` as ``content``.
    content: str
    # Optional extra data, passed through as ``metadata``.
    metadata: Optional[Dict[str, Any]] = None
| # ======================== | |
| # SESSION ENDPOINTS | |
| # ======================== | |
async def create_session(request: SessionCreate):
    """Create a session through the session manager and return its id.

    Any exception raised while creating the session is reported to the
    client as HTTP 500, with the exception text as the detail.
    """
    try:
        new_session_id = session_manager.create_session(
            user_id=request.user_id,
            metadata=request.metadata,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

    # Building the response cannot fail, so it lives outside the try block.
    response = {
        "success": True,
        "session_id": new_session_id,
        "message": "Session created successfully",
    }
    return response
async def get_session(session_id: str):
    """Return the stored data for ``session_id``.

    Responds with HTTP 404 when the session manager returns a falsy value.
    """
    found = session_manager.get_session(session_id)
    if found:
        return {"success": True, "session": found}
    raise HTTPException(status_code=404, detail="Session not found")
async def get_session_stats(session_id: str):
    """Return the statistics the session manager reports for ``session_id``.

    Responds with HTTP 404 when the session manager returns a falsy value.
    """
    session_stats = session_manager.get_session_stats(session_id)
    if session_stats:
        return {"success": True, "stats": session_stats}
    raise HTTPException(status_code=404, detail="Session not found")
async def get_session_history(session_id: str, limit: int = 50):
    """Return the conversation history of a session.

    ``limit`` is handed to the session manager unchanged; the response also
    carries the number of entries actually returned.
    """
    entries = session_manager.get_session_history(session_id, limit)
    payload = {"success": True, "history": entries}
    payload["count"] = len(entries)
    return payload
async def add_message(session_id: str, message: MessageAdd):
    """Append one message to the history of ``session_id``.

    Responds with HTTP 500 when the session manager reports a falsy result.
    """
    stored = session_manager.add_message(
        session_id=session_id,
        role=message.role,
        content=message.content,
        metadata=message.metadata,
    )
    if stored:
        return {"success": True, "message": "Message added successfully"}
    raise HTTPException(status_code=500, detail="Failed to add message")
| # ======================== | |
| # PIPELINE GENERATION ENDPOINTS | |
| # ======================== | |
async def generate_pipeline_api(request: PipelineRequest):
    """Generate a pipeline proposal from the request and return it.

    The request fields are forwarded to ``generate_pipeline``. When a
    ``session_id`` is supplied, the proposal is stored on that session and its
    state is set to ``"pipeline_proposed"``. Any exception raised along the
    way is reported as HTTP 500 with the exception text as the detail.
    """
    try:
        proposal = generate_pipeline(
            user_input=request.user_input,
            file_path=request.file_path,
            prefer_bedrock=request.prefer_bedrock,
        )

        target_session = request.session_id
        if target_session:
            session_update = {
                "proposed_pipeline": proposal,
                "state": "pipeline_proposed",
            }
            session_manager.update_session(target_session, session_update)

        # Dict values are evaluated top to bottom: the display text is
        # rendered first, then the generator/model markers are read.
        return {
            "success": True,
            "pipeline": proposal,
            "formatted_display": format_pipeline_for_display(proposal),
            "generator": proposal.get("_generator"),
            "model": proposal.get("_model"),
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
| # ======================== | |
| # PIPELINE EXECUTION ENDPOINTS | |
| # ======================== | |
async def execute_pipeline_api(request: ExecuteRequest):
    """Run a pipeline to completion and return its result in one response.

    The request fields are forwarded to ``execute_pipeline``. When a
    ``session_id`` is supplied, the execution is recorded and the session is
    marked ``"completed"`` with the result attached. Any exception raised
    along the way is reported as HTTP 500 with the exception text as detail.
    """
    try:
        outcome = execute_pipeline(
            pipeline=request.pipeline,
            file_path=request.file_path,
            session_id=request.session_id,
            prefer_bedrock=request.prefer_bedrock,
        )

        target_session = request.session_id
        if target_session:
            # Record the run first, then flip the session state.
            session_manager.save_pipeline_execution(
                session_id=target_session,
                pipeline=request.pipeline,
                result=outcome,
                file_path=request.file_path,
                executor=outcome.get("executor", "unknown"),
            )
            completed_state = {"state": "completed", "last_result": outcome}
            session_manager.update_session(target_session, completed_state)

        return {
            "success": True,
            "result": outcome,
            "executor": outcome.get("executor"),
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
async def execute_pipeline_stream_api(request: ExecuteRequest):
    """Run a pipeline and stream its events as Server-Sent Events.

    Each event from ``execute_pipeline_streaming`` is JSON-encoded into one
    ``data: ...`` frame. After a ``"final"`` event has been emitted, and only
    when a ``session_id`` was supplied, the execution is recorded on the
    session. An exception inside the stream is turned into a single
    ``{"type": "error"}`` frame instead of breaking the response.
    """

    def to_sse_frame(payload):
        # One SSE message: "data: <json>" terminated by a blank line.
        return f"data: {json.dumps(payload)}\n\n"

    def sse_frames():
        try:
            events = execute_pipeline_streaming(
                pipeline=request.pipeline,
                file_path=request.file_path,
                session_id=request.session_id,
                prefer_bedrock=request.prefer_bedrock,
            )
            for event in events:
                yield to_sse_frame(event)

                # Persisting happens after the frame has been handed out.
                if event.get("type") != "final":
                    continue
                if not request.session_id:
                    continue
                session_manager.save_pipeline_execution(
                    session_id=request.session_id,
                    pipeline=request.pipeline,
                    result=event.get("data"),
                    file_path=request.file_path,
                    executor=event.get("executor", "unknown"),
                )
        except Exception as exc:
            yield to_sse_frame({"type": "error", "error": str(exc)})

    return StreamingResponse(sse_frames(), media_type="text/event-stream")
| # ======================== | |
| # FILE UPLOAD ENDPOINT | |
| # ======================== | |
async def upload_file(
    file: UploadFile = File(...),
    session_id: Optional[str] = Form(None)
):
    """Store an uploaded document under ``uploads/`` and report where it went.

    The file is saved under a freshly generated UUID name that keeps only the
    extension of the client-supplied filename. The upload is copied to disk in
    fixed-size chunks so a large file is never held in memory as a whole.

    Args:
        file: The uploaded file (multipart form field).
        session_id: Optional session; when truthy, the saved path is stored
            on that session as ``current_file``.

    Returns:
        A dict with ``success``, the saved ``file_path``, the original
        ``filename`` as sent by the client, and ``size_bytes`` written.

    Raises:
        HTTPException: status 500, with the underlying exception text as the
            detail, if anything fails while saving or updating the session.
    """
    try:
        # Create uploads directory if it doesn't exist
        upload_dir = "uploads"
        os.makedirs(upload_dir, exist_ok=True)

        # The client may omit the filename entirely; treat that as "no
        # extension" instead of letting splitext fail on None.
        file_ext = os.path.splitext(file.filename or "")[1]
        unique_filename = f"{uuid.uuid4()}{file_ext}"
        file_path = os.path.join(upload_dir, unique_filename)

        # Copy in bounded chunks rather than one read() of the whole upload.
        chunk_size = 1024 * 1024
        size_bytes = 0
        try:
            with open(file_path, "wb") as f:
                while True:
                    chunk = await file.read(chunk_size)
                    if not chunk:
                        break
                    f.write(chunk)
                    size_bytes += len(chunk)
        except Exception:
            # Do not leave a truncated upload behind when the copy fails.
            if os.path.exists(file_path):
                os.remove(file_path)
            raise

        # Update session if provided
        if session_id:
            session_manager.update_session(
                session_id,
                {"current_file": file_path}
            )

        return {
            "success": True,
            "file_path": file_path,
            "filename": file.filename,
            "size_bytes": size_bytes
        }
    except Exception as e:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e
| # ======================== | |
| # PIPELINE HISTORY ENDPOINTS | |
| # ======================== | |
async def get_pipeline_history(
    session_id: Optional[str] = None,
    limit: int = 10
):
    """Return recorded pipeline executions.

    Both ``session_id`` and ``limit`` are passed to the session manager
    unchanged; the response also carries the number of entries returned.
    """
    runs = session_manager.get_pipeline_executions(
        session_id=session_id,
        limit=limit,
    )
    payload = {"success": True, "executions": runs}
    payload["count"] = len(runs)
    return payload
async def get_pipeline_stats():
    """Return overall pipeline execution statistics.

    Placeholder: nothing is queried yet, so every counter is reported as 0.
    """
    # Fixed zeroed counters until a real query is wired in.
    placeholder_stats = {
        "total_executions": 0,
        "bedrock_executions": 0,
        "crewai_executions": 0,
        "avg_duration_seconds": 0,
    }
    return {"success": True, "stats": placeholder_stats}
| # ======================== | |
| # HEALTH CHECK | |
| # ======================== | |
async def health_check():
    """Report API liveness plus a few feature flags.

    The flags only reflect whether the relevant environment variables are set
    and whether the session manager's ``sessions_col`` attribute is not None.
    """
    checked_at = datetime.now().isoformat()

    # Environment values are always strings, so "is set" is a membership test.
    feature_flags = {
        "bedrock_available": "AWS_ACCESS_KEY_ID" in os.environ,
        "gemini_available": "GOOGLE_API_KEY" in os.environ,
        "mongodb_connected": session_manager.sessions_col is not None,
    }

    return {
        "status": "healthy",
        "timestamp": checked_at,
        "version": "2.0.0",
        "features": feature_flags,
    }