# JRNET / admin_routes.py
# Factor Studios
# Upload 96 files
# 6a5b8d8 verified
"""
Admin routes and functionality for Outline VPN
"""
import csv
import io
import json
import os
import zipfile
from datetime import datetime
from functools import wraps

import psutil
from flask import (
    Response,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    url_for,
)
from flask_login import current_user, login_required

from . import app
from .models import Alert, AuditLog, SystemHealth, User, UserRole
from .services import backup_service, monitoring_service
def admin_required(f):
    """Decorator that restricts a view to users with the admin role.

    The wrapped view runs only when ``current_user`` is authenticated and its
    ``role`` equals ``UserRole.ADMIN``. Otherwise a message is flashed and the
    client is redirected to the ``dashboard`` endpoint.

    Args:
        f: The view function to protect.

    Returns:
        The wrapping function, carrying ``f``'s name and docstring.
    """
    # functools.wraps copies f.__name__ onto the wrapper. Flask derives the
    # default endpoint name from the view's __name__, so without it every
    # decorated route would register under the same name.
    @wraps(f)
    def decorated_function(*args, **kwargs):
        is_admin = (
            current_user.is_authenticated
            and current_user.role == UserRole.ADMIN
        )
        if not is_admin:
            flash('You need administrator privileges to access this page.')
            return redirect(url_for('dashboard'))
        return f(*args, **kwargs)

    return decorated_function
@app.route('/admin')
@login_required
@admin_required
def admin_dashboard():
    """Render the admin dashboard page.

    The ``admin.html`` template receives:

    * ``system_health`` - the result of ``monitoring_service.get_system_health()``
    * ``active_alerts`` - alerts whose status is ``'active'``, newest first
    * ``audit_logs`` - the 50 most recent audit log entries, newest first
    """
    context = {
        'system_health': monitoring_service.get_system_health(),
        'active_alerts': (
            Alert.query
            .filter_by(status='active')
            .order_by(Alert.created_at.desc())
            .all()
        ),
        # Capped at 50 rows so the page stays light regardless of log size.
        'audit_logs': (
            AuditLog.query
            .order_by(AuditLog.timestamp.desc())
            .limit(50)
            .all()
        ),
    }
    return render_template('admin.html', **context)
@app.route('/api/system-health')
@login_required
@admin_required
def get_system_health():
    """Return the monitoring service's current health metrics as JSON."""
    metrics = monitoring_service.get_system_health()
    return jsonify(metrics)
@app.route('/api/update-server-config', methods=['POST'])
@login_required
@admin_required
def update_server_config():
    """Apply a JSON object of settings to the stored server configuration.

    The request body must be a JSON object mapping attribute names of the
    first ``ServerConfig`` row to their new values. The request is validated
    before anything is touched; then a ``'pre_update'`` backup is taken, the
    attributes are set and committed, an audit log entry is written and the
    monitoring service is asked to restart services.

    Returns:
        ``{'status': 'success'}`` on success. On failure a JSON object with
        ``'status': 'error'`` and a ``'message'``, with status 400 for an
        invalid request body or unknown keys, 404 when no configuration row
        exists, and 500 for any other error.
    """
    # NOTE(review): ``ServerConfig`` and ``db`` are referenced below but are
    # not imported in the visible part of this module - confirm where they
    # are defined and import them at the top of the file.

    # silent=True makes a malformed or non-JSON body come back as None
    # instead of raising, so it can be reported as a client error.
    config = request.get_json(silent=True)
    if not isinstance(config, dict):
        return jsonify({
            'status': 'error',
            'message': 'Request body must be a JSON object'
        }), 400

    try:
        current_config = ServerConfig.query.first()
        if current_config is None:
            return jsonify({
                'status': 'error',
                'message': 'No server configuration found'
            }), 404

        # Only existing, public attributes may be set: the payload is
        # client-supplied, and an unrestricted setattr would let it overwrite
        # internals or attach attributes that are never persisted.
        rejected = sorted(
            key for key in config
            if key.startswith('_') or not hasattr(current_config, key)
        )
        if rejected:
            return jsonify({
                'status': 'error',
                'message': 'Unknown configuration keys: ' + ', '.join(rejected)
            }), 400

        # Back up only once the request is known to be applicable.
        backup_service.backup_config('pre_update')

        for key, value in config.items():
            setattr(current_config, key, value)
        db.session.commit()

        AuditLog.create(
            user_id=current_user.id,
            action='update_config',
            details='Server configuration updated'
        )

        monitoring_service.restart_services()
        return jsonify({'status': 'success'})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500
