#!/usr/bin/env python3 """ Archon Monitoring Dashboard for sync_dataset.sh Flask UI untuk memantau status sync dan logs """ import os import sys import json import subprocess from datetime import datetime from pathlib import Path from flask import Flask, render_template, jsonify, request import psutil app = Flask(__name__) # Config paths PICOCLAW_HOME = os.getenv('PICOCLAW_HOME', os.path.expanduser('~/.picoclaw')) LOG_FILE = Path(PICOCLAW_HOME) / 'sync.log' STATE_FILE = Path(PICOCLAW_HOME) / 'sync.state' CONFIG_FILE = Path(PICOCLAW_HOME) / 'config.json' def read_config(): """Load configuration dari config.json atau environment""" config = { 'DATASET_REPO': os.getenv('DATASET_REPO', 'https://github.com/personalbotai/picoclaw-memory.git'), 'DATASET_BRANCH': os.getenv('DATASET_BRANCH', 'main'), 'SYNC_INTERVAL': int(os.getenv('SYNC_INTERVAL', '300')), 'PICOCLAW_HOME': PICOCLAW_HOME } if CONFIG_FILE.exists(): try: with open(CONFIG_FILE) as f: file_config = json.load(f) config.update(file_config) except Exception as e: app.logger.error(f"Failed to read config: {e}") return config def get_sync_status(): """Check if sync daemon is running""" if STATE_FILE.exists(): try: with open(STATE_FILE) as f: pid = int(f.read().strip()) if psutil.pid_exists(pid): proc = psutil.Process(pid) return { 'running': True, 'pid': pid, 'cpu_percent': proc.cpu_percent(interval=0.1), 'memory_mb': proc.memory_info().rss / 1024 / 1024, 'cmdline': proc.cmdline() } except Exception as e: app.logger.error(f"Error checking PID: {e}") return {'running': False} def get_recent_logs(lines=100): """Read recent log entries""" if not LOG_FILE.exists(): return [] try: with open(LOG_FILE) as f: all_lines = f.readlines() return all_lines[-lines:] except Exception as e: app.logger.error(f"Error reading logs: {e}") return [] def get_disk_usage(): """Get disk usage statistics""" try: stat = os.statvfs(PICOCLAW_HOME) total = stat.f_frsize * stat.f_blocks free = stat.f_frsize * stat.f_bavail used = total - free return { 'total_gb': round(total / 1024 / 1024 / 1024, 2), 'used_gb': round(used / 1024 / 1024 / 1024, 2), 'free_gb': round(free / 1024 / 1024 / 1024, 2), 'percent_used': round((used / total) * 100, 1) } except Exception as e: app.logger.error(f"Error getting disk usage: {e}") return {} def get_git_status(): """Check git status of backup directory""" backup_dir = Path(PICOCLAW_HOME) / 'backup' if not (backup_dir / '.git').exists(): return {'error': 'Not a git repository'} try: os.chdir(backup_dir) branch = subprocess.check_output(['git', 'branch', '--show-current'], text=True).strip() remote = subprocess.check_output(['git', 'remote', 'get-url', 'origin'], text=True).strip() status = subprocess.check_output(['git', 'status', '--porcelain'], text=True).strip() return { 'branch': branch, 'remote': remote, 'has_changes': bool(status), 'changes_count': len(status.split('\n')) if status else 0 } except subprocess.CalledProcessError as e: return {'error': str(e)} except Exception as e: return {'error': str(e)} @app.route('/') def index(): """Main dashboard""" config = read_config() status = get_sync_status() logs = get_recent_logs(50) disk = get_disk_usage() git = get_git_status() return render_template('index.html', config=config, status=status, logs=logs, disk=disk, git=git, now=datetime.now().strftime('%Y-%m-%d %H:%M:%S') ) @app.route('/api/status') def api_status(): """JSON API for status""" return jsonify({ 'sync': get_sync_status(), 'disk': get_disk_usage(), 'git': get_git_status(), 'timestamp': datetime.now().isoformat() }) @app.route('/api/logs') def api_logs(): """Get logs as JSON""" lines = request.args.get('lines', '100', type=int) logs = get_recent_logs(lines) return jsonify({'logs': logs}) @app.route('/api/restart', methods=['POST']) def api_restart(): """Restart sync daemon (placeholder)""" # In production, this would send signal to daemon return jsonify({'status': 'ok', 'message': 'Restart signal sent'}) if __name__ == '__main__': # Create templates directory if not exists templates_dir = Path(__file__).parent / 'templates' templates_dir.mkdir(exist_ok=True) # Create index.html if not exists index_html = templates_dir / 'index.html' if not index_html.exists(): # Generate default template html_content = '''
⚠️ Unable to get disk info
{% endif %}⚠️ {{ git.error }}
{% else %}