from __future__ import annotations

from typing import Mapping

from backend.api.utils.text_extractor import extract_text
from backend.mcp_server.common.database import insert_document_chunks
from backend.mcp_server.common.embeddings import embed_text
from backend.mcp_server.common.tenant import TenantContext
from backend.mcp_server.common.utils import ToolValidationError, tool_handler


@tool_handler("rag.ingest")
async def rag_ingest(context: TenantContext, payload: Mapping[str, object]) -> dict[str, object]:
    """
    Ingest raw text into the tenant's knowledge base with optional metadata.
    
    Supports:
    - content: Text content to ingest (required)
    - chunk_words: Words per chunk (default: 300)
    - metadata: JSON metadata object (title, summary, tags, topics, etc.)
    - doc_id: Document ID to group chunks from the same document
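
    Example payload (illustrative values only):
        {
            "content": "pgvector adds a vector column type to PostgreSQL...",
            "chunk_words": 300,
            "metadata": {"title": "pgvector notes", "tags": ["postgres"]},
            "doc_id": "doc-42"
        }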
    """

    content = payload.get("content")
    if not isinstance(content, str) or not content.strip():
        raise ToolValidationError("content must be a non-empty string")

    max_words = payload.get("chunk_words", 300)
    try:
        # Coerce to int, then clamp into [50, 800] instead of rejecting
        # out-of-range values (e.g. 10 -> 50, 1000 -> 800).
        max_words_value = max(50, min(int(max_words), 800))
    except (TypeError, ValueError):
        raise ToolValidationError("chunk_words must be an integer (clamped to the 50-800 range)") from None

    # Extract metadata and doc_id if provided
    metadata = payload.get("metadata")
    if metadata and not isinstance(metadata, dict):
        metadata = None  # Ignore invalid metadata
    
    doc_id = payload.get("doc_id")
    if doc_id and not isinstance(doc_id, str):
        doc_id = None

    chunks = extract_text(content, max_words=max_words_value)
    if not chunks:
        raise ToolValidationError("no text detected after preprocessing")

    stored = 0
    errors = []
    
    for i, chunk in enumerate(chunks):
        try:
            vector = embed_text(chunk)
            # Store metadata with each chunk (same metadata for all chunks from same document)
            insert_document_chunks(
                context.tenant_id,
                chunk,
                vector,
                metadata=metadata,
                doc_id=doc_id
            )
            stored += 1
        except Exception as e:
            error_msg = f"Failed to store chunk {i + 1}/{len(chunks)}: {e}"
            errors.append(error_msg)
            print(f"❌ {error_msg}")
            # Continue with other chunks, but log the error
    
    if stored == 0:
        # If no chunks were stored, raise an error
        error_summary = "\n".join(errors) if errors else "Unknown error during database insertion"
        raise ToolValidationError(
            f"Failed to store any chunks to database. Errors:\n{error_summary}\n\n"
            f"Please check:\n"
            f"1. POSTGRESQL_URL is set correctly in your .env file\n"
            f"2. Database is accessible and the 'documents' table exists\n"
            f"3. pgvector extension is installed in your PostgreSQL database"
        )
    
    if errors:
        # Partial failure: some chunks stored, some did not. Log a warning here;
        # the per-chunk errors are also surfaced in the returned "warnings" field.
        print(f"⚠️ WARNING: {len(errors)} chunk(s) failed to store, but {stored} chunk(s) were stored successfully")

    return {
        "tenant_id": context.tenant_id,
        "chunks_ingested": stored,
        "metadata": {"chunk_words": max_words_value, **(metadata or {})},
        "doc_id": doc_id,
        "warnings": errors if errors else None,
    }
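
# Minimal usage sketch (assumptions flagged below): how a test or script might
# call rag_ingest directly. The TenantContext(tenant_id=...) constructor and
# the pass-through behaviour of @tool_handler are assumptions made from this
# module alone; in production the function is dispatched as the "rag.ingest"
# MCP tool rather than called by hand.
#
#     import asyncio
#
#     async def demo() -> None:
#         ctx = TenantContext(tenant_id="tenant-a")  # hypothetical constructor
#         result = await rag_ingest(ctx, {
#             "content": "pgvector adds a vector column type to PostgreSQL...",
#             "chunk_words": 300,
#             "metadata": {"title": "pgvector notes", "tags": ["postgres"]},
#             "doc_id": "doc-42",
#         })
#         print(result["chunks_ingested"], result["warnings"])
#
#     asyncio.run(demo())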