Spaces:
Running
Running
"""Frontend-side POSIX readers for `/data`.

The frontend is read-mostly: it lists queue directories and reads small JSON
records. The only write it performs is the atomic ``enqueue_pending`` in
``src.submission.frontend``.
"""
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| from typing import Dict, List, Optional | |
| from src.envs import ( | |
| COMPLETE_DIR, | |
| PENDING_DIR, | |
| PROCESSING_DIR, | |
| PROGRESS_DIR, | |
| RESULTS_DIR, | |
| ) | |
| logger = logging.getLogger(__name__) | |
def _read_json(path: str) -> Optional[dict]:
    """Load one JSON object from ``path``, tolerating missing or bad files.

    Args:
        path: Filesystem path of the JSON file to read (decoded as UTF-8).

    Returns:
        The parsed JSON object, or ``None`` when the file does not exist,
        cannot be read, is not valid UTF-8 / JSON, or its top-level value is
        not a JSON object. A missing file is silent; every other failure is
        logged as a warning.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            payload = json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError;
        # the latter is raised for files that are not valid UTF-8 and is
        # neither an OSError nor a JSONDecodeError.
        logger.warning("malformed json %s: %s", path, e)
        return None
    if not isinstance(payload, dict):
        # Callers treat the result as a dict (e.g. ``d.get(...)``), so a
        # syntactically valid list/number/string is rejected here.
        logger.warning(
            "malformed json %s: %s", path, "top-level value is not an object"
        )
        return None
    return payload
| def _list_json(dirpath: str) -> List[str]: | |
| try: | |
| names = os.listdir(dirpath) | |
| except FileNotFoundError: | |
| return [] | |
| return sorted(n for n in names if n.endswith(".json") and not n.startswith(".")) | |
| # ---------- queue ---------------------------------------------------------- | |
def list_pending() -> List[dict]:
    """Return the readable records in ``PENDING_DIR``, ordered by ``submitted_at``.

    Files that ``_read_json`` cannot load, or that load to an empty value,
    are skipped. Records whose ``submitted_at`` is missing or falsy sort as
    the empty string.
    """
    loaded = (
        _read_json(os.path.join(PENDING_DIR, fname))
        for fname in _list_json(PENDING_DIR)
    )
    records = [rec for rec in loaded if rec]
    return sorted(records, key=lambda rec: rec.get("submitted_at") or "")
def list_processing() -> List[dict]:
    """Return the readable records in ``PROCESSING_DIR``.

    Records come back in the filename order produced by ``_list_json``;
    files that cannot be loaded, or that load to an empty value, are skipped.
    """
    paths = [
        os.path.join(PROCESSING_DIR, fname)
        for fname in _list_json(PROCESSING_DIR)
    ]
    return [rec for rec in map(_read_json, paths) if rec]
def list_complete(limit: int = 100) -> List[dict]:
    """Return records for the most recently modified files in ``COMPLETE_DIR``.

    Files are ranked by modification time, newest first, and only the first
    ``limit`` of them are opened. Files whose mtime cannot be read are left
    out of the ranking. A ranked file that fails to load (or loads to an
    empty value) is skipped without being replaced, so the result may hold
    fewer than ``limit`` records.
    """
    stamped = []
    for fname in _list_json(COMPLETE_DIR):
        full_path = os.path.join(COMPLETE_DIR, fname)
        try:
            stamped.append((os.path.getmtime(full_path), full_path))
        except OSError:
            continue
    # Ranking on mtime avoids parsing every file just to order them.
    newest_first = sorted(stamped, reverse=True)
    recent = (_read_json(full_path) for _, full_path in newest_first[:limit])
    return [rec for rec in recent if rec]
def read_progress(sid: str) -> Optional[dict]:
    """Return the record stored at ``PROGRESS_DIR/<sid>.json``.

    The result is whatever ``_read_json`` returns for that path, so it is
    ``None`` when the file is missing or cannot be loaded.
    """
    progress_path = os.path.join(PROGRESS_DIR, f"{sid}.json")
    return _read_json(progress_path)
| # ---------- results / leaderboard ----------------------------------------- | |
def list_task_results(task_name: str) -> List[dict]:
    """Return the readable ``*_best.json`` records under ``RESULTS_DIR/<task_name>``.

    Records are returned in sorted filename order; files that cannot be
    loaded, or that load to an empty value, are skipped.
    """
    task_dir = os.path.join(RESULTS_DIR, task_name)
    best_files = [
        fname for fname in _list_json(task_dir) if fname.endswith("_best.json")
    ]
    loaded = (_read_json(os.path.join(task_dir, fname)) for fname in best_files)
    return [rec for rec in loaded if rec]
def list_task_history(task_name: str, limit: int = 500) -> List[dict]:
    """Read all monthly ``_history.YYYY-MM.jsonl`` files for ``task_name``.

    Args:
        task_name: Name of the sub-directory of ``RESULTS_DIR`` to scan.
        limit: Maximum number of records to return.

    Returns:
        At most ``limit`` records, newest first (by ``completed_at``).
        Records without a ``completed_at`` value sort last. An empty list is
        returned when the task directory does not exist.

    Blank lines, lines that are not valid JSON, and lines whose JSON value is
    not an object are skipped. A file that cannot be read or is not valid
    UTF-8 is abandoned at the point of failure; rows already collected from
    it are kept.
    """
    task_dir = os.path.join(RESULTS_DIR, task_name)
    try:
        names = os.listdir(task_dir)
    except FileNotFoundError:
        return []
    files = sorted(
        n for n in names
        if n.startswith("_history.") and n.endswith(".jsonl")
    )
    rows: List[dict] = []
    for name in files:
        path = os.path.join(task_dir, name)
        try:
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        row = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    # The sort key below calls ``.get`` on every row, so a
                    # bare number/list/string line must not be collected.
                    if isinstance(row, dict):
                        rows.append(row)
        except (OSError, UnicodeDecodeError):
            # UnicodeDecodeError is a ValueError, not an OSError; without it
            # one badly encoded file would abort the whole listing.
            continue
    rows.sort(key=lambda r: r.get("completed_at") or "", reverse=True)
    return rows[:limit]
| # ---------- /data health snapshot (for About / debugging) ----------------- | |
def data_root_present() -> bool:
    """Report whether ``PENDING_DIR`` currently exists as a directory."""
    pending_dir_exists = os.path.isdir(PENDING_DIR)
    return pending_dir_exists