# mongo_store.py import os import logging from typing import Optional, Dict, Any from pymongo import MongoClient, ASCENDING from pymongo.collection import Collection from pymongo.errors import ServerSelectionTimeoutError logger = logging.getLogger(__name__) MONGO_URI = os.getenv("MONGODB_URI") MONGO_DB = os.getenv("MONGODB_DB", "point9") # Hardcoded collection name for auth as requested AUTH_COLLECTION = "log_details" _client: Optional[MongoClient] = None _auth_coll: Optional[Collection] = None def get_auth_collection() -> Collection: """ Returns the Mongo collection for auth (log_details). Ensures unique indexes on email and id. """ global _client, _auth_coll if _auth_coll is not None: return _auth_coll if not MONGO_URI: raise RuntimeError("Set MONGODB_URI") _client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) try: _client.admin.command("ping") except ServerSelectionTimeoutError as e: raise RuntimeError(f"Cannot connect to MongoDB: {e}") db = _client[MONGO_DB] _auth_coll = db[AUTH_COLLECTION] # Indexes for auth collection try: _auth_coll.create_index([("email", ASCENDING)], unique=True, name="uniq_email") _auth_coll.create_index([("id", ASCENDING)], unique=True, name="uniq_id") except Exception as e: logger.warning(f"Index creation failed for log_details: {e}") return _auth_coll # Convenience helpers you can use inside log.py def insert_user(doc: Dict[str, Any]) -> None: coll = get_auth_collection() coll.insert_one(doc) def get_user_by_email(email: str) -> Optional[Dict[str, Any]]: coll = get_auth_collection() return coll.find_one({"email": email}) def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]: coll = get_auth_collection() return coll.find_one({"id": user_id}) def update_user(user_id: str, updates: Dict[str, Any]) -> bool: coll = get_auth_collection() res = coll.update_one({"id": user_id}, {"$set": updates}) return res.modified_count > 0