"""RAG (retrieval-augmented generation) HTTP endpoints.

Thin FastAPI layer that validates the tenant, delegates to the RAG MCP
server via ``RAGClient`` and the ``document_ingestion`` service, and maps
failures onto HTTP status codes.
"""
import logging
from typing import Any, Dict, Optional

from fastapi import APIRouter, File, Form, Header, HTTPException, UploadFile
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel

from api.mcp_clients.rag_client import RAGClient
# detect_source_type / normalize_text are not used in this module; they are
# kept in case other modules import them from here.
from api.services.document_ingestion import (  # noqa: F401
    prepare_ingestion_payload,
    process_ingestion,
    detect_source_type,
    normalize_text,
    extract_text_from_file_bytes,
)

logger = logging.getLogger(__name__)

router = APIRouter()
rag_client = RAGClient()


class IngestRequest(BaseModel):
    """Legacy simple ingestion request."""

    content: str


class DocumentIngestRequest(BaseModel):
    """Enhanced ingestion request matching the system prompt specification."""

    action: str = "ingest_document"
    tenant_id: Optional[str] = None  # Can come from the x-tenant-id header instead
    source_type: Optional[str] = None  # pdf | docx | txt | url | raw_text | markdown
    content: str
    metadata: Optional[Dict[str, Any]] = None


class SearchRequest(BaseModel):
    """Knowledge-base search request."""

    query: str


# --------------------------------------------------------------------------- #
# Private helpers
# --------------------------------------------------------------------------- #

def _resolve_tenant_id(*candidates: Optional[str]) -> str:
    """Return the first non-empty tenant ID among *candidates*.

    Candidates are checked in order, so callers pass them by precedence
    (e.g. body value before header value).

    Raises:
        HTTPException: 400 "Missing tenant ID" when every candidate is empty/None.
    """
    for candidate in candidates:
        if candidate:
            return candidate
    raise HTTPException(status_code=400, detail="Missing tenant ID")


def _internal_error(exc: Exception, operation: str) -> HTTPException:
    """Log *exc* with its traceback and wrap it as a 500 response.

    Must be called from inside an ``except`` block so ``logger.exception``
    picks up the active traceback.
    """
    logger.exception("RAG %s failed", operation)
    # NOTE(review): str(exc) exposes internal error text to API clients;
    # kept for backward compatibility, consider a generic message.
    return HTTPException(status_code=500, detail=str(exc))


def _raise_for_client_error(result: Dict[str, Any], *, not_found_is_404: bool) -> None:
    """Translate an ``{"error": ...}`` payload from ``RAGClient`` into an HTTPException.

    Does nothing when *result* has no ``"error"`` key.

    Status mapping:
        * message contains "Cannot connect" -> 503
        * (if *not_found_is_404*) message contains "not found" or
          "access denied" (case-insensitive) -> 404
        * anything else -> 500
    """
    if "error" not in result:
        return
    # Coerce: a non-str error value must not crash the substring checks below.
    error_msg = str(result["error"])
    if "Cannot connect" in error_msg:
        raise HTTPException(status_code=503, detail=error_msg)
    lowered = error_msg.lower()
    if not_found_is_404 and ("not found" in lowered or "access denied" in lowered):
        raise HTTPException(status_code=404, detail=error_msg)
    raise HTTPException(status_code=500, detail=error_msg)


# --------------------------------------------------------------------------- #
# Endpoints
# --------------------------------------------------------------------------- #

