Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException, UploadFile, File, Request, Form | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel, Field | |
| from typing import Optional, List, Dict, Any, Generator, Callable, Tuple | |
| import json | |
| import os | |
| from datetime import datetime, timedelta | |
| from urllib.parse import urlparse | |
| import tempfile | |
| import uuid | |
| # AWS S3 (server-side access, no presigned URLs) | |
| import boto3 | |
| from boto3.s3.transfer import TransferConfig | |
| from botocore.exceptions import ClientError | |
| # Services | |
| from services.pipeline_generator import generate_pipeline, format_pipeline_for_display | |
| from services.pipeline_executor import execute_pipeline_streaming, execute_pipeline | |
| from services.session_manager import session_manager | |
| from services.intent_classifier import intent_classifier | |
# Router that every v2 endpoint in this module is registered on.
router = APIRouter(prefix="/api/v2", tags=["MasterLLM API V2 - Enhanced"])

# ========================
# CONFIG: S3
# ========================
# Region lookup order: AWS_REGION, then AWS_DEFAULT_REGION, then "us-east-1".
AWS_REGION = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"
# Bucket name may be supplied under either variable name.
S3_BUCKET = os.environ.get("S3_BUCKET") or os.environ.get("S3_BUCKET_NAME")
# Key prefix under which this module stores its objects.
S3_PREFIX = os.environ.get("S3_PREFIX", "masterllm")

# Fail at import time: nothing in this module works without a bucket.
if not S3_BUCKET:
    raise RuntimeError("Missing S3 bucket. Set S3_BUCKET (or S3_BUCKET_NAME).")

# Shared S3 client used by all helpers below.
s3 = boto3.client("s3", region_name=AWS_REGION)
| # ======================== | |
| # MODELS | |
| # ======================== | |
class Message(BaseModel):
    """A single chat message; only `role` and `content` are required."""

    # Unique message identifier
    message_id: Optional[str] = None
    # Author of the message and its text
    role: str
    content: str
    # Creation time, carried as a string
    timestamp: Optional[str] = None
    # Optional attachment information
    file: Optional[bool] = None
    fileName: Optional[str] = None
    fileUrl: Optional[str] = None
class ChatResponse(BaseModel):
    """Response envelope built by `_assistant_response_payload` for a chat turn."""

    # Message ID for assistant response
    message_id: Optional[str] = None
    # User-friendly reply text
    assistant_response: str
    # Pipeline summary output (empty dict by default) and optional final result
    output: Dict[str, Any] = Field(default_factory=dict)
    final_output: Optional[Dict[str, Any]] = None
    # Error flag and the accompanying error message, if any
    hasError: bool = False
    exception: Optional[str] = None
    # Technical API data and intent classification result
    api_response: Dict[str, Any]
    intent: Dict[str, Any]
    # Session identity, display name and current state
    chat_id: str
    chat_name: Optional[str] = None
    state: str
    # REMOVED: history field (use separate endpoint to get messages)
    # Optional attachment information
    file: Optional[bool] = None
    fileName: Optional[str] = None
    fileUrl: Optional[str] = None
| # ======================== | |
| # V3 ARCHITECTURE HELPERS | |
| # ======================== | |
def _get_conversation_s3_key(session_id: str) -> str:
    """Build the S3 object key that holds the conversation of `session_id`."""
    object_name = f"{session_id}.json"
    return f"{S3_PREFIX}/conversations/{object_name}"
def _save_conversation_to_s3(session_id: str, messages: List[Dict[str, Any]], update_activity: bool = False):
    """
    V3 RULE: write the full conversation to S3, then refresh session metadata.

    The S3 object is overwritten with the complete `messages` list on every
    call; the session record only receives the key, counts and timestamps.

    Args:
        session_id: Session ID
        messages: Complete list of messages to store
        update_activity: If True, also stamp `last_activity` on the session

    Raises:
        Any exception from serialisation, S3 or the session update is logged
        and re-raised unchanged.
    """
    object_key = _get_conversation_s3_key(session_id)
    try:
        payload = json.dumps(messages, ensure_ascii=False)
        s3.put_object(
            Bucket=S3_BUCKET,
            Key=object_key,
            Body=payload,
            ContentType="application/json",
        )

        # Session metadata: reference to the S3 object plus message stats.
        message_total = len(messages)
        metadata = {
            "conversation_s3_key": object_key,
            "last_message_count": message_total,
            "last_updated": datetime.utcnow().isoformat() + "Z",
            "stats.message_count": message_total,
        }
        if update_activity:
            metadata["last_activity"] = datetime.utcnow().isoformat() + "Z"

        # last_activity is written explicitly above, so the manager's own
        # activity bump is always disabled here.
        session_manager.update_session(session_id, metadata, update_activity=False)
    except Exception as e:
        print(f"CRITICAL ERROR saving conversation to S3: {e}")
        # In a real system, we might want to retry or circuit break
        raise
def _load_conversation_from_s3(session_id: str) -> List[Dict[str, Any]]:
    """
    V3 RULE: Read conversation history from S3.

    Returns:
        The decoded JSON content of the conversation object, or [] when the
        object does not exist, cannot be decoded, or S3 rejects the request
        with a client-side error.

    Raises:
        ClientError: when S3 answers with a server-side (HTTP >= 500) error.
            Callers such as `_add_and_mirror_message` append to the returned
            list and write it back; returning [] for a transient failure would
            make them overwrite the stored history with a single message.
    """
    key = _get_conversation_s3_key(session_id)
    try:
        response = s3.get_object(Bucket=S3_BUCKET, Key=key)
        content = response["Body"].read().decode("utf-8")
        return json.loads(content)
    except ClientError as e:
        if e.response.get("Error", {}).get("Code") == "NoSuchKey":
            # No conversation stored yet.
            return []
        print(f"Error loading conversation from S3: {e}")
        http_status = e.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if isinstance(http_status, int) and http_status >= 500:
            # Transient server-side failure: the history may well exist, so do
            # not pretend it is empty.
            raise
        return []
    except Exception as e:
        print(f"Unexpected error loading from S3: {e}")
        return []
def _validate_conversation_integrity(session_id: str) -> Dict[str, Any]:
    """
    V3 RULE: Check for corrupted state (MongoDB exists but S3 missing).

    Behaviour:
      - Session has `conversation_s3_key`: the object must exist in S3.
      - Session has no key but the default object exists: the key is written
        back to the session.
      - Session has no key and no object: legacy `messages` stored on the
        session are migrated to S3, or an empty conversation is initialised.

    A server-side (HTTP >= 500) S3 error is reported as "unable to verify"
    instead of being treated as "object missing"; otherwise a transient outage
    could cause an existing conversation to be re-initialised with [].
    Client-side errors (e.g. 404/403) keep their original meaning of "missing".

    Returns:
        {"valid": bool, "error"?: str, "note"?: str}
    """

    def _object_exists(object_key: str) -> bool:
        """HEAD the object; False on a client-side error, re-raise on 5xx."""
        try:
            s3.head_object(Bucket=S3_BUCKET, Key=object_key)
            return True
        except ClientError as err:
            http_status = err.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
            if isinstance(http_status, int) and http_status >= 500:
                raise
            return False

    session = session_manager.get_session(session_id)
    if not session:
        return {"valid": False, "error": "Session not found in MongoDB"}

    key = session.get("conversation_s3_key")
    try:
        if key:
            if _object_exists(key):
                return {"valid": True}
            return {"valid": False, "error": f"CORRUPTED: S3 object {key} missing but referenced in MongoDB"}

        # Transitional state: old session without S3 key might be migrated.
        default_key = _get_conversation_s3_key(session_id)
        default_exists = _object_exists(default_key)
    except ClientError as err:
        return {"valid": False, "error": f"Unable to verify conversation in S3: {err}"}

    if default_exists:
        # Object exists, only the metadata reference is missing. Fix it.
        session_manager.update_session(session_id, {"conversation_s3_key": default_key})
        return {"valid": True, "note": "Fixed missing s3_key metadata"}

    # S3 missing. Migrate old MongoDB messages to S3 (V3 migration path).
    msgs = session.get("messages", [])
    if msgs:
        try:
            _save_conversation_to_s3(session_id, msgs, update_activity=False)
            # Clear old MongoDB messages after successful migration
            session_manager.update_session(session_id, {"messages": []})
            return {"valid": True, "note": "Migrated old MongoDB messages to S3"}
        except Exception as e:
            return {"valid": False, "error": f"Migration failed: {str(e)}"}

    # New empty session - initialize S3
    _save_conversation_to_s3(session_id, [], update_activity=False)
    return {"valid": True, "note": "Initialized new session"}
def _validate_pipeline_integrity(pipeline_id: str, session_id: str) -> Dict[str, Any]:
    """
    V3 RULE: Check for corrupted pipeline state (MongoDB exists but S3 missing).

    Steps:
      1. The session must exist and list `pipeline_id` in `pipelines_history`.
      2. If the entry has no `pipeline_s3_key`, the default key is probed and,
         when the object exists, written back to the session.
      3. Otherwise the referenced S3 object must exist.

    A server-side (HTTP >= 500) S3 error is reported as "unable to verify"
    rather than as corruption, since the object may well exist.

    Returns:
        {"valid": bool, "error"?: str, "note"?: str}
    """

    def _object_exists(object_key: str) -> bool:
        """HEAD the object; False on a client-side error, re-raise on 5xx."""
        try:
            s3.head_object(Bucket=S3_BUCKET, Key=object_key)
            return True
        except ClientError as err:
            http_status = err.response.get("ResponseMetadata", {}).get("HTTPStatusCode")
            if isinstance(http_status, int) and http_status >= 500:
                raise
            return False

    # 1. Check MongoDB metadata exists
    session = session_manager.get_session(session_id)
    if not session:
        return {"valid": False, "error": "Session not found in MongoDB"}

    pipelines = session.get("pipelines_history", [])
    pipeline_meta = next((p for p in pipelines if p.get("pipeline_id") == pipeline_id), None)
    if not pipeline_meta:
        return {"valid": False, "error": f"Pipeline {pipeline_id} not found in session metadata"}

    s3_key = pipeline_meta.get("pipeline_s3_key")
    try:
        # 2. Key not recorded: try the default key format
        if not s3_key:
            default_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
            if not _object_exists(default_key):
                return {"valid": False, "error": f"CORRUPTED: Pipeline {pipeline_id} has no S3 key and file is missing"}
            # Exists but metadata missing - fix it (entry is mutated in place,
            # then the whole history list is written back).
            pipeline_meta["pipeline_s3_key"] = default_key
            session_manager.update_session(session_id, {"pipelines_history": pipelines})
            return {"valid": True, "note": "Fixed missing pipeline_s3_key metadata"}

        # 3. Verify S3 file actually exists
        if _object_exists(s3_key):
            return {"valid": True}
        return {"valid": False, "error": f"CORRUPTED: S3 pipeline file {s3_key} missing but referenced in MongoDB"}
    except ClientError as err:
        return {"valid": False, "error": f"Unable to verify pipeline {pipeline_id} in S3: {err}"}
| # --- PIPELINE LIFECYCLE HELPERS --- | |
def _create_pipeline_record(
    session_id: str,
    pipeline_def: Dict[str, Any],
    status: str = "proposed",
    created_from: str = "request"
) -> str:
    """
    V3 RULE: assign a pipeline_id and persist the definition to S3 BEFORE
    execution; MongoDB receives metadata only.

    `pipeline_def` is modified in place (pipeline_id, pipeline_s3_key,
    session_id and created_at are set on it) before it is uploaded.

    Returns: pipeline_id
    Raises: HTTPException(500) when the S3 upload fails.
    """
    # Reuse a caller-supplied id, otherwise mint a new one.
    pipeline_id = pipeline_def.get("pipeline_id") or str(uuid.uuid4())
    pipeline_def["pipeline_id"] = pipeline_id

    s3_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
    created_at = datetime.utcnow().isoformat() + "Z"
    pipeline_def["pipeline_s3_key"] = s3_key
    pipeline_def["session_id"] = session_id
    pipeline_def["created_at"] = created_at

    # Step 1: the full definition goes to S3.
    try:
        body = json.dumps(pipeline_def, ensure_ascii=False)
        s3.put_object(Bucket=S3_BUCKET, Key=s3_key, Body=body, ContentType="application/json")
    except Exception as e:
        print(f"CRITICAL ERROR saving pipeline to S3: {e}")
        raise HTTPException(status_code=500, detail="Failed to persist pipeline definition")

    # Step 2: metadata entry, placed at the front of session.pipelines_history.
    name = pipeline_def.get("pipeline_name")
    pipeline_meta = {
        "pipeline_id": pipeline_id,
        "pipeline_name": name,
        "status": status,
        "created_at": created_at,
        "created_from": created_from,
        "model_provider": pipeline_def.get("_model_provider"),
        "model_name": pipeline_def.get("_model"),
        "pipeline_s3_key": s3_key,
        "result_preview": None,
    }
    existing = (session_manager.get_session(session_id) or {}).get("pipelines_history", [])
    session_manager.update_session(session_id, {"pipelines_history": [pipeline_meta, *existing]})

    # Step 3: mirror into the pipelines collection; failure here is non-fatal.
    try:
        from services.pipeline_manager import get_pipeline_manager
        get_pipeline_manager().create_pipeline_metadata(
            pipeline_id=pipeline_id,
            session_id=session_id,
            pipeline_name=pipeline_def.get("pipeline_name", "Untitled"),
            s3_key=s3_key,
            status=status,
            created_by_message="",
        )
    except Exception as e:
        print(f"Warning: Failed to create pipelines collection record: {e}")

    return pipeline_id
