"""RAG API routes.

Thin FastAPI layer over the RAG MCP server client (``RAGClient``): search,
document ingestion (legacy raw content, structured documents, and file
uploads), listing and deletion. The tenant is taken from the ``x-tenant-id``
header (or the request body / form where noted); write operations are gated on
the ``x-user-role`` header via ``require_api_permission``.
"""
import logging
import re
from typing import Any, Dict, Optional

from fastapi import APIRouter, File, Form, Header, HTTPException, Request, UploadFile
from pydantic import BaseModel

from api.mcp_clients.rag_client import RAGClient
from api.services.document_ingestion import (
    detect_source_type,
    extract_text_from_file_bytes,
    normalize_text,
    prepare_ingestion_payload,
    process_ingestion,
)
from ..utils.access_control import require_api_permission

logger = logging.getLogger(__name__)

router = APIRouter()
rag_client = RAGClient()

# Matches an uppercase "RAG" anywhere (as the original substring test did), but
# a lowercase "rag" only as a standalone token ("rag_client", "rag-server"), so
# ordinary words such as "storage", "paragraph" or "drag" are not mistaken for
# RAG-server failures.
_RAG_MENTION = re.compile(r"RAG|(?<![A-Za-z])rag(?![a-z])")


class IngestRequest(BaseModel):
    """Legacy simple ingestion request"""

    content: str


class DocumentIngestRequest(BaseModel):
    """Enhanced ingestion request matching the system prompt specification"""

    # Accepted for payload compatibility; the handler does not read it.
    action: str = "ingest_document"
    # Optional here; the handler falls back to the x-tenant-id header.
    tenant_id: Optional[str] = None
    # pdf | docx | txt | url | raw_text | markdown; None leaves detection to the
    # ingestion service.
    source_type: Optional[str] = None
    content: str
    # Keys read by the handler: filename, url, doc_id, extract_metadata
    # (default True). The whole dict is also forwarded to the ingestion service.
    metadata: Optional[Dict[str, Any]] = None


class SearchRequest(BaseModel):
    query: str


def _raise_for_failed_ingestion(result: Any) -> Dict[str, Any]:
    """Validate a ``process_ingestion`` result and return it as a dict.

    Args:
        result: Whatever ``process_ingestion`` returned.

    Returns:
        ``result`` unchanged, once it is known to be a dict describing a
        successful ingestion that stored at least one chunk.

    Raises:
        HTTPException: 500 if ``result`` is not a dict; 403 if the RAG server
            reported an error that looks permission-related; 500 for any other
            reported error; 500 if no chunks were stored.
    """
    if not isinstance(result, dict):
        logger.error("Unexpected ingestion result type: %s", type(result).__name__)
        raise HTTPException(
            status_code=500,
            detail=f"RAG server error: unexpected response type {type(result).__name__}",
        )

    # The RAG server can report failure in-band instead of raising.
    if result.get("status") == "error":
        error_msg = result.get("message") or result.get("error") or "Unknown error from RAG server"
        error_type = result.get("error_type", "unknown")
        logger.error("RAG server returned error (%s): %s", error_type, error_msg)
        lowered = str(error_msg).lower()
        # NOTE(review): 'validation_error' is treated as a role rejection here,
        # matching the previous behaviour; confirm the RAG server only uses that
        # error_type for permission failures.
        if "permission" in lowered or "not permitted" in lowered or error_type == "validation_error":
            raise HTTPException(
                status_code=403,
                detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown.",
            )
        raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")

    chunks_stored = result.get("chunks_stored", 0)
    logger.debug("Ingestion result keys=%s, chunks_stored=%s", list(result.keys()), chunks_stored)
    if not chunks_stored:
        warnings = result.get("warnings")
        error_detail = result.get("error") or warnings or result.get("message") or "No chunks were stored"
        error_msg = f"Ingestion failed: {error_detail}"
        # Avoid repeating the warnings when they already served as the detail.
        if warnings and error_detail is not warnings:
            error_msg += f"\nWarnings: {warnings}"
        error_msg += (
            "\n\nPossible causes:\n"
            "1. Content too short or empty (minimum text required)\n"
            "2. Database connection issue (check POSTGRESQL_URL in RAG server)\n"
            "3. RAG MCP server error (check RAG server logs)\n"
            "4. Database table 'documents' doesn't exist"
        )
        logger.error("No chunks stored. Error detail: %s", error_detail)
        raise HTTPException(status_code=500, detail=error_msg)

    return result


