"""Frontend-side POSIX readers for `/data`. Frontend is read-mostly: it lists queue directories and reads small JSON records. The only write it performs is the atomic ``enqueue_pending`` in ``src.submission.frontend``. """ from __future__ import annotations import json import logging import os from typing import Dict, List, Optional from src.envs import ( COMPLETE_DIR, PENDING_DIR, PROCESSING_DIR, PROGRESS_DIR, RESULTS_DIR, ) logger = logging.getLogger(__name__) def _read_json(path: str) -> Optional[dict]: try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except FileNotFoundError: return None except (OSError, json.JSONDecodeError) as e: logger.warning("malformed json %s: %s", path, e) return None def _list_json(dirpath: str) -> List[str]: try: names = os.listdir(dirpath) except FileNotFoundError: return [] return sorted(n for n in names if n.endswith(".json") and not n.startswith(".")) # ---------- queue ---------------------------------------------------------- def list_pending() -> List[dict]: out = [] for name in _list_json(PENDING_DIR): d = _read_json(os.path.join(PENDING_DIR, name)) if d: out.append(d) out.sort(key=lambda d: d.get("submitted_at") or "") return out def list_processing() -> List[dict]: out = [] for name in _list_json(PROCESSING_DIR): d = _read_json(os.path.join(PROCESSING_DIR, name)) if d: out.append(d) return out def list_complete(limit: int = 100) -> List[dict]: names = _list_json(COMPLETE_DIR) # most-recent first by mtime (cheap, no need to read JSON twice) items = [] for name in names: p = os.path.join(COMPLETE_DIR, name) try: mtime = os.path.getmtime(p) except OSError: continue items.append((mtime, p)) items.sort(reverse=True) out = [] for _, p in items[:limit]: d = _read_json(p) if d: out.append(d) return out def read_progress(sid: str) -> Optional[dict]: return _read_json(os.path.join(PROGRESS_DIR, f"{sid}.json")) # ---------- results / leaderboard ----------------------------------------- def list_task_results(task_name: str) -> List[dict]: task_dir = os.path.join(RESULTS_DIR, task_name) out: List[dict] = [] for name in _list_json(task_dir): if not name.endswith("_best.json"): continue d = _read_json(os.path.join(task_dir, name)) if d: out.append(d) return out def list_task_history(task_name: str, limit: int = 500) -> List[dict]: """Read all monthly ``_history.YYYY-MM.jsonl`` files for ``task_name``. Returns at most ``limit`` records, newest first (by ``completed_at``). """ task_dir = os.path.join(RESULTS_DIR, task_name) try: names = os.listdir(task_dir) except FileNotFoundError: return [] files = sorted( n for n in names if n.startswith("_history.") and n.endswith(".jsonl") and not n.startswith(".") ) rows: List[dict] = [] for name in files: path = os.path.join(task_dir, name) try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line: continue try: rows.append(json.loads(line)) except json.JSONDecodeError: continue except OSError: continue rows.sort(key=lambda r: r.get("completed_at") or "", reverse=True) return rows[:limit] # ---------- /data health snapshot (for About / debugging) ----------------- def data_root_present() -> bool: return os.path.isdir(PENDING_DIR)