# rag-chatbot/app/services/document_service.py
# Abeshith - RAG Chatbot with LangChain, FastAPI, and service layer architecture
# (commit 64d7fdf)
from ingestion.document_processor import DocumentProcessor
from app.db.mongodb import MongoDB
from app.db.vector_store import vector_store
from app.utils.logger import logger
from app.config import config
from typing import Dict, List, Optional
from pathlib import Path
import os
class DocumentService:
    """Service layer for document ingestion, metadata queries and upload files.

    Metadata records are read from and deleted in the MongoDB ``documents``
    collection; vectors are removed from the configured vector-store
    collection by their ``doc_id`` metadata; uploaded files are written under
    the configured upload directory.

    Query methods are deliberately best-effort: they log the failure and
    return an empty/falsy value instead of raising. ``process_document`` and
    ``save_uploaded_file`` log and re-raise.
    """

    # Name of the MongoDB collection holding the per-document records.
    _DOCUMENTS_COLLECTION = "documents"

    def __init__(self):
        """Wire up the processor, database handles and configured locations."""
        self.processor = DocumentProcessor()
        self.mongodb = MongoDB()
        self.vector_store = vector_store
        self.upload_dir = config["app"]["upload"]["upload_dir"]
        self.collection_name = config["database"]["qdrant"]["collection_name"]

    async def _documents_collection(self):
        """Return the documents collection, connecting to MongoDB if needed."""
        if self.mongodb.db is None:
            await self.mongodb.connect()
        return await self.mongodb.get_collection(self._DOCUMENTS_COLLECTION)

    async def process_document(
        self,
        file_path: str,
        metadata: Optional[Dict] = None,
    ) -> Dict:
        """Run the document processor on ``file_path`` and return its result.

        Args:
            file_path: Path of the file to process.
            metadata: Optional extra metadata forwarded to the processor.

        Returns:
            The processor's result dict (expected to contain ``file_name``).

        Raises:
            Exception: Any processor failure is logged and re-raised.
        """
        try:
            result = await self.processor.process_document(file_path, metadata)
            logger.info(f"Document processed: {result['file_name']}")
            return result
        except Exception as e:
            logger.error(f"Document processing failed: {e}")
            raise

    async def get_all_documents(self) -> List[Dict]:
        """Return every document record, or ``[]`` if the query fails."""
        try:
            collection = await self._documents_collection()
            return await collection.find().to_list(length=None)
        except Exception as e:
            logger.error(f"Get documents error: {e}")
            return []

    async def get_document_by_id(self, doc_id: str) -> Optional[Dict]:
        """Return the record whose ``doc_id`` matches, or ``None``.

        ``None`` is returned both when no record matches and when the lookup
        fails (the failure is logged).
        """
        try:
            collection = await self._documents_collection()
            return await collection.find_one({"doc_id": doc_id})
        except Exception as e:
            logger.error(f"Get document error: {e}")
            return None

    async def delete_document(self, doc_id: str) -> bool:
        """Delete a document's vectors and its metadata record.

        Args:
            doc_id: Identifier stored in the ``doc_id`` field / vector metadata.

        Returns:
            ``True`` if a metadata record was deleted; ``False`` if none
            matched or any step failed (the failure is logged).
        """
        try:
            # Resolve the collection first so a database connection problem
            # surfaces before any vectors have been removed.
            collection = await self._documents_collection()
            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id,
            )
            result = await collection.delete_one({"doc_id": doc_id})
            if result.deleted_count > 0:
                logger.info(f"Document deleted: {doc_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"Delete document error: {e}")
            return False

    async def search_documents(
        self,
        query: str,
        limit: int = 10,
    ) -> List[Dict]:
        """Full-text search over document records.

        Uses MongoDB's ``$text`` operator, which requires a text index on the
        collection; no index is created here.

        Args:
            query: Search string passed to ``$search``.
            limit: Maximum number of records to return.

        Returns:
            Matching records, or ``[]`` if the query fails.
        """
        try:
            collection = await self._documents_collection()
            cursor = collection.find({"$text": {"$search": query}}).limit(limit)
            return await cursor.to_list(length=limit)
        except Exception as e:
            logger.error(f"Search documents error: {e}")
            return []

    async def get_document_stats(self) -> Dict:
        """Return the document count and the sum of ``num_chunks``.

        Returns:
            ``{"total_documents": int, "total_chunks": int}``; both values are
            zero if the query fails.
        """
        try:
            collection = await self._documents_collection()
            total_docs = await collection.count_documents({})
            # Single group over the whole collection to sum the chunk counts.
            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_chunks": {"$sum": "$num_chunks"},
                    }
                }
            ]
            result = await collection.aggregate(pipeline).to_list(length=1)
            # An empty collection yields no group row at all.
            total_chunks = result[0]["total_chunks"] if result else 0
            return {
                "total_documents": total_docs,
                "total_chunks": total_chunks,
            }
        except Exception as e:
            logger.error(f"Get stats error: {e}")
            return {"total_documents": 0, "total_chunks": 0}

    def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
        """Write ``file_content`` into the upload directory.

        Only the final path component of ``filename`` is used, so a
        client-supplied name such as ``../../x`` or an absolute path cannot
        place the file outside ``upload_dir``.

        Args:
            file_content: Raw bytes to write.
            filename: Client-supplied file name.

        Returns:
            Path of the written file inside the upload directory.

        Raises:
            ValueError: If ``filename`` has no usable final component.
            OSError: If the directory or file cannot be written.
        """
        try:
            # Treat backslashes as separators too, then drop any directory
            # part: upload names are untrusted input.
            safe_name = os.path.basename(filename.replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                raise ValueError(f"Invalid upload filename: {filename!r}")
            os.makedirs(self.upload_dir, exist_ok=True)
            file_path = os.path.join(self.upload_dir, safe_name)
            with open(file_path, "wb") as f:
                f.write(file_content)
            logger.info(f"File saved: {file_path}")
            return file_path
        except Exception as e:
            logger.error(f"Save file error: {e}")
            raise

    def delete_file(self, file_path: str) -> bool:
        """Remove ``file_path`` from disk.

        Returns:
            ``True`` if the file was removed; ``False`` if it did not exist or
            could not be removed (the latter is logged).
        """
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Nothing to delete; not an error for callers.
            return False
        except Exception as e:
            logger.error(f"Delete file error: {e}")
            return False
        logger.info(f"File deleted: {file_path}")
        return True
# Module-level DocumentService instance, created when this module is imported.
document_service = DocumentService()