@router.post("/search")
async def rag_search(
    req: SearchRequest,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Search tenant knowledge base using the RAG MCP server.
    """
    tenant_id = _resolve_tenant_id(x_tenant_id)
    try:
        results = await rag_client.search(req.query, tenant_id)
        return {
            "tenant_id": tenant_id,
            "query": req.query,
            "results": results,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error(e, "search") from e


@router.post("/ingest")
async def rag_ingest(
    req: IngestRequest,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Legacy ingestion endpoint - simple content ingestion.

    Ingest content into tenant knowledge base using the RAG MCP server.
    """
    tenant_id = _resolve_tenant_id(x_tenant_id)
    try:
        result = await rag_client.ingest(req.content, tenant_id)
        return {
            "tenant_id": tenant_id,
            "status": "ok",
            **result,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error(e, "ingest") from e


@router.post("/ingest-document")
async def rag_ingest_document(
    req: DocumentIngestRequest,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Enhanced document ingestion endpoint matching the system prompt specification.

    Supports:
    - PDF, DOCX, TXT, Markdown files
    - URLs (fetches content automatically)
    - Raw text
    - Metadata (filename, url, doc_id)

    Expected payload:
    {
        "action": "ingest_document",
        "tenant_id": "...",
        "source_type": "pdf | docx | txt | url | raw_text | markdown",
        "content": "...",
        "metadata": {
            "filename": "...",
            "url": "...",
            "doc_id": "..."
        }
    }
    """
    # Body tenant_id takes precedence; the header is the backward-compatible fallback.
    tenant_id = _resolve_tenant_id(req.tenant_id, x_tenant_id)
    metadata = req.metadata or {}

    try:
        # Prepare ingestion payload (async because URL sources are fetched).
        payload = await prepare_ingestion_payload(
            tenant_id=tenant_id,
            content=req.content,
            source_type=req.source_type,
            filename=metadata.get("filename"),
            url=metadata.get("url"),
            doc_id=metadata.get("doc_id"),
            metadata=req.metadata,
        )
        result = await process_ingestion(payload, rag_client)
        return {
            "status": "ok",
            "message": (
                "Document ingested successfully. "
                f"{result.get('chunks_stored', 0)} chunk(s) stored."
            ),
            **result,
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise _internal_error(e, "ingest-document") from e


@router.post("/ingest-file")
async def rag_ingest_file(
    file: UploadFile = File(...),
    x_tenant_id: Optional[str] = Header(None),
    tenant_id: Optional[str] = Form(None),
):
    """
    File upload endpoint for binary files (PDF, DOCX, TXT, MD).

    Extracts text server-side and ingests into knowledge base.

    Usage:
        POST /rag/ingest-file
        Headers:
            x-tenant-id: <tenant_id>
        Form Data:
            file: <uploaded file>
            tenant_id: <tenant_id> (optional when the header is set)
    """
    # Form tenant_id takes precedence over the header.
    tenant_id_value = _resolve_tenant_id(tenant_id, x_tenant_id)

    try:
        file_bytes = await file.read()
        if not file_bytes:
            raise HTTPException(status_code=400, detail="File is empty")

        # Text extraction is synchronous and may be CPU-heavy (PDF/DOCX parsing);
        # run it in the threadpool so it does not block the event loop.
        try:
            extracted_text = await run_in_threadpool(
                extract_text_from_file_bytes,
                file_bytes,
                file.filename or "unknown",
            )
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

        if not extracted_text or not extracted_text.strip():
            raise HTTPException(status_code=400, detail="No text could be extracted from file")

        payload = await prepare_ingestion_payload(
            tenant_id=tenant_id_value,
            content=extracted_text,
            source_type=None,  # Auto-detect from filename
            filename=file.filename,
            url=None,
            doc_id=None,
            metadata=None,
        )
        result = await process_ingestion(payload, rag_client)
        return {
            "status": "ok",
            "message": (
                f"File '{file.filename}' ingested successfully. "
                f"{result.get('chunks_stored', 0)} chunk(s) stored."
            ),
            **result,
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise _internal_error(e, "ingest-file") from e


@router.get("/list")
async def rag_list(
    limit: int = 1000,
    offset: int = 0,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    List all documents in tenant knowledge base.
    """
    tenant_id = _resolve_tenant_id(x_tenant_id)
    try:
        return await rag_client.list_documents(tenant_id, limit=limit, offset=offset)
    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error(e, "list") from e


@router.delete("/delete/{document_id}")
async def rag_delete(
    document_id: int,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Delete a specific document by ID from tenant knowledge base.

    Client errors are mapped to 503 (cannot connect), 404 (not found /
    access denied) or 500 (anything else).
    """
    tenant_id = _resolve_tenant_id(x_tenant_id)
    try:
        result = await rag_client.delete_document(tenant_id, document_id)
        _raise_for_client_error(result, not_found_is_404=True)
        return result
    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error(e, "delete") from e


@router.delete("/delete-all")
async def rag_delete_all(
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Delete all documents for a tenant.

    Client errors are mapped to 503 (cannot connect) or 500 (anything else).
    """
    tenant_id = _resolve_tenant_id(x_tenant_id)
    try:
        result = await rag_client.delete_all_documents(tenant_id)
        _raise_for_client_error(result, not_found_is_404=False)
        return result
    except HTTPException:
        raise
    except Exception as e:
        raise _internal_error(e, "delete-all") from e