"""
Document Ingestion Service
Handles ingestion of various document types (PDF, DOCX, TXT, URL, raw_text)
with metadata support and automatic type detection.
"""
import os
import re
import logging
from typing import Dict, Any, Optional
from urllib.parse import urlparse
import httpx
logger = logging.getLogger("document_ingestion")
def detect_source_type(content: str, filename: Optional[str] = None, url: Optional[str] = None) -> str:
    """
    Infer the ingestion source type from an explicit URL, a filename
    extension, or (as a last resort) the content itself.

    Precedence: url > filename extension > content heuristic.

    Returns:
        One of 'pdf', 'docx', 'txt', 'url', 'raw_text', 'markdown'.
    """
    if url:
        return "url"

    if filename:
        extension = filename.lower().rsplit('.', 1)[-1] if '.' in filename else ''
        by_extension = {
            'pdf': 'pdf',
            'docx': 'docx',
            'doc': 'docx',
            'txt': 'txt',
            'text': 'txt',
            'md': 'markdown',
            'markdown': 'markdown',
        }
        if extension in by_extension:
            return by_extension[extension]

    # Fall back to a content sniff: anything that looks like it contains a
    # web address is treated as a URL source.
    lowered = content.lower()
    if any(marker in lowered for marker in ('http://', 'https://', 'www.')):
        return 'url'
    return 'raw_text'
async def extract_text_from_url(url: str, timeout: int = 30) -> str:
    """
    Fetch a URL and return its visible text content (async).

    Follows redirects, removes <script>/<style> blocks, strips remaining
    HTML tags, and collapses whitespace. Intended for simple pages only —
    there is no full HTML parser here.

    Args:
        url: The URL to fetch.
        timeout: Request timeout in seconds.

    Returns:
        The extracted plain text.

    Raises:
        ValueError: If the request fails or returns an error status.
    """
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            response = await client.get(url)
            response.raise_for_status()
            text = response.text
            # Remove <script> and <style> blocks entirely — their contents are
            # code/CSS, not page text. (The previous patterns were empty
            # strings, so nothing was actually being removed.)
            text = re.sub(r'<script[^>]*>.*?</script>', '', text, flags=re.DOTALL | re.IGNORECASE)
            text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL | re.IGNORECASE)
            # Replace remaining tags with a space so adjacent words don't fuse.
            text = re.sub(r'<[^>]+>', ' ', text)
            # Normalize whitespace
            text = re.sub(r'\s+', ' ', text).strip()
            return text
    except Exception as e:
        logger.error(f"Failed to fetch URL {url}: {e}")
        # Chain the original cause so the network error is preserved.
        raise ValueError(f"Failed to fetch URL: {str(e)}") from e
def normalize_text(text: str) -> str:
    """
    Sanitize text before ingestion.

    Collapses every run of whitespace (including newlines and tabs) to a
    single space, drops non-whitespace control characters, and trims the
    result.
    """
    collapsed = re.sub(r'\s+', ' ', text)
    # DEL and C0 control characters that survived the whitespace collapse.
    cleaned = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', collapsed)
    return cleaned.strip()
async def prepare_ingestion_payload(
    tenant_id: str,
    content: str,
    source_type: Optional[str] = None,
    filename: Optional[str] = None,
    url: Optional[str] = None,
    doc_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Prepare ingestion payload according to the system prompt specification.

    Auto-detects the source type if not given, fetches URL content
    (best-effort), normalizes the text, and derives a doc_id when the
    caller didn't supply one.

    Returns:
        {
            "action": "ingest_document",
            "tenant_id": "...",
            "source_type": "pdf | docx | txt | url | raw_text",
            "content": "...",
            "metadata": {
                "filename": "...",
                "url": "...",
                "doc_id": "..."
            }
        }

    Raises:
        ValueError: If the content is empty after normalization.
    """
    # Auto-detect source type if not provided
    if not source_type:
        source_type = detect_source_type(content, filename, url)

    # For URL sources, prefer freshly fetched page text; fall back to the
    # caller-provided content if the fetch fails (deliberate best-effort).
    if source_type == "url" and url:
        try:
            content = await extract_text_from_url(url)
        except Exception as e:
            logger.warning(f"URL fetch failed, using provided content: {e}")

    content = normalize_text(content)
    if not content:
        raise ValueError("Content is empty after normalization")

    # Derive a doc_id if the caller didn't supply one:
    # filename > URL-derived slug > content fingerprint.
    if not doc_id:
        if filename:
            doc_id = filename
        elif url:
            parsed = urlparse(url)
            doc_id = f"{parsed.netloc}{parsed.path}".replace('/', '_')[:100]
        else:
            # md5 is used purely as a stable content fingerprint here,
            # not for anything security-sensitive.
            import hashlib
            doc_id = hashlib.md5(content.encode()).hexdigest()[:16]

    # Caller-supplied metadata goes in first so the canonical identifiers
    # below always win. (Previously a user "doc_id" key could silently
    # overwrite the explicit/generated doc_id, while filename/url could
    # not — the precedence is now consistent.)
    ingestion_metadata: Dict[str, Any] = dict(metadata or {})
    ingestion_metadata["doc_id"] = doc_id
    if filename:
        ingestion_metadata["filename"] = filename
    if url:
        ingestion_metadata["url"] = url

    return {
        "action": "ingest_document",
        "tenant_id": tenant_id,
        "source_type": source_type,
        "content": content,
        "metadata": ingestion_metadata
    }
async def process_ingestion(
    payload: Dict[str, Any],
    rag_client
) -> Dict[str, Any]:
    """
    Send a prepared ingestion payload to the RAG MCP server and return the
    server result enriched with ingestion bookkeeping fields.

    Args:
        payload: The ingestion payload from prepare_ingestion_payload
        rag_client: RAGClient instance (exposes async ingest(content, tenant_id))

    Returns:
        Result from RAG ingestion, merged over the bookkeeping fields
        (server keys take precedence on collision).
    """
    tenant = payload["tenant_id"]
    doc_metadata = payload["metadata"]

    # Forward the document text to the RAG MCP server.
    server_result = await rag_client.ingest(payload["content"], tenant)

    enriched = {
        "status": "ok",
        "tenant_id": tenant,
        "source_type": payload["source_type"],
        "doc_id": doc_metadata.get("doc_id"),
        "chunks_stored": server_result.get("chunks_stored", 0),
        "metadata": doc_metadata,
    }
    # Same semantics as {**enriched, **server_result}: server keys win.
    enriched.update(server_result)
    return enriched