"""Simple JSON-file-backed API key store. Keys are stored as sha256 hashes; the plaintext is shown once at creation. Path comes from $API_KEYS_FILE; default is /data/api_keys.json on HF Spaces with persistent storage, otherwise ./api_keys.json next to main.py. """ from __future__ import annotations import hashlib import hmac import json import os import secrets from datetime import datetime, timezone from pathlib import Path from threading import Lock from typing import TypedDict _LOCK = Lock() def _default_path() -> Path: """Prefer HF Space persistent storage at /data so keys survive restarts.""" data_dir = Path("/data") if data_dir.is_dir() and os.access(data_dir, os.W_OK): return data_dir / "api_keys.json" return Path("api_keys.json") _PATH = Path(os.environ.get("API_KEYS_FILE") or _default_path()) class KeyRecord(TypedDict): id: str name: str hash: str prefix: str created_at: str last_used_at: str | None def _load() -> list[KeyRecord]: if not _PATH.exists(): return [] try: with _PATH.open("r", encoding="utf-8") as f: data = json.load(f) keys = data.get("keys", []) return keys if isinstance(keys, list) else [] except (json.JSONDecodeError, OSError): return [] def _save(keys: list[KeyRecord]) -> None: tmp = _PATH.with_suffix(".tmp") tmp.write_text(json.dumps({"keys": keys}, indent=2), encoding="utf-8") tmp.replace(_PATH) def _hash(plain: str) -> str: return hashlib.sha256(plain.encode("utf-8")).hexdigest() def list_keys() -> list[dict]: with _LOCK: keys = _load() return [ { "id": k["id"], "name": k["name"], "prefix": k["prefix"], "created_at": k["created_at"], "last_used_at": k.get("last_used_at"), } for k in keys ] def create_key(name: str) -> dict: """Returns the newly created record including the plaintext key (one-time).""" plaintext = "tx_" + secrets.token_hex(24) now = datetime.now(timezone.utc).isoformat(timespec="seconds") rec: KeyRecord = { "id": "kid_" + secrets.token_hex(8), "name": name.strip()[:120] or "unnamed", "hash": _hash(plaintext), "prefix": plaintext[:10], "created_at": now, "last_used_at": None, } with _LOCK: keys = _load() keys.append(rec) _save(keys) return { "id": rec["id"], "name": rec["name"], "prefix": rec["prefix"], "created_at": rec["created_at"], "key": plaintext, # shown ONCE } def revoke_key(key_id: str) -> bool: with _LOCK: keys = _load() new_keys = [k for k in keys if k["id"] != key_id] if len(new_keys) == len(keys): return False _save(new_keys) return True def verify_key(plain: str) -> bool: """Constant-time lookup; updates last_used_at on hit.""" if not plain: return False digest = _hash(plain) with _LOCK: keys = _load() hit_idx = -1 for i, k in enumerate(keys): if hmac.compare_digest(k["hash"], digest): hit_idx = i break if hit_idx == -1: return False keys[hit_idx]["last_used_at"] = datetime.now(timezone.utc).isoformat( timespec="seconds" ) _save(keys) return True