Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Header, HTTPException, UploadFile, File, Form | |
| from pydantic import BaseModel | |
| from typing import Optional, Dict, Any | |
| from api.mcp_clients.rag_client import RAGClient | |
| from api.services.document_ingestion import ( | |
| prepare_ingestion_payload, | |
| process_ingestion, | |
| detect_source_type, | |
| normalize_text, | |
| extract_text_from_file_bytes | |
| ) | |
# Router object for this module's endpoints (route registration is not
# visible in this excerpt).
router: APIRouter = APIRouter()

# Single RAGClient instance created at import time and used by the handlers
# in this module.
rag_client: RAGClient = RAGClient()
class IngestRequest(BaseModel):
    """Legacy simple ingestion request"""

    # Text to ingest; the legacy ingest handler forwards it unchanged to
    # rag_client.ingest.
    content: str
class DocumentIngestRequest(BaseModel):
    """Enhanced ingestion request matching the system prompt specification"""

    action: str = "ingest_document"

    # Optional in the body because it can come from the request header instead.
    tenant_id: Optional[str] = None

    # One of: pdf | docx | txt | url | raw_text | markdown
    source_type: Optional[str] = None

    content: str

    # Free-form extras; the document ingest handler reads the "filename",
    # "url" and "doc_id" keys from it.
    metadata: Optional[Dict[str, Any]] = None
# Request body for the knowledge-base search handler.
class SearchRequest(BaseModel):
    # Query text passed as-is to rag_client.search.
    query: str
