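# Scraped repository-page header (kept as a comment so the module parses):
#   nothingworry's picture
#   Commit message: fix the issues in the deleting kb
#   Commit: 30e3a7e
#   (raw | history | blame) — file size 17.3 kB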
from fastapi import APIRouter, Header, HTTPException, UploadFile, File, Form, Request
from pydantic import BaseModel
from typing import Optional, Dict, Any
from api.mcp_clients.rag_client import RAGClient
from api.services.document_ingestion import (
prepare_ingestion_payload,
process_ingestion,
detect_source_type,
normalize_text,
extract_text_from_file_bytes
)
from ..utils.access_control import require_api_permission
# Router holding every RAG endpoint in this module; the including app
# presumably mounts it under /rag (the /ingest-file docstring shows
# "POST /rag/ingest-file") — confirm against the app factory.
router = APIRouter()
# Single module-level RAG MCP client shared by all endpoints below.
rag_client = RAGClient()
class IngestRequest(BaseModel):
    """Legacy simple ingestion request.

    Request body for ``POST /ingest``: a single string of raw text.
    """
    content: str  # Passed as-is to rag_client.ingest() by the /ingest handler
class DocumentIngestRequest(BaseModel):
    """Enhanced ingestion request matching the system prompt specification.

    Request body for ``POST /ingest-document``.
    """
    # Not read by the /ingest-document handler; presumably kept so payloads
    # match the documented spec shape.
    action: str = "ingest_document"
    # Optional in the body; when absent the handler falls back to the
    # X-Tenant-ID header (the body value wins when both are present).
    tenant_id: Optional[str] = None
    # pdf | docx | txt | url | raw_text | markdown. None is passed through to
    # prepare_ingestion_payload, which presumably auto-detects — confirm.
    source_type: Optional[str] = None
    # Text to ingest; for source_type "url" this is presumably the URL to
    # fetch — confirm in prepare_ingestion_payload.
    content: str
    # Keys the handler reads: "filename", "url", "doc_id" and
    # "extract_metadata" (defaults to True). The whole dict is also forwarded
    # to prepare_ingestion_payload.
    metadata: Optional[Dict[str, Any]] = None
class SearchRequest(BaseModel):
    """Request body for ``POST /search``."""
    query: str  # Free-text query forwarded to rag_client.search()
@router.post("/search")
async def rag_search(
    req: SearchRequest,
    x_tenant_id: str = Header(None)
):
    """
    Run a search over the caller's tenant knowledge base.

    The tenant comes from the ``X-Tenant-ID`` header; the query is forwarded
    to the RAG MCP server through the shared ``rag_client``.

    Returns:
        A dict echoing ``tenant_id`` and ``query`` next to the raw
        ``results`` produced by the RAG client.

    Raises:
        HTTPException(400): the tenant header is missing or empty.
        HTTPException(500): the RAG client raised; its message is the detail.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    query_text = req.query
    try:
        hits = await rag_client.search(query_text, tenant)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

    response = {"tenant_id": tenant, "query": query_text}
    response["results"] = hits
    return response
@router.post("/ingest")
async def rag_ingest(
    req: IngestRequest,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Legacy endpoint: push one block of raw text into the tenant knowledge base.

    The role header is checked for the ``ingest_documents`` permission
    before anything is sent to the RAG MCP server.

    Returns:
        ``{"tenant_id": ..., "status": "ok"}`` merged with whatever dict the
        RAG client returned (its keys win on collision).

    Raises:
        HTTPException(400): the tenant header is missing or empty.
        HTTPException(500): the RAG client raised; its message is the detail.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    try:
        outcome = await rag_client.ingest(req.content, x_tenant_id)
        envelope = {"tenant_id": x_tenant_id, "status": "ok", **outcome}
        return envelope
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def _ingestion_error_message(error_type: str, error_detail: str) -> str:
    """Build the user-facing 500 detail for an unexpected /ingest-document failure.

    Args:
        error_type: Exception class name (e.g. ``"RuntimeError"``).
        error_detail: ``str()`` of the exception.

    Returns:
        A message with troubleshooting hints chosen by keyword, capped at
        2000 characters (plus a truncation notice) so the response stays small.
    """
    lowered = error_detail.lower()
    if "POSTGRESQL_URL" in error_detail or "database" in lowered or "connection" in lowered:
        error_msg = (
            f"Database connection error: {error_detail}\n\n"
            "Please check:\n"
            "1. POSTGRESQL_URL is set correctly in your .env file\n"
            "2. Database is accessible\n"
            "3. The 'documents' table exists (run initialize_database() if needed)"
        )
    elif "rag" in lowered or "mcp" in lowered:
        # NOTE: plain substring match, so words such as "storage" or
        # "paragraph" also land in this branch.
        error_msg = (
            f"RAG server error: {error_detail}\n\n"
            "Please check:\n"
            "1. RAG_MCP_URL is set correctly (default: http://localhost:8001)\n"
            "2. RAG MCP server is running\n"
            "3. Database connection (POSTGRESQL_URL) is configured in the RAG server"
        )
    elif len(error_detail) > 500:
        # Long details are usually tracebacks; keep just the head.
        error_msg = (
            f"Ingestion failed ({error_type}): {error_detail[:500]}...\n\n"
            "See server logs for full traceback."
        )
    else:
        error_msg = f"Ingestion failed ({error_type}): {error_detail}"

    if len(error_msg) > 2000:
        error_msg = error_msg[:2000] + "...\n\n(Error message truncated. See server logs for full details.)"
    return error_msg


@router.post("/ingest-document")
async def rag_ingest_document(
    req: DocumentIngestRequest,
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Enhanced document ingestion endpoint matching the system prompt specification.

    Supports:
    - PDF, DOCX, TXT, Markdown files
    - URLs (fetches content automatically)
    - Raw text
    - Metadata (filename, url, doc_id)

    Expected payload:
    {
        "action": "ingest_document",
        "tenant_id": "...",
        "source_type": "pdf | docx | txt | url | raw_text",
        "content": "...",
        "metadata": {
            "filename": "...",
            "url": "...",
            "doc_id": "..."
        }
    }

    Args:
        req: Parsed request body. ``req.tenant_id`` takes precedence over
            the ``X-Tenant-ID`` header.
        request: Raw request object. Not used by the handler; kept so the
            signature stays stable.
        x_tenant_id: Fallback tenant ID from the ``X-Tenant-ID`` header.
        x_user_role: Caller role; must pass ``ingest_documents`` and is
            forwarded to ``process_ingestion``.

    Returns:
        ``{"status": "ok", "message": ...}`` merged with the ingestion result dict.

    Raises:
        HTTPException(400): missing tenant, empty content, or a ValueError
            during payload preparation or ingestion.
        HTTPException(403): permission failure reported by ingestion or the RAG server.
        HTTPException(500): RAG server error, zero chunks stored, or any
            unexpected failure.
    """
    import sys
    import traceback

    # Body wins; the header is the backward-compatible fallback.
    tenant_id = req.tenant_id or x_tenant_id
    if not tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    content_length = len(req.content) if req.content else 0
    print(f"📥 Ingestion request received: tenant_id={tenant_id}, source_type={req.source_type}, content_length={content_length}", file=sys.stderr)

    if not req.content or not req.content.strip():
        raise HTTPException(status_code=400, detail="Content cannot be empty. Please provide text to ingest.")
    if content_length < 10:
        print(f"⚠️ Warning: Content is very short ({content_length} chars). This may result in no chunks being created.", file=sys.stderr)

    # Empty dict stands in for "no metadata" so the lookups below are uniform;
    # the original req.metadata (possibly None) is still what gets forwarded.
    metadata = req.metadata or {}

    try:
        print("🔧 Step 1: Preparing ingestion payload...", file=sys.stderr)
        try:
            # Async because URL sources are fetched here.
            payload = await prepare_ingestion_payload(
                tenant_id=tenant_id,
                content=req.content,
                source_type=req.source_type,
                filename=metadata.get("filename"),
                url=metadata.get("url"),
                doc_id=metadata.get("doc_id"),
                metadata=req.metadata,
            )
            print("✅ Step 1 complete: payload prepared", file=sys.stderr)
        except Exception as prep_err:
            print(f"❌ Step 1 FAILED (prepare_ingestion_payload): {prep_err}", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            raise

        print("🔧 Step 2: Processing ingestion with RAG client...", file=sys.stderr)
        extract_metadata = metadata.get("extract_metadata", True)
        try:
            result = await process_ingestion(
                payload, rag_client, extract_metadata=extract_metadata, user_role=x_user_role
            )
            logged_chunks = result.get("chunks_stored", 0) if isinstance(result, dict) else "N/A"
            print(f"✅ Step 2 complete: chunks_stored={logged_chunks}", file=sys.stderr)
        except HTTPException:
            # Already an HTTP error (e.g. 403 from a permission check).
            raise
        except Exception as proc_err:
            # Non-HTTP exceptions that still carry a 403 status are permission failures.
            if getattr(proc_err, "status_code", None) == 403:
                raise HTTPException(
                    status_code=403, detail=getattr(proc_err, "detail", str(proc_err))
                ) from proc_err
            print(f"❌ Step 2 FAILED (process_ingestion): {proc_err}", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
            raise

        if not isinstance(result, dict):
            raise HTTPException(
                status_code=500,
                detail=f"RAG server error: unexpected ingestion result of type {type(result).__name__}",
            )

        # The RAG server can report failure in-band instead of raising.
        if result.get("status") == "error":
            # str(): message/error fields are not guaranteed to be text.
            error_msg = str(result.get("message") or result.get("error") or "Unknown error from RAG server")
            error_type = result.get("error_type", "unknown")
            print(f"❌ RAG server returned error ({error_type}): {error_msg}", file=sys.stderr)
            lowered = error_msg.lower()
            # NOTE(review): every validation_error is treated as a role rejection;
            # confirm the RAG server emits validation_error for nothing else.
            if "permission" in lowered or "not permitted" in lowered or error_type == "validation_error":
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown."
                )
            raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")

        chunks_stored = result.get("chunks_stored", 0)
        print(f"🔍 Debug: result keys={list(result.keys())}, chunks_stored={chunks_stored}", file=sys.stderr)

        if chunks_stored == 0:
            warnings = result.get("warnings")
            error_detail = result.get("error") or warnings or result.get("message") or "No chunks were stored"
            error_msg = f"Ingestion failed: {error_detail}"
            # Avoid repeating the warnings when they are already the primary detail.
            if warnings and error_detail is not warnings:
                error_msg += f"\nWarnings: {warnings}"
            error_msg += (
                "\n\nPossible causes:\n"
                "1. Content too short or empty (minimum text required)\n"
                "2. Database connection issue (check POSTGRESQL_URL in RAG server)\n"
                "3. RAG MCP server error (check RAG server logs)\n"
                "4. Database table 'documents' doesn't exist"
            )
            print(f"❌ No chunks stored. Error detail: {error_detail}", file=sys.stderr)
            raise HTTPException(status_code=500, detail=error_msg)

        message = f"Document ingested successfully. {chunks_stored} chunk(s) stored."
        extracted = result.get("extracted_metadata")
        if isinstance(extracted, dict):
            if extracted.get("title"):
                message += f" Title: {extracted['title']}"
            score = extracted.get("quality_score")
            # Only format numeric scores: formatting a non-numeric value would
            # raise ValueError and turn a successful ingest into a 400.
            if score and isinstance(score, (int, float)):
                message += f" Quality: {score:.2f}"

        return {
            "status": "ok",
            "message": message,
            **result
        }
    except HTTPException:
        raise
    except ValueError as e:
        print(f"❌ Ingestion ValueError: {e}", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
        raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}") from e
    except Exception as e:
        error_detail = str(e)
        error_type = type(e).__name__
        full_traceback = traceback.format_exc()
        error_log = f"❌ Ingestion Error ({error_type}): {error_detail}\nFull traceback:\n{full_traceback}"
        print(error_log, file=sys.stderr)
        print(error_log)  # Also on stdout so it shows up in uvicorn logs
        raise HTTPException(
            status_code=500, detail=_ingestion_error_message(error_type, error_detail)
        ) from e
@router.post("/ingest-file")
async def rag_ingest_file(
    file: UploadFile = File(...),
    x_tenant_id: Optional[str] = Header(None),
    tenant_id: Optional[str] = Form(None),
    x_user_role: str = Header("viewer")
):
    """
    File upload endpoint for binary files (PDF, DOCX, TXT, MD).

    Extracts text server-side and ingests it into the knowledge base.

    Usage:
        POST /rag/ingest-file
        Headers:
            x-tenant-id: <tenant_id>
        Form Data:
            file: <binary file>
            tenant_id: <optional, can use header instead>

    Returns:
        ``{"status": "ok", "message": ...}`` merged with the ingestion result dict.

    Raises:
        HTTPException(400): missing tenant, empty file, unsupported file,
            no extractable text, or a ValueError during ingestion.
        HTTPException(403): the caller's role was rejected during ingestion.
        HTTPException(500): RAG server error, zero chunks stored, or any
            other failure.
    """
    # Form field wins over the header.
    tenant_id_value = tenant_id or x_tenant_id
    if not tenant_id_value:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    try:
        file_bytes = await file.read()
        if not file_bytes:
            raise HTTPException(status_code=400, detail="File is empty")

        try:
            extracted_text = extract_text_from_file_bytes(file_bytes, file.filename or "unknown")
        except ValueError as e:
            # ValueError here means the file type or contents could not be parsed.
            raise HTTPException(status_code=400, detail=str(e)) from e

        if not extracted_text or not extracted_text.strip():
            raise HTTPException(status_code=400, detail="No text could be extracted from file")

        payload = await prepare_ingestion_payload(
            tenant_id=tenant_id_value,
            content=extracted_text,
            source_type=None,  # Auto-detect from filename
            filename=file.filename,
            url=None,
            doc_id=None,
            metadata=None
        )

        # Forward the caller's role, as /ingest-document does, so downstream
        # permission checks see the real role.
        result = await process_ingestion(
            payload, rag_client, extract_metadata=True, user_role=x_user_role
        )

        if not isinstance(result, dict):
            raise HTTPException(
                status_code=500,
                detail=f"RAG server error: unexpected ingestion result of type {type(result).__name__}",
            )

        # The RAG server can report failure in-band; do not claim success.
        if result.get("status") == "error":
            error_msg = str(result.get("message") or result.get("error") or "Unknown error from RAG server")
            lowered = error_msg.lower()
            if (
                "permission" in lowered
                or "not permitted" in lowered
                or result.get("error_type") == "validation_error"
            ):
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown."
                )
            raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")

        chunks_stored = result.get("chunks_stored", 0)
        if chunks_stored == 0:
            error_detail = (
                result.get("error") or result.get("warnings") or result.get("message")
                or "No chunks were stored"
            )
            raise HTTPException(status_code=500, detail=f"Ingestion failed: {error_detail}")

        message = f"File '{file.filename}' ingested successfully. {chunks_stored} chunk(s) stored."
        extracted = result.get("extracted_metadata")
        if isinstance(extracted, dict):
            if extracted.get("title"):
                message += f" Title: {extracted['title']}"
            score = extracted.get("quality_score")
            # Only format numeric scores; a non-numeric value would raise
            # ValueError and turn a successful upload into a 400.
            if score and isinstance(score, (int, float)):
                message += f" Quality: {score:.2f}"

        return {
            "status": "ok",
            "message": message,
            **result
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Non-HTTP exceptions carrying a 403 status are permission failures.
        if getattr(e, "status_code", None) == 403:
            raise HTTPException(status_code=403, detail=getattr(e, "detail", str(e))) from e
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/list")
async def rag_list(
    limit: int = 1000,
    offset: int = 0,
    x_tenant_id: str = Header(None)
):
    """
    List documents in the tenant knowledge base, one page at a time.

    Args:
        limit: Maximum number of documents to return.
        offset: Number of documents to skip.
        x_tenant_id: Tenant whose documents are listed (``X-Tenant-ID`` header).

    Returns:
        Whatever ``rag_client.list_documents`` returns, unchanged.

    Raises:
        HTTPException(400): the tenant header is missing or empty.
        HTTPException(500): the RAG client raised; its message is the detail.
    """
    if x_tenant_id:
        try:
            return await rag_client.list_documents(x_tenant_id, limit=limit, offset=offset)
        except Exception as exc:
            raise HTTPException(status_code=500, detail=str(exc))
    raise HTTPException(status_code=400, detail="Missing tenant ID")
@router.delete("/delete/{document_id}")
async def rag_delete(
    document_id: int,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Delete a specific document by ID from the tenant knowledge base.

    The caller's role must pass the ``delete_documents`` permission and is
    also forwarded to the RAG server.

    Returns:
        The RAG client's response when it contains no ``"error"`` key.

    Raises:
        HTTPException(400): missing tenant header.
        HTTPException(503): the RAG client reports it cannot connect.
        HTTPException(404): the document was not found, or access was denied.
        HTTPException(403): the RAG server rejected the caller's role.
        HTTPException(500): any other error.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "delete_documents")

    try:
        result = await rag_client.delete_document(x_tenant_id, document_id, user_role=x_user_role)
        if "error" in result:
            # str(): the substring checks below assume text.
            error_msg = str(result["error"])
            lowered = error_msg.lower()
            if "Cannot connect" in error_msg:
                # RAG server unreachable -> 503 Service Unavailable.
                raise HTTPException(status_code=503, detail=error_msg)
            if "not found" in lowered or "access denied" in lowered:
                # "access denied" is reported as 404 as well, presumably so other
                # tenants' document IDs are not revealed — confirm.
                raise HTTPException(status_code=404, detail=error_msg)
            if "permission" in lowered or "not permitted" in lowered:
                # Role rejected by the RAG server (same wording the ingest path maps to 403).
                raise HTTPException(status_code=403, detail=f"Permission denied: {error_msg}")
            raise HTTPException(status_code=500, detail=error_msg)
        return result
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.delete("/delete-all")
async def rag_delete_all(
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Delete all documents for a tenant.

    The caller's role must pass the ``delete_documents`` permission and is
    also forwarded to the RAG server.

    Returns:
        The RAG client's response when it contains no ``"error"`` key.

    Raises:
        HTTPException(400): missing tenant header.
        HTTPException(503): the RAG client reports it cannot connect.
        HTTPException(403): the RAG server rejected the caller's role.
        HTTPException(500): any other error.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "delete_documents")

    try:
        result = await rag_client.delete_all_documents(x_tenant_id, user_role=x_user_role)
        if "error" in result:
            # str(): the substring checks below assume text.
            error_msg = str(result["error"])
            lowered = error_msg.lower()
            if "Cannot connect" in error_msg:
                # RAG server unreachable -> 503 Service Unavailable.
                raise HTTPException(status_code=503, detail=error_msg)
            if "permission" in lowered or "not permitted" in lowered:
                # Role rejected by the RAG server (same wording the ingest path maps to 403).
                raise HTTPException(status_code=403, detail=f"Permission denied: {error_msg}")
            raise HTTPException(status_code=500, detail=error_msg)
        return result
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e