Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Header, HTTPException, UploadFile, File, Form, Request | |
| from pydantic import BaseModel | |
| from typing import Optional, Dict, Any | |
| from api.mcp_clients.rag_client import RAGClient | |
| from api.services.document_ingestion import ( | |
| prepare_ingestion_payload, | |
| process_ingestion, | |
| detect_source_type, | |
| normalize_text, | |
| extract_text_from_file_bytes | |
| ) | |
| from ..utils.access_control import require_api_permission | |
# Module-level router; presumably the handlers below are registered on it
# (no route decorators are visible in this excerpt — confirm).
router = APIRouter()

# Single RAG MCP client instance shared by all handlers in this module.
rag_client = RAGClient()
class IngestRequest(BaseModel):
    """Request body for the legacy ingestion endpoint: plain content only."""

    content: str
class DocumentIngestRequest(BaseModel):
    """Request body for the enhanced ingestion endpoint.

    Follows the payload shape given in the system prompt specification.
    """

    action: str = "ingest_document"

    # Optional in the body: the tenant may be supplied through a header instead.
    tenant_id: Optional[str] = None

    # One of: pdf | docx | txt | url | raw_text | markdown
    source_type: Optional[str] = None

    content: str

    # Free-form extras; the ingestion handler reads the keys "filename",
    # "url", "doc_id" and "extract_metadata" from it.
    metadata: Optional[Dict[str, Any]] = None
class SearchRequest(BaseModel):
    """Request body for a knowledge-base search: the query text."""

    query: str
