"""Change-stream watchers that log changes made to MongoDB collections."""

import asyncio
from typing import Awaitable, Callable, Dict, Optional, Set

from motor.motor_asyncio import AsyncIOMotorChangeStream

from app.db.mongodb import MongoDB
from app.utils.logger import logger


class MongoDBWatcher:
    """Open change streams on collections and forward each event to a callback.

    Call ``start()`` once, then run ``watch_collection()`` (typically as a
    background task) for every collection of interest, and finally ``stop()``
    to close all open change streams.
    """

    def __init__(self):
        self.mongodb = MongoDB()
        # Currently open change streams, keyed by collection name.
        self.watchers: Dict[str, AsyncIOMotorChangeStream] = {}
        # Set by start(), cleared by stop(); watch loops exit once it is False.
        self.running = False

    async def start(self):
        """Connect to MongoDB if no database handle exists yet and mark the
        watcher as running."""
        if self.mongodb.db is None:
            await self.mongodb.connect()
        self.running = True
        logger.info("MongoDB watcher started")

    async def stop(self):
        """Mark the watcher as stopped and close every registered stream.

        A failure to close one stream is logged and does not prevent the
        remaining streams from being closed.
        """
        self.running = False

        # Work on a snapshot: watch_collection() removes its own entry when
        # its loop ends, which could otherwise mutate the dict while this
        # coroutine is suspended inside ``await watcher.close()``.
        open_streams = list(self.watchers.values())
        self.watchers.clear()

        for watcher in open_streams:
            if not hasattr(watcher, "close"):
                continue
            try:
                await watcher.close()
            except Exception as e:
                logger.error(f"Error closing change stream: {e}")

        logger.info("MongoDB watcher stopped")

    async def watch_collection(
        self,
        collection_name: str,
        callback: Callable[[Dict], Awaitable[None]],
        pipeline: Optional[list] = None,
        full_document: Optional[str] = None,
    ):
        """Watch ``collection_name`` and await ``callback(change)`` per event.

        Args:
            collection_name: Name of the collection to watch.
            callback: Async callable receiving the change event dict. An
                exception raised by it is logged and the loop continues.
            pipeline: Aggregation pipeline applied to the change stream.
                Defaults to matching insert, update and delete operations.
            full_document: Optional value forwarded to ``collection.watch``
                (e.g. ``"updateLookup"`` so that update events carry
                ``fullDocument``). When ``None`` the argument is not passed
                and the driver default applies.

        Errors raised while opening or iterating the stream are logged and
        not re-raised.
        """
        try:
            collection = await self.mongodb.get_collection(collection_name)

            if pipeline is None:
                pipeline = [
                    {
                        "$match": {
                            "operationType": {
                                "$in": ["insert", "update", "delete"]
                            }
                        }
                    }
                ]

            # Only pass full_document when requested so existing callers get
            # exactly the same watch() call as before.
            watch_kwargs = {}
            if full_document is not None:
                watch_kwargs["full_document"] = full_document

            logger.info(f"Watching collection: {collection_name}")

            async with collection.watch(pipeline, **watch_kwargs) as change_stream:
                self.watchers[collection_name] = change_stream
                try:
                    async for change in change_stream:
                        # The flag is only re-checked when an event arrives;
                        # stop() closes the stream to end an idle wait.
                        if not self.running:
                            break

                        try:
                            await callback(change)
                        except Exception as e:
                            logger.error(
                                f"Callback error for {collection_name}: {e}"
                            )
                finally:
                    # Drop the registry entry, but only if it is still ours
                    # (stop() may have cleared it, or a newer stream for the
                    # same collection may have replaced it).
                    if self.watchers.get(collection_name) is change_stream:
                        del self.watchers[collection_name]

        except Exception as e:
            logger.error(f"Watch error for {collection_name}: {e}")


def _describe_change(change: Dict):
    """Return a label for the changed document.

    Uses the ``file_name`` of ``fullDocument`` when the event carries one,
    otherwise falls back to the ``_id`` found under ``documentKey``.
    """
    document = change.get("fullDocument")
    if document is not None:
        return document.get("file_name")
    return change.get("documentKey", {}).get("_id")


async def on_document_change(change: Dict):
    """Log insert, update and delete events; other operations are ignored.

    ``fullDocument`` may be missing from an event (update events only carry
    it when the stream is opened with ``full_document="updateLookup"``), so
    the log label falls back to the document ``_id`` instead of failing.
    """
    operation = change.get("operationType")

    if operation == "insert":
        logger.info(f"New document inserted: {_describe_change(change)}")

    elif operation == "update":
        logger.info(f"Document updated: {_describe_change(change)}")

    elif operation == "delete":
        doc_id = change.get("documentKey", {}).get("_id")
        logger.info(f"Document deleted: {doc_id}")


mongodb_watcher = MongoDBWatcher()

# Strong references to the background watch tasks. The event loop only keeps
# weak references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_watcher_tasks: Set[asyncio.Task] = set()


async def start_watchers():
    """Start the shared watcher and launch the ``documents`` watch task."""
    await mongodb_watcher.start()

    task = asyncio.create_task(
        mongodb_watcher.watch_collection(
            "documents",
            on_document_change,
            # Ask the server to attach the current document to update events
            # so that on_document_change can log its file_name.
            full_document="updateLookup",
        )
    )
    _watcher_tasks.add(task)
    # Release the reference automatically once the task completes.
    task.add_done_callback(_watcher_tasks.discard)

    logger.info("All MongoDB watchers started")


async def stop_watchers():
    """Stop the shared watcher and wait for its background tasks to finish."""
    await mongodb_watcher.stop()

    pending = [task for task in _watcher_tasks if not task.done()]
    for task in pending:
        task.cancel()
    if pending:
        # return_exceptions=True keeps the CancelledError of the cancelled
        # tasks from propagating out of shutdown.
        await asyncio.gather(*pending, return_exceptions=True)