def _success_message(prefix: str, result: Dict[str, Any]) -> str:
    """Append extracted title / quality score (when present) to *prefix*.

    Never raises on odd metadata shapes: a completed ingestion must not be
    turned into an error response by message formatting.
    """
    message = prefix
    metadata_info = result.get("extracted_metadata")
    if isinstance(metadata_info, dict):
        title = metadata_info.get("title")
        if title:
            message += f" Title: {title}"
        quality = metadata_info.get("quality_score")
        if isinstance(quality, (int, float)) and quality:
            message += f" Quality: {quality:.2f}"
    return message


def _describe_unexpected_error(exc: Exception) -> str:
    """Build a user-facing 500 detail for an unexpected ingestion exception.

    Adds troubleshooting hints for errors that mention the database or the
    RAG/MCP server, and caps the message at roughly 2000 characters.
    """
    error_detail = str(exc)
    error_type = type(exc).__name__
    lowered = error_detail.lower()

    if "POSTGRESQL_URL" in error_detail or "database" in lowered or "connection" in lowered:
        error_msg = (
            f"Database connection error: {error_detail}\n\n"
            f"Please check:\n"
            f"1. POSTGRESQL_URL is set correctly in your .env file\n"
            f"2. Database is accessible\n"
            f"3. The 'documents' table exists (run initialize_database() if needed)"
        )
    elif _RAG_MENTION.search(error_detail) or "mcp" in lowered:
        error_msg = (
            f"RAG server error: {error_detail}\n\n"
            f"Please check:\n"
            f"1. RAG_MCP_URL is set correctly (default: http://localhost:8900/rag)\n"
            f"2. RAG MCP server is running\n"
            f"3. Database connection (POSTGRESQL_URL) is configured in the RAG server"
        )
    elif len(error_detail) > 500:
        # Long details are usually tracebacks; keep only the head in the response.
        error_msg = f"Ingestion failed ({error_type}): {error_detail[:500]}...\n\nSee server logs for full traceback."
    else:
        error_msg = f"Ingestion failed ({error_type}): {error_detail}"

    if len(error_msg) > 2000:
        error_msg = error_msg[:2000] + "...\n\n(Error message truncated. See server logs for full details.)"
    return error_msg


