K1RL-Quasar-Volatility-25 / accuracy_api.py
KarlQuant's picture
Upload 26 files
55ab7d8 verified
#!/usr/bin/env python3
"""
K1RL QUANT - Accuracy Tracking API
HuggingFace Spaces Edition
Standalone module to track training accuracy
"""
import re
import json
import time
import os
from datetime import datetime
from collections import deque
from pathlib import Path
import threading
# ✅ FIXED: Container-safe storage paths
BASE_DIR = Path('/home/user/app')
DATA_DIR = BASE_DIR / 'data'
LOG_DIR = BASE_DIR / 'logs'
# Ensure directories exist
DATA_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)
# ✅ FIXED: HuggingFace Spaces compatible paths
accuracy_history = deque(maxlen=1000)
accuracy_file = DATA_DIR / 'accuracy_data.json'
# Load existing data
if accuracy_file.exists():
try:
with open(accuracy_file, 'r') as f:
data = json.load(f)
accuracy_history.extend(data)
print(f"✅ Loaded {len(accuracy_history)} accuracy readings")
except Exception as e:
print(f"⚠️ Could not load accuracy data: {e}")
def parse_accuracy_from_logs():
"""Parse accuracy from quasar logs (HF Spaces compatible)"""
# ✅ FIXED: HuggingFace Spaces log path
log_file = LOG_DIR / 'quasar_engine.log'
last_position = 0
print(f"🔍 Starting accuracy parser on {log_file}")
while True:
try:
# Check if log file exists
if not log_file.exists():
print(f"⚠️ Log file not found: {log_file}")
time.sleep(10)
continue
with open(log_file, 'r') as f:
f.seek(last_position)
new_lines = f.readlines()
last_position = f.tell()
for line in new_lines:
# Multiple patterns for accuracy detection
patterns = [
r'🎯 Avg Accuracy: ([\d.]+)%',
r'AVN Accuracy: ([\d.]+)%?',
r'Avg Accuracy: ([\d.]+)%?',
r'accuracy[:\s]+([\d.]+)%?',
r'acc[:\s]+([\d.]+)%?'
]
for pattern in patterns:
match = re.search(pattern, line, re.IGNORECASE)
if match:
accuracy = float(match.group(1))
# Sanity check - accuracy should be 0-100
if 0 <= accuracy <= 100:
timestamp = datetime.now().isoformat()
data_point = {
'timestamp': timestamp,
'accuracy': accuracy,
'iteration': len(accuracy_history) + 1
}
accuracy_history.append(data_point)
print(f"📈 Accuracy: {accuracy:.1f}% (iteration {data_point['iteration']})")
# Save periodically
if len(accuracy_history) % 10 == 0:
save_accuracy_data()
break # Only take first match per line
time.sleep(5)
except Exception as e:
print(f"⚠️ Error parsing logs: {e}")
time.sleep(10)
def save_accuracy_data():
"""Save accuracy data to file"""
try:
with open(accuracy_file, 'w') as f:
json.dump(list(accuracy_history), f, indent=2)
except Exception as e:
print(f"⚠️ Could not save accuracy data: {e}")
def get_current_accuracy():
"""Get current accuracy"""
if accuracy_history:
return accuracy_history[-1]
return {'accuracy': 0, 'timestamp': None, 'iteration': 0}
def get_accuracy_history():
"""Get all history"""
return list(accuracy_history)
def get_accuracy_stats():
"""Get comprehensive statistics"""
if not accuracy_history:
return {
'current': 0,
'average': 0,
'min': 0,
'max': 0,
'count': 0,
'last_10_avg': 0,
'trend': 'unknown'
}
accuracies = [d['accuracy'] for d in accuracy_history]
# Calculate trend
trend = 'stable'
if len(accuracies) >= 20:
recent_avg = sum(accuracies[-10:]) / 10
older_avg = sum(accuracies[-20:-10]) / 10
if recent_avg > older_avg * 1.02: # 2% improvement
trend = 'up'
elif recent_avg < older_avg * 0.98: # 2% decline
trend = 'down'
return {
'current': accuracies[-1],
'average': sum(accuracies) / len(accuracies),
'min': min(accuracies),
'max': max(accuracies),
'count': len(accuracies),
'last_10_avg': sum(accuracies[-10:]) / min(10, len(accuracies)),
'trend': trend,
'std_dev': (sum((x - sum(accuracies)/len(accuracies))**2 for x in accuracies) / len(accuracies))**0.5,
'latest_timestamp': accuracy_history[-1]['timestamp'] if accuracy_history else None
}
def get_accuracy_by_timerange(hours=24):
"""Get accuracy data for specific time range"""
if not accuracy_history:
return []
# Filter by timestamp (last N hours)
cutoff_time = datetime.now().timestamp() - (hours * 3600)
filtered = []
for entry in accuracy_history:
try:
entry_time = datetime.fromisoformat(entry['timestamp']).timestamp()
if entry_time >= cutoff_time:
filtered.append(entry)
except:
# Include entries with invalid timestamps
filtered.append(entry)
return filtered
# ✅ ADDED: HuggingFace Spaces health check
def get_parser_status():
"""Get parser health status"""
log_file = LOG_DIR / 'quasar_engine.log'
status = {
'running': parser_thread.is_alive() if 'parser_thread' in globals() else False,
'log_file_exists': log_file.exists(),
'log_file_path': str(log_file),
'total_readings': len(accuracy_history),
'last_reading': accuracy_history[-1] if accuracy_history else None,
'data_file_exists': accuracy_file.exists(),
'data_file_path': str(accuracy_file)
}
return status
# Start parser thread
print("🚀 Starting accuracy parser thread...")
parser_thread = threading.Thread(target=parse_accuracy_from_logs, daemon=True)
parser_thread.start()
# ✅ ADDED: Graceful shutdown handler
import atexit
def cleanup():
"""Save data on shutdown"""
print("💾 Saving accuracy data on shutdown...")
save_accuracy_data()
atexit.register(cleanup)
if __name__ == "__main__":
print("=" * 60)
print("K1RL QUANT - Accuracy Tracker")
print("HuggingFace Spaces Edition 🤗")
print("=" * 60)
print(f"📁 Data directory: {DATA_DIR}")
print(f"📁 Log directory: {LOG_DIR}")
print(f"📊 Loaded {len(accuracy_history)} existing readings")
print("🔄 Parser running in background...")
print("=" * 60)
# Keep alive
try:
while True:
time.sleep(60)
if len(accuracy_history) > 0:
latest = accuracy_history[-1]
print(f"📈 Latest accuracy: {latest['accuracy']:.1f}% ({latest['timestamp']})")
except KeyboardInterrupt:
print("🛑 Shutting down accuracy tracker...")
cleanup()