"""Document ingestion: load, preprocess, chunk, index in Qdrant, record in MongoDB."""

import hashlib
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

from ingestion.loaders import document_loader
from ingestion.preprocessor import text_preprocessor
from ingestion.chunker import chunker
from ingestion.embedder import embedder
from app.db.vector_store import vector_store
from app.db.mongodb import MongoDB
from app.utils.logger import logger
from app.utils.errors import DocumentProcessingError
from app.config import config


class DocumentProcessor:
    """Turn files on disk into chunks stored in the vector store and MongoDB.

    Each processed file is assigned a ``doc_id``. Its chunks are written to the
    vector store tagged with that id, and a summary record is inserted into the
    MongoDB ``documents`` collection.
    """

    def __init__(self):
        self.loader = document_loader
        self.preprocessor = text_preprocessor
        self.chunker = chunker
        self.embedder = embedder
        self.vector_store = vector_store
        self.mongodb = MongoDB()
        self.collection_name = config["database"]["qdrant"]["collection_name"]
        logger.info("DocumentProcessor initialized")

    def _generate_doc_id(self, file_path: str) -> str:
        """Return an MD5 hex id built from *file_path* and the current UTC time.

        Including the timestamp makes repeated ingestions of the same path get
        different ids. MD5 is used only as an identifier, not for security.
        """
        content = f"{file_path}{datetime.utcnow().isoformat()}"
        return hashlib.md5(content.encode()).hexdigest()

    async def _ensure_mongodb(self) -> None:
        """Connect the MongoDB wrapper if it has not been connected yet."""
        if self.mongodb.db is None:
            await self.mongodb.connect()

    def _load_chunks(self, file_path: str) -> list:
        """Load, preprocess and split *file_path*; return the non-empty chunk list.

        Raises:
            DocumentProcessingError: if nothing is loaded or no chunks are produced.
        """
        # NOTE(review): these calls are not awaited, so they run synchronously
        # inside the async caller; consider offloading if files are large.
        documents = self.loader.load(file_path)
        if not documents:
            raise DocumentProcessingError("No content extracted from document")

        documents = self.preprocessor.preprocess_documents(documents)
        chunks = self.chunker.split_documents(documents)
        if not chunks:
            raise DocumentProcessingError("No chunks created from document")

        logger.info(f"Created {len(chunks)} chunks")
        return chunks

    @staticmethod
    def _annotate_chunks(
        chunks: list,
        doc_id: str,
        file_name: str,
        processed_at: datetime,
        metadata: Optional[Dict],
    ) -> None:
        """Attach document-level and positional metadata to every chunk in place."""
        total_chunks = len(chunks)
        # One timestamp per document so all chunks and the MongoDB record agree.
        processed_at_iso = processed_at.isoformat()
        extra = metadata or {}

        for index, chunk in enumerate(chunks):
            # Covers both a missing attribute and an attribute explicitly set to None.
            if getattr(chunk, "metadata", None) is None:
                chunk.metadata = {}
            chunk.metadata.update({
                "file_name": file_name,
                "processed_at": processed_at_iso,
                **extra,
                # Bookkeeping keys come last so caller metadata cannot clobber
                # them; delete_document locates chunks by "doc_id".
                "doc_id": doc_id,
                "chunk_index": index,
                "total_chunks": total_chunks,
            })

    async def process_document(self, file_path: str, metadata: Optional[Dict] = None) -> Dict:
        """Ingest one file and return a summary of the stored result.

        Args:
            file_path: Path of the file to ingest.
            metadata: Optional extra fields copied onto every chunk and stored,
                unmodified, in the MongoDB record.

        Returns:
            A dict with ``doc_id``, ``file_name``, ``num_chunks`` and ``status``
            (``"success"``).

        Raises:
            DocumentProcessingError: wrapping any failure. The original exception
                is chained as ``__cause__``.
        """
        try:
            await self._ensure_mongodb()

            chunks = self._load_chunks(file_path)

            doc_id = self._generate_doc_id(file_path)
            file_name = Path(file_path).name
            # Naive UTC, the same representation this code has always stored.
            processed_at = datetime.utcnow()
            self._annotate_chunks(chunks, doc_id, file_name, processed_at, metadata)

            # Add documents to Qdrant via the retriever's vector store. The import
            # is kept function-local, presumably to avoid an import cycle.
            from app.core.retriever import hybrid_retriever
            hybrid_retriever._initialize_vector_store()
            hybrid_retriever.vector_store.add_documents(chunks)

            doc_metadata = {
                "doc_id": doc_id,
                "file_name": file_name,
                "file_path": file_path,
                "num_chunks": len(chunks),
                "processed_at": processed_at,
                "metadata": metadata or {},
            }
            # NOTE(review): if this insert fails, the vectors added above remain
            # without a MongoDB record; consider compensating cleanup.
            collection = self.mongodb.get_collection("documents")
            await collection.insert_one(doc_metadata)

            logger.info(f"Successfully processed document: {file_name}")
            return {
                "doc_id": doc_id,
                "file_name": file_name,
                "num_chunks": len(chunks),
                "status": "success",
            }
        except Exception as e:
            logger.error(f"Error processing document {file_path}: {str(e)}")
            raise DocumentProcessingError(f"Document processing failed: {str(e)}") from e

    async def process_documents(
        self, file_paths: List[str], metadata: Optional[Dict] = None
    ) -> List[Dict]:
        """Ingest several files sequentially, collecting per-file outcomes.

        A failure on one file does not stop the batch. Failed files are reported
        as ``{"file_path", "status": "failed", "error"}`` entries.

        Args:
            file_paths: Files to ingest, in order.
            metadata: Optional extra fields forwarded to every ``process_document`` call.
        """
        results = []
        for file_path in file_paths:
            try:
                results.append(await self.process_document(file_path, metadata))
            except Exception as e:
                results.append({
                    "file_path": file_path,
                    "status": "failed",
                    "error": str(e),
                })
        return results

    async def delete_document(self, doc_id: str) -> bool:
        """Remove a document's vectors and its MongoDB record.

        Returns:
            True if both delete calls completed without raising, False otherwise
            (the error is logged). True does not guarantee anything matched.
        """
        try:
            # Connect first so a MongoDB failure cannot leave vectors deleted
            # while the metadata record survives.
            await self._ensure_mongodb()

            # NOTE(review): chunks are added via hybrid_retriever.vector_store in
            # process_document but removed via self.vector_store here; confirm
            # both target the same collection.
            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id,
            )

            collection = self.mongodb.get_collection("documents")
            await collection.delete_one({"doc_id": doc_id})

            logger.info(f"Deleted document: {doc_id}")
            return True
        except Exception as e:
            logger.error(f"Error deleting document {doc_id}: {str(e)}")
            return False


# Shared module-level instance.
document_processor = DocumentProcessor()