# Last commit: 0452a50 — "Add Docker support and remove Ollama" (author: nothingworry)
from fastapi import APIRouter, Header, HTTPException, UploadFile, File, Form, Request
from pydantic import BaseModel
from typing import Optional, Dict, Any
from api.mcp_clients.rag_client import RAGClient
from api.services.document_ingestion import (
prepare_ingestion_payload,
process_ingestion,
detect_source_type,
normalize_text,
extract_text_from_file_bytes
)
from ..utils.access_control import require_api_permission
# Router holding every endpoint in this module; presumably mounted under a
# "/rag" prefix elsewhere (the /ingest-file docstring shows "/rag/ingest-file") — TODO confirm.
router = APIRouter()
# One RAG MCP client instance created at import time and shared by all endpoints below.
rag_client = RAGClient()
class IngestRequest(BaseModel):
    """Legacy simple ingestion request.

    Request body for ``POST /ingest``: raw text to add to the tenant's
    knowledge base.
    """
    # Text handed unchanged to ``rag_client.ingest`` by the /ingest handler.
    content: str
class DocumentIngestRequest(BaseModel):
    """Enhanced ingestion request matching the system prompt specification.

    Request body for ``POST /ingest-document``.
    """
    # Accepted for spec compatibility; the /ingest-document handler never reads it.
    action: str = "ingest_document"
    # Can come from header; when set, the body value wins over x-tenant-id.
    tenant_id: Optional[str] = None
    # pdf | docx | txt | url | raw_text | markdown; None is passed through to
    # prepare_ingestion_payload (presumably meaning auto-detect — confirm).
    source_type: Optional[str] = None
    # Document text; presumably the URL itself when source_type is "url" — confirm.
    content: str
    # Optional extras; the handler reads the keys "filename", "url", "doc_id"
    # and "extract_metadata" (defaults to True), and forwards the whole dict.
    metadata: Optional[Dict[str, Any]] = None
class SearchRequest(BaseModel):
    """Request body for ``POST /search``."""
    # Free-text query passed unchanged to ``rag_client.search``.
    query: str
@router.post("/search")
async def rag_search(
    req: SearchRequest,
    x_tenant_id: str = Header(None),
):
    """Query the caller's tenant knowledge base through the RAG MCP server.

    Headers:
        x-tenant-id: required; its absence yields HTTP 400.

    Returns:
        A dict echoing ``tenant_id`` and ``query`` alongside ``results``,
        which is whatever ``rag_client.search`` produced.

    Raises:
        HTTPException: 400 without a tenant header; 500 carrying the
        underlying error text when the search call fails.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        hits = await rag_client.search(req.query, tenant)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    # Building the response cannot fail, so it lives outside the try block.
    return {"tenant_id": tenant, "query": req.query, "results": hits}
@router.post("/ingest")
async def rag_ingest(
    req: IngestRequest,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer"),
):
    """Legacy ingestion endpoint: push plain text into the tenant's knowledge base.

    Headers:
        x-tenant-id: required; its absence yields HTTP 400.
        x-user-role: caller role (default "viewer"); must be allowed to
            "ingest_documents" per ``require_api_permission``.

    Returns:
        ``{"tenant_id": ..., "status": "ok"}`` merged with every key of the
        RAG client's reply (reply keys win on collision).

    Raises:
        HTTPException: 400 without a tenant header, whatever
        ``require_api_permission`` raises for a disallowed role, and 500 for
        any failure while ingesting or merging the reply.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    require_api_permission(x_user_role, "ingest_documents")

    try:
        reply = await rag_client.ingest(req.content, tenant)
        # The merge stays inside the try: a non-mapping reply must surface as a 500.
        response = {"tenant_id": tenant, "status": "ok", **reply}
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def _describe_extracted_metadata(result: Dict[str, Any]) -> str:
    """Return the " Title: ... Quality: ..." suffix for a success message.

    Never raises. The document has already been stored when this runs, so a
    malformed metadata value must not turn a successful ingestion into an
    error response.
    """
    metadata_info = result.get("extracted_metadata")
    if not isinstance(metadata_info, dict):
        return ""
    suffix = ""
    if metadata_info.get("title"):
        suffix += f" Title: {metadata_info['title']}"
    score = metadata_info.get("quality_score")
    if score is not None:  # 0.0 is a legitimate score and is shown too
        try:
            suffix += f" Quality: {float(score):.2f}"
        except (TypeError, ValueError):
            suffix += f" Quality: {score}"
    return suffix


