Spaces:
Sleeping
Sleeping
| import os | |
| import httpx | |
| from typing import Optional, Dict, Any | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
class RAGClient:
    """
    Communicates with the RAG MCP server.

    The server base URL is read from the ``RAG_MCP_URL`` environment variable
    (default ``http://localhost:8900/rag``). Every public method opens a
    short-lived ``httpx.AsyncClient`` for its single request.

    Error-handling contract, per method:
      * ``search`` and ``list_documents`` never raise; they print the problem
        and return an empty result.
      * ``ingest`` / ``ingest_with_metadata`` raise ``RuntimeError``.
      * ``delete_document`` / ``delete_all_documents`` never raise; they
        return ``{"error": <message>}`` on failure.
    """

    def __init__(self):
        """
        Resolve the server URL and derive the search/ingest endpoints.

        Raises:
            ValueError: if ``RAG_MCP_URL`` is set to an empty string.
        """
        base_url = os.getenv("RAG_MCP_URL", "http://localhost:8900/rag")
        if not base_url:
            raise ValueError("RAG_MCP_URL environment variable is not set")
        # Strip trailing slashes so "<base>/" does not produce "<base>//search".
        self.base_url = base_url.rstrip("/")
        self.search_endpoint = f"{self.base_url}/search"
        self.ingest_endpoint = f"{self.base_url}/ingest"

    @staticmethod
    def _unwrap(data: Any) -> Any:
        """Return the ``data`` field of a wrapped response, else *data* unchanged."""
        if isinstance(data, dict) and "data" in data:
            return data["data"]
        # Not wrapped: return as-is (backward compatibility).
        return data

    @staticmethod
    def _error_detail(response: Any) -> Any:
        """Return the JSON ``detail`` field of an error response, else its raw text."""
        error_text = response.text
        try:
            error_json = response.json()
        except ValueError:
            # Body is not valid JSON (JSONDecodeError is a ValueError subclass).
            return error_text
        if isinstance(error_json, dict):
            return error_json.get("detail", error_text)
        return error_text

    async def search(self, query: str, tenant_id: str) -> Any:
        """
        Sends the query to the RAG server and returns document chunks.
        Unwraps MCP server responses automatically.

        Args:
            query: Query text, sent as the ``query`` field.
            tenant_id: Tenant identifier, sent as the ``tenant_id`` field.

        Returns:
            The ``results`` entry of the (unwrapped) response when it is a
            dict, otherwise the unwrapped payload itself. Returns ``[]`` on a
            non-200 status, on a ``status == "error"`` body, on a null
            payload, or on any exception.
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.search_endpoint,
                    json={
                        "query": query,
                        "tenant_id": tenant_id,
                    },
                )
            if response.status_code != 200:
                return []
            data = response.json()
            if isinstance(data, dict) and data.get("status") == "error":
                print("RAG Client Error:", data.get("message"))
                return []
            payload = self._unwrap(data)
            if payload is None:
                # A null payload carries no chunks; keep the return iterable.
                return []
            if isinstance(payload, dict):
                return payload.get("results", [])
            return payload
        except Exception as e:
            # Best-effort: search failures degrade to "no results".
            print("RAG Client Error:", e)
            return []

    async def ingest(self, content: str, tenant_id: str) -> Any:
        """
        Sends content to the RAG server for ingestion.
        Returns the unwrapped data from the MCP server response.

        Delegates to ``ingest_with_metadata`` with no metadata, no document
        ID and no user role.

        Raises:
            RuntimeError: on connection failure, non-200 status, or any other
                error during ingestion.
        """
        return await self.ingest_with_metadata(content, tenant_id, metadata=None, doc_id=None)

    async def ingest_with_metadata(
        self,
        content: str,
        tenant_id: str,
        metadata: Optional[Dict[str, Any]] = None,
        doc_id: Optional[str] = None,
        user_role: Optional[str] = None
    ) -> Any:
        """
        Sends content to the RAG server for ingestion with metadata.
        Returns the unwrapped data from the MCP server response.

        Args:
            content: Text content to ingest
            tenant_id: Tenant identifier
            metadata: Optional metadata dictionary (omitted from the request when falsy)
            doc_id: Optional document ID (omitted from the request when falsy)
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The unwrapped response. When it is a dict containing
            ``chunks_ingested``, that key is renamed to ``chunks_stored``.

        Raises:
            RuntimeError: on ``httpx.RequestError`` (connection problems), on
                a non-200 status, or on any other exception.
        """
        payload = {
            "tenant_id": tenant_id,
            "content": content,
        }
        # Add role to payload (MCP server expects it for permission checks)
        if user_role:
            payload["user_role"] = user_role
        # Add metadata if provided
        if metadata:
            payload["metadata"] = metadata
        if doc_id:
            payload["doc_id"] = doc_id

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.ingest_endpoint,
                    json=payload,
                )
            if response.status_code != 200:
                # Cap the body so a large error page does not flood the message.
                error_text = response.text[:500]
                raise RuntimeError(
                    f"RAG server returned error {response.status_code}: {error_text}\n\n"
                    f"Please check:\n"
                    f"1. RAG MCP server is running at {self.base_url}\n"
                    f"2. Database connection (POSTGRESQL_URL) is configured\n"
                    f"3. The 'documents' table exists in the database"
                )
            result = self._unwrap(response.json())
            # Map chunks_ingested to chunks_stored for consistency. Guard on
            # dict: a null, list or string payload must pass through untouched.
            if isinstance(result, dict) and "chunks_ingested" in result:
                result["chunks_stored"] = result.pop("chunks_ingested")
            return result
        except httpx.RequestError as e:
            error_msg = f"Failed to connect to RAG server at {self.base_url}: {str(e)}"
            print(f"❌ RAG Ingest Connection Error: {error_msg}")
            raise RuntimeError(
                f"{error_msg}\n\n"
                f"Please check:\n"
                f"1. RAG_MCP_URL is set correctly (current: {self.base_url})\n"
                f"2. RAG MCP server is running\n"
                f"3. Network connectivity to the server"
            ) from e
        except Exception as e:
            # NOTE: this also catches the non-200 RuntimeError raised above and
            # re-wraps it, so callers see both messages chained together.
            error_msg = f"RAG ingestion error: {str(e)}"
            print(f"❌ {error_msg}")
            raise RuntimeError(
                f"{error_msg}\n\n"
                f"Please check the RAG server logs for more details."
            ) from e

    async def list_documents(self, tenant_id: str, limit: int = 1000, offset: int = 0) -> Any:
        """
        List all documents for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier, sent as a query parameter.
            limit: Page size, sent as a query parameter.
            offset: Page offset, sent as a query parameter.

        Returns:
            The unwrapped response. On a non-200 status or any exception,
            returns ``{"documents": [], "total": 0, "limit": limit,
            "offset": offset}``.
        """
        empty_page = {"documents": [], "total": 0, "limit": limit, "offset": offset}
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.base_url}/list",
                    params={
                        "tenant_id": tenant_id,
                        "limit": limit,
                        "offset": offset,
                    },
                )
            if response.status_code != 200:
                return empty_page
            return self._unwrap(response.json())
        except Exception as e:
            # Best-effort: listing failures degrade to an empty page.
            print("RAG List Error:", e)
            return empty_page

    async def delete_document(self, tenant_id: str, document_id: int, user_role: Optional[str] = None) -> Any:
        """
        Delete a specific document by ID for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier
            document_id: Document ID to delete
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The unwrapped response on success; ``{"error": <message>}`` on a
            404, any other non-200 status, a ``status == "error"`` body, or
            any exception. This method does not raise.
        """
        # Use POST with JSON payload to include role for MCP protocol
        payload = {
            "tenant_id": tenant_id,
            "document_id": document_id,
        }
        if user_role:
            payload["user_role"] = user_role

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{self.base_url}/delete",
                    json=payload,
                )
            if response.status_code == 404:
                return {"error": f"Document {document_id} not found or access denied"}
            if response.status_code != 200:
                return {"error": f"HTTP {response.status_code}: {self._error_detail(response)}"}

            data = response.json()
            # Check if MCP server returned an error response
            if isinstance(data, dict) and data.get("status") == "error":
                error_msg = data.get("message", "Unknown error")
                error_type = data.get("error_type", "unknown_error")
                # For validation errors (like "not found"), return clear error message
                if error_type == "validation_error":
                    lowered = error_msg.lower()
                    if "not found" in lowered:
                        return {"error": f"Document {document_id} not found for tenant '{tenant_id}'"}
                    if "access denied" in lowered or "permission" in lowered:
                        return {"error": f"Access denied: {error_msg}"}
                return {"error": error_msg}
            return self._unwrap(data)
        except httpx.ConnectError:
            print(f"RAG Delete Error: Cannot connect to RAG MCP server at {self.base_url}")
            return {"error": f"Cannot connect to RAG MCP server. Is it running at {self.base_url}?"}
        except Exception as e:
            print(f"RAG Delete Error: {e}")
            return {"error": str(e)}

    async def delete_all_documents(self, tenant_id: str, user_role: Optional[str] = None) -> Any:
        """
        Delete all documents for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The unwrapped response on success; ``{"error": <message>}`` on a
            non-200 status, a ``status == "error"`` body, or any exception.
            This method does not raise.
        """
        # Use POST with JSON payload to include role for MCP protocol
        payload = {
            "tenant_id": tenant_id,
            "delete_all": True,
        }
        if user_role:
            payload["user_role"] = user_role

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{self.base_url}/delete-all",
                    json=payload,
                )
            if response.status_code != 200:
                return {"error": f"HTTP {response.status_code}: {self._error_detail(response)}"}

            data = response.json()
            # Check if MCP server returned an error response
            if isinstance(data, dict) and data.get("status") == "error":
                return {"error": data.get("message", "Unknown error")}
            return self._unwrap(data)
        except httpx.ConnectError:
            print(f"RAG Delete All Error: Cannot connect to RAG MCP server at {self.base_url}")
            return {"error": f"Cannot connect to RAG MCP server. Is it running at {self.base_url}?"}
        except Exception as e:
            print(f"RAG Delete All Error: {e}")
            return {"error": str(e)}