Spaces:
Sleeping
Sleeping
File size: 3,033 Bytes
c4a0359 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | import os
from jose import jwt
from jose.exceptions import ExpiredSignatureError, JWTError
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from passlib.context import CryptContext
import logging
logger = logging.getLogger(__name__)
# JWT configuration
SECRET_KEY = os.getenv("JWT_SECRET_KEY", os.getenv("MASTER_PASSWORD", "change-me-in-production"))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 24 * 7 # 7 days
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# HTTP Bearer token scheme
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against a hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password."""
return pwd_context.hash(password)
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""Verify and decode a JWT token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except ExpiredSignatureError:
logger.warning("Token has expired")
return None
except JWTError:
logger.warning("Invalid token")
return None
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Dict[str, Any]:
"""
Dependency to get the current authenticated user from JWT token.
"""
token = credentials.credentials
payload = verify_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return payload
async def get_current_user_optional(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False))
) -> Optional[Dict[str, Any]]:
"""
Dependency to get the current user if authenticated, None otherwise.
"""
if credentials is None:
return None
token = credentials.credentials
payload = verify_token(token)
return payload
def require_auth(func):
"""
Decorator to require authentication for an endpoint.
"""
async def wrapper(*args, **kwargs):
# This will be handled by the Depends(get_current_user) in the route
return await func(*args, **kwargs)
return wrapper
|