Stem_Copilot / security.py
Krishna111111's picture
Audits, security checks
8552e0f
Raw
History Blame Contribute Delete
4 kB
"""
security.py — session tokens (JWT) and at-rest encryption for user secrets.

A single server secret governs both:
  • signing/verifying session tokens (so the server can authenticate every
    request instead of trusting a client-supplied user_id — closes the IDOR class)
  • deriving a Fernet key to encrypt BYOK API keys in SQLite at rest

Secret resolution order:
  1. SECRET_KEY env var (set this in HF Space Secrets for production)
  2. a persisted file next to the DB (auto-generated once) — survives restarts
     as long as the DB volume does
  3. an ephemeral random value (last resort; sessions and stored encrypted
     keys become invalid on restart)
"""
import os
import time
import base64
import hashlib
import secrets
import logging
from pathlib import Path
import jwt # PyJWT
from cryptography.fernet import Fernet, InvalidToken
from config import DB_PATH
logger = logging.getLogger(__name__)
def _load_secret() -> str:
    """Resolve the server secret used for token signing and key derivation.

    Resolution order:
      1. the ``SECRET_KEY`` environment variable (whitespace-stripped), when
         it is non-empty;
      2. the contents of ``.app_secret`` in the directory that contains
         ``DB_PATH``, when that file exists and is non-empty;
      3. a freshly generated random secret, which is written to
         ``.app_secret`` so later starts can reuse it;
      4. if any file-system step fails, a random secret that is *not*
         persisted (a new one is generated on every start).

    Returns:
        The secret as a string.
    """
    env = os.environ.get("SECRET_KEY", "").strip()
    if env:
        return env
    # Persist a generated secret alongside the database so sessions and
    # encrypted keys remain valid across restarts (as long as the volume
    # persists).
    try:
        secret_path = Path(DB_PATH).resolve().parent / ".app_secret"
        if secret_path.exists():
            val = secret_path.read_text(encoding="utf-8").strip()
            if val:
                return val
        val = secrets.token_urlsafe(48)
        # Create the file owner-only from the outset. Writing first and
        # chmod-ing afterwards leaves a window in which the secret sits on
        # disk with default (possibly group/world-readable) permissions.
        fd = os.open(secret_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(val)
        try:
            # The mode passed to os.open only applies when the file is newly
            # created; tighten a pre-existing (empty) file as well.
            os.chmod(secret_path, 0o600)
        except Exception as exc:
            # Best effort: the secret is already written, so keep using it,
            # but make the failure visible instead of swallowing it.
            logger.warning(
                "Could not restrict permissions on %s (%s).", secret_path, exc
            )
        logger.warning(
            "SECRET_KEY not set — generated and persisted one at %s. "
            "Set SECRET_KEY in your environment for a stable, portable secret.",
            secret_path,
        )
        return val
    except Exception as exc:
        logger.error("Could not persist app secret (%s) — using an ephemeral key; "
                     "sessions and encrypted keys will reset on restart.", exc)
        return secrets.token_urlsafe(48)
# Resolved once at import time; token signing and the encryption key below
# both derive from this value.
SECRET_KEY = _load_secret()

# Fernet expects a url-safe base64 encoding of 32 raw bytes. A SHA-256 digest
# is exactly 32 bytes, so hashing SECRET_KEY yields a deterministic key and a
# single secret drives both signing and encryption.
_secret_digest = hashlib.sha256(SECRET_KEY.encode()).digest()
_FERNET = Fernet(base64.urlsafe_b64encode(_secret_digest))
# ── Session tokens ────────────────────────────────────────────
_JWT_ALG = "HS256"
SESSION_TTL = 30 * 24 * 3600 # 30 days
def mint_session(google_id: str) -> str:
    """Create a signed session token whose subject is *google_id*.

    The token carries three claims: ``sub`` (the given id), ``iat`` (the
    current Unix time in whole seconds) and ``exp`` (``iat`` plus
    ``SESSION_TTL``). It is signed with ``SECRET_KEY`` using ``_JWT_ALG``.
    """
    issued_at = int(time.time())
    claims = {
        "sub": google_id,
        "iat": issued_at,
        "exp": issued_at + SESSION_TTL,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=_JWT_ALG)
def verify_session(token: str) -> str | None:
"""Return the google_id from a valid token, or None if missing/expired/forged."""
if not token:
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[_JWT_ALG])
return payload.get("sub") or None
except Exception:
return None
# ── At-rest encryption for BYOK secrets ───────────────────────
_ENC_PREFIX = "enc:v1:"
def encrypt_secret(plaintext: str) -> str:
    """Return the storable, encrypted form of *plaintext*.

    A falsy input (empty string or ``None``) yields ``""``. Anything else is
    encrypted with the module's Fernet instance and returned with
    ``_ENC_PREFIX`` prepended.
    """
    if plaintext:
        ciphertext = _FERNET.encrypt(plaintext.encode())
        return _ENC_PREFIX + ciphertext.decode()
    return ""
def decrypt_secret(stored: str) -> str:
    """Recover the plaintext for a value read from storage.

    * A falsy value yields ``""``.
    * A value without ``_ENC_PREFIX`` is returned unchanged (a legacy row
      written before encryption was added).
    * A prefixed value is decrypted with the module's Fernet instance; if
      Fernet rejects it with ``InvalidToken``, a warning is logged and ``""``
      is returned.
    """
    if not stored:
        return ""
    if not stored.startswith(_ENC_PREFIX):
        # Legacy plaintext written before encryption was added.
        return stored
    ciphertext = stored[len(_ENC_PREFIX):].encode()
    try:
        plaintext = _FERNET.decrypt(ciphertext)
        return plaintext.decode()
    except InvalidToken:
        logger.warning("Failed to decrypt a stored secret (key changed?).")
        return ""