Spaces:
Running
Running
File size: 6,438 Bytes
e5b81b3 7e41b02 e5b81b3 7e41b02 e5b81b3 7e41b02 e5b81b3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 | #!/usr/bin/env python3
"""
╔════════════════════════════════════════════════════════════════════════╗
║  QUASAR RANKER — LOGS REST API                                         ║
║  ────────────────────────────────────────────────────────────────────  ║
║  Flask blueprint for exposing ranker logs via REST endpoints.          ║
║  VERSION: v1.0 | 2026-03-26                                            ║
╚════════════════════════════════════════════════════════════════════════╝
"""
from flask import Blueprint, jsonify, request, send_file
from typing import Optional
from pathlib import Path
import re as _re
import json as _json
# Blueprint that groups every endpoint of this module under /api/ranker/logs.
ranker_logs_bp = Blueprint("ranker_logs", __name__, url_prefix="/api/ranker/logs")
# Module-level logger handle; stays None until init_ranker_logs_api() assigns it.
_logger: Optional[object] = None
# ── Training-metrics extractor ────────────────────────────────────────────────
# Parses the structured training line emitted by ranker_logging.training_update():
# [ts] | DEBUG | TRAINING | step=1 | loss=0.1234 | lr=0.000100 | assets=7
# Pipe-delimited metrics found on TRAINING lines, e.g.
#   step=1 | loss=0.1234 | lr=0.000100 | assets=7
# `loss` shares the character class used for `lr` so that signed and
# scientific-notation values (e.g. -1.5e-3) are captured as well.
_TRAINING_RE = _re.compile(
    r'step=(\d+)\s*\|\s*loss=([\d.eE+\-]+)\s*\|\s*lr=([\d.eE+\-]+)\s*\|\s*assets=(\d+)'
)
# A `{...}` span that runs to the end of the message (trailing whitespace allowed).
_JSON_BLOB_RE = _re.compile(r'(\{.*\})\s*$')


def _enrich_entry(entry: dict) -> dict:
    """
    Attach a parsed `data` dict to TRAINING log entries so the dashboard
    KPI cards (Loss / LR / Step / Assets) always have structured values.

    Works from both the pipe-delimited text and any trailing JSON metadata blob.

    Args:
        entry: Log entry dict. The keys read here are "category", "data"
            and "message".

    Returns:
        The same dict object that was passed in. It is mutated in place:
        a "data" key with "step", "loss", "lr" and "asset_count" is added
        when metrics could be extracted; otherwise it is left unchanged.
        Malformed metrics never raise.
    """
    # `or ""` / str() keep a missing, None or non-string category from
    # crashing on .upper().
    category = entry.get("category") or ""
    if str(category).upper() != "TRAINING":
        return entry
    if entry.get("data"):  # already enriched (e.g. from in-memory logger)
        return entry

    msg = entry.get("message")
    if not isinstance(msg, str):
        # Nothing to parse; re.search would raise TypeError on non-strings.
        return entry

    # 1. Try pipe-delimited format
    m = _TRAINING_RE.search(msg)
    if m:
        try:
            parsed = {
                "step": int(m.group(1)),
                "loss": float(m.group(2)),
                "lr": float(m.group(3)),
                "asset_count": int(m.group(4)),
            }
        except ValueError:
            # The character classes also admit non-numbers such as "1.2.3"
            # or "e"; treat those as "no match" and try the JSON fallback.
            parsed = None
        if parsed is not None:
            entry["data"] = parsed
            return entry

    # 2. Fallback: trailing JSON blob
    jm = _JSON_BLOB_RE.search(msg)
    if jm is None:
        return entry
    try:
        blob = _json.loads(jm.group(1))
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return entry
    if isinstance(blob, dict) and "step" in blob:
        entry["data"] = {
            "step": blob.get("step", 0),
            "loss": blob.get("loss", 0.0),
            "lr": blob.get("lr", 0.0),
            "asset_count": blob.get("asset_count", blob.get("assets", 0)),
        }
    return entry
def init_ranker_logs_api(ranker_logger):
    """
    Register the logger instance served by this module's endpoints.

    Meant to be invoked from hub_dashboard_service.py during initialization.
    The given object is stored in the module-level `_logger` slot, replacing
    whatever was stored there before.
    """
    global _logger
    _logger = ranker_logger
@ranker_logs_bp.route("/recent", methods=["GET"])
def get_recent_logs():
    """GET /api/ranker/logs/recent?limit=50&category=signal

    Query parameters:
        limit: Number of entries requested (default 50). A value that is
            not an integer falls back to the default.
        category: Optional category filter, forwarded to the logger as-is.

    Returns:
        JSON object with "logs" (entries passed through `_enrich_entry`),
        "count" and "stats", or a 500 JSON error when no logger has been
        registered yet.
    """
    # Explicit None check: a logger object that defines __len__/__bool__
    # could evaluate falsy even though it has been initialized.
    if _logger is None:
        return jsonify({"error": "Logger not initialized"}), 500

    # type=int returns the default instead of raising ValueError (which
    # would surface as an HTTP 500) when the value is not numeric.
    limit = request.args.get("limit", 50, type=int)
    category = request.args.get("category")

    entries = [
        _enrich_entry(e)
        for e in _logger.get_recent(n=limit, category=category)
    ]
    return jsonify({
        "logs": entries,
        "count": len(entries),
        "stats": _logger.get_stats(),
    })
@ranker_logs_bp.route("/asset/<asset>", methods=["GET"])
def get_asset_logs(asset: str):
    """GET /api/ranker/logs/asset/V75?limit=30

    Args:
        asset: Asset identifier taken from the URL path.

    Query parameters:
        limit: Number of entries requested (default 30). A value that is
            not an integer falls back to the default.

    Returns:
        JSON object with "asset", "logs" and "count", or a 500 JSON error
        when no logger has been registered yet.
    """
    # Explicit None check: a logger object that defines __len__/__bool__
    # could evaluate falsy even though it has been initialized.
    if _logger is None:
        return jsonify({"error": "Logger not initialized"}), 500

    # type=int returns the default instead of raising ValueError (which
    # would surface as an HTTP 500) when the value is not numeric.
    limit = request.args.get("limit", 30, type=int)
    entries = _logger.get_by_asset(asset, n=limit)
    return jsonify({
        "asset": asset,
        "logs": entries,
        "count": len(entries),
    })
@ranker_logs_bp.route("/level/<level>", methods=["GET"])
def get_level_logs(level: str):
    """GET /api/ranker/logs/level/ERROR?limit=50

    Args:
        level: Log level taken from the URL path; passed to the logger
            unchanged and echoed back upper-cased.

    Query parameters:
        limit: Number of entries requested (default 50). A value that is
            not an integer falls back to the default.

    Returns:
        JSON object with "level", "logs" and "count", or a 500 JSON error
        when no logger has been registered yet.
    """
    # Explicit None check: a logger object that defines __len__/__bool__
    # could evaluate falsy even though it has been initialized.
    if _logger is None:
        return jsonify({"error": "Logger not initialized"}), 500

    # type=int returns the default instead of raising ValueError (which
    # would surface as an HTTP 500) when the value is not numeric.
    limit = request.args.get("limit", 50, type=int)
    entries = _logger.get_by_level(level, n=limit)
    return jsonify({
        "level": level.upper(),
        "logs": entries,
        "count": len(entries),
    })
@ranker_logs_bp.route("/stats", methods=["GET"])
def get_log_stats():
    """GET /api/ranker/logs/stats

    Returns:
        Whatever `_logger.get_stats()` yields, serialized as JSON, or a
        500 JSON error when no logger has been registered yet.
    """
    # Explicit None check: a logger object that defines __len__/__bool__
    # could evaluate falsy even though it has been initialized.
    if _logger is None:
        return jsonify({"error": "Logger not initialized"}), 500
    return jsonify(_logger.get_stats())
@ranker_logs_bp.route("/export", methods=["GET"])
def export_logs():
    """GET /api/ranker/logs/export?limit=500 — download JSON

    Query parameters:
        limit: Number of entries requested (default 500). A value that is
            not an integer falls back to the default.

    Returns:
        The export produced by `_logger.export_json` as an
        "application/json" attachment named ranker_logs_export.json, or a
        500 JSON error when no logger is registered or the export fails.
    """
    # Function-scope imports keep this block self-contained.
    import io
    import tempfile

    # Explicit None check: a logger object that defines __len__/__bool__
    # could evaluate falsy even though it has been initialized.
    if _logger is None:
        return jsonify({"error": "Logger not initialized"}), 500

    # type=int returns the default instead of raising ValueError (which
    # would surface as an HTTP 500) when the value is not numeric.
    limit = request.args.get("limit", 500, type=int)

    try:
        # A private per-request directory replaces the fixed
        # /tmp/ranker_logs_export.json path, which was shared by concurrent
        # requests, predictable, and never cleaned up.
        with tempfile.TemporaryDirectory(prefix="ranker_logs_") as tmp_dir:
            export_path = Path(tmp_dir) / "ranker_logs_export.json"
            _logger.export_json(str(export_path), n=limit)
            # Read before the directory is removed; the response is then
            # served from memory.
            payload = export_path.read_bytes()
        return send_file(
            io.BytesIO(payload),
            mimetype="application/json",
            as_attachment=True,
            download_name="ranker_logs_export.json",
        )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@ranker_logs_bp.route("/clear", methods=["POST"])
def clear_logs():
    """POST /api/ranker/logs/clear — Clear in-memory buffer

    Returns:
        {"status": "cleared"} on success, or a 500 JSON error when no
        logger is registered or `_logger.clear_buffer()` raises.
    """
    # Explicit None check: a logger object that defines __len__/__bool__
    # (for instance one that reports an empty buffer as falsy) must still
    # be reachable here.
    if _logger is None:
        return jsonify({"error": "Logger not initialized"}), 500
    try:
        _logger.clear_buffer()
        return jsonify({"status": "cleared"})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@ranker_logs_bp.errorhandler(404)
def not_found(error):
    """Return a JSON body with status 404 for NotFound errors on this blueprint.

    Args:
        error: The exception passed in by Flask; not used.

    NOTE(review): a blueprint-level 404 handler presumably only sees errors
    raised from inside this blueprint's views (e.g. abort(404)); URLs that
    match no route are likely handled at application level — confirm
    against the Flask error-handling docs.
    """
    return jsonify({"error": "Endpoint not found"}), 404