Spaces:
Sleeping
Sleeping
| from ingestion.loaders import document_loader | |
| from ingestion.preprocessor import text_preprocessor | |
| from ingestion.chunker import chunker | |
| from ingestion.embedder import embedder | |
| from app.db.vector_store import vector_store | |
| from app.db.mongodb import MongoDB | |
| from app.utils.logger import logger | |
| from app.utils.errors import DocumentProcessingError | |
| from app.config import config | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Dict, List | |
| import hashlib | |
class DocumentProcessor:
    """Ingest documents into the vector store and track them in MongoDB.

    A document is loaded, preprocessed, and split into chunks; each chunk is
    tagged with bookkeeping metadata and added to the vector store, and one
    summary record per document is written to the ``documents`` collection.
    """

    def __init__(self):
        """Bind the shared pipeline components and read the collection name."""
        self.loader = document_loader
        self.preprocessor = text_preprocessor
        self.chunker = chunker
        self.embedder = embedder
        self.vector_store = vector_store
        self.mongodb = MongoDB()
        self.collection_name = config["database"]["qdrant"]["collection_name"]
        logger.info("DocumentProcessor initialized")

    def _generate_doc_id(self, file_path: str) -> str:
        """Return a 32-character hex id derived from the path and current UTC time.

        The timestamp is part of the hashed content, so processing the same
        path twice yields different ids. MD5 is used here only to build an
        identifier, not for any security purpose.
        """
        content = f"{file_path}{datetime.utcnow().isoformat()}"
        return hashlib.md5(content.encode()).hexdigest()

    async def _ensure_connected(self) -> None:
        """Connect to MongoDB when no database handle is available yet."""
        if self.mongodb.db is None:
            await self.mongodb.connect()

    @staticmethod
    def _annotate_chunks(chunks, doc_id, file_name, processed_at, extra_metadata):
        """Attach per-document bookkeeping fields to every chunk in place.

        ``extra_metadata`` is applied last, so caller-supplied keys override
        the generated fields when the names collide.
        """
        total_chunks = len(chunks)
        for index, chunk in enumerate(chunks):
            # Covers both a missing attribute and an attribute set to None.
            if getattr(chunk, "metadata", None) is None:
                chunk.metadata = {}
            chunk.metadata.update({
                "doc_id": doc_id,
                "file_name": file_name,
                "chunk_index": index,
                "total_chunks": total_chunks,
                "processed_at": processed_at,
                **extra_metadata,
            })

    async def process_document(self, file_path: str, metadata: Dict = None) -> Dict:
        """Load, chunk, index, and record a single document.

        Args:
            file_path: Path of the document to ingest.
            metadata: Optional extra fields merged into every chunk's metadata
                and stored on the document record.

        Returns:
            A dict with ``doc_id``, ``file_name``, ``num_chunks`` and
            ``status`` set to ``"success"``.

        Raises:
            DocumentProcessingError: If any step fails. The original exception
                is attached as ``__cause__``.
        """
        try:
            # Ensure MongoDB is connected
            await self._ensure_connected()

            documents = self.loader.load(file_path)
            if not documents:
                raise DocumentProcessingError("No content extracted from document")

            documents = self.preprocessor.preprocess_documents(documents)
            chunks = self.chunker.split_documents(documents)
            if not chunks:
                raise DocumentProcessingError("No chunks created from document")
            logger.info(f"Created {len(chunks)} chunks")

            doc_id = self._generate_doc_id(file_path)
            file_name = Path(file_path).name
            extra_metadata = metadata or {}
            # One timestamp per document keeps the chunks and the document
            # record consistent with each other.
            processed_at = datetime.utcnow()
            self._annotate_chunks(
                chunks, doc_id, file_name, processed_at.isoformat(), extra_metadata
            )

            # Add documents to Qdrant via vector store.
            # NOTE(review): imported here rather than at module top, presumably
            # to avoid a circular import - confirm before moving it.
            from app.core.retriever import hybrid_retriever
            hybrid_retriever._initialize_vector_store()
            hybrid_retriever.vector_store.add_documents(chunks)

            doc_metadata = {
                "doc_id": doc_id,
                "file_name": file_name,
                "file_path": file_path,
                "num_chunks": len(chunks),
                "processed_at": processed_at,
                "metadata": extra_metadata,
            }
            collection = self.mongodb.get_collection("documents")
            await collection.insert_one(doc_metadata)

            logger.info(f"Successfully processed document: {file_name}")
            return {
                "doc_id": doc_id,
                "file_name": file_name,
                "num_chunks": len(chunks),
                "status": "success",
            }
        except Exception as e:
            logger.error(f"Error processing document {file_path}: {str(e)}")
            # Chain the cause so the original traceback is not lost.
            raise DocumentProcessingError(f"Document processing failed: {str(e)}") from e

    async def process_documents(self, file_paths: List[str]) -> List[Dict]:
        """Process several documents in order, collecting one result per path.

        A failure on one path does not stop the batch; it is reported as a
        dict with ``file_path``, ``status`` set to ``"failed"`` and ``error``.
        """
        results = []
        for file_path in file_paths:
            try:
                result = await self.process_document(file_path)
                results.append(result)
            except Exception as e:
                results.append({
                    "file_path": file_path,
                    "status": "failed",
                    "error": str(e),
                })
        return results

    async def delete_document(self, doc_id: str) -> bool:
        """Remove a document's chunks from the vector store and its record.

        Returns:
            ``True`` when both deletions complete, ``False`` if any step
            raised (the error is logged, not propagated).
        """
        try:
            # Connect before deleting anything, so a missing connection cannot
            # leave the vectors removed while the document record survives.
            await self._ensure_connected()
            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id,
            )
            collection = self.mongodb.get_collection("documents")
            await collection.delete_one({"doc_id": doc_id})
            logger.info(f"Deleted document: {doc_id}")
            return True
        except Exception as e:
            logger.error(f"Error deleting document {doc_id}: {str(e)}")
            return False
# Module-level instance created at import time; constructing it instantiates
# MongoDB() and reads the Qdrant collection name from config.
document_processor = DocumentProcessor()