| """
|
| Authentication and authorization utilities
|
| """
|
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from core.database import get_db
from models.user import User, UserRole
from schemas.auth import TokenData
|
|
|
|
|
# Key used to sign and verify JWTs. Read from the SECRET_KEY environment
# variable so the real secret never has to live in source control.
# NOTE(review): the fallback is the original placeholder and is kept only for
# backward compatibility -- anyone who knows it can forge tokens, so set
# SECRET_KEY in every real deployment.
SECRET_KEY = os.environ.get("SECRET_KEY", "your-secret-key")

# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"

# Not referenced by the code visible in this module; presumably the token
# lifetime (in minutes) used by the login route -- confirm against callers.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Dependency that supplies the `token` argument of get_current_user.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is shallow-copied, so the
            caller's mapping is not modified.
        expires_delta: Lifetime of the token. When ``None``, the token expires
            15 minutes after creation.

    Returns:
        The value returned by ``jwt.encode`` for the claims plus an ``exp``
        claim, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Test against None explicitly: timedelta(0) is falsy, and a caller that
    # asks for a zero lifetime must not silently receive the 15-minute default.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta

    to_encode = data.copy()
    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated and naive.
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the user identified by a JWT bearer token.

    Args:
        token: Encoded JWT, supplied by the ``oauth2_scheme`` dependency.
        db: Database session, supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` row whose ``username`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, its ``sub`` claim is missing or not a
            string, or no matching user exists.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise credentials_exception from exc

    # Reject a missing *or* non-string subject here so a malformed claim ends
    # in a 401 instead of reaching TokenData / the query with an unexpected type.
    username = payload.get("sub")
    if not isinstance(username, str):
        raise credentials_exception
    token_data = TokenData(username=username)

    user = db.query(User).filter(User.username == token_data.username).first()
    if user is None:
        raise credentials_exception
    return user
|
|
|
async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the authenticated user, refusing accounts that are not active.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when its ``is_active`` flag is truthy.

    Raises:
        HTTPException: 400 ("Inactive user") when ``is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user",
    )
|
|
|
async def get_current_admin_user(
    current_user: User = Depends(get_current_active_user),
) -> User:
    """Return the active user only if they hold the admin role.

    Args:
        current_user: User resolved by the ``get_current_active_user``
            dependency.

    Returns:
        ``current_user`` unchanged when its ``role`` is ``UserRole.ADMIN``.

    Raises:
        HTTPException: 403 ("Not enough privileges") for any other role.
    """
    lacks_admin_role = current_user.role != UserRole.ADMIN
    if not lacks_admin_role:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough privileges",
    )
|
|
|