async def rag_search(
    req: SearchRequest,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Search tenant knowledge base using the RAG MCP server.

    Args:
        req: Request body carrying the search ``query``.
        x_tenant_id: Tenant identifier read from the ``x-tenant-id`` header.

    Returns:
        A dict with ``tenant_id``, the original ``query`` and the ``results``
        value returned by ``rag_client.search``.

    Raises:
        HTTPException: 400 when the tenant header is missing or empty;
            500 when the search call raises any other exception.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        results = await rag_client.search(req.query, x_tenant_id)
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e

    return {
        "tenant_id": x_tenant_id,
        "query": req.query,
        "results": results,
    }
async def rag_ingest(
    req: IngestRequest,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Legacy ingestion endpoint - simple content ingestion.
    Ingest content into tenant knowledge base using the RAG MCP server.

    Args:
        req: Request body carrying the raw ``content`` to ingest.
        x_tenant_id: Tenant identifier read from the ``x-tenant-id`` header.

    Returns:
        A dict containing ``tenant_id`` and ``status`` ("ok") merged with the
        mapping returned by ``rag_client.ingest``.

    Raises:
        HTTPException: 400 when the tenant header is missing or empty;
            500 when ingestion raises any other exception.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        result = await rag_client.ingest(req.content, x_tenant_id)
        # NOTE(review): the delete handlers treat an "error" key in the client
        # result as a failure; this handler does not inspect it - confirm
        # whether rag_client.ingest can return such a dict.
        # `result` is spread last, so its keys win over the two defaults.
        return {
            "tenant_id": x_tenant_id,
            "status": "ok",
            **result,
        }
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def rag_ingest_document(
    req: DocumentIngestRequest,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Enhanced document ingestion endpoint matching the system prompt specification.

    Supports:
    - PDF, DOCX, TXT, Markdown files
    - URLs (fetches content automatically)
    - Raw text
    - Metadata (filename, url, doc_id)

    Expected payload:
    {
        "action": "ingest_document",
        "tenant_id": "...",
        "source_type": "pdf | docx | txt | url | raw_text",
        "content": "...",
        "metadata": {
            "filename": "...",
            "url": "...",
            "doc_id": "..."
        }
    }

    Args:
        req: Parsed request body.
        x_tenant_id: Tenant identifier from the ``x-tenant-id`` header, used
            only when the body does not carry a ``tenant_id``.

    Returns:
        A dict with ``status`` ("ok") and a human-readable ``message``, merged
        with the mapping returned by ``process_ingestion``.

    Raises:
        HTTPException: 400 when no tenant ID is available or a ``ValueError``
            is raised while preparing/processing; 500 for any other failure.
    """
    # The body value takes precedence; the header is the fallback
    # (kept for backward compatibility).
    tenant_id = req.tenant_id or x_tenant_id
    if not tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Look the optional keys up once; missing or empty metadata yields None
    # for each of them. The original metadata object is still passed through.
    meta = req.metadata or {}

    try:
        # Prepare ingestion payload (async for URL fetching)
        payload = await prepare_ingestion_payload(
            tenant_id=tenant_id,
            content=req.content,
            source_type=req.source_type,
            filename=meta.get("filename"),
            url=meta.get("url"),
            doc_id=meta.get("doc_id"),
            metadata=req.metadata,
        )

        # Process ingestion
        result = await process_ingestion(payload, rag_client)

        return {
            "status": "ok",
            "message": f"Document ingested successfully. {result.get('chunks_stored', 0)} chunk(s) stored.",
            **result,
        }
    except HTTPException:
        # Same pass-through the file-upload handler uses: do not turn a
        # deliberate HTTP error into a generic 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def rag_ingest_file(
    file: UploadFile = File(...),
    x_tenant_id: Optional[str] = Header(None),
    tenant_id: Optional[str] = Form(None)
):
    """
    File upload endpoint for binary files (PDF, DOCX, TXT, MD).
    Extracts text server-side and ingests into knowledge base.

    Usage:
        POST /rag/ingest-file
        Headers:
            x-tenant-id: <tenant_id>
        Form Data:
            file: <binary file>
            tenant_id: <optional, can use header instead>

    The form field wins over the header when both are supplied. Responds with
    400 for a missing tenant ID, an empty upload, an upload that yields no
    text, or a ValueError raised along the way; any other failure becomes 500.
    """
    effective_tenant = tenant_id or x_tenant_id
    if not effective_tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        raw = await file.read()
        if not raw:
            raise HTTPException(status_code=400, detail="File is empty")

        # A ValueError raised by the extractor is mapped to a 400 by the
        # ValueError handler at the bottom of this function.
        text = extract_text_from_file_bytes(raw, file.filename or "unknown")
        if not (text and text.strip()):
            raise HTTPException(status_code=400, detail="No text could be extracted from file")

        # source_type is left as None so it is auto-detected from the filename.
        payload = await prepare_ingestion_payload(
            tenant_id=effective_tenant,
            content=text,
            source_type=None,
            filename=file.filename,
            url=None,
            doc_id=None,
            metadata=None
        )

        outcome = await process_ingestion(payload, rag_client)
        stored = outcome.get('chunks_stored', 0)
        return {
            "status": "ok",
            "message": f"File '{file.filename}' ingested successfully. {stored} chunk(s) stored.",
            **outcome
        }
    except HTTPException:
        # The 400s raised above must reach the client unchanged.
        raise
    except ValueError as err:
        raise HTTPException(status_code=400, detail=str(err))
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
async def rag_list(
    limit: int = 1000,
    offset: int = 0,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    List all documents in tenant knowledge base.

    Args:
        limit: Page size forwarded to ``rag_client.list_documents``.
        offset: Starting offset forwarded to ``rag_client.list_documents``.
        x_tenant_id: Tenant identifier read from the ``x-tenant-id`` header.

    Returns:
        Whatever ``rag_client.list_documents`` returns, unmodified.

    Raises:
        HTTPException: 400 when the tenant header is missing or empty;
            500 when the listing call raises any other exception.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        return await rag_client.list_documents(x_tenant_id, limit=limit, offset=offset)
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def rag_delete(
    document_id: int,
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Delete a specific document by ID from tenant knowledge base.

    Args:
        document_id: Identifier of the document to delete.
        x_tenant_id: Tenant identifier read from the ``x-tenant-id`` header.

    Returns:
        The result of ``rag_client.delete_document`` when it carries no
        ``"error"`` key.

    Raises:
        HTTPException: 400 when the tenant header is missing or empty;
            503 when the client reports it cannot connect; 404 when the
            error text mentions "not found" or "access denied"; 500 for any
            other reported error or unexpected exception.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        result = await rag_client.delete_document(x_tenant_id, document_id)
        if "error" in result:
            error_msg = result["error"]
            # Map the client's error text to a status code:
            # connection failure -> 503, missing/forbidden -> 404, else 500.
            if "Cannot connect" in error_msg:
                status_code = 503
            elif "not found" in error_msg.lower() or "access denied" in error_msg.lower():
                status_code = 404
            else:
                status_code = 500
            raise HTTPException(status_code=status_code, detail=error_msg)
        return result
    except HTTPException:
        # The mapped error above must reach the client unchanged.
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def rag_delete_all(
    x_tenant_id: Optional[str] = Header(None),
):
    """
    Delete all documents for a tenant.

    Args:
        x_tenant_id: Tenant identifier read from the ``x-tenant-id`` header.

    Returns:
        The result of ``rag_client.delete_all_documents`` when it carries no
        ``"error"`` key.

    Raises:
        HTTPException: 400 when the tenant header is missing or empty;
            503 when the client reports it cannot connect; 500 for any other
            reported error or unexpected exception.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        result = await rag_client.delete_all_documents(x_tenant_id)
        if "error" in result:
            error_msg = result["error"]
            # Connection failures are reported as 503; everything else as 500.
            status_code = 503 if "Cannot connect" in error_msg else 500
            raise HTTPException(status_code=status_code, detail=error_msg)
        return result
    except HTTPException:
        # The mapped error above must reach the client unchanged.
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e