import os
from pathlib import Path
from typing import Dict, List, Optional

from ingestion.document_processor import DocumentProcessor
from app.db.mongodb import MongoDB
from app.db.vector_store import vector_store
from app.utils.logger import logger
from app.config import config


class DocumentService:
    """Coordinate document ingestion, metadata lookups, vector cleanup and upload files.

    Combines a ``DocumentProcessor`` (ingestion), the ``documents`` MongoDB
    collection (metadata records keyed by ``doc_id``), the shared
    ``vector_store`` (vectors tagged with ``doc_id`` metadata), and the local
    upload directory configured at ``config["app"]["upload"]["upload_dir"]``.
    """

    def __init__(self):
        self.processor = DocumentProcessor()
        self.mongodb = MongoDB()
        self.vector_store = vector_store
        self.upload_dir = config["app"]["upload"]["upload_dir"]
        # Vector-store collection that holds this service's document vectors.
        self.collection_name = config["database"]["qdrant"]["collection_name"]

    async def _get_documents_collection(self):
        """Return the ``documents`` collection, connecting to MongoDB if needed."""
        # Connect lazily: ``__init__`` is synchronous and cannot await ``connect()``.
        if self.mongodb.db is None:
            await self.mongodb.connect()
        return await self.mongodb.get_collection("documents")

    async def process_document(
        self,
        file_path: str,
        metadata: Optional[Dict] = None
    ) -> Dict:
        """Run the document processor on *file_path* and return its result.

        Args:
            file_path: Path of the file to ingest.
            metadata: Optional extra metadata forwarded to the processor.

        Returns:
            The processor's result dict (expected to contain ``"file_name"``).

        Raises:
            Exception: Any processor error is logged and re-raised unchanged.
        """
        try:
            result = await self.processor.process_document(file_path, metadata)
            logger.info(f"Document processed: {result['file_name']}")
            return result
        except Exception as e:
            logger.error(f"Document processing failed: {str(e)}")
            raise

    async def get_all_documents(self) -> List[Dict]:
        """Return every record in the ``documents`` collection, or ``[]`` on error."""
        try:
            collection = await self._get_documents_collection()
            return await collection.find().to_list(length=None)
        except Exception as e:
            logger.error(f"Get documents error: {str(e)}")
            return []

    async def get_document_by_id(self, doc_id: str) -> Optional[Dict]:
        """Return the record whose ``doc_id`` matches, or ``None``.

        ``None`` is returned both when no record matches and when the lookup
        fails (the failure is logged).
        """
        try:
            collection = await self._get_documents_collection()
            return await collection.find_one({"doc_id": doc_id})
        except Exception as e:
            logger.error(f"Get document error: {str(e)}")
            return None

    async def delete_document(self, doc_id: str) -> bool:
        """Delete a document's vectors and its MongoDB record.

        Vectors whose ``doc_id`` metadata equals *doc_id* are removed from
        ``self.collection_name`` first, then the MongoDB record is deleted.
        The uploaded file on disk is not touched (see ``delete_file``).

        Returns:
            ``True`` if a MongoDB record was deleted; ``False`` if none matched
            or any step failed (the failure is logged).
        """
        try:
            # Obtain the collection first so a MongoDB connection failure
            # aborts before any vectors are removed.
            collection = await self._get_documents_collection()
            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id
            )
            result = await collection.delete_one({"doc_id": doc_id})
            if result.deleted_count > 0:
                logger.info(f"Document deleted: {doc_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Delete document error: {str(e)}")
            return False

    async def search_documents(
        self,
        query: str,
        limit: int = 10
    ) -> List[Dict]:
        """Full-text search the ``documents`` collection.

        Uses MongoDB's ``$text`` operator, which requires a text index on the
        collection; without one the query errors and ``[]`` is returned.

        Args:
            query: Search string passed to ``$search``.
            limit: Maximum number of records to return.

        Returns:
            Up to *limit* matching records, or ``[]`` on error.
        """
        try:
            collection = await self._get_documents_collection()
            return await collection.find(
                {"$text": {"$search": query}}
            ).limit(limit).to_list(length=limit)
        except Exception as e:
            logger.error(f"Search documents error: {str(e)}")
            return []

    async def get_document_stats(self) -> Dict:
        """Return the document count and the sum of ``num_chunks`` across records.

        Returns:
            ``{"total_documents": int, "total_chunks": int}``; both are ``0``
            on error (the failure is logged).
        """
        try:
            collection = await self._get_documents_collection()
            total_docs = await collection.count_documents({})

            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_chunks": {"$sum": "$num_chunks"}
                    }
                }
            ]
            result = await collection.aggregate(pipeline).to_list(length=1)
            # An empty collection yields no group document at all.
            total_chunks = result[0]["total_chunks"] if result else 0

            return {
                "total_documents": total_docs,
                "total_chunks": total_chunks
            }
        except Exception as e:
            logger.error(f"Get stats error: {str(e)}")
            return {"total_documents": 0, "total_chunks": 0}

    def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
        """Write uploaded bytes into ``upload_dir`` and return the saved path.

        Only the final path component of *filename* is used, so client-supplied
        names such as ``"../../etc/passwd"`` or ``"/tmp/x"`` cannot write outside
        ``upload_dir``. An existing file with the same name is overwritten.

        Args:
            file_content: Raw bytes to write.
            filename: Client-supplied file name.

        Returns:
            The path of the written file.

        Raises:
            ValueError: If *filename* has no usable final component.
            OSError: If the directory or file cannot be written.
            (Every error is logged before being re-raised.)
        """
        try:
            # Uploaded names are client-controlled: strip any directory parts
            # (POSIX and Windows separators alike) to prevent path traversal.
            safe_name = Path(filename.replace("\\", "/")).name
            if safe_name in ("", ".", ".."):
                raise ValueError(f"Invalid upload filename: {filename!r}")
            os.makedirs(self.upload_dir, exist_ok=True)
            file_path = os.path.join(self.upload_dir, safe_name)
            with open(file_path, "wb") as f:
                f.write(file_content)
            logger.info(f"File saved: {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"Save file error: {str(e)}")
            raise

    def delete_file(self, file_path: str) -> bool:
        """Remove *file_path* from disk.

        Returns:
            ``True`` if the file was removed; ``False`` if it did not exist or
            removal failed (failures other than "not found" are logged).
        """
        try:
            # EAFP: attempting the removal directly avoids the race between an
            # existence check and the delete.
            os.remove(file_path)
        except FileNotFoundError:
            return False
        except Exception as e:
            logger.error(f"Delete file error: {str(e)}")
            return False
        logger.info(f"File deleted: {file_path}")
        return True


document_service = DocumentService()