| | """
|
| | Authentication and Authorization
|
| | JWT tokens, API keys, password hashing
|
| | """
|
| |
|
| | import secrets
|
| | from datetime import datetime, timedelta
|
| | from typing import Optional, Union
|
| | from jose import JWTError, jwt
|
| | from passlib.context import CryptContext
|
| | from fastapi import Depends, HTTPException, status, Security
|
| | from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
|
| | from sqlalchemy.orm import Session
|
| |
|
| | from src.core.config import settings
|
| | from src.core.exceptions import AuthenticationError, AuthorizationError
|
| | from src.db.models import User, APIKey, get_db
|
| |
|
| |
|
| |
|
| | pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
| |
|
| |
|
| | bearer_scheme = HTTPBearer()
|
| | api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
|
| |
|
| |
|
| | def verify_password(plain_password: str, hashed_password: str) -> bool:
|
| | """Verify password against hash"""
|
| | return pwd_context.verify(plain_password, hashed_password)
|
| |
|
| |
|
| | def get_password_hash(password: str) -> str:
|
| | """Generate password hash"""
|
| | return pwd_context.hash(password)
|
| |
|
| |
|
| | def create_access_token(
|
| | data: dict,
|
| | expires_delta: Optional[timedelta] = None
|
| | ) -> str:
|
| | """Create JWT access token"""
|
| | to_encode = data.copy()
|
| |
|
| | if expires_delta:
|
| | expire = datetime.utcnow() + expires_delta
|
| | else:
|
| | expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
| |
|
| | to_encode.update({"exp": expire})
|
| | encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
| | return encoded_jwt
|
| |
|
| |
|
| | def create_refresh_token(
|
| | data: dict,
|
| | expires_delta: Optional[timedelta] = None
|
| | ) -> str:
|
| | """Create JWT refresh token"""
|
| | to_encode = data.copy()
|
| |
|
| | if expires_delta:
|
| | expire = datetime.utcnow() + expires_delta
|
| | else:
|
| | expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
|
| |
|
| | to_encode.update({"exp": expire, "type": "refresh"})
|
| | encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
| | return encoded_jwt
|
| |
|
| |
|
| | def decode_token(token: str) -> dict:
|
| | """Decode and validate JWT token"""
|
| | try:
|
| | payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
|
| | return payload
|
| | except JWTError:
|
| | raise AuthenticationError("Invalid or expired token")
|
| |
|
| |
|
| | def generate_api_key() -> str:
|
| | """Generate secure API key"""
|
| | return secrets.token_urlsafe(32)
|
| |
|
| |
|
| |
|
| | async def get_current_user(
|
| | credentials: HTTPAuthorizationCredentials = Security(bearer_scheme),
|
| | db: Session = Depends(get_db)
|
| | ) -> User:
|
| | """Get current authenticated user from JWT token"""
|
| |
|
| | try:
|
| | token = credentials.credentials
|
| | payload = decode_token(token)
|
| |
|
| | user_id: int = payload.get("sub")
|
| | if user_id is None:
|
| | raise AuthenticationError("Invalid token payload")
|
| |
|
| | except JWTError:
|
| | raise AuthenticationError("Could not validate credentials")
|
| |
|
| | user = db.query(User).filter(User.id == user_id).first()
|
| | if user is None:
|
| | raise AuthenticationError("User not found")
|
| |
|
| | if not user.is_active:
|
| | raise AuthenticationError("User account is inactive")
|
| |
|
| | return user
|
| |
|
| |
|
| |
|
| | async def get_current_user_from_api_key(
|
| | api_key: Optional[str] = Security(api_key_header),
|
| | db: Session = Depends(get_db)
|
| | ) -> Optional[User]:
|
| | """Get current user from API key"""
|
| |
|
| | if not api_key:
|
| | return None
|
| |
|
| |
|
| | api_key_obj = db.query(APIKey).filter(
|
| | APIKey.key == api_key,
|
| | APIKey.is_active == True
|
| | ).first()
|
| |
|
| | if not api_key_obj:
|
| | raise AuthenticationError("Invalid API key")
|
| |
|
| |
|
| | if api_key_obj.expires_at and api_key_obj.expires_at < datetime.utcnow():
|
| | raise AuthenticationError("API key has expired")
|
| |
|
| |
|
| | api_key_obj.last_used_at = datetime.utcnow()
|
| | db.commit()
|
| |
|
| |
|
| | user = db.query(User).filter(User.id == api_key_obj.user_id).first()
|
| |
|
| | if not user or not user.is_active:
|
| | raise AuthenticationError("User not found or inactive")
|
| |
|
| | return user
|
| |
|
| |
|
| |
|
| | async def get_current_user_flexible(
|
| | bearer: Optional[HTTPAuthorizationCredentials] = Security(bearer_scheme, auto_error=False),
|
| | api_key: Optional[str] = Security(api_key_header),
|
| | db: Session = Depends(get_db)
|
| | ) -> User:
|
| | """Get current user from JWT or API key"""
|
| |
|
| |
|
| | if bearer:
|
| | try:
|
| | token = bearer.credentials
|
| | payload = decode_token(token)
|
| | user_id: int = payload.get("sub")
|
| |
|
| | user = db.query(User).filter(User.id == user_id).first()
|
| | if user and user.is_active:
|
| | return user
|
| | except:
|
| | pass
|
| |
|
| |
|
| | if api_key:
|
| | user = await get_current_user_from_api_key(api_key, db)
|
| | if user:
|
| | return user
|
| |
|
| | raise AuthenticationError("Authentication required")
|
| |
|
| |
|
| |
|
| | async def get_current_superuser(
|
| | current_user: User = Depends(get_current_user_flexible)
|
| | ) -> User:
|
| | """Require superuser privileges"""
|
| |
|
| | if not current_user.is_superuser:
|
| | raise AuthorizationError("Superuser privileges required")
|
| |
|
| | return current_user
|
| |
|
| |
|
| |
|
| | def authenticate_user(
|
| | db: Session,
|
| | email: str,
|
| | password: str
|
| | ) -> Optional[User]:
|
| | """Authenticate user with email and password"""
|
| |
|
| | user = db.query(User).filter(User.email == email).first()
|
| | if not user:
|
| | return None
|
| |
|
| | if not verify_password(password, user.hashed_password):
|
| | return None
|
| |
|
| | return user
|
| |
|
| |
|
| |
|
| | def create_user(
|
| | db: Session,
|
| | email: str,
|
| | password: str,
|
| | full_name: Optional[str] = None,
|
| | is_superuser: bool = False
|
| | ) -> User:
|
| | """Create new user"""
|
| |
|
| |
|
| | existing_user = db.query(User).filter(User.email == email).first()
|
| | if existing_user:
|
| | raise ValueError("User with this email already exists")
|
| |
|
| |
|
| | user = User(
|
| | email=email,
|
| | hashed_password=get_password_hash(password),
|
| | full_name=full_name,
|
| | is_superuser=is_superuser,
|
| | is_active=True
|
| | )
|
| |
|
| | db.add(user)
|
| | db.commit()
|
| | db.refresh(user)
|
| |
|
| | return user
|
| |
|
| |
|
| |
|
| | def create_api_key_for_user(
|
| | db: Session,
|
| | user_id: int,
|
| | name: Optional[str] = None,
|
| | expires_days: Optional[int] = None
|
| | ) -> APIKey:
|
| | """Create API key for user"""
|
| |
|
| | key = generate_api_key()
|
| |
|
| | api_key = APIKey(
|
| | key=key,
|
| | name=name or "API Key",
|
| | user_id=user_id,
|
| | is_active=True,
|
| | rate_limit_per_minute=settings.RATE_LIMIT_PER_MINUTE,
|
| | rate_limit_per_hour=settings.RATE_LIMIT_PER_HOUR,
|
| | expires_at=datetime.utcnow() + timedelta(days=expires_days) if expires_days else None
|
| | )
|
| |
|
| | db.add(api_key)
|
| | db.commit()
|
| | db.refresh(api_key)
|
| |
|
| | return api_key
|
| |
|
| |
|
| | if __name__ == "__main__":
|
| |
|
| | password = "test_password_123"
|
| | hashed = get_password_hash(password)
|
| | print(f"Hashed: {hashed}")
|
| | print(f"Verified: {verify_password(password, hashed)}")
|
| |
|
| |
|
| | token = create_access_token({"sub": 1, "email": "test@example.com"})
|
| | print(f"Token: {token}")
|
| |
|
| | payload = decode_token(token)
|
| | print(f"Decoded: {payload}")
|
| |
|
| |
|
| | api_key = generate_api_key()
|
| | print(f"API Key: {api_key}")
|
| |
|