"""
Document Ingestion Service

Handles ingestion of various document types (PDF, DOCX, TXT, URL, raw_text)
with metadata support and automatic type detection.
"""

import hashlib
import logging
import os
import re
from typing import Dict, Any, Optional
from urllib.parse import urlparse

import httpx

logger = logging.getLogger("document_ingestion")


def detect_source_type(content: str, filename: Optional[str] = None, url: Optional[str] = None) -> str:
    """
    Detect the source type from content, filename, or URL.

    Precedence: an explicit ``url`` wins, then the filename extension,
    then a content heuristic (presence of a URL-like substring), and
    finally the ``'raw_text'`` fallback.

    Returns:
        'pdf', 'docx', 'txt', 'url', 'raw_text', 'markdown'
    """
    if url:
        return "url"

    if filename:
        # Extension is everything after the last dot; empty if no dot at all.
        ext = filename.lower().split('.')[-1] if '.' in filename else ''
        if ext in ['pdf']:
            return 'pdf'
        elif ext in ['docx', 'doc']:
            return 'docx'
        elif ext in ['txt', 'text']:
            return 'txt'
        elif ext in ['md', 'markdown']:
            return 'markdown'
        # Unknown extension: fall through to the content heuristic.

    # Heuristic detection from content: any URL-ish marker means 'url'.
    content_lower = content.lower()
    if 'http://' in content_lower or 'https://' in content_lower or 'www.' in content_lower:
        return 'url'

    return 'raw_text'


async def extract_text_from_url(url: str, timeout: int = 30) -> str:
    """
    Fetch and extract text content from a URL (async).

    Performs a GET (following redirects), strips <script>/<style> blocks
    and all remaining HTML tags, and collapses whitespace. This is a
    best-effort extractor for simple pages, not a full HTML parser.

    Args:
        url: The URL to fetch.
        timeout: Request timeout in seconds.

    Returns:
        The extracted plain text.

    Raises:
        ValueError: If the fetch fails for any reason (network error,
            non-2xx status, etc.).
    """
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            response = await client.get(url)
            response.raise_for_status()

            # Basic HTML stripping (for simple pages)
            text = response.text

            # Remove script and style tags *with their contents* — otherwise
            # JS/CSS source would leak into the extracted text.
            text = re.sub(r'<script[^>]*>.*?</script>', '', text, flags=re.DOTALL | re.IGNORECASE)
            text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL | re.IGNORECASE)

            # Remove remaining HTML tags, leaving a space so adjacent words
            # from different elements don't fuse together.
            text = re.sub(r'<[^>]+>', ' ', text)

            # Normalize whitespace
            text = re.sub(r'\s+', ' ', text).strip()

            return text
    except Exception as e:
        logger.error(f"Failed to fetch URL {url}: {e}")
        raise ValueError(f"Failed to fetch URL: {str(e)}")


def normalize_text(text: str) -> str:
    """
    Sanitize and normalize text before ingestion.

    Collapses all runs of whitespace to single spaces, removes ASCII
    control characters (keeping none, since whitespace was already
    collapsed), and strips leading/trailing whitespace.
    """
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text)

    # Remove control characters except newlines and tabs
    text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', text)

    # Strip leading/trailing whitespace
    text = text.strip()

    return text


async def prepare_ingestion_payload(
    tenant_id: str,
    content: str,
    source_type: Optional[str] = None,
    filename: Optional[str] = None,
    url: Optional[str] = None,
    doc_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Prepare ingestion payload according to the system prompt specification.

    Auto-detects the source type when not given, fetches URL content when
    applicable (falling back to the provided ``content`` on fetch failure),
    normalizes the text, and derives a ``doc_id`` from the filename, URL,
    or a content hash.

    Args:
        tenant_id: Tenant the document belongs to.
        content: Raw document text (or fallback text for URL sources).
        source_type: One of 'pdf'/'docx'/'txt'/'url'/'raw_text'/'markdown';
            auto-detected when None.
        filename: Original filename, if any.
        url: Source URL, if any.
        doc_id: Explicit document id; derived when None.
        metadata: Extra metadata merged into the payload's metadata dict
            (an explicit ``doc_id`` key here overrides the derived one).

    Returns:
        {
            "action": "ingest_document",
            "tenant_id": "...",
            "source_type": "pdf | docx | txt | url | raw_text",
            "content": "...",
            "metadata": {"filename": "...", "url": "...", "doc_id": "..."}
        }

    Raises:
        ValueError: If the content is empty after normalization.
    """
    # Auto-detect source type if not provided
    if not source_type:
        source_type = detect_source_type(content, filename, url)

    # Handle URL: fetch content (async). Best-effort — on failure we keep
    # whatever content the caller supplied instead of aborting ingestion.
    if source_type == "url" and url:
        try:
            content = await extract_text_from_url(url)
        except Exception as e:
            logger.warning(f"URL fetch failed, using provided content: {e}")

    # Normalize content
    content = normalize_text(content)

    if not content:
        raise ValueError("Content is empty after normalization")

    # Generate doc_id if not provided: prefer filename, then a sanitized
    # URL (netloc+path, '/' -> '_', capped at 100 chars), then a short
    # content hash. MD5 is used only as a non-cryptographic fingerprint.
    if not doc_id:
        if filename:
            doc_id = filename
        elif url:
            parsed = urlparse(url)
            doc_id = f"{parsed.netloc}{parsed.path}".replace('/', '_')[:100]
        else:
            doc_id = hashlib.md5(content.encode()).hexdigest()[:16]

    # Build metadata; caller-supplied metadata may override doc_id.
    ingestion_metadata = {
        "doc_id": doc_id,
        **(metadata or {})
    }
    if filename:
        ingestion_metadata["filename"] = filename
    if url:
        ingestion_metadata["url"] = url

    return {
        "action": "ingest_document",
        "tenant_id": tenant_id,
        "source_type": source_type,
        "content": content,
        "metadata": ingestion_metadata
    }


async def process_ingestion(
    payload: Dict[str, Any],
    rag_client
) -> Dict[str, Any]:
    """
    Process the ingestion payload by sending it to the RAG MCP server.

    Args:
        payload: The ingestion payload from prepare_ingestion_payload
        rag_client: RAGClient instance

    Returns:
        Result from RAG ingestion, enhanced with the payload's metadata.
        Note: keys from ``result`` are spread last, so a "status" or
        "metadata" key returned by the server overrides ours.
    """
    tenant_id = payload["tenant_id"]
    content = payload["content"]

    # Send to RAG MCP server
    result = await rag_client.ingest(content, tenant_id)

    # Enhance result with metadata
    return {
        "status": "ok",
        "tenant_id": tenant_id,
        "source_type": payload["source_type"],
        "doc_id": payload["metadata"].get("doc_id"),
        "chunks_stored": result.get("chunks_stored", 0),
        "metadata": payload["metadata"],
        **result
    }