def _ingestion_failure_detail(error_type: str, error_detail: str) -> str:
    """Map an unexpected ingestion exception to a troubleshooting message for the client.

    The result is capped at roughly 2000 characters so it stays reasonable as
    an HTTP error detail.
    """
    import re

    lowered = error_detail.lower()
    # NOTE(review): a bare "connection" failure is reported as a database
    # problem even if it came from the RAG server call — confirm intent.
    if "POSTGRESQL_URL" in error_detail or "database" in lowered or "connection" in lowered:
        error_msg = (
            f"Database connection error: {error_detail}\n\n"
            f"Please check:\n"
            f"1. POSTGRESQL_URL is set correctly in your .env file\n"
            f"2. Database is accessible\n"
            f"3. The 'documents' table exists (run initialize_database() if needed)"
        )
    # Match "rag" only as a standalone token so words like "storage" or
    # "paragraph" are not misclassified as RAG-server errors.
    elif (
        "RAG" in error_detail
        or re.search(r"(?<![a-z])rag(?![a-z])", lowered)
        or "mcp" in lowered
    ):
        error_msg = (
            f"RAG server error: {error_detail}\n\n"
            f"Please check:\n"
            f"1. RAG_MCP_URL is set correctly (default: http://localhost:8900/rag)\n"
            f"2. RAG MCP server is running\n"
            f"3. Database connection (POSTGRESQL_URL) is configured in the RAG server"
        )
    elif len(error_detail) > 500:
        # Long details are usually tracebacks; keep only the head.
        error_msg = f"Ingestion failed ({error_type}): {error_detail[:500]}...\n\nSee server logs for full traceback."
    else:
        error_msg = f"Ingestion failed ({error_type}): {error_detail}"

    if len(error_msg) > 2000:
        error_msg = error_msg[:2000] + "...\n\n(Error message truncated. See server logs for full details.)"
    return error_msg


