| """
|
| Admin routes and functionality for Outline VPN
|
| """
|
import csv
import io
import json
import os
import zipfile
from datetime import datetime
from functools import wraps

import psutil
from flask import (
    Response,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from flask_login import current_user, login_required

from . import app
from .models import Alert, AuditLog, SystemHealth, User, UserRole
from .services import backup_service, monitoring_service
|
|
|
def admin_required(f):
    """Restrict a view function to users holding the administrator role.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that calls ``f`` with its original arguments when
        ``current_user`` is authenticated and its ``role`` equals
        ``UserRole.ADMIN``. Otherwise the wrapper flashes an error message
        and redirects to the ``dashboard`` endpoint without calling ``f``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not current_user.is_authenticated or current_user.role != UserRole.ADMIN:
            # Use the 'error' category like every other failure flash in
            # this module so templates can treat it consistently.
            flash('You need administrator privileges to access this page.', 'error')
            return redirect(url_for('dashboard'))
        return f(*args, **kwargs)

    return decorated_function
|
|
|
@app.route('/admin')
@login_required
@admin_required
def admin_dashboard():
    """Render ``admin.html`` with health data, active alerts and recent audit logs.

    The template receives three values: the result of
    ``monitoring_service.get_system_health()``, all alerts whose status is
    ``'active'`` ordered newest first, and the 50 most recent audit log
    entries.
    """
    # Dict literals evaluate top to bottom, so the health lookup and the two
    # queries run in the order they are listed here.
    context = {
        'system_health': monitoring_service.get_system_health(),
        'active_alerts': (
            Alert.query
            .filter_by(status='active')
            .order_by(Alert.created_at.desc())
            .all()
        ),
        'audit_logs': (
            AuditLog.query
            .order_by(AuditLog.timestamp.desc())
            .limit(50)
            .all()
        ),
    }
    return render_template('admin.html', **context)
|
|
|
@app.route('/api/system-health')
@login_required
@admin_required
def get_system_health():
    """Serve the result of ``monitoring_service.get_system_health()`` as JSON."""
    metrics = monitoring_service.get_system_health()
    return jsonify(metrics)
|
|
|
@app.route('/api/update-server-config', methods=['POST'])
@login_required
@admin_required
def update_server_config():
    """Apply a JSON object of settings to the stored server configuration.

    The request body must be a JSON object whose keys name existing,
    non-private attributes of the first ``ServerConfig`` row; each value is
    assigned to that attribute. On success a ``'pre_update'`` backup has
    been taken, the change committed, an audit log entry written and
    ``monitoring_service.restart_services()`` called.

    Returns:
        ``{'status': 'success'}`` on success. A JSON payload of the form
        ``{'status': 'error', 'message': ...}`` otherwise, with status 400
        for a missing, malformed or invalid body, 404 when no configuration
        row exists, and 500 when any later step raises.
    """
    # NOTE(review): ``ServerConfig`` and ``db`` are not imported in the
    # visible module header -- confirm where they are expected to come from.

    # silent=True yields None for an unparsable body instead of raising, so
    # a bad request is answered with 400 here rather than being swallowed by
    # the generic handler below and reported as a server error.
    config = request.get_json(silent=True)
    if not isinstance(config, dict):
        return jsonify({
            'status': 'error',
            'message': 'Request body must be a JSON object',
        }), 400

    try:
        current_config = ServerConfig.query.first()
        if current_config is None:
            return jsonify({
                'status': 'error',
                'message': 'No server configuration found',
            }), 404

        # The keys come straight from the client: refuse private names and
        # names the configuration object does not already expose, instead
        # of letting setattr() write arbitrary attributes.
        invalid_keys = sorted(
            key for key in config
            if key.startswith('_') or not hasattr(current_config, key)
        )
        if invalid_keys:
            return jsonify({
                'status': 'error',
                'message': 'Unknown configuration keys: ' + ', '.join(invalid_keys),
            }), 400

        # Snapshot only after validation so rejected requests leave no
        # backup behind; the snapshot still precedes any modification.
        backup_service.backup_config('pre_update')

        for key, value in config.items():
            setattr(current_config, key, value)
        db.session.commit()

        AuditLog.create(
            user_id=current_user.id,
            action='update_config',
            details='Server configuration updated'
        )

        monitoring_service.restart_services()

        return jsonify({'status': 'success'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
|
|
|
@app.route('/api/create-backup')
@login_required
@admin_required
def create_backup():
    """Create a backup via ``backup_service`` and send it as a download.

    ``backup_service.create_backup`` is called with ``True`` only when the
    ``include_user_data`` query parameter is exactly ``'true'``. The
    returned path is recorded in an audit log entry and sent as an
    attachment named with the current timestamp. If anything raises, the
    error is flashed and the client is redirected to the admin dashboard.
    """
    try:
        wants_user_data = request.args.get('include_user_data', 'false') == 'true'
        archive_path = backup_service.create_backup(wants_user_data)
        archive_name = os.path.basename(archive_path)

        AuditLog.create(
            user_id=current_user.id,
            action='create_backup',
            details=f'Backup created: {archive_name}'
        )

        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return send_file(
            archive_path,
            as_attachment=True,
            download_name=f'outline_backup_{stamp}.zip'
        )
    except Exception as exc:
        flash(f'Error creating backup: {str(exc)}', 'error')
        return redirect(url_for('admin_dashboard'))
|
|
|
@app.route('/api/restore-config', methods=['POST'])
@login_required
@admin_required
def restore_config():
    """Restore configuration from the uploaded ``backup_file``.

    Flashes an error and redirects to the admin dashboard when the upload
    field is absent or has an empty filename. Otherwise it takes a
    ``'pre_restore'`` backup, passes the upload to
    ``backup_service.restore_from_backup``, writes an audit log entry and
    flashes a success message. Any exception is flashed as an error. Every
    path ends in a redirect to the admin dashboard.
    """
    try:
        upload = request.files.get('backup_file')

        # Guard clause: the form field was not submitted at all.
        if upload is None:
            flash('No backup file provided', 'error')
            return redirect(url_for('admin_dashboard'))

        # Guard clause: the field is present but carries an empty filename.
        if upload.filename == '':
            flash('No backup file selected', 'error')
            return redirect(url_for('admin_dashboard'))

        # Take the 'pre_restore' backup before handing the upload over.
        backup_service.backup_config('pre_restore')
        backup_service.restore_from_backup(upload)

        AuditLog.create(
            user_id=current_user.id,
            action='restore_config',
            details=f'Configuration restored from {upload.filename}'
        )

        flash('Configuration restored successfully', 'success')
        return redirect(url_for('admin_dashboard'))
    except Exception as exc:
        flash(f'Error restoring configuration: {str(exc)}', 'error')
        return redirect(url_for('admin_dashboard'))
|
|
|
@app.route('/api/export-audit-log')
@login_required
@admin_required
def export_audit_log():
    """Export every audit log entry, newest first, as CSV or JSON.

    Query parameters:
        format: ``'csv'`` (the default) or ``'json'``.

    Returns:
        For ``csv``, a ``text/csv`` response sent as the attachment
        ``audit_log.csv`` with a ``Timestamp, User, Action, Details``
        header row. For ``json``, a JSON array of objects with
        ``timestamp``, ``user``, ``action`` and ``details`` keys. For any
        other value, a JSON error payload with status 400.
    """
    export_format = request.args.get('format', 'csv')
    if export_format not in ('csv', 'json'):
        # Without this guard the view would fall off the end and return
        # None, which Flask rejects as an invalid response (a 500).
        return jsonify({
            'status': 'error',
            'message': f'Unsupported export format: {export_format}',
        }), 400

    logs = AuditLog.query.order_by(AuditLog.timestamp.desc()).all()

    # Format each entry once so both output formats share the same values.
    # NOTE(review): the ``log.user`` None-guard assumes an entry can exist
    # without a user; confirm against the model. Such an entry then exports
    # an empty CSV cell / JSON null instead of aborting the whole export.
    rows = [
        (
            log.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
            log.user.username if log.user is not None else None,
            log.action,
            log.details,
        )
        for log in logs
    ]

    if export_format == 'json':
        keys = ('timestamp', 'user', 'action', 'details')
        return jsonify([dict(zip(keys, row)) for row in rows])

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(['Timestamp', 'User', 'Action', 'Details'])
    writer.writerows(rows)

    return Response(
        output.getvalue(),
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=audit_log.csv'}
    )
|
|
|