"""
security.py — session tokens (JWT) and at-rest encryption for user secrets.

A single server secret governs both:
  • signing/verifying session tokens (so the server can authenticate every
    request instead of trusting a client-supplied user_id — closes the IDOR
    class)
  • deriving a Fernet key to encrypt BYOK API keys in SQLite at rest

Secret resolution order:
  1. SECRET_KEY env var (set this in HF Space Secrets for production)
  2. a persisted file next to the DB (auto-generated once) — survives restarts
     as long as the DB volume does
  3. an ephemeral random value (last resort; sessions reset on restart)
"""

import os
import time
import base64
import hashlib
import secrets
import logging
from pathlib import Path

import jwt  # PyJWT
from cryptography.fernet import Fernet, InvalidToken

from config import DB_PATH

logger = logging.getLogger(__name__)


def _write_private_file(path: Path, text: str) -> None:
    """Write *text* to *path*, creating the file owner-only (0o600).

    The permission bits are passed to ``os.open`` so that a newly created
    file never exists with umask-default (often world-readable) permissions,
    which is what happens with ``write_text()`` followed by ``chmod()``.

    Raises:
        OSError: if the file cannot be opened or written.
    """
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(text)
    # The mode given to os.open only applies when the file is created; a
    # pre-existing (blank) file keeps its old bits, so tighten it best-effort.
    try:
        os.chmod(path, 0o600)
    except OSError as exc:
        logger.debug("Could not restrict permissions on %s (%s).", path, exc)


def _load_secret() -> str:
    """Resolve the server secret used for token signing and key derivation.

    Resolution order:
      1. the ``SECRET_KEY`` environment variable, if non-blank;
      2. the contents of ``.app_secret`` in the directory holding ``DB_PATH``
         (generated and written there on first use);
      3. a fresh random value that is not persisted, used when the file
         cannot be read or written.

    Returns:
        The secret as a string. This function does not raise: any failure in
        step 2 is logged and answered with the ephemeral value from step 3.
    """
    env = os.environ.get("SECRET_KEY", "").strip()
    if env:
        return env

    # Persist a generated secret alongside the database so sessions and
    # encrypted keys remain valid across restarts (as long as the volume
    # persists).
    try:
        secret_path = Path(DB_PATH).resolve().parent / ".app_secret"
        if secret_path.exists():
            val = secret_path.read_text(encoding="utf-8").strip()
            if val:
                return val
            # A blank file falls through and is overwritten below.

        val = secrets.token_urlsafe(48)
        _write_private_file(secret_path, val)
        logger.warning(
            "SECRET_KEY not set — generated and persisted one at %s. "
            "Set SECRET_KEY in your environment for a stable, portable secret.",
            secret_path,
        )
        return val
    except Exception as exc:
        # Deliberate catch-all: module import must not fail just because the
        # secret file is unavailable; degrade to a per-process secret instead.
        logger.error("Could not persist app secret (%s) — using an ephemeral key; "
                     "sessions and encrypted keys will reset on restart.", exc)
        return secrets.token_urlsafe(48)


SECRET_KEY = _load_secret()

# Fernet needs a 32-byte url-safe base64 key; derive it deterministically from
# SECRET_KEY so one secret drives both signing and encryption.
_FERNET = Fernet(base64.urlsafe_b64encode(hashlib.sha256(SECRET_KEY.encode()).digest()))


# ── Session tokens ────────────────────────────────────────────

_JWT_ALG = "HS256"
SESSION_TTL = 30 * 24 * 3600  # 30 days


def mint_session(google_id: str) -> str:
    """Issue a signed session token for an authenticated user.

    Args:
        google_id: identifier stored in the token's ``sub`` claim.

    Returns:
        A JWT signed with ``SECRET_KEY`` using ``_JWT_ALG``, carrying ``sub``,
        ``iat`` (now) and ``exp`` (now + ``SESSION_TTL`` seconds).
    """
    now = int(time.time())
    claims = {"sub": google_id, "iat": now, "exp": now + SESSION_TTL}
    return jwt.encode(claims, SECRET_KEY, algorithm=_JWT_ALG)


def verify_session(token: str) -> str | None:
    """Return the google_id from a valid token, or None if missing/expired/forged.

    Args:
        token: the session token presented by the client; may be empty.

    Returns:
        The token's ``sub`` claim, or ``None`` when the token is empty, fails
        verification, lacks ``exp``/``sub``, or has an empty ``sub``.
    """
    if not token:
        return None
    try:
        payload = jwt.decode(
            token,
            SECRET_KEY,
            algorithms=[_JWT_ALG],
            # Expiry is only checked when the claim is present, so insist on
            # it: a signed token without "exp" would otherwise never expire.
            # Tokens from mint_session always carry both claims.
            options={"require": ["exp", "sub"]},
        )
        return payload.get("sub") or None
    except Exception:
        # Any decoding/verification failure means "not authenticated".
        return None


# ── At-rest encryption for BYOK secrets ───────────────────────

_ENC_PREFIX = "enc:v1:"


def encrypt_secret(plaintext: str) -> str:
    """Encrypt a user secret for storage. Empty stays empty.

    Args:
        plaintext: the secret to protect; ``None``/``""`` yield ``""``.

    Returns:
        ``_ENC_PREFIX`` followed by the Fernet token, or ``""`` for empty input.
    """
    if not plaintext:
        return plaintext or ""
    token = _FERNET.encrypt(plaintext.encode()).decode()
    return _ENC_PREFIX + token


def decrypt_secret(stored: str) -> str:
    """Decrypt a stored secret. Tolerates legacy plaintext rows (no prefix).

    Args:
        stored: the value read from storage; ``None``/``""`` yield ``""``.

    Returns:
        The decrypted secret; the input unchanged when it lacks
        ``_ENC_PREFIX``; or ``""`` when the token cannot be decrypted with
        the current key (a warning is logged in that case).
    """
    if not stored:
        return stored or ""
    if not stored.startswith(_ENC_PREFIX):
        return stored  # legacy plaintext written before encryption was added
    try:
        return _FERNET.decrypt(stored.removeprefix(_ENC_PREFIX).encode()).decode()
    except InvalidToken:
        logger.warning("Failed to decrypt a stored secret (key changed?).")
        return ""