Spaces:
Sleeping
Sleeping
File size: 17,309 Bytes
484cae8 aa63765 73fd1fc c16e1c9 73fd1fc b65ef75 c16e1c9 aa63765 73fd1fc aa63765 73fd1fc aa63765 c16e1c9 aa63765 c16e1c9 aa63765 c16e1c9 aa63765 c16e1c9 aa63765 b65ef75 aa63765 73fd1fc aa63765 b65ef75 aa63765 73fd1fc 484cae8 b65ef75 73fd1fc 484cae8 b65ef75 73fd1fc 484cae8 73fd1fc 484cae8 73fd1fc 484cae8 73fd1fc 484cae8 d1e5882 484cae8 d1e5882 484cae8 d1e5882 73fd1fc d1e5882 73fd1fc 484cae8 73fd1fc 484cae8 73fd1fc 484cae8 0452a50 484cae8 73fd1fc b65ef75 73fd1fc b65ef75 73fd1fc d1e5882 73fd1fc d1e5882 73fd1fc aa63765 345b8ff b65ef75 345b8ff b65ef75 345b8ff 30e3a7e 345b8ff b65ef75 345b8ff b65ef75 345b8ff 30e3a7e 345b8ff e44e5dd 345b8ff |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 |
import sys
import traceback
from typing import Optional, Dict, Any

from fastapi import APIRouter, Header, HTTPException, UploadFile, File, Form, Request
from pydantic import BaseModel

from api.mcp_clients.rag_client import RAGClient
from api.services.document_ingestion import (
    prepare_ingestion_payload,
    process_ingestion,
    detect_source_type,
    normalize_text,
    extract_text_from_file_bytes
)
from ..utils.access_control import require_api_permission
# FastAPI router holding every endpoint in this module. It is presumably
# mounted under a "/rag" prefix (the /ingest-file docstring shows
# "POST /rag/ingest-file") -- confirm at the include_router call site.
router = APIRouter()
# Single RAGClient constructed at import time and shared by all endpoints
# below (search, ingest, list, delete).
rag_client = RAGClient()
class IngestRequest(BaseModel):
    """Body of the legacy ``/ingest`` endpoint: plain text and nothing else."""

    # Text forwarded unchanged to ``rag_client.ingest``.
    content: str
class DocumentIngestRequest(BaseModel):
    """Body of the ``/ingest-document`` endpoint.

    Follows the ``ingest_document`` payload from the system prompt
    specification. ``tenant_id`` may be left out of the body, in which case
    the endpoint falls back to the ``x-tenant-id`` header.
    """

    # Action tag from the spec; the endpoint in this module never inspects it.
    action: str = "ingest_document"
    # Optional here: the x-tenant-id header is used when this is missing.
    tenant_id: Optional[str] = None
    # Expected values: pdf | docx | txt | url | raw_text | markdown.
    # None is passed through to prepare_ingestion_payload (presumably it
    # auto-detects the type -- confirm in document_ingestion).
    source_type: Optional[str] = None
    # Document text (or the URL, for url sources).
    content: str
    # Optional extras; the endpoint reads "filename", "url", "doc_id" and
    # "extract_metadata" from it.
    metadata: Optional[Dict[str, Any]] = None
class SearchRequest(BaseModel):
    """Body of the ``/search`` endpoint."""

    # Free-text query forwarded to ``rag_client.search``.
    query: str
