rag-chatbot / app /services /mongodb_watcher.py
Abeshith's picture
RAG Chatbot with LangChain, FastAPI, and service layer architecture
64d7fdf
from motor.motor_asyncio import AsyncIOMotorChangeStream
from app.db.mongodb import MongoDB
from app.utils.logger import logger
from typing import Callable, Dict
import asyncio
class MongoDBWatcher:
    """Open MongoDB change streams and forward each change event to a callback.

    ``start()`` must be called before ``watch_collection()`` delivers any
    events: the watch loop exits as soon as it sees ``running`` is False.
    """

    def __init__(self):
        self.mongodb = MongoDB()
        # Currently open change streams, keyed by collection name, so that
        # stop() can close them and unblock the corresponding watch loops.
        self.watchers = {}
        self.running = False

    async def start(self):
        """Connect to MongoDB if no database handle exists yet and mark the watcher as running."""
        if self.mongodb.db is None:
            await self.mongodb.connect()
        self.running = True
        logger.info("MongoDB watcher started")

    async def stop(self):
        """Mark the watcher as stopped and close every registered change stream.

        A failure while closing one stream is logged and does not prevent
        the remaining streams from being closed.
        """
        self.running = False
        # Iterate over a snapshot: watch_collection() removes its own entry
        # when its stream ends, which can happen while close() is awaited.
        for collection_name, watcher in list(self.watchers.items()):
            if not hasattr(watcher, 'close'):
                continue
            try:
                await watcher.close()
            except Exception as e:
                logger.error(f"Error closing watcher for {collection_name}: {e}")
        self.watchers.clear()
        logger.info("MongoDB watcher stopped")

    async def watch_collection(
        self,
        collection_name: str,
        callback: Callable,
        pipeline: list = None
    ):
        """Watch one collection and await ``callback(change)`` for every event.

        Args:
            collection_name: Name of the collection to watch.
            callback: Async callable invoked with each change event. An
                exception it raises is logged and the loop continues.
            pipeline: Aggregation pipeline passed to ``collection.watch``.
                Defaults to matching insert, update and delete operations.

        Any error raised while opening or reading the stream is logged and
        ends the watch; it is not propagated to the caller.
        """
        try:
            collection = await self.mongodb.get_collection(collection_name)
            if pipeline is None:
                pipeline = [
                    {
                        "$match": {
                            "operationType": {"$in": ["insert", "update", "delete"]}
                        }
                    }
                ]
            logger.info(f"Watching collection: {collection_name}")
            async with collection.watch(pipeline) as change_stream:
                self.watchers[collection_name] = change_stream
                try:
                    async for change in change_stream:
                        if not self.running:
                            break
                        try:
                            await callback(change)
                        except Exception as e:
                            logger.error(f"Callback error for {collection_name}: {e}")
                finally:
                    # Do not leave a finished stream in the registry. Only
                    # remove the entry if it is still ours: a newer watch on
                    # the same collection may have replaced it.
                    if self.watchers.get(collection_name) is change_stream:
                        del self.watchers[collection_name]
        except Exception as e:
            logger.error(f"Watch error for {collection_name}: {e}")
async def on_document_change(change: Dict):
    """Log a one-line summary of a change-stream event.

    Args:
        change: Change event dict. Reads ``operationType``, ``fullDocument``
            and ``documentKey``.

    Insert and update events log the document's ``file_name``. When the
    event carries no ``fullDocument`` (the default for update events unless
    the stream is opened with ``full_document="updateLookup"``), the
    document ``_id`` from ``documentKey`` is logged instead of raising.
    Delete events log the document ``_id``. Other operation types are ignored.
    """
    operation = change.get("operationType")
    doc_id = (change.get("documentKey") or {}).get("_id")

    if operation == "delete":
        logger.info(f"Document deleted: {doc_id}")
        return

    document = change.get("fullDocument")
    # Guard against a missing fullDocument: calling .get on None would raise
    # AttributeError and the event would only surface as a callback error.
    label = document.get('file_name') if document is not None else doc_id

    if operation == "insert":
        logger.info(f"New document inserted: {label}")
    elif operation == "update":
        logger.info(f"Document updated: {label}")
# Module-level watcher instance shared by start_watchers() and stop_watchers().
# Constructing it instantiates MongoDB() at import time; the connection itself
# is only opened later, from MongoDBWatcher.start().
mongodb_watcher = MongoDBWatcher()
# Strong references to the background watch tasks. The event loop only keeps
# weak references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes.
_watch_tasks = set()


async def start_watchers():
    """Start the shared watcher and launch the "documents" watch in the background.

    The watch runs as an asyncio task, so this coroutine returns as soon as
    the task has been scheduled.
    """
    await mongodb_watcher.start()
    task = asyncio.create_task(
        mongodb_watcher.watch_collection(
            "documents",
            on_document_change
        )
    )
    _watch_tasks.add(task)
    # Drop the reference once the task is done so the set does not grow.
    task.add_done_callback(_watch_tasks.discard)
    logger.info("All MongoDB watchers started")
async def stop_watchers():
    """Stop the module-level watcher by awaiting its ``stop()`` method."""
    await mongodb_watcher.stop()