#!/usr/bin/env python3 """ K1RL QUASAR — Real-Time Monitoring Dashboard (HuggingFace Spaces) Enhanced with visitor tracking, metrics extraction, and container optimization PATCH: 2026-03-09-v4 — full route diagnostics + visitor tracking fix """ from flask import Flask, render_template, jsonify, Response, send_from_directory, request from flask_cors import CORS import subprocess import json import time import os import re from datetime import datetime from collections import deque, defaultdict import psutil import threading import sys import traceback as _tb from pathlib import Path try: from log_metrics_reader import start_log_publisher _log_publisher_available = True except Exception as _lp_import_err: _log_publisher_available = False print(f"⚠️ log_metrics_reader import failed (non-fatal): {_lp_import_err}", file=sys.stderr) # ── Build Flask app FIRST — before any imports that could fail ──────────── app = Flask(__name__) CORS(app) APP_PATCH_VERSION = '2026-03-09-v4' # ── CANARY ROUTE — defined before ANYTHING else ────────────────────────── # If this returns 404, then a DIFFERENT app.py is running. 
@app.route('/ping')
def _ping():
    """Canary endpoint: return the patch version and every registered route."""
    return jsonify({
        'pong': True,
        'patch': APP_PATCH_VERSION,
        'routes': [rule.rule for rule in app.url_map.iter_rules()],
        'timestamp': datetime.now().isoformat()
    })


# ── Rich visitor tracking (bot-aware, geo-located) ─────────────────────────
_vt_err_msg = None
_VT_AVAILABLE = False
try:
    import visitor_tracker as _vt
    _VT_AVAILABLE = True
    print(f"✅ visitor_tracker loaded (version: {getattr(_vt, 'PATCH_VERSION', 'unknown')})", file=sys.stderr, flush=True)
except Exception as _vt_err:
    _VT_AVAILABLE = False
    _vt_err_msg = str(_vt_err)
    print(f"⚠️ visitor_tracker unavailable — using fallback: {_vt_err}", file=sys.stderr, flush=True)
    # Also print full traceback for debugging
    _tb.print_exc(file=sys.stderr)

# ============================================================================
# CONFIGURATION (HuggingFace Spaces)
# ============================================================================

BASE_DIR = Path('/home/user/app')
LOG_DIR = BASE_DIR / 'logs'
CHECKPOINT_DIR = BASE_DIR / 'checkpoints'
STATUS_FILE = Path('/tmp/quasar_status.json')

