| """In-process hourly KPI rollup, owned by the backend Space lifespan. |
| |
| Replaces an external GitHub Actions cron so the rollup lives next to the data |
| and reuses the Space's existing HF token — no production secrets on the |
| public source repo. See ``scripts/build_kpis.py`` for the data-flow diagram |
| and metric definitions. |
| |
| Behaviour:: |
| |
| lifespan startup → start APScheduler with cron("5 * * * *", UTC) |
| → fire a best-effort 6-hour backfill (fire-and-forget) |
| each :05 → run ``build_kpis.run_for_hour`` for the just-completed hour |
| lifespan shutdown → scheduler.shutdown(wait=False) |
| |
| Environment:: |
| |
| HF_KPI_WRITE_TOKEN | HF_SESSION_UPLOAD_TOKEN | HF_TOKEN | HF_ADMIN_TOKEN |
| First one found is used. Least-privilege first. |
| KPI_SOURCE_REPO default smolagents/ml-intern-sessions |
| KPI_TARGET_REPO default smolagents/ml-intern-kpis |
| ML_INTERN_KPIS_DISABLED if truthy, the scheduler is not started |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import importlib.util |
| import logging |
| import os |
| from datetime import datetime, timedelta, timezone |
| from pathlib import Path |
| from typing import Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
| _PROJECT_ROOT = Path(__file__).resolve().parent.parent |
|
|
| |
| _background_tasks: set[asyncio.Task] = set() |
|
|
| _scheduler = None |
|
|
|
|
| def _resolve_token() -> Optional[str]: |
| """Pick the first available HF token. Least-privilege first.""" |
| for var in ( |
| "HF_KPI_WRITE_TOKEN", |
| "HF_SESSION_UPLOAD_TOKEN", |
| "HF_TOKEN", |
| "HF_ADMIN_TOKEN", |
| ): |
| val = os.environ.get(var) |
| if val: |
| return val |
| return None |
|
|
|
|
| def _load_build_kpis(): |
| """Import ``scripts/build_kpis.py`` without putting ``scripts/`` on sys.path.""" |
| spec = importlib.util.spec_from_file_location( |
| "build_kpis", |
| _PROJECT_ROOT / "scripts" / "build_kpis.py", |
| ) |
| mod = importlib.util.module_from_spec(spec) |
| assert spec.loader is not None |
| spec.loader.exec_module(mod) |
| return mod |
|
|
|
|
| async def _run_hour(hour_dt: datetime) -> None: |
| """Run one hourly rollup off the event loop. Best-effort, never raises.""" |
| token = _resolve_token() |
| if not token: |
| logger.warning("kpis_scheduler: no HF token available, skipping %s", hour_dt) |
| return |
| try: |
| mod = _load_build_kpis() |
| from huggingface_hub import HfApi |
|
|
| api = HfApi() |
| source = os.environ.get("KPI_SOURCE_REPO", "smolagents/ml-intern-sessions") |
| target = os.environ.get("KPI_TARGET_REPO", "smolagents/ml-intern-kpis") |
| await asyncio.to_thread(mod.run_for_hour, api, source, target, hour_dt, token) |
| except Exception as e: |
| logger.warning("kpis_scheduler: rollup for %s failed: %s", hour_dt, e) |
|
|
|
|
| async def run_last_completed_hour() -> None: |
| """The scheduled-at-:05 job. Rolls up the previous whole hour.""" |
| now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) |
| await _run_hour(now - timedelta(hours=1)) |
|
|
|
|
| async def backfill(hours: int = 6) -> None: |
| """Catch-up pass for hours the Space was down. Idempotent (overwrites).""" |
| now = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0) |
| for i in range(1, hours + 1): |
| await _run_hour(now - timedelta(hours=i)) |
|
|
|
|
| def start(backfill_hours: int = 6) -> None: |
| """Called from FastAPI lifespan startup.""" |
| global _scheduler |
| if os.environ.get("ML_INTERN_KPIS_DISABLED"): |
| logger.info("kpis_scheduler: disabled via ML_INTERN_KPIS_DISABLED") |
| return |
| if _scheduler is not None: |
| return |
|
|
| try: |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler |
| from apscheduler.triggers.cron import CronTrigger |
| except ImportError: |
| logger.warning("kpis_scheduler: apscheduler not installed, skipping") |
| return |
|
|
| _scheduler = AsyncIOScheduler(timezone="UTC") |
| _scheduler.add_job( |
| run_last_completed_hour, |
| CronTrigger(minute=5), |
| id="kpis_hourly", |
| misfire_grace_time=600, |
| coalesce=True, |
| max_instances=1, |
| replace_existing=True, |
| ) |
| _scheduler.start() |
| logger.info("kpis_scheduler: started (cron '5 * * * *' UTC)") |
|
|
| |
| |
| try: |
| task = asyncio.get_running_loop().create_task(backfill(backfill_hours)) |
| _background_tasks.add(task) |
| task.add_done_callback(_background_tasks.discard) |
| except RuntimeError: |
| |
| pass |
|
|
|
|
| async def shutdown() -> None: |
| """Called from FastAPI lifespan shutdown.""" |
| global _scheduler |
| if _scheduler is None: |
| return |
| _scheduler.shutdown(wait=False) |
| _scheduler = None |
| logger.info("kpis_scheduler: stopped") |
|
|