"""JWT-based authentication utilities.

The JWT is issued as an HttpOnly cookie after the Google OAuth callback.
This module provides the token helpers and the FastAPI dependency
functions used to protect endpoints.
"""

import os
import logging
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, Request
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from database import SessionLocal
from models import User

logger = logging.getLogger(__name__)

# Placeholder used only when the JWT_SECRET environment variable is unset.
_DEFAULT_JWT_SECRET = "change-me-in-production"

JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
JWT_ALGORITHM = "HS256"
JWT_EXPIRE_DAYS = 30
COOKIE_NAME = "session"

if JWT_SECRET == _DEFAULT_JWT_SECRET:
    # Tokens signed with a publicly known secret can be forged by anyone,
    # so make the misconfiguration visible instead of failing silently.
    logger.warning(
        "JWT_SECRET is not set; falling back to the insecure default secret."
    )


def create_jwt(user: User) -> str:
    """Build a signed JWT for *user*.

    The payload carries the user's id (as the string ``sub`` claim), email,
    name and admin flag, and expires ``JWT_EXPIRE_DAYS`` days from now.

    Args:
        user: The user the token is issued for.

    Returns:
        The encoded token, signed with ``JWT_SECRET`` using ``JWT_ALGORITHM``.
    """
    payload = {
        "sub": str(user.id),
        "email": user.email,
        "name": user.name,
        "is_admin": user.is_admin,
        # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
        "exp": datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRE_DAYS),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)


def verify_jwt(token: str) -> dict | None:
    """Decode and validate *token*.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims, or ``None`` when decoding raises ``JWTError``.
    """
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except JWTError:
        return None


def get_db():
    """Yield a database session and close it once the caller is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def get_current_user(request: Request, db: Session = Depends(get_db)) -> User | None:
    """Read the JWT from the session cookie and return the matching User.

    Args:
        request: Incoming request whose cookies are inspected.
        db: Database session used to load the user.

    Returns:
        The ``User`` looked up by the token's ``sub`` claim, or ``None`` when
        the cookie is absent, the token does not verify, the ``sub`` claim is
        missing or not an integer, or no such user is found.
    """
    token = request.cookies.get(COOKIE_NAME)
    if not token:
        return None

    payload = verify_jwt(token)
    if not payload:
        return None

    # A correctly signed token may still lack a usable "sub" claim; treat
    # that as "not logged in" rather than letting the request fail with 500.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        return None

    return db.get(User, user_id)


def require_user(user: User | None = Depends(get_current_user)) -> User:
    """Require a logged-in user.

    Raises:
        HTTPException: 401 when no user is logged in.
    """
    if user is None:
        raise HTTPException(status_code=401, detail="로그인이 필요합니다.")
    return user


def require_admin(user: User | None = Depends(get_current_user)) -> User:
    """Require a logged-in administrator.

    Raises:
        HTTPException: 401 when no user is logged in, 403 when the user is
            not an administrator.
    """
    # Reuse the login check so the 401 behaviour stays in one place.
    user = require_user(user)
    if not user.is_admin:
        raise HTTPException(status_code=403, detail="관리자 권한이 필요합니다.")
    return user