| """ |
| Simple auth: users in MongoDB, JWT for sessions. |
| Use scripts/create_user.py to add users. |
| """ |
|
|
| import os |
|
|
| import bcrypt |
| import jwt |
| from fastapi import Depends, HTTPException, status |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
| from app.mongo import get_mongo_db |
|
|
# Signing algorithm used both when encoding and when decoding tokens.
JWT_ALGORITHM = "HS256"
# NOTE(review): when the JWT_SECRET environment variable is unset this falls
# back to a hard-coded development secret; confirm it is always set in
# production deployments.
JWT_SECRET = os.environ.get("JWT_SECRET", "amalfa-dev-secret-change-in-production")
# Lifetime of an issued token, in hours (7 days).
TOKEN_EXPIRE_HOURS = 24 * 7
# Name of the MongoDB collection that stores user documents.
USERS_COLLECTION = "users"


# Bearer-token dependency used by get_current_user, which itself checks for
# missing credentials and raises its own 401 response.
security = HTTPBearer(auto_error=False)
|
|
|
|
def _load_users() -> list[dict]:
    """Return every document in the users collection as a list of dicts.

    The Mongo ``_id`` field is excluded by the query projection. An empty
    list is returned when ``get_mongo_db()`` yields ``None``.
    """
    database = get_mongo_db()
    if database is not None:
        cursor = database[USERS_COLLECTION].find({}, {"_id": 0})
        return list(cursor)
    return []
|
|
|
|
def _save_users(users: list[dict]) -> None:
    """Replace the whole users collection with ``users``.

    Does nothing when ``get_mongo_db()`` returns ``None``. Otherwise every
    existing document is deleted and, if ``users`` is non-empty, the given
    documents are inserted.

    Args:
        users: User documents to store. The dicts themselves are not
            modified; shallow copies are inserted.

    NOTE(review): the delete and the insert are two separate operations, so
    a failure during the insert leaves the collection empty.
    """
    db = get_mongo_db()
    if db is None:
        return
    # Build the copies *before* deleting anything so malformed input fails
    # early instead of after the collection has been wiped. Copies are used
    # because PyMongo adds an ``_id`` key to each document it inserts, which
    # would otherwise mutate the caller's dicts.
    documents = [dict(user) for user in users] if users else []
    coll = db[USERS_COLLECTION]
    coll.delete_many({})
    if documents:
        coll.insert_many(documents)
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Both values are UTF-8 encoded before being handed to ``bcrypt.checkpw``.
    Any exception raised along the way (for example a malformed hash or a
    non-string argument) is treated as a failed verification.

    Returns:
        The result of ``bcrypt.checkpw``, or ``False`` if an error occurred.
    """
    try:
        candidate = plain.encode("utf-8")
        stored = hashed.encode("utf-8")
        matched = bcrypt.checkpw(candidate, stored)
    except Exception:
        # Fail closed: anything unexpected counts as "password does not match".
        return False
    return matched
|
|
|
|
def hash_password(plain: str) -> str:
    """Hash a plaintext password with bcrypt and a freshly generated salt.

    Returns:
        The bcrypt hash decoded to a UTF-8 string.
    """
    secret = plain.encode("utf-8")
    digest = bcrypt.hashpw(secret, bcrypt.gensalt())
    return digest.decode("utf-8")
|
|
|
|
| def get_user_by_username(username: str) -> dict | None: |
| db = get_mongo_db() |
| normalized = (username or "").strip().lower() |
| if db is None: |
| return None |
| doc = db[USERS_COLLECTION].find_one({"username_lower": normalized}, {"_id": 0}) |
| if doc: |
| return doc |
| users = _load_users() |
| for u in users: |
| if (u.get("username") or "").strip().lower() == normalized: |
| return u |
| return None |
|
|
|
|
def authenticate_user(username: str, password: str) -> dict | None:
    """Validate a username/password pair.

    Returns:
        A dict with the stored ``username`` and ``id`` values when the user
        exists and the password matches its ``password_hash``; ``None``
        otherwise.
    """
    record = get_user_by_username(username)
    if not record:
        return None
    stored_hash = record.get("password_hash", "")
    if not verify_password(password, stored_hash):
        return None
    return {"username": record.get("username"), "id": record.get("id")}
|
|
|
|
def create_access_token(username: str) -> str:
    """Create a signed JWT for ``username``.

    The token carries ``sub`` (the username), ``exp`` (now plus
    ``TOKEN_EXPIRE_HOURS``) and ``iat`` (now, in UTC), and is signed with
    ``JWT_SECRET`` using ``JWT_ALGORITHM``.
    """
    from datetime import datetime, timedelta, timezone

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(hours=TOKEN_EXPIRE_HOURS)
    claims = {"sub": username, "exp": expires_at, "iat": issued_at}
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)
|
|
|
|
def decode_token(token: str) -> dict | None:
    """Decode and verify a JWT signed with ``JWT_SECRET``.

    Returns:
        The decoded payload, or ``None`` if decoding raises
        ``jwt.PyJWTError``.
    """
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.PyJWTError:
        return None
    return claims
|
|
|
|
async def get_current_user(
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> dict:
    """FastAPI dependency that resolves the authenticated user from a bearer token.

    Args:
        credentials: Parsed ``Authorization`` header supplied by the
            ``security`` dependency, or ``None`` when it is absent.

    Returns:
        ``{"username": <sub claim>}`` for a valid token whose user exists.

    Raises:
        HTTPException: 401 when credentials are missing or not a bearer
            scheme, when the token cannot be decoded or has no usable ``sub``
            claim, or when the referenced user cannot be found.
    """
    # Authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" must be accepted just like "Bearer".
    if not credentials or credentials.scheme.lower() != "bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    payload = decode_token(credentials.credentials)
    username = payload.get("sub") if payload else None
    # A non-string subject would crash the username lookup; treat it as an
    # invalid token instead of surfacing a server error.
    if not username or not isinstance(username, str):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    if not get_user_by_username(username):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User no longer exists",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {"username": username}
|
|