File size: 6,599 Bytes
568b349
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a16262
 
 
 
 
 
 
 
 
 
 
 
 
568b349
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a16262
568b349
 
 
eb27a8c
 
 
 
 
6a16262
 
eb27a8c
 
 
 
568b349
 
 
 
 
eb27a8c
 
568b349
 
 
 
 
 
 
 
 
 
 
 
eb27a8c
d634ebf
 
 
 
 
eb27a8c
 
 
 
568b349
 
eb27a8c
 
568b349
 
 
 
 
 
 
 
 
d634ebf
 
 
 
 
 
 
 
 
 
 
 
 
 
568b349
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a16262
 
568b349
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eb27a8c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
568b349
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles


BASE_DIR = Path(__file__).resolve().parent
ASSETS_DIR = BASE_DIR / "web_debug"
LOGS_DIR = Path(os.getenv("LOGS_DIR") or os.getenv("TELEMETRY_DIR") or (BASE_DIR / "logs")).resolve()

app = FastAPI(title="Werewolf Debug UI", docs_url=None, redoc_url=None)
app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")


def _ts_value(raw: Any) -> float:
    if raw is None:
        return 0.0
    if isinstance(raw, (int, float)):
        return float(raw)
    if isinstance(raw, str):
        try:
            return float(raw)
        except Exception:
            return 0.0
    return 0.0


def _safe_session_path(session_id: str) -> Path:
    if not session_id or "/" in session_id or "\\" in session_id:
        raise HTTPException(status_code=400, detail="invalid session id")
    session_path = (LOGS_DIR / session_id).resolve()
    if not str(session_path).startswith(str(LOGS_DIR)):
        raise HTTPException(status_code=400, detail="invalid session id")
    if not session_path.exists() or not session_path.is_dir():
        raise HTTPException(status_code=404, detail="session not found")
    return session_path


def _iter_jsonl(path: Path) -> Iterable[Dict[str, Any]]:
    try:
        with path.open("r", encoding="utf-8", errors="replace") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    yield json.loads(line)
                except json.JSONDecodeError:
                    continue
    except OSError:
        return


def _collect_events(session_path: Path) -> List[Dict[str, Any]]:
    events: List[Dict[str, Any]] = []
    for file_path in sorted(session_path.rglob("*.jsonl")):
        events.extend(_iter_jsonl(file_path))
    events.sort(key=lambda e: _ts_value(e.get("ts")))
    return events


def _filter_after_ts(events: List[Dict[str, Any]], after_ts: Optional[float]) -> List[Dict[str, Any]]:
    if after_ts is None:
        return events
    out = []
    for e in events:
        ts = _ts_value(e.get("ts"))
        if ts > after_ts:
            out.append(e)
    return out


def _stats_from_events(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    roles: Dict[str, int] = {}
    statuses: Dict[str, int] = {}
    fallback = 0
    invalid = 0
    interact = 0
    perceive = 0
    for e in events:
        role = str(e.get("role") or "")
        status = str(e.get("status") or "")
        if role:
            roles[role] = roles.get(role, 0) + 1
        if status:
            statuses[status] = statuses.get(status, 0) + 1
        if e.get("fallback_used"):
            fallback += 1
        parse_valid = e.get("parse_valid")
        if parse_valid is False:
            invalid += 1
        kind = str(e.get("kind") or "")
        if not kind:
            if e.get("final_result") is not None or "final_result" in e:
                kind = "interact"
            elif e.get("event") is not None or "event" in e:
                kind = "perceive"
        if kind == "interact":
            interact += 1
        elif kind == "perceive":
            perceive += 1
    return {
        "total": len(events),
        "interact": interact,
        "perceive": perceive,
        "fallback": fallback,
        "invalid": invalid,
        "roles": roles,
        "statuses": statuses,
    }


@app.get("/", response_class=HTMLResponse)
def index() -> str:
    html = (ASSETS_DIR / "index.html").read_text(encoding="utf-8")
    try:
        v = int(
            max(
                (ASSETS_DIR / "index.html").stat().st_mtime,
                (ASSETS_DIR / "app.js").stat().st_mtime,
                (ASSETS_DIR / "styles.css").stat().st_mtime,
            )
        )
    except Exception:
        v = 1
    html = html.replace("./assets/styles.css", f"./assets/styles.css?v={v}")
    html = html.replace("./assets/app.js", f"./assets/app.js?v={v}")
    return html


@app.get("/api/sessions")
def list_sessions() -> JSONResponse:
    sessions = []
    if not LOGS_DIR.exists():
        return JSONResponse({"sessions": []})
    for entry in LOGS_DIR.iterdir():
        if not entry.is_dir():
            continue
        roles = sorted([p.name for p in entry.iterdir() if p.is_dir()])
        jsonl_files = list(entry.rglob("*.jsonl"))
        event_count = 0
        last_ts: Optional[float] = None
        for file_path in jsonl_files:
            for record in _iter_jsonl(file_path):
                event_count += 1
                ts = _ts_value(record.get("ts"))
                if ts:
                    last_ts = ts if last_ts is None else max(last_ts, ts)
        sessions.append(
            {
                "id": entry.name,
                "roles": roles,
                "events": event_count,
                "updated_at": last_ts or entry.stat().st_mtime,
                "files": len(jsonl_files),
            }
        )
    sessions.sort(key=lambda s: s.get("updated_at", 0), reverse=True)
    return JSONResponse({"sessions": sessions})


@app.get("/api/session/{session_id}")
def session_detail(session_id: str, limit: int = Query(default=5000, ge=1, le=50000)) -> JSONResponse:
    session_path = _safe_session_path(session_id)
    events = _collect_events(session_path)
    if len(events) > limit:
        events = events[-limit:]
    stats = _stats_from_events(events)
    return JSONResponse(
        {
            "session_id": session_id,
            "events": events,
            "stats": stats,
        }
    )


@app.get("/api/session/{session_id}/delta")
def session_delta(
    session_id: str,
    after_ts: Optional[float] = Query(default=None),
    limit: int = Query(default=2000, ge=1, le=20000),
) -> JSONResponse:
    session_path = _safe_session_path(session_id)
    events = _collect_events(session_path)
    events = _filter_after_ts(events, after_ts)
    if len(events) > limit:
        events = events[-limit:]
    stats = _stats_from_events(_collect_events(session_path))
    return JSONResponse(
        {
            "session_id": session_id,
            "events": events,
            "stats": stats,
        }
    )


if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("DEBUG_UI_PORT", "8009"))
    uvicorn.run("debug_ui:app", host="127.0.0.1", port=port, reload=False)