Spaces:
Sleeping
Sleeping
File size: 3,565 Bytes
7701dac 9777394 7701dac 9777394 7701dac | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | """
Authentication utilities β password hashing, JWT creation / verification.
"""
import os
import hashlib
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
# ββ Configuration ββββββββββββββββββββββββββββββββββββββββββββββββββββ
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "factcheck-thesis-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
# ββ Password hashing (SHA-256 + salt) ββββββββββββββββββββββββββββββββ
# ββ Bearer token scheme βββββββββββββββββββββββββββββββββββββββββββββ
bearer_scheme = HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
"""Hash a plaintext password using SHA-256 with a random salt."""
salt = secrets.token_hex(16)
hashed = hashlib.sha256((salt + password).encode()).hexdigest()
return f"{salt}${hashed}"
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a plaintext password against its salted SHA-256 hash."""
try:
salt, stored_hash = hashed_password.split("$", 1)
test_hash = hashlib.sha256((salt + plain_password).encode()).hexdigest()
return test_hash == stored_hash
except (ValueError, AttributeError):
return False
def create_access_token(user_id: int, email: str) -> str:
"""Create a signed JWT with the user's id and email in the payload."""
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {
"sub": str(user_id),
"email": email,
"exp": expire,
}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> dict:
"""
Decode and validate a JWT.
Returns the payload dict or raises an HTTPException.
"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token.",
)
# ββ FastAPI dependencies βββββββββββββββββββββββββββββββββββββββββββββ
async def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
) -> Optional[dict]:
"""
Dependency that extracts the current user from the Authorization header.
Returns the decoded payload dict if a valid token is present, or None if
no token is provided or if the token is invalid/expired.
"""
if credentials is None:
return None
try:
return decode_access_token(credentials.credentials)
except HTTPException:
return None
async def require_current_user(
credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()),
) -> dict:
"""
Stricter version β always requires a valid token.
Raises 401 if missing or invalid.
"""
return decode_access_token(credentials.credentials)
|