Spaces:
Sleeping
Sleeping
| """ | |
| Authentication endpoints. | |
| POST /auth/register — create a user account (email + password + role). | |
| Duplicate email returns HTTP 409. | |
| POST /auth/login — validate credentials and return a signed JWT. | |
| Rate-limited to 10 requests/minute per IP to slow | |
| brute-force attacks. | |
| """ | |
| from datetime import datetime | |
| from fastapi import APIRouter, Depends, HTTPException, Request, status | |
| from pydantic import BaseModel | |
| from sqlmodel import Session, select | |
| from app.config import settings | |
| from app.deps import get_db | |
| from app.limiter import limiter | |
| from app.models.db import User, UserRole | |
| from app.security import hash_password, verify_password, create_access_token | |
| router = APIRouter() | |
| class RegisterRequest(BaseModel): | |
| email: str | |
| password: str | |
| role: UserRole = UserRole.user | |
| class RegisterResponse(BaseModel): | |
| id: str | |
| email: str | |
| role: str | |
| created_at: datetime | |
| class LoginRequest(BaseModel): | |
| email: str | |
| password: str | |
| class LoginResponse(BaseModel): | |
| access_token: str | |
| token_type: str | |
| role: str | |
| def register(req: RegisterRequest, db: Session = Depends(get_db)): | |
| existing = db.exec(select(User).where(User.email == req.email)).first() | |
| if existing: | |
| raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Email already registered") | |
| user = User( | |
| email=req.email, | |
| hashed_password=hash_password(req.password), | |
| role=req.role, | |
| created_at=datetime.utcnow(), | |
| ) | |
| db.add(user) | |
| db.commit() | |
| db.refresh(user) | |
| return RegisterResponse( | |
| id=str(user.id), | |
| email=user.email, | |
| role=user.role.value, | |
| created_at=user.created_at, | |
| ) | |
| def login(request: Request, req: LoginRequest, db: Session = Depends(get_db)): | |
| user = db.exec(select(User).where(User.email == req.email)).first() | |
| if not user or not verify_password(req.password, user.hashed_password): | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") | |
| if not user.is_active: | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account is inactive") | |
| token = create_access_token( | |
| data={"sub": str(user.id), "role": user.role.value}, | |
| expires_minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES, | |
| ) | |
| return LoginResponse(access_token=token, token_type="bearer", role=user.role.value) | |