import os
from typing import Any, Dict, Optional

import httpx
from dotenv import load_dotenv

load_dotenv()


class RAGClient:
    """
    Communicates with the RAG MCP server.

    The server base URL is read from the RAG_MCP_URL environment variable
    (default: "http://localhost:8900/rag"). Each method opens its own
    short-lived httpx.AsyncClient for the request.

    Error-handling conventions (kept as callers already rely on them):
      * search / list_documents: never raise; return an empty result on failure.
      * ingest / ingest_with_metadata: raise RuntimeError on any failure.
      * delete_document / delete_all_documents: never raise; return
        {"error": "..."} on failure.
    """

    def __init__(self) -> None:
        # Strip trailing slashes so "<base>/search" never contains "//".
        self.base_url = os.getenv("RAG_MCP_URL", "http://localhost:8900/rag").rstrip("/")
        # os.getenv only falls back to the default when the variable is absent;
        # an explicitly empty value still has to be rejected here.
        if not self.base_url:
            raise ValueError("RAG_MCP_URL environment variable is not set")

        self.search_endpoint = f"{self.base_url}/search"
        self.ingest_endpoint = f"{self.base_url}/ingest"

    @staticmethod
    def _unwrap(data: Any) -> Any:
        """Return the 'data' field of a wrapped MCP response, or data unchanged."""
        if isinstance(data, dict) and "data" in data:
            return data["data"]
        # Not wrapped: return as-is (backward compatibility).
        return data

    @staticmethod
    def _error_detail(response: Any) -> Any:
        """Best-effort: return the JSON 'detail' of an error response, else its raw text."""
        error_text = response.text
        try:
            error_json = response.json()
        except ValueError:
            # Body is not valid JSON; the raw text is the best we have.
            return error_text
        if isinstance(error_json, dict):
            return error_json.get("detail", error_text)
        return error_text

    async def search(self, query: str, tenant_id: str):
        """
        Sends the query to the RAG server and returns document chunks.
        Unwraps MCP server responses automatically.

        Args:
            query: Search query text
            tenant_id: Tenant identifier

        Returns:
            The "results" entry of the (optionally 'data'-wrapped) response,
            or the payload itself when it is not a dict. Returns [] on any
            failure (non-200 status, MCP error response, exception).
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.search_endpoint,
                    json={
                        "query": query,
                        "tenant_id": tenant_id,
                    },
                )

            if response.status_code != 200:
                print("RAG Client Error:", f"HTTP {response.status_code}")
                return []

            data = response.json()

            if isinstance(data, dict) and data.get("status") == "error":
                print("RAG Client Error:", data.get("message"))
                return []

            payload = self._unwrap(data)
            if isinstance(payload, dict):
                return payload.get("results", [])
            return payload
        except Exception as e:
            # Search is deliberately best-effort: report and return no chunks.
            print("RAG Client Error:", e)
            return []

    async def ingest(self, content: str, tenant_id: str, user_role: Optional[str] = None):
        """
        Sends content to the RAG server for ingestion.
        Returns the unwrapped data from the MCP server response.

        Args:
            content: Text content to ingest
            tenant_id: Tenant identifier
            user_role: Optional user role forwarded to ingest_with_metadata

        Raises:
            RuntimeError: If the request fails (see ingest_with_metadata).
        """
        return await self.ingest_with_metadata(
            content,
            tenant_id,
            metadata=None,
            doc_id=None,
            user_role=user_role,
        )

    async def ingest_with_metadata(
        self,
        content: str,
        tenant_id: str,
        metadata: Optional[Dict[str, Any]] = None,
        doc_id: Optional[str] = None,
        user_role: Optional[str] = None
    ):
        """
        Sends content to the RAG server for ingestion with metadata.
        Returns the unwrapped data from the MCP server response.

        Args:
            content: Text content to ingest
            tenant_id: Tenant identifier
            metadata: Optional metadata dictionary
            doc_id: Optional document ID
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The 'data' field of the response when present (with a
            "chunks_ingested" key renamed to "chunks_stored"), otherwise the
            decoded response unchanged.

        Raises:
            RuntimeError: On connection failure, non-200 status, or any other
                error while sending the request or decoding the response.
        """
        payload: Dict[str, Any] = {
            "tenant_id": tenant_id,
            "content": content,
        }
        # Add role to payload (MCP server expects it for permission checks)
        if user_role:
            payload["user_role"] = user_role
        # Add metadata if provided
        if metadata:
            payload["metadata"] = metadata
        if doc_id:
            payload["doc_id"] = doc_id

        data = None
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.ingest_endpoint,
                    json=payload,
                )
            if response.status_code == 200:
                data = response.json()
        except httpx.RequestError as e:
            error_msg = f"Failed to connect to RAG server at {self.base_url}: {str(e)}"
            print(f"❌ RAG Ingest Connection Error: {error_msg}")
            raise RuntimeError(
                f"{error_msg}\n\n"
                f"Please check:\n"
                f"1. RAG_MCP_URL is set correctly (current: {self.base_url})\n"
                f"2. RAG MCP server is running\n"
                f"3. Network connectivity to the server"
            ) from e
        except Exception as e:
            error_msg = f"RAG ingestion error: {str(e)}"
            print(f"❌ {error_msg}")
            raise RuntimeError(
                f"{error_msg}\n\n"
                f"Please check the RAG server logs for more details."
            ) from e

        # Raised outside the try block so the generic handler above does not
        # catch it and wrap it in a second "Please check ..." message.
        if response.status_code != 200:
            error_msg = f"RAG server returned error {response.status_code}: {response.text[:500]}"
            print(f"❌ RAG ingestion error: {error_msg}")
            raise RuntimeError(
                f"{error_msg}\n\n"
                f"Please check:\n"
                f"1. RAG MCP server is running at {self.base_url}\n"
                f"2. Database connection (POSTGRESQL_URL) is configured\n"
                f"3. The 'documents' table exists in the database"
            )

        # NOTE(review): unlike search/delete, a 200 response carrying
        # {"status": "error"} is returned to the caller as-is here - confirm
        # whether ingestion should raise on it instead.

        # MCP server wraps response in a 'data' field
        if isinstance(data, dict) and "data" in data:
            result = data["data"]
            # Map chunks_ingested to chunks_stored for consistency; only a dict
            # can be renamed in place (a null/str/list payload is passed through).
            if isinstance(result, dict) and "chunks_ingested" in result:
                result["chunks_stored"] = result.pop("chunks_ingested")
            return result

        # If not wrapped, return as-is (backward compatibility)
        return data

    async def list_documents(self, tenant_id: str, limit: int = 1000, offset: int = 0):
        """
        List all documents for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier
            limit: Value sent as the "limit" query parameter
            offset: Value sent as the "offset" query parameter

        Returns:
            The unwrapped response, or an empty listing
            {"documents": [], "total": 0, "limit": limit, "offset": offset}
            on a non-200 status or any exception.
        """
        empty_listing = {"documents": [], "total": 0, "limit": limit, "offset": offset}
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.base_url}/list",
                    params={
                        "tenant_id": tenant_id,
                        "limit": limit,
                        "offset": offset,
                    },
                )

            if response.status_code != 200:
                print("RAG List Error:", f"HTTP {response.status_code}")
                return empty_listing

            # MCP server wraps response in a 'data' field; extract it if present.
            return self._unwrap(response.json())
        except Exception as e:
            # Listing is deliberately best-effort: report and return an empty page.
            print("RAG List Error:", e)
            return empty_listing

    async def delete_document(self, tenant_id: str, document_id: int, user_role: Optional[str] = None):
        """
        Delete a specific document by ID for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier
            document_id: Document ID to delete
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The unwrapped response on success, or {"error": "<message>"} on
            any failure. This method does not raise.
        """
        try:
            # Use POST with JSON payload to include role for MCP protocol
            payload: Dict[str, Any] = {
                "tenant_id": tenant_id,
                "document_id": document_id,
            }
            if user_role:
                payload["user_role"] = user_role

            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{self.base_url}/delete",
                    json=payload,
                )

            if response.status_code == 404:
                return {"error": f"Document {document_id} not found or access denied"}

            if response.status_code != 200:
                return {"error": f"HTTP {response.status_code}: {self._error_detail(response)}"}

            data = response.json()

            # Check if MCP server returned an error response
            if isinstance(data, dict) and data.get("status") == "error":
                error_msg = data.get("message", "Unknown error")
                error_type = data.get("error_type", "unknown_error")

                # For validation errors (like "not found"), return clear error message
                if error_type == "validation_error":
                    lowered = error_msg.lower()
                    if "not found" in lowered:
                        return {"error": f"Document {document_id} not found for tenant '{tenant_id}'"}
                    if "access denied" in lowered or "permission" in lowered:
                        return {"error": f"Access denied: {error_msg}"}

                return {"error": error_msg}

            # MCP server wraps response in a 'data' field; extract it if present.
            return self._unwrap(data)
        except httpx.ConnectError:
            print(f"RAG Delete Error: Cannot connect to RAG MCP server at {self.base_url}")
            return {"error": f"Cannot connect to RAG MCP server. Is it running at {self.base_url}?"}
        except Exception as e:
            print(f"RAG Delete Error: {e}")
            return {"error": str(e)}

    async def delete_all_documents(self, tenant_id: str, user_role: Optional[str] = None):
        """
        Delete all documents for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The unwrapped response on success, or {"error": "<message>"} on
            any failure. This method does not raise.
        """
        try:
            # Use POST with JSON payload to include role for MCP protocol
            payload: Dict[str, Any] = {
                "tenant_id": tenant_id,
                "delete_all": True,
            }
            if user_role:
                payload["user_role"] = user_role

            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    f"{self.base_url}/delete-all",
                    json=payload,
                )

            if response.status_code != 200:
                return {"error": f"HTTP {response.status_code}: {self._error_detail(response)}"}

            data = response.json()

            # Check if MCP server returned an error response
            if isinstance(data, dict) and data.get("status") == "error":
                error_msg = data.get("message", "Unknown error")
                return {"error": error_msg}

            # MCP server wraps response in a 'data' field; extract it if present.
            return self._unwrap(data)
        except httpx.ConnectError:
            print(f"RAG Delete All Error: Cannot connect to RAG MCP server at {self.base_url}")
            return {"error": f"Cannot connect to RAG MCP server. Is it running at {self.base_url}?"}
        except Exception as e:
            print(f"RAG Delete All Error: {e}")
            return {"error": str(e)}