def _update_pipeline_status(pipeline_id: str, session_id: str, status: str, result: Optional[Dict] = None):
    """
    Update status in MongoDB and result in S3.

    All three updates are best-effort: a failure is logged and never raised,
    so a bookkeeping problem cannot fail the request that ran the pipeline.

    Args:
        pipeline_id: Pipeline to update
        session_id: Session whose `pipelines_history` holds the pipeline entry
        status: New status value
        result: Pipeline result; when truthy it is merged into the pipeline's
            S3 document and a 500-character preview is stored in the metadata
    """
    updated_at = datetime.utcnow().isoformat() + "Z"

    # 1. Update S3 with result if provided (read-modify-write of the document)
    if result:
        try:
            s3_key = f"{S3_PREFIX}/pipelines/{pipeline_id}.json"
            resp = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
            data = json.loads(resp["Body"].read().decode("utf-8"))
            data["status"] = status
            data["result"] = result
            data["updated_at"] = updated_at
            s3.put_object(
                Bucket=S3_BUCKET,
                Key=s3_key,
                Body=json.dumps(data, ensure_ascii=False),
                ContentType="application/json"
            )
        except Exception as e:
            print(f"Error updating pipeline S3: {e}")

    # 2. Update MongoDB metadata on the session
    try:
        session = session_manager.get_session(session_id)
        if session:
            hist = list(session.get("pipelines_history", []))
            for p in hist:
                if p.get("pipeline_id") != pipeline_id:
                    continue
                p["status"] = status
                p["updated_at"] = updated_at
                if result:
                    text = _extract_user_facing_text(result)
                    if isinstance(text, str):
                        p["result_preview"] = text[:500]
                break
            session_manager.update_session(session_id, {"pipelines_history": hist})
    except Exception as e:
        # Previously swallowed silently; surface it so stale metadata is traceable.
        print(f"Warning: Failed to update pipeline metadata in session {session_id}: {e}")

    # 3. Also update pipelines collection
    try:
        from services.pipeline_manager import get_pipeline_manager
        pipeline_mgr = get_pipeline_manager()
        pipeline_mgr.update_pipeline_status(
            pipeline_id=pipeline_id,
            status=status
        )
    except Exception as e:
        print(f"Warning: Failed to update pipelines collection: {e}")
def _record_model_attribution(
    pipeline_id: str,
    session_id: str,
    model_provider: str,
    model_name: str,
    is_fallback: bool
):
    """
    V3 RULE: Track model usage for every pipeline execution.

    Prints a warning when a "claude" model is reported by a provider whose
    name does not contain "bedrock", then stores provider, model and the
    fallback flag on the matching `pipelines_history` entry of the session.

    Best-effort: failures while updating the session are logged, never raised.
    Missing (None) provider or model names are tolerated.
    """
    provider_lc = (model_provider or "").lower()
    name_lc = (model_name or "").lower()
    # The original inner check ("anthropic" or "claude" in the name) was always
    # true once this condition held, so a single test is equivalent.
    if "claude" in name_lc and "bedrock" not in provider_lc:
        print(f"WARNING: Forbidden model detected: {model_name}")

    try:
        session = session_manager.get_session(session_id)
        if session:
            hist = list(session.get("pipelines_history", []))
            for p in hist:
                if p.get("pipeline_id") == pipeline_id:
                    p["model_provider"] = model_provider
                    p["model_name"] = model_name
                    p["is_fallback"] = is_fallback
                    break
            session_manager.update_session(session_id, {"pipelines_history": hist})
    except Exception as e:
        print(f"Warning: Failed to record model attribution for pipeline {pipeline_id}: {e}")
| ## helpers for presigned url chat name and some more updates | |
def _is_simple_message(message: str) -> bool:
    """
    Check if message is a simple greeting or test message that should use timestamp naming.

    A message qualifies when it is non-empty, at most 30 characters after
    stripping, and contains one of the known greeting/test phrases as whole
    words. Whole-word matching matters: plain substring matching classified
    requests such as "summarize this pdf" ("hi" inside "this") as greetings.
    """
    if not message or len(message.strip()) > 30:
        return False
    simple_patterns = [
        "hello", "hi", "hey", "test", "testing", "hola", "bonjour",
        "namaste", "greetings", "good morning", "good afternoon", "good evening"
    ]
    # Replace punctuation with spaces and collapse whitespace, then pad with
    # spaces so every word (and multi-word phrase) is delimited on both sides.
    lowered = message.lower()
    words_only = "".join(ch if ch.isalnum() else " " for ch in lowered)
    padded = f" {' '.join(words_only.split())} "
    return any(f" {pattern} " in padded for pattern in simple_patterns)
def _generate_chat_name_with_gemini(user_message: str, file_name: Optional[str] = None) -> str:
    """
    Generate a creative chat name using Gemini model.

    Args:
        user_message: First user message of the chat
        file_name: Name of the uploaded file, if any (added to the prompt)

    Returns:
        The generated title (at most 100 characters), or a timestamp title
        "Chat - YYYY-MM-DD HH:MM" when no API key is configured, the request
        fails, or the model returns an empty title.
    """

    def _timestamp_title() -> str:
        return f"Chat - {datetime.utcnow().strftime('%Y-%m-%d %H:%M')}"

    api_key = os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY")
    if not api_key:
        # Fallback to timestamp if no API key
        return _timestamp_title()

    model = os.getenv("GEMINI_MODEL", "gemini-2.0-flash")
    endpoint = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"

    # Build prompt
    prompt = (
        "Create a succinct, creative, and descriptive 3-6 word title for this chat session.\n"
        "The title should capture the essence of what the user wants to do.\n"
        "Return ONLY the title, without quotes or extra text.\n\n"
        f"User's first message: {user_message}\n"
    )
    if file_name:
        prompt += f"File uploaded: {file_name}\n"

    try:
        import requests
        response = requests.post(
            endpoint,
            # The key travels in a header, not the query string: request
            # exceptions can include the URL and are printed below, which
            # would otherwise write the key to the logs.
            headers={"Content-Type": "application/json", "x-goog-api-key": api_key},
            json={
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": {
                    "temperature": 0.7,
                    "maxOutputTokens": 50,
                }
            },
            timeout=5,  # Short timeout to avoid blocking
        )
        response.raise_for_status()
        result = response.json()
        # Extract text from Gemini response and drop surrounding quotes
        title = result["candidates"][0]["content"]["parts"][0]["text"]
        title = title.strip().strip('"').strip("'").strip()
        # Keep the title to a reasonable length
        return title[:100] or _timestamp_title()
    except Exception as e:
        print(f"Gemini chat name generation failed: {e}")
        # Fallback to timestamp
        return _timestamp_title()
def _maybe_generate_chat_name(chat_id: str):
    """
    Name the chat once, based on its first real user message.

    Nothing happens when the session already has a `chat_name` or when the
    conversation has no usable user message. Simple greetings get a timestamp
    title; anything else is sent to Gemini. Errors are logged and swallowed so
    that naming can never fail the surrounding request.
    """
    try:
        session = session_manager.get_session(chat_id) or {}
        if session.get("chat_name"):
            # Already named - leave it alone.
            return

        # Conversation lives in S3 (V3 architecture).
        user_texts = (
            (entry.get("content") or "").strip()
            for entry in _load_conversation_from_s3(chat_id)
            if (entry.get("role") or "") == "user"
        )
        # First user message that is not the automatic file-upload notice.
        first_user = next(
            (text for text in user_texts if not text.lower().startswith("uploaded file:")),
            None,
        )
        if not first_user:
            return

        if _is_simple_message(first_user):
            # Greetings and tests do not deserve a model call.
            title = f"Chat - {datetime.utcnow().strftime('%Y-%m-%d %H:%M')}"
            model_used = "timestamp"
        else:
            attached_name = (session.get("file_metadata") or {}).get("file_name")
            title = _generate_chat_name_with_gemini(first_user, attached_name)
            model_used = os.getenv("GEMINI_MODEL", "gemini-2.0-flash")

        name_fields = {
            "chat_name": title[:100],
            "chat_name_generated_at": datetime.utcnow().isoformat() + "Z",
            "chat_name_model": model_used,
        }
        session_manager.update_session(chat_id, name_fields)
        print(f"✅ Generated chat name for {chat_id}: '{title}' (using {model_used})")
    except Exception as e:
        # Don't fail the request if chat naming fails
        print(f"Error generating chat name: {e}")
def _generate_presigned_get_url(bucket: str, key: str, expires_in: int = 604800) -> Dict[str, str]:
    """
    Create a presigned S3 GET URL for `bucket`/`key`.

    The default lifetime is 604800 seconds (7 days).

    Returns:
        {"presigned_url": ..., "presigned_expires_at": ...} on success, or an
        empty dict if anything goes wrong.
    """
    try:
        signed_url = s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=expires_in,
        )
        expiry = datetime.utcnow() + timedelta(seconds=expires_in)
        return {
            "presigned_url": signed_url,
            "presigned_expires_at": expiry.isoformat() + "Z",
        }
    except Exception:
        return {}
def _extract_user_facing_text(obj: Any) -> str:
    """
    Heuristically extract user-facing text from pipeline results.

    Rules:
      - str: returned unchanged.
      - dict: the first of "summary", "final_text", "content", "text",
        "output" that holds a non-blank string (returned stripped) or a list
        containing strings (first three joined by newlines); otherwise the
        dict as JSON, truncated to 2000 characters.
      - list: its first five string items joined by newlines; otherwise the
        list as JSON, truncated to 2000 characters.
      - anything else, or any unexpected failure: "".

    Values that JSON cannot represent natively (datetime, bytes, ...) are
    rendered with str() so that one such value no longer blanks the result.
    """
    try:
        if isinstance(obj, str):
            return obj
        if isinstance(obj, dict):
            for k in ["summary", "final_text", "content", "text", "output"]:
                v = obj.get(k)
                if isinstance(v, str) and v.strip():
                    return v.strip()
                if isinstance(v, list):
                    texts = [x for x in v if isinstance(x, str)]
                    if texts:
                        return "\n".join(texts[:3]).strip()
            return json.dumps(obj, ensure_ascii=False, default=str)[:2000]
        if isinstance(obj, list):
            texts = [x for x in obj if isinstance(x, str)]
            if texts:
                return "\n".join(texts[:5])
            return json.dumps(obj, ensure_ascii=False, default=str)[:2000]
        return ""
    except Exception:
        # Last-resort guard: preview extraction must never break the caller.
        return ""
| # ======================== | |
| # HELPERS | |
| # ======================== | |
def _ensure_chat(chat_id: Optional[str]) -> str:
    """
    Return `chat_id` when it names an existing session; otherwise create a
    new session and return its id.
    """
    if not chat_id or not session_manager.get_session(chat_id):
        created_id = session_manager.create_session()
        # The fresh session is read back once; the result is not used.
        session_manager.get_session(created_id)
        return created_id
    return chat_id
def _get_session_or_init(chat_id: str):
    """
    Fetch the session dict for `chat_id`, creating it when absent.

    V3 RULE: an existing session must pass the S3 integrity check.

    Raises:
        HTTPException(500): the existing session failed integrity validation.
    """
    existing = session_manager.get_session(chat_id)
    if existing:
        integrity = _validate_conversation_integrity(chat_id)
        if integrity["valid"]:
            return existing
        raise HTTPException(status_code=500, detail=f"Session Integrity Failure: {integrity.get('error')}")

    # Unknown chat: create the metadata record in Mongo first...
    session_manager.update_session(chat_id, {"state": "initial"})
    # ...then the empty conversation file in S3 (Source of Truth).
    _save_conversation_to_s3(chat_id, [], update_activity=False)
    return session_manager.get_session(chat_id) or {"state": "initial"}
