Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import base64 | |
| import hashlib | |
| import hmac | |
| import json | |
| import time | |
| from typing import Any | |
| from core.config import get_settings | |
| def create_access_token(user: dict[str, Any], expires_in_seconds: int = 60 * 60 * 24 * 7) -> str: | |
| now = int(time.time()) | |
| payload = { | |
| "sub": user["username"], | |
| "role": user["role"], | |
| "iat": now, | |
| "exp": now + expires_in_seconds, | |
| } | |
| payload_bytes = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8") | |
| payload_b64 = _b64encode(payload_bytes) | |
| signature = _sign(payload_b64.encode("ascii")) | |
| return f"{payload_b64}.{signature}" | |
| def verify_access_token(token: str) -> dict[str, Any] | None: | |
| try: | |
| payload_b64, signature = token.split(".", 1) | |
| except ValueError: | |
| return None | |
| expected = _sign(payload_b64.encode("ascii")) | |
| if not hmac.compare_digest(signature, expected): | |
| return None | |
| try: | |
| payload = json.loads(_b64decode(payload_b64)) | |
| except (ValueError, json.JSONDecodeError): | |
| return None | |
| if int(payload.get("exp", 0)) < int(time.time()): | |
| return None | |
| return payload | |
| def _sign(data: bytes) -> str: | |
| secret = get_settings().auth_secret_key.encode("utf-8") | |
| digest = hmac.new(secret, data, hashlib.sha256).digest() | |
| return _b64encode(digest) | |
| def _b64encode(data: bytes) -> str: | |
| return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=") | |
| def _b64decode(data: str) -> bytes: | |
| padding = "=" * (-len(data) % 4) | |
| return base64.urlsafe_b64decode(data + padding) | |