| """Document ingestion pipeline for RAG functionality.""" | |
| import os | |
| import hashlib | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from pathlib import Path | |
| from datetime import datetime | |
| from langchain_core.documents import Document | |
| from src.rag.chunking import document_chunker | |
| from src.rag.vector_store import vector_store_manager | |
| from src.rag.embeddings import embedding_manager | |
| from src.core.logging_config import get_logger | |
| logger = get_logger(__name__) | |
class DocumentIngestionService:
    """Service for ingesting documents into the RAG system.

    Coordinates content hashing (for duplicate detection), text-aware
    chunking, and vector-store persistence for text documents
    (markdown or LaTeX).
    """

    def __init__(self):
        """Initialize the document ingestion service."""
        logger.info("Document ingestion service initialized")

    def _get_collection(self):
        """Return the raw collection object backing the vector store.

        NOTE(review): this reaches into the private ``_collection`` attribute
        of the LangChain vector store — confirm there is no public
        metadata-filtered get/delete API before relying on it long term.
        """
        return vector_store_manager.get_vector_store()._collection

    def create_file_hash(self, content: str) -> str:
        """Create a full SHA-256 hash for file content to avoid duplicates."""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def prepare_document_metadata(self,
                                  source_path: Optional[str] = None,
                                  doc_type: str = "markdown",
                                  additional_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Prepare metadata for a document.

        Args:
            source_path: Original document path
            doc_type: Type of document (markdown, pdf, etc.)
            additional_metadata: Additional metadata to include

        Returns:
            Dictionary with document metadata
        """
        metadata = {
            "source": source_path or "user_upload",
            "doc_type": doc_type,
            "processed_at": datetime.now().isoformat(),
            "ingestion_version": "1.0"
        }
        if additional_metadata:
            metadata.update(additional_metadata)
        return metadata

    def check_duplicate_in_vector_store(self, file_hash: str) -> bool:
        """Check if a document with the given file hash already exists.

        Errors are logged and treated as "not a duplicate" so ingestion can
        proceed (best-effort duplicate detection).
        """
        try:
            existing_docs = self._get_collection().get(
                where={"file_hash": file_hash},
                limit=1
            )
            return len(existing_docs.get('ids', [])) > 0
        except Exception as e:
            logger.error("Error checking for duplicates: %s", e)
            return False

    def delete_existing_document(self, file_hash: str) -> bool:
        """Delete existing document with given file hash from vector store.

        Returns:
            True on success, False if the delete raised an error.
        """
        try:
            self._get_collection().delete(
                where={"file_hash": file_hash}
            )
            logger.info("Deleted existing document with hash: %s", file_hash)
            return True
        except Exception as e:
            logger.error("Error deleting existing document: %s", e)
            return False

    def ingest_text_content(self,
                            text_content: str,
                            content_type: str = "markdown",
                            source_path: Optional[str] = None,
                            metadata: Optional[Dict[str, Any]] = None,
                            original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Ingest text content (markdown or LaTeX) into the RAG system.

        Args:
            text_content: The text content to ingest (markdown or LaTeX)
            content_type: Type of content ("markdown" or "latex")
            source_path: Optional source path/filename
            metadata: Optional additional metadata
            original_file_content: Original file content for hash calculation

        Returns:
            Tuple of (success, message, ingestion_stats)
        """
        try:
            if not text_content or not text_content.strip():
                return False, "No content provided for ingestion", {}
            # Hash the original upload when available so re-converting the
            # same file (possibly yielding slightly different converted text)
            # still deduplicates against the prior ingestion.
            file_content_for_hash = original_file_content or text_content
            file_hash = self.create_file_hash(file_content_for_hash)
            # If this file was ingested before, replace the stored copy.
            is_duplicate = self.check_duplicate_in_vector_store(file_hash)
            replacement_mode = False
            if is_duplicate:
                logger.info("Document with hash %s already exists, replacing...", file_hash)
                if self.delete_existing_document(file_hash):
                    replacement_mode = True
                else:
                    return False, "Failed to replace existing document", {"status": "error"}
            # Build per-chunk metadata; the file hash is what later lets us
            # find and replace every chunk belonging to this document.
            doc_metadata = self.prepare_document_metadata(
                source_path=source_path,
                doc_type=content_type,  # Use content_type instead of hardcoded "markdown"
                additional_metadata=metadata
            )
            doc_metadata["file_hash"] = file_hash
            doc_metadata["content_length"] = len(text_content)
            doc_metadata["upload_timestamp"] = datetime.now().isoformat()
            # Chunk the document using text-aware chunking.
            logger.info("Chunking %s document: %s", content_type, file_hash)
            chunks = document_chunker.chunk_document(text_content, doc_metadata)
            if not chunks:
                return False, "Failed to create document chunks", {}
            # Persist the chunks to the vector store.
            logger.info("Adding %d chunks to vector store", len(chunks))
            doc_ids = vector_store_manager.add_documents(chunks)
            if not doc_ids:
                return False, "Failed to add documents to vector store", {}
            # Summary statistics returned to the caller.
            ingestion_stats = {
                "status": "success",
                "file_hash": file_hash,
                "total_chunks": len(chunks),
                "document_ids": doc_ids,
                "content_length": len(text_content),
                "has_tables": any(chunk.metadata.get("has_table", False) for chunk in chunks),
                "has_code": any(chunk.metadata.get("has_code", False) for chunk in chunks),
                "processed_at": datetime.now().isoformat(),
                "replacement_mode": replacement_mode
            }
            action = "Updated existing" if replacement_mode else "Successfully ingested"
            success_msg = f"{action} document with {len(chunks)} chunks"
            logger.info("%s: %s", success_msg, file_hash)
            return True, success_msg, ingestion_stats
        except Exception as e:
            error_msg = f"Error during document ingestion: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"status": "error", "error": str(e)}

    def ingest_markdown_content(self,
                                markdown_content: str,
                                source_path: Optional[str] = None,
                                metadata: Optional[Dict[str, Any]] = None,
                                original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Backward compatibility method for ingesting markdown content.
        """
        return self.ingest_text_content(
            text_content=markdown_content,
            content_type="markdown",
            source_path=source_path,
            metadata=metadata,
            original_file_content=original_file_content
        )

    def ingest_from_conversion_result(self, conversion_result: Dict[str, Any]) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Ingest a document from Markit conversion result.

        Args:
            conversion_result: Dictionary containing conversion results from Markit

        Returns:
            Tuple of (success, message, ingestion_stats)
        """
        try:
            markdown_content = conversion_result.get("markdown_content", "")
            if not markdown_content:
                return False, "No markdown content found in conversion result", {}
            original_filename = conversion_result.get("original_filename", "unknown")
            conversion_method = conversion_result.get("conversion_method", "unknown")
            original_file_content = conversion_result.get("original_file_content")
            additional_metadata = {
                "original_filename": original_filename,
                "conversion_method": conversion_method,
                "file_size": conversion_result.get("file_size", 0),
                "conversion_time": conversion_result.get("conversion_time", 0)
            }
            # GOT-OCR conversions emit LaTeX; everything else is markdown.
            content_type = "latex" if "GOT-OCR" in conversion_method else "markdown"
            # Ingest with the original file content for proper hashing.
            return self.ingest_text_content(
                text_content=markdown_content,
                content_type=content_type,
                source_path=original_filename,
                metadata=additional_metadata,
                original_file_content=original_file_content
            )
        except Exception as e:
            error_msg = f"Error ingesting from conversion result: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"status": "error", "error": str(e)}

    def get_ingestion_status(self) -> Dict[str, Any]:
        """
        Get current ingestion system status.

        Returns:
            Dictionary with system status information
        """
        status = {
            "processed_documents": 0,  # Will be updated from vector store
            "embedding_model_available": False,
            "vector_store_available": False,
            "system_ready": False
        }
        try:
            # Check embedding model.
            status["embedding_model_available"] = embedding_manager.test_embedding_model()
            # Check vector store; an "error" key signals unavailability.
            collection_info = vector_store_manager.get_collection_info()
            status["vector_store_available"] = "error" not in collection_info
            status["total_documents_in_store"] = collection_info.get("document_count", 0)
            # System is ready only when both components are available.
            status["system_ready"] = (
                status["embedding_model_available"] and
                status["vector_store_available"]
            )
        except Exception as e:
            logger.error("Error checking ingestion status: %s", e)
            status["error"] = str(e)
        return status

    def test_ingestion_pipeline(self) -> Dict[str, Any]:
        """
        Test the complete ingestion pipeline with sample content.

        Returns:
            Dictionary with test results
        """
        test_results = {
            "pipeline_test": False,
            "chunking_test": False,
            "embedding_test": False,
            "vector_store_test": False,
            "errors": []
        }
        try:
            # Sample markdown exercising headings, a table, and a code block.
            test_content = """# Test Document
This is a test document for the RAG ingestion pipeline.
## Features
- Document chunking
- Embedding generation
- Vector store integration
## Sample Table
| Feature | Status | Priority |
|---------|--------|----------|
| Chunking | β | High |
| Embeddings | β | High |
| Vector Store | β | Medium |
```python
# Sample code block
def test_function():
    return "Hello, RAG!"
```
This document contains various markdown elements to test the ingestion pipeline.
"""
            # Test chunking.
            metadata = self.prepare_document_metadata(
                source_path="test_document.md",
                doc_type="markdown"
            )
            chunks = document_chunker.chunk_document(test_content, metadata)
            test_results["chunking_test"] = len(chunks) > 0
            if not test_results["chunking_test"]:
                test_results["errors"].append("Chunking test failed: No chunks created")
                return test_results
            # Test embedding.
            test_results["embedding_test"] = embedding_manager.test_embedding_model()
            if not test_results["embedding_test"]:
                test_results["errors"].append("Embedding test failed")
                return test_results
            # Test vector store (add one chunk, then retrieve it).
            doc_ids = vector_store_manager.add_documents(chunks[:1])
            test_results["vector_store_test"] = len(doc_ids) > 0
            if test_results["vector_store_test"]:
                search_results = vector_store_manager.similarity_search("test document", k=1)
                test_results["vector_store_test"] = len(search_results) > 0
            if not test_results["vector_store_test"]:
                test_results["errors"].append("Vector store test failed")
                return test_results
            # Overall pipeline verdict.
            test_results["pipeline_test"] = (
                test_results["chunking_test"] and
                test_results["embedding_test"] and
                test_results["vector_store_test"]
            )
            logger.info("Ingestion pipeline test completed: %s", test_results['pipeline_test'])
        except Exception as e:
            error_msg = f"Pipeline test error: {str(e)}"
            test_results["errors"].append(error_msg)
            logger.error(error_msg)
        return test_results
# Global document ingestion service instance (module-level singleton;
# importing this module constructs it once for the whole process).
document_ingestion_service = DocumentIngestionService()