for dir_path in [LOG_DIR, CHECKPOINT_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

LOG_FILES = {
    'quasar': LOG_DIR / 'quasar_engine.log',
    'quasar_engine': LOG_DIR / 'quasar_engine.log',  # alias
    'features': LOG_DIR / 'features.log',
    'rewards': LOG_DIR / 'rewards.log'
}

# ============================================================================
# VISITOR TRACKING (delegates to visitor_tracker.py when available)
# ============================================================================

# In-memory fallback state, used only when visitor_tracker failed to import.
_fb_active: dict = {}        # ip -> time.time() of the most recent request
_fb_all_time: set = set()    # every ip seen since process start
_fb_lock = threading.Lock()  # guards both fallback containers
_FB_TIMEOUT = 120            # seconds without a request before an ip stops counting as active


def track_visitor(ip, user_agent=''):
    """Record a request from ``ip``.

    Delegates to ``visitor_tracker.track_visitor`` when that module was
    imported; otherwise updates the in-memory fallback (``user_agent`` is
    ignored on the fallback path).
    """
    if _VT_AVAILABLE:
        _vt.track_visitor(ip, user_agent)
        return
    with _fb_lock:
        _fb_active[ip] = time.time()
        _fb_all_time.add(ip)


def get_visitor_stats():
    """Return visitor counters as a dict.

    With visitor_tracker loaded this is whatever ``_vt.get_stats()`` returns.
    On the fallback path, entries older than ``_FB_TIMEOUT`` seconds are
    evicted first and bot counters are always reported as 0.
    """
    if _VT_AVAILABLE:
        return _vt.get_stats()
    with _fb_lock:
        now = time.time()
        stale = [ip for ip, ts in _fb_active.items() if now - ts > _FB_TIMEOUT]
        for ip in stale:
            del _fb_active[ip]
        return {
            'active_now': len(_fb_active),
            'active_ips': list(_fb_active.keys()),
            'all_time_unique': len(_fb_all_time),
            'bots_active': 0,
            'bots_total': 0,
        }


# ============================================================================
# METRICS STORAGE
# ============================================================================

loss_history = deque(maxlen=50)
metrics_history = deque(maxlen=100)
log_buffer = deque(maxlen=500)

# ============================================================================
# METRIC EXTRACTION
# ============================================================================

# v3.0.1: raised from 500 — worst-case AVN gap is 511 lines
_METRICS_TAIL_LINES = 600

# Decimal number with optional exponent. The previous character class
# ``[-\d.]+`` truncated scientific notation ("1.2e-05" -> 1.2) and could
# capture tokens such as "-" or "0.12." that make float() raise, which
# aborted the whole extraction.
_UNSIGNED_RE = r'(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?'
_FLOAT_RE = r'[-+]?' + _UNSIGNED_RE

# (metric key, trigger substrings — any one must be present, pattern, converter)
#
# v3.0.1 FIX-C: log never emits dict-format {'actor_loss':...}.
# Actual format is label-style: "Actor Loss: -29900.759766"
# and inline: "Critic Loss: 0.1234 | Actor Loss: -0.5678"
#
# v3.0.1 FIX-B: the 'Avg Loss:' check carries no emoji so it stays robust to
# future emoji changes; substring 'Avg Loss:' is unique.
_METRIC_PATTERNS = (
    ('actor_loss', ('Actor Loss:',),
     re.compile(r'Actor Loss:\s*(' + _FLOAT_RE + r')'), float),
    ('critic_loss', ('Critic Loss:',),
     re.compile(r'Critic Loss:\s*(' + _FLOAT_RE + r')'), float),
    ('total_loss', ('Total Loss:',),
     re.compile(r'Total Loss:\s*(' + _FLOAT_RE + r')'), float),
    ('buffer_size', ('buffer_size',),
     re.compile(r"buffer_size['\"]?\s*:\s*(\d+)"), int),
    ('training_steps', ('avn_training_steps:',),
     re.compile(r'avn_training_steps:\s*(\d+)'), int),
    ('avn_loss', ('Avg Loss:',),
     re.compile(r'Avg Loss:\s*(' + _FLOAT_RE + r')'), float),
    ('avn_accuracy', ('AVN Accuracy:', 'Avg Accuracy:'),
     re.compile(r'(?:AVN|Avg) Accuracy:\s*(' + _UNSIGNED_RE + r')%?'), float),
    # The lookbehind stops "matched=" from matching inside "unmatched=".
    ('matched_rewards', ('avn_rewards:',),
     re.compile(r'(?<!un)matched=(\d+)'), int),
    ('unmatched_rewards', ('avn_rewards:',),
     re.compile(r'unmatched=(\d+)'), int),
    ('duplicates', ('avn_rewards:',),
     re.compile(r'duplicate=(\d+)'), int),
)


def _tail_lines(path, count):
    """Return the last ``count`` lines of ``path`` without loading the whole file.

    The file is decoded as UTF-8 (the log contains emoji) with undecodable
    bytes replaced, so one corrupt byte cannot abort metric extraction.
    """
    with open(path, 'r', encoding='utf-8', errors='replace') as f:
        return deque(f, maxlen=count)


def extract_latest_metrics():
    """Extract the most recent value of each metric from the quasar log.

    Scans the last ``_METRICS_TAIL_LINES`` lines newest-first and keeps the
    first match per metric. When an actor loss was found, a summary entry is
    appended to ``loss_history``.

    Returns:
        dict of metric name -> value (``None`` for metrics not found) plus a
        ``timestamp`` key, or ``None`` when the log file does not exist or
        extraction fails.
    """
    try:
        log_file = LOG_FILES['quasar']
        if not log_file.exists():
            return None

        lines = _tail_lines(log_file, _METRICS_TAIL_LINES)

        metrics = {
            'training_steps': None,
            'actor_loss': None,
            'critic_loss': None,
            'total_loss': None,
            'avn_loss': None,
            'avn_accuracy': None,
            'buffer_size': None,
            'matched_rewards': None,
            'unmatched_rewards': None,
            'duplicates': None,
            'entropy_bonus': None,
            'diversity_loss': None,
            'entanglement_loss': None,
            'timestamp': datetime.now().isoformat()
        }

        # Track "still missing" explicitly. The old truthiness test
        # (``not metrics[key]``) treated a legitimate newest value of 0 / 0.0
        # as missing and let an older line overwrite it.
        remaining = {spec[0] for spec in _METRIC_PATTERNS}

        for line in reversed(lines):
            if not remaining:
                break  # every parseable metric already has its newest value
            for key, triggers, pattern, convert in _METRIC_PATTERNS:
                if key not in remaining:
                    continue
                if not any(trigger in line for trigger in triggers):
                    continue
                m = pattern.search(line)
                if m:
                    metrics[key] = convert(m.group(1))
                    remaining.discard(key)

        if metrics['actor_loss'] is not None:
            loss_history.append({
                'actor_loss': metrics['actor_loss'],
                'critic_loss': metrics['critic_loss'],
                'total_loss': metrics['total_loss'],
                'avn_accuracy': metrics['avn_accuracy'],
                'timestamp': datetime.now().isoformat()
            })

        return metrics

    except Exception as e:
        print(f"Metric extraction error: {e}")
        return None


def get_service_status():
    """Get supervisor service status — per-line parsing.

    Returns:
        dict mapping service name to 'active', 'inactive' or 'unknown'. When
        ``supervisorctl`` cannot be run, or it fails without reporting any
        known program, a fixed fallback dict is returned instead.
    """
    services = {}
    service_map = {
        'quasar_engine': 'quasar_engine',
        'dashboard': 'dashboard',
        'health_monitor': 'health_monitor',
        'redis': 'redis',
        'features': 'features',
        'rewards': 'rewards',
    }
    try:
        result = subprocess.run(
            ['supervisorctl', 'status'],
            capture_output=True, text=True, timeout=5
        )
        if not result.stdout.strip():
            raise RuntimeError(f"supervisorctl error: {result.stderr.strip()}")

        for line in result.stdout.splitlines():
            parts = line.split()
            if not parts:
                continue
            program_name = parts[0]
            status_token = parts[1] if len(parts) > 1 else 'UNKNOWN'
            friendly = service_map.get(program_name)
            if friendly is None:
                continue
            if status_token == 'RUNNING':
                services[friendly] = 'active'
            elif status_token in ('STOPPED', 'FATAL', 'EXITED', 'BACKOFF'):
                services[friendly] = 'inactive'
            else:
                services[friendly] = 'unknown'

        # A non-zero exit code alone is not treated as failure: the status
        # table on stdout is still usable, and that is precisely when a
        # stopped service needs to be reported. NOTE(review): supervisorctl
        # is documented to exit non-zero when any program is not RUNNING —
        # confirm for the deployed supervisor version. Only give up when the
        # command failed AND produced no recognisable program lines.
        if not services and result.returncode != 0:
            raise RuntimeError(f"supervisorctl error: {result.stderr.strip()}")
    except Exception:
        # Deliberate best-effort: fall back to a static view rather than fail
        # the API response (kept silent because this runs on every poll).
        services = {
            'dashboard': 'active',
            'health_monitor': 'unknown',
            'redis': 'unknown',
            'quasar_engine': 'unknown'
        }
    return services


def get_system_resources():
    """Get system resource usage.

    Returns:
        dict of CPU, memory and disk figures (sizes in GiB, rounded), plus the
        summed RSS and count of processes whose command line contains
        'quasar' or 'python'; ``None`` if collection fails.
    """
    try:
        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')

        quasar_memory = 0
        process_count = 0
        for proc in psutil.process_iter(['name', 'memory_info', 'cmdline']):
            try:
                cmdline = ' '.join(proc.info.get('cmdline', []) or []).lower()
                if 'quasar' not in cmdline and 'python' not in cmdline:
                    continue
                mem_info = proc.info.get('memory_info')
                if mem_info is None:
                    # No memory info for this process: skip it explicitly
                    # instead of relying on a swallowed AttributeError.
                    continue
                quasar_memory += mem_info.rss / (1024 ** 3)
                process_count += 1
            except Exception:
                # Best-effort per process: one unreadable process must not
                # discard the whole resource report.
                continue

        return {
            'cpu_percent': round(cpu_percent, 1),
            'memory_percent': round(memory.percent, 1),
            'memory_used_gb': round(memory.used / (1024 ** 3), 2),
            'memory_total_gb': round(memory.total / (1024 ** 3), 2),
            'memory_available_gb': round(memory.available / (1024 ** 3), 2),
            'disk_percent': round(disk.percent, 1),
            'disk_used_gb': round(disk.used / (1024 ** 3), 2),
            'disk_total_gb': round(disk.total / (1024 ** 3), 2),
            'quasar_memory_gb': round(quasar_memory, 2),
            'process_count': process_count
        }
    except Exception as e:
        print(f"Resource error: {e}")
        return None


def get_checkpoint_info():
    """Get latest checkpoint information.

    Returns:
        dict describing the most recently modified ``checkpoint_*.pt`` file in
        ``CHECKPOINT_DIR`` (name, size in MiB, mtime, step parsed from a
        ``step_<n>`` fragment of the name if present, and total checkpoint
        count), or ``None`` when there are no checkpoints or an error occurs.
    """
    try:
        if not CHECKPOINT_DIR.exists():
            return None
        checkpoints = list(CHECKPOINT_DIR.glob('checkpoint_*.pt'))
        if not checkpoints:
            return None

        latest = max(checkpoints, key=lambda p: p.stat().st_mtime)
        # Stat once so size and mtime come from the same snapshot of the file.
        latest_stat = latest.stat()
        match = re.search(r'step_(\d+)', latest.name)

        return {
            'filename': latest.name,
            'size_mb': round(latest_stat.st_size / (1024 ** 2), 2),
            'modified': datetime.fromtimestamp(latest_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
            'step': int(match.group(1)) if match else None,
            'count': len(checkpoints)
        }
    except Exception as e:
        print(f"Checkpoint error: {e}")
        return None
# ============================================================================
# REQUEST TRACKING (wrapped so crash never takes down an API response)
# ============================================================================

@app.before_request
def before_request():
    """Record the client of every request; never raises.

    The first entry of ``X-Forwarded-For`` is used when present, otherwise
    ``request.remote_addr``. Requests with no determinable address are not
    tracked.
    """
    try:
        forwarded = request.headers.get('X-Forwarded-For', '')
        # An empty or malformed header (e.g. ", ") falls back to the socket
        # address instead of registering '' as a visitor.
        ip = forwarded.split(',')[0].strip() or request.remote_addr
        if ip:
            track_visitor(ip, request.headers.get('User-Agent', ''))
    except Exception as e:
        print(f"⚠️ before_request error (non-fatal): {e}", file=sys.stderr)


# ============================================================================
# API ENDPOINTS
# ============================================================================

@app.route('/')
def index():
    """Serve ``dashboard.html`` from BASE_DIR, or a placeholder page if absent."""
    dashboard_path = BASE_DIR / 'dashboard.html'
    if dashboard_path.exists():
        return send_from_directory(str(BASE_DIR), 'dashboard.html')
    # NOTE(review): this placeholder body contains no markup — confirm whether
    # HTML tags were lost from it at some point.
    return """ K1RL QUASAR — Quantitative Intelligence Observatory

K1RL QUASAR

Quantitative Intelligence Observatory

System Initializing...

Dashboard components loading...

Connecting to services...
""", 200


@app.route('/api/metrics')
def api_metrics():
    """Return training metrics, service status, resources, checkpoint and visitors."""
    try:
        metrics = extract_latest_metrics()
        services = get_service_status()
        resources = get_system_resources()
        checkpoint = get_checkpoint_info()
        return jsonify({
            'metrics': metrics,
            'services': services,
            'resources': resources,
            'checkpoint': checkpoint,
            'timestamp': datetime.now().isoformat(),
            'visitors': get_visitor_stats(),
            'platform': 'huggingface-spaces'
        })
    except Exception as e:
        return jsonify({'error': str(e), 'platform': 'huggingface-spaces'}), 500


@app.route('/api/visitors')
def api_visitors():
    """Return the basic visitor counters."""
    return jsonify(get_visitor_stats())


@app.route('/api/visitors/detailed')
def api_visitors_detailed():
    """Full visitor analytics with bulletproof error handling."""
    try:
        if _VT_AVAILABLE:
            stats = _vt.get_stats()
            detailed = _vt.get_detailed_stats()
            all_v = _vt.get_all_visitors()
            # Each counter prefers one source and falls back to the other so
            # the response stays complete if either dict lacks a key.
            return jsonify({
                'active_now': detailed.get('active_now', stats.get('active_now', 0)),
                'all_time_unique': stats.get('all_time_unique', detailed.get('total_unique', 0)),
                'total_unique': detailed.get('total_unique', stats.get('all_time_unique', 0)),
                'bots_active': detailed.get('bots_active', stats.get('bots_active', 0)),
                'bots_total': detailed.get('bots_total', stats.get('bots_total', 0)),
                'active_ips': stats.get('active_ips', []),
                'countries': detailed.get('countries', {}),
                'cities': detailed.get('cities', {}),
                'devices': detailed.get('devices', {}),
                'browsers': detailed.get('browsers', {}),
                'visitors': all_v.get('visitors', []),
                'timestamp': datetime.now().isoformat(),
            })
        basic = get_visitor_stats()
        return jsonify({**basic, 'visitors': [], 'countries': {}, 'devices': {},
                        'browsers': {}, 'cities': {},
                        'timestamp': datetime.now().isoformat()})
    except Exception as e:
        _tb.print_exc(file=sys.stderr)
        # NOTE(review): the traceback is returned to the client; acceptable for
        # diagnostics, but consider removing it if this Space is public.
        return jsonify({'error': str(e), 'traceback': _tb.format_exc()}), 500


@app.route('/api/debug')
def api_debug():
    """Full diagnostic endpoint."""
    diag = {
        'app_patch_version': APP_PATCH_VERSION,
        'vt_available': _VT_AVAILABLE,
        'vt_import_error': _vt_err_msg,
        'vt_patch_version': getattr(_vt, 'PATCH_VERSION', 'NOT_PATCHED') if _VT_AVAILABLE else None,
        'python_version': sys.version,
        'cwd': os.getcwd(),
        'base_dir': str(BASE_DIR),
        'base_dir_exists': BASE_DIR.exists(),
        'dashboard_exists': (BASE_DIR / 'dashboard.html').exists(),
        'visitor_tracker_exists': (BASE_DIR / 'visitor_tracker.py').exists(),
        'app_py_exists': (BASE_DIR / 'app.py').exists(),
        'registered_routes': sorted([rule.rule for rule in app.url_map.iter_rules()]),
        'timestamp': datetime.now().isoformat(),
    }
    if _VT_AVAILABLE:
        # Probe each tracker entry point independently so one failure does not
        # hide the results of the others.
        for name, fn in [('get_stats', _vt.get_stats),
                         ('get_detailed_stats', _vt.get_detailed_stats),
                         ('get_all_visitors', _vt.get_all_visitors)]:
            try:
                result = fn()
                diag[f'{name}_ok'] = True
                # The full visitor list is summarised rather than echoed.
                diag[f'{name}_result'] = result if name != 'get_all_visitors' else {
                    'count': len(result.get('visitors', [])),
                    'total_unique': result.get('total_unique'),
                }
            except Exception as e:
                diag[f'{name}_ok'] = False
                diag[f'{name}_error'] = str(e)
        try:
            diag['active_visitors_count'] = len(_vt.active_visitors)
            diag['all_time_visitors_count'] = len(_vt.all_time_visitors)
        except Exception as e:
            diag['internals_error'] = str(e)
    else:
        diag['fallback_active'] = len(_fb_active)
        diag['fallback_all_time'] = len(_fb_all_time)
    return jsonify(diag)


@app.route('/api/status')
def api_status():
    """Return the contents of STATUS_FILE (or a 'starting' stub) plus visitor stats."""
    try:
        if STATUS_FILE.exists():
            with open(STATUS_FILE) as f:
                data = json.load(f)
        else:
            data = {'status': 'starting'}
        data['visitors'] = get_visitor_stats()
        data['platform'] = 'huggingface-spaces'
        return jsonify(data)
    except Exception as e:
        return jsonify({'error': str(e), 'status': 'error'}), 500


@app.route('/health')
@app.route('/api/health')
def health():
    """Liveness endpoint; also reports visitor stats."""
    return jsonify({
        'status': 'ok',
        'service': 'k1rl-quasar',
        'platform': 'huggingface-spaces',
        'timestamp': datetime.now().isoformat(),
        'visitors': get_visitor_stats()
    })


def _read_log_tail(path, count=200):
    """Return the last ``count`` lines of ``path`` as a list.

    Uses a bounded deque so the whole file is never held in memory, and
    replaces undecodable bytes so a corrupt log line cannot fail the request.
    """
    with open(path, 'r', encoding='utf-8', errors='replace') as f:
        return list(deque(f, maxlen=count))


# The URL variable <service> is required: the view takes a ``service``
# argument, so a route without it makes every request fail with TypeError.
@app.route('/logs/<service>')
@app.route('/api/logs/<service>')
def stream_logs(service):
    """Return the last 200 lines of a service log.

    Args:
        service: key of LOG_FILES, taken from the URL.

    Returns:
        JSON (``service``, ``lines``, ``count``) when the Accept header starts
        with ``application/json``, otherwise the raw text; HTTP 400 for an
        unknown service.
    """
    allowed_services = list(LOG_FILES.keys())
    if service not in allowed_services:
        # ``service`` comes straight from the URL; send the message as
        # text/plain so it cannot be interpreted as HTML (reflected XSS).
        return Response(
            f"Invalid service '{service}'. Valid: {allowed_services}",
            status=400,
            mimetype='text/plain',
        )

    log_path = LOG_FILES[service]

    if request.headers.get('Accept', '').startswith('application/json'):
        try:
            lines = _read_log_tail(log_path) if log_path.exists() else []
            return jsonify({'service': service,
                            'lines': [l.rstrip() for l in lines],
                            'count': len(lines)})
        except Exception as e:
            return jsonify({'error': str(e)}), 500

    def generate():
        try:
            if log_path.exists():
                yield ''.join(_read_log_tail(log_path))
            else:
                yield f'Log file not found: {log_path}'
        except Exception as e:
            yield f'Error reading log: {e}'

    return Response(generate(), mimetype='text/plain')


# ============================================================================
# STARTUP DIAGNOSTICS — print to stderr so HF Spaces logs capture it
# ============================================================================

_all_routes = sorted([rule.rule for rule in app.url_map.iter_rules()])
print("=" * 70, file=sys.stderr, flush=True)
print(f"K1RL QUASAR Dashboard — PATCH {APP_PATCH_VERSION}", file=sys.stderr, flush=True)
print(f"Registered Flask routes ({len(_all_routes)}):", file=sys.stderr, flush=True)
for r in _all_routes:
    print(f" {r}", file=sys.stderr, flush=True)
print(f"visitor_tracker: {'LOADED' if _VT_AVAILABLE else 'FALLBACK (' + str(_vt_err_msg) + ')'}", file=sys.stderr, flush=True)
print(f"Base dir: {BASE_DIR} (exists: {BASE_DIR.exists()})", file=sys.stderr, flush=True)
print("=" * 70, file=sys.stderr, flush=True)

# ============================================================================
# LOG METRICS PUBLISHER — streams quasar_engine.log → Quasar Hub every 5 s
# ============================================================================

try:
    if _log_publisher_available:
        _publisher, _reader = start_log_publisher(space_name="V75")
        print("✅ Hub publisher started (space_name=V75)", file=sys.stderr)
    else:
        _publisher = _reader = None
        print("⚠️ Hub publisher skipped (import failed earlier)", file=sys.stderr)
except Exception as _lp_start_err:
    # Publishing is optional; a start-up failure must not stop the dashboard.
    _publisher = _reader = None
    print(f"⚠️ Hub publisher failed to start (non-fatal): {_lp_start_err}", file=sys.stderr)

# ============================================================================
# MAIN
# ============================================================================

if __name__ == '__main__':
    app.run(
        host='0.0.0.0',
        port=7860,
        debug=False,
        threaded=True
    )