Spaces:
Runtime error
Runtime error
| """ | |
| Admin routes and functionality for Outline VPN | |
| """ | |
import csv
import io
import json
import os
import zipfile
from datetime import datetime
from functools import wraps

import psutil
from flask import (
    Response,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from flask_login import current_user, login_required

from . import app
from .models import Alert, AuditLog, SystemHealth, User, UserRole
from .services import backup_service, monitoring_service
def admin_required(f):
    """Decorator that restricts a view to authenticated administrators.

    When the current user is not authenticated, or their role is not
    ``UserRole.ADMIN``, a message is flashed and the request is redirected
    to the ``dashboard`` endpoint instead of calling the view.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that carries the same ``__name__`` and docstring as ``f``.
    """
    # ``wraps`` keeps the wrapped view's name. Without it every protected
    # view is called ``decorated_function``, and Flask (which derives the
    # default endpoint name from ``__name__``) refuses to register a second
    # one.
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated or current_user.role != UserRole.ADMIN:
            flash('You need administrator privileges to access this page.')
            return redirect(url_for('dashboard'))
        return f(*args, **kwargs)

    return decorated_function
def admin_dashboard():
    """Render the admin dashboard page.

    The ``admin.html`` template receives:

    * ``system_health`` - the value returned by
      ``monitoring_service.get_system_health()``;
    * ``active_alerts`` - every ``Alert`` whose status is ``'active'``,
      newest first;
    * ``audit_logs`` - the 50 most recent ``AuditLog`` entries.

    Returns:
        The rendered template.
    """
    # NOTE(review): no route or auth decorators are visible on this view;
    # presumably it is registered and protected elsewhere - confirm.
    context = {
        'system_health': monitoring_service.get_system_health(),
        'active_alerts': (
            Alert.query
            .filter_by(status='active')
            .order_by(Alert.created_at.desc())
            .all()
        ),
        'audit_logs': (
            AuditLog.query
            .order_by(AuditLog.timestamp.desc())
            .limit(50)
            .all()
        ),
    }
    return render_template('admin.html', **context)
def get_system_health():
    """Return the monitoring service's current health metrics as JSON."""
    health_snapshot = monitoring_service.get_system_health()
    return jsonify(health_snapshot)
def update_server_config():
    """Apply a JSON object of settings to the stored server configuration.

    The request body must be a JSON object mapping attribute names to new
    values. After the payload is validated, a ``'pre_update'`` backup is
    taken, the values are written to the first ``ServerConfig`` row and
    committed, an audit entry is recorded, and the monitoring service is
    asked to restart services.

    Returns:
        ``{'status': 'success'}`` on success. On failure, a JSON body of
        the form ``{'status': 'error', 'message': ...}`` with status 400
        (invalid payload or keys), 404 (no configuration row) or 500 (any
        other error).
    """
    # NOTE(review): ``ServerConfig`` and ``db`` are not imported in the
    # visible part of this module - confirm where they come from.
    try:
        # silent=True yields None for a missing or malformed body, so we can
        # answer with a clear 400 instead of an AttributeError-driven 500.
        config = request.get_json(silent=True)
        if not isinstance(config, dict):
            return jsonify({
                'status': 'error',
                'message': 'Request body must be a JSON object'
            }), 400

        current_config = ServerConfig.query.first()
        if current_config is None:
            return jsonify({
                'status': 'error',
                'message': 'No server configuration found'
            }), 404

        # The payload is client-controlled: refuse private names and
        # attributes the configuration object does not already have, rather
        # than setting arbitrary attributes on it.
        # NOTE(review): an explicit allow-list of editable fields would be
        # stricter still.
        invalid_keys = sorted(
            key for key in config
            if key.startswith('_') or not hasattr(current_config, key)
        )
        if invalid_keys:
            return jsonify({
                'status': 'error',
                'message': 'Unknown configuration keys: ' + ', '.join(invalid_keys)
            }), 400

        # Back up only once the request is known to be applicable.
        backup_service.backup_config('pre_update')

        try:
            for key, value in config.items():
                setattr(current_config, key, value)
            db.session.commit()
        except Exception:
            # Leave the session usable for whatever runs next.
            db.session.rollback()
            raise

        # Log the change
        AuditLog.create(
            user_id=current_user.id,
            action='update_config',
            details='Server configuration updated'
        )

        # Restart required services
        monitoring_service.restart_services()

        return jsonify({'status': 'success'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
def create_backup():
    """Build a backup archive and send it to the client as a download.

    User data is included only when the ``include_user_data`` query
    parameter is exactly ``'true'``. The creation is recorded in the audit
    log before the file is sent. On any error a message is flashed and the
    client is redirected to ``admin_dashboard``.
    """
    try:
        wants_user_data = request.args.get('include_user_data', 'false') == 'true'
        archive_path = backup_service.create_backup(wants_user_data)

        # Record who created which archive.
        AuditLog.create(
            user_id=current_user.id,
            action='create_backup',
            details=f'Backup created: {os.path.basename(archive_path)}'
        )

        # The download is named after the moment it is served.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return send_file(
            archive_path,
            as_attachment=True,
            download_name=f'outline_backup_{stamp}.zip'
        )
    except Exception as e:
        failure_message = f'Error creating backup: {str(e)}'
        flash(failure_message, 'error')
        return redirect(url_for('admin_dashboard'))
def restore_config():
    """Restore the server configuration from an uploaded backup file.

    Expects the upload in the ``backup_file`` form field. A ``'pre_restore'``
    backup of the current configuration is taken before the restore, and the
    restore is written to the audit log. Every outcome flashes a message and
    redirects to ``admin_dashboard``.
    """
    try:
        uploaded = request.files.get('backup_file')

        if uploaded is None:
            flash('No backup file provided', 'error')
            return redirect(url_for('admin_dashboard'))

        if uploaded.filename == '':
            flash('No backup file selected', 'error')
            return redirect(url_for('admin_dashboard'))

        # Snapshot the current configuration first, then restore over it.
        backup_service.backup_config('pre_restore')
        backup_service.restore_from_backup(uploaded)

        # Record the restore in the audit trail.
        AuditLog.create(
            user_id=current_user.id,
            action='restore_config',
            details=f'Configuration restored from {uploaded.filename}'
        )

        flash('Configuration restored successfully', 'success')
        return redirect(url_for('admin_dashboard'))
    except Exception as e:
        failure_message = f'Error restoring configuration: {str(e)}'
        flash(failure_message, 'error')
        return redirect(url_for('admin_dashboard'))
def export_audit_log():
    """Export the full audit log, newest entry first, as CSV or JSON.

    The ``format`` query parameter selects the output and defaults to
    ``'csv'``; it is matched case-insensitively.

    Returns:
        * ``csv`` - a ``text/csv`` attachment named ``audit_log.csv`` with
          the columns Timestamp, User, Action, Details;
        * ``json`` - a JSON array of objects with the keys ``timestamp``,
          ``user``, ``action`` and ``details``;
        * anything else - a JSON error body with status 400.
    """
    export_format = request.args.get('format', 'csv').lower()

    # Reject unknown formats before querying. Previously the function fell
    # through and returned None, which Flask reports as a server error.
    if export_format not in ('csv', 'json'):
        return jsonify({
            'status': 'error',
            'message': "Unsupported export format; use 'csv' or 'json'"
        }), 400

    logs = AuditLog.query.order_by(AuditLog.timestamp.desc()).all()

    # Build the rows once so both formats share the same field handling.
    rows = [
        (
            log.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
            # Guard against an entry with no related user object
            # (presumably possible if the account was removed - confirm).
            log.user.username if log.user is not None else None,
            log.action,
            log.details,
        )
        for log in logs
    ]

    if export_format == 'json':
        keys = ('timestamp', 'user', 'action', 'details')
        return jsonify([dict(zip(keys, row)) for row in rows])

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(['Timestamp', 'User', 'Action', 'Details'])
    writer.writerows(rows)
    return Response(
        output.getvalue(),
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=audit_log.csv'}
    )