nothingworry's picture
Add Docker support and remove Ollama
0452a50
import os
import httpx
from typing import Optional, Dict, Any
from dotenv import load_dotenv
load_dotenv()
class RAGClient:
    """
    Communicates with the RAG MCP server.

    Every method opens a short-lived ``httpx.AsyncClient``, calls one endpoint
    under ``base_url`` and returns the unwrapped response. Responses may arrive
    wrapped in an envelope of the form ``{"data": ...}``; errors may arrive as
    ``{"status": "error", "message": ...}``.

    Error-handling conventions (kept per method for backward compatibility):
        - ``search`` and ``list_documents`` never raise; they return an empty
          result on failure.
        - ``ingest`` / ``ingest_with_metadata`` raise ``RuntimeError``.
        - ``delete_document`` / ``delete_all_documents`` return
          ``{"error": ...}`` on failure.
    """

    def __init__(self):
        """
        Read the server location from the ``RAG_MCP_URL`` environment variable.

        Falls back to ``http://localhost:8900/rag`` when the variable is unset.

        Raises:
            ValueError: If ``RAG_MCP_URL`` is set to an empty/blank value.
        """
        configured_url = os.getenv("RAG_MCP_URL", "http://localhost:8900/rag")
        # Drop surrounding whitespace and trailing slashes so that joining the
        # endpoint paths below never produces a double slash ("/rag//search").
        self.base_url = configured_url.strip().rstrip("/")
        if not self.base_url:
            raise ValueError("RAG_MCP_URL environment variable is not set")
        self.search_endpoint = f"{self.base_url}/search"
        self.ingest_endpoint = f"{self.base_url}/ingest"

    @staticmethod
    def _unwrap(data):
        """Return ``data["data"]`` for an enveloped response, else ``data`` unchanged."""
        if isinstance(data, dict) and "data" in data:
            return data["data"]
        return data

    @staticmethod
    def _is_error_envelope(data) -> bool:
        """Tell whether ``data`` is a ``{"status": "error", ...}`` response."""
        return isinstance(data, dict) and data.get("status") == "error"

    @staticmethod
    def _extract_results(payload) -> list:
        """Pull the list of chunks out of an unwrapped search payload."""
        if isinstance(payload, dict):
            payload = payload.get("results", [])
        # Anything that is not a list (null, scalar, ...) would break callers
        # that iterate over the result, so fall back to "no results".
        return payload if isinstance(payload, list) else []

    @staticmethod
    def _normalize_ingest_result(result):
        """Rename ``chunks_ingested`` to ``chunks_stored`` when ``result`` is a dict."""
        if isinstance(result, dict) and "chunks_ingested" in result:
            result["chunks_stored"] = result.pop("chunks_ingested")
        return result

    @staticmethod
    def _error_detail(response):
        """Return the ``detail`` field of a JSON error body, else the raw body text."""
        error_text = response.text
        try:
            error_json = response.json()
        except ValueError:
            # Body is not valid JSON (e.g. an HTML error page): use the raw text.
            return error_text
        if isinstance(error_json, dict):
            return error_json.get("detail", error_text)
        return error_text

    async def search(self, query: str, tenant_id: str) -> list:
        """
        Sends the query to the RAG server and returns document chunks.
        Unwraps MCP server responses automatically.

        Args:
            query: Search query text
            tenant_id: Tenant identifier

        Returns:
            The list of results, or an empty list on any failure (non-200
            status, error response, unexpected payload, or exception).
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.search_endpoint,
                    json={
                        "query": query,
                        "tenant_id": tenant_id,
                    },
                )
                if response.status_code != 200:
                    print("RAG Client Error:", f"HTTP {response.status_code}")
                    return []
                data = response.json()
                if self._is_error_envelope(data):
                    print("RAG Client Error:", data.get("message"))
                    return []
                return self._extract_results(self._unwrap(data))
        except Exception as e:
            # Search is best-effort: report the problem and return no results.
            print("RAG Client Error:", e)
            return []

    async def ingest(self, content: str, tenant_id: str, user_role: Optional[str] = None):
        """
        Sends content to the RAG server for ingestion.
        Returns the unwrapped data from the MCP server response.

        Args:
            content: Text content to ingest
            tenant_id: Tenant identifier
            user_role: Optional user role, forwarded to ``ingest_with_metadata``

        Raises:
            RuntimeError: See ``ingest_with_metadata``.
        """
        return await self.ingest_with_metadata(
            content,
            tenant_id,
            metadata=None,
            doc_id=None,
            user_role=user_role,
        )

    async def ingest_with_metadata(
        self,
        content: str,
        tenant_id: str,
        metadata: Optional[Dict[str, Any]] = None,
        doc_id: Optional[str] = None,
        user_role: Optional[str] = None
    ):
        """
        Sends content to the RAG server for ingestion with metadata.
        Returns the unwrapped data from the MCP server response.

        Args:
            content: Text content to ingest
            tenant_id: Tenant identifier
            metadata: Optional metadata dictionary
            doc_id: Optional document ID
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The content of the response's ``data`` field, with
            ``chunks_ingested`` renamed to ``chunks_stored``; or the raw
            response body when it is not wrapped.

        Raises:
            RuntimeError: If the server cannot be reached, answers with a
                non-200 status, or the response cannot be processed.
        """
        payload: Dict[str, Any] = {
            "tenant_id": tenant_id,
            "content": content,
        }
        # Optional fields are only sent when they carry a value.
        if user_role:
            payload["user_role"] = user_role
        if metadata:
            payload["metadata"] = metadata
        if doc_id:
            payload["doc_id"] = doc_id

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.ingest_endpoint,
                    json=payload
                )
                if response.status_code == 200:
                    data = response.json()
                    # MCP server wraps response in a 'data' field
                    if isinstance(data, dict) and "data" in data:
                        return self._normalize_ingest_result(data["data"])
                    # If not wrapped, return as-is (backward compatibility)
                    return data
                status_code = response.status_code
                error_text = response.text[:500]
        except httpx.RequestError as e:
            error_msg = f"Failed to connect to RAG server at {self.base_url}: {str(e)}"
            print(f"❌ RAG Ingest Connection Error: {error_msg}")
            raise RuntimeError(
                f"{error_msg}\n\n"
                f"Please check:\n"
                f"1. RAG_MCP_URL is set correctly (current: {self.base_url})\n"
                f"2. RAG MCP server is running\n"
                f"3. Network connectivity to the server"
            ) from e
        except Exception as e:
            error_msg = f"RAG ingestion error: {str(e)}"
            print(f"❌ {error_msg}")
            raise RuntimeError(
                f"{error_msg}\n\n"
                f"Please check the RAG server logs for more details."
            ) from e

        # The HTTP-status error is raised outside the try block on purpose:
        # raised inside, the generic handler above would catch it and wrap the
        # checklist below in a second, redundant "RAG ingestion error" message.
        print(f"❌ RAG ingestion error: RAG server returned error {status_code}: {error_text}")
        raise RuntimeError(
            f"RAG server returned error {status_code}: {error_text}\n\n"
            f"Please check:\n"
            f"1. RAG MCP server is running at {self.base_url}\n"
            f"2. Database connection (POSTGRESQL_URL) is configured\n"
            f"3. The 'documents' table exists in the database"
        )

    async def list_documents(self, tenant_id: str, limit: int = 1000, offset: int = 0):
        """
        List all documents for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier
            limit: Maximum number of documents requested
            offset: Number of documents to skip

        Returns:
            The content of the response's ``data`` field (or the raw body when
            not wrapped). On a non-200 status or any exception, an empty page:
            ``{"documents": [], "total": 0, "limit": limit, "offset": offset}``.
        """
        empty_page = {"documents": [], "total": 0, "limit": limit, "offset": offset}
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.base_url}/list",
                    params={
                        "tenant_id": tenant_id,
                        "limit": limit,
                        "offset": offset,
                    },
                )
                if response.status_code != 200:
                    print("RAG List Error:", f"HTTP {response.status_code}")
                    return empty_page
                return self._unwrap(response.json())
        except Exception as e:
            # Listing is best-effort: report the problem and return an empty page.
            print("RAG List Error:", e)
            return empty_page

    async def delete_document(self, tenant_id: str, document_id: int, user_role: Optional[str] = None):
        """
        Delete a specific document by ID for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier
            document_id: Document ID to delete
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The content of the response's ``data`` field (or the raw body when
            not wrapped) on success; ``{"error": <message>}`` on any failure.
        """
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                # Use POST with JSON payload to include role for MCP protocol
                payload = {
                    "tenant_id": tenant_id,
                    "document_id": document_id,
                }
                if user_role:
                    payload["user_role"] = user_role
                response = await client.post(
                    f"{self.base_url}/delete",
                    json=payload
                )
                if response.status_code == 404:
                    return {"error": f"Document {document_id} not found or access denied"}
                if response.status_code != 200:
                    return {"error": f"HTTP {response.status_code}: {self._error_detail(response)}"}
                data = response.json()
                # Check if MCP server returned an error response
                if self._is_error_envelope(data):
                    error_msg = data.get("message", "Unknown error")
                    error_type = data.get("error_type", "unknown_error")
                    # For validation errors (like "not found"), return clear error message
                    if error_type == "validation_error":
                        # str() guards against a non-string message field.
                        lowered = str(error_msg).lower()
                        if "not found" in lowered:
                            return {"error": f"Document {document_id} not found for tenant '{tenant_id}'"}
                        if "access denied" in lowered or "permission" in lowered:
                            return {"error": f"Access denied: {error_msg}"}
                    return {"error": error_msg}
                return self._unwrap(data)
        except httpx.ConnectError:
            print(f"RAG Delete Error: Cannot connect to RAG MCP server at {self.base_url}")
            return {"error": f"Cannot connect to RAG MCP server. Is it running at {self.base_url}?"}
        except Exception as e:
            print(f"RAG Delete Error: {e}")
            return {"error": str(e)}

    async def delete_all_documents(self, tenant_id: str, user_role: Optional[str] = None):
        """
        Delete all documents for a tenant.
        Returns the unwrapped data from the MCP server response.

        Args:
            tenant_id: Tenant identifier
            user_role: User role (viewer, editor, admin, owner) - required for permission checks

        Returns:
            The content of the response's ``data`` field (or the raw body when
            not wrapped) on success; ``{"error": <message>}`` on any failure.
        """
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                # Use POST with JSON payload to include role for MCP protocol
                payload = {
                    "tenant_id": tenant_id,
                    "delete_all": True,
                }
                if user_role:
                    payload["user_role"] = user_role
                response = await client.post(
                    f"{self.base_url}/delete-all",
                    json=payload
                )
                if response.status_code != 200:
                    return {"error": f"HTTP {response.status_code}: {self._error_detail(response)}"}
                data = response.json()
                # Check if MCP server returned an error response
                if self._is_error_envelope(data):
                    return {"error": data.get("message", "Unknown error")}
                return self._unwrap(data)
        except httpx.ConnectError:
            print(f"RAG Delete All Error: Cannot connect to RAG MCP server at {self.base_url}")
            return {"error": f"Cannot connect to RAG MCP server. Is it running at {self.base_url}?"}
        except Exception as e:
            print(f"RAG Delete All Error: {e}")
            return {"error": str(e)}