@router.post("/ingest-document")
async def rag_ingest_document(
    req: DocumentIngestRequest,
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
    x_user_role: str = Header("viewer"),
):
    """
    Enhanced document ingestion endpoint matching the system prompt specification.

    Supports:
    - PDF, DOCX, TXT, Markdown files
    - URLs (fetches content automatically)
    - Raw text
    - Metadata (filename, url, doc_id)

    Expected payload:
    {
        "action": "ingest_document",
        "tenant_id": "...",
        "source_type": "pdf | docx | txt | url | raw_text",
        "content": "...",
        "metadata": {
            "filename": "...",
            "url": "...",
            "doc_id": "..."
        }
    }

    Tenant resolution: ``req.tenant_id`` wins, otherwise the x-tenant-id header.
    ``metadata["extract_metadata"]`` (default True) is forwarded to
    ``process_ingestion``.

    Returns:
        ``{"status": "ok", "message": ...}`` merged with the ingestion result
        (result keys win on collision).

    Raises:
        HTTPException:
            - 400 when the tenant is missing, the content is blank, or a
              ``ValueError`` escapes payload preparation or processing.
            - 403 when the ingestion step or the RAG server reports a
              permission problem.
            - 500 when zero chunks were stored, the RAG server reports an
              error, or any other failure occurs.
    """
    import sys
    import traceback

    # Use tenant_id from header if not in body (for backward compatibility)
    tenant_id = req.tenant_id or x_tenant_id
    if not tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    # Debug: log the header names, plus the role/tenant values used for
    # authorization. Starlette's Headers.get is case-insensitive; the old
    # dict(...) copy lost that, so an "X-User-Role" lookup could never match.
    print(f"🔍 DEBUG: All headers received: {list(request.headers.keys())}", file=sys.stderr)
    print(f"🔍 DEBUG: x-user-role header value: '{request.headers.get('x-user-role', 'NOT FOUND')}'", file=sys.stderr)
    print(f"🔍 DEBUG: Backend received x_user_role parameter='{x_user_role}' (type: {type(x_user_role)})", file=sys.stderr)
    print(f"🔍 DEBUG: x_tenant_id header='{x_tenant_id}'", file=sys.stderr)

    require_api_permission(x_user_role, "ingest_documents")

    metadata = req.metadata or {}
    content_length = len(req.content) if req.content else 0
    print(f"📥 Ingestion request received: tenant_id={tenant_id}, source_type={req.source_type}, content_length={content_length}", file=sys.stderr)

    # Validate content is not too short
    if not req.content or not req.content.strip():
        raise HTTPException(status_code=400, detail="Content cannot be empty. Please provide text to ingest.")
    if content_length < 10:
        print(f"⚠️ Warning: Content is very short ({content_length} chars). This may result in no chunks being created.", file=sys.stderr)

    try:
        print("🔧 Step 1: Preparing ingestion payload...", file=sys.stderr)
        try:
            # Async because URL sources are fetched here.
            payload = await prepare_ingestion_payload(
                tenant_id=tenant_id,
                content=req.content,
                source_type=req.source_type,
                filename=metadata.get("filename"),
                url=metadata.get("url"),
                doc_id=metadata.get("doc_id"),
                metadata=req.metadata,
            )
            print("✅ Step 1 complete: payload prepared", file=sys.stderr)
        except Exception as prep_err:
            print(f"❌ Step 1 FAILED (prepare_ingestion_payload): {prep_err}", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            raise

        print("🔧 Step 2: Processing ingestion with RAG client...", file=sys.stderr)
        extract_metadata = metadata.get("extract_metadata", True)
        try:
            result = await process_ingestion(payload, rag_client, extract_metadata=extract_metadata, user_role=x_user_role)
        except HTTPException:
            # Re-raise HTTP exceptions (like 403 permission errors) as-is
            raise
        except Exception as proc_err:
            # Non-HTTP exceptions that still carry a 403 status are permission errors.
            if getattr(proc_err, "status_code", None) == 403:
                raise HTTPException(status_code=403, detail=getattr(proc_err, "detail", str(proc_err))) from proc_err
            print(f"❌ Step 2 FAILED (process_ingestion): {proc_err}", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            raise

        if not isinstance(result, dict):
            print(f"❌ Unexpected ingestion result type: {type(result).__name__}", file=sys.stderr)
            raise HTTPException(status_code=500, detail=f"RAG server error: unexpected response type {type(result).__name__}")
        print(f"✅ Step 2 complete: chunks_stored={result.get('chunks_stored', 0)}", file=sys.stderr)

        # The RAG server can report failure in-band instead of raising.
        if result.get("status") == "error":
            error_msg = str(result.get("message") or result.get("error") or "Unknown error from RAG server")
            error_type = result.get("error_type", "unknown")
            print(f"❌ RAG server returned error ({error_type}): {error_msg}", file=sys.stderr)
            lowered = error_msg.lower()
            # NOTE(review): "validation_error" is also surfaced as a permission
            # problem; presumably the RAG server uses it for role checks — confirm.
            if "permission" in lowered or "not permitted" in lowered or error_type == "validation_error":
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown."
                )
            raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")

        # A missing or null count is treated as "nothing stored".
        chunks_stored = result.get("chunks_stored") or 0
        print(f"🔍 Debug: result keys={list(result.keys())}, chunks_stored={chunks_stored}", file=sys.stderr)
        if chunks_stored == 0:
            warnings = result.get("warnings")
            error_detail = result.get("error") or warnings or result.get("message") or "No chunks were stored"
            error_msg = f"Ingestion failed: {error_detail}"
            # Only append warnings when they were not already used as the detail.
            if warnings and error_detail is not warnings:
                error_msg += f"\nWarnings: {warnings}"
            error_msg += (
                "\n\nPossible causes:\n"
                "1. Content too short or empty (minimum text required)\n"
                "2. Database connection issue (check POSTGRESQL_URL in RAG server)\n"
                "3. RAG MCP server error (check RAG server logs)\n"
                "4. Database table 'documents' doesn't exist"
            )
            print(f"❌ No chunks stored. Error detail: {error_detail}", file=sys.stderr)
            raise HTTPException(status_code=500, detail=error_msg)

        message = f"Document ingested successfully. {chunks_stored} chunk(s) stored."
        message += _describe_extracted_metadata(result)
        return {
            "status": "ok",
            "message": message,
            **result,
        }
    except HTTPException:
        raise
    except ValueError as e:
        print(f"❌ Ingestion ValueError: {e}", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
        raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}") from e
    except Exception as e:
        error_detail = str(e)
        error_type = type(e).__name__
        error_log = f"❌ Ingestion Error ({error_type}): {error_detail}\nFull traceback:\n{traceback.format_exc()}"
        print(error_log, file=sys.stderr)
        print(error_log)  # Also print to stdout for uvicorn logs
        raise HTTPException(status_code=500, detail=_ingestion_failure_detail(error_type, error_detail)) from e
@router.post("/ingest-file")
async def rag_ingest_file(
    file: UploadFile = File(...),
    x_tenant_id: Optional[str] = Header(None),
    tenant_id: Optional[str] = Form(None),
    x_user_role: str = Header("viewer"),
):
    """
    File upload endpoint for binary files (PDF, DOCX, TXT, MD).

    Extracts text server-side and ingests it into the knowledge base.

    Usage:
        POST /rag/ingest-file
        Headers:
            x-tenant-id: <tenant_id>
            x-user-role: <role>  (default "viewer"; forwarded to ingestion)
        Form Data:
            file: <binary file>
            tenant_id: <optional, can use header instead>

    Returns:
        ``{"status": "ok", "message": ...}`` merged with the ingestion result
        (result keys win on collision).

    Raises:
        HTTPException:
            - 400 for a missing tenant, an empty file, a file with no
              extractable text, or a ``ValueError`` during processing.
            - 403 when the RAG server reports a permission problem.
            - 500 when the RAG server reports an error, zero chunks were
              stored, or any other failure occurs.
    """
    # Use tenant_id from form or header
    tenant_id_value = tenant_id or x_tenant_id
    if not tenant_id_value:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    require_api_permission(x_user_role, "ingest_documents")

    try:
        file_bytes = await file.read()
        if not file_bytes:
            raise HTTPException(status_code=400, detail="File is empty")

        # NOTE(review): extraction is synchronous and runs on the event loop;
        # large PDFs may block other requests — consider a thread pool.
        try:
            extracted_text = extract_text_from_file_bytes(file_bytes, file.filename or "unknown")
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
        if not extracted_text or not extracted_text.strip():
            raise HTTPException(status_code=400, detail="No text could be extracted from file")

        payload = await prepare_ingestion_payload(
            tenant_id=tenant_id_value,
            content=extracted_text,
            source_type=None,  # Auto-detect from filename
            filename=file.filename,
            url=None,
            doc_id=None,
            metadata=None,
        )

        # Forward the caller's role, as /ingest-document does, so the RAG
        # server applies the same permission check to both endpoints.
        result = await process_ingestion(payload, rag_client, extract_metadata=True, user_role=x_user_role)
        if not isinstance(result, dict):
            raise HTTPException(status_code=500, detail=f"RAG server error: unexpected response type {type(result).__name__}")

        # The RAG server can report failure in-band; do not report it as success.
        if result.get("status") == "error":
            error_msg = str(result.get("message") or result.get("error") or "Unknown error from RAG server")
            lowered = error_msg.lower()
            # NOTE(review): mirrors /ingest-document, which treats "validation_error" as a permission problem — confirm.
            if "permission" in lowered or "not permitted" in lowered or result.get("error_type") == "validation_error":
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown."
                )
            raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")

        chunks_stored = result.get("chunks_stored") or 0
        if chunks_stored == 0:
            detail = result.get("error") or result.get("warnings") or result.get("message") or "No chunks were stored"
            raise HTTPException(status_code=500, detail=f"Ingestion failed: {detail}")

        message = f"File '{file.filename}' ingested successfully. {chunks_stored} chunk(s) stored."
        # The file is already stored here; malformed metadata must not turn success into an error.
        metadata_info = result.get("extracted_metadata")
        if isinstance(metadata_info, dict):
            if metadata_info.get("title"):
                message += f" Title: {metadata_info['title']}"
            score = metadata_info.get("quality_score")
            if score is not None:
                try:
                    message += f" Quality: {float(score):.2f}"
                except (TypeError, ValueError):
                    message += f" Quality: {score}"

        return {
            "status": "ok",
            "message": message,
            **result,
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/list")
async def rag_list(
    limit: int = 1000,
    offset: int = 0,
    x_tenant_id: str = Header(None),
):
    """Return one page of the documents stored for the caller's tenant.

    Query params:
        limit: page size handed to the RAG client (default 1000).
        offset: number of documents to skip (default 0).

    Headers:
        x-tenant-id: required; its absence yields HTTP 400.

    Returns:
        The RAG client's ``list_documents`` reply, unchanged.

    Raises:
        HTTPException: 400 without a tenant header; 500 carrying the error
        text when listing fails.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        return await rag_client.list_documents(tenant, limit=limit, offset=offset)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/delete/{document_id}")
async def rag_delete(
    document_id: int,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer"),
):
    """Remove one document, by numeric ID, from the caller's tenant knowledge base.

    Headers:
        x-tenant-id: required; its absence yields HTTP 400.
        x-user-role: caller role (default "viewer"); must be allowed to
            "delete_documents" and is forwarded to the RAG client.

    Returns:
        The RAG client's reply when it carries no "error" key.

    Raises:
        HTTPException: 503 when the error text contains "Cannot connect";
        404 when it mentions "not found" or "access denied"
        (case-insensitive); 500 for any other error or unexpected failure.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    require_api_permission(x_user_role, "delete_documents")

    try:
        outcome = await rag_client.delete_document(tenant, document_id, user_role=x_user_role)
        if "error" not in outcome:
            return outcome

        reason = outcome["error"]
        # Unreachable RAG server -> 503; missing or foreign document -> 404; anything else -> 500.
        if "Cannot connect" in reason:
            status = 503
        elif "not found" in reason.lower() or "access denied" in reason.lower():
            status = 404
        else:
            status = 500
        raise HTTPException(status_code=status, detail=reason)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/delete-all")
async def rag_delete_all(
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer"),
):
    """Wipe every document that belongs to the caller's tenant.

    Headers:
        x-tenant-id: required; its absence yields HTTP 400.
        x-user-role: caller role (default "viewer"); must be allowed to
            "delete_documents" and is forwarded to the RAG client.

    Returns:
        The RAG client's reply when it carries no "error" key.

    Raises:
        HTTPException: 503 when the error text contains "Cannot connect";
        500 for any other error or unexpected failure.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")
    require_api_permission(x_user_role, "delete_documents")

    try:
        outcome = await rag_client.delete_all_documents(tenant, user_role=x_user_role)
        if "error" not in outcome:
            return outcome

        reason = outcome["error"]
        # An unreachable RAG server is reported as 503 Service Unavailable.
        raise HTTPException(
            status_code=503 if "Cannot connect" in reason else 500,
            detail=reason,
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))