Sanjay / app /services /users.py
TheDeepDas's picture
Docker
6c9c901
from datetime import datetime
from typing import Optional
from bson import ObjectId
from ..database import get_collection
USERS_COLLECTION = "users"
def serialize_user(document) -> Optional[dict]:
if not document:
return None
return {
"id": str(document.get("_id")),
"email": document.get("email"),
"display_name": document.get("display_name"),
"organization": document.get("organization"),
"role": document.get("role"),
"created_at": document.get("created_at"),
}
async def get_users_collection():
return get_collection(USERS_COLLECTION)
async def get_user_by_email(email: str) -> Optional[dict]:
users = await get_users_collection()
return await users.find_one({"email": email})
async def create_user(data: dict) -> dict:
users = await get_users_collection()
now = datetime.utcnow()
payload = {
**data,
"created_at": now,
"updated_at": now,
}
result = await users.insert_one(payload)
payload["_id"] = result.inserted_id
return payload
async def get_user_by_id(user_id: str) -> Optional[dict]:
users = await get_users_collection()
try:
oid = ObjectId(user_id)
except Exception:
return None
return await users.find_one({"_id": oid})