nothingworry's picture
feat: add knowledge base management and analytics dashboard
aa63765
raw
history blame
2.63 kB
import os
import httpx
from dotenv import load_dotenv
# Load variables from a .env file (if one is found) into the process
# environment so that os.getenv("RAG_MCP_URL") in RAGClient.__init__ sees them.
load_dotenv()
class RAGClient:
    """
    Communicates with the RAG MCP server.

    The server location is read from the ``RAG_MCP_URL`` environment variable
    when the client is constructed. Each call opens a short-lived
    ``httpx.AsyncClient``. Failures are printed and converted into an empty
    fallback value instead of being raised, so callers never have to catch
    transport errors.
    """

    def __init__(self):
        # Stored exactly as read from the environment (None when unset).
        self.base_url = os.getenv("RAG_MCP_URL")
        self.search_endpoint = self._endpoint("search")
        self.ingest_endpoint = self._endpoint("ingest")

    def _endpoint(self, path: str) -> str:
        """
        Join *path* onto the configured base URL.

        Trailing slashes on the base URL are dropped so the result never
        contains ``//`` before *path*, and an unset base URL contributes an
        empty prefix instead of the literal text ``None``.
        """
        root = (self.base_url or "").rstrip("/")
        return f"{root}/{path}"

    async def search(self, query: str, tenant_id: str):
        """
        Sends the query to the RAG server and returns document chunks.

        Returns the ``results`` value of the JSON response. An empty list is
        returned when the server does not answer with HTTP 200, when the
        response body is not a JSON object, when ``results`` is missing or
        null, or when the request raises.
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.search_endpoint,
                    json={
                        "query": query,
                        "tenant_id": tenant_id,
                    },
                )
                if response.status_code != 200:
                    return []
                data = response.json()
                if not isinstance(data, dict):
                    # A non-object body has no "results" key to read.
                    return []
                results = data.get("results")
                # Guard against an explicit JSON null so callers can always
                # iterate the return value.
                return [] if results is None else results
        except Exception as e:
            # Best-effort boundary: retrieval failures must not break callers.
            print("RAG Client Error:", e)
            return []

    async def ingest(self, content: str, tenant_id: str):
        """
        Sends content to the RAG server for ingestion.

        Returns the decoded JSON response on HTTP 200. Otherwise returns a
        dict with a single ``error`` key describing the HTTP status or the
        exception that was raised.
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.ingest_endpoint,
                    json={
                        "tenant_id": tenant_id,
                        "content": content,
                    },
                )
                if response.status_code != 200:
                    return {"error": f"HTTP {response.status_code}"}
                return response.json()
        except Exception as e:
            print("RAG Ingest Error:", e)
            return {"error": str(e)}

    async def list_documents(self, tenant_id: str, limit: int = 1000, offset: int = 0):
        """
        List all documents for a tenant.

        Returns the decoded JSON response on HTTP 200. On any other status,
        or when the request raises, returns an empty page that echoes the
        requested ``limit`` and ``offset``.
        """
        # Built once; a fresh dict per call, so callers may mutate the result.
        empty_page = {"documents": [], "total": 0, "limit": limit, "offset": offset}
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    self._endpoint("list"),
                    params={
                        "tenant_id": tenant_id,
                        "limit": limit,
                        "offset": offset,
                    },
                )
                if response.status_code != 200:
                    return empty_page
                return response.json()
        except Exception as e:
            print("RAG List Error:", e)
            return empty_page