"""Security utilities - password hashing and JWT tokens""" import os from datetime import datetime, timedelta from typing import Optional import hashlib import hmac import base64 import json import secrets # JWT settings SECRET_KEY = os.getenv("JWT_SECRET_KEY", secrets.token_hex(32)) ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_HOURS = 24 * 7 # 1 week def hash_password(password: str) -> str: """Hash password using PBKDF2-SHA256""" salt = secrets.token_hex(16) pwd_hash = hashlib.pbkdf2_hmac( 'sha256', password.encode('utf-8'), salt.encode('utf-8'), 100000 ) return f"{salt}:{pwd_hash.hex()}" def verify_password(password: str, hashed: str) -> bool: """Verify password against hash""" try: salt, stored_hash = hashed.split(':') pwd_hash = hashlib.pbkdf2_hmac( 'sha256', password.encode('utf-8'), salt.encode('utf-8'), 100000 ) return hmac.compare_digest(pwd_hash.hex(), stored_hash) except ValueError: return False def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token""" to_encode = data.copy() expire = datetime.utcnow() + (expires_delta or timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)) to_encode.update({"exp": expire.timestamp()}) # Create JWT manually (no external dependency) header = {"alg": ALGORITHM, "typ": "JWT"} header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).rstrip(b'=').decode() payload_b64 = base64.urlsafe_b64encode(json.dumps(to_encode).encode()).rstrip(b'=').decode() signature_input = f"{header_b64}.{payload_b64}" signature = hmac.new( SECRET_KEY.encode(), signature_input.encode(), hashlib.sha256 ).digest() signature_b64 = base64.urlsafe_b64encode(signature).rstrip(b'=').decode() return f"{header_b64}.{payload_b64}.{signature_b64}" def decode_access_token(token: str) -> Optional[dict]: """Decode and verify JWT token""" try: parts = token.split('.') if len(parts) != 3: return None header_b64, payload_b64, signature_b64 = parts # Verify signature signature_input = f"{header_b64}.{payload_b64}" expected_sig = hmac.new( SECRET_KEY.encode(), signature_input.encode(), hashlib.sha256 ).digest() expected_sig_b64 = base64.urlsafe_b64encode(expected_sig).rstrip(b'=').decode() if not hmac.compare_digest(signature_b64, expected_sig_b64): return None # Decode payload padding = 4 - len(payload_b64) % 4 payload_b64 += '=' * padding payload = json.loads(base64.urlsafe_b64decode(payload_b64)) # Check expiration if payload.get("exp", 0) < datetime.utcnow().timestamp(): return None return payload except Exception: return None