def _normalize_components_for_api(executed_components: List[Dict[str, Any]], pipeline_id: Optional[str]) -> List[Dict[str, Any]]:
    """
    Convert components_executed[] into schema-aligned components[] with:
    - component_id, step_id, tool_name, status (success/failed/running)
    - hasError/error
    - parameters (merged start_page/end_page + params)
    - component_output (original result)
    - meta (executor, execution_time, tool_version, success)

    The step number is the first of "step_number", "step_id", "step" that is
    not None, so a legitimate step 0 is kept instead of being skipped as falsy.
    `None` or an empty input list yields [].
    """
    normalized: List[Dict[str, Any]] = []
    for comp in executed_components or []:
        step_number = next(
            (comp[name] for name in ("step_number", "step_id", "step") if comp.get(name) is not None),
            None,
        )
        tool_name = comp.get("tool_name") or comp.get("tool")

        # Page range first, then explicit params (params win on key clashes).
        parameters: Dict[str, Any] = {}
        for page_key in ("start_page", "end_page"):
            if comp.get(page_key) is not None:
                parameters[page_key] = comp[page_key]
        params = comp.get("params") or {}
        if isinstance(params, dict):
            parameters.update(params)

        error = comp.get("error")
        raw_status = comp.get("status")
        # Map "completed" -> "success" to align with final API; without an
        # explicit status, infer it from the presence of an error.
        if raw_status == "completed":
            status = "success"
        else:
            status = raw_status or ("failed" if error else "running")

        if pipeline_id and step_number is not None:
            component_id = f"{pipeline_id}:{step_number}"
        else:
            component_id = f"{pipeline_id}:{tool_name}"

        normalized.append({
            "component_id": component_id,
            "step_id": step_number,
            "tool_name": tool_name,
            "status": status,
            "hasError": bool(error) or status == "failed",
            "error": error,
            "parameters": parameters,
            "component_output": comp.get("result"),
            "meta": {
                "executor": comp.get("executor"),
                "execution_time": comp.get("execution_time"),
                "tool_version": comp.get("tool_version"),
                "success": comp.get("success"),
            }
        })
    return normalized
def _build_api_result_summary(result: Dict[str, Any], session: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build api_response.result with a concise summary + metadata.

    Args:
        result: Pipeline result; reads "status", "executor",
            "metadata.executor" and "components_executed"
        session: Session dict; reads "file_metadata.file_name"

    Returns:
        {"summary": str, "metadata": {...}} where metadata holds source_file,
        processed_at, executor and, when page ranges were found,
        pages_processed (number of distinct pages covered).
    """
    status = result.get("status") or "completed"
    executor = result.get("executor") or (result.get("metadata") or {}).get("executor") or "unknown"

    # Best-effort pages processed from components. `or []` tolerates an
    # explicit None, matching _normalize_components_for_api.
    pages_set = set()
    for comp in result.get("components_executed") or []:
        sp, ep = comp.get("start_page"), comp.get("end_page")
        if sp is None or ep is None:
            continue
        try:
            pages_set.update(range(int(sp), int(ep) + 1))
        except (TypeError, ValueError, OverflowError):
            # Non-numeric page bounds: skip this component's range.
            pass

    metadata = {
        "source_file": (session.get("file_metadata") or {}).get("file_name"),
        "processed_at": datetime.utcnow().isoformat() + "Z",
        "executor": executor
    }
    if pages_set:
        metadata["pages_processed"] = len(pages_set)
    return {"summary": f"Pipeline execution {status} using {executor}.", "metadata": metadata}
def _first_image_description(image_desc: Any) -> str:
    """Return the first page's gemini (preferred) or mistral description, else ""."""
    if not isinstance(image_desc, dict):
        return ""
    pages = image_desc.get("result")
    if not isinstance(pages, list) or not pages or not isinstance(pages[0], dict):
        return ""
    for provider in ("gemini", "mistral"):
        provider_output = pages[0].get(provider)
        if isinstance(provider_output, dict) and provider_output.get("description"):
            return provider_output["description"]
    return ""


def _extract_pipeline_result_text(pipeline_result: Dict[str, Any]) -> str:
    """Pick the main user-facing text out of a pipeline result, else a generic extraction."""
    execution_results = pipeline_result.get("execution_results", pipeline_result)
    text = ""
    if isinstance(execution_results, dict):
        # Exactly one source is consulted, in this priority order.
        if execution_results.get("image_descriptions"):
            text = _first_image_description(execution_results["image_descriptions"])
        elif execution_results.get("text"):
            text = execution_results["text"]
        elif execution_results.get("components_executed"):
            for comp in execution_results["components_executed"]:
                comp_result = comp.get("result") if isinstance(comp, dict) else None
                if not isinstance(comp_result, dict):
                    continue
                if comp_result.get("text"):
                    text = comp_result["text"]
                    break
                if comp_result.get("image_descriptions"):
                    text = _first_image_description(comp_result["image_descriptions"])
                    if text:
                        break
    # Fallback to generic extraction
    return text or _extract_user_facing_text(pipeline_result)


def _add_and_mirror_message(
    chat_id: str,
    role: str,
    content: str,
    *,
    pipeline_id: Optional[str] = None,
    pipeline_action: Optional[str] = None,  # "created" | "executed" | "failed"
    pipeline_status: Optional[str] = None,
    pipeline_result: Optional[dict] = None,
    file_metadata: Optional[dict] = None,
):
    """
    V3 RULE: Append message to S3 conversation.

    The conversation is loaded from S3, the new message appended, and the
    whole list written back. For assistant messages about an executed or
    failed pipeline, a compact `result` (status, text, error, step counts) is
    attached. User messages additionally trigger chat-name generation.

    Args:
        chat_id: Session ID
        role: "user" | "assistant" | "system"
        content: Message text (non-string content is stored as JSON)
        pipeline_id: ID of related pipeline (if any)
        pipeline_action: "created" | "executed" | "failed"
        pipeline_status: Pipeline status (for executed pipelines)
        pipeline_result: Full pipeline result (for executed pipelines)
        file_metadata: File info if message has file attachment
    """
    # 1. Load existing
    current_messages = _load_conversation_from_s3(chat_id)

    # 2. Create message
    new_msg = {
        "role": role,
        "content": content if isinstance(content, str) else json.dumps(content, ensure_ascii=False),
        "timestamp": datetime.utcnow().isoformat() + "Z"
    }
    if pipeline_id:
        new_msg["pipeline_id"] = pipeline_id
    if pipeline_action:
        new_msg["pipeline_action"] = pipeline_action

    # Attach result for executed/failed pipelines
    if role == "assistant" and pipeline_action in ("executed", "failed") and pipeline_id:
        result_data = {
            "pipeline_id": pipeline_id,
            "status": pipeline_status or (pipeline_result.get("status") if pipeline_result else None) or pipeline_action,
        }
        if pipeline_result:
            result_data["text"] = _extract_pipeline_result_text(pipeline_result)
            if pipeline_result.get("error"):
                result_data["error"] = pipeline_result.get("error")
            # Summary stats, only when the result reports them
            for stat in ("completed_steps", "total_steps"):
                if pipeline_result.get(stat) is not None:
                    result_data[stat] = pipeline_result.get(stat)
        elif pipeline_action == "failed":
            result_data["error"] = "Pipeline execution failed"
        new_msg["result"] = result_data

    # File attachment info is always present, with has_file as the flag
    if file_metadata:
        new_msg["file_data"] = {
            "has_file": True,
            "file_name": file_metadata.get("fileName"),
            "file_url": file_metadata.get("fileUrl") or file_metadata.get("s3_uri")
        }
    else:
        new_msg["file_data"] = {"has_file": False}

    current_messages.append(new_msg)

    # 3. Save to S3 - update last_activity for all message interactions
    _save_conversation_to_s3(chat_id, current_messages, update_activity=True)

    # 4. Auto-generate chat name after first user message
    if role == "user":
        _maybe_generate_chat_name(chat_id)
def _assistant_response_payload(
    chat_id: str,
    friendly_response: str,
    intent: Dict[str, Any],
    api_data: Dict[str, Any],
    state: str,
    output: Optional[Dict[str, Any]] = None,
    final_output: Optional[Dict[str, Any]] = None,
    exception: Optional[str] = None,
    pipeline_result: Optional[Dict[str, Any]] = None,
    pipeline_id: Optional[str] = None,
    pipeline_action: Optional[str] = None,  # "created" | "executed" | "failed"
) -> ChatResponse:
    """
    Persist the assistant message and create the ChatResponse payload.

    Args:
        chat_id: Session ID
        friendly_response: User-friendly message
        intent: Intent classification result
        api_data: Technical API response data
        state: Current session state
        output: Pipeline summary output
        final_output: Final downloadable result
        exception: Error message if any
        pipeline_result: Full pipeline execution result
        pipeline_id: Pipeline ID (for tracking in messages)
        pipeline_action: "created" | "executed" | "failed"

    Returns:
        ChatResponse carrying a freshly generated message_id, the chat name
        and the file info stored on the session.
    """
    # Generate message_id for assistant response
    from services.schemas import generate_message_id
    message_id = generate_message_id()

    # Determine pipeline status from result or exception
    pipeline_status = None
    if pipeline_result:
        pipeline_status = pipeline_result.get("status")
    elif exception:
        pipeline_status = "failed"

    # Persist assistant message to S3 WITH pipeline data
    _add_and_mirror_message(
        chat_id=chat_id,
        role="assistant",
        content=friendly_response,
        pipeline_id=pipeline_id,
        pipeline_action=pipeline_action,
        pipeline_status=pipeline_status,
        pipeline_result=pipeline_result,
    )

    # Read the session after persisting so chat_name/file info are current.
    session = session_manager.get_session(chat_id) or {}
    # `or {}` also covers a stored None, as the other helpers in this file do;
    # `.get("file_metadata", {})` would return None and crash below.
    file_metadata = session.get("file_metadata") or {}

    return ChatResponse(
        message_id=message_id,
        assistant_response=friendly_response,
        output=output or {},
        final_output=final_output,
        hasError=bool(exception),
        exception=exception,
        api_response=api_data,
        intent=intent,
        chat_id=chat_id,
        chat_name=session.get("chat_name"),
        state=state,
        file=file_metadata.get("has_file", False),
        fileName=file_metadata.get("file_name"),
        fileUrl=file_metadata.get("file_url")
    )
def parse_s3_uri(uri: str) -> Tuple[str, str]:
    """
    Parse s3://bucket/key to (bucket, key).

    The URI is split by hand rather than with urlparse: URL parsing treats
    "?" as the start of a query string and would silently truncate object
    keys that contain it (S3 keys may contain "?" and "#").

    Leading slashes of the key are dropped; the scheme is matched
    case-insensitively.

    Raises:
        ValueError: if `uri` does not use the s3:// scheme.
    """
    scheme, separator, remainder = uri.partition("://")
    if not separator or scheme.strip().lower() != "s3":
        raise ValueError(f"Not an S3 URI: {uri}")
    bucket, _, key = remainder.partition("/")
    return bucket, key.lstrip("/")
def download_to_temp_file(file_ref: Optional[str]) -> Tuple[Optional[str], Callable[[], None]]:
    """
    If file_ref is an S3 URI, download to a temporary file and return (path, cleanup).
    If local path or None, return as-is and a no-op cleanup.

    The temporary file keeps the extension of the S3 key. The caller must
    invoke the returned cleanup callable when done; it is safe to call more
    than once.

    Raises:
        RuntimeError: the S3 download failed with a ClientError.
        Any other download error propagates unchanged. In every failure case
        the temporary file is removed first, so nothing is leaked.
    """
    def noop():
        pass

    if not file_ref:
        return None, noop
    if not (isinstance(file_ref, str) and file_ref.startswith("s3://")):
        # Already a local path
        return file_ref, noop

    bucket, key = parse_s3_uri(file_ref)
    suffix = os.path.splitext(key)[1] or ""
    fd, temp_path = tempfile.mkstemp(prefix="masterllm_", suffix=suffix)
    # Only the path is needed; the download reopens the file itself.
    os.close(fd)

    def cleanup():
        try:
            os.remove(temp_path)
        except OSError:
            # Already gone (or not removable): nothing more to do.
            pass

    try:
        s3.download_file(bucket, key, temp_path)
    except ClientError as e:
        cleanup()
        raise RuntimeError(f"Failed to download from S3: {file_ref} ({e})") from e
    except Exception:
        # Non-ClientError failures (connection problems, transfer errors)
        # previously left the temp file behind.
        cleanup()
        raise
    return temp_path, cleanup
def upload_stream_to_s3(chat_id: str, file: UploadFile, create_message: bool = True) -> str:
    """
    Stream an UploadFile directly to S3, return s3:// URI.
    Supports optional SSE via env S3_SSE and S3_KMS_KEY_ID.

    A presigned GET URL with a 7-day expiry is generated once at upload time
    and stored in the session's ``file_metadata`` together with the S3 URI.

    Args:
        chat_id: Session the file belongs to; part of the object key.
        file: Incoming upload whose stream is sent to S3.
        create_message: If True, appends an "Uploaded file" user message to
            the conversation. Set to False when the file is sent together
            with a user message.

    Returns:
        The ``s3://bucket/key`` URI of the stored object.

    Raises:
        HTTPException: 403 for credential/permission errors reported by S3,
            500 for any other S3 ClientError.
    """
    # The filename is client-controlled: keep only its last path component
    # (both separator styles) so it cannot add extra key segments, and fall
    # back to a fixed name when the client sent none.
    object_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    display_name = file.filename or object_name
    key = f"{S3_PREFIX}/{chat_id}/{object_name}"

    config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=4)
    extra_args = {"ContentType": file.content_type or "application/octet-stream"}

    sse = os.getenv("S3_SSE", "").upper()
    if sse == "AES256":
        extra_args["ServerSideEncryption"] = "AES256"
    elif sse == "KMS":
        extra_args["ServerSideEncryption"] = "aws:kms"
        kms_key = os.getenv("S3_KMS_KEY_ID")
        if kms_key:
            extra_args["SSEKMSKeyId"] = kms_key

    try:
        s3.upload_fileobj(
            Fileobj=file.file,
            Bucket=S3_BUCKET,
            Key=key,
            ExtraArgs=extra_args,
            Config=config
        )
    except ClientError as e:
        code = e.response.get("Error", {}).get("Code", "Unknown")
        msg = f"S3 upload failed: {code}. Check AWS credentials, permissions (s3:PutObject), region and bucket."
        raise HTTPException(
            status_code=403 if code in ("AccessDenied", "InvalidAccessKeyId", "SignatureDoesNotMatch") else 500,
            detail=msg
        ) from e

    s3_uri = f"s3://{S3_BUCKET}/{key}"

    # Generate a single presigned URL (max 7 days) and store it for later reuse.
    # NOTE(review): assumes the helper returns a dict that contains at least
    # "presigned_url" — confirm against _generate_presigned_get_url.
    presigned = _generate_presigned_get_url(S3_BUCKET, key, expires_in=604800)

    # Store file metadata in session (adds presigned fields)
    session_manager.update_session(chat_id, {
        "current_file": s3_uri,
        "state": "initial",
        "file_metadata": {
            "has_file": True,
            "file_name": display_name,
            "file_url": presigned["presigned_url"],  # presigned URL, not the S3 URI
            "s3_uri": s3_uri,  # kept for internal reference
            "uploaded_at": datetime.utcnow().isoformat() + "Z",
            **presigned
        }
    })

    # Only create file upload message if requested (not when file is sent with user message)
    if create_message:
        file_message = {
            "role": "user",
            "content": f"Uploaded file: {display_name}",
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "file_data": {
                "has_file": True,
                "file_name": display_name,
                "file_url": presigned["presigned_url"]  # presigned URL for user access
            }
        }
        # Append to the S3 conversation history: load, append, save.
        current_messages = _load_conversation_from_s3(chat_id)
        current_messages.append(file_message)
        _save_conversation_to_s3(chat_id, current_messages)

    return s3_uri