async def rag_search(
    req: SearchRequest,
    x_tenant_id: str = Header(None)
):
    """
    Query the tenant knowledge base through the RAG MCP server.

    Returns a dict with the tenant ID, the submitted query and the results
    handed back by the RAG client.

    Raises:
        HTTPException: 400 when the tenant header is missing; 500 when the
            RAG client call raises.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        hits = await rag_client.search(req.query, x_tenant_id)
    except Exception as exc:
        # Any client failure is reported to the caller as a server error.
        raise HTTPException(status_code=500, detail=str(exc))

    return {
        "tenant_id": x_tenant_id,
        "query": req.query,
        "results": hits
    }
async def rag_ingest(
    req: IngestRequest,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Legacy ingestion endpoint: push plain content into the tenant
    knowledge base via the RAG MCP server.

    The response carries the tenant ID and an "ok" status, merged with
    whatever fields the RAG client returns (client fields win on key clashes).

    Raises:
        HTTPException: 400 when the tenant header is missing; 500 when the
            RAG client call (or merging its result) raises.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Role check runs before any work is done; its failure mode is defined
    # by require_api_permission (not visible in this file).
    require_api_permission(x_user_role, "ingest_documents")

    try:
        outcome = await rag_client.ingest(req.content, x_tenant_id)
        response = {
            "tenant_id": x_tenant_id,
            "status": "ok",
            **outcome
        }
        return response
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def _doc_ingest_log(message):
    """Emit one diagnostic line on stderr."""
    import sys  # local import, as in the original handler
    print(message, file=sys.stderr)


def _doc_ingest_check_result(result):
    """Validate the ingestion result and return the number of chunks stored.

    Raises:
        HTTPException: 500 when the result is not a dict; 403 or 500 when the
            result reports ``status == "error"``; 500 when no chunks were stored.
    """
    if not isinstance(result, dict):
        # Without this guard a non-dict result would fail later on `.get`
        # and surface as an opaque AttributeError.
        _doc_ingest_log(f"❌ Unexpected ingestion result type: {type(result).__name__}")
        raise HTTPException(
            status_code=500,
            detail="RAG server error: unexpected response format from ingestion"
        )

    if result.get('status') == 'error':
        # str() so a non-string message cannot break the `.lower()` checks.
        error_msg = str(result.get('message') or result.get('error') or "Unknown error from RAG server")
        error_type = result.get('error_type', 'unknown')
        _doc_ingest_log(f"❌ RAG server returned error ({error_type}): {error_msg}")
        lowered = error_msg.lower()
        # Permission-style failures are reported as 403 with a role hint.
        if 'permission' in lowered or 'not permitted' in lowered or error_type == 'validation_error':
            raise HTTPException(
                status_code=403,
                detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown."
            )
        raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")

    chunks_stored = result.get('chunks_stored') or 0
    _doc_ingest_log(f"🔍 Debug: result keys={list(result.keys())}, chunks_stored={chunks_stored}")

    if not chunks_stored:
        warnings = result.get('warnings')
        error_detail = result.get('error') or warnings or result.get('message') or "No chunks were stored"
        error_msg = f"Ingestion failed: {error_detail}"
        # Append warnings only when they are not already the headline detail.
        if warnings and error_detail is not warnings:
            error_msg += f"\nWarnings: {warnings}"
        error_msg += (
            "\n\nPossible causes:\n"
            "1. Content too short or empty (minimum text required)\n"
            "2. Database connection issue (check POSTGRESQL_URL in RAG server)\n"
            "3. RAG MCP server error (check RAG server logs)\n"
            "4. Database table 'documents' doesn't exist"
        )
        _doc_ingest_log(f"❌ No chunks stored. Error detail: {error_detail}")
        raise HTTPException(status_code=500, detail=error_msg)

    return chunks_stored


def _doc_ingest_success_message(result, chunks_stored):
    """Build the human-readable success message for an ingestion result."""
    message = f"Document ingested successfully. {chunks_stored} chunk(s) stored."
    extracted = result.get("extracted_metadata")
    if isinstance(extracted, dict):
        title = extracted.get("title")
        if title:
            message += f" Title: {title}"
        score = extracted.get("quality_score")
        # Only format numeric scores: a non-numeric value would make `:.2f`
        # raise and turn an already-completed ingestion into an error response.
        if score and isinstance(score, (int, float)):
            message += f" Quality: {score:.2f}"
    return message


def _doc_ingest_failure_detail(exc):
    """Turn an unexpected exception into a troubleshooting message (max ~2000 chars)."""
    error_detail = str(exc)
    error_type = type(exc).__name__
    lowered = error_detail.lower()

    if "POSTGRESQL_URL" in error_detail or "database" in lowered or "connection" in lowered:
        error_msg = (
            f"Database connection error: {error_detail}\n\n"
            "Please check:\n"
            "1. POSTGRESQL_URL is set correctly in your .env file\n"
            "2. Database is accessible\n"
            "3. The 'documents' table exists (run initialize_database() if needed)"
        )
    elif "rag" in lowered or "mcp" in lowered:
        error_msg = (
            f"RAG server error: {error_detail}\n\n"
            "Please check:\n"
            "1. RAG_MCP_URL is set correctly (default: http://localhost:8900/rag)\n"
            "2. RAG MCP server is running\n"
            "3. Database connection (POSTGRESQL_URL) is configured in the RAG server"
        )
    elif len(error_detail) > 500:
        # Very long details are cut down; the full text is in the server logs.
        error_msg = f"Ingestion failed ({error_type}): {error_detail[:500]}...\n\nSee server logs for full traceback."
    else:
        error_msg = f"Ingestion failed ({error_type}): {error_detail}"

    # Keep the HTTP response body bounded.
    if len(error_msg) > 2000:
        error_msg = error_msg[:2000] + "...\n\n(Error message truncated. See server logs for full details.)"
    return error_msg


async def rag_ingest_document(
    req: DocumentIngestRequest,
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Enhanced document ingestion endpoint matching the system prompt specification.

    Supports:
    - PDF, DOCX, TXT, Markdown files
    - URLs (fetches content automatically)
    - Raw text
    - Metadata (filename, url, doc_id)

    Expected payload:
    {
        "action": "ingest_document",
        "tenant_id": "...",
        "source_type": "pdf | docx | txt | url | raw_text",
        "content": "...",
        "metadata": {
            "filename": "...",
            "url": "...",
            "doc_id": "..."
        }
    }

    The tenant ID is taken from the body when present, otherwise from the
    tenant header. ``metadata["extract_metadata"]`` (default True) is passed
    on to ``process_ingestion``.

    Returns:
        A dict with ``status`` "ok" and a summary ``message``, merged with the
        fields of the ingestion result (result fields win on key clashes).

    Raises:
        HTTPException: 400 for a missing tenant ID, empty content or a
            ValueError during ingestion; 403 for permission failures; 500 for
            RAG server errors, zero stored chunks or any unexpected failure.
    """
    import traceback

    # Body value wins; the header is the fallback (backward compatibility).
    tenant_id = req.tenant_id or x_tenant_id
    if not tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Debug: show which headers actually arrived and how the role was parsed.
    all_headers = dict(request.headers)
    _doc_ingest_log(f"🔍 DEBUG: All headers received: {list(all_headers.keys())}")
    _doc_ingest_log(f"🔍 DEBUG: x-user-role header value: '{all_headers.get('x-user-role', 'NOT FOUND')}'")
    _doc_ingest_log(f"🔍 DEBUG: Backend received x_user_role parameter='{x_user_role}' (type: {type(x_user_role)})")
    _doc_ingest_log(f"🔍 DEBUG: x_tenant_id header='{x_tenant_id}'")

    require_api_permission(x_user_role, "ingest_documents")

    content_length = len(req.content) if req.content else 0
    _doc_ingest_log(
        f"📥 Ingestion request received: tenant_id={tenant_id}, source_type={req.source_type}, content_length={content_length}"
    )

    if not req.content or not req.content.strip():
        raise HTTPException(status_code=400, detail="Content cannot be empty. Please provide text to ingest.")
    if content_length < 10:
        _doc_ingest_log(
            f"⚠️ Warning: Content is very short ({content_length} chars). This may result in no chunks being created."
        )

    # Lookup view only; the original (possibly None) metadata is what gets passed on.
    meta = req.metadata or {}

    try:
        _doc_ingest_log("🔧 Step 1: Preparing ingestion payload...")
        try:
            # Awaited because payload preparation may fetch URL content.
            payload = await prepare_ingestion_payload(
                tenant_id=tenant_id,
                content=req.content,
                source_type=req.source_type,
                filename=meta.get("filename"),
                url=meta.get("url"),
                doc_id=meta.get("doc_id"),
                metadata=req.metadata
            )
        except Exception as prep_err:
            # One-line marker only; the outer handlers log the traceback once.
            _doc_ingest_log(f"❌ Step 1 FAILED (prepare_ingestion_payload): {prep_err}")
            raise
        _doc_ingest_log("✅ Step 1 complete: payload prepared")

        _doc_ingest_log("🔧 Step 2: Processing ingestion with RAG client...")
        extract_metadata = meta.get("extract_metadata", True)
        try:
            result = await process_ingestion(
                payload, rag_client, extract_metadata=extract_metadata, user_role=x_user_role
            )
        except HTTPException:
            # HTTP errors (e.g. 403 permission errors) pass through unchanged.
            raise
        except Exception as proc_err:
            # Non-HTTP exceptions that carry a 403 status are mapped to a real 403.
            if getattr(proc_err, 'status_code', None) == 403:
                raise HTTPException(status_code=403, detail=getattr(proc_err, 'detail', str(proc_err)))
            _doc_ingest_log(f"❌ Step 2 FAILED (process_ingestion): {proc_err}")
            raise
        _doc_ingest_log(
            f"✅ Step 2 complete: chunks_stored={result.get('chunks_stored', 0) if isinstance(result, dict) else 'N/A'}"
        )

        # Raises unless the result is a dict describing a real, non-empty ingestion.
        chunks_stored = _doc_ingest_check_result(result)

        return {
            "status": "ok",
            "message": _doc_ingest_success_message(result, chunks_stored),
            **result
        }
    except HTTPException:
        # Deliberate HTTP responses are re-raised as-is.
        raise
    except ValueError as e:
        _doc_ingest_log(f"❌ Ingestion ValueError: {e}")
        _doc_ingest_log(traceback.format_exc())
        raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}")
    except Exception as e:
        error_log = (
            f"❌ Ingestion Error ({type(e).__name__}): {e}\n"
            f"Full traceback:\n{traceback.format_exc()}"
        )
        _doc_ingest_log(error_log)
        print(error_log)  # Also on stdout so it shows up in uvicorn logs
        raise HTTPException(status_code=500, detail=_doc_ingest_failure_detail(e))
