| """ | |
| Document Ingestion Service | |
| Handles ingestion of various document types (PDF, DOCX, TXT, URL, raw_text) | |
| with metadata support and automatic type detection. | |
| """ | |
| import os | |
| import re | |
| import logging | |
| from typing import Dict, Any, Optional | |
| from urllib.parse import urlparse | |
| import httpx | |
| from io import BytesIO | |
| logger = logging.getLogger("document_ingestion") | |

def detect_source_type(content: str, filename: Optional[str] = None, url: Optional[str] = None) -> str:
    """
    Detect the source type from content, filename, or URL.

    Returns: 'pdf', 'docx', 'txt', 'url', 'raw_text', 'markdown'
    """
    if url:
        return "url"
    if filename:
        ext = filename.lower().split('.')[-1] if '.' in filename else ''
        if ext == 'pdf':
            return 'pdf'
        elif ext in ['docx', 'doc']:
            return 'docx'
        elif ext in ['txt', 'text']:
            return 'txt'
        elif ext in ['md', 'markdown']:
            return 'markdown'
    # Heuristic: treat the content as a URL only when the content itself *is*
    # a URL, not merely when it mentions one somewhere in the text
    stripped = content.strip().lower()
    if stripped.startswith(('http://', 'https://', 'www.')):
        return 'url'
    return 'raw_text'
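
# Illustrative expectations for detect_source_type (examples only, not tests;
# the filenames and URLs are made up):
#   detect_source_type("...", filename="report.pdf")      -> "pdf"
#   detect_source_type("...", url="https://example.com")  -> "url"
#   detect_source_type("https://example.com/page")        -> "url"  (content is itself a URL)
#   detect_source_type("plain prose, no markers")         -> "raw_text"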

async def extract_text_from_url(url: str, timeout: int = 30) -> str:
    """
    Fetch and extract text content from a URL (async).
    """
    try:
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            response = await client.get(url)
            response.raise_for_status()
            # Basic HTML stripping (for simple pages)
            text = response.text
            # Remove script and style tags
            text = re.sub(r'<script[^>]*>.*?</script>', '', text, flags=re.DOTALL | re.IGNORECASE)
            text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL | re.IGNORECASE)
            # Remove HTML tags
            text = re.sub(r'<[^>]+>', ' ', text)
            # Normalize whitespace
            text = re.sub(r'\s+', ' ', text).strip()
            return text
    except Exception as e:
        logger.error(f"Failed to fetch URL {url}: {e}")
        raise ValueError(f"Failed to fetch URL: {str(e)}")
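
# The regex-based stripping above is deliberately crude. A more robust
# extraction could use an HTML parser instead; a minimal sketch, assuming the
# optional beautifulsoup4 dependency is installed:
#
#   from bs4 import BeautifulSoup
#   soup = BeautifulSoup(response.text, "html.parser")
#   for tag in soup(["script", "style"]):
#       tag.decompose()
#   text = soup.get_text(separator=" ", strip=True)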

def extract_text_from_file_bytes(file_bytes: bytes, filename: str) -> str:
    """
    Extract text from binary file data (PDF, DOCX, etc.).

    Args:
        file_bytes: Binary file content
        filename: Original filename (for type detection)

    Returns:
        Extracted text content
    """
    ext = filename.lower().split('.')[-1] if '.' in filename else ''

    # PDF extraction
    if ext == 'pdf':
        try:
            import PyPDF2
            pdf_file = BytesIO(file_bytes)
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            text_parts = []
            for page in pdf_reader.pages:
                text_parts.append(page.extract_text())
            return '\n'.join(text_parts)
        except ImportError:
            logger.warning("PyPDF2 not installed, cannot extract PDF text")
            raise ValueError("PDF extraction requires PyPDF2. Install with: pip install PyPDF2")
        except Exception as e:
            logger.error(f"PDF extraction failed: {e}")
            raise ValueError(f"Failed to extract text from PDF: {str(e)}")

    # DOCX extraction (note: python-docx reads .docx only; a legacy binary
    # .doc file will fail here and surface as the ValueError below)
    elif ext in ['docx', 'doc']:
        try:
            from docx import Document
            doc_file = BytesIO(file_bytes)
            doc = Document(doc_file)
            return '\n'.join(paragraph.text for paragraph in doc.paragraphs)
        except ImportError:
            logger.warning("python-docx not installed, cannot extract DOCX text")
            raise ValueError("DOCX extraction requires python-docx. Install with: pip install python-docx")
        except Exception as e:
            logger.error(f"DOCX extraction failed: {e}")
            raise ValueError(f"Failed to extract text from DOCX: {str(e)}")

    # Text files (TXT, MD); errors='ignore' drops undecodable bytes rather than raising
    elif ext in ['txt', 'md', 'markdown', 'text']:
        return file_bytes.decode('utf-8', errors='ignore')

    else:
        # Fall back to decoding as UTF-8 text; reject the file only if nothing
        # decodable survives (the original except branch was unreachable,
        # since errors='ignore' never raises)
        decoded = file_bytes.decode('utf-8', errors='ignore')
        if decoded.strip():
            return decoded
        raise ValueError(f"Unsupported file type: {ext}. Supported: pdf, docx, txt, md")
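
# Example usage (illustrative; "report.pdf" is a hypothetical local file):
#
#   with open("report.pdf", "rb") as f:
#       text = extract_text_from_file_bytes(, "report.pdf")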

def normalize_text(text: str) -> str:
    """
    Sanitize and normalize text before ingestion.
    """
    # Remove control characters except newlines and tabs
    text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F]', '', text)
    # Collapse runs of spaces/tabs and excess blank lines, preserving the
    # paragraph breaks the previous step deliberately keeps (collapsing all
    # whitespace first would have destroyed them)
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    # Strip leading/trailing whitespace
    return text.strip()
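
# Illustrative behaviour of normalize_text:
#   normalize_text("  Hello\x00   world\t!  ")  ->  "Hello world !"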

async def prepare_ingestion_payload(
    tenant_id: str,
    content: str,
    source_type: Optional[str] = None,
    filename: Optional[str] = None,
    url: Optional[str] = None,
    doc_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Prepare ingestion payload according to the system prompt specification.

    Returns:
        {
            "action": "ingest_document",
            "tenant_id": "...",
            "source_type": "pdf | docx | txt | url | raw_text",
            "content": "...",
            "metadata": {
                "filename": "...",
                "url": "...",
                "doc_id": "..."
            }
        }
    """
    # Auto-detect source type if not provided
    if not source_type:
        source_type = detect_source_type(content, filename, url)

    # Handle URL: fetch content (async)
    if source_type == "url" and url:
        try:
            content = await extract_text_from_url(url)
        except Exception as e:
            logger.warning(f"URL fetch failed, using provided content: {e}")

    # Normalize content
    content = normalize_text(content)
    if not content:
        raise ValueError("Content is empty after normalization")

    # Generate doc_id if not provided
    if not doc_id:
        if filename:
            doc_id = filename
        elif url:
            parsed = urlparse(url)
            doc_id = f"{parsed.netloc}{parsed.path}".replace('/', '_')[:100]
        else:
            # Content hash as a stable fallback identifier (not for security)
            doc_id = hashlib.md5(content.encode()).hexdigest()[:16]

    # Build metadata
    ingestion_metadata = {
        "doc_id": doc_id,
        **(metadata or {})
    }
    if filename:
        ingestion_metadata["filename"] = filename
    if url:
        ingestion_metadata["url"] = url

    return {
        "action": "ingest_document",
        "tenant_id": tenant_id,
        "source_type": source_type,
        "content": content,
        "metadata": ingestion_metadata
    }
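
# Minimal usage sketch (hedged: the tenant id, content, and metadata values
# below are illustrative, not part of any real configuration):
#
#   payload = await prepare_ingestion_payload(
#       tenant_id="acme",
#       content="Quarterly revenue grew 12%...",
#       filename="q3_report.txt",
#       metadata={"department": "finance"},
#   )
#   # payload["source_type"] == "txt"; payload["metadata"]["doc_id"] == "q3_report.txt"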

async def process_ingestion(
    payload: Dict[str, Any],
    rag_client
) -> Dict[str, Any]:
    """
    Process the ingestion payload by sending it to the RAG MCP server.

    Args:
        payload: The ingestion payload from prepare_ingestion_payload
        rag_client: RAGClient instance

    Returns:
        Result from RAG ingestion
    """
    tenant_id = payload["tenant_id"]
    content = payload["content"]

    # Send to RAG MCP server
    result = await rag_client.ingest(content, tenant_id)

    # Enhance result with metadata (keys from `result` win on collision)
    return {
        "status": "ok",
        "tenant_id": tenant_id,
        "source_type": payload["source_type"],
        "doc_id": payload["metadata"].get("doc_id"),
        "chunks_stored": result.get("chunks_stored", 0),
        "metadata": payload["metadata"],
        **result
    }
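
# End-to-end sketch of the two-step flow (prepare, then process). The client
# below is a stand-in stub: the real RAGClient and the exact shape of its
# ingest() result come from the surrounding application, so treat both as
# assumptions rather than the actual API.
if __name__ == "__main__":
    import asyncio

    class _StubRAGClient:
        async def ingest(self, content: str, tenant_id: str) -> Dict[str, Any]:
            # Pretend roughly every 500 characters became one stored chunk
            return {"chunks_stored": max(1, len(content) // 500)}

    async def _demo() -> None:
        payload = await prepare_ingestion_payload(
            tenant_id="demo-tenant",
            content="Some raw text to ingest.",
        )
        result = await process_ingestion(payload, _StubRAGClient())
        print(result)

    asyncio.run(_demo())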