| # ======================== | |
| # GET ALL SESSIONS ENDPOINT | |
| # ======================== | |
async def get_all_sessions(
    limit: int = 100,
    skip: int = 0,
    include_stats: bool = False
):
    """
    Get all session IDs from the database with optional pagination.

    Args:
        limit: Maximum number of sessions to return; negative values are
            treated as 0.
        skip: Number of sessions to skip from the start; negative values are
            treated as 0.
        include_stats: When True, each session is loaded and returned with
            its state, chat_name, stats and counters; otherwise only the
            session IDs are returned.

    Returns:
        ``{"sessions": [...], "pagination": {...}}``.

    Raises:
        HTTPException: 500 if the sessions cannot be retrieved.
    """
    try:
        # A negative skip/limit would make the slice below count from the END
        # of the list and report a misleading has_more; clamp both to 0.
        limit = max(limit, 0)
        skip = max(skip, 0)

        all_session_ids = session_manager.get_all_session_ids()
        if not all_session_ids:
            return {"sessions": [], "pagination": {"total": 0, "returned": 0}}

        # Apply pagination
        total_sessions = len(all_session_ids)
        paginated_ids = all_session_ids[skip:skip + limit]

        def _page(sessions):
            # Shared response envelope for both the basic and the stats view.
            return {
                "sessions": sessions,
                "pagination": {
                    "total": total_sessions,
                    "returned": len(sessions),
                    "limit": limit,
                    "skip": skip,
                    "has_more": total_sessions > (skip + limit)
                }
            }

        if not include_stats:
            # Return just session IDs with pagination info
            return _page([
                {
                    "session_id": sid,
                    "created_at": None,
                    "last_activity": None
                }
                for sid in paginated_ids
            ])

        # Include detailed statistics for each session
        sessions_with_stats = []
        for session_id in paginated_ids:
            session = session_manager.get_session(session_id)
            if not session:
                # IDs whose session can no longer be loaded are left out.
                continue

            # Format datetime objects for JSON serialization
            created_at = session.get("created_at")
            last_activity = session.get("last_activity")
            if isinstance(created_at, datetime):
                created_at = created_at.isoformat()
            if isinstance(last_activity, datetime):
                last_activity = last_activity.isoformat()

            sessions_with_stats.append({
                "session_id": session_id,
                "user_id": session.get("user_id"),
                "created_at": created_at,
                "last_activity": last_activity,
                "state": session.get("state", "unknown"),
                "chat_name": session.get("chat_name"),
                "stats": session.get("stats", {}),
                "total_messages": session.get("last_message_count", 0),
                "pipeline_executions_count": len(session.get("pipeline_executions", []))
            })

        # Sort by most recent activity first (within the returned page)
        sessions_with_stats.sort(
            key=lambda s: s.get("last_activity") or s.get("created_at") or "",
            reverse=True
        )
        return _page(sessions_with_stats)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving sessions: {str(e)}"
        ) from e
| # ======================== | |
| # GET SESSION HISTORY ENDPOINT (with session_id in response) | |
| # ======================== | |
async def get_session_history(
    session_id: str,
    limit: int = 50
):
    """
    Get conversation history for a session.

    Messages already carry ``pipeline_id`` and ``pipeline_action``. For
    executed/failed pipelines whose message has no ``result``, the result is
    loaded from the pipeline's JSON object in S3 as a fallback. Every message
    with a ``pipeline_id`` also gets ``pipeline_metadata`` when that object
    can be read.

    Args:
        session_id: Session whose conversation is loaded from S3.
        limit: Keep only the most recent ``limit`` messages; a falsy value
            disables the limit.

    Returns:
        Dict with ``session_id``, ``history``, ``count``, ``limit``,
        ``chat_name`` and ``pipelines_history``.

    Raises:
        HTTPException: 500 if the history cannot be assembled.
    """
    try:
        # Load conversation history from S3
        history = _load_conversation_from_s3(session_id)

        # Apply limit (keep most recent) BEFORE enrichment so no S3 reads are
        # spent on messages that would be discarded anyway.
        if limit and len(history) > limit:
            history = history[-limit:]

        # One S3 read per pipeline object per request: the result fallback
        # and the metadata block both read the same JSON document, and many
        # messages can share one pipeline_id.
        pipeline_docs: Dict[str, Tuple[Any, Optional[Exception]]] = {}

        def _load_pipeline_doc(pid):
            # Returns (document, None) on success or (None, error) on failure.
            s3_key = f"{S3_PREFIX}/pipelines/{pid}.json"
            if s3_key not in pipeline_docs:
                try:
                    resp = s3.get_object(Bucket=S3_BUCKET, Key=s3_key)
                    doc = json.loads(resp["Body"].read().decode("utf-8"))
                    pipeline_docs[s3_key] = (doc, None)
                except Exception as exc:
                    pipeline_docs[s3_key] = (None, exc)
            return pipeline_docs[s3_key]

        def _result_unavailable(pid, err):
            # Placeholder stored when the pipeline result cannot be loaded.
            return {
                "pipeline_id": pid,
                "status": "unknown",
                "error": f"Could not load result: {str(err)}"
            }

        enhanced_history = []
        for msg in history:
            msg_copy = msg.copy()

            # Ensure message_id exists
            if "message_id" not in msg_copy:
                from services.schemas import generate_message_id
                msg_copy["message_id"] = generate_message_id()

            # Normalize timestamp
            if "timestamp" in msg_copy and isinstance(msg_copy["timestamp"], datetime):
                msg_copy["timestamp"] = msg_copy["timestamp"].isoformat()

            pipeline_id = msg_copy.get("pipeline_id")
            pipeline_action = msg_copy.get("pipeline_action")

            if pipeline_id:
                pipeline_data, load_error = _load_pipeline_doc(pipeline_id)

                # Enrich ONLY if result is missing for executed/failed pipelines
                if pipeline_action in ("executed", "failed") and not msg_copy.get("result"):
                    if load_error is not None:
                        msg_copy["result"] = _result_unavailable(pipeline_id, load_error)
                    else:
                        try:
                            if "result" in pipeline_data:
                                # Extract user-facing text
                                result_text = _extract_user_facing_text(pipeline_data["result"])
                                msg_copy["result"] = {
                                    "pipeline_id": pipeline_id,
                                    "status": pipeline_data.get("status", pipeline_action),
                                    "text": result_text,
                                }
                                if pipeline_data["result"].get("error"):
                                    msg_copy["result"]["error"] = pipeline_data["result"]["error"]
                        except Exception as e:
                            # Non-fatal enrichment failure
                            msg_copy["result"] = _result_unavailable(pipeline_id, e)

                # Add pipeline_metadata when the pipeline document is readable
                if load_error is None:
                    try:
                        msg_copy["pipeline_metadata"] = {
                            "pipeline_id": pipeline_id,
                            "pipeline_name": pipeline_data.get("pipeline_name", "unknown"),
                            "status": pipeline_data.get("status", "unknown"),
                            "created_at": pipeline_data.get("created_at"),
                            "updated_at": pipeline_data.get("updated_at"),
                        }
                    except Exception:
                        # Keep existing pipeline_id, just no extra metadata
                        pass

            enhanced_history.append(msg_copy)

        # Load session metadata
        session = session_manager.get_session(session_id) or {}

        # ----- PIPELINES HISTORY -----
        enhanced_pipelines = []
        for pipeline_meta in session.get("pipelines_history", []):
            enhanced_pipe = pipeline_meta.copy()
            # Remove internal-only keys
            enhanced_pipe.pop("pipeline_s3_key", None)

            # Defaults used when no definition is stored or it cannot be read
            empty_definition = {"components": [], "component_count": 0, "tools": []}

            # Load full pipeline definition if available
            pipeline_s3_key = pipeline_meta.get("pipeline_s3_key")
            if pipeline_s3_key:
                try:
                    resp = s3.get_object(Bucket=S3_BUCKET, Key=pipeline_s3_key)
                    pipeline_def = json.loads(resp["Body"].read().decode("utf-8"))
                    components = (
                        pipeline_def.get("components")
                        or pipeline_def.get("pipeline_steps", [])
                    )
                    enhanced_pipe["components"] = components
                    enhanced_pipe["component_count"] = len(components)
                    enhanced_pipe["tools"] = [
                        c.get("tool_name", c.get("tool", "unknown"))
                        for c in components
                    ]
                    if "result" in pipeline_def:
                        enhanced_pipe["execution_results"] = pipeline_def["result"]
                except Exception as e:
                    enhanced_pipe.update(empty_definition)
                    enhanced_pipe["load_error"] = str(e)
            else:
                enhanced_pipe.update(empty_definition)

            if "hasError" not in enhanced_pipe:
                enhanced_pipe["hasError"] = enhanced_pipe.get("status") == "failed"
            enhanced_pipelines.append(enhanced_pipe)

        # Sort pipelines by recency
        enhanced_pipelines.sort(
            key=lambda p: p.get("updated_at") or p.get("created_at") or "",
            reverse=True
        )

        return {
            "session_id": session_id,
            "history": enhanced_history,
            "count": len(enhanced_history),
            "limit": limit,
            "chat_name": session.get("chat_name"),
            "pipelines_history": enhanced_pipelines
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving session history: {str(e)}"
        ) from e
