#!/usr/bin/env python3 """ ╔══════════════════════════════════════════════════════════════════════════════════════╗ ║ QUASAR RANKER — LOGS REST API ║ ║ ────────────────────────────────────────────────────────────────────────────────── ║ ║ Flask blueprint for exposing ranker logs via REST endpoints. ║ ║ VERSION: v1.0 | 2026-03-26 ║ ╚══════════════════════════════════════════════════════════════════════════════════════╝ """ from flask import Blueprint, jsonify, request, send_file from typing import Optional from pathlib import Path import re as _re import json as _json ranker_logs_bp = Blueprint("ranker_logs", __name__, url_prefix="/api/ranker/logs") _logger: Optional[object] = None # ── Training-metrics extractor ──────────────────────────────────────────────── # Parses the structured training line emitted by ranker_logging.training_update(): # [ts] | DEBUG | TRAINING | step=1 | loss=0.1234 | lr=0.000100 | assets=7 _TRAINING_RE = _re.compile( r'step=(\d+)\s*\|\s*loss=([\d.]+)\s*\|\s*lr=([\d.eE+\-]+)\s*\|\s*assets=(\d+)' ) _JSON_BLOB_RE = _re.compile(r'(\{.*\})\s*$') def _enrich_entry(entry: dict) -> dict: """ Attach a parsed `data` dict to TRAINING log entries so the dashboard KPI cards (Loss / LR / Step / Assets) always have structured values. Works from both the pipe-delimited text and any trailing JSON metadata blob. """ if entry.get("category", "").upper() != "TRAINING": return entry if entry.get("data"): # already enriched (e.g. from in-memory logger) return entry msg = entry.get("message", "") # 1. Try pipe-delimited format m = _TRAINING_RE.search(msg) if m: entry["data"] = { "step": int(m.group(1)), "loss": float(m.group(2)), "lr": float(m.group(3)), "asset_count": int(m.group(4)), } return entry # 2. Fallback: trailing JSON blob jm = _JSON_BLOB_RE.search(msg) if jm: try: blob = _json.loads(jm.group(1)) if "step" in blob: entry["data"] = { "step": blob.get("step", 0), "loss": blob.get("loss", 0.0), "lr": blob.get("lr", 0.0), "asset_count": blob.get("asset_count", blob.get("assets", 0)), } except (ValueError, KeyError): pass return entry def init_ranker_logs_api(ranker_logger): """Call this from hub_dashboard_service.py during initialization.""" global _logger _logger = ranker_logger @ranker_logs_bp.route("/recent", methods=["GET"]) def get_recent_logs(): """GET /api/ranker/logs/recent?limit=50&category=signal""" if not _logger: return jsonify({"error": "Logger not initialized"}), 500 limit = int(request.args.get("limit", 50)) category = request.args.get("category") entries = _logger.get_recent(n=limit, category=category) entries = [_enrich_entry(e) for e in entries] return jsonify({ "logs": entries, "count": len(entries), "stats": _logger.get_stats(), }) @ranker_logs_bp.route("/asset/", methods=["GET"]) def get_asset_logs(asset: str): """GET /api/ranker/logs/asset/V75?limit=30""" if not _logger: return jsonify({"error": "Logger not initialized"}), 500 limit = int(request.args.get("limit", 30)) entries = _logger.get_by_asset(asset, n=limit) return jsonify({ "asset": asset, "logs": entries, "count": len(entries), }) @ranker_logs_bp.route("/level/", methods=["GET"]) def get_level_logs(level: str): """GET /api/ranker/logs/level/ERROR?limit=50""" if not _logger: return jsonify({"error": "Logger not initialized"}), 500 limit = int(request.args.get("limit", 50)) entries = _logger.get_by_level(level, n=limit) return jsonify({ "level": level.upper(), "logs": entries, "count": len(entries), }) @ranker_logs_bp.route("/stats", methods=["GET"]) def get_log_stats(): """GET /api/ranker/logs/stats""" if not _logger: return jsonify({"error": "Logger not initialized"}), 500 return jsonify(_logger.get_stats()) @ranker_logs_bp.route("/export", methods=["GET"]) def export_logs(): """GET /api/ranker/logs/export?limit=500 → download JSON""" if not _logger: return jsonify({"error": "Logger not initialized"}), 500 limit = int(request.args.get("limit", 500)) export_path = Path("/tmp/ranker_logs_export.json") try: _logger.export_json(str(export_path), n=limit) return send_file( export_path, mimetype="application/json", as_attachment=True, download_name="ranker_logs_export.json" ) except Exception as e: return jsonify({"error": str(e)}), 500 @ranker_logs_bp.route("/clear", methods=["POST"]) def clear_logs(): """POST /api/ranker/logs/clear — Clear in-memory buffer""" if not _logger: return jsonify({"error": "Logger not initialized"}), 500 try: _logger.clear_buffer() return jsonify({"status": "cleared"}) except Exception as e: return jsonify({"error": str(e)}), 500 @ranker_logs_bp.errorhandler(404) def not_found(error): return jsonify({"error": "Endpoint not found"}), 404