oracle / scripts /cache_debug_web.py
zirobtc's picture
Upload folder using huggingface_hub
a547253 verified
#!/usr/bin/env python3
"""
Minimal web viewer for cached context samples.
Loads one cache sample with the same torch.load path used by dump_cache_sample.py
and renders:
- top-level debug metadata
- cached Chart_Segment line view
- quant window boundaries
- per-window level/pattern summaries
- full per-window feature maps
Usage:
/venv/main/bin/python scripts/cache_debug_web.py
/venv/main/bin/python scripts/cache_debug_web.py --cache_dir data/cache --port 8765
/venv/main/bin/python scripts/cache_debug_web.py --file data/cache/sample_ABC.pt
"""
import argparse
import json
import os
import random
import sys
from collections import Counter
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, urlparse
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import numpy as np
import torch
import pandas as pd
from data.quant_ohlc_feature_schema import FEATURE_NAMES
from signals.support_resistance import compute_support_resistance_debug, compute_support_resistance_features
from signals.trendlines import _fit_with_trendln
from ta.trend import ema_indicator, sma_indicator
def _load_cache_sample(path: Path) -> Dict[str, Any]:
return torch.load(path, map_location="cpu", weights_only=False)
def _safe_float(value: Any) -> float:
try:
return float(value)
except Exception:
return 0.0
def _feature_map(window: Dict[str, Any]) -> Dict[str, float]:
vector = window.get("feature_vector", [])
if not isinstance(vector, list):
return {}
return {
name: _safe_float(vector[idx]) if idx < len(vector) else 0.0
for idx, name in enumerate(FEATURE_NAMES)
}
def _chart_points(chart_event: Dict[str, Any]) -> List[Dict[str, float]]:
raw_opens = chart_event.get("raw_opens")
raw_closes = chart_event.get("raw_closes")
if isinstance(raw_opens, list) and isinstance(raw_closes, list) and raw_closes:
opens = raw_opens
closes = raw_closes
else:
opens_logged = chart_event.get("opens", []) or []
closes_logged = chart_event.get("closes", []) or []
opens = [float(np.exp(v)) for v in opens_logged]
closes = [float(np.exp(v)) for v in closes_logged]
end_ts = int(chart_event.get("timestamp", 0) or 0)
if not closes:
return []
interval_str = str(chart_event.get("i", "1s"))
try:
interval_seconds = max(1, int(interval_str.rstrip("s")))
except Exception:
interval_seconds = 1
start_ts = end_ts - interval_seconds * (len(closes) - 1)
points: List[Dict[str, float]] = []
for idx, (open_value, close_value) in enumerate(zip(opens, closes)):
ts = start_ts + idx * interval_seconds
high_value = max(open_value, close_value)
low_value = min(open_value, close_value)
points.append({
"time": int(ts),
"open": _safe_float(open_value),
"high": _safe_float(high_value),
"low": _safe_float(low_value),
"close": _safe_float(close_value),
"index": idx,
})
return points
def _compute_level_overlays(points: List[Dict[str, float]], windows: List[Dict[str, Any]]) -> Dict[str, Any]:
del windows
if not points:
return {"support_levels": [], "resistance_levels": []}
closes = [_safe_float(p["close"]) for p in points]
highs = [_safe_float(p["high"]) for p in points]
lows = [_safe_float(p["low"]) for p in points]
timestamps = [int(p["time"]) for p in points]
debug = compute_support_resistance_debug(
closes=closes,
highs=highs,
lows=lows,
timestamps=timestamps,
)
support_levels = debug.get("support_levels", []) or debug.get("all_support_levels", [])
resistance_levels = debug.get("resistance_levels", []) or debug.get("all_resistance_levels", [])
return {
"support_levels": support_levels,
"resistance_levels": resistance_levels,
"all_support_levels": debug.get("all_support_levels", []),
"all_resistance_levels": debug.get("all_resistance_levels", []),
"sr_available": debug.get("sr_available", 0.0),
}
def _compute_trendline_overlays(points: List[Dict[str, float]]) -> List[Dict[str, Any]]:
if len(points) < 5:
return []
closes = np.asarray([p["close"] for p in points], dtype=np.float64)
highs = np.asarray([p["high"] for p in points], dtype=np.float64)
lows = np.asarray([p["low"] for p in points], dtype=np.float64)
out: List[Dict[str, Any]] = []
try:
lower_line, upper_line = _fit_with_trendln(closes)
except Exception:
lower_line, upper_line = None, None
if lower_line is None:
try:
lower_line, _ = _fit_with_trendln(lows)
except Exception:
lower_line = None
if upper_line is None:
try:
_, upper_line = _fit_with_trendln(highs)
except Exception:
upper_line = None
def _line_payload(name: str, line: Any, color: str) -> Optional[Dict[str, Any]]:
if line is None:
return None
slope, intercept = line
x0, x1 = 0, len(points) - 1
y0 = slope * x0 + intercept
y1 = slope * x1 + intercept
return {
"name": name,
"color": color,
"points": [
{"time": points[x0]["time"], "value": _safe_float(y0)},
{"time": points[x1]["time"], "value": _safe_float(y1)},
],
}
lower_payload = _line_payload("lower_trendline", lower_line, "#0f766e")
upper_payload = _line_payload("upper_trendline", upper_line, "#b91c1c")
if lower_payload:
out.append(lower_payload)
if upper_payload:
out.append(upper_payload)
return out
def _compute_window_boundaries(points: List[Dict[str, float]], windows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
if not points:
return []
min_value = min(point["low"] for point in points)
max_value = max(point["high"] for point in points)
out: List[Dict[str, Any]] = []
for idx, window in enumerate(windows):
end_ts = int(window.get("end_ts", 0) or 0)
breakout_active = any(
_safe_float((window.get("keylevel_flags", {}) or {}).get(flag, 0.0)) > 0.0
for flag in ("breakout_up", "breakout_down", "flip_to_support", "flip_to_resistance")
)
out.append({
"name": f"window_{idx}",
"window_idx": idx,
"color": "#8b1e3f" if breakout_active else "#c84c2d",
"points": [
{"time": end_ts, "value": _safe_float(min_value)},
{"time": end_ts, "value": _safe_float(max_value)},
],
})
return out
def _compute_indicator_overlays(points: List[Dict[str, float]]) -> List[Dict[str, Any]]:
if not points:
return []
closes = pd.Series([_safe_float(point["close"]) for point in points], dtype="float64")
ema_fast = ema_indicator(closes, window=8, fillna=True)
ema_medium = ema_indicator(closes, window=21, fillna=True)
sma_fast = sma_indicator(closes, window=8, fillna=True)
sma_medium = sma_indicator(closes, window=21, fillna=True)
def _series_payload(name: str, series: pd.Series, color: str, dash: str = "solid") -> Dict[str, Any]:
return {
"name": name,
"color": color,
"dash": dash,
"points": [
{"time": points[idx]["time"], "value": _safe_float(value)}
for idx, value in enumerate(series.tolist())
],
}
return [
_series_payload("ema_fast_8", ema_fast, "#2563eb"),
_series_payload("ema_medium_21", ema_medium, "#7c3aed"),
_series_payload("sma_fast_8", sma_fast, "#ea580c", "dot"),
_series_payload("sma_medium_21", sma_medium, "#0891b2", "dot"),
]
def _recompute_window_keylevel_flags(
points: List[Dict[str, float]],
windows: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
if not points or not windows:
return windows
closes = [_safe_float(point["close"]) for point in points]
highs = [_safe_float(point["high"]) for point in points]
lows = [_safe_float(point["low"]) for point in points]
timestamps = [int(point["time"]) for point in points]
time_to_idx = {timestamp: idx for idx, timestamp in enumerate(timestamps)}
updated: List[Dict[str, Any]] = []
for window in windows:
start_ts = int(window.get("start_ts", 0) or 0)
end_ts = int(window.get("end_ts", 0) or 0)
if end_ts not in time_to_idx:
updated.append(window)
continue
end_idx = time_to_idx[end_ts]
start_idx = time_to_idx.get(start_ts, max(0, end_idx))
if start_idx > end_idx:
start_idx = end_idx
sr_features = compute_support_resistance_features(
closes=closes,
highs=highs,
lows=lows,
end_idx=end_idx,
window_start=start_idx,
window_end=end_idx + 1,
timestamps=timestamps,
)
keylevel_flags = {
"breakout_up": sr_features.get("keylevel_breakout_up", 0.0),
"breakout_down": sr_features.get("keylevel_breakout_down", 0.0),
"hold_above": sr_features.get("keylevel_hold_above", 0.0),
"hold_below": sr_features.get("keylevel_hold_below", 0.0),
"failed_breakout_up": sr_features.get("keylevel_failed_breakout_up", 0.0),
"failed_breakout_down": sr_features.get("keylevel_failed_breakout_down", 0.0),
"flip_to_support": sr_features.get("keylevel_flip_to_support", 0.0),
"flip_to_resistance": sr_features.get("keylevel_flip_to_resistance", 0.0),
}
top_signal_name = "none"
for signal_name in (
"breakout_up",
"breakout_down",
"flip_to_support",
"flip_to_resistance",
"failed_breakout_up",
"failed_breakout_down",
):
if _safe_float(keylevel_flags.get(signal_name, 0.0)) > 0.0:
top_signal_name = signal_name
break
updated_window = dict(window)
updated_window["keylevel_flags"] = keylevel_flags
updated_window["top_signal_name"] = top_signal_name
updated.append(updated_window)
return updated
def _compute_keylevel_signal_overlays(
points: List[Dict[str, float]],
windows: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
if not points or not windows:
return []
time_to_point = {int(point["time"]): point for point in points}
signal_specs = {
"breakout_up": {"color": "#15803d", "symbol": "triangle-up", "y_key": "high"},
"breakout_down": {"color": "#b91c1c", "symbol": "triangle-down", "y_key": "low"},
"flip_to_support": {"color": "#1d4ed8", "symbol": "diamond", "y_key": "close"},
"flip_to_resistance": {"color": "#7c2d12", "symbol": "diamond", "y_key": "close"},
"failed_breakout_up": {"color": "#ea580c", "symbol": "x", "y_key": "high"},
"failed_breakout_down": {"color": "#9333ea", "symbol": "x", "y_key": "low"},
}
overlays: List[Dict[str, Any]] = []
for window in windows:
end_ts = int(window.get("end_ts", 0) or 0)
point = time_to_point.get(end_ts)
if point is None:
continue
flags = window.get("keylevel_flags", {}) or {}
for signal_name, spec in signal_specs.items():
if _safe_float(flags.get(signal_name, 0.0)) <= 0.0:
continue
y_value = _safe_float(point.get(spec["y_key"], point["close"]))
if spec["y_key"] == "high":
y_value *= 1.003
elif spec["y_key"] == "low":
y_value *= 0.997
overlays.append({
"name": signal_name,
"time": end_ts,
"value": y_value,
"color": spec["color"],
"symbol": spec["symbol"],
"window_idx": int(window.get("idx", -1)),
})
return overlays
def _sample_to_payload(sample: Dict[str, Any], source_file: Path) -> Dict[str, Any]:
event_sequence = sample.get("event_sequence", [])
event_counts = Counter(event.get("event_type", "Unknown") for event in event_sequence)
chart_events = [event for event in event_sequence if event.get("event_type") == "Chart_Segment"]
chart_event = chart_events[0] if chart_events else {}
quant_windows = chart_event.get("quant_ohlc_features", []) or []
windows_payload: List[Dict[str, Any]] = []
for idx, window in enumerate(quant_windows):
feature_map = _feature_map(window)
windows_payload.append({
"idx": idx,
"start_ts": window.get("start_ts"),
"end_ts": window.get("end_ts"),
"window_seconds": window.get("window_seconds"),
"level_snapshot": window.get("level_snapshot", {}) or {},
"keylevel_flags": window.get("keylevel_flags", {}) or {},
"top_signal_name": "none",
"feature_map": feature_map,
"sr_available": _safe_float(feature_map.get("sr_available", 0.0)),
"trendline_available": _safe_float(feature_map.get("trendline_available", 0.0)),
})
chart_points = _chart_points(chart_event) if chart_event else []
windows_payload = _recompute_window_keylevel_flags(chart_points, windows_payload)
level_overlays = _compute_level_overlays(chart_points, windows_payload)
trendline_overlays = _compute_trendline_overlays(chart_points)
boundary_overlays = _compute_window_boundaries(chart_points, windows_payload)
indicator_overlays = _compute_indicator_overlays(chart_points)
signal_overlays = _compute_keylevel_signal_overlays(chart_points, windows_payload)
return {
"source_file": str(source_file),
"sample": {
"token_address": sample.get("token_address"),
"source_token": sample.get("source_token"),
"sample_idx": sample.get("sample_idx"),
"class_id": sample.get("class_id"),
"context_bucket": sample.get("context_bucket"),
"context_score": sample.get("context_score"),
"quality_score": _safe_float(sample.get("quality_score", 0.0)),
"t_cutoff": sample.get("t_cutoff"),
"labels": sample.get("labels").tolist() if hasattr(sample.get("labels"), "tolist") else sample.get("labels"),
"labels_mask": sample.get("labels_mask").tolist() if hasattr(sample.get("labels_mask"), "tolist") else sample.get("labels_mask"),
"event_counts": dict(event_counts),
"n_events": len(event_sequence),
"n_wallets": len(sample.get("wallets", {})),
"n_tokens": len(sample.get("tokens", {})),
"n_graph_link_types": len(sample.get("graph_links", {})),
},
"chart": {
"present": bool(chart_event),
"timestamp": chart_event.get("timestamp"),
"relative_ts": chart_event.get("relative_ts"),
"interval": chart_event.get("i"),
"opens": chart_event.get("opens", []) or [],
"closes": chart_event.get("closes", []) or [],
"windows": windows_payload,
"points": chart_points,
"overlays": {
"levels": level_overlays,
"trendlines": trendline_overlays,
"boundaries": boundary_overlays,
"indicators": indicator_overlays,
"signals": signal_overlays,
},
},
}
HTML = """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Cache Debug</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
:root {
--bg: #f4efe7;
--panel: #fffaf2;
--ink: #1f1a17;
--muted: #6f645b;
--line: #d7ccbf;
--accent: #c84c2d;
--accent2: #0f766e;
--warn: #8b1e3f;
--chart: #1f1a17;
--chart2: #6a8a82;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: Georgia, "Times New Roman", serif;
background: radial-gradient(circle at top left, #fffdf8, var(--bg) 46%, #efe4d4 100%);
color: var(--ink);
}
header, main { max-width: 1420px; margin: 0 auto; padding: 18px 20px; }
header { display: flex; gap: 12px; align-items: center; justify-content: space-between; }
h1 { margin: 0; font-size: 28px; letter-spacing: -0.03em; }
.sub { color: var(--muted); font-size: 14px; }
.actions { display: flex; gap: 10px; align-items: center; }
button, input {
font: inherit;
border: 1px solid var(--line);
background: var(--panel);
color: var(--ink);
border-radius: 999px;
padding: 9px 14px;
}
button { cursor: pointer; }
button:hover { border-color: var(--accent); }
main { display: grid; gap: 18px; }
.grid {
display: grid;
grid-template-columns: 340px minmax(0, 1fr);
gap: 18px;
align-items: start;
}
.panel {
background: color-mix(in srgb, var(--panel) 88%, white);
border: 1px solid var(--line);
border-radius: 22px;
box-shadow: 0 14px 50px rgba(31, 26, 23, 0.06);
}
.panel h2, .panel h3 { margin: 0 0 12px; }
.meta { padding: 18px; }
.kv {
display: grid;
grid-template-columns: 120px 1fr;
gap: 8px 10px;
font-size: 14px;
margin-bottom: 16px;
}
.kv div:nth-child(odd) { color: var(--muted); }
.badge {
display: inline-block;
border: 1px solid var(--line);
border-radius: 999px;
padding: 4px 10px;
margin: 0 6px 6px 0;
font-size: 12px;
background: #fff;
}
.main-panel { padding: 18px; }
#chart-wrap { border: 1px solid var(--line); border-radius: 18px; background: #fff; overflow: hidden; padding: 6px; }
#chart-root { width: 100%; height: 460px; }
.legend { display: flex; flex-wrap: wrap; gap: 10px; margin-top: 12px; color: var(--muted); font-size: 13px; }
.legend span::before { content: ""; display: inline-block; width: 16px; height: 2px; margin-right: 6px; vertical-align: middle; background: var(--ink); }
.legend .close::before { background: var(--chart); }
.legend .open::before { background: var(--chart2); }
.legend .boundary::before { background: var(--accent); }
.legend .pattern::before { background: var(--warn); }
.window-layout {
display: grid;
grid-template-columns: minmax(0, 1fr) 380px;
gap: 18px;
align-items: start;
}
table {
width: 100%;
border-collapse: collapse;
font-size: 13px;
}
th, td {
padding: 8px 10px;
border-bottom: 1px solid var(--line);
text-align: left;
vertical-align: top;
}
tr:hover { background: rgba(200, 76, 45, 0.05); cursor: pointer; }
tr.active { background: rgba(200, 76, 45, 0.11); }
.detail {
padding: 14px;
border: 1px solid var(--line);
border-radius: 18px;
background: #fff;
position: sticky;
top: 18px;
}
.detail pre {
margin: 0;
white-space: pre-wrap;
word-break: break-word;
font-size: 12px;
color: var(--ink);
}
.muted { color: var(--muted); }
.toggles {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin: 0 0 12px;
font-size: 13px;
color: var(--muted);
}
.toggles label {
display: inline-flex;
align-items: center;
gap: 6px;
padding: 6px 10px;
border: 1px solid var(--line);
border-radius: 999px;
background: #fff;
cursor: pointer;
}
@media (max-width: 1080px) {
.grid, .window-layout { grid-template-columns: 1fr; }
.detail { position: static; }
}
</style>
</head>
<body>
<header>
<div>
<h1>Cache Debug Console</h1>
<div class="sub">Render one cached sample, its chart segment, quant-window boundaries, keylevels, breakouts, and feature payloads.</div>
</div>
<div class="actions">
<input id="file-input" placeholder="sample_123.pt or absolute path">
<button id="load-file">Load File</button>
<button id="random-btn">Random Sample</button>
</div>
</header>
<main>
<div class="grid">
<section class="panel meta">
<h2>Sample</h2>
<div id="meta"></div>
<h3>Event Counts</h3>
<div id="events"></div>
</section>
<section class="panel main-panel">
<h2>Chart Segment</h2>
<div class="toggles">
<label><input type="checkbox" id="toggle-indicators" checked> Indicators</label>
<label><input type="checkbox" id="toggle-levels"> Levels</label>
<label><input type="checkbox" id="toggle-trendlines"> Trendlines</label>
<label><input type="checkbox" id="toggle-boundaries" checked> Boundaries</label>
</div>
<div id="chart-wrap">
<div id="chart-root"></div>
</div>
<div class="legend">
<span class="close">Price</span>
<span class="open">Levels</span>
<span class="boundary">Quant Window Boundary</span>
<span class="pattern">Break / Trendline</span>
</div>
</section>
</div>
<section class="panel main-panel">
<h2>Quant Windows</h2>
<div class="window-layout">
<div style="overflow:auto;">
<table>
<thead>
<tr>
<th>#</th>
<th>Range</th>
<th>SR</th>
<th>Trend</th>
<th>Top Signal</th>
<th>Support</th>
<th>Resistance</th>
</tr>
</thead>
<tbody id="windows-body"></tbody>
</table>
</div>
<div class="detail">
<h3 id="detail-title">Window Detail</h3>
<pre id="detail-pre">Select a quant window.</pre>
</div>
</div>
</section>
</main>
<script src="https://cdn.plot.ly/plotly-2.35.2.min.js"></script>
<script>
const chartRoot = document.getElementById("chart-root");
const meta = document.getElementById("meta");
const events = document.getElementById("events");
const body = document.getElementById("windows-body");
const detailTitle = document.getElementById("detail-title");
const detailPre = document.getElementById("detail-pre");
let currentPayload = null;
let activeWindow = -1;
const layerState = {
indicators: true,
levels: false,
trendlines: false,
boundaries: true,
};
function q(path) {
return fetch(path).then(r => {
if (!r.ok) throw new Error("HTTP " + r.status);
return r.json();
});
}
function fmt(v) {
if (v === null || v === undefined) return "n/a";
if (typeof v === "number") return Number(v).toFixed(4);
return String(v);
}
function renderMeta(data) {
const s = data.sample;
meta.innerHTML = `
<div class="kv">
<div>file</div><div>${data.source_file}</div>
<div>token</div><div>${s.token_address || "n/a"}</div>
<div>source token</div><div>${s.source_token || "n/a"}</div>
<div>sample idx</div><div>${fmt(s.sample_idx)}</div>
<div>class</div><div>${fmt(s.class_id)}</div>
<div>bucket</div><div>${fmt(s.context_bucket)}</div>
<div>context score</div><div>${fmt(s.context_score)}</div>
<div>quality score</div><div>${fmt(s.quality_score)}</div>
<div>t cutoff</div><div>${fmt(s.t_cutoff)}</div>
<div>events</div><div>${fmt(s.n_events)}</div>
<div>wallets</div><div>${fmt(s.n_wallets)}</div>
<div>tokens</div><div>${fmt(s.n_tokens)}</div>
<div>graph types</div><div>${fmt(s.n_graph_link_types)}</div>
<div>labels</div><div>${JSON.stringify(s.labels)}</div>
<div>mask</div><div>${JSON.stringify(s.labels_mask)}</div>
</div>
`;
events.innerHTML = Object.entries(s.event_counts)
.sort((a, b) => b[1] - a[1])
.map(([k, v]) => `<span class="badge">${k}: ${v}</span>`)
.join("");
}
function clearChart() {
Plotly.purge(chartRoot);
chartRoot.innerHTML = "";
}
function renderChart(data) {
clearChart();
const chart = data.chart;
if (!chart.present || !(chart.points || []).length) {
chartRoot.innerHTML = `<div style="padding:24px;color:#6f645b;">No Chart_Segment found in this sample.</div>`;
return;
}
const xs = chart.points.map((_, idx) => idx);
const hoverTexts = chart.points.map((p, idx) =>
`idx=${idx}<br>ts=${new Date(p.time * 1000).toISOString()}<br>open=${p.open.toFixed(4)}<br>close=${p.close.toFixed(4)}`
);
const traces = [{
type: 'scatter',
mode: 'lines+markers',
x: xs,
y: chart.points.map(p => p.close),
line: { color: '#1f1a17', width: 3 },
marker: { color: '#1f1a17', size: 5 },
name: 'close',
text: hoverTexts,
hovertemplate: '%{text}<extra></extra>',
}, {
type: 'scatter',
mode: 'lines',
x: xs,
y: chart.points.map(p => p.open),
line: { color: '#6a8a82', width: 1.5, dash: 'dot' },
name: 'open',
text: hoverTexts,
hovertemplate: '%{text}<extra></extra>',
}];
const timeToIndex = new Map(chart.points.map((p, idx) => [p.time, idx]));
const selectedWindow = activeWindow >= 0 ? chart.windows[activeWindow] : null;
function overlayTrace(name, points, color, width = 2, dash = 'solid', opacity = 1.0) {
return {
type: 'scatter',
mode: 'lines',
x: points.map(p => timeToIndex.has(p.time) ? timeToIndex.get(p.time) : 0),
y: points.map(p => p.value),
line: { color, width, dash },
opacity,
name,
hovertemplate: `${name}<br>idx=%{x}<br>%{y}<extra></extra>`,
};
}
if (layerState.levels) {
(chart.overlays.levels.support_levels || []).forEach((level, idx) => {
const isSelected = activeWindow >= 0 && level.window_idx === activeWindow;
traces.push(overlayTrace(
`support_${idx}`,
[
{ time: chart.points[0].time, value: level.price },
{ time: chart.points[chart.points.length - 1].time, value: level.price },
],
'#064e3b',
isSelected ? 4 : 3,
'dash',
isSelected ? 1.0 : 0.95
));
});
(chart.overlays.levels.resistance_levels || []).forEach((level, idx) => {
const isSelected = activeWindow >= 0 && level.window_idx === activeWindow;
traces.push(overlayTrace(
`resistance_${idx}`,
[
{ time: chart.points[0].time, value: level.price },
{ time: chart.points[chart.points.length - 1].time, value: level.price },
],
'#7f1d1d',
isSelected ? 4 : 3,
'dash',
isSelected ? 1.0 : 0.95
));
});
}
if (layerState.trendlines) {
(chart.overlays.trendlines || []).forEach((overlay) => {
traces.push(overlayTrace(overlay.name, overlay.points, overlay.color, 2.5, 'solid', 0.55));
});
}
if (layerState.indicators) {
(chart.overlays.indicators || []).forEach((overlay) => {
traces.push(overlayTrace(
overlay.name,
overlay.points,
overlay.color,
overlay.name.includes('medium') ? 2.4 : 1.8,
overlay.dash || 'solid',
0.95
));
});
}
const signalGroups = {};
(chart.overlays.signals || []).forEach((signal) => {
if (!signalGroups[signal.name]) signalGroups[signal.name] = [];
signalGroups[signal.name].push(signal);
});
Object.entries(signalGroups).forEach(([signalName, signals]) => {
traces.push({
type: 'scatter',
mode: 'markers+text',
x: signals.map(signal => timeToIndex.has(signal.time) ? timeToIndex.get(signal.time) : chart.points.length - 1),
y: signals.map(signal => signal.value),
text: signals.map(() => signalName),
textposition: signalName.includes('down') ? 'bottom center' : 'top center',
marker: {
color: signals[0].color,
size: 14,
symbol: signals[0].symbol,
line: { color: '#111827', width: 1.5 },
},
name: signalName,
hovertemplate: `${signalName}<br>idx=%{x}<br>%{y}<extra></extra>`,
});
});
const shapes = layerState.boundaries ? (chart.overlays.boundaries || []).map((overlay) => {
const x = timeToIndex.has(overlay.points[0].time) ? timeToIndex.get(overlay.points[0].time) : 0;
const isActive = activeWindow >= 0 && overlay.window_idx === activeWindow;
return {
type: 'line',
x0: x,
x1: x,
y0: Math.min(...chart.points.map(p => p.low)),
y1: Math.max(...chart.points.map(p => p.high)),
line: {
color: overlay.color,
width: isActive ? 3 : 1,
dash: isActive ? 'solid' : 'dot',
},
opacity: isActive ? 0.95 : 0.35,
};
}) : [];
if (selectedWindow) {
const startIdx = Math.max(0, Math.floor((selectedWindow.start_ts - chart.points[0].time) / Math.max(1, chart.points[1] ? chart.points[1].time - chart.points[0].time : 1)));
const endIdx = timeToIndex.has(selectedWindow.end_ts) ? timeToIndex.get(selectedWindow.end_ts) : startIdx;
shapes.push({
type: 'rect',
x0: startIdx,
x1: endIdx,
y0: Math.min(...chart.points.map(p => p.low)),
y1: Math.max(...chart.points.map(p => p.high)),
fillcolor: 'rgba(200, 76, 45, 0.08)',
line: { width: 0 },
layer: 'below',
});
}
const layout = {
margin: { l: 50, r: 18, t: 28, b: 40 },
paper_bgcolor: '#fff',
plot_bgcolor: '#fff',
font: { family: 'Georgia, "Times New Roman", serif', color: '#1f1a17' },
xaxis: {
showgrid: true,
gridcolor: '#efe4d4',
rangeslider: { visible: false },
tickmode: 'array',
tickvals: xs,
ticktext: chart.points.map((p, idx) => idx === 0 || idx === chart.points.length - 1 || idx % 5 === 0
? new Date(p.time * 1000).toLocaleTimeString()
: ''),
},
yaxis: {
showgrid: true,
gridcolor: '#efe4d4',
fixedrange: false,
},
shapes,
showlegend: false,
hovermode: 'closest',
};
Plotly.newPlot(chartRoot, traces, layout, {
displayModeBar: true,
responsive: true,
});
}
function selectWindow(idx) {
activeWindow = idx;
const win = currentPayload.chart.windows[idx];
detailTitle.textContent = `Window ${idx} | ${win.start_ts} -> ${win.end_ts}`;
detailPre.textContent = JSON.stringify(win, null, 2);
Array.from(body.querySelectorAll("tr")).forEach((row, rowIdx) => row.classList.toggle("active", rowIdx === idx));
renderChart(currentPayload);
}
function renderWindows(data) {
body.innerHTML = "";
const windows = data.chart.windows || [];
windows.forEach((win, idx) => {
const tr = document.createElement("tr");
tr.innerHTML = `
<td>${idx}</td>
<td>${win.start_ts} -> ${win.end_ts}<br><span class="muted">${win.window_seconds}s</span></td>
<td>${fmt(win.sr_available)}</td>
<td>${fmt(win.trendline_available)}</td>
<td>${win.top_signal_name}</td>
<td>${fmt((win.level_snapshot || {}).support_distance)}</td>
<td>${fmt((win.level_snapshot || {}).resistance_distance)}</td>
`;
tr.addEventListener("click", () => selectWindow(idx));
body.appendChild(tr);
});
if (windows.length) selectWindow(0);
else detailPre.textContent = "No quant windows present.";
}
function render(data) {
currentPayload = data;
renderMeta(data);
renderChart(data);
renderWindows(data);
}
function loadRandom() {
q("/api/sample?random=1").then(render).catch(err => alert(err));
}
function loadByFile() {
const value = document.getElementById("file-input").value.trim();
if (!value) return;
q("/api/sample?file=" + encodeURIComponent(value)).then(render).catch(err => alert(err));
}
document.getElementById("random-btn").addEventListener("click", loadRandom);
document.getElementById("load-file").addEventListener("click", loadByFile);
document.getElementById("toggle-indicators").addEventListener("change", (e) => { layerState.indicators = e.target.checked; if (currentPayload) renderChart(currentPayload); });
document.getElementById("toggle-levels").addEventListener("change", (e) => { layerState.levels = e.target.checked; if (currentPayload) renderChart(currentPayload); });
document.getElementById("toggle-trendlines").addEventListener("change", (e) => { layerState.trendlines = e.target.checked; if (currentPayload) renderChart(currentPayload); });
document.getElementById("toggle-boundaries").addEventListener("change", (e) => { layerState.boundaries = e.target.checked; if (currentPayload) renderChart(currentPayload); });
loadRandom();
</script>
</body>
</html>
"""
class CacheDebugHandler(BaseHTTPRequestHandler):
cache_dir: Path = Path("data/cache")
fixed_file: Optional[Path] = None
def _json(self, payload: Dict[str, Any], code: int = 200) -> None:
encoded = json.dumps(payload).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
self.wfile.write(encoded)
def _html(self, body: str, code: int = 200) -> None:
encoded = body.encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
self.wfile.write(encoded)
def _pick_sample(self, qs: Dict[str, List[str]]) -> Path:
if self.fixed_file is not None:
return self.fixed_file
if "file" in qs and qs["file"] and qs["file"][0]:
candidate = Path(qs["file"][0]).expanduser()
if not candidate.is_absolute():
candidate = (Path.cwd() / candidate).resolve()
if not candidate.exists():
raise FileNotFoundError(candidate)
return candidate
files = sorted(self.cache_dir.glob("sample_*.pt"))
if not files:
raise FileNotFoundError(f"No sample_*.pt files found in {self.cache_dir}")
if "index" in qs and qs["index"]:
idx = max(0, min(int(qs["index"][0]), len(files) - 1))
return files[idx]
return random.choice(files)
def do_GET(self) -> None:
parsed = urlparse(self.path)
if parsed.path == "/":
self._html(HTML)
return
if parsed.path == "/api/sample":
try:
source = self._pick_sample(parse_qs(parsed.query))
sample = _load_cache_sample(source)
self._json(_sample_to_payload(sample, source))
except Exception as exc:
self._json({"error": str(exc)}, code=500)
return
self.send_response(404)
self.end_headers()
def main() -> int:
parser = argparse.ArgumentParser(description="View cached samples in a simple web UI.")
parser.add_argument("--cache_dir", type=str, default="data/cache", help="Cache directory containing sample_*.pt")
parser.add_argument("--file", type=str, default=None, help="Optional fixed sample file to always render")
parser.add_argument("--host", type=str, default="127.0.0.1", help="Bind host")
parser.add_argument("--port", type=int, default=8765, help="Bind port")
args = parser.parse_args()
CacheDebugHandler.cache_dir = Path(args.cache_dir)
CacheDebugHandler.fixed_file = Path(args.file).resolve() if args.file else None
server = ThreadingHTTPServer((args.host, args.port), CacheDebugHandler)
print(f"Cache debug viewer running at http://{args.host}:{args.port}")
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
return 0
if __name__ == "__main__":
raise SystemExit(main())