from __future__ import annotations

from typing import Mapping

from backend.api.utils.text_extractor import extract_text
from backend.mcp_server.common.database import insert_document_chunks
from backend.mcp_server.common.embeddings import embed_text
from backend.mcp_server.common.tenant import TenantContext
from backend.mcp_server.common.utils import ToolValidationError, tool_handler


@tool_handler("rag.ingest")
async def rag_ingest(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Ingest raw text into the tenant's knowledge base with optional metadata.

    Payload fields:
    - content: Text content to ingest (required)
    - chunk_words: Words per chunk, clamped to 50-800 (default: 300)
    - metadata: JSON metadata object (title, summary, tags, topics, etc.)
    - doc_id: Document ID used to group chunks from the same document
    """
    content = payload.get("content")
    if not isinstance(content, str) or not content.strip():
        raise ToolValidationError("content must be a non-empty string")

    max_words = payload.get("chunk_words", 300)
    try:
        # Clamp rather than reject: any numeric value is coerced into the
        # supported 50-800 range.
        max_words_value = max(50, min(int(max_words), 800))
    except (TypeError, ValueError) as exc:
        raise ToolValidationError("chunk_words must be an integer") from exc

    # Optional metadata and doc_id; values of the wrong type are silently
    # dropped rather than failing the whole ingest.
    metadata = payload.get("metadata")
    if metadata is not None and not isinstance(metadata, dict):
        metadata = None

    doc_id = payload.get("doc_id")
    if doc_id is not None and not isinstance(doc_id, str):
        doc_id = None

    chunks = extract_text(content, max_words=max_words_value)
    if not chunks:
        raise ToolValidationError("no text detected after preprocessing")

    stored = 0
    errors: list[str] = []
    for i, chunk in enumerate(chunks):
        try:
            vector = embed_text(chunk)
            # The same document-level metadata is stored alongside every chunk.
            insert_document_chunks(
                context.tenant_id,
                chunk,
                vector,
                metadata=metadata,
                doc_id=doc_id,
            )
            stored += 1
        except Exception as exc:
            # Log the failure but keep going, so one bad chunk does not abort
            # the rest of the document.
            error_msg = f"Failed to store chunk {i + 1}/{len(chunks)}: {exc}"
            errors.append(error_msg)
            print(f"❌ {error_msg}")

    if stored == 0:
        # Nothing was stored at all; surface the collected errors.
        error_summary = "\n".join(errors) if errors else "Unknown error during database insertion"
        raise ToolValidationError(
            f"Failed to store any chunks to database. Errors:\n{error_summary}\n\n"
            "Please check:\n"
            "1. POSTGRESQL_URL is set correctly in your .env file\n"
            "2. Database is accessible and the 'documents' table exists\n"
            "3. pgvector extension is installed in your PostgreSQL database"
        )

    if errors:
        # Partial success: some chunks failed, but at least one was stored.
        print(
            f"⚠️ WARNING: {len(errors)} chunk(s) failed to store, "
            f"but {stored} chunk(s) were stored successfully"
        )

    return {
        "tenant_id": context.tenant_id,
        "chunks_ingested": stored,
        "metadata": {"chunk_words": max_words_value, **(metadata or {})},
        "doc_id": doc_id,
        "warnings": errors or None,
    }
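

# Usage sketch (illustrative only, kept as comments so importing this module has
# no side effects): how a test or script might invoke the handler directly.
# It assumes TenantContext can be constructed from a bare tenant_id and that the
# Postgres/pgvector backend configured for the project is reachable; both are
# hypothetical simplifications, not guarantees about this codebase's API.
#
#     import asyncio
#
#     async def demo() -> None:
#         context = TenantContext(tenant_id="tenant-123")  # hypothetical constructor
#         result = await rag_ingest(
#             context,
#             {
#                 "content": "pgvector stores embeddings in a vector column...",
#                 "chunk_words": 300,
#                 "metadata": {"title": "pgvector notes", "tags": ["postgres"]},
#                 "doc_id": "doc-0001",
#             },
#         )
#         print(result["chunks_ingested"], result["warnings"])
#
#     asyncio.run(demo())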