Multimodel_Rag / app /api /auth.py
Dhrumil Parikh
deploy GeminiRAG
cdc55f4
Raw
History Blame Contribute Delete
2.61 kB
"""
Authentication endpoints.
POST /auth/register — create a user account (email + password + role).
Duplicate email returns HTTP 409.
POST /auth/login — validate credentials and return a signed JWT.
Rate-limited to 10 requests/minute per IP to slow
brute-force attacks.
"""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Request, status
from pydantic import BaseModel
from sqlmodel import Session, select
from app.config import settings
from app.deps import get_db
from app.limiter import limiter
from app.models.db import User, UserRole
from app.security import hash_password, verify_password, create_access_token
router = APIRouter()
class RegisterRequest(BaseModel):
email: str
password: str
role: UserRole = UserRole.user
class RegisterResponse(BaseModel):
id: str
email: str
role: str
created_at: datetime
class LoginRequest(BaseModel):
email: str
password: str
class LoginResponse(BaseModel):
access_token: str
token_type: str
role: str
@router.post("/register", response_model=RegisterResponse, status_code=201)
def register(req: RegisterRequest, db: Session = Depends(get_db)):
existing = db.exec(select(User).where(User.email == req.email)).first()
if existing:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Email already registered")
user = User(
email=req.email,
hashed_password=hash_password(req.password),
role=req.role,
created_at=datetime.utcnow(),
)
db.add(user)
db.commit()
db.refresh(user)
return RegisterResponse(
id=str(user.id),
email=user.email,
role=user.role.value,
created_at=user.created_at,
)
@router.post("/login", response_model=LoginResponse)
@limiter.limit("10/minute")
def login(request: Request, req: LoginRequest, db: Session = Depends(get_db)):
user = db.exec(select(User).where(User.email == req.email)).first()
if not user or not verify_password(req.password, user.hashed_password):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
if not user.is_active:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account is inactive")
token = create_access_token(
data={"sub": str(user.id), "role": user.role.value},
expires_minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES,
)
return LoginResponse(access_token=token, token_type="bearer", role=user.role.value)