@router.post("/search")
async def rag_search(
    req: SearchRequest,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Search tenant knowledge base using the RAG MCP server.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        results = await rag_client.search(req.query, x_tenant_id)
        return {
            "tenant_id": x_tenant_id,
            "query": req.query,
            "results": results,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/ingest")
async def rag_ingest(
    req: IngestRequest,
    x_tenant_id: Optional[str] = Header(None),
    x_user_role: str = Header("viewer"),
):
    """
    Legacy ingestion endpoint - simple content ingestion.
    Ingest content into tenant knowledge base using the RAG MCP server.

    Requires a role with the "ingest_documents" permission.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    try:
        result = await rag_client.ingest(req.content, x_tenant_id)
        # Keys from the RAG result are spread last, so they can override "status".
        return {
            "tenant_id": x_tenant_id,
            "status": "ok",
            **result,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/ingest-document")
async def rag_ingest_document(
    req: DocumentIngestRequest,
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
    x_user_role: str = Header("viewer"),
):
    """
    Enhanced document ingestion endpoint matching the system prompt specification.

    Supports:
    - PDF, DOCX, TXT, Markdown files
    - URLs (fetches content automatically)
    - Raw text
    - Metadata (filename, url, doc_id)

    Expected payload:
    {
        "action": "ingest_document",
        "tenant_id": "...",
        "source_type": "pdf | docx | txt | markdown | url | raw_text",
        "content": "...",
        "metadata": {
            "filename": "...",
            "url": "...",
            "doc_id": "..."
        }
    }

    Status codes: 400 for a missing tenant, empty content or a ValueError
    during preparation; 403 for permission failures; 500 when the RAG server
    reports an error, stores no chunks, or anything else fails.
    """
    # Body tenant_id wins; the header is the backward-compatible fallback.
    tenant_id = req.tenant_id or x_tenant_id
    if not tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Header names only (not values) at DEBUG level, to help diagnose a missing role header.
    logger.debug(
        "ingest-document headers=%s x_user_role=%r x_tenant_id=%r",
        list(request.headers.keys()),
        x_user_role,
        x_tenant_id,
    )

    require_api_permission(x_user_role, "ingest_documents")

    content_length = len(req.content) if req.content else 0
    logger.info(
        "Ingestion request received: tenant_id=%s, source_type=%s, content_length=%d",
        tenant_id,
        req.source_type,
        content_length,
    )

    if not req.content or not req.content.strip():
        raise HTTPException(status_code=400, detail="Content cannot be empty. Please provide text to ingest.")
    if content_length < 10:
        logger.warning(
            "Content is very short (%d chars). This may result in no chunks being created.",
            content_length,
        )

    metadata = req.metadata or {}

    try:
        # Step 1: build the payload (async because URL sources are fetched).
        try:
            payload = await prepare_ingestion_payload(
                tenant_id=tenant_id,
                content=req.content,
                source_type=req.source_type,
                filename=metadata.get("filename"),
                url=metadata.get("url"),
                doc_id=metadata.get("doc_id"),
                metadata=req.metadata,
            )
        except Exception as prep_err:
            logger.error("Step 1 FAILED (prepare_ingestion_payload): %s", prep_err)
            raise

        # Step 2: hand the payload to the RAG server, with metadata extraction
        # unless the caller opted out.
        extract_metadata = metadata.get("extract_metadata", True)
        try:
            result = await process_ingestion(
                payload, rag_client, extract_metadata=extract_metadata, user_role=x_user_role
            )
        except HTTPException:
            raise
        except Exception as proc_err:
            # Permission errors may arrive as non-HTTP exceptions carrying a status_code.
            if getattr(proc_err, "status_code", None) == 403:
                raise HTTPException(
                    status_code=403, detail=getattr(proc_err, "detail", str(proc_err))
                ) from proc_err
            logger.error("Step 2 FAILED (process_ingestion): %s", proc_err)
            raise

        result = _raise_for_failed_ingestion(result)
        chunks_stored = result.get("chunks_stored", 0)
        message = _success_message(
            f"Document ingested successfully. {chunks_stored} chunk(s) stored.", result
        )
        return {
            "status": "ok",
            "message": message,
            **result,
        }
    except HTTPException:
        raise
    except ValueError as e:
        logger.exception("Ingestion ValueError: %s", e)
        raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}") from e
    except Exception as e:
        logger.exception("Ingestion Error (%s): %s", type(e).__name__, e)
        raise HTTPException(status_code=500, detail=_describe_unexpected_error(e)) from e


@router.post("/ingest-file")
async def rag_ingest_file(
    file: UploadFile = File(...),
    x_tenant_id: Optional[str] = Header(None),
    tenant_id: Optional[str] = Form(None),
    x_user_role: str = Header("viewer"),
):
    """
    File upload endpoint for binary files (PDF, DOCX, TXT, MD).
    Extracts text server-side and ingests into knowledge base.

    Usage:
        POST /rag/ingest-file
        Headers:
            x-tenant-id: <tenant id>   (optional if sent as the tenant_id form field)
            x-user-role: <role>        (needs the "ingest_documents" permission)
        Form Data:
            file: <binary file>
            tenant_id: <tenant id>     (optional; takes precedence over the header)

    Status codes: 400 for a missing tenant, an empty file, unsupported or
    unreadable content, or a ValueError; 403 for permission failures; 500 when
    the RAG server reports an error, stores no chunks, or anything else fails.
    """
    # Form field wins; the header is the fallback.
    tenant_id_value = tenant_id or x_tenant_id
    if not tenant_id_value:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    try:
        file_bytes = await file.read()
        if not file_bytes:
            raise HTTPException(status_code=400, detail="File is empty")

        try:
            extracted_text = extract_text_from_file_bytes(file_bytes, file.filename or "unknown")
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

        if not extracted_text or not extracted_text.strip():
            raise HTTPException(status_code=400, detail="No text could be extracted from file")

        payload = await prepare_ingestion_payload(
            tenant_id=tenant_id_value,
            content=extracted_text,
            source_type=None,  # Auto-detect from filename
            filename=file.filename,
            url=None,
            doc_id=None,
            metadata=None,
        )

        # Forward the caller's role, as /ingest-document does.
        result = await process_ingestion(
            payload, rag_client, extract_metadata=True, user_role=x_user_role
        )
        # Reject in-band RAG errors and zero-chunk results instead of reporting success.
        result = _raise_for_failed_ingestion(result)

        message = _success_message(
            f"File '{file.filename}' ingested successfully. {result.get('chunks_stored', 0)} chunk(s) stored.",
            result,
        )
        return {
            "status": "ok",
            "message": message,
            **result,
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Permission errors may arrive as non-HTTP exceptions carrying a status_code.
        if getattr(e, "status_code", None) == 403:
            raise HTTPException(status_code=403, detail=getattr(e, "detail", str(e))) from e
        logger.exception("File ingestion error (%s): %s", type(e).__name__, e)
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/list")
async def rag_list(
    limit: int = 1000,
    offset: int = 0,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    List all documents in tenant knowledge base.

    ``limit`` and ``offset`` are passed through to ``rag_client.list_documents``.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        return await rag_client.list_documents(x_tenant_id, limit=limit, offset=offset)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.delete("/delete/{document_id}")
async def rag_delete(
    document_id: int,
    x_tenant_id: Optional[str] = Header(None),
    x_user_role: str = Header("viewer"),
):
    """
    Delete a specific document by ID from tenant knowledge base.

    Requires a role with the "delete_documents" permission. An "error" key in
    the client result is mapped to an HTTP status:
    - 503 when the message contains "Cannot connect";
    - 404 when it mentions "not found" or "access denied";
    - 500 otherwise.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "delete_documents")

    try:
        result = await rag_client.delete_document(x_tenant_id, document_id, user_role=x_user_role)

        if "error" in result:
            error_msg = result["error"]
            if "Cannot connect" in error_msg:
                raise HTTPException(status_code=503, detail=error_msg)
            elif "not found" in error_msg.lower() or "access denied" in error_msg.lower():
                raise HTTPException(status_code=404, detail=error_msg)
            else:
                raise HTTPException(status_code=500, detail=error_msg)

        return result
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.delete("/delete-all")
async def rag_delete_all(
    x_tenant_id: Optional[str] = Header(None),
    x_user_role: str = Header("viewer"),
):
    """
    Delete all documents for a tenant.

    Requires a role with the "delete_documents" permission. An "error" key in
    the client result becomes 503 when it contains "Cannot connect", else 500.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "delete_documents")

    try:
        result = await rag_client.delete_all_documents(x_tenant_id, user_role=x_user_role)

        if "error" in result:
            error_msg = result["error"]
            if "Cannot connect" in error_msg:
                raise HTTPException(status_code=503, detail=error_msg)
            else:
                raise HTTPException(status_code=500, detail=error_msg)

        return result
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e