Spaces:
Sleeping
Sleeping
| from motor.motor_asyncio import AsyncIOMotorChangeStream | |
| from app.db.mongodb import MongoDB | |
| from app.utils.logger import logger | |
| from typing import Callable, Dict | |
| import asyncio | |
| class MongoDBWatcher: | |
| def __init__(self): | |
| self.mongodb = MongoDB() | |
| self.watchers = {} | |
| self.running = False | |
| async def start(self): | |
| if self.mongodb.db is None: | |
| await self.mongodb.connect() | |
| self.running = True | |
| logger.info("MongoDB watcher started") | |
| async def stop(self): | |
| self.running = False | |
| for watcher in self.watchers.values(): | |
| if hasattr(watcher, 'close'): | |
| await watcher.close() | |
| logger.info("MongoDB watcher stopped") | |
| async def watch_collection( | |
| self, | |
| collection_name: str, | |
| callback: Callable, | |
| pipeline: list = None | |
| ): | |
| try: | |
| collection = await self.mongodb.get_collection(collection_name) | |
| if pipeline is None: | |
| pipeline = [ | |
| { | |
| "$match": { | |
| "operationType": {"$in": ["insert", "update", "delete"]} | |
| } | |
| } | |
| ] | |
| logger.info(f"Watching collection: {collection_name}") | |
| async with collection.watch(pipeline) as change_stream: | |
| self.watchers[collection_name] = change_stream | |
| async for change in change_stream: | |
| if not self.running: | |
| break | |
| try: | |
| await callback(change) | |
| except Exception as e: | |
| logger.error(f"Callback error for {collection_name}: {e}") | |
| except Exception as e: | |
| logger.error(f"Watch error for {collection_name}: {e}") | |
| async def on_document_change(change: Dict): | |
| operation = change.get("operationType") | |
| document = change.get("fullDocument") | |
| if operation == "insert": | |
| logger.info(f"New document inserted: {document.get('file_name')}") | |
| elif operation == "update": | |
| logger.info(f"Document updated: {document.get('file_name')}") | |
| elif operation == "delete": | |
| doc_id = change.get("documentKey", {}).get("_id") | |
| logger.info(f"Document deleted: {doc_id}") | |
| mongodb_watcher = MongoDBWatcher() | |
| async def start_watchers(): | |
| await mongodb_watcher.start() | |
| asyncio.create_task( | |
| mongodb_watcher.watch_collection( | |
| "documents", | |
| on_document_change | |
| ) | |
| ) | |
| logger.info("All MongoDB watchers started") | |
| async def stop_watchers(): | |
| await mongodb_watcher.stop() | |