from __future__ import annotations

from typing import Mapping

from backend.api.utils.text_extractor import extract_text
from backend.mcp_server.common.database import insert_document_chunks
from backend.mcp_server.common.embeddings import embed_text
from backend.mcp_server.common.tenant import TenantContext
from backend.mcp_server.common.utils import ToolValidationError, tool_handler


@tool_handler("rag.ingest")
async def rag_ingest(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Ingest raw text into the tenant's knowledge base with optional metadata.

    Supports:
    - content: Text content to ingest (required)
    - chunk_words: Target words per chunk, clamped to 50-800 (default: 300)
    - metadata: JSON metadata object (title, summary, tags, topics, etc.)
    - doc_id: Document ID used to group chunks from the same document
    """
    content = payload.get("content")
    if not isinstance(content, str) or not content.strip():
        raise ToolValidationError("content must be a non-empty string")

    max_words = payload.get("chunk_words", 300)
    try:
        # Coerce to int and clamp to a sane range rather than rejecting outliers.
        max_words_value = max(50, min(int(max_words), 800))
    except (TypeError, ValueError):
        raise ToolValidationError("chunk_words must be an integer (values are clamped to 50-800)")

    # Extract metadata and doc_id if provided
    metadata = payload.get("metadata")
    if metadata is not None and not isinstance(metadata, dict):
        metadata = None  # Silently ignore invalid metadata rather than failing the ingest
    doc_id = payload.get("doc_id")
    if doc_id is not None and not isinstance(doc_id, str):
        doc_id = None

    chunks = extract_text(content, max_words=max_words_value)
    if not chunks:
        raise ToolValidationError("no text detected after preprocessing")

    stored = 0
    errors: list[str] = []
    for i, chunk in enumerate(chunks):
        try:
            vector = embed_text(chunk)
            # Store metadata with each chunk (same metadata for all chunks from the same document)
            insert_document_chunks(
                context.tenant_id,
                chunk,
                vector,
                metadata=metadata,
                doc_id=doc_id,
            )
            stored += 1
        except Exception as e:
            # Log the failure and continue with the remaining chunks
            error_msg = f"Failed to store chunk {i + 1}/{len(chunks)}: {e}"
            errors.append(error_msg)
            print(f"❌ {error_msg}")

    if stored == 0:
        # Nothing was persisted; surface the collected errors to the caller.
        error_summary = "\n".join(errors) if errors else "Unknown error during database insertion"
        raise ToolValidationError(
            f"Failed to store any chunks to the database. Errors:\n{error_summary}\n\n"
            f"Please check:\n"
            f"1. POSTGRESQL_URL is set correctly in your .env file\n"
            f"2. The database is accessible and the 'documents' table exists\n"
            f"3. The pgvector extension is installed in your PostgreSQL database"
        )
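
    # The prerequisites named in the error above, as a rough sketch. This is an
    # assumption for illustration, not the project's actual schema (which lives
    # wherever insert_document_chunks is defined); the vector dimension depends
    # on the embedding model used by embed_text.
    #
    #   CREATE EXTENSION IF NOT EXISTS vector;
    #   CREATE TABLE IF NOT EXISTS documents (
    #       tenant_id text,
    #       content   text,
    #       embedding vector(1536),  -- hypothetical dimension
    #       metadata  jsonb,
    #       doc_id    text
    #   );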

    if errors:
        # Partial success: some chunks failed, but at least one was stored.
        print(f"⚠️ WARNING: {len(errors)} chunk(s) failed to store, but {stored} chunk(s) were stored successfully")

    return {
        "tenant_id": context.tenant_id,
        "chunks_ingested": stored,
        "metadata": {"chunk_words": max_words_value, **(metadata or {})},
        "doc_id": doc_id,
        "warnings": errors or None,
    }
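

# Example invocation (illustrative sketch only): in practice the MCP runtime
# dispatches "rag.ingest" to this handler with a real TenantContext. The
# payload below is a hypothetical shape matching the docstring, and the
# result shown assumes the content splits into two chunks.
#
#   payload = {
#       "content": "pgvector adds vector similarity search to PostgreSQL...",
#       "chunk_words": 300,
#       "metadata": {"title": "pgvector notes", "tags": ["postgres", "rag"]},
#       "doc_id": "doc-pgvector-notes",
#   }
#   result = await rag_ingest(context, payload)
#   # -> {"tenant_id": "...", "chunks_ingested": 2, "doc_id": "doc-pgvector-notes", ...}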