Spaces:
Sleeping
Sleeping
| # mongo_store.py | |
| import os | |
| import logging | |
| from typing import Optional, Dict, Any | |
| from pymongo import MongoClient, ASCENDING | |
| from pymongo.collection import Collection | |
| from pymongo.errors import ServerSelectionTimeoutError | |
| logger = logging.getLogger(__name__) | |
| MONGO_URI = os.getenv("MONGODB_URI") | |
| MONGO_DB = os.getenv("MONGODB_DB", "point9") | |
| # Hardcoded collection name for auth as requested | |
| AUTH_COLLECTION = "log_details" | |
| _client: Optional[MongoClient] = None | |
| _auth_coll: Optional[Collection] = None | |
| def get_auth_collection() -> Collection: | |
| """ | |
| Returns the Mongo collection for auth (log_details). | |
| Ensures unique indexes on email and id. | |
| """ | |
| global _client, _auth_coll | |
| if _auth_coll is not None: | |
| return _auth_coll | |
| if not MONGO_URI: | |
| raise RuntimeError("Set MONGODB_URI") | |
| _client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000) | |
| try: | |
| _client.admin.command("ping") | |
| except ServerSelectionTimeoutError as e: | |
| raise RuntimeError(f"Cannot connect to MongoDB: {e}") | |
| db = _client[MONGO_DB] | |
| _auth_coll = db[AUTH_COLLECTION] | |
| # Indexes for auth collection | |
| try: | |
| _auth_coll.create_index([("email", ASCENDING)], unique=True, name="uniq_email") | |
| _auth_coll.create_index([("id", ASCENDING)], unique=True, name="uniq_id") | |
| except Exception as e: | |
| logger.warning(f"Index creation failed for log_details: {e}") | |
| return _auth_coll | |
| # Convenience helpers you can use inside log.py | |
| def insert_user(doc: Dict[str, Any]) -> None: | |
| coll = get_auth_collection() | |
| coll.insert_one(doc) | |
| def get_user_by_email(email: str) -> Optional[Dict[str, Any]]: | |
| coll = get_auth_collection() | |
| return coll.find_one({"email": email}) | |
| def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]: | |
| coll = get_auth_collection() | |
| return coll.find_one({"id": user_id}) | |
| def update_user(user_id: str, updates: Dict[str, Any]) -> bool: | |
| coll = get_auth_collection() | |
| res = coll.update_one({"id": user_id}, {"$set": updates}) | |
| return res.modified_count > 0 |