# vLAR's picture
# leaderboard
# b4d728d
"""Frontend-side POSIX readers for `/data`.
Frontend is read-mostly: it lists queue directories and reads small JSON
records. The only write it performs is the atomic ``enqueue_pending`` in
``src.submission.frontend``.
"""
from __future__ import annotations
import json
import logging
import os
from typing import Dict, List, Optional
from src.envs import (
COMPLETE_DIR,
PENDING_DIR,
PROCESSING_DIR,
PROGRESS_DIR,
RESULTS_DIR,
)
logger = logging.getLogger(__name__)
def _read_json(path: str) -> Optional[dict]:
    """Load a single JSON object from ``path``.

    Returns the decoded ``dict`` on success and ``None`` otherwise. A missing
    file returns ``None`` without logging. Every other failure -- an I/O
    error, bytes that are not valid UTF-8, invalid JSON, or a top-level value
    that is not a JSON object -- is logged as a warning and also yields
    ``None``, so callers never receive a non-dict value.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
        # UnicodeDecodeError is raised while decoding the file as UTF-8 and is
        # neither an OSError nor a JSONDecodeError, so it is listed explicitly.
        logger.warning("malformed json %s: %s", path, e)
        return None
    if not isinstance(data, dict):
        # Valid JSON, but callers in this module treat records as dicts
        # (e.g. they call ``.get`` on them), so reject lists/scalars here.
        logger.warning(
            "malformed json %s: expected object, got %s",
            path,
            type(data).__name__,
        )
        return None
    return data
def _list_json(dirpath: str) -> List[str]:
    """Return the sorted ``*.json`` entry names found directly in ``dirpath``.

    Names beginning with ``.`` are excluded. A directory that does not exist
    yields an empty list; other ``os.listdir`` errors propagate.
    """
    try:
        entries = os.listdir(dirpath)
    except FileNotFoundError:
        return []
    selected = [
        entry
        for entry in entries
        if not entry.startswith(".") and entry.endswith(".json")
    ]
    selected.sort()
    return selected
# ---------- queue ----------------------------------------------------------
def list_pending() -> List[dict]:
    """Return the records in ``PENDING_DIR`` ordered by ``submitted_at``.

    Files that ``_read_json`` cannot load, and records that are empty, are
    left out. A record without a ``submitted_at`` value sorts as the empty
    string, i.e. ahead of records that have one.
    """
    paths = (
        os.path.join(PENDING_DIR, fname) for fname in _list_json(PENDING_DIR)
    )
    records = [rec for rec in map(_read_json, paths) if rec]
    records.sort(key=lambda rec: rec.get("submitted_at") or "")
    return records
def list_processing() -> List[dict]:
    """Return the records in ``PROCESSING_DIR`` in file-name order.

    Files that ``_read_json`` cannot load, and records that are empty, are
    left out.
    """
    loaded = [
        _read_json(os.path.join(PROCESSING_DIR, fname))
        for fname in _list_json(PROCESSING_DIR)
    ]
    return [rec for rec in loaded if rec]
def list_complete(limit: int = 100) -> List[dict]:
    """Return records from ``COMPLETE_DIR``, most recently modified first.

    Ordering uses each file's mtime so that only the newest ``limit`` files
    have to be opened. Files whose mtime cannot be read are ignored.

    Note that ``limit`` is applied to the file list *before* the files are
    read: any of those files that ``_read_json`` cannot load (or that hold an
    empty record) are dropped, so fewer than ``limit`` records may be returned
    even when older valid files exist.
    """
    stamped = []
    for fname in _list_json(COMPLETE_DIR):
        full_path = os.path.join(COMPLETE_DIR, fname)
        try:
            stamped.append((os.path.getmtime(full_path), full_path))
        except OSError:
            # The file may have disappeared or be unreadable; skip it.
            continue

    newest_first = sorted(stamped, reverse=True)

    records = []
    for _mtime, full_path in newest_first[:limit]:
        rec = _read_json(full_path)
        if not rec:
            continue
        records.append(rec)
    return records
def read_progress(sid: str) -> Optional[dict]:
    """Read ``<sid>.json`` from ``PROGRESS_DIR`` via ``_read_json``.

    Returns whatever ``_read_json`` yields for that path.
    """
    # NOTE(review): ``sid`` is joined into the path unchecked -- confirm that
    # callers only pass trusted identifiers.
    progress_path = os.path.join(PROGRESS_DIR, "{}.json".format(sid))
    return _read_json(progress_path)
# ---------- results / leaderboard -----------------------------------------
def list_task_results(task_name: str) -> List[dict]:
    """Return every ``*_best.json`` record stored for ``task_name``.

    Records are looked up in ``RESULTS_DIR/<task_name>`` and returned in
    file-name order. Files that ``_read_json`` cannot load, and records that
    are empty, are left out.
    """
    results_dir = os.path.join(RESULTS_DIR, task_name)
    best_files = [
        fname for fname in _list_json(results_dir) if fname.endswith("_best.json")
    ]
    loaded = (_read_json(os.path.join(results_dir, fname)) for fname in best_files)
    return [rec for rec in loaded if rec]
def list_task_history(task_name: str, limit: int = 500) -> List[dict]:
    """Read all monthly ``_history.YYYY-MM.jsonl`` files for ``task_name``.

    Returns at most ``limit`` records, newest first (by ``completed_at``);
    records without a ``completed_at`` value sort last.

    Reading is best-effort: a missing task directory yields ``[]``; blank
    lines, lines that are not valid JSON, and lines whose JSON value is not an
    object are skipped; a file that cannot be opened or is not valid UTF-8 is
    abandoned (rows already parsed from it are kept).
    """
    task_dir = os.path.join(RESULTS_DIR, task_name)
    try:
        names = os.listdir(task_dir)
    except FileNotFoundError:
        return []
    files = sorted(
        n for n in names
        if n.startswith("_history.") and n.endswith(".jsonl")
    )
    rows: List[dict] = []
    for name in files:
        path = os.path.join(task_dir, name)
        try:
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        row = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    # Only JSON objects are history records; a bare list or
                    # scalar would break the ``.get`` in the sort key below.
                    if isinstance(row, dict):
                        rows.append(row)
        except (OSError, UnicodeDecodeError):
            # UnicodeDecodeError surfaces while iterating a file that holds
            # invalid UTF-8; treat it like any other unreadable file.
            continue
    rows.sort(key=lambda r: r.get("completed_at") or "", reverse=True)
    return rows[:limit]
# ---------- /data health snapshot (for About / debugging) -----------------
def data_root_present() -> bool:
    """Report whether ``PENDING_DIR`` currently exists as a directory."""
    pending_is_dir = os.path.isdir(PENDING_DIR)
    return pending_is_dir