@app.route('/api/create-backup')
@login_required
@admin_required
def create_backup():
    """Create a backup archive and send it to the client as a download.

    User data is included only when the ``include_user_data`` query parameter
    is exactly ``'true'``. The creation is recorded in the audit log. On any
    error a message is flashed and the client is redirected to the admin
    dashboard.
    """
    try:
        wants_user_data = (
            request.args.get('include_user_data', 'false') == 'true'
        )
        archive_path = backup_service.create_backup(wants_user_data)
        archive_name = os.path.basename(archive_path)

        # Record who created the backup and which file was produced.
        AuditLog.create(
            user_id=current_user.id,
            action='create_backup',
            details=f'Backup created: {archive_name}'
        )

        # The download name carries the time of the request, not the name of
        # the file on disk.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return send_file(
            archive_path,
            as_attachment=True,
            download_name=f'outline_backup_{stamp}.zip'
        )
    except Exception as e:
        flash(f'Error creating backup: {str(e)}', 'error')
        return redirect(url_for('admin_dashboard'))
@app.route('/api/restore-config', methods=['POST'])
@login_required
@admin_required
def restore_config():
    """Restore the server configuration from an uploaded backup file.

    Expects the upload in the ``backup_file`` form field. A ``'pre_restore'``
    backup of the current configuration is taken before the restore, and the
    restore is recorded in the audit log. Every outcome flashes a message and
    redirects to the admin dashboard.
    """
    try:
        uploaded = request.files.get('backup_file')
        if uploaded is None:
            flash('No backup file provided', 'error')
            return redirect(url_for('admin_dashboard'))

        # The field is present but its filename is empty.
        if uploaded.filename == '':
            flash('No backup file selected', 'error')
            return redirect(url_for('admin_dashboard'))

        # Snapshot the current state first, then restore from the upload.
        backup_service.backup_config('pre_restore')
        backup_service.restore_from_backup(uploaded)

        AuditLog.create(
            user_id=current_user.id,
            action='restore_config',
            details=f'Configuration restored from {uploaded.filename}'
        )

        flash('Configuration restored successfully', 'success')
        return redirect(url_for('admin_dashboard'))
    except Exception as e:
        flash(f'Error restoring configuration: {str(e)}', 'error')
        return redirect(url_for('admin_dashboard'))
@app.route('/api/export-audit-log')
@login_required
@admin_required
def export_audit_log():
    """Export the full audit log, newest entry first.

    The ``format`` query parameter selects the output: ``'csv'`` (the
    default) returns a ``text/csv`` attachment named ``audit_log.csv``;
    ``'json'`` returns a JSON array of objects with the keys ``timestamp``,
    ``user``, ``action`` and ``details``.

    Returns:
        The export response, or a JSON error with status 400 when ``format``
        is neither ``'csv'`` nor ``'json'``.
    """
    export_format = request.args.get('format', 'csv')
    # Validate before querying: an unknown format previously fell through and
    # returned None, which Flask reports as a server error.
    if export_format not in ('csv', 'json'):
        return jsonify({
            'status': 'error',
            'message': f'Unsupported export format: {export_format}'
        }), 400

    logs = AuditLog.query.order_by(AuditLog.timestamp.desc()).all()

    # Build the rows once so both output formats share the same values.
    # NOTE(review): an entry without a user exports an empty user name rather
    # than failing the whole export - confirm whether AuditLog.user can
    # actually be empty.
    rows = [
        (
            log.timestamp.strftime('%Y-%m-%d %H:%M:%S'),
            log.user.username if log.user else '',
            log.action,
            log.details,
        )
        for log in logs
    ]

    if export_format == 'json':
        keys = ('timestamp', 'user', 'action', 'details')
        return jsonify([dict(zip(keys, row)) for row in rows])

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(['Timestamp', 'User', 'Action', 'Details'])
    writer.writerows(rows)
    return Response(
        output.getvalue(),
        mimetype='text/csv',
        headers={'Content-Disposition': 'attachment; filename=audit_log.csv'}
    )