File size: 2,141 Bytes
bf45da8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# mongo_store.py
import os
import logging
from typing import Optional, Dict, Any
from pymongo import MongoClient, ASCENDING
from pymongo.collection import Collection
from pymongo.errors import ServerSelectionTimeoutError

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Connection settings are read from the environment once, at import time.
# MONGODB_URI has no fallback (None when unset); MONGODB_DB falls back to "point9".
MONGO_URI = os.environ.get("MONGODB_URI")
MONGO_DB = os.environ.get("MONGODB_DB", "point9")

# Hardcoded collection name for auth as requested
AUTH_COLLECTION = "log_details"

# Process-wide cache filled in by get_auth_collection(); None until then.
_client: Optional[MongoClient] = None
_auth_coll: Optional[Collection] = None


def get_auth_collection() -> Collection:
    """
    Return the Mongo collection used for auth (``log_details``).

    The first successful call builds a ``MongoClient`` from ``MONGO_URI``,
    verifies the server with a ``ping``, selects ``MONGO_DB`` and caches both
    the client and the collection in module globals. Later calls return the
    cached collection without touching the server.

    Unique indexes on ``email`` and ``id`` are requested during that first
    call. Index creation is best-effort: a failure is logged as a warning and
    the collection is still returned.

    Raises:
        RuntimeError: if ``MONGODB_URI`` is not set, or if no server could be
            selected within the 5000 ms server-selection timeout.
    """
    global _client, _auth_coll
    if _auth_coll is not None:
        return _auth_coll

    if not MONGO_URI:
        raise RuntimeError("Set MONGODB_URI")

    # Keep the client local until the ping succeeds, so a failed attempt never
    # leaves an unusable client in the module global and every retry starts
    # from a clean slate.
    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    try:
        client.admin.command("ping")
    except ServerSelectionTimeoutError as e:
        # Release the client's sockets/monitor threads before reporting.
        client.close()
        raise RuntimeError(f"Cannot connect to MongoDB: {e}") from e
    except Exception:
        # Any other failure (e.g. authentication) propagates unchanged, but
        # the client must still be closed so it does not leak.
        client.close()
        raise

    _client = client
    _auth_coll = client[MONGO_DB][AUTH_COLLECTION]

    # Indexes for auth collection. Deliberately best-effort: the collection is
    # returned even when the indexes cannot be created.
    try:
        _auth_coll.create_index([("email", ASCENDING)], unique=True, name="uniq_email")
        _auth_coll.create_index([("id", ASCENDING)], unique=True, name="uniq_id")
    except Exception as e:
        logger.warning("Index creation failed for %s: %s", AUTH_COLLECTION, e)

    return _auth_coll


# Convenience helpers you can use inside log.py
def insert_user(doc: Dict[str, Any]) -> None:
    """Insert ``doc`` as a new document in the auth collection.

    Nothing is returned; any error raised by the insert propagates to the
    caller.

    NOTE(review): PyMongo documents that ``insert_one`` adds an ``_id`` key to
    the passed dict when it has none - confirm callers are fine with ``doc``
    being mutated.
    """
    get_auth_collection().insert_one(doc)


def get_user_by_email(email: str) -> Optional[Dict[str, Any]]:
    """Look up a single auth document by its ``email`` field.

    Returns the result of ``find_one`` on the auth collection for the filter
    ``{"email": email}``: the matching document, or ``None`` when there is no
    match.
    """
    query = {"email": email}
    return get_auth_collection().find_one(query)


def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]:
    """Look up a single auth document by its ``id`` field.

    Returns the result of ``find_one`` on the auth collection for the filter
    ``{"id": user_id}``: the matching document, or ``None`` when there is no
    match.
    """
    query = {"id": user_id}
    return get_auth_collection().find_one(query)


def update_user(user_id: str, updates: Dict[str, Any]) -> bool:
    """Apply ``updates`` to the auth document whose ``id`` equals ``user_id``.

    The fields are written with a single ``$set`` update on the auth
    collection.

    Args:
        user_id: Value matched against the document's ``id`` field.
        updates: Mapping of field name to new value, passed to ``$set``.

    Returns:
        True when the update reports at least one modified document. False
        when nothing was modified, which includes an empty ``updates``
        mapping.
    """
    # An empty ``$set`` cannot modify anything, and MongoDB servers older than
    # 5.0 reject it with a write error, so answer without a round trip.
    if not updates:
        return False

    result = get_auth_collection().update_one({"id": user_id}, {"$set": updates})
    return result.modified_count > 0