#!/usr/bin/env python3 """ K1RL QUASAR — Real-Time Monitoring Dashboard (HuggingFace Spaces) Enhanced with visitor tracking, metrics extraction, and container optimization PATCH: 2026-03-09-v4 — full route diagnostics + visitor tracking fix """ from flask import Flask, render_template, jsonify, Response, send_from_directory, request from flask_cors import CORS import subprocess import json import time import os import re from datetime import datetime from collections import deque, defaultdict import psutil import threading import sys import traceback as _tb from pathlib import Path try: from log_metrics_reader import start_log_publisher _log_publisher_available = True except Exception as _lp_import_err: _log_publisher_available = False print(f"⚠️ log_metrics_reader import failed (non-fatal): {_lp_import_err}", file=sys.stderr) # ── Build Flask app FIRST — before any imports that could fail ──────────── app = Flask(__name__) CORS(app) APP_PATCH_VERSION = '2026-03-09-v4' # ── CANARY ROUTE — defined before ANYTHING else ────────────────────────── # If this returns 404, then a DIFFERENT app.py is running. @app.route('/ping') def _ping(): return jsonify({ 'pong': True, 'patch': APP_PATCH_VERSION, 'routes': [rule.rule for rule in app.url_map.iter_rules()], 'timestamp': datetime.now().isoformat() }) # ── Rich visitor tracking (bot-aware, geo-located) ───────────────────────── _vt_err_msg = None _VT_AVAILABLE = False try: import visitor_tracker as _vt _VT_AVAILABLE = True print(f"✅ visitor_tracker loaded (version: {getattr(_vt, 'PATCH_VERSION', 'unknown')})", file=sys.stderr, flush=True) except Exception as _vt_err: _VT_AVAILABLE = False _vt_err_msg = str(_vt_err) print(f"⚠️ visitor_tracker unavailable — using fallback: {_vt_err}", file=sys.stderr, flush=True) # Also print full traceback for debugging _tb.print_exc(file=sys.stderr) # ============================================================================ # CONFIGURATION (HuggingFace Spaces) # ============================================================================ BASE_DIR = Path('/home/user/app') LOG_DIR = BASE_DIR / 'logs' CHECKPOINT_DIR = BASE_DIR / 'checkpoints' STATUS_FILE = Path('/tmp/quasar_status.json') for dir_path in [LOG_DIR, CHECKPOINT_DIR]: dir_path.mkdir(parents=True, exist_ok=True) LOG_FILES = { 'quasar': LOG_DIR / 'quasar_engine.log', 'quasar_engine': LOG_DIR / 'quasar_engine.log', # alias 'features': LOG_DIR / 'features.log', 'rewards': LOG_DIR / 'rewards.log' } # ============================================================================ # VISITOR TRACKING (delegates to visitor_tracker.py when available) # ============================================================================ _fb_active: dict = {} _fb_all_time: set = set() _fb_lock = threading.Lock() _FB_TIMEOUT = 120 def track_visitor(ip, user_agent=''): if _VT_AVAILABLE: _vt.track_visitor(ip, user_agent) else: with _fb_lock: _fb_active[ip] = time.time() _fb_all_time.add(ip) def get_visitor_stats(): if _VT_AVAILABLE: return _vt.get_stats() with _fb_lock: now = time.time() stale = [ip for ip, ts in _fb_active.items() if now - ts > _FB_TIMEOUT] for ip in stale: del _fb_active[ip] return { 'active_now': len(_fb_active), 'active_ips': list(_fb_active.keys()), 'all_time_unique': len(_fb_all_time), 'bots_active': 0, 'bots_total': 0, } # ============================================================================ # METRICS STORAGE # ============================================================================ loss_history = deque(maxlen=50) metrics_history = deque(maxlen=100) log_buffer = deque(maxlen=500) # ============================================================================ # METRIC EXTRACTION # ============================================================================ def extract_latest_metrics(): """Extract latest metrics from quasar log""" try: log_file = LOG_FILES['quasar'] if not log_file.exists(): return None with open(log_file, 'r') as f: lines = f.readlines()[-600:] # v3.0.1: raised from 500 — worst-case AVN gap is 511 lines metrics = { 'training_steps': None, 'actor_loss': None, 'critic_loss': None, 'total_loss': None, 'avn_loss': None, 'avn_accuracy': None, 'buffer_size': None, 'matched_rewards': None, 'unmatched_rewards': None, 'duplicates': None, 'entropy_bonus': None, 'diversity_loss': None, 'entanglement_loss': None, 'timestamp': datetime.now().isoformat() } for line in reversed(lines): # v3.0.1 FIX-C: log never emits dict-format {'actor_loss':...}. # Actual format is label-style: "Actor Loss: -29900.759766" # and inline: "Critic Loss: 0.1234 | Actor Loss: -0.5678" if 'Actor Loss:' in line and not metrics['actor_loss']: m = re.search(r'Actor Loss:\s*([-\d.]+)', line) if m: metrics['actor_loss'] = float(m.group(1)) if 'Critic Loss:' in line and not metrics['critic_loss']: m = re.search(r'Critic Loss:\s*([-\d.]+)', line) if m: metrics['critic_loss'] = float(m.group(1)) if 'Total Loss:' in line and not metrics['total_loss']: m = re.search(r'Total Loss:\s*([-\d.]+)', line) if m: metrics['total_loss'] = float(m.group(1)) if 'buffer_size' in line and not metrics['buffer_size']: m = re.search(r"buffer_size['\"]?\s*:\s*(\d+)", line) if m: metrics['buffer_size'] = int(m.group(1)) if 'avn_training_steps:' in line and not metrics['training_steps']: m = re.search(r'avn_training_steps:\s*(\d+)', line) if m: metrics['training_steps'] = int(m.group(1)) # v3.0.1 FIX-B: was '🎯 Avg Loss:' — wrong emoji (🎯 = Avg Accuracy). # Actual log line uses 📉 emoji. Dropping emoji from check entirely # to be robust to future emoji changes; substring 'Avg Loss:' is unique. if 'Avg Loss:' in line and not metrics['avn_loss']: m = re.search(r'Avg Loss:\s*([-\d.]+)', line) if m: metrics['avn_loss'] = float(m.group(1)) if ('AVN Accuracy:' in line or 'Avg Accuracy:' in line) and metrics['avn_accuracy'] is None: m = re.search(r'(?:AVN|Avg) Accuracy:\s*([\d.]+)%?', line) if m: metrics['avn_accuracy'] = float(m.group(1)) if 'avn_rewards:' in line: if not metrics['matched_rewards']: m = re.search(r'matched=(\d+)', line) if m: metrics['matched_rewards'] = int(m.group(1)) if not metrics['unmatched_rewards']: m = re.search(r'unmatched=(\d+)', line) if m: metrics['unmatched_rewards'] = int(m.group(1)) if not metrics['duplicates']: m = re.search(r'duplicate=(\d+)', line) if m: metrics['duplicates'] = int(m.group(1)) if metrics['actor_loss'] is not None: loss_history.append({ 'actor_loss': metrics['actor_loss'], 'critic_loss': metrics['critic_loss'], 'total_loss': metrics['total_loss'], 'avn_accuracy': metrics['avn_accuracy'], 'timestamp': datetime.now().isoformat() }) return metrics except Exception as e: print(f"Metric extraction error: {e}") return None def get_service_status(): """Get supervisor service status — per-line parsing""" services = {} service_map = { 'quasar_engine': 'quasar_engine', 'dashboard': 'dashboard', 'health_monitor': 'health_monitor', 'redis': 'redis', 'features': 'features', 'rewards': 'rewards', } try: result = subprocess.run( ['supervisorctl', 'status'], capture_output=True, text=True, timeout=5 ) if result.returncode != 0 or not result.stdout.strip(): raise RuntimeError(f"supervisorctl error: {result.stderr.strip()}") for line in result.stdout.splitlines(): parts = line.split() if not parts: continue program_name = parts[0] status_token = parts[1] if len(parts) > 1 else 'UNKNOWN' friendly = service_map.get(program_name) if friendly is None: continue if status_token == 'RUNNING': services[friendly] = 'active' elif status_token in ('STOPPED', 'FATAL', 'EXITED', 'BACKOFF'): services[friendly] = 'inactive' else: services[friendly] = 'unknown' except Exception: services = { 'dashboard': 'active', 'health_monitor': 'unknown', 'redis': 'unknown', 'quasar_engine': 'unknown' } return services def get_system_resources(): """Get system resource usage""" try: cpu_percent = psutil.cpu_percent(interval=0.1) memory = psutil.virtual_memory() disk = psutil.disk_usage('/') quasar_memory = 0 process_count = 0 for proc in psutil.process_iter(['name', 'memory_info', 'cmdline']): try: cmdline = ' '.join(proc.info.get('cmdline', []) or []) if 'quasar' in cmdline.lower() or 'python' in cmdline.lower(): quasar_memory += proc.info['memory_info'].rss / (1024 ** 3) process_count += 1 except Exception: pass return { 'cpu_percent': round(cpu_percent, 1), 'memory_percent': round(memory.percent, 1), 'memory_used_gb': round(memory.used / (1024 ** 3), 2), 'memory_total_gb': round(memory.total / (1024 ** 3), 2), 'memory_available_gb': round(memory.available / (1024 ** 3), 2), 'disk_percent': round(disk.percent, 1), 'disk_used_gb': round(disk.used / (1024 ** 3), 2), 'disk_total_gb': round(disk.total / (1024 ** 3), 2), 'quasar_memory_gb': round(quasar_memory, 2), 'process_count': process_count } except Exception as e: print(f"Resource error: {e}") return None def get_checkpoint_info(): """Get latest checkpoint information""" try: if not CHECKPOINT_DIR.exists(): return None checkpoints = list(CHECKPOINT_DIR.glob('checkpoint_*.pt')) if not checkpoints: return None checkpoints.sort(key=lambda x: x.stat().st_mtime, reverse=True) latest = checkpoints[0] size_mb = latest.stat().st_size / (1024 ** 2) modified = datetime.fromtimestamp(latest.stat().st_mtime) match = re.search(r'step_(\d+)', latest.name) step = int(match.group(1)) if match else None return { 'filename': latest.name, 'size_mb': round(size_mb, 2), 'modified': modified.strftime('%Y-%m-%d %H:%M:%S'), 'step': step, 'count': len(checkpoints) } except Exception as e: print(f"Checkpoint error: {e}") return None # ============================================================================ # REQUEST TRACKING (wrapped so crash never takes down an API response) # ============================================================================ @app.before_request def before_request(): try: ip = request.headers.get('X-Forwarded-For', request.remote_addr) if ip: ip = ip.split(',')[0].strip() user_agent = request.headers.get('User-Agent', '') track_visitor(ip, user_agent) except Exception as e: print(f"⚠️ before_request error (non-fatal): {e}", file=sys.stderr) # ============================================================================ # API ENDPOINTS # ============================================================================ @app.route('/') def index(): dashboard_path = BASE_DIR / 'dashboard.html' if dashboard_path.exists(): return send_from_directory(str(BASE_DIR), 'dashboard.html') return """
System Initializing...
Dashboard components loading...