Spaces:
Configuration error
Configuration error
| """Personal API keys for programmatic access to ``/api/*``. | |
| Storage backend: | |
| - If the Firebase Admin SDK is initialized (``FIREBASE_ADMIN_CREDENTIALS`` | |
| is configured) we persist to Firestore at ``users/{uid}/api_keys/{id}``. | |
| - Otherwise we fall back to a per-process in-memory dict — fine for tests | |
| and local dev, useless in production. The Admin SDK init log line tells | |
| you which mode you are in. | |
| Security: | |
| - Keys are 32 bytes of url-safe randomness, prefixed with ``evk_``. | |
| - We store ``sha256(key)`` plus a display prefix: ``evk_`` followed by the | |
| first 8 chars of the random portion. The raw value is shown to the user exactly once. | |
| - Revocation is a flag on the record, not a delete, so previously-used keys | |
| remain auditable. | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import logging | |
| import secrets | |
| import threading | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Optional | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Literal prefix put in front of every generated token.
KEY_PREFIX = "evk_"
# How many characters of the random part are kept (after KEY_PREFIX) for display.
PREFIX_DISPLAY_CHARS = 8
# Number of random bytes requested from ``secrets.token_urlsafe`` per token.
RAW_BYTES = 32  # 256 bits
| # --------------------------------------------------------------------------- | |
| # Record dataclass | |
| # --------------------------------------------------------------------------- | |
@dataclass
class ApiKeyRecord:
    """One personal API key as stored for a user.

    The raw token is never kept on the record; only its SHA-256 hash and a
    short display prefix are. The class must be a dataclass because the rest
    of the module constructs it with keyword arguments.
    """

    id: str
    uid: str
    name: str
    prefix: str  # KEY_PREFIX plus the first PREFIX_DISPLAY_CHARS chars of the random part
    hash: str  # sha256 of the full evk_... string
    created_at: float
    last_used_at: Optional[float] = None
    revoked: bool = False
    # Lightweight owner identity copy so the auth layer doesn't need a
    # second Firestore round-trip just to populate AuthedUser.
    owner_email: Optional[str] = None
    owner_name: Optional[str] = None

    def public_dict(self) -> dict[str, object]:
        """Return the subset of fields intended for display.

        The hash, ``uid`` and owner identity fields are left out. Timestamps
        are truncated to integers; ``last_used_at`` is ``None`` until it has
        been set.
        """
        # Compare against None rather than relying on truthiness so that a
        # legitimate 0 timestamp is not reported as "never used".
        last_used = None if self.last_used_at is None else int(self.last_used_at)
        return {
            "id": self.id,
            "name": self.name,
            "prefix": self.prefix,
            "created_at": int(self.created_at),
            "last_used_at": last_used,
            "revoked": self.revoked,
        }
| # --------------------------------------------------------------------------- | |
| # In-memory store (default / test fallback) | |
| # --------------------------------------------------------------------------- | |
# Held around every access to the two in-memory tables below.
_lock = threading.Lock()
# uid -> {key_id -> record}
_store: dict[str, dict[str, ApiKeyRecord]] = {}
# hash -> (uid, key_id); lets a token hash be resolved without scanning _store.
_hash_index: dict[str, tuple[str, str]] = {}
| def _hash_token(token: str) -> str: | |
| return hashlib.sha256(token.encode("utf-8")).hexdigest() | |
def _generate_token() -> tuple[str, str, str]:
    """Create a new token.

    Returns ``(raw_token, sha256_hex, prefix_for_display)``, where the display
    prefix is ``KEY_PREFIX`` followed by the first ``PREFIX_DISPLAY_CHARS``
    characters of the random part.
    """
    random_part = secrets.token_urlsafe(RAW_BYTES)
    full_token = KEY_PREFIX + random_part
    display_len = len(KEY_PREFIX) + PREFIX_DISPLAY_CHARS
    return full_token, _hash_token(full_token), full_token[:display_len]
| # --------------------------------------------------------------------------- | |
| # Firestore-backed store (when Admin SDK is initialized) | |
| # --------------------------------------------------------------------------- | |
def _firestore_collection(uid: str):  # type: ignore[no-untyped-def]
    """Return the ``users/{uid}/api_keys`` collection reference.

    Returns ``None`` when ``auth.firestore_db()`` yields no client.
    """
    from . import auth as auth_mod

    client = auth_mod.firestore_db()
    if client is not None:
        user_doc = client.collection("users").document(uid)
        return user_doc.collection("api_keys")
    return None
def _firestore_doc_to_record(uid: str, doc: object) -> ApiKeyRecord:  # type: ignore[no-untyped-def]
    """Build an ``ApiKeyRecord`` from a document snapshot.

    Missing or falsy fields get neutral defaults: empty strings, ``0.0`` for
    ``created_at``, ``None`` for ``last_used_at`` and ``False`` for
    ``revoked``. The owner fields are passed through as stored.
    """
    fields = doc.to_dict() or {}  # type: ignore[attr-defined]
    key_id = doc.id  # type: ignore[attr-defined]

    def _text(field_name: str) -> str:
        # Coerce to str, mapping missing/falsy values to "".
        return str(fields.get(field_name) or "")

    label = _text("name")
    display_prefix = _text("prefix")
    digest = _text("hash")
    created = float(fields.get("created_at") or 0.0)
    if fields.get("last_used_at"):
        last_used = float(fields["last_used_at"])
    else:
        last_used = None
    return ApiKeyRecord(
        id=key_id,
        uid=uid,
        name=label,
        prefix=display_prefix,
        hash=digest,
        created_at=created,
        last_used_at=last_used,
        revoked=bool(fields.get("revoked") or False),
        owner_email=fields.get("owner_email"),
        owner_name=fields.get("owner_name"),
    )
def _firestore_index_collection():  # type: ignore[no-untyped-def]
    """Return the top-level ``api_key_index`` collection, or ``None``.

    The index lets a record be found by hash without scanning every user's
    subcollection: document id = sha256(token); fields = uid, key_id.
    ``None`` is returned when ``auth.firestore_db()`` yields no client.
    """
    from . import auth as auth_mod

    client = auth_mod.firestore_db()
    return None if client is None else client.collection("api_key_index")
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
def list_keys(uid: str) -> list[ApiKeyRecord]:
    """Return every key belonging to ``uid``, most recently created first.

    Reads from Firestore when a collection is available. If that read raises,
    the error is logged and the in-memory store is used instead.
    """

    def _newest_first(items: list[ApiKeyRecord]) -> list[ApiKeyRecord]:
        return sorted(items, key=lambda rec: rec.created_at, reverse=True)

    user_keys = _firestore_collection(uid)
    if user_keys is not None:
        try:
            snapshots = list(user_keys.stream())
            return _newest_first(
                [_firestore_doc_to_record(uid, snap) for snap in snapshots]
            )
        except Exception:  # noqa: BLE001
            logger.exception("Firestore list_keys failed; falling back to in-memory")

    # Copy under the lock; sort the copy outside it.
    with _lock:
        in_memory = list(_store.get(uid, {}).values())
    return _newest_first(in_memory)
def create_key(
    uid: str,
    name: str,
    *,
    owner_email: Optional[str] = None,
    owner_name: Optional[str] = None,
) -> tuple[str, ApiKeyRecord]:
    """Mint a new key for ``uid`` and persist it.

    Returns ``(raw_token, record)``. This return value is the caller's only
    opportunity to see the raw token: what gets stored is the SHA-256 hash
    and a short display prefix. ``name`` is stripped, capped at 80 characters
    and replaced by ``"Untitled key"`` when empty.

    When a Firestore collection is available the record and its hash-index
    entry are written there; if either write raises, the error is logged and
    the record is stored in the in-memory tables instead.
    """
    raw_token, digest, display_prefix = _generate_token()
    label = (name or "").strip()[:80] or "Untitled key"
    new_key = ApiKeyRecord(
        id=secrets.token_urlsafe(12),
        uid=uid,
        name=label,
        prefix=display_prefix,
        hash=digest,
        created_at=time.time(),
        owner_email=owner_email,
        owner_name=owner_name,
    )

    user_keys = _firestore_collection(uid)
    if user_keys is not None:
        try:
            payload = {
                "name": new_key.name,
                "prefix": new_key.prefix,
                "hash": new_key.hash,
                "created_at": new_key.created_at,
                "revoked": False,
                "owner_email": new_key.owner_email,
                "owner_name": new_key.owner_name,
            }
            user_keys.document(new_key.id).set(payload)

            index = _firestore_index_collection()
            if index is not None:
                index_entry = {"uid": uid, "key_id": new_key.id, "revoked": False}
                index.document(new_key.hash).set(index_entry)
            return raw_token, new_key
        except Exception:  # noqa: BLE001
            logger.exception("Firestore create_key failed; falling back to in-memory")

    with _lock:
        user_bucket = _store.setdefault(uid, {})
        user_bucket[new_key.id] = new_key
        _hash_index[new_key.hash] = (uid, new_key.id)
    return raw_token, new_key
def revoke_key(uid: str, key_id: str) -> bool:
    """Flag a key as revoked.

    Returns ``True`` when the key was found and flagged, ``False`` when it
    does not exist. In the Firestore path both the key document and, when the
    stored hash is present, its index entry are updated. If any Firestore
    call raises, the error is logged and the in-memory store is consulted
    instead.
    """
    user_keys = _firestore_collection(uid)
    if user_keys is not None:
        try:
            key_ref = user_keys.document(key_id)
            snapshot = key_ref.get()
            if not snapshot.exists:
                return False
            stored = snapshot.to_dict() or {}
            key_ref.update({"revoked": True})

            index = _firestore_index_collection()
            stored_hash = stored.get("hash")
            if index is not None and stored_hash:
                index.document(str(stored_hash)).update({"revoked": True})
            return True
        except Exception:  # noqa: BLE001
            logger.exception("Firestore revoke_key failed; falling back to in-memory")

    with _lock:
        user_bucket = _store.get(uid)
        if user_bucket and key_id in user_bucket:
            user_bucket[key_id].revoked = True
            return True
        return False
def find_active_by_token(token: str) -> Optional[ApiKeyRecord]:
    """Look up the record for a raw ``evk_…`` token.

    Returns ``None`` when the token lacks the expected prefix, is unknown, or
    is revoked. The Firestore hash index is tried first; when it has no entry
    for the hash, or a Firestore call raises (logged), the in-memory index is
    consulted.
    """
    if not token.startswith(KEY_PREFIX):
        return None

    digest = _hash_token(token)
    index = _firestore_index_collection()
    if index is not None:
        try:
            entry = index.document(digest).get()
            if entry.exists:
                pointer = entry.to_dict() or {}
                if pointer.get("revoked"):
                    return None
                owner_uid = str(pointer.get("uid") or "")
                owner_key_id = str(pointer.get("key_id") or "")
                if not (owner_uid and owner_key_id):
                    return None
                user_keys = _firestore_collection(owner_uid)
                if user_keys is None:
                    return None
                key_snapshot = user_keys.document(owner_key_id).get()
                if not key_snapshot.exists:
                    return None
                # The key document carries its own revoked flag; check it too.
                found = _firestore_doc_to_record(owner_uid, key_snapshot)
                return None if found.revoked else found
        except Exception:  # noqa: BLE001
            logger.exception("Firestore find_active_by_token failed; falling back to in-memory")

    with _lock:
        located = _hash_index.get(digest)
        if located:
            mem_uid, mem_key_id = located
            cached = _store.get(mem_uid, {}).get(mem_key_id)
            if cached is not None and not cached.revoked:
                return cached
        return None
def touch_last_used(uid: str, key_id: str) -> None:
    """Record the current time as the key's `last_used_at`, best-effort.

    A Firestore failure is logged rather than raised, after which the
    in-memory record (if one exists) is updated instead.
    """
    stamp = time.time()
    user_keys = _firestore_collection(uid)
    if user_keys is not None:
        try:
            user_keys.document(key_id).update({"last_used_at": stamp})
        except Exception:  # noqa: BLE001
            logger.exception("Firestore touch_last_used failed; falling back to in-memory")
        else:
            return

    with _lock:
        cached = _store.get(uid, {}).get(key_id)
        if cached is None:
            return
        cached.last_used_at = stamp
def reset_for_tests() -> None:
    """Empty both in-memory tables (records and hash index). Tests call this between runs."""
    with _lock:
        for table in (_store, _hash_index):
            table.clear()