Spaces:
Sleeping
Sleeping
| from ingestion.document_processor import DocumentProcessor | |
| from app.db.mongodb import MongoDB | |
| from app.db.vector_store import vector_store | |
| from app.utils.logger import logger | |
| from app.config import config | |
| from typing import Dict, List, Optional | |
| from pathlib import Path | |
| import os | |
class DocumentService:
    """Coordinate document ingestion, stored document records, and upload files.

    Uses a ``DocumentProcessor`` for ingestion, the ``documents`` collection of
    a ``MongoDB`` handle for per-document records, and the shared
    ``vector_store`` for removing a document's entries on deletion.
    """

    def __init__(self):
        """Create the collaborators and read upload/collection settings from config."""
        self.processor = DocumentProcessor()
        self.mongodb = MongoDB()
        self.vector_store = vector_store
        self.upload_dir = config["app"]["upload"]["upload_dir"]
        self.collection_name = config["database"]["qdrant"]["collection_name"]

    async def _get_documents_collection(self):
        """Return the ``documents`` collection, connecting to MongoDB first if needed."""
        if self.mongodb.db is None:
            await self.mongodb.connect()
        return await self.mongodb.get_collection("documents")

    @staticmethod
    def _sanitize_filename(filename: str) -> str:
        """Reduce a client-supplied filename to a bare, safe file name.

        Directory components (``/`` or ``\\`` separated) are dropped so the
        name cannot escape the upload directory.

        Raises:
            ValueError: If nothing usable remains (empty name, ``.`` or ``..``).
        """
        # Backslashes are normalised so Windows-style client paths are also
        # stripped when the service runs on a POSIX host.
        base_name = os.path.basename(filename.replace("\\", "/"))
        if base_name in ("", ".", ".."):
            raise ValueError(f"Invalid upload filename: {filename!r}")
        return base_name

    async def process_document(
        self,
        file_path: str,
        metadata: Optional[Dict] = None
    ) -> Dict:
        """Run the document processor on ``file_path`` and return its result.

        Args:
            file_path: Path of the file to process.
            metadata: Optional extra metadata forwarded to the processor.

        Returns:
            The processor's result; its ``file_name`` entry is logged.

        Raises:
            Exception: Any processing error is logged and re-raised unchanged.
        """
        try:
            result = await self.processor.process_document(file_path, metadata)
            logger.info(f"Document processed: {result['file_name']}")
            return result
        except Exception as e:
            logger.error(f"Document processing failed: {str(e)}")
            raise

    async def get_all_documents(self) -> List[Dict]:
        """Return every record in the ``documents`` collection.

        Errors are logged and an empty list is returned instead of raising.
        """
        try:
            collection = await self._get_documents_collection()
            return await collection.find().to_list(length=None)
        except Exception as e:
            logger.error(f"Get documents error: {str(e)}")
            return []

    async def get_document_by_id(self, doc_id: str) -> Optional[Dict]:
        """Return the record whose ``doc_id`` field equals ``doc_id``.

        Returns ``None`` when no record matches or when the lookup fails
        (the error is logged).
        """
        try:
            collection = await self._get_documents_collection()
            return await collection.find_one({"doc_id": doc_id})
        except Exception as e:
            logger.error(f"Get document error: {str(e)}")
            return None

    async def delete_document(self, doc_id: str) -> bool:
        """Delete a document's vector-store entries and its stored record.

        Returns:
            ``True`` only if a record was removed from the ``documents``
            collection; ``False`` if none matched or any step failed (the
            error is logged).
        """
        try:
            # Resolve the collection before the destructive vector-store call
            # so a MongoDB connection failure does not leave vectors deleted
            # while the record is still present.
            collection = await self._get_documents_collection()
            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id
            )
            result = await collection.delete_one({"doc_id": doc_id})
            if result.deleted_count > 0:
                logger.info(f"Document deleted: {doc_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Delete document error: {str(e)}")
            return False

    async def search_documents(
        self,
        query: str,
        limit: int = 10
    ) -> List[Dict]:
        """Search stored records with a MongoDB ``$text`` query.

        Args:
            query: Search string passed as ``$search``.
            limit: Maximum number of records to return.

        Returns:
            Matching records, or an empty list on error (the error is logged).
        """
        # NOTE(review): `$text` queries need a text index on the collection;
        # confirm one is created elsewhere.
        try:
            collection = await self._get_documents_collection()
            cursor = collection.find({"$text": {"$search": query}}).limit(limit)
            return await cursor.to_list(length=limit)
        except Exception as e:
            logger.error(f"Search documents error: {str(e)}")
            return []

    async def get_document_stats(self) -> Dict:
        """Return the record count and the sum of every record's ``num_chunks``.

        Returns:
            ``{"total_documents": int, "total_chunks": int}``; both are 0 when
            the query fails (the error is logged).
        """
        try:
            collection = await self._get_documents_collection()
            total_docs = await collection.count_documents({})
            # Group everything into a single bucket to sum `num_chunks`.
            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_chunks": {"$sum": "$num_chunks"}
                    }
                }
            ]
            result = await collection.aggregate(pipeline).to_list(length=1)
            # The aggregation yields no rows when the collection is empty.
            total_chunks = result[0]["total_chunks"] if result else 0
            return {
                "total_documents": total_docs,
                "total_chunks": total_chunks
            }
        except Exception as e:
            logger.error(f"Get stats error: {str(e)}")
            return {"total_documents": 0, "total_chunks": 0}

    def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
        """Write uploaded bytes into the upload directory and return the path.

        The upload directory is created if missing. ``filename`` is treated as
        untrusted: only its final path component is used, so values such as
        ``../../x`` or absolute paths cannot write outside ``upload_dir``.

        Raises:
            ValueError: If ``filename`` has no usable final component.
            Exception: Any I/O error is logged and re-raised unchanged.
        """
        try:
            safe_name = self._sanitize_filename(filename)
            os.makedirs(self.upload_dir, exist_ok=True)
            file_path = os.path.join(self.upload_dir, safe_name)
            with open(file_path, "wb") as f:
                f.write(file_content)
            logger.info(f"File saved: {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"Save file error: {str(e)}")
            raise

    def delete_file(self, file_path: str) -> bool:
        """Remove ``file_path`` from disk.

        Returns:
            ``True`` if the file was removed; ``False`` if it did not exist or
            removal failed (failures other than "not found" are logged).
        """
        # Attempt the removal directly instead of checking existence first,
        # which would race with concurrent deletions.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            return False
        except Exception as e:
            logger.error(f"Delete file error: {str(e)}")
            return False
        logger.info(f"File deleted: {file_path}")
        return True
# Module-level DocumentService instance, created when this module is imported.
document_service = DocumentService()