File size: 3,906 Bytes
71be8a5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b4d728d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71be8a5
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Frontend-side POSIX readers for `/data`.

Frontend is read-mostly: it lists queue directories and reads small JSON
records. The only write it performs is the atomic ``enqueue_pending`` in
``src.submission.frontend``.
"""
from __future__ import annotations

import json
import logging
import os
from typing import Dict, List, Optional

from src.envs import (
    COMPLETE_DIR,
    PENDING_DIR,
    PROCESSING_DIR,
    PROGRESS_DIR,
    RESULTS_DIR,
)

logger = logging.getLogger(__name__)


def _read_json(path: str) -> Optional[dict]:
    """Load a single JSON object from ``path``.

    Returns ``None`` silently when the file does not exist (e.g. it was
    removed after its directory was listed). Returns ``None`` after logging a
    warning when the file cannot be read, is not valid UTF-8 JSON, or its
    top-level value is not a JSON object. Callers can therefore always treat
    a non-``None`` result as a ``dict``.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError
        # (raised while decoding non-UTF-8 bytes), which a bare
        # json.JSONDecodeError clause would let escape.
        logger.warning("malformed json %s: %s", path, e)
        return None
    if not isinstance(data, dict):
        # Callers index the result like a dict (e.g. ``.get(...)``); a list or
        # scalar at top level would crash them, so reject it here.
        logger.warning(
            "malformed json %s: %s",
            path,
            f"expected a JSON object, got {type(data).__name__}",
        )
        return None
    return data


def _list_json(dirpath: str) -> List[str]:
    """Return the sorted names of visible ``*.json`` entries in ``dirpath``.

    Entries whose name begins with ``.`` are treated as hidden and skipped.
    A missing directory yields an empty list; any other ``os.listdir``
    error propagates to the caller.
    """
    try:
        entries = os.listdir(dirpath)
    except FileNotFoundError:
        return []
    visible = [entry for entry in entries if not entry.startswith(".")]
    return sorted(entry for entry in visible if entry.endswith(".json"))


# ---------- queue ----------------------------------------------------------
def list_pending() -> List[dict]:
    """Return every loadable record in the pending queue, sorted by ``submitted_at``.

    Records that ``_read_json`` cannot load, or that are empty, are dropped.
    The sort is ascending on ``submitted_at``, with a missing or falsy value
    treated as ``""`` so those records come first. The ordering is
    lexicographic; it is chronological only if the timestamps are
    presumably ISO-8601 strings (TODO confirm with the writer side).
    """
    paths = [os.path.join(PENDING_DIR, fname) for fname in _list_json(PENDING_DIR)]
    records = [rec for rec in map(_read_json, paths) if rec]
    return sorted(records, key=lambda rec: rec.get("submitted_at") or "")


def list_processing() -> List[dict]:
    """Return every loadable record in the processing directory.

    Records come back in ``_list_json`` order (sorted by filename). Entries
    that ``_read_json`` cannot load, or that are empty, are omitted.
    """
    collected: List[dict] = []
    for fname in _list_json(PROCESSING_DIR):
        record = _read_json(os.path.join(PROCESSING_DIR, fname))
        if not record:
            continue
        collected.append(record)
    return collected


def list_complete(limit: int = 100) -> List[dict]:
    """Return up to ``limit`` completed records, newest file mtime first.

    Ordering uses filesystem mtime rather than record contents, so only the
    selected files are opened. Entries whose mtime cannot be read are
    skipped. ``limit`` is applied before unloadable or empty records are
    dropped, so fewer than ``limit`` records may be returned.
    """
    stamped = []
    for fname in _list_json(COMPLETE_DIR):
        fpath = os.path.join(COMPLETE_DIR, fname)
        try:
            stamped.append((os.path.getmtime(fpath), fpath))
        except OSError:
            # The file may have gone away between listing and stat.
            continue
    # Tuple ordering: newest mtime first, ties broken by path (descending).
    newest = sorted(stamped, reverse=True)[:limit]
    return [rec for rec in (_read_json(fpath) for _mtime, fpath in newest) if rec]


def read_progress(sid: str) -> Optional[dict]:
    """Return the record stored at ``PROGRESS_DIR/<sid>.json``.

    Yields ``None`` whenever ``_read_json`` does, e.g. for a missing or
    unparseable file.
    """
    # NOTE(review): ``sid`` is joined into the path without validation; if it
    # can originate from user input, reject values containing path separators.
    filename = f"{sid}.json"
    progress_path = os.path.join(PROGRESS_DIR, filename)
    return _read_json(progress_path)


# ---------- results / leaderboard -----------------------------------------
def list_task_results(task_name: str) -> List[dict]:
    """Return the ``*_best.json`` records under ``RESULTS_DIR/<task_name>``.

    Other JSON files in the task directory are ignored, as are records that
    ``_read_json`` cannot load or that are empty. Records come back in
    ``_list_json`` order (sorted by filename).
    """
    task_dir = os.path.join(RESULTS_DIR, task_name)
    best_names = [n for n in _list_json(task_dir) if n.endswith("_best.json")]
    loaded = (_read_json(os.path.join(task_dir, n)) for n in best_names)
    return [rec for rec in loaded if rec]


def list_task_history(task_name: str, limit: int = 500) -> List[dict]:
    """Read all monthly ``_history.YYYY-MM.jsonl`` files for ``task_name``.

    Every non-blank line of each ``_history.*.jsonl`` file in
    ``RESULTS_DIR/<task_name>`` is parsed as one JSON record. Malformed lines
    are skipped individually, so one bad line (e.g. a partially written
    trailing line) does not hide the rest of the history. A line is
    malformed when it is not valid UTF-8, not valid JSON, or not a JSON
    object. Files that cannot be opened or read are skipped.

    Returns at most ``limit`` records, newest first (by ``completed_at``;
    a missing or falsy value sorts as ``""``). A missing task directory
    yields ``[]``.
    """
    task_dir = os.path.join(RESULTS_DIR, task_name)
    try:
        names = os.listdir(task_dir)
    except (FileNotFoundError, NotADirectoryError):
        return []
    # A name starting with "_history." can never be a dot-file, so no
    # separate hidden-file check is needed.
    files = sorted(
        n for n in names if n.startswith("_history.") and n.endswith(".jsonl")
    )
    rows: List[dict] = []
    for name in files:
        path = os.path.join(task_dir, name)
        try:
            # Binary mode + per-line decode: in text mode a single invalid
            # byte raises UnicodeDecodeError for the whole buffered chunk
            # and aborts the read.
            with open(path, "rb") as f:
                for raw in f:
                    try:
                        line = raw.decode("utf-8").strip()
                        if not line:
                            continue
                        row = json.loads(line)
                    except ValueError:  # UnicodeDecodeError or JSONDecodeError
                        continue
                    # Non-object rows would break the ``.get`` sort key below.
                    if isinstance(row, dict):
                        rows.append(row)
        except OSError:
            continue
    rows.sort(key=lambda r: r.get("completed_at") or "", reverse=True)
    return rows[:limit]


# ---------- /data health snapshot (for About / debugging) -----------------
def data_root_present() -> bool:
    """Return ``True`` if ``PENDING_DIR`` exists and is a directory.

    The pending-queue directory serves as a cheap presence probe for the
    data root; nothing else under it is checked.
    """
    probe_dir = PENDING_DIR
    return os.path.isdir(probe_dir)