k1rl-quasar / app.py
RealComp's picture
Upload 32 files
047d482 verified
Raw
History Blame Contribute Delete
23.3 kB
#!/usr/bin/env python3
"""
K1RL QUASAR — Real-Time Monitoring Dashboard (HuggingFace Spaces)
Enhanced with visitor tracking, metrics extraction, and container optimization
PATCH: 2026-03-09-v4 — full route diagnostics + visitor tracking fix
"""
from flask import Flask, render_template, jsonify, Response, send_from_directory, request
from flask_cors import CORS
import subprocess
import json
import time
import os
import re
from datetime import datetime
from collections import deque, defaultdict
import psutil
import threading
import sys
import traceback as _tb
from pathlib import Path
try:
from log_metrics_reader import start_log_publisher
_log_publisher_available = True
except Exception as _lp_import_err:
_log_publisher_available = False
print(f"⚠️ log_metrics_reader import failed (non-fatal): {_lp_import_err}", file=sys.stderr)
# ── Build Flask app FIRST — before any imports that could fail ────────────
app = Flask(__name__)
CORS(app)
APP_PATCH_VERSION = '2026-03-09-v4'
# ── CANARY ROUTE — defined before ANYTHING else ──────────────────────────
# If this returns 404, then a DIFFERENT app.py is running.
@app.route('/ping')
def _ping():
return jsonify({
'pong': True,
'patch': APP_PATCH_VERSION,
'routes': [rule.rule for rule in app.url_map.iter_rules()],
'timestamp': datetime.now().isoformat()
})
# ── Rich visitor tracking (bot-aware, geo-located) ─────────────────────────
_vt_err_msg = None
_VT_AVAILABLE = False
try:
import visitor_tracker as _vt
_VT_AVAILABLE = True
print(f"✅ visitor_tracker loaded (version: {getattr(_vt, 'PATCH_VERSION', 'unknown')})",
file=sys.stderr, flush=True)
except Exception as _vt_err:
_VT_AVAILABLE = False
_vt_err_msg = str(_vt_err)
print(f"⚠️ visitor_tracker unavailable — using fallback: {_vt_err}",
file=sys.stderr, flush=True)
# Also print full traceback for debugging
_tb.print_exc(file=sys.stderr)
# ============================================================================
# CONFIGURATION (HuggingFace Spaces)
# ============================================================================
BASE_DIR = Path('/home/user/app')
LOG_DIR = BASE_DIR / 'logs'
CHECKPOINT_DIR = BASE_DIR / 'checkpoints'
STATUS_FILE = Path('/tmp/quasar_status.json')
for dir_path in [LOG_DIR, CHECKPOINT_DIR]:
dir_path.mkdir(parents=True, exist_ok=True)
LOG_FILES = {
'quasar': LOG_DIR / 'quasar_engine.log',
'quasar_engine': LOG_DIR / 'quasar_engine.log', # alias
'features': LOG_DIR / 'features.log',
'rewards': LOG_DIR / 'rewards.log'
}
# ============================================================================
# VISITOR TRACKING (delegates to visitor_tracker.py when available)
# ============================================================================
_fb_active: dict = {}
_fb_all_time: set = set()
_fb_lock = threading.Lock()
_FB_TIMEOUT = 120
def track_visitor(ip, user_agent=''):
if _VT_AVAILABLE:
_vt.track_visitor(ip, user_agent)
else:
with _fb_lock:
_fb_active[ip] = time.time()
_fb_all_time.add(ip)
def get_visitor_stats():
if _VT_AVAILABLE:
return _vt.get_stats()
with _fb_lock:
now = time.time()
stale = [ip for ip, ts in _fb_active.items() if now - ts > _FB_TIMEOUT]
for ip in stale:
del _fb_active[ip]
return {
'active_now': len(_fb_active),
'active_ips': list(_fb_active.keys()),
'all_time_unique': len(_fb_all_time),
'bots_active': 0,
'bots_total': 0,
}
# ============================================================================
# METRICS STORAGE
# ============================================================================
loss_history = deque(maxlen=50)
metrics_history = deque(maxlen=100)
log_buffer = deque(maxlen=500)
# ============================================================================
# METRIC EXTRACTION
# ============================================================================
def extract_latest_metrics():
"""Extract latest metrics from quasar log"""
try:
log_file = LOG_FILES['quasar']
if not log_file.exists():
return None
with open(log_file, 'r') as f:
lines = f.readlines()[-600:] # v3.0.1: raised from 500 — worst-case AVN gap is 511 lines
metrics = {
'training_steps': None,
'actor_loss': None,
'critic_loss': None,
'total_loss': None,
'avn_loss': None,
'avn_accuracy': None,
'buffer_size': None,
'matched_rewards': None,
'unmatched_rewards': None,
'duplicates': None,
'entropy_bonus': None,
'diversity_loss': None,
'entanglement_loss': None,
'timestamp': datetime.now().isoformat()
}
for line in reversed(lines):
# v3.0.1 FIX-C: log never emits dict-format {'actor_loss':...}.
# Actual format is label-style: "Actor Loss: -29900.759766"
# and inline: "Critic Loss: 0.1234 | Actor Loss: -0.5678"
if 'Actor Loss:' in line and not metrics['actor_loss']:
m = re.search(r'Actor Loss:\s*([-\d.]+)', line)
if m:
metrics['actor_loss'] = float(m.group(1))
if 'Critic Loss:' in line and not metrics['critic_loss']:
m = re.search(r'Critic Loss:\s*([-\d.]+)', line)
if m:
metrics['critic_loss'] = float(m.group(1))
if 'Total Loss:' in line and not metrics['total_loss']:
m = re.search(r'Total Loss:\s*([-\d.]+)', line)
if m:
metrics['total_loss'] = float(m.group(1))
if 'buffer_size' in line and not metrics['buffer_size']:
m = re.search(r"buffer_size['\"]?\s*:\s*(\d+)", line)
if m:
metrics['buffer_size'] = int(m.group(1))
if 'avn_training_steps:' in line and not metrics['training_steps']:
m = re.search(r'avn_training_steps:\s*(\d+)', line)
if m:
metrics['training_steps'] = int(m.group(1))
# v3.0.1 FIX-B: was '🎯 Avg Loss:' — wrong emoji (🎯 = Avg Accuracy).
# Actual log line uses 📉 emoji. Dropping emoji from check entirely
# to be robust to future emoji changes; substring 'Avg Loss:' is unique.
if 'Avg Loss:' in line and not metrics['avn_loss']:
m = re.search(r'Avg Loss:\s*([-\d.]+)', line)
if m:
metrics['avn_loss'] = float(m.group(1))
if ('AVN Accuracy:' in line or 'Avg Accuracy:' in line) and metrics['avn_accuracy'] is None:
m = re.search(r'(?:AVN|Avg) Accuracy:\s*([\d.]+)%?', line)
if m:
metrics['avn_accuracy'] = float(m.group(1))
if 'avn_rewards:' in line:
if not metrics['matched_rewards']:
m = re.search(r'matched=(\d+)', line)
if m:
metrics['matched_rewards'] = int(m.group(1))
if not metrics['unmatched_rewards']:
m = re.search(r'unmatched=(\d+)', line)
if m:
metrics['unmatched_rewards'] = int(m.group(1))
if not metrics['duplicates']:
m = re.search(r'duplicate=(\d+)', line)
if m:
metrics['duplicates'] = int(m.group(1))
if metrics['actor_loss'] is not None:
loss_history.append({
'actor_loss': metrics['actor_loss'],
'critic_loss': metrics['critic_loss'],
'total_loss': metrics['total_loss'],
'avn_accuracy': metrics['avn_accuracy'],
'timestamp': datetime.now().isoformat()
})
return metrics
except Exception as e:
print(f"Metric extraction error: {e}")
return None
def get_service_status():
"""Get supervisor service status — per-line parsing"""
services = {}
service_map = {
'quasar_engine': 'quasar_engine',
'dashboard': 'dashboard',
'health_monitor': 'health_monitor',
'redis': 'redis',
'features': 'features',
'rewards': 'rewards',
}
try:
result = subprocess.run(
['supervisorctl', 'status'],
capture_output=True, text=True, timeout=5
)
if result.returncode != 0 or not result.stdout.strip():
raise RuntimeError(f"supervisorctl error: {result.stderr.strip()}")
for line in result.stdout.splitlines():
parts = line.split()
if not parts:
continue
program_name = parts[0]
status_token = parts[1] if len(parts) > 1 else 'UNKNOWN'
friendly = service_map.get(program_name)
if friendly is None:
continue
if status_token == 'RUNNING':
services[friendly] = 'active'
elif status_token in ('STOPPED', 'FATAL', 'EXITED', 'BACKOFF'):
services[friendly] = 'inactive'
else:
services[friendly] = 'unknown'
except Exception:
services = {
'dashboard': 'active',
'health_monitor': 'unknown',
'redis': 'unknown',
'quasar_engine': 'unknown'
}
return services
def get_system_resources():
"""Get system resource usage"""
try:
cpu_percent = psutil.cpu_percent(interval=0.1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
quasar_memory = 0
process_count = 0
for proc in psutil.process_iter(['name', 'memory_info', 'cmdline']):
try:
cmdline = ' '.join(proc.info.get('cmdline', []) or [])
if 'quasar' in cmdline.lower() or 'python' in cmdline.lower():
quasar_memory += proc.info['memory_info'].rss / (1024 ** 3)
process_count += 1
except Exception:
pass
return {
'cpu_percent': round(cpu_percent, 1),
'memory_percent': round(memory.percent, 1),
'memory_used_gb': round(memory.used / (1024 ** 3), 2),
'memory_total_gb': round(memory.total / (1024 ** 3), 2),
'memory_available_gb': round(memory.available / (1024 ** 3), 2),
'disk_percent': round(disk.percent, 1),
'disk_used_gb': round(disk.used / (1024 ** 3), 2),
'disk_total_gb': round(disk.total / (1024 ** 3), 2),
'quasar_memory_gb': round(quasar_memory, 2),
'process_count': process_count
}
except Exception as e:
print(f"Resource error: {e}")
return None
def get_checkpoint_info():
"""Get latest checkpoint information"""
try:
if not CHECKPOINT_DIR.exists():
return None
checkpoints = list(CHECKPOINT_DIR.glob('checkpoint_*.pt'))
if not checkpoints:
return None
checkpoints.sort(key=lambda x: x.stat().st_mtime, reverse=True)
latest = checkpoints[0]
size_mb = latest.stat().st_size / (1024 ** 2)
modified = datetime.fromtimestamp(latest.stat().st_mtime)
match = re.search(r'step_(\d+)', latest.name)
step = int(match.group(1)) if match else None
return {
'filename': latest.name,
'size_mb': round(size_mb, 2),
'modified': modified.strftime('%Y-%m-%d %H:%M:%S'),
'step': step,
'count': len(checkpoints)
}
except Exception as e:
print(f"Checkpoint error: {e}")
return None
# ============================================================================
# REQUEST TRACKING (wrapped so crash never takes down an API response)
# ============================================================================
@app.before_request
def before_request():
try:
ip = request.headers.get('X-Forwarded-For', request.remote_addr)
if ip:
ip = ip.split(',')[0].strip()
user_agent = request.headers.get('User-Agent', '')
track_visitor(ip, user_agent)
except Exception as e:
print(f"⚠️ before_request error (non-fatal): {e}", file=sys.stderr)
# ============================================================================
# API ENDPOINTS
# ============================================================================
@app.route('/')
def index():
dashboard_path = BASE_DIR / 'dashboard.html'
if dashboard_path.exists():
return send_from_directory(str(BASE_DIR), 'dashboard.html')
return """
<!DOCTYPE html>
<html>
<head>
<title>K1RL QUASAR — Quantitative Intelligence Observatory</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body { background: #0a0a0a; color: #00ffff; font-family: monospace; text-align: center; padding: 50px; }
.container { max-width: 800px; margin: 0 auto; }
.status { background: #1a1a1a; padding: 20px; margin: 20px 0; border-radius: 10px; }
.loading { animation: pulse 2s infinite; }
@keyframes pulse { 0%, 100% { opacity: 1; } 50% { opacity: 0.5; } }
</style>
</head>
<body>
<div class="container">
<h1>K1RL QUASAR</h1>
<h2>Quantitative Intelligence Observatory</h2>
<div class="status loading">
<p>System Initializing...</p>
<p>Dashboard components loading...</p>
<div id="status">Connecting to services...</div>
</div>
</div>
<script>
setTimeout(() => window.location.reload(), 5000);
</script>
</body>
</html>
""", 200
@app.route('/api/metrics')
def api_metrics():
try:
metrics = extract_latest_metrics()
services = get_service_status()
resources = get_system_resources()
checkpoint = get_checkpoint_info()
return jsonify({
'metrics': metrics,
'services': services,
'resources': resources,
'checkpoint': checkpoint,
'timestamp': datetime.now().isoformat(),
'visitors': get_visitor_stats(),
'platform': 'huggingface-spaces'
})
except Exception as e:
return jsonify({'error': str(e), 'platform': 'huggingface-spaces'}), 500
@app.route('/api/visitors')
def api_visitors():
return jsonify(get_visitor_stats())
@app.route('/api/visitors/detailed')
def api_visitors_detailed():
"""Full visitor analytics with bulletproof error handling."""
try:
if _VT_AVAILABLE:
stats = _vt.get_stats()
detailed = _vt.get_detailed_stats()
all_v = _vt.get_all_visitors()
return jsonify({
'active_now': detailed.get('active_now', stats.get('active_now', 0)),
'all_time_unique': stats.get('all_time_unique', detailed.get('total_unique', 0)),
'total_unique': detailed.get('total_unique', stats.get('all_time_unique', 0)),
'bots_active': detailed.get('bots_active', stats.get('bots_active', 0)),
'bots_total': detailed.get('bots_total', stats.get('bots_total', 0)),
'active_ips': stats.get('active_ips', []),
'countries': detailed.get('countries', {}),
'cities': detailed.get('cities', {}),
'devices': detailed.get('devices', {}),
'browsers': detailed.get('browsers', {}),
'visitors': all_v.get('visitors', []),
'timestamp': datetime.now().isoformat(),
})
basic = get_visitor_stats()
return jsonify({**basic, 'visitors': [], 'countries': {},
'devices': {}, 'browsers': {}, 'cities': {},
'timestamp': datetime.now().isoformat()})
except Exception as e:
_tb.print_exc(file=sys.stderr)
return jsonify({'error': str(e), 'traceback': _tb.format_exc()}), 500
@app.route('/api/debug')
def api_debug():
"""Full diagnostic endpoint."""
diag = {
'app_patch_version': APP_PATCH_VERSION,
'vt_available': _VT_AVAILABLE,
'vt_import_error': _vt_err_msg,
'vt_patch_version': getattr(_vt, 'PATCH_VERSION', 'NOT_PATCHED') if _VT_AVAILABLE else None,
'python_version': sys.version,
'cwd': os.getcwd(),
'base_dir': str(BASE_DIR),
'base_dir_exists': BASE_DIR.exists(),
'dashboard_exists': (BASE_DIR / 'dashboard.html').exists(),
'visitor_tracker_exists': (BASE_DIR / 'visitor_tracker.py').exists(),
'app_py_exists': (BASE_DIR / 'app.py').exists(),
'registered_routes': sorted([rule.rule for rule in app.url_map.iter_rules()]),
'timestamp': datetime.now().isoformat(),
}
if _VT_AVAILABLE:
for name, fn in [('get_stats', _vt.get_stats),
('get_detailed_stats', _vt.get_detailed_stats),
('get_all_visitors', _vt.get_all_visitors)]:
try:
result = fn()
diag[f'{name}_ok'] = True
diag[f'{name}_result'] = result if name != 'get_all_visitors' else {
'count': len(result.get('visitors', [])),
'total_unique': result.get('total_unique'),
}
except Exception as e:
diag[f'{name}_ok'] = False
diag[f'{name}_error'] = str(e)
try:
diag['active_visitors_count'] = len(_vt.active_visitors)
diag['all_time_visitors_count'] = len(_vt.all_time_visitors)
except Exception as e:
diag['internals_error'] = str(e)
else:
diag['fallback_active'] = len(_fb_active)
diag['fallback_all_time'] = len(_fb_all_time)
return jsonify(diag)
@app.route('/api/status')
def api_status():
try:
if STATUS_FILE.exists():
with open(STATUS_FILE) as f:
data = json.load(f)
else:
data = {'status': 'starting'}
data['visitors'] = get_visitor_stats()
data['platform'] = 'huggingface-spaces'
return jsonify(data)
except Exception as e:
return jsonify({'error': str(e), 'status': 'error'}), 500
@app.route('/health')
@app.route('/api/health')
def health():
return jsonify({
'status': 'ok',
'service': 'k1rl-quasar',
'platform': 'huggingface-spaces',
'timestamp': datetime.now().isoformat(),
'visitors': get_visitor_stats()
})
@app.route('/logs/<service>')
@app.route('/api/logs/<service>')
def stream_logs(service):
allowed_services = list(LOG_FILES.keys())
if service not in allowed_services:
return f"Invalid service '{service}'. Valid: {allowed_services}", 400
log_path = LOG_FILES[service]
def generate():
try:
if log_path.exists():
with open(log_path, 'r') as f:
lines = f.readlines()[-200:]
yield ''.join(lines)
else:
yield f'Log file not found: {log_path}'
except Exception as e:
yield f'Error reading log: {e}'
if request.headers.get('Accept', '').startswith('application/json'):
try:
if log_path.exists():
with open(log_path, 'r') as f:
lines = f.readlines()[-200:]
else:
lines = []
return jsonify({'service': service, 'lines': [l.rstrip() for l in lines], 'count': len(lines)})
except Exception as e:
return jsonify({'error': str(e)}), 500
return Response(generate(), mimetype='text/plain')
# ============================================================================
# STARTUP DIAGNOSTICS — print to stderr so HF Spaces logs capture it
# ============================================================================
_all_routes = sorted([rule.rule for rule in app.url_map.iter_rules()])
print("=" * 70, file=sys.stderr, flush=True)
print(f"K1RL QUASAR Dashboard — PATCH {APP_PATCH_VERSION}", file=sys.stderr, flush=True)
print(f"Registered Flask routes ({len(_all_routes)}):", file=sys.stderr, flush=True)
for r in _all_routes:
print(f" {r}", file=sys.stderr, flush=True)
print(f"visitor_tracker: {'LOADED' if _VT_AVAILABLE else 'FALLBACK (' + str(_vt_err_msg) + ')'}", file=sys.stderr, flush=True)
print(f"Base dir: {BASE_DIR} (exists: {BASE_DIR.exists()})", file=sys.stderr, flush=True)
print("=" * 70, file=sys.stderr, flush=True)
# ============================================================================
# LOG METRICS PUBLISHER — streams quasar_engine.log → Quasar Hub every 5 s
# ============================================================================
try:
if _log_publisher_available:
_publisher, _reader = start_log_publisher(space_name="V75")
print("✅ Hub publisher started (space_name=V75)", file=sys.stderr)
else:
_publisher = _reader = None
print("⚠️ Hub publisher skipped (import failed earlier)", file=sys.stderr)
except Exception as _lp_start_err:
_publisher = _reader = None
print(f"⚠️ Hub publisher failed to start (non-fatal): {_lp_start_err}", file=sys.stderr)
# ============================================================================
# MAIN
# ============================================================================
if __name__ == '__main__':
app.run(
host='0.0.0.0',
port=7860,
debug=False,
threaded=True
)