# Spaces: Sleeping
# File size: 5,436 Bytes (64d7fdf)
from ingestion.document_processor import DocumentProcessor
from app.db.mongodb import MongoDB
from app.db.vector_store import vector_store
from app.utils.logger import logger
from app.config import config
from typing import Dict, List, Optional
from pathlib import Path
import os
class DocumentService:
    """Coordinate document processing, metadata queries, and upload files.

    Combines the document processor, the MongoDB ``documents`` collection,
    and the vector store. Query-style methods log any failure and return a
    neutral value (``[]``, ``None``, ``False`` or zeroed stats), whereas
    ``process_document`` and ``save_uploaded_file`` re-raise.
    """

    # Name of the MongoDB collection every metadata query goes through.
    _DOCUMENTS_COLLECTION = "documents"

    def __init__(self):
        """Build the collaborators and read upload/collection settings from config."""
        self.processor = DocumentProcessor()
        self.mongodb = MongoDB()
        self.vector_store = vector_store
        self.upload_dir = config["app"]["upload"]["upload_dir"]
        self.collection_name = config["database"]["qdrant"]["collection_name"]

    async def _ensure_connected(self) -> None:
        """Call ``mongodb.connect()`` when no database handle is set yet."""
        if self.mongodb.db is None:
            await self.mongodb.connect()

    async def _documents_collection(self):
        """Return the ``documents`` collection, connecting first if needed."""
        await self._ensure_connected()
        return await self.mongodb.get_collection(self._DOCUMENTS_COLLECTION)

    @staticmethod
    def _safe_filename(filename: str) -> str:
        """Reduce a client-supplied file name to its final path component.

        Args:
            filename: Name as received from the uploader; may contain
                directory parts or be an absolute path.

        Returns:
            The last path component only.

        Raises:
            ValueError: If nothing usable remains (empty, ``.`` or ``..``).
        """
        # Treat backslashes as separators too, so Windows-style paths cannot
        # carry directory components through on POSIX hosts.
        name = Path(filename.replace("\\", "/")).name
        if name in ("", ".", ".."):
            raise ValueError(f"Invalid upload filename: {filename!r}")
        return name

    async def process_document(
        self,
        file_path: str,
        metadata: Optional[Dict] = None
    ) -> Dict:
        """Run the document processor on ``file_path`` and return its result.

        Args:
            file_path: Path of the file handed to the processor.
            metadata: Optional extra metadata passed through unchanged.

        Returns:
            The processor's result; its ``file_name`` entry is logged.

        Raises:
            Exception: Any processing error is logged and re-raised.
        """
        try:
            result = await self.processor.process_document(file_path, metadata)
            logger.info(f"Document processed: {result['file_name']}")
            return result
        except Exception as e:
            logger.error(f"Document processing failed: {str(e)}")
            raise

    async def get_all_documents(self) -> List[Dict]:
        """Return every record in the ``documents`` collection.

        Returns:
            All documents (no length cap), or ``[]`` if the lookup fails.
        """
        try:
            collection = await self._documents_collection()
            return await collection.find().to_list(length=None)
        except Exception as e:
            logger.error(f"Get documents error: {str(e)}")
            return []

    async def get_document_by_id(self, doc_id: str) -> Optional[Dict]:
        """Return the record whose ``doc_id`` field equals ``doc_id``.

        Returns:
            The matching record, or ``None`` when there is no match or the
            lookup fails.
        """
        try:
            collection = await self._documents_collection()
            return await collection.find_one({"doc_id": doc_id})
        except Exception as e:
            logger.error(f"Get document error: {str(e)}")
            return None

    async def delete_document(self, doc_id: str) -> bool:
        """Delete a document from the vector store and from MongoDB.

        The vector-store deletion (by ``doc_id`` metadata) is issued first,
        followed by removal of the MongoDB record.

        Returns:
            ``True`` only when a MongoDB record was deleted; ``False`` when
            none matched or any step failed.
        """
        try:
            # Connect before touching the vector store, as the original
            # ordering did, so a connection failure aborts the whole delete.
            await self._ensure_connected()
            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id
            )
            collection = await self._documents_collection()
            result = await collection.delete_one({"doc_id": doc_id})
            if result.deleted_count > 0:
                logger.info(f"Document deleted: {doc_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Delete document error: {str(e)}")
            return False

    async def search_documents(
        self,
        query: str,
        limit: int = 10
    ) -> List[Dict]:
        """Search the ``documents`` collection with a ``$text`` query.

        Args:
            query: Search string passed as ``$search``.
            limit: Maximum number of records to return.

        Returns:
            Matching records, or ``[]`` if the search fails.
        """
        try:
            collection = await self._documents_collection()
            # NOTE(review): MongoDB ``$text`` needs a text index on the
            # collection; confirm one is created elsewhere.
            cursor = collection.find({"$text": {"$search": query}}).limit(limit)
            return await cursor.to_list(length=limit)
        except Exception as e:
            logger.error(f"Search documents error: {str(e)}")
            return []

    async def get_document_stats(self) -> Dict:
        """Return the document count and the sum of ``num_chunks``.

        Returns:
            ``{"total_documents": int, "total_chunks": int}``; both values
            are ``0`` if the query fails.
        """
        try:
            collection = await self._documents_collection()
            total_docs = await collection.count_documents({})
            # Single group over the whole collection summing ``num_chunks``.
            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_chunks": {"$sum": "$num_chunks"}
                    }
                }
            ]
            result = await collection.aggregate(pipeline).to_list(length=1)
            # The aggregation yields no rows for an empty collection.
            total_chunks = result[0]["total_chunks"] if result else 0
            return {
                "total_documents": total_docs,
                "total_chunks": total_chunks
            }
        except Exception as e:
            logger.error(f"Get stats error: {str(e)}")
            return {"total_documents": 0, "total_chunks": 0}

    def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
        """Write uploaded bytes into the upload directory.

        Only the final path component of ``filename`` is used, so names such
        as ``../../x`` or ``/etc/x`` cannot place the file outside
        ``upload_dir``.

        Args:
            file_content: Raw bytes to write.
            filename: Client-supplied file name.

        Returns:
            The path the file was written to.

        Raises:
            ValueError: If ``filename`` has no usable final component.
            Exception: Any I/O error is logged and re-raised.
        """
        # Validate before any filesystem work; an invalid name is a caller
        # error rather than an I/O failure.
        safe_name = self._safe_filename(filename)
        try:
            os.makedirs(self.upload_dir, exist_ok=True)
            file_path = os.path.join(self.upload_dir, safe_name)
            with open(file_path, "wb") as f:
                f.write(file_content)
            logger.info(f"File saved: {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"Save file error: {str(e)}")
            raise

    def delete_file(self, file_path: str) -> bool:
        """Remove ``file_path`` from disk.

        Returns:
            ``True`` if the file was removed; ``False`` if it does not exist
            or removal failed (failures are logged).
        """
        try:
            # Attempt the removal directly instead of checking existence
            # first, which avoids a check-then-act race.
            os.remove(file_path)
        except (FileNotFoundError, NotADirectoryError):
            return False
        except Exception as e:
            logger.error(f"Delete file error: {str(e)}")
            return False
        logger.info(f"File deleted: {file_path}")
        return True
# Module-level DocumentService instance, constructed when this module is imported.
document_service = DocumentService()