async def delete_session(session_id: str):
    """
    Delete a session's conversation object (S3) and its metadata record.

    Pipeline records are not touched by this function.

    Args:
        session_id: Identifier of the session to remove.

    Returns:
        ``{"status": "deleted", "session_id": session_id}`` on success.

    Raises:
        HTTPException: 404 when the session does not exist, 500 when the
            metadata delete reports failure.
    """
    existing = session_manager.get_session(session_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Session not found")

    # Prefer the key recorded on the session; derive it only when absent.
    conversation_key = existing.get("conversation_s3_key")
    if not conversation_key:
        conversation_key = _get_conversation_s3_key(session_id)

    # Best effort: a failed S3 delete is logged and must not block the
    # metadata delete below.
    try:
        s3.delete_object(Bucket=S3_BUCKET, Key=conversation_key)
    except Exception as err:
        print(f"Warning: Failed to delete S3 conversation {conversation_key}: {err}")

    if session_manager.delete_session(session_id):
        return {"status": "deleted", "session_id": session_id}
    raise HTTPException(status_code=500, detail="Failed to delete session metadata")
async def get_session_pipelines(session_id: str):
    """
    Return the pipeline records of a session in their client-facing shape.

    Up to 100 records are requested from the pipeline manager. Each record is
    copied, ``result_preview`` is exposed as ``result``, internal S3 keys are
    dropped, ``hasError`` is derived from ``status`` when absent, and the
    presigned output URL is mirrored to ``final_output_url``.

    Args:
        session_id: Session whose pipelines are listed.

    Returns:
        ``{"session_id": ..., "pipelines": [...]}``.

    Raises:
        HTTPException: 500 if the pipelines cannot be retrieved.
    """
    def _public_view(record):
        # Work on a copy so the manager's record is left untouched.
        view = record.copy()
        if "result_preview" in view:
            view["result"] = view.pop("result_preview")
        for internal_key in ("pipeline_s3_key", "pipeline_definition_s3_key"):
            view.pop(internal_key, None)
        view.setdefault("hasError", view.get("status") == "failed")
        if "final_output_presigned_url" in view:
            view["final_output_url"] = view["final_output_presigned_url"]
        return view

    try:
        from services.pipeline_manager import get_pipeline_manager
        records = get_pipeline_manager().get_session_pipelines(session_id, limit=100)
        # Components are passed through exactly as the manager supplies them.
        return {
            "session_id": session_id,
            "pipelines": [_public_view(record) for record in records]
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving pipelines: {str(e)}"
        )
| # ======================== | |
| # UNIFIED CHAT (non-streaming) | |
| # ======================== | |
| async def chat_unified( | |
| request: Request, | |
| chat_id: Optional[str] = Form(None), | |
| message: Optional[str] = Form(None), | |
| prefer_bedrock: Optional[bool] = Form(True), | |
| file: Optional[UploadFile] = File(None), | |
| ): | |
| """ | |
| One endpoint that behaves like the Gradio chatbot: | |
| - Accepts multipart/form-data (file + message) OR application/json. | |
| - If a file is included, it uploads to S3 and sets current_file. | |
| - Handles casual chat, pipeline request, approve/reject, and edits. | |
| - On approval, executes the pipeline (non-stream) and returns the final result. | |
| Returns assistant_response + full history (role/content). | |
| """ | |
| # Support JSON payloads too | |
| content_type = (request.headers.get("content-type") or "").lower() | |
| file_path_from_json = None | |
| if "application/json" in content_type: | |
| body = await request.json() | |
| chat_id = body.get("chat_id") or chat_id | |
| message = body.get("message") if "message" in body else message | |
| prefer_bedrock = body.get("prefer_bedrock", True) if "prefer_bedrock" in body else prefer_bedrock | |
| file_path_from_json = body.get("file_path") | |
| chat_id = _ensure_chat(chat_id) | |
| session = _get_session_or_init(chat_id) | |
| # If JSON included a file_path (e.g., s3://...), attach it | |
| if file_path_from_json: | |
| session_manager.update_session(chat_id, {"current_file": file_path_from_json}) | |
| _add_and_mirror_message(chat_id, "system", f"File attached: {file_path_from_json}") | |
| session = _get_session_or_init(chat_id) | |
| # If a file is included in the form, upload to S3 and attach it | |
| file_info = None | |
| if file is not None: | |
| # Don't create automatic "Uploaded file" message if user is also sending text | |
| has_user_message = message and str(message).strip() != "" | |
| s3_uri = upload_stream_to_s3(chat_id, file, create_message=not has_user_message) | |
| meta = (session_manager.get_session(chat_id) or {}).get("file_metadata", {}) or {} | |
| file_info = { | |
| "bucket": S3_BUCKET, | |
| "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1], | |
| "s3_uri": s3_uri, | |
| "presigned_url": meta.get("presigned_url"), | |
| "presigned_expires_at": meta.get("presigned_expires_at") | |
| } | |
| session = _get_session_or_init(chat_id) | |
| # If no message and only a file was sent, respond with an acknowledgement | |
| if (message is None or str(message).strip() == "") and file_info: | |
| friendly = "📁 File uploaded successfully. Tell me what you'd like to do with it (e.g., extract text, get tables, summarize)." | |
| api_data = {"type": "file_uploaded", "file": file_info, "next_action": "send_instruction"} | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent={"intent": "file_uploaded"}, | |
| api_data=api_data, | |
| state=session.get("state", "initial") | |
| ) | |
| # If still no message, nudge the user | |
| if message is None or str(message).strip() == "": | |
| friendly = "Please provide a message (e.g., 'extract text', 'get tables', 'summarize')." | |
| api_data = {"type": "missing_message"} | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent={"intent": "missing_message"}, | |
| api_data=api_data, | |
| state=session.get("state", "initial") | |
| ) | |
| # Add user message (with file metadata if file was uploaded) | |
| file_meta_for_msg = None | |
| if file_info: | |
| file_meta_for_msg = { | |
| "fileName": file.filename if file else None, | |
| "fileUrl": file_info.get("presigned_url"), | |
| "s3_uri": file_info.get("s3_uri") | |
| } | |
| _add_and_mirror_message(chat_id, "user", message, file_metadata=file_meta_for_msg) | |
| _maybe_generate_chat_name(chat_id) | |
| # Classify intent | |
| intent_data = intent_classifier.classify_intent(message) | |
| current_state = session.get("state", "initial") | |
| try: | |
| # Casual chat | |
| if intent_data["intent"] == "casual_chat": | |
| friendly = intent_classifier.get_friendly_response("casual_chat", message) | |
| api_data = { | |
| "type": "casual_response", | |
| "message": friendly, | |
| "intent_classification": intent_data, | |
| "suggestions": [ | |
| "Upload a document to get started", | |
| "Ask 'what can you do?' to see capabilities", | |
| "Type 'help' for usage instructions" | |
| ] | |
| } | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state=current_state | |
| ) | |
| # Questions | |
| if intent_data["intent"] == "question": | |
| friendly = intent_classifier.get_friendly_response("question", message) | |
| api_data = {"type": "informational_response", "message": friendly, "intent_classification": intent_data} | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state=current_state | |
| ) | |
| # Unclear | |
| if intent_data["intent"] == "unclear": | |
| friendly = intent_classifier.get_friendly_response("unclear", message) | |
| api_data = { | |
| "type": "clarification_needed", | |
| "message": friendly, | |
| "intent_classification": intent_data, | |
| "suggestions": [ | |
| "Be more specific about what you want to do", | |
| "Use keywords like: extract, summarize, translate, etc.", | |
| "Type 'help' for examples" | |
| ] | |
| } | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state=current_state | |
| ) | |
| # Approval (execute now in unified endpoint) | |
| if intent_data["intent"] == "approval" and current_state == "pipeline_proposed": | |
| proposed = session.get("proposed_pipeline") | |
| if not proposed: | |
| msg = "No pipeline to approve. Please request a task first." | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=msg, | |
| intent=intent_data, | |
| api_data={"type": "error", "message": msg}, | |
| state=current_state | |
| ) | |
| file_ref = session.get("current_file") | |
| local_path, cleanup = download_to_temp_file(file_ref) | |
| session_manager.update_session(chat_id, {"state": "executing"}) | |
| # ✅ Get pipeline_id from proposed pipeline | |
| pipeline_id = proposed.get("pipeline_id") | |
| try: | |
| result = execute_pipeline( | |
| pipeline=proposed, | |
| file_path=local_path, | |
| session_id=chat_id, | |
| prefer_bedrock=bool(prefer_bedrock), | |
| ) | |
| session_manager.update_session(chat_id, {"pipeline_result": result, "state": "initial"}) | |
| # Check if pipeline actually succeeded or failed | |
| pipeline_status = result.get("status", "unknown") | |
| completed_steps = result.get("completed_steps", 0) | |
| total_steps = result.get("total_steps", 0) | |
| has_error = result.get("error") is not None or pipeline_status in ["failed", "partial"] | |
| # Determine if this is a real success or a failure | |
| is_success = (pipeline_status == "completed" and completed_steps == total_steps and not has_error) | |
| # V3: Update pipeline status in S3 | |
| if pipeline_id: | |
| final_status = "completed" if is_success else "failed" | |
| _update_pipeline_status(pipeline_id, chat_id, final_status, result=result) | |
| _record_model_attribution( | |
| pipeline_id=pipeline_id, | |
| session_id=chat_id, | |
| model_provider=proposed.get("_model_provider", "unknown"), | |
| model_name=proposed.get("_model", "unknown"), | |
| is_fallback=False | |
| ) | |
| # Build response based on actual success/failure | |
| if is_success: | |
| friendly = "🎉 Pipeline completed successfully!" | |
| output = { | |
| "component_summary": "Pipeline executed successfully", | |
| "steps": total_steps, | |
| "pipeline_id": pipeline_id | |
| } | |
| api_type = "pipeline_completed" | |
| exception_msg = None | |
| # Normalize components and result summary for api_response | |
| normalized_components = _normalize_components_for_api( | |
| result.get("components_executed", []), | |
| pipeline_id or proposed.get("pipeline_id") | |
| ) | |
| api_result = _build_api_result_summary(result, session) | |
| api_pipeline = { | |
| "pipeline_id": pipeline_id or proposed.get("pipeline_id"), | |
| "pipeline_name": proposed.get("pipeline_name"), | |
| "components": normalized_components | |
| } | |
| # Final output as a downloadable pointer ONLY | |
| final_output: Optional[Dict[str, Any]] = {} | |
| if pipeline_id: | |
| try: | |
| from services.pipeline_manager import get_pipeline_manager | |
| pipeline_mgr = get_pipeline_manager() | |
| pipeline_record = pipeline_mgr.get_pipeline(pipeline_id) | |
| if pipeline_record: | |
| if pipeline_record.get("output_id"): | |
| final_output["output_id"] = pipeline_record.get("output_id") | |
| if pipeline_record.get("final_output_presigned_url"): | |
| final_output["download_url"] = pipeline_record.get("final_output_presigned_url") | |
| except Exception as e: | |
| print(f"Warning: Could not get output_id/download_url: {e}") | |
| if not final_output: | |
| final_output = None | |
| # Build api_response data for success | |
| api_data = { | |
| "type": api_type, | |
| "result": api_result, | |
| "pipeline": api_pipeline | |
| } | |
| # ✅ Return response with pipeline_id and pipeline_action | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent={"intent": "pipeline_execute"}, | |
| api_data=api_data, | |
| state="initial", | |
| output=output, | |
| final_output=final_output, | |
| exception=exception_msg, | |
| pipeline_result=result, | |
| pipeline_id=pipeline_id, | |
| pipeline_action="executed" # ✅ ADD THIS | |
| ) | |
| else: | |
| # Pipeline failed or partially completed | |
| error_msg = result.get("error", "Pipeline execution incomplete") | |
| # Check for component-level errors | |
| failed_components = [] | |
| for comp in result.get("components_executed", []): | |
| if comp.get("status") == "failed" or comp.get("error"): | |
| failed_components.append({ | |
| "tool_name": comp.get("tool_name", comp.get("tool", "unknown")), | |
| "error": comp.get("error", comp.get("result", {}).get("error", "Unknown error")) | |
| }) | |
| if failed_components: | |
| first_error = failed_components[0] | |
| friendly = f"❌ Pipeline failed: {first_error['tool_name']} - {first_error['error']}" | |
| else: | |
| friendly = f"⚠️ Pipeline partially completed: {error_msg}" | |
| output = { | |
| "component_summary": f"Pipeline {pipeline_status}", | |
| "steps": total_steps, | |
| "completed": completed_steps, | |
| "failed": total_steps - completed_steps, | |
| "pipeline_id": pipeline_id | |
| } | |
| final_output = {"text": f"Pipeline execution {pipeline_status} with {completed_steps}/{total_steps} steps completed"} | |
| api_type = "pipeline_failed" if pipeline_status == "failed" else "pipeline_partial" | |
| exception_msg = error_msg | |
| api_data = { | |
| "type": api_type, | |
| "result": result, | |
| "pipeline": proposed | |
| } | |
| # ✅ Return response with pipeline_id and pipeline_action="failed" | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent={"intent": "pipeline_execute"}, | |
| api_data=api_data, | |
| state="initial", | |
| output=output, | |
| final_output=final_output, | |
| exception=exception_msg, | |
| pipeline_result=result, | |
| pipeline_id=pipeline_id, | |
| pipeline_action="failed" # ✅ ADD THIS | |
| ) | |
| except Exception as e: | |
| session_manager.update_session(chat_id, {"state": "initial"}) | |
| if pipeline_id: | |
| _update_pipeline_status(pipeline_id, chat_id, "failed", result={"error": str(e)}) | |
| # Get component-level error details | |
| failed_component = None | |
| if pipeline_id: | |
| try: | |
| from services.pipeline_manager import get_pipeline_manager | |
| pipeline_mgr = get_pipeline_manager() | |
| pipeline_record = pipeline_mgr.get_pipeline(pipeline_id) | |
| if pipeline_record and pipeline_record.get("components"): | |
| for comp in pipeline_record.get("components", []): | |
| if comp.get("hasError") or comp.get("status") == "failed": | |
| failed_component = { | |
| "component_id": comp.get("component_id"), | |
| "tool_name": comp.get("component_name"), | |
| "error": comp.get("error") | |
| } | |
| break | |
| except Exception as comp_error: | |
| print(f"Warning: Could not get component error details: {comp_error}") | |
| friendly = f"❌ Pipeline execution failed: {str(e)}" | |
| error_result = { | |
| "pipeline_id": pipeline_id, | |
| "status": "failed", | |
| "error": str(e) | |
| } | |
| api_data = { | |
| "type": "error", | |
| "error_code": "PIPELINE_EXECUTION_FAILED", | |
| "message": str(e) | |
| } | |
| if failed_component: | |
| api_data["failed_component"] = failed_component | |
| # ✅ Return with pipeline_id and pipeline_action="failed" | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent={"intent": "pipeline_execute"}, | |
| api_data=api_data, | |
| state="initial", | |
| exception=str(e), | |
| pipeline_result=error_result, | |
| pipeline_id=pipeline_id, | |
| pipeline_action="failed" # ✅ ADD THIS | |
| ) | |
| finally: | |
| try: | |
| cleanup() | |
| except Exception: | |
| pass | |
| # Rejection | |
| if intent_data["intent"] == "rejection" and current_state == "pipeline_proposed": | |
| session_manager.update_session(chat_id, {"state": "initial", "proposed_pipeline": None}) | |
| friendly = "👍 No problem! The pipeline has been cancelled. What else would you like me to help you with?" | |
| api_data = {"type": "pipeline_rejected", "message": "Pipeline cancelled by user", "state_reset": True} | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state="initial" | |
| ) | |
| # Pipeline request | |
| if intent_data["intent"] == "pipeline_request" and intent_data.get("requires_pipeline", False): | |
| if not session.get("current_file"): | |
| friendly = ( | |
| "📁 Please upload a document first before I can process it!\n\n" | |
| "Once you upload a file, I'll be happy to help you with that task." | |
| ) | |
| api_data = { | |
| "type": "error", | |
| "error_code": "NO_FILE_UPLOADED", | |
| "message": "Document required before pipeline generation", | |
| "action_required": "upload_file" | |
| } | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state=current_state | |
| ) | |
| try: | |
| pipeline = generate_pipeline( | |
| user_input=message, | |
| file_path=session.get("current_file"), | |
| prefer_bedrock=bool(prefer_bedrock), | |
| ) | |
| # FIX: Ensure pipeline has both 'components' and 'pipeline_steps' for compatibility | |
| if "components" in pipeline and "pipeline_steps" not in pipeline: | |
| pipeline["pipeline_steps"] = pipeline["components"] | |
| elif "pipeline_steps" in pipeline and "components" not in pipeline: | |
| pipeline["components"] = pipeline["pipeline_steps"] | |
| # V3: Create pipeline record in S3 and MongoDB | |
| pipeline_id = _create_pipeline_record(chat_id, pipeline, status="proposed", created_from="request") | |
| # Update session with pipeline that includes pipeline_id | |
| session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"}) | |
| # Get steps list from either field | |
| steps_list = pipeline.get("pipeline_steps", pipeline.get("components", [])) | |
| pipeline_name = pipeline.get("pipeline_name", "Document Processing") | |
| # Create steps summary with proper tool name extraction | |
| steps_summary = "\n".join([ | |
| f" {i+1}. {step.get('tool_name', step.get('tool', 'Unknown'))}" | |
| for i, step in enumerate(steps_list) | |
| ]) | |
| friendly = ( | |
| f"🎯 **Pipeline Created: {pipeline_name}**\n" | |
| f"Here's what I'll do:\n{steps_summary}\n" | |
| f"**Ready to proceed?**\n" | |
| f"- Type 'approve' or 'yes' to execute\n" | |
| f"- Type 'reject' or 'no' to cancel\n" | |
| f"- Describe changes to modify the plan" | |
| ) | |
| api_data = { | |
| "type": "pipeline_generated", | |
| "message": "Pipeline successfully created", | |
| "pipeline": pipeline, | |
| "pipeline_summary": { | |
| "name": pipeline_name, | |
| "total_steps": len(steps_list), | |
| "steps": steps_list, | |
| "generator": pipeline.get("_generator"), | |
| "model": pipeline.get("_model") | |
| }, | |
| "required_action": "approval", | |
| "next_steps": { | |
| "approve": "Type 'approve' or 'yes'", | |
| "reject": "Type 'reject' or 'no'", | |
| "modify": "Describe your changes" | |
| } | |
| } | |
| # Create output summary | |
| output = { | |
| "pipeline_id": pipeline_id, | |
| "pipeline_name": pipeline_name, | |
| "steps_count": len(steps_list), | |
| "tools": [ | |
| step.get("tool_name", step.get("tool", "unknown")) | |
| for step in steps_list | |
| ] | |
| } | |
| # ✅ Return with pipeline_id and pipeline_action="created" | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state="pipeline_proposed", | |
| output=output, | |
| pipeline_id=pipeline_id, | |
| pipeline_action="created" # ✅ ADD THIS | |
| ) | |
| except Exception as e: | |
| friendly = ( | |
| f"❌ Oops! I encountered an error while creating the pipeline:\n\n{str(e)}\n\n" | |
| "Please try rephrasing your request or type 'help' for examples." | |
| ) | |
| api_data = { | |
| "type": "error", | |
| "error_code": "PIPELINE_GENERATION_FAILED", | |
| "message": str(e), | |
| "traceback": str(e), | |
| } | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state=current_state, | |
| exception=str(e) | |
| ) | |
| # Modify when pipeline_proposed and user describes changes | |
| if current_state == "pipeline_proposed": | |
| if len(message.strip()) > 5: | |
| try: | |
| original_plan = session.get("proposed_pipeline", {}) | |
| edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}" | |
| new_pipeline = generate_pipeline( | |
| user_input=edit_context, | |
| file_path=session.get("current_file"), | |
| prefer_bedrock=bool(prefer_bedrock) | |
| ) | |
| # FIX: Ensure pipeline has both 'components' and 'pipeline_steps' for compatibility | |
| if "components" in new_pipeline and "pipeline_steps" not in new_pipeline: | |
| new_pipeline["pipeline_steps"] = new_pipeline["components"] | |
| elif "pipeline_steps" in new_pipeline and "components" not in new_pipeline: | |
| new_pipeline["components"] = new_pipeline["pipeline_steps"] | |
| # V3: Create pipeline record for edited pipeline | |
| pipeline_id = _create_pipeline_record(chat_id, new_pipeline, status="proposed", created_from="edit") | |
| session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"}) | |
| formatted = format_pipeline_for_display(new_pipeline) | |
| friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```" | |
| api_data = { | |
| "type": "pipeline_modified", | |
| "message": "Pipeline updated based on user's edits", | |
| "pipeline": new_pipeline | |
| } | |
| # Get steps list for output | |
| steps_list = new_pipeline.get("pipeline_steps", new_pipeline.get("components", [])) | |
| # Create output summary for modified pipeline | |
| output = { | |
| "pipeline_id": pipeline_id, | |
| "pipeline_name": new_pipeline.get("pipeline_name", "Document Processing"), | |
| "steps_count": len(steps_list), | |
| "tools": [ | |
| step.get("tool_name", step.get("tool", "unknown")) | |
| for step in steps_list | |
| ], | |
| "modification": "edited" | |
| } | |
| # ✅ Return with pipeline_id and pipeline_action="created" | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state="pipeline_proposed", | |
| output=output, | |
| pipeline_id=pipeline_id, | |
| pipeline_action="created" # ✅ ADD THIS | |
| ) | |
| except Exception as e: | |
| api_data = { | |
| "type": "edit_failed", | |
| "error": str(e), | |
| "message": "Could not modify the plan", | |
| "action": "Try 'approve' to run as-is, or 'reject' to start over" | |
| } | |
| friendly = f"```json\n{json.dumps(api_data, indent=2)}\n```" | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state="pipeline_proposed", | |
| exception=str(e) | |
| ) | |
| # Waiting for confirmation | |
| api_data = { | |
| "type": "waiting_for_confirmation", | |
| "message": "Please type 'approve', 'reject', or describe changes", | |
| "hint": "You can also say 'edit' for modification hints" | |
| } | |
| friendly = f"```json\n{json.dumps(api_data, indent=2)}\n```" | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state="pipeline_proposed" | |
| ) | |
| # Default nudge | |
| friendly = ( | |
| "I'm here to help process documents! Please tell me what you'd like to do with your document.\n\n" | |
| "For example:\n- 'extract text and summarize'\n- 'get tables from pages 2-5'\n- 'translate to Spanish'\n\n" | |
| "Type 'help' to see all capabilities!" | |
| ) | |
| api_data = { | |
| "type": "unclear_intent", | |
| "message": "Could not determine appropriate action", | |
| "intent_classification": intent_data, | |
| "current_state": current_state | |
| } | |
| return _assistant_response_payload( | |
| chat_id=chat_id, | |
| friendly_response=friendly, | |
| intent=intent_data, | |
| api_data=api_data, | |
| state=current_state | |
| ) | |
| except Exception as e: | |
| error_msg = f"An unexpected error occurred: {str(e)}" | |
| return ChatResponse( | |
| assistant_response=error_msg, | |
| output={}, | |
| final_output=None, | |
| exception=str(e), | |
| api_response={"type": "unexpected_error", "error": str(e)}, | |
| intent=intent_data if isinstance(intent_data, dict) else {"intent": "unknown"}, | |
| chat_id=chat_id, | |
| state=current_state if isinstance(current_state, str) else "initial", | |
| file=False, | |
| fileName=None, | |
| fileUrl=None | |
| ) | |
| # ======================== | |
| # UNIFIED CHAT (streaming, NDJSON) | |
| # ======================== | |
async def chat_unified_stream(
    request: Request,
    chat_id: Optional[str] = Form(None),
    message: Optional[str] = Form(None),
    prefer_bedrock: Optional[bool] = Form(True),
    file: Optional[UploadFile] = File(None),
):
    """
    Unified streaming chat endpoint; the response body is NDJSON
    (one JSON object per line, media type ``application/x-ndjson``).

    Input:
      - multipart/form-data (``chat_id``, ``message``, ``prefer_bedrock``, ``file``), OR
      - a JSON body with ``chat_id``, ``message``, ``prefer_bedrock`` and ``file_path``.
        Keys present in the JSON body override the form values.

    Behavior:
      - An uploaded file is sent to S3 via ``upload_stream_to_s3`` before streaming starts.
      - In state "initial" a pipeline is generated and proposed.
      - In state "pipeline_proposed" the message is treated as approval
        (execute and stream progress), rejection (cancel) or an edit request
        (regenerate the pipeline).

    Emitted event types: ``status``, ``assistant_delta`` (accumulated progress
    text) and ``assistant_final`` (last event of the stream).

    Returns:
        StreamingResponse wrapping the NDJSON generator.
    """
    # Parse JSON if needed
    content_type = (request.headers.get("content-type") or "").lower()
    file_path_from_json = None
    if "application/json" in content_type:
        body = await request.json()
        chat_id = body.get("chat_id") or chat_id
        message = body.get("message") if "message" in body else message
        prefer_bedrock = body.get("prefer_bedrock", True) if "prefer_bedrock" in body else prefer_bedrock
        file_path_from_json = body.get("file_path")

    chat_id = _ensure_chat(chat_id)
    # Called for its effect (session lookup/initialisation); the generator
    # below re-reads the session itself, so the result is not kept here.
    _get_session_or_init(chat_id)

    # Attach JSON file path if provided
    if file_path_from_json:
        session_manager.update_session(chat_id, {"current_file": file_path_from_json})
        _add_and_mirror_message(chat_id, "system", f"File attached: {file_path_from_json}")
        _get_session_or_init(chat_id)

    # Upload file if provided
    uploaded_file_info = None
    if file is not None:
        s3_uri = upload_stream_to_s3(chat_id, file)
        meta = (session_manager.get_session(chat_id) or {}).get("file_metadata", {}) or {}
        uploaded_file_info = {
            "bucket": S3_BUCKET,
            "key": s3_uri.split(f"s3://{S3_BUCKET}/", 1)[1],
            "s3_uri": s3_uri,
            "presigned_url": meta.get("presigned_url"),
            "presigned_expires_at": meta.get("presigned_expires_at")
        }
        _get_session_or_init(chat_id)

    def emit(obj: Dict[str, Any]) -> bytes:
        """Serialize one event as an NDJSON line, filling in chat_id, chat_name and state."""
        obj.setdefault("chat_id", chat_id)
        current_session = session_manager.get_session(chat_id) or {}
        obj.setdefault("chat_name", current_session.get("chat_name"))
        obj.setdefault("state", current_session.get("state", "initial"))
        return (json.dumps(obj, ensure_ascii=False) + "\n").encode("utf-8")

    def stream_gen() -> Generator[bytes, None, None]:
        """Drive the chat state machine and yield NDJSON-encoded events."""
        session_local = _get_session_or_init(chat_id)

        # Only-file case
        if (message is None or str(message).strip() == "") and uploaded_file_info:
            friendly = "📁 File uploaded successfully. Tell me what you'd like to do with it (e.g., extract text, get tables, summarize)."
            _add_and_mirror_message(chat_id, "assistant", friendly)
            yield emit({"type": "assistant_final", "content": friendly, "file": uploaded_file_info})
            return

        # No message
        if message is None or str(message).strip() == "":
            friendly = "Please provide a message (e.g., 'extract text', 'get tables', 'summarize')."
            _add_and_mirror_message(chat_id, "assistant", friendly)
            yield emit({"type": "assistant_final", "content": friendly})
            return

        # Add user message
        _add_and_mirror_message(chat_id, "user", message)
        _maybe_generate_chat_name(chat_id)

        # Classify
        intent_data = intent_classifier.classify_intent(message)
        current_state = session_local.get("state", "initial")

        # Casual / question / unclear
        if intent_data["intent"] in {"casual_chat", "question", "unclear"} and current_state == "initial":
            friendly = intent_classifier.get_friendly_response(intent_data["intent"], message)
            _add_and_mirror_message(chat_id, "assistant", friendly)
            yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data})
            return

        # Initial: nudge or generate plan
        if current_state == "initial":
            if not intent_data.get("requires_pipeline", False):
                friendly = (
                    "I'm here to help process documents! Please tell me what you'd like to do with your document.\n\n"
                    "For example:\n- 'extract text and summarize'\n- 'get tables from pages 2-5'\n- 'translate to Spanish'\n\n"
                    "Type 'help' to see all capabilities!"
                )
                _add_and_mirror_message(chat_id, "assistant", friendly)
                yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data})
                return

            if not session_local.get("current_file"):
                friendly = "📁 Please upload a document first before I can process it!"
                _add_and_mirror_message(chat_id, "assistant", friendly)
                yield emit({"type": "assistant_final", "content": friendly, "intent": intent_data})
                return

            yield emit({"type": "status", "message": "Analyzing request and creating a pipeline..."})
            try:
                pipeline = generate_pipeline(
                    user_input=message,
                    file_path=session_local.get("current_file"),
                    prefer_bedrock=bool(prefer_bedrock),
                )
                # Ensure pipeline has both 'components' and 'pipeline_steps' for compatibility
                if "components" in pipeline and "pipeline_steps" not in pipeline:
                    pipeline["pipeline_steps"] = pipeline["components"]
                elif "pipeline_steps" in pipeline and "components" not in pipeline:
                    pipeline["components"] = pipeline["pipeline_steps"]

                # V3 lifecycle: create the pipeline record immediately
                pipeline_id = _create_pipeline_record(chat_id, pipeline, status="proposed", created_from="request")

                # Update session with proposed pipeline
                session_manager.update_session(chat_id, {"proposed_pipeline": pipeline, "state": "pipeline_proposed"})

                # Get steps list from either field
                steps_list = pipeline.get("pipeline_steps", pipeline.get("components", []))
                pipeline_name = pipeline.get("pipeline_name", "Document Processing")

                # Create steps summary with proper tool name extraction
                steps_summary = "\n".join([
                    f" {i+1}. {step.get('tool_name', step.get('tool', 'Unknown'))}"
                    for i, step in enumerate(steps_list)
                ])
                friendly = (
                    f"🎯 **Pipeline Created: {pipeline_name}**\n"
                    f"Here's what I'll do:\n{steps_summary}\n"
                    f"**Ready to proceed?**\n"
                    f"- Type 'approve' or 'yes' to execute\n"
                    f"- Type 'reject' or 'no' to cancel\n"
                    f"- Describe changes to modify the plan"
                )

                # Record pipeline_id / pipeline_action alongside the stored message
                _add_and_mirror_message(
                    chat_id,
                    "assistant",
                    friendly,
                    pipeline_id=pipeline_id,
                    pipeline_action="created"
                )
                yield emit({
                    "type": "assistant_final",
                    "content": friendly,
                    "pipeline": pipeline,
                    "pipeline_id": pipeline_id,
                    "pipeline_action": "created",
                    "output": {
                        "pipeline_id": pipeline_id,
                        "pipeline_name": pipeline_name,
                        "steps_count": len(steps_list),
                        "tools": [
                            step.get("tool_name", step.get("tool", "unknown"))
                            for step in steps_list
                        ]
                    }
                })
                return
            except Exception as e:
                friendly = f"❌ Error generating pipeline: {str(e)}"
                _add_and_mirror_message(chat_id, "assistant", friendly)
                yield emit({
                    "type": "assistant_final",
                    "content": friendly,
                    "error": str(e),
                    "exception": str(e)
                })
                return

        # Pipeline proposed
        if current_state == "pipeline_proposed":
            if intent_data["intent"] == "approval":
                session_manager.update_session(chat_id, {"state": "executing"})
                plan = session_local.get("proposed_pipeline", {})
                pipeline_id = plan.get("pipeline_id")
                initial = (
                    f"✅ Approved! Starting execution of: **{plan.get('pipeline_name', 'pipeline')}**\n\n"
                    f"🚀 Processing, please wait...\n_(Using {plan.get('_generator', 'AI')} - {plan.get('_model', 'model')})_"
                )
                yield emit({"type": "assistant_delta", "content": initial})

                steps_completed, final_payload, executor_used = [], None, "unknown"
                accumulated = initial
                file_ref = session_local.get("current_file")
                # cleanup stays None until the download succeeds, so the
                # finally-clause knows whether there is anything to remove.
                cleanup = None
                try:
                    # The download happens inside the try on purpose: a failure
                    # here must reset the session state and produce a final
                    # event instead of leaving the chat stuck in "executing".
                    local_path, cleanup = download_to_temp_file(file_ref)
                    for event in execute_pipeline_streaming(
                        pipeline=plan,
                        file_path=local_path,
                        session_id=chat_id,
                        prefer_bedrock=bool(prefer_bedrock)
                    ):
                        etype = event.get("type")
                        if etype == "info":
                            msg2 = f"ℹ️ {event.get('message')} _(Executor: {event.get('executor', 'unknown')})_"
                            accumulated += "\n\n" + msg2
                            yield emit({"type": "assistant_delta", "content": accumulated})
                        elif etype == "step":
                            step_num = event.get("step", 0)
                            tool_name = event.get("tool", "processing")
                            status = event.get("status", "running")
                            if status == "completed" and "observation" in event:
                                obs_preview = str(event.get("observation"))[:80]
                                step_msg = f"✅ Step {step_num}: {tool_name} - Completed!\n Preview: {obs_preview}..."
                            elif status == "executing":
                                step_msg = f"⏳ Step {step_num}: {tool_name} - Processing..."
                            else:
                                step_msg = f"📍 Step {step_num}: {tool_name}"
                            steps_completed.append({
                                "step": step_num,
                                "tool": tool_name,
                                "status": status,
                                "executor": event.get("executor", "unknown"),
                                "observation": event.get("observation"),
                                "input": event.get("input"),
                            })
                            executor_used = event.get("executor", executor_used)
                            accumulated += "\n\n" + step_msg
                            yield emit({"type": "assistant_delta", "content": accumulated})
                        elif etype == "final":
                            final_payload = event.get("data")
                            executor_used = event.get("executor", executor_used)
                        elif etype == "error":
                            err = event.get("error", "Unknown error")
                            friendly_err = f"❌ Pipeline Failed\n\nError: {err}\n\nCompleted {len(steps_completed)} step(s) before failure."
                            session_manager.update_session(chat_id, {"state": "initial"})
                            _add_and_mirror_message(
                                chat_id,
                                "assistant",
                                friendly_err,
                                pipeline_id=pipeline_id,
                                pipeline_action="failed",
                                pipeline_result={"error": str(err), "status": "failed"}
                            )
                            # V3: persist the failed status + result
                            if pipeline_id:
                                _update_pipeline_status(pipeline_id, chat_id, "failed", result={"error": str(err)})
                            yield emit({
                                "type": "assistant_final",
                                "content": friendly_err,
                                "error": err,
                                "exception": str(err),
                                "pipeline_id": pipeline_id,
                                "pipeline_action": "failed"
                            })
                            return

                    if final_payload:
                        session_manager.update_session(chat_id, {"pipeline_result": final_payload, "state": "initial"})
                        # V3: Update pipeline status
                        if pipeline_id:
                            _update_pipeline_status(pipeline_id, chat_id, "completed", result=final_payload)
                            _record_model_attribution(
                                pipeline_id=pipeline_id,
                                session_id=chat_id,
                                model_provider=plan.get("_model_provider", "unknown"),
                                model_name=plan.get("_model", "unknown"),
                                is_fallback=False
                            )
                        success_count = len([s for s in steps_completed if s.get("status") == "completed"])
                        result_text = _extract_user_facing_text(final_payload)
                        friendly_final = (
                            f"🎉 Pipeline Completed Successfully!\n"
                            f"- Pipeline: {plan.get('pipeline_name', 'Document Processing')}\n"
                            f"- Total Steps: {len(steps_completed)}\n"
                            f"- Successful: {success_count}\n"
                            f"- Executor: {executor_used}\n\n"
                            f"{result_text or ''}"
                        )
                        _add_and_mirror_message(
                            chat_id,
                            "assistant",
                            friendly_final,
                            pipeline_id=pipeline_id,
                            pipeline_action="executed",
                            pipeline_result=final_payload
                        )
                        yield emit({
                            "type": "assistant_final",
                            "content": friendly_final,
                            "result": final_payload,
                            "pipeline_id": pipeline_id,
                            "pipeline_action": "executed",
                            "output": {
                                "component_summary": f"Executed {success_count} steps successfully",
                                "steps_completed": success_count,
                                "total_steps": len(steps_completed),
                                "pipeline_id": pipeline_id
                            },
                            "final_output": {
                                "text": result_text,
                                "result": final_payload
                            }
                        })
                        return
                    else:
                        # The executor finished without a "final" payload
                        done = f"✅ Pipeline Completed! Executed {len(steps_completed)} steps."
                        session_manager.update_session(chat_id, {"state": "initial"})
                        _add_and_mirror_message(
                            chat_id,
                            "assistant",
                            done,
                            pipeline_id=pipeline_id,
                            pipeline_action="executed",
                            pipeline_result={"message": "Completed without output", "status": "completed"}
                        )
                        if pipeline_id:
                            _update_pipeline_status(pipeline_id, chat_id, "completed", result={"message": "Completed without output"})
                        yield emit({
                            "type": "assistant_final",
                            "content": done,
                            "pipeline_id": pipeline_id,
                            "pipeline_action": "executed"
                        })
                        return
                except Exception as e:
                    friendly_err = f"❌ Pipeline Execution Failed\n\nError: {str(e)}"
                    session_manager.update_session(chat_id, {"state": "initial"})
                    _add_and_mirror_message(
                        chat_id,
                        "assistant",
                        friendly_err,
                        pipeline_id=pipeline_id,
                        pipeline_action="failed",
                        pipeline_result={"error": str(e), "status": "failed"}
                    )
                    if pipeline_id:
                        _update_pipeline_status(pipeline_id, chat_id, "failed", result={"error": str(e)})
                    yield emit({
                        "type": "assistant_final",
                        "content": friendly_err,
                        "error": str(e),
                        "exception": str(e),
                        "pipeline_id": pipeline_id,
                        "pipeline_action": "failed"
                    })
                    return
                finally:
                    if cleanup is not None:
                        try:
                            cleanup()
                        except Exception:
                            # Best-effort temp-file removal: a cleanup failure
                            # must not replace the result already streamed.
                            pass

            elif intent_data["intent"] == "rejection":
                session_manager.update_session(chat_id, {"state": "initial", "proposed_pipeline": None})
                friendly = "👍 No problem! Pipeline cancelled. What else would you like me to help you with?"
                _add_and_mirror_message(chat_id, "assistant", friendly)
                yield emit({"type": "assistant_final", "content": friendly})
                return

            else:
                # Edit request
                try:
                    original_plan = session_local.get("proposed_pipeline", {})
                    edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}"
                    new_pipeline = generate_pipeline(
                        user_input=edit_context,
                        file_path=session_local.get("current_file"),
                        prefer_bedrock=bool(prefer_bedrock)
                    )
                    # Ensure pipeline has both 'components' and 'pipeline_steps' for compatibility
                    if "components" in new_pipeline and "pipeline_steps" not in new_pipeline:
                        new_pipeline["pipeline_steps"] = new_pipeline["components"]
                    elif "pipeline_steps" in new_pipeline and "components" not in new_pipeline:
                        new_pipeline["components"] = new_pipeline["pipeline_steps"]

                    # V3: Create pipeline record for edited pipeline
                    pipeline_id = _create_pipeline_record(chat_id, new_pipeline, status="proposed", created_from="edit")
                    session_manager.update_session(chat_id, {"proposed_pipeline": new_pipeline, "state": "pipeline_proposed"})

                    formatted = format_pipeline_for_display(new_pipeline)
                    friendly = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
                    _add_and_mirror_message(
                        chat_id,
                        "assistant",
                        friendly,
                        pipeline_id=pipeline_id,
                        pipeline_action="created"
                    )

                    # Get steps list for output
                    steps_list = new_pipeline.get("pipeline_steps", new_pipeline.get("components", []))
                    yield emit({
                        "type": "assistant_final",
                        "content": friendly,
                        "pipeline": new_pipeline,
                        "pipeline_id": pipeline_id,
                        "pipeline_action": "created",
                        "output": {
                            "pipeline_id": pipeline_id,
                            "pipeline_name": new_pipeline.get("pipeline_name", "Document Processing"),
                            "steps_count": len(steps_list),
                            "tools": [
                                step.get("tool_name", step.get("tool", "unknown"))
                                for step in steps_list
                            ],
                            "modification": "edited"
                        }
                    })
                    return
                except Exception as e:
                    friendly = f"❌ Edit failed: {str(e)}"
                    _add_and_mirror_message(chat_id, "assistant", friendly)
                    yield emit({
                        "type": "assistant_final",
                        "content": friendly,
                        "error": str(e),
                        "exception": str(e)
                    })
                    return

        # Default: reached for any session state not handled above
        friendly = "Please upload a document and tell me what you'd like me to do (e.g., extract text, summarize, translate)."
        _add_and_mirror_message(chat_id, "assistant", friendly)
        yield emit({"type": "assistant_final", "content": friendly})

    # Return a real StreamingResponse
    return StreamingResponse(stream_gen(), media_type="application/x-ndjson")
| # ======================== | |
| # CHAT MANAGEMENT (sessions → chats) | |
| # ======================== | |
def create_chat():
    """
    Create a new chat session.

    Returns:
        dict with ``chat_id`` (the new session id) and ``chat_name``
        (the session's name, or None when it is not available).
    """
    chat_id = session_manager.create_session()
    # Guard against a missing session the same way the other handlers in this
    # module do, instead of failing with AttributeError on None.
    session = session_manager.get_session(chat_id) or {}
    return {
        "chat_id": chat_id,
        "chat_name": session.get("chat_name"),
    }
| # ======================== | |
| # FILE UPLOAD (to S3, no presigned URLs) — still available | |
| # ======================== | |
async def upload_file_to_chat(chat_id: str, file: UploadFile = File(...)):
    """
    Upload ``file`` to S3 for the given chat and describe the stored object.

    The ``presigned_url`` / ``presigned_expires_at`` values in the response are
    read back from the session's ``file_metadata`` after the upload; they are
    None when the session holds no such metadata.
    """
    chat_id = _ensure_chat(chat_id)
    s3_uri = upload_stream_to_s3(chat_id, file)

    # Session snapshot taken right after the upload, used for file metadata.
    session_after_upload = session_manager.get_session(chat_id) or {}
    meta = session_after_upload.get("file_metadata", {}) or {}

    # The object key is whatever follows "s3://<bucket>/" in the returned URI.
    bucket_prefix = f"s3://{S3_BUCKET}/"
    file_info = {
        "bucket": S3_BUCKET,
        "key": s3_uri.split(bucket_prefix, 1)[1],
        "s3_uri": s3_uri,
        "presigned_url": meta.get("presigned_url"),
        "presigned_expires_at": meta.get("presigned_expires_at"),
    }

    session_for_name = session_manager.get_session(chat_id) or {}
    return {
        "status": "success",
        "message": "File uploaded to S3",
        "file": file_info,
        "chat_id": chat_id,
        "chat_name": session_for_name.get("chat_name"),
        "next_action": "💬 Now tell me what you'd like to do with this document",
    }
| # Optional: server-side proxy to fetch the S3 file (no presigned URL) | |
def download_chat_file(chat_id: str):
    """
    Stream the S3 file attached to a chat back to the client.

    Args:
        chat_id: Identifier of the chat whose ``current_file`` is downloaded.

    Returns:
        StreamingResponse that yields the object in 1 MiB chunks, sent as an
        attachment named after the last path component of the S3 key.

    Raises:
        HTTPException(404): the chat does not exist, it has no ``s3://`` file
            attached, or S3 reports a client error for the object.
    """
    s = session_manager.get_session(chat_id)
    if not s:
        raise HTTPException(status_code=404, detail="Chat not found")

    file_ref = s.get("current_file")
    if not file_ref or not isinstance(file_ref, str) or not file_ref.startswith("s3://"):
        raise HTTPException(status_code=404, detail="No S3 file attached to this chat")

    bucket, key = parse_s3_uri(file_ref)
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
    except ClientError as e:
        # Keep the original S3 error as the cause for server-side debugging.
        raise HTTPException(status_code=404, detail=f"File not found in S3: {str(e)}") from e

    body = obj["Body"]  # StreamingBody

    def stream():
        # Close the S3 body even when the client disconnects mid-download,
        # so the underlying HTTP connection is released.
        try:
            for chunk in iter(lambda: body.read(1024 * 1024), b""):
                yield chunk
        finally:
            body.close()

    media_type = obj.get("ContentType", "application/octet-stream")
    # A double quote inside the name would terminate the quoted filename early.
    safe_name = os.path.basename(key).replace('"', "")
    return StreamingResponse(stream(), media_type=media_type, headers={
        "Content-Disposition": f'attachment; filename="{safe_name}"'
    })
| # ======================== | |
| # V3 NEW ENDPOINTS | |
| # ======================== | |
async def rename_session(session_id: str, new_name: str = Form(...)):
    """
    Rename a session/chat.

    Returns a confirmation dict when the session manager reports success and
    raises HTTPException(404) otherwise.
    """
    from services.session_manager import session_manager

    renamed = session_manager.rename_session(session_id, new_name)
    if not renamed:
        raise HTTPException(status_code=404, detail="Session not found")
    return {"success": True, "session_id": session_id, "new_name": new_name}
async def get_workflows(limit: int = 100, skip: int = 0):
    """
    List saved workflows, one page at a time.

    ``limit`` and ``skip`` are passed straight to the workflow manager.
    ``count`` is the size of the returned page; ``total`` is what the
    manager's ``count_workflows()`` reports.
    """
    from services.workflow_manager import get_workflow_manager

    manager = get_workflow_manager()
    page = manager.get_workflows(limit=limit, skip=skip)

    response = {"workflows": page, "count": len(page)}
    response["total"] = manager.count_workflows()
    return response
async def get_workflow(workflow_id: str):
    """
    Fetch a single workflow by id.

    Returns whatever the workflow manager holds for ``workflow_id`` and raises
    HTTPException(404) when the lookup yields a falsy result.
    """
    from services.workflow_manager import get_workflow_manager

    found = get_workflow_manager().get_workflow(workflow_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Workflow not found")
async def regenerate_file_url(file_id: str):
    """
    Issue a fresh presigned URL for a file.

    Placeholder implementation: ``file_id`` is handed to the S3 manager as the
    object key itself (``add_prefix=False``); no lookup in a files collection
    is performed. Any failure is reported as HTTPException(404).
    """
    from services.s3_manager import get_s3_manager

    # Note: In a full implementation, you'd look up the file in the files collection
    storage = get_s3_manager()
    one_week_seconds = 604800
    try:
        signed = storage.generate_presigned_url(
            file_id, expires_in=one_week_seconds, add_prefix=False
        )
        payload = {
            "file_id": file_id,
            "presigned_url": signed["presigned_url"],
            "expires_at": signed["presigned_expires_at"],
        }
    except Exception as e:
        raise HTTPException(status_code=404, detail=f"File not found: {str(e)}")
    return payload
def get_session_pipeline_executions(session_id: str, limit: int = 50):
    """
    Return the pipelines recorded for a session.

    At most ``limit`` entries are requested from the pipeline manager. Any
    error, including a failed import of the manager, becomes
    HTTPException(500).
    """
    try:
        from services.pipeline_manager import get_pipeline_manager

        history = get_pipeline_manager().get_session_pipelines(session_id, limit=limit)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get pipeline executions: {str(e)}")
    return {"session_id": session_id, "pipelines": history}
| # ======================== | |
| # V3 WORKFLOW SAVE API | |
| # ======================== | |
def save_workflow_from_pipeline(
    pipeline_id: str = Form(...),
    workflow_name: Optional[str] = Form(None)
):
    """
    Save an existing pipeline as a reusable workflow.

    The pipeline is identified by ``pipeline_id`` (not an execution id) and is
    read through the pipeline manager; its ``definition`` is what gets saved.
    A non-empty ``workflow_name`` replaces the definition's ``pipeline_name``.
    Parameters are form fields.

    Raises:
        HTTPException(400): ``pipeline_id`` is empty.
        HTTPException(404): no metadata or no full document for the pipeline.
        HTTPException(500): any other failure while saving.
    """
    if not pipeline_id:
        raise HTTPException(status_code=400, detail="pipeline_id is required")

    try:
        from services.pipeline_manager import get_pipeline_manager
        from services.workflow_manager import get_workflow_manager

        pipelines = get_pipeline_manager()
        workflows = get_workflow_manager()

        # The metadata lookup serves as an existence check only.
        if not pipelines.get_pipeline_metadata(pipeline_id):
            raise HTTPException(status_code=404, detail="Pipeline not found")

        # The full pipeline document carries the definition to be saved.
        document = pipelines.get_full_pipeline_document(pipeline_id)
        if not document:
            raise HTTPException(status_code=404, detail="Pipeline document not found in S3")

        definition = document.get("definition", {})
        if workflow_name:
            # Caller-supplied name wins over the generated one.
            definition["pipeline_name"] = workflow_name

        # Save with source tracking back to the originating pipeline.
        saved_id = workflows.save_workflow(
            session_id=document.get("session_id", "unknown"),
            pipeline_definition=definition,
            user_message=f"Saved from pipeline {pipeline_id}",
            source_pipeline_id=pipeline_id,
            pipeline_status=document.get("status", "unknown"),
        )

        return {
            "workflow_id": saved_id,
            "pipeline_name": definition.get("pipeline_name", "Untitled"),
            "source_pipeline_id": pipeline_id,
            "pipeline_status": document.get("status"),
            "message": "Workflow saved successfully",
        }
    except HTTPException:
        # Let the deliberate 404s above through unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to save workflow: {str(e)}")
| # ======================== | |
| # HEALTH | |
| # ======================== | |
def health_check():
    """
    Liveness probe.

    Returns:
        dict with a fixed ``status`` and ``service`` plus ``time``: the current
        UTC time in ISO-8601 form with a trailing "Z".
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated; take an aware UTC timestamp and drop the
    # tzinfo so isoformat() keeps the existing "...Z" wire format (no "+00:00").
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    return {"status": "ok", "service": "MasterLLM v2.0", "time": now_utc.isoformat() + "Z"}