# StocKing / auth.py
# adoomy's picture
# Commit message: "Add login feature"
# Commit: 8200518
"""JWT-based authentication utilities.

Issues a JWT as an HttpOnly cookie after the Google OAuth callback.
Provides FastAPI dependency functions for protecting endpoints.
"""
import os
import logging
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, Request
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from database import SessionLocal
from models import User
logger = logging.getLogger(__name__)

# Placeholder signing key used only when the JWT_SECRET environment variable
# is absent. It is hard-coded in the source, so tokens signed with it can be
# forged by anyone who can read this file.
_DEFAULT_JWT_SECRET = "change-me-in-production"

# Symmetric key used to sign and verify session tokens.
JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
if JWT_SECRET == _DEFAULT_JWT_SECRET:
    # Keep the fallback for backward compatibility, but make the
    # misconfiguration visible instead of failing silently.
    logger.warning(
        "JWT_SECRET is unset or still the placeholder value; session tokens "
        "are signed with a publicly known key. Set JWT_SECRET to a strong "
        "random value."
    )

# Signing algorithm passed to jwt.encode / jwt.decode.
JWT_ALGORITHM = "HS256"
# Token lifetime, in days, added to the current time for the "exp" claim.
JWT_EXPIRE_DAYS = 30
# Name of the request cookie that carries the token.
COOKIE_NAME = "session"
def create_jwt(user: User) -> str:
    """Build a signed session token for ``user``.

    The token carries the user's id (as the string ``sub`` claim), email,
    name and admin flag, and expires ``JWT_EXPIRE_DAYS`` days from now.

    Args:
        user: Account to issue the token for; its ``id``, ``email``,
            ``name`` and ``is_admin`` attributes are read.

    Returns:
        The token produced by ``jwt.encode`` with ``JWT_SECRET`` and
        ``JWT_ALGORITHM``.
    """
    # Function-scope import: the module header only imports datetime/timedelta.
    from datetime import timezone

    # datetime.utcnow() is deprecated and returns a naive value. Use an
    # aware UTC clock and emit "exp" as integer epoch seconds so the claim
    # does not depend on how the JWT library converts datetime objects.
    expires_at = datetime.now(timezone.utc) + timedelta(days=JWT_EXPIRE_DAYS)
    payload = {
        "sub": str(user.id),
        "email": user.email,
        "name": user.name,
        "is_admin": user.is_admin,
        "exp": int(expires_at.timestamp()),
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_jwt(token: str) -> dict | None:
    """Decode ``token`` and return its claims, or ``None`` if decoding fails.

    Args:
        token: Encoded JWT string to check against ``JWT_SECRET``.

    Returns:
        The claims returned by ``jwt.decode``, or ``None`` when it raises
        ``JWTError``.
    """
    try:
        claims = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except JWTError:
        # Callers treat an undecodable token the same as no token at all.
        return None
    return claims
def get_db():
    """Yield a new ``SessionLocal`` session and close it afterwards.

    Written as a generator so the session is handed to the consumer at the
    ``yield`` and ``close()`` runs in the ``finally`` clause once the
    generator is resumed, closed, or has an exception thrown into it.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def get_current_user(request: Request, db: Session = Depends(get_db)) -> User | None:
    """Read the JWT from the session cookie and return the matching User.

    Args:
        request: Incoming request whose cookies are inspected.
        db: Database session used to load the user by primary key.

    Returns:
        The ``User`` looked up from the token's ``sub`` claim, or ``None``
        when the cookie is absent, the token fails verification, the
        ``sub`` claim is missing or not an integer, or the lookup returns
        nothing.
    """
    token = request.cookies.get(COOKIE_NAME)
    if not token:
        return None

    payload = verify_jwt(token)
    if not payload:
        return None

    # A correctly signed token can still lack "sub" or carry a non-numeric
    # value; treat that as "not logged in" rather than letting the
    # exception surface as a server error.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        return None

    return db.get(User, user_id)
def require_user(user: User | None = Depends(get_current_user)) -> User:
    """Dependency that requires a logged-in user.

    Args:
        user: Result of ``get_current_user``; ``None`` when not logged in.

    Returns:
        The authenticated ``User``.

    Raises:
        HTTPException: 401 when ``user`` is ``None``.
    """
    if user is None:
        # The message text was mis-encoded (mojibake); restored to the
        # intended Korean, which reads "Login is required."
        raise HTTPException(status_code=401, detail="로그인이 필요합니다.")
    return user
def require_admin(user: User | None = Depends(get_current_user)) -> User:
    """Dependency that requires a logged-in administrator.

    Args:
        user: Result of ``get_current_user``; ``None`` when not logged in.

    Returns:
        The authenticated ``User`` whose ``is_admin`` flag is truthy.

    Raises:
        HTTPException: 401 when ``user`` is ``None``; 403 when the user is
            not an administrator.
    """
    # The message texts below were mis-encoded (mojibake); they are restored
    # to the intended Korean.
    if user is None:
        # Reads "Login is required."
        raise HTTPException(status_code=401, detail="로그인이 필요합니다.")
    if not user.is_admin:
        # Reads "Administrator privileges are required."
        raise HTTPException(status_code=403, detail="관리자 권한이 필요합니다.")
    return user