Spaces:
Sleeping
Sleeping
| """ | |
| security.py β session tokens (JWT) and at-rest encryption for user secrets. | |
| A single server secret governs both: | |
| β’ signing/verifying session tokens (so the server can authenticate every | |
| request instead of trusting a client-supplied user_id β closes the IDOR class) | |
| β’ deriving a Fernet key to encrypt BYOK API keys in SQLite at rest | |
| Secret resolution order: | |
| 1. SECRET_KEY env var (set this in HF Space Secrets for production) | |
| 2. a persisted file next to the DB (auto-generated once) β survives restarts | |
| as long as the DB volume does | |
| 3. an ephemeral random value (last resort; sessions reset on restart) | |
| """ | |
| import os | |
| import time | |
| import base64 | |
| import hashlib | |
| import secrets | |
| import logging | |
| from pathlib import Path | |
| import jwt # PyJWT | |
| from cryptography.fernet import Fernet, InvalidToken | |
| from config import DB_PATH | |
| logger = logging.getLogger(__name__) | |
| def _load_secret() -> str: | |
| env = os.environ.get("SECRET_KEY", "").strip() | |
| if env: | |
| return env | |
| # Persist a generated secret alongside the database so sessions and encrypted | |
| # keys remain valid across restarts (as long as the volume persists). | |
| try: | |
| secret_path = Path(DB_PATH).resolve().parent / ".app_secret" | |
| if secret_path.exists(): | |
| val = secret_path.read_text(encoding="utf-8").strip() | |
| if val: | |
| return val | |
| val = secrets.token_urlsafe(48) | |
| secret_path.write_text(val, encoding="utf-8") | |
| try: | |
| os.chmod(secret_path, 0o600) | |
| except Exception: | |
| pass | |
| logger.warning( | |
| "SECRET_KEY not set β generated and persisted one at %s. " | |
| "Set SECRET_KEY in your environment for a stable, portable secret.", | |
| secret_path, | |
| ) | |
| return val | |
| except Exception as exc: | |
| logger.error("Could not persist app secret (%s) β using an ephemeral key; " | |
| "sessions and encrypted keys will reset on restart.", exc) | |
| return secrets.token_urlsafe(48) | |
| SECRET_KEY = _load_secret() | |
| # Fernet needs a 32-byte url-safe base64 key; derive it deterministically from | |
| # SECRET_KEY so one secret drives both signing and encryption. | |
| _FERNET = Fernet(base64.urlsafe_b64encode(hashlib.sha256(SECRET_KEY.encode()).digest())) | |
| # ββ Session tokens ββββββββββββββββββββββββββββββββββββββββββββ | |
| _JWT_ALG = "HS256" | |
| SESSION_TTL = 30 * 24 * 3600 # 30 days | |
| def mint_session(google_id: str) -> str: | |
| """Issue a signed session token for an authenticated user.""" | |
| now = int(time.time()) | |
| return jwt.encode( | |
| {"sub": google_id, "iat": now, "exp": now + SESSION_TTL}, | |
| SECRET_KEY, | |
| algorithm=_JWT_ALG, | |
| ) | |
| def verify_session(token: str) -> str | None: | |
| """Return the google_id from a valid token, or None if missing/expired/forged.""" | |
| if not token: | |
| return None | |
| try: | |
| payload = jwt.decode(token, SECRET_KEY, algorithms=[_JWT_ALG]) | |
| return payload.get("sub") or None | |
| except Exception: | |
| return None | |
| # ββ At-rest encryption for BYOK secrets βββββββββββββββββββββββ | |
| _ENC_PREFIX = "enc:v1:" | |
| def encrypt_secret(plaintext: str) -> str: | |
| """Encrypt a user secret for storage. Empty stays empty.""" | |
| if not plaintext: | |
| return plaintext or "" | |
| return _ENC_PREFIX + _FERNET.encrypt(plaintext.encode()).decode() | |
| def decrypt_secret(stored: str) -> str: | |
| """Decrypt a stored secret. Tolerates legacy plaintext rows (no prefix).""" | |
| if not stored: | |
| return stored or "" | |
| if not stored.startswith(_ENC_PREFIX): | |
| return stored # legacy plaintext written before encryption was added | |
| try: | |
| return _FERNET.decrypt(stored[len(_ENC_PREFIX):].encode()).decode() | |
| except InvalidToken: | |
| logger.warning("Failed to decrypt a stored secret (key changed?).") | |
| return "" | |