| """Rate-limit, daily-cap, and quota-gating helpers for the IBM hardware path. |
| |
| The Space is exposed publicly. Without protection, a script could burn |
| through the project's IBM Quantum free-tier budget (~10 minutes / month) |
| in seconds. Three layers of defence: |
| |
| 1. **Per-IP rate limit** — at most one IBM submission per ``window_seconds`` |
| per visitor IP. The IP is read from ``x-forwarded-for`` (set by HF's |
| reverse proxy); the client_host fallback covers local dev. |
| 2. **Global daily cap** — at most ``daily_cap`` IBM submissions per UTC |
| day across all visitors. Persisted to a JSON file when a writable |
| directory is configured (HF Persistent Storage at ``/data`` when |
| enabled), in-memory otherwise. |
| 3. **Quota guard** — when the IBM monthly remaining seconds drops below |
| ``quota_floor_seconds``, every submission is blocked regardless of |
| the per-IP / daily counters. The remaining-seconds value is supplied |
| by the caller (the Space module probes ``service.usage()`` once an |
| hour and caches the result). |
| |
| Verdicts are returned as a dataclass; the Space module turns the verdict |
| into a Gradio output payload. Keeping the logic separate from Gradio |
| makes the unit tests trivial. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import threading |
| from dataclasses import dataclass |
| from datetime import UTC, date, datetime |
| from pathlib import Path |
| from typing import Literal |
|
|
# Machine-readable label attached to a refused submission: one value per
# defence layer (per-IP rate limit, global daily cap, monthly quota guard).
SafetyReason = Literal["rate_limited", "daily_cap", "quota_exceeded"]
|
|
|
|
@dataclass(frozen=True)
class SafetyVerdict:
    """Immutable result of a :meth:`RateLimiter.check_and_register` call.

    Attributes:
        allowed: ``True`` when the submission may go ahead.
        reason: Which safety layer refused the submission; ``None`` when
            the submission was allowed.
        detail: Human-readable explanation of the decision (``"ok"`` when
            the submission was allowed).
        daily_remaining: Submissions still available under the daily cap
            once this decision has been taken into account.
        daily_cap: The configured daily cap, echoed back for display.
    """

    allowed: bool
    reason: SafetyReason | None
    detail: str
    daily_remaining: int
    daily_cap: int
|
|
|
|
| class RateLimiter: |
| """Per-IP + daily + quota gate for the IBM hardware path. |
| |
| Thread-safe via a single lock; the contention is negligible because |
| each submission is sub-millisecond. |
| """ |
|
|
| def __init__( |
| self, |
| *, |
| window_seconds: int = 300, |
| daily_cap: int = 5, |
| quota_floor_seconds: int = 60, |
| persist_path: Path | None = None, |
| ) -> None: |
| if window_seconds < 0: |
| raise ValueError("window_seconds must be >= 0") |
| if daily_cap < 0: |
| raise ValueError("daily_cap must be >= 0") |
| self._window = window_seconds |
| self._cap = daily_cap |
| self._quota_floor = quota_floor_seconds |
| self._persist_path = persist_path |
| self._lock = threading.Lock() |
|
|
| |
| self._last_ip: dict[str, float] = {} |
|
|
| |
| self._day: date | None = None |
| self._count: int = 0 |
| self._load_persisted() |
|
|
| |
|
|
| def check_and_register( |
| self, |
| *, |
| ip: str, |
| now: datetime, |
| quota_remaining_seconds: float | None = None, |
| ) -> SafetyVerdict: |
| """Decide whether ``ip`` may submit an IBM job at ``now``. |
| |
| The function is **commit-on-allow**: a successful return updates |
| the per-IP timestamp and bumps the daily counter. Callers MUST |
| proceed with the IBM submission when ``allowed=True``. |
| """ |
| with self._lock: |
| self._roll_day_if_needed(now) |
|
|
| if quota_remaining_seconds is not None and quota_remaining_seconds < self._quota_floor: |
| return SafetyVerdict( |
| allowed=False, |
| reason="quota_exceeded", |
| detail=( |
| f"IBM monthly quota is exhausted: " |
| f"{quota_remaining_seconds:.0f}s remaining " |
| f"(floor={self._quota_floor}s). Resets on the 1st of next month UTC." |
| ), |
| daily_remaining=max(0, self._cap - self._count), |
| daily_cap=self._cap, |
| ) |
|
|
| if self._count >= self._cap: |
| return SafetyVerdict( |
| allowed=False, |
| reason="daily_cap", |
| detail=( |
| f"Daily limit of {self._cap} IBM runs reached. Resets at midnight UTC." |
| ), |
| daily_remaining=0, |
| daily_cap=self._cap, |
| ) |
|
|
| last = self._last_ip.get(ip) |
| if last is not None and (now.timestamp() - last) < self._window: |
| wait = self._window - int(now.timestamp() - last) |
| return SafetyVerdict( |
| allowed=False, |
| reason="rate_limited", |
| detail=( |
| f"Per-IP rate limit: wait {wait}s before submitting " |
| "another IBM job (5 minutes between runs per visitor)." |
| ), |
| daily_remaining=max(0, self._cap - self._count), |
| daily_cap=self._cap, |
| ) |
|
|
| |
| self._last_ip[ip] = now.timestamp() |
| self._count += 1 |
| self._persist() |
| return SafetyVerdict( |
| allowed=True, |
| reason=None, |
| detail="ok", |
| daily_remaining=max(0, self._cap - self._count), |
| daily_cap=self._cap, |
| ) |
|
|
| def daily_remaining(self, now: datetime) -> int: |
| """Read-only counter inspector for the UI badge.""" |
| with self._lock: |
| self._roll_day_if_needed(now) |
| return max(0, self._cap - self._count) |
|
|
| def daily_cap(self) -> int: |
| return self._cap |
|
|
| |
|
|
| def _roll_day_if_needed(self, now: datetime) -> None: |
| today = now.astimezone(UTC).date() |
| if self._day != today: |
| self._day = today |
| self._count = 0 |
| |
| |
|
|
| def _load_persisted(self) -> None: |
| if self._persist_path is None: |
| return |
| try: |
| raw = json.loads(self._persist_path.read_text(encoding="utf-8")) |
| except (FileNotFoundError, json.JSONDecodeError): |
| return |
| if not isinstance(raw, dict): |
| return |
| try: |
| self._day = date.fromisoformat(str(raw.get("date"))) |
| self._count = int(raw.get("count", 0)) |
| except (TypeError, ValueError): |
| self._day = None |
| self._count = 0 |
|
|
| def _persist(self) -> None: |
| if self._persist_path is None or self._day is None: |
| return |
| try: |
| self._persist_path.parent.mkdir(parents=True, exist_ok=True) |
| self._persist_path.write_text( |
| json.dumps({"date": self._day.isoformat(), "count": self._count}), |
| encoding="utf-8", |
| ) |
| except OSError: |
| |
| self._persist_path = None |
|
|
|
|
def default_persist_path(data_dir: str | Path = "/data") -> Path | None:
    """Return the quota-file path inside ``data_dir`` if it is writable.

    With the default argument this is ``/data/qverify_quota.json`` when HF
    Persistent Storage is mounted; ``None`` otherwise (in-memory only).

    Args:
        data_dir: Directory to probe. Defaults to ``/data``.

    Returns:
        ``data_dir / "qverify_quota.json"`` when ``data_dir`` is an existing
        directory in which a probe file can be created and removed,
        otherwise ``None``.
    """
    candidate = Path(data_dir)
    if not candidate.is_dir():
        return None
    probe = candidate / ".qv_write_test"
    try:
        probe.write_text("x", encoding="utf-8")
        # missing_ok: a concurrent probe may already have removed the shared
        # file; that must not be mistaken for a read-only mount.
        probe.unlink(missing_ok=True)
    except OSError:
        return None
    return candidate / "qverify_quota.json"
|
|