"""
Admin routes and functionality for Outline VPN.

Every view in this module is registered on the package-level ``app`` and is
wrapped with ``login_required`` followed by ``admin_required``.
"""
import csv
import io
import json
import os
import zipfile
from datetime import datetime
from functools import wraps

import psutil
from flask import (
    Response,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from flask_login import current_user, login_required

from . import app
from .models import Alert, AuditLog, SystemHealth, User, UserRole
from .services import backup_service, monitoring_service

# Timestamp layout shared by the CSV and JSON audit-log exports.
_AUDIT_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# Export formats accepted by ``export_audit_log``.
_AUDIT_EXPORT_FORMATS = ('csv', 'json')


def admin_required(f):
    """Decorator to require admin role for routes.

    When the current user is not authenticated, or their ``role`` is not
    ``UserRole.ADMIN``, a message is flashed and the request is redirected to
    the ``dashboard`` endpoint instead of calling the wrapped view.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated or current_user.role != UserRole.ADMIN:
            flash('You need administrator privileges to access this page.')
            return redirect(url_for('dashboard'))
        return f(*args, **kwargs)
    return decorated_function


@app.route('/admin')
@login_required
@admin_required
def admin_dashboard():
    """Admin dashboard view.

    Renders ``admin.html`` with the current system health, all alerts whose
    status is ``'active'`` (newest first) and the 50 most recent audit-log
    entries.
    """
    system_health = monitoring_service.get_system_health()
    active_alerts = (
        Alert.query
        .filter_by(status='active')
        .order_by(Alert.created_at.desc())
        .all()
    )
    audit_logs = (
        AuditLog.query
        .order_by(AuditLog.timestamp.desc())
        .limit(50)
        .all()
    )
    return render_template(
        'admin.html',
        system_health=system_health,
        active_alerts=active_alerts,
        audit_logs=audit_logs,
    )


@app.route('/api/system-health')
@login_required
@admin_required
def get_system_health():
    """Get real-time system health metrics as JSON."""
    return jsonify(monitoring_service.get_system_health())


@app.route('/api/update-server-config', methods=['POST'])
@login_required
@admin_required
def update_server_config():
    """Update server configuration from a JSON object of field/value pairs.

    Returns:
        ``{'status': 'success'}`` on success. On failure, a JSON body with
        ``status`` set to ``'error'`` and a ``message``: HTTP 400 for a
        payload that is not a JSON object or that names unknown fields,
        HTTP 404 when no configuration row exists, HTTP 500 for any other
        error.
    """
    # silent=True yields None for a missing/invalid JSON body instead of
    # raising, so the caller gets a 400 rather than a generic 500.
    config = request.get_json(silent=True)
    if not isinstance(config, dict):
        return jsonify({
            'status': 'error',
            'message': 'Request body must be a JSON object',
        }), 400

    try:
        # NOTE(review): ``db`` and ``ServerConfig`` were used here without
        # being imported anywhere in this module. They are assumed to live in
        # the package root and in ``.models`` respectively - confirm. The
        # imports are kept local so a wrong guess only affects this route.
        from . import db
        from .models import ServerConfig

        current_config = ServerConfig.query.first()
        if current_config is None:
            return jsonify({
                'status': 'error',
                'message': 'No server configuration found',
            }), 404

        # Minimal guard against mass assignment: only attributes that already
        # exist on the config object and are not private may be set.
        # NOTE(review): an explicit allow-list of editable fields would be
        # stricter than this check.
        invalid_keys = sorted(
            key for key in config
            if key.startswith('_') or not hasattr(current_config, key)
        )
        if invalid_keys:
            rejected = ', '.join(invalid_keys)
            return jsonify({
                'status': 'error',
                'message': f'Unknown configuration keys: {rejected}',
            }), 400

        # Create backup before updating
        backup_service.backup_config('pre_update')

        # Update configuration
        for key, value in config.items():
            setattr(current_config, key, value)
        db.session.commit()

        # Log the change
        AuditLog.create(
            user_id=current_user.id,
            action='update_config',
            details='Server configuration updated',
        )

        # Restart required services
        monitoring_service.restart_services()
        return jsonify({'status': 'success'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500


@app.route('/api/create-backup')
@login_required
@admin_required
def create_backup():
    """Create a backup of server configuration and send it as a download.

    The ``include_user_data`` query argument is passed to the backup service
    as ``True`` only when it is exactly the string ``'true'``. On any error a
    message is flashed and the request is redirected to the admin dashboard.
    """
    try:
        include_user_data = request.args.get('include_user_data', 'false') == 'true'
        backup_path = backup_service.create_backup(include_user_data)

        # Log the backup creation
        AuditLog.create(
            user_id=current_user.id,
            action='create_backup',
            details=f'Backup created: {os.path.basename(backup_path)}',
        )

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return send_file(
            backup_path,
            as_attachment=True,
            download_name=f'outline_backup_{timestamp}.zip',
        )
    except Exception as e:
        flash(f'Error creating backup: {str(e)}', 'error')
        return redirect(url_for('admin_dashboard'))


@app.route('/api/restore-config', methods=['POST'])
@login_required
@admin_required
def restore_config():
    """Restore server configuration from an uploaded backup.

    Expects the upload in the ``backup_file`` form field. The current
    configuration is backed up (``'pre_restore'``) before the restore runs.
    Every outcome flashes a message and redirects to the admin dashboard.
    """
    try:
        if 'backup_file' not in request.files:
            flash('No backup file provided', 'error')
            return redirect(url_for('admin_dashboard'))

        backup_file = request.files['backup_file']
        # A falsy filename covers both '' (no file chosen) and None.
        if not backup_file.filename:
            flash('No backup file selected', 'error')
            return redirect(url_for('admin_dashboard'))

        # Create backup of current configuration
        backup_service.backup_config('pre_restore')

        # Restore from backup
        backup_service.restore_from_backup(backup_file)

        # Log the restore
        AuditLog.create(
            user_id=current_user.id,
            action='restore_config',
            details=f'Configuration restored from {backup_file.filename}',
        )

        flash('Configuration restored successfully', 'success')
        return redirect(url_for('admin_dashboard'))
    except Exception as e:
        flash(f'Error restoring configuration: {str(e)}', 'error')
        return redirect(url_for('admin_dashboard'))


def _audit_log_record(log):
    """Return one audit-log entry as an ordered dict of export fields.

    ``user`` is ``None`` when the entry has no associated user object, so a
    single such entry cannot abort the whole export.
    """
    return {
        'timestamp': log.timestamp.strftime(_AUDIT_TIMESTAMP_FORMAT),
        'user': log.user.username if log.user is not None else None,
        'action': log.action,
        'details': log.details,
    }


@app.route('/api/export-audit-log')
@login_required
@admin_required
def export_audit_log():
    """Export audit log in specified format.

    The ``format`` query argument selects ``'csv'`` (default) or ``'json'``.
    Any other value yields an HTTP 400 JSON error instead of falling through
    and returning no response.
    """
    export_format = request.args.get('format', 'csv')
    if export_format not in _AUDIT_EXPORT_FORMATS:
        return jsonify({
            'status': 'error',
            'message': f'Unsupported export format: {export_format}',
        }), 400

    logs = AuditLog.query.order_by(AuditLog.timestamp.desc()).all()
    records = [_audit_log_record(log) for log in logs]

    if export_format == 'json':
        return jsonify(records)

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(['Timestamp', 'User', 'Action', 'Details'])
    # Record dicts keep insertion order, which matches the header row above;
    # csv writes a None user as an empty cell.
    writer.writerows(record.values() for record in records)
    return Response(
        output.getvalue(),
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=audit_log.csv'},
    )