Spaces:
Sleeping
Sleeping
| """Security utilities - password hashing and JWT tokens""" | |
| import os | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| import hashlib | |
| import hmac | |
| import base64 | |
| import json | |
| import secrets | |
| # JWT settings | |
| SECRET_KEY = os.getenv("JWT_SECRET_KEY", secrets.token_hex(32)) | |
| ALGORITHM = "HS256" | |
| ACCESS_TOKEN_EXPIRE_HOURS = 24 * 7 # 1 week | |
| def hash_password(password: str) -> str: | |
| """Hash password using PBKDF2-SHA256""" | |
| salt = secrets.token_hex(16) | |
| pwd_hash = hashlib.pbkdf2_hmac( | |
| 'sha256', | |
| password.encode('utf-8'), | |
| salt.encode('utf-8'), | |
| 100000 | |
| ) | |
| return f"{salt}:{pwd_hash.hex()}" | |
| def verify_password(password: str, hashed: str) -> bool: | |
| """Verify password against hash""" | |
| try: | |
| salt, stored_hash = hashed.split(':') | |
| pwd_hash = hashlib.pbkdf2_hmac( | |
| 'sha256', | |
| password.encode('utf-8'), | |
| salt.encode('utf-8'), | |
| 100000 | |
| ) | |
| return hmac.compare_digest(pwd_hash.hex(), stored_hash) | |
| except ValueError: | |
| return False | |
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| """Create JWT access token""" | |
| to_encode = data.copy() | |
| expire = datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)) | |
| to_encode.update({"exp": expire.timestamp()}) | |
| # Create JWT manually (no external dependency) | |
| header = {"alg": ALGORITHM, "typ": "JWT"} | |
| header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b'=').decode() | |
| payload_b64 = base64.urlsafe_b64encode(json.dumps(to_encode).encode()).rstrip(b'=').decode() | |
| signature_input = f"{header_b64}.{payload_b64}" | |
| signature = hmac.new( | |
| SECRET_KEY.encode(), | |
| signature_input.encode(), | |
| hashlib.sha256 | |
| ).digest() | |
| signature_b64 = base64.urlsafe_b64encode(signature).rstrip(b'=').decode() | |
| return f"{header_b64}.{payload_b64}.{signature_b64}" | |
| def decode_access_token(token: str) -> Optional[dict]: | |
| """Decode and verify JWT token""" | |
| try: | |
| parts = token.split('.') | |
| if len(parts) != 3: | |
| return None | |
| header_b64, payload_b64, signature_b64 = parts | |
| # Verify signature | |
| signature_input = f"{header_b64}.{payload_b64}" | |
| expected_sig = hmac.new( | |
| SECRET_KEY.encode(), | |
| signature_input.encode(), | |
| hashlib.sha256 | |
| ).digest() | |
| expected_sig_b64 = base64.urlsafe_b64encode(expected_sig).rstrip(b'=').decode() | |
| if not hmac.compare_digest(signature_b64, expected_sig_b64): | |
| return None | |
| # Decode payload | |
| padding = 4 - len(payload_b64) % 4 | |
| payload_b64 += '=' * padding | |
| payload = json.loads(base64.urlsafe_b64decode(payload_b64)) | |
| # Check expiration | |
| if payload.get("exp", 0) < datetime.utcnow().timestamp(): | |
| return None | |
| return payload | |
| except Exception: | |
| return None | |