@router.post("/search")
async def rag_search(
    req: SearchRequest,
    x_tenant_id: str = Header(None)
):
    """
    Search one tenant's knowledge base through the RAG MCP server.

    The tenant comes from the ``x-tenant-id`` header; when it is absent or
    empty the request is rejected with HTTP 400. Any exception raised by the
    RAG client becomes HTTP 500 carrying the exception text.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    query = req.query
    try:
        hits = await rag_client.search(query, tenant)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

    return {"tenant_id": tenant, "query": query, "results": hits}
@router.post("/ingest")
async def rag_ingest(
    req: IngestRequest,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Legacy ingestion endpoint: store plain text for a tenant via the RAG MCP server.

    Requires the ``x-tenant-id`` header (HTTP 400 otherwise) and a role that
    passes ``require_api_permission(..., "ingest_documents")``. Whatever the
    RAG client returns is merged into the response after ``tenant_id`` and
    ``status``, so same-named keys from the client take precedence. Any
    exception from the client (including a non-mapping result) becomes
    HTTP 500.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    try:
        outcome = await rag_client.ingest(req.content, tenant)
        # The ** merge stays inside the try so a non-mapping result is
        # reported as HTTP 500 like any other client failure.
        return {"tenant_id": tenant, "status": "ok", **outcome}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
def _raise_for_rag_error(result: Any) -> None:
    """Raise an HTTPException when the RAG server returned ``status == "error"``.

    Failures whose message mentions "permission" / "not permitted", or whose
    ``error_type`` is ``"validation_error"``, become HTTP 403; every other
    error result becomes HTTP 500. Non-dict results are left to the caller.
    """
    if not isinstance(result, dict) or result.get("status") != "error":
        return
    # str() guards against a non-string message before calling .lower().
    error_msg = str(result.get("message") or result.get("error") or "Unknown error from RAG server")
    error_type = result.get("error_type", "unknown")
    print(f"❌ RAG server returned error ({error_type}): {error_msg}", file=sys.stderr)
    lowered = error_msg.lower()
    if "permission" in lowered or "not permitted" in lowered or error_type == "validation_error":
        raise HTTPException(
            status_code=403,
            detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown."
        )
    raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")


def _require_stored_chunks(result: Dict[str, Any]) -> Any:
    """Return ``result["chunks_stored"]``, raising HTTP 500 if nothing was stored.

    A missing, zero or None count is treated as "no chunks stored".
    """
    chunks_stored = result.get("chunks_stored", 0)
    if chunks_stored:
        return chunks_stored

    error_detail = result.get("error") or result.get("warnings") or result.get("message") or "No chunks were stored"
    warning_list = result.get("warnings")
    error_msg = f"Ingestion failed: {error_detail}"
    # When there is no "error" key the warnings already are the headline
    # detail; don't repeat them.
    if warning_list and warning_list is not error_detail:
        error_msg += f"\nWarnings: {warning_list}"
    error_msg += (
        "\n\nPossible causes:\n"
        "1. Content too short or empty (minimum text required)\n"
        "2. Database connection issue (check POSTGRESQL_URL in RAG server)\n"
        "3. RAG MCP server error (check RAG server logs)\n"
        "4. Database table 'documents' doesn't exist"
    )
    print(f"❌ No chunks stored. Error detail: {error_detail}", file=sys.stderr)
    raise HTTPException(status_code=500, detail=error_msg)


def _describe_extracted_metadata(result: Dict[str, Any]) -> str:
    """Build the " Title: ... Quality: ..." suffix for the success message.

    Returns "" when ``extracted_metadata`` is absent or not a dict. A
    quality score that cannot be formatted as ``.2f`` is shown verbatim
    instead of failing the whole (already successful) request.
    """
    info = result.get("extracted_metadata")
    if not isinstance(info, dict):
        return ""
    suffix = ""
    if info.get("title"):
        suffix += f" Title: {info['title']}"
    score = info.get("quality_score")
    if score:
        try:
            suffix += f" Quality: {score:.2f}"
        except (TypeError, ValueError):
            suffix += f" Quality: {score}"
    return suffix


def _describe_unexpected_ingest_error(exc: Exception) -> str:
    """Turn an unexpected ingestion exception into a user-facing hint (max ~2000 chars).

    Classification is by substring match on the exception text: database/
    connection hints first, then RAG/MCP hints, else the raw error (first
    500 chars). NOTE(review): "rag" also matches words such as "storage";
    the heuristic is kept as-is.
    """
    error_detail = str(exc)
    error_type = type(exc).__name__
    lowered = error_detail.lower()
    if "POSTGRESQL_URL" in error_detail or "database" in lowered or "connection" in lowered:
        error_msg = (
            f"Database connection error: {error_detail}\n\n"
            "Please check:\n"
            "1. POSTGRESQL_URL is set correctly in your .env file\n"
            "2. Database is accessible\n"
            "3. The 'documents' table exists (run initialize_database() if needed)"
        )
    elif "rag" in lowered or "mcp" in lowered:
        error_msg = (
            f"RAG server error: {error_detail}\n\n"
            "Please check:\n"
            "1. RAG_MCP_URL is set correctly (default: http://localhost:8900/rag)\n"
            "2. RAG MCP server is running\n"
            "3. Database connection (POSTGRESQL_URL) is configured in the RAG server"
        )
    elif len(error_detail) > 500:
        error_msg = f"Ingestion failed ({error_type}): {error_detail[:500]}...\n\nSee server logs for full traceback."
    else:
        error_msg = f"Ingestion failed ({error_type}): {error_detail}"
    # Keep the HTTP error detail to a sane size.
    if len(error_msg) > 2000:
        error_msg = error_msg[:2000] + "...\n\n(Error message truncated. See server logs for full details.)"
    return error_msg


@router.post("/ingest-document")
async def rag_ingest_document(
    req: DocumentIngestRequest,
    request: Request,
    x_tenant_id: Optional[str] = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Enhanced document ingestion endpoint matching the system prompt specification.

    Supports:
    - PDF, DOCX, TXT, Markdown files
    - URLs (fetches content automatically)
    - Raw text
    - Metadata (filename, url, doc_id)

    Expected payload:
    {
        "action": "ingest_document",
        "tenant_id": "...",
        "source_type": "pdf | docx | txt | url | raw_text",
        "content": "...",
        "metadata": {
            "filename": "...",
            "url": "...",
            "doc_id": "..."
        }
    }

    ``tenant_id`` in the body wins over the ``x-tenant-id`` header.
    ``metadata["extract_metadata"]`` (default True) is forwarded to
    ``process_ingestion``. ``request`` is not read; it is kept so the
    signature stays unchanged.

    Raises:
        HTTPException(400): missing tenant, empty content, or a ValueError
            raised while ingesting.
        HTTPException(403): a permission failure from process_ingestion or
            from the RAG server's error result (require_api_permission may
            also reject the role).
        HTTPException(500): an RAG error result, no chunks stored, a
            malformed RAG response, or any other unexpected failure.
    """
    # Body tenant first; the header is the backward-compatible fallback.
    tenant_id = req.tenant_id or x_tenant_id
    if not tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    content = req.content
    content_length = len(content) if content else 0
    print(f"📥 Ingestion request received: tenant_id={tenant_id}, source_type={req.source_type}, content_length={content_length}", file=sys.stderr)

    if not content or not content.strip():
        raise HTTPException(status_code=400, detail="Content cannot be empty. Please provide text to ingest.")
    if content_length < 10:
        print(f"⚠️ Warning: Content is very short ({content_length} chars). This may result in no chunks being created.", file=sys.stderr)

    # Empty dict stands in for "no metadata"; .get() then yields the same
    # defaults the original conditional expressions produced.
    metadata = req.metadata or {}

    try:
        print("🔧 Step 1: Preparing ingestion payload...", file=sys.stderr)
        try:
            # Awaited because URL sources are fetched during preparation.
            payload = await prepare_ingestion_payload(
                tenant_id=tenant_id,
                content=content,
                source_type=req.source_type,
                filename=metadata.get("filename"),
                url=metadata.get("url"),
                doc_id=metadata.get("doc_id"),
                metadata=req.metadata,
            )
        except Exception as prep_err:
            # The full traceback is logged once by the outer handler.
            print(f"❌ Step 1 FAILED (prepare_ingestion_payload): {prep_err}", file=sys.stderr)
            raise
        print("✅ Step 1 complete: payload prepared", file=sys.stderr)

        print("🔧 Step 2: Processing ingestion with RAG client...", file=sys.stderr)
        extract_metadata = metadata.get("extract_metadata", True)
        try:
            result = await process_ingestion(
                payload, rag_client, extract_metadata=extract_metadata, user_role=x_user_role
            )
        except HTTPException:
            # e.g. a 403 permission error: pass through unchanged.
            raise
        except Exception as proc_err:
            # Non-HTTP exceptions that carry status_code == 403 are still
            # permission failures.
            if getattr(proc_err, "status_code", None) == 403:
                raise HTTPException(
                    status_code=403, detail=getattr(proc_err, "detail", str(proc_err))
                ) from proc_err
            print(f"❌ Step 2 FAILED (process_ingestion): {proc_err}", file=sys.stderr)
            raise
        stored_for_log = result.get("chunks_stored", 0) if isinstance(result, dict) else "N/A"
        print(f"✅ Step 2 complete: chunks_stored={stored_for_log}", file=sys.stderr)

        _raise_for_rag_error(result)
        if not isinstance(result, dict):
            raise HTTPException(
                status_code=500,
                detail=f"Ingestion failed: unexpected response from RAG server ({type(result).__name__})",
            )
        chunks_stored = _require_stored_chunks(result)

        message = f"Document ingested successfully. {chunks_stored} chunk(s) stored."
        message += _describe_extracted_metadata(result)
        # Keys from the RAG result are merged last and may override these.
        return {
            "status": "ok",
            "message": message,
            **result
        }
    except HTTPException:
        raise
    except ValueError as e:
        print(f"❌ Ingestion ValueError: {e}", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
        raise HTTPException(status_code=400, detail=f"Validation error: {str(e)}") from e
    except Exception as e:
        error_log = f"❌ Ingestion Error ({type(e).__name__}): {e}\nFull traceback:\n{traceback.format_exc()}"
        print(error_log, file=sys.stderr)
        print(error_log)  # Also print to stdout for uvicorn logs
        raise HTTPException(status_code=500, detail=_describe_unexpected_ingest_error(e)) from e
@router.post("/ingest-file")
async def rag_ingest_file(
    file: UploadFile = File(...),
    x_tenant_id: Optional[str] = Header(None),
    tenant_id: Optional[str] = Form(None),
    x_user_role: str = Header("viewer")
):
    """
    File upload endpoint for binary files (PDF, DOCX, TXT, MD).

    Extracts text server-side and ingests it into the knowledge base.

    Usage:
        POST /rag/ingest-file
        Headers:
            x-tenant-id: <tenant_id>
        Form Data:
            file: <binary file>
            tenant_id: <optional, can use header instead>

    The form ``tenant_id`` wins over the header. The caller's role is
    forwarded to ``process_ingestion`` exactly as ``/ingest-document`` does.

    Raises:
        HTTPException(400): missing tenant, empty file, no extractable text,
            or a ValueError during extraction/ingestion.
        HTTPException(403): the RAG server reported a permission failure.
        HTTPException(500): any other RAG error result or unexpected failure.
    """
    tenant_id_value = tenant_id or x_tenant_id
    if not tenant_id_value:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "ingest_documents")

    filename = file.filename
    try:
        file_bytes = await file.read()
        if not file_bytes:
            raise HTTPException(status_code=400, detail="File is empty")

        try:
            extracted_text = extract_text_from_file_bytes(file_bytes, filename or "unknown")
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e

        if not extracted_text or not extracted_text.strip():
            raise HTTPException(status_code=400, detail="No text could be extracted from file")

        payload = await prepare_ingestion_payload(
            tenant_id=tenant_id_value,
            content=extracted_text,
            source_type=None,  # Auto-detect from filename
            filename=filename,
            url=None,
            doc_id=None,
            metadata=None,
        )

        # Pass the caller's role through, consistent with /ingest-document;
        # previously process_ingestion fell back to whatever default role it
        # applies.
        result = await process_ingestion(
            payload, rag_client, extract_metadata=True, user_role=x_user_role
        )

        # An error result used to be reported as "ingested successfully";
        # surface it the same way /ingest-document does.
        if isinstance(result, dict) and result.get("status") == "error":
            error_msg = str(result.get("message") or result.get("error") or "Unknown error from RAG server")
            lowered = error_msg.lower()
            if "permission" in lowered or "not permitted" in lowered or result.get("error_type") == "validation_error":
                raise HTTPException(
                    status_code=403,
                    detail=f"Permission denied: {error_msg}\n\nPlease change your role to 'editor', 'admin', or 'owner' in the User Role dropdown."
                )
            raise HTTPException(status_code=500, detail=f"RAG server error: {error_msg}")

        message = f"File '{filename}' ingested successfully. {result.get('chunks_stored', 0)} chunk(s) stored."
        extracted = result.get("extracted_metadata")
        if isinstance(extracted, dict):
            if extracted.get("title"):
                message += f" Title: {extracted['title']}"
            score = extracted.get("quality_score")
            if score:
                # A non-numeric score must not turn a successful ingest into
                # a 400 via the ValueError handler below.
                try:
                    message += f" Quality: {score:.2f}"
                except (TypeError, ValueError):
                    message += f" Quality: {score}"

        # Keys from the RAG result are merged last and may override these.
        return {
            "status": "ok",
            "message": message,
            **result
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/list")
async def rag_list(
    limit: int = 1000,
    offset: int = 0,
    x_tenant_id: str = Header(None)
):
    """
    Return a page of the tenant's documents as reported by the RAG client.

    ``limit`` and ``offset`` are passed straight through to
    ``rag_client.list_documents``. A missing ``x-tenant-id`` header is
    HTTP 400; any client exception is HTTP 500 carrying the exception text.
    """
    tenant = x_tenant_id
    if not tenant:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    try:
        return await rag_client.list_documents(tenant, limit=limit, offset=offset)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.delete("/delete/{document_id}")
async def rag_delete(
    document_id: int,
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Remove one document (by numeric ID) from the tenant's knowledge base.

    An ``"error"`` entry in the client's result is mapped to an HTTP status:
    503 when it contains "Cannot connect"; 404 when it mentions "not found"
    or "access denied" (case-insensitive); otherwise 500. Other client
    exceptions also become HTTP 500.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "delete_documents")

    try:
        outcome = await rag_client.delete_document(x_tenant_id, document_id, user_role=x_user_role)
        if "error" not in outcome:
            return outcome

        reason = outcome["error"]
        # The connectivity test runs before any .lower() call, as before.
        if "Cannot connect" in reason:
            status = 503
        elif "not found" in reason.lower() or "access denied" in reason.lower():
            status = 404
        else:
            status = 500
        raise HTTPException(status_code=status, detail=reason)
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.delete("/delete-all")
async def rag_delete_all(
    x_tenant_id: str = Header(None),
    x_user_role: str = Header("viewer")
):
    """
    Remove every document belonging to the tenant.

    An ``"error"`` entry in the client's result becomes HTTP 503 when it
    contains "Cannot connect" and HTTP 500 otherwise; other client
    exceptions also become HTTP 500.
    """
    if not x_tenant_id:
        raise HTTPException(status_code=400, detail="Missing tenant ID")

    require_api_permission(x_user_role, "delete_documents")

    try:
        outcome = await rag_client.delete_all_documents(x_tenant_id, user_role=x_user_role)
        if "error" not in outcome:
            return outcome

        reason = outcome["error"]
        status = 503 if "Cannot connect" in reason else 500
        raise HTTPException(status_code=status, detail=reason)
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))