| import os |
| import logging |
| from typing import Optional |
|
|
| from bson import ObjectId |
| from motor.motor_asyncio import AsyncIOMotorClient |
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Shared Motor client; stays None until connect() creates one.
_client: Optional[AsyncIOMotorClient] = None
# Database handle taken from _client; while it is None,
# update_deepfake_result() skips its write.
_db = None
|
|
|
|
async def connect():
    """Create the shared Motor client and select the working database.

    Reads ``MONGODB_URI`` and ``MONGODB_DB_NAME`` from the environment.
    When the URI is unset or empty, a warning is logged and the module
    state is left untouched. An unset or empty database name falls back
    to ``"myapp"``.

    Calling this again replaces the stored client and closes the previous
    one so it is not leaked.
    """
    global _client, _db
    uri = os.getenv("MONGODB_URI")
    if not uri:
        logger.warning("MONGODB_URI not set — MongoDB writes disabled")
        return
    # `or` also covers a variable that is set but empty, mirroring the
    # URI check above; an empty database name would be rejected.
    db_name = os.getenv("MONGODB_DB_NAME") or "myapp"
    previous = _client
    _client = AsyncIOMotorClient(uri)
    _db = _client[db_name]
    if previous is not None:
        # Release the client left over from an earlier connect() call.
        previous.close()
    logger.info("Connected to MongoDB database '%s'", db_name)
|
|
|
|
async def disconnect():
    """Close the shared client, if any, and reset the module state.

    Clearing both ``_client`` and ``_db`` makes ``update_deepfake_result``
    take its "not connected" path afterwards instead of issuing
    operations against a closed client. Safe to call when no client
    exists.
    """
    global _client, _db
    if _client is not None:
        _client.close()
    # Drop both handles so nothing keeps using the closed client.
    _client = None
    _db = None
|
|
|
|
async def update_deepfake_result(post_id: str, image_index: int, is_deepfake: bool) -> None:
    """Store one image's deepfake verdict on its post document.

    Sets element ``image_index`` of the post's ``image_deepfake_results``
    field to ``is_deepfake``. The write is best-effort: failures are
    logged rather than raised to the caller.

    Args:
        post_id: String form of the post's ``_id``.
        image_index: Non-negative position in ``image_deepfake_results``.
        is_deepfake: Verdict to store for that image.
    """
    if _db is None:
        logger.debug("MongoDB not connected; skipping write for post %s[%d]", post_id, image_index)
        return
    try:
        if image_index < 0:
            # "image_deepfake_results.-1" is not an array position in an
            # update path, so refuse it instead of sending the write.
            logger.warning(
                "Invalid image index %d for post %s; deepfake result not saved",
                image_index,
                post_id,
            )
            return
        field = f"image_deepfake_results.{image_index}"
        result = await _db.posts.update_one(
            {"_id": ObjectId(post_id)},
            {"$set": {field: is_deepfake}},
        )
        if result.matched_count == 0:
            logger.warning("Post %s not found; deepfake result not saved", post_id)
    except Exception:
        # Deliberately broad: a failed write-back is logged, never raised.
        logger.exception("Failed to update deepfake result for post %s[%d]", post_id, image_index)
|
|