def _file_ingest_check_result(result):
    """Validate the ingestion result and return the number of chunks stored.

    Raises:
        HTTPException: 500 when the result is not a dict; 403 or 500 when the
            result reports ``status == "error"``; 500 when no chunks were stored.
    """
    if not isinstance(result, dict):
        raise HTTPException(
            status_code=500,
            detail="RAG server error: unexpected response format from ingestion"
        )

    if result.get('status') == 'error':
        error_msg = str(result.get('message') or result.get('error') or "Unknown error from RAG server")
        lowered = error_msg.lower()
        # Same classification as the JSON document ingestion endpoint.
        if 'permission' in lowered or 'not permitted' in lowered or result.get('error_type') == 'validation_error':
            raise HTTPException(status_code=403, detail=f"Permission denied: {error_msg}")
        raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")

    chunks_stored = result.get('chunks_stored') or 0
    if not chunks_stored:
        error_detail = (
            result.get('error') or result.get('warnings') or result.get('message') or "No chunks were stored"
        )
        raise HTTPException(status_code=500, detail=f"Ingestion failed: {error_detail}")
    return chunks_stored


async def rag_ingest_file(
    file: UploadFile = File(...),
    x_tenant_id: Optional[str] = Header(None),
    tenant_id: Optional[str] = Form(None),
    x_user_role: str = Header("viewer")
):
    """
    File upload endpoint for binary files (PDF, DOCX, TXT, MD).
    Extracts text server-side and ingests into knowledge base.

    Usage:
        POST /rag/ingest-file
        Headers:
            x-tenant-id: <tenant_id>
        Form Data:
            file: <binary file>
            tenant_id: <optional, can use header instead>

    Returns:
        A dict with ``status`` "ok" and a summary ``message``, merged with the
        fields of the ingestion result (result fields win on key clashes).

    Raises:
        HTTPException: 400 for a missing tenant ID, an empty file, a file with
            no extractable text or a ValueError during extraction/ingestion;
            403 when the result reports a permission failure; 500 for RAG
            server errors, zero stored chunks or any unexpected failure.
    """
    # Form field wins; the header is the fallback.
    tenant_id_value = tenant_id or x_tenant_id
    if not tenant_id_value:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    try:
        file_bytes = await file.read()
        if not file_bytes:
            raise HTTPException(status_code=400, detail="File is empty")

        # A ValueError raised here is mapped to 400 by the handler below.
        extracted_text = extract_text_from_file_bytes(file_bytes, file.filename or "unknown")
        if not extracted_text or not extracted_text.strip():
            raise HTTPException(status_code=400, detail="No text could be extracted from file")

        payload = await prepare_ingestion_payload(
            tenant_id=tenant_id_value,
            content=extracted_text,
            source_type=None,  # Auto-detect from filename
            filename=file.filename,
            url=None,
            doc_id=None,
            metadata=None
        )

        # Forward the caller's role, as the JSON document endpoint does, so
        # downstream checks see the real role rather than a default.
        result = await process_ingestion(
            payload, rag_client, extract_metadata=True, user_role=x_user_role
        )

        # Do not report success for an error result or an empty ingestion.
        chunks_stored = _file_ingest_check_result(result)

        message = f"File '{file.filename}' ingested successfully. {chunks_stored} chunk(s) stored."
        extracted = result.get("extracted_metadata")
        if isinstance(extracted, dict):
            title = extracted.get("title")
            if title:
                message += f" Title: {title}"
            score = extracted.get("quality_score")
            # Only numeric scores can be formatted with `:.2f`.
            if score and isinstance(score, (int, float)):
                message += f" Quality: {score:.2f}"

        return {
            "status": "ok",
            "message": message,
            **result
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def rag_list(
    limit: int = 1000,
    offset: int = 0,
    x_tenant_id: str = Header(None)
):
    """
    Return the documents stored in the tenant knowledge base.

    ``limit`` and ``offset`` are forwarded unchanged to the RAG client, and
    the client's result is returned as-is.

    Raises:
        HTTPException: 400 when the tenant header is missing; 500 when the
            RAG client call raises.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        return await rag_client.list_documents(x_tenant_id, limit=limit, offset=offset)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
async def rag_delete(
    document_id: int,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Remove one document, identified by ID, from the tenant knowledge base.

    Returns the RAG client's result when it carries no "error" key.

    Raises:
        HTTPException: 400 when the tenant header is missing; 503 when the
            error text contains "Cannot connect"; 404 when it mentions
            "not found" or "access denied"; 500 for any other error or
            unexpected exception.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "delete_documents")

    try:
        outcome = await rag_client.delete_document(x_tenant_id, document_id, user_role=x_user_role)
        if "error" not in outcome:
            return outcome

        # Map the error text to a status: connection failure (503),
        # not found / access denied (404), anything else (500).
        failure = outcome["error"]
        if "Cannot connect" in failure:
            status = 503
        elif "not found" in failure.lower() or "access denied" in failure.lower():
            status = 404
        else:
            status = 500
        raise HTTPException(status_code=status, detail=failure)
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
async def rag_delete_all(
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Remove every document belonging to a tenant.

    Returns the RAG client's result when it carries no "error" key.

    Raises:
        HTTPException: 400 when the tenant header is missing; 503 when the
            error text contains "Cannot connect"; 500 for any other error or
            unexpected exception.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "delete_documents")

    try:
        outcome = await rag_client.delete_all_documents(x_tenant_id, user_role=x_user_role)
        if "error" not in outcome:
            return outcome

        failure = outcome["error"]
        # Connection failures are reported as 503; everything else as 500.
        status = 503 if "Cannot connect" in failure else 500
        raise HTTPException(status_code=status, detail=failure)
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))