sushilideaclan01's picture
refactor: migrate user and gallery data storage from JSON files to MongoDB
c4a64b4
"""
Simple auth: users in MongoDB, JWT for sessions.
Use scripts/create_user.py to add users.
"""
import os
import bcrypt
import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from app.mongo import get_mongo_db
JWT_ALGORITHM = "HS256"
JWT_SECRET = os.environ.get("JWT_SECRET", "amalfa-dev-secret-change-in-production")
TOKEN_EXPIRE_HOURS = 24 * 7 # 7 days
USERS_COLLECTION = "users"
security = HTTPBearer(auto_error=False)
def _load_users() -> list[dict]:
db = get_mongo_db()
if db is None:
return []
users = list(db[USERS_COLLECTION].find({}, {"_id": 0}))
return users
def _save_users(users: list[dict]) -> None:
db = get_mongo_db()
if db is None:
return
coll = db[USERS_COLLECTION]
coll.delete_many({})
if users:
coll.insert_many(users)
def verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
except Exception:
return False
def hash_password(plain: str) -> str:
return bcrypt.hashpw(plain.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def get_user_by_username(username: str) -> dict | None:
db = get_mongo_db()
normalized = (username or "").strip().lower()
if db is None:
return None
doc = db[USERS_COLLECTION].find_one({"username_lower": normalized}, {"_id": 0})
if doc:
return doc
users = _load_users()
for u in users:
if (u.get("username") or "").strip().lower() == normalized:
return u
return None
def authenticate_user(username: str, password: str) -> dict | None:
user = get_user_by_username(username)
if not user or not verify_password(password, user.get("password_hash", "")):
return None
return {"username": user.get("username"), "id": user.get("id")}
def create_access_token(username: str) -> str:
import datetime
now = datetime.datetime.now(datetime.timezone.utc)
payload = {
"sub": username,
"exp": now + datetime.timedelta(hours=TOKEN_EXPIRE_HOURS),
"iat": now,
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def decode_token(token: str) -> dict | None:
try:
return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
except jwt.PyJWTError:
return None
async def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> dict:
if not credentials or credentials.scheme != "Bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
payload = decode_token(credentials.credentials)
if not payload or not payload.get("sub"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)
username = payload["sub"]
if not get_user_by_username(username):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User no longer exists",
headers={"WWW-Authenticate": "Bearer"},
)
return {"username": username}