Quasar-Executo / ranker_logs_api.py
KarlQuant's picture
Upload 3 files
7e41b02 verified
#!/usr/bin/env python3
"""
╔══════════════════════════════════════════════════════════════════════════════════════╗
║ QUASAR RANKER — LOGS REST API ║
║ ────────────────────────────────────────────────────────────────────────────────── ║
║ Flask blueprint for exposing ranker logs via REST endpoints. ║
║ VERSION: v1.0 | 2026-03-26 ║
╚══════════════════════════════════════════════════════════════════════════════════════╝
"""
from flask import Blueprint, jsonify, request, send_file
from typing import Optional
from pathlib import Path
import re as _re
import json as _json
ranker_logs_bp = Blueprint("ranker_logs", __name__, url_prefix="/api/ranker/logs")
_logger: Optional[object] = None
# ── Training-metrics extractor ────────────────────────────────────────────────
# Parses the structured training line emitted by ranker_logging.training_update():
# [ts] | DEBUG | TRAINING | step=1 | loss=0.1234 | lr=0.000100 | assets=7
_TRAINING_RE = _re.compile(
r'step=(\d+)\s*\|\s*loss=([\d.]+)\s*\|\s*lr=([\d.eE+\-]+)\s*\|\s*assets=(\d+)'
)
_JSON_BLOB_RE = _re.compile(r'(\{.*\})\s*$')
def _enrich_entry(entry: dict) -> dict:
"""
Attach a parsed `data` dict to TRAINING log entries so the dashboard
KPI cards (Loss / LR / Step / Assets) always have structured values.
Works from both the pipe-delimited text and any trailing JSON metadata blob.
"""
if entry.get("category", "").upper() != "TRAINING":
return entry
if entry.get("data"): # already enriched (e.g. from in-memory logger)
return entry
msg = entry.get("message", "")
# 1. Try pipe-delimited format
m = _TRAINING_RE.search(msg)
if m:
entry["data"] = {
"step": int(m.group(1)),
"loss": float(m.group(2)),
"lr": float(m.group(3)),
"asset_count": int(m.group(4)),
}
return entry
# 2. Fallback: trailing JSON blob
jm = _JSON_BLOB_RE.search(msg)
if jm:
try:
blob = _json.loads(jm.group(1))
if "step" in blob:
entry["data"] = {
"step": blob.get("step", 0),
"loss": blob.get("loss", 0.0),
"lr": blob.get("lr", 0.0),
"asset_count": blob.get("asset_count", blob.get("assets", 0)),
}
except (ValueError, KeyError):
pass
return entry
def init_ranker_logs_api(ranker_logger):
"""Call this from hub_dashboard_service.py during initialization."""
global _logger
_logger = ranker_logger
@ranker_logs_bp.route("/recent", methods=["GET"])
def get_recent_logs():
"""GET /api/ranker/logs/recent?limit=50&category=signal"""
if not _logger:
return jsonify({"error": "Logger not initialized"}), 500
limit = int(request.args.get("limit", 50))
category = request.args.get("category")
entries = _logger.get_recent(n=limit, category=category)
entries = [_enrich_entry(e) for e in entries]
return jsonify({
"logs": entries,
"count": len(entries),
"stats": _logger.get_stats(),
})
@ranker_logs_bp.route("/asset/<asset>", methods=["GET"])
def get_asset_logs(asset: str):
"""GET /api/ranker/logs/asset/V75?limit=30"""
if not _logger:
return jsonify({"error": "Logger not initialized"}), 500
limit = int(request.args.get("limit", 30))
entries = _logger.get_by_asset(asset, n=limit)
return jsonify({
"asset": asset,
"logs": entries,
"count": len(entries),
})
@ranker_logs_bp.route("/level/<level>", methods=["GET"])
def get_level_logs(level: str):
"""GET /api/ranker/logs/level/ERROR?limit=50"""
if not _logger:
return jsonify({"error": "Logger not initialized"}), 500
limit = int(request.args.get("limit", 50))
entries = _logger.get_by_level(level, n=limit)
return jsonify({
"level": level.upper(),
"logs": entries,
"count": len(entries),
})
@ranker_logs_bp.route("/stats", methods=["GET"])
def get_log_stats():
"""GET /api/ranker/logs/stats"""
if not _logger:
return jsonify({"error": "Logger not initialized"}), 500
return jsonify(_logger.get_stats())
@ranker_logs_bp.route("/export", methods=["GET"])
def export_logs():
"""GET /api/ranker/logs/export?limit=500 → download JSON"""
if not _logger:
return jsonify({"error": "Logger not initialized"}), 500
limit = int(request.args.get("limit", 500))
export_path = Path("/tmp/ranker_logs_export.json")
try:
_logger.export_json(str(export_path), n=limit)
return send_file(
export_path,
mimetype="application/json",
as_attachment=True,
download_name="ranker_logs_export.json"
)
except Exception as e:
return jsonify({"error": str(e)}), 500
@ranker_logs_bp.route("/clear", methods=["POST"])
def clear_logs():
"""POST /api/ranker/logs/clear — Clear in-memory buffer"""
if not _logger:
return jsonify({"error": "Logger not initialized"}), 500
try:
_logger.clear_buffer()
return jsonify({"status": "cleared"})
except Exception as e:
return jsonify({"error": str(e)}), 500
@ranker_logs_bp.errorhandler(404)
def not_found(error):
return jsonify({"error": "Endpoint not found"}), 404