Spaces:
Sleeping
Sleeping
| """ | |
| Authentication utilities — password hashing, JWT creation / verification. | |
| """ | |
| import os | |
| import hashlib | |
| import secrets | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from jose import JWTError, jwt | |
# ── Configuration ────────────────────────────────────────────────────
# Signing secret for JWTs, read from the JWT_SECRET_KEY environment variable.
# NOTE(review): the fallback literal is only a placeholder; set
# JWT_SECRET_KEY explicitly in any real deployment.
SECRET_KEY = os.environ.get(
    "JWT_SECRET_KEY",
    "factcheck-thesis-secret-key-change-in-production",
)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 7 * 24 * 60  # 7 days
# ── Bearer token scheme ──────────────────────────────────────────────
# Shared HTTP Bearer scheme instance used as a FastAPI dependency.
# NOTE(review): per FastAPI's HTTPBearer docs, auto_error=False makes the
# dependency yield None instead of raising when no usable Authorization
# header is present; get_current_user checks for that None explicitly.
bearer_scheme = HTTPBearer(auto_error=False)
# ── Password hashing (SHA-256 + salt) ────────────────────────────────
def hash_password(password: str) -> str:
    """
    Produce a storable hash of a plaintext password.

    A fresh random salt (32 hex characters) is generated on every call and
    prepended to the password before hashing with SHA-256. The result has
    the form ``<salt>$<hex digest>``, which verify_password can split again.

    NOTE(review): a single SHA-256 round is cheap to brute-force. Moving to
    a dedicated password KDF would change the stored format and therefore
    needs a migration plan for existing hashes.
    """
    salt_hex = secrets.token_hex(16)
    salted_bytes = (salt_hex + password).encode()
    digest_hex = hashlib.sha256(salted_bytes).hexdigest()
    return "$".join((salt_hex, digest_hex))
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Verify a plaintext password against a stored ``<salt>$<digest>`` hash.

    The stored value is split on the first ``$``; the salt is prepended to
    the candidate password, hashed with SHA-256 and compared to the stored
    digest.

    Returns True only when the digests match. Malformed input (missing
    separator, None, or non-string arguments) yields False rather than an
    exception, so callers can treat every failure as "wrong credentials".
    """
    try:
        salt, stored_hash = hashed_password.split("$", 1)
        candidate = hashlib.sha256((salt + plain_password).encode()).hexdigest()
        # Constant-time comparison so the response time does not reveal how
        # many leading characters of the digest matched. Comparing bytes
        # avoids compare_digest's ASCII-only restriction on str arguments.
        return secrets.compare_digest(candidate.encode(), stored_hash.encode())
    except (ValueError, AttributeError, TypeError):
        # ValueError: no "$" separator (or un-encodable text);
        # AttributeError: hashed_password is None / has no .split;
        # TypeError: a non-str argument (e.g. None password, bytes hash).
        return False
def create_access_token(user_id: int, email: str) -> str:
    """
    Build a signed JWT identifying the given user.

    The token carries the user id (as a string) under ``sub``, the email
    under ``email``, and an ``exp`` timestamp set
    ACCESS_TOKEN_EXPIRE_MINUTES from the current UTC time. It is signed
    with SECRET_KEY using ALGORITHM.
    """
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {
        "sub": str(user_id),
        "email": email,
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> dict:
    """
    Decode and validate a JWT.

    The token is verified against SECRET_KEY and restricted to ALGORITHM.

    Returns the payload dict.
    Raises HTTPException (401) when the library reports a JWTError, i.e.
    the token is invalid or expired. The response carries a
    ``WWW-Authenticate: Bearer`` challenge, as HTTP requires for 401.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Chain the original error so the root cause stays visible in logs
        # while the client only sees the generic message below.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token.",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
| # ── FastAPI dependencies ───────────────────────────────────────────── | |
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
) -> Optional[dict]:
    """
    Optional-authentication dependency.

    Reads the bearer credentials supplied by bearer_scheme and returns the
    decoded token payload. Returns None, never an error, when no
    credentials were supplied or when decode_access_token rejects the token.
    """
    payload: Optional[dict] = None
    if credentials is not None:
        try:
            payload = decode_access_token(credentials.credentials)
        except HTTPException:
            # An invalid or expired token is treated the same as no token.
            payload = None
    return payload
async def require_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
) -> dict:
    """
    Stricter version — always requires a valid token.

    Returns the decoded payload dict.
    Raises HTTPException (401) if the bearer credentials are missing, or if
    decode_access_token rejects the token as invalid or expired.
    """
    # Reuse the shared non-raising scheme and reject missing credentials
    # here, so the status is 401 as documented. Some FastAPI releases
    # answer 403 when HTTPBearer is left to raise on its own.
    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return decode_access_token(credentials.credentials)