Spaces:
Runtime error
Runtime error
| """ | |
| Admin routes and functionality for VPN Server | |
| """ | |
| from fastapi import APIRouter, Depends, HTTPException, status, Request, File, UploadFile | |
| from fastapi.responses import JSONResponse, FileResponse, StreamingResponse | |
| from fastapi.security import OAuth2PasswordBearer | |
| from sqlalchemy.orm import Session | |
| from typing import List, Optional | |
| from datetime import datetime | |
| import os | |
| from models.user import User, UserRole | |
| from models.system import SystemHealth, AuditLog, Alert, ServerConfig | |
| from services import backup_service, monitoring_service | |
| from core.auth import get_current_active_user, get_current_admin_user | |
| from core.database import get_db | |
| from schemas.admin import ( | |
| SystemHealthResponse, | |
| ServerConfigUpdate, | |
| AlertResponse, | |
| AuditLogResponse | |
| ) | |
| router = APIRouter( | |
| prefix="/admin", | |
| tags=["admin"], | |
| responses={404: {"description": "Not found"}}, | |
| ) | |
| async def admin_dashboard( | |
| current_user: User = Depends(get_current_admin_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Admin dashboard data""" | |
| system_health = await monitoring_service.get_system_health() | |
| active_alerts = db.query(Alert).filter(Alert.status == 'active').order_by(Alert.created_at.desc()).all() | |
| audit_logs = db.query(AuditLog).order_by(AuditLog.timestamp.desc()).limit(50).all() | |
| return { | |
| "system_health": system_health, | |
| "active_alerts": active_alerts, | |
| "audit_logs": audit_logs | |
| } | |
| async def get_system_health( | |
| current_user: User = Depends(get_current_admin_user) | |
| ): | |
| """Get real-time system health metrics""" | |
| return await monitoring_service.get_system_health() | |
| async def update_server_config( | |
| config: ServerConfigUpdate, | |
| current_user: User = Depends(get_current_admin_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Update server configuration""" | |
| try: | |
| # Create backup before updating | |
| await backup_service.backup_config('pre_update') | |
| # Update configuration | |
| current_config = db.query(ServerConfig).first() | |
| for key, value in config.dict(exclude_unset=True).items(): | |
| setattr(current_config, key, value) | |
| db.commit() | |
| # Log the change | |
| audit_log = AuditLog( | |
| user_id=current_user.id, | |
| action='update_config', | |
| details='Server configuration updated' | |
| ) | |
| db.add(audit_log) | |
| db.commit() | |
| # Restart required services | |
| await monitoring_service.restart_services() | |
| return {"status": "success"} | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| async def create_backup( | |
| include_user_data: bool = False, | |
| current_user: User = Depends(get_current_admin_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Create a backup of server configuration""" | |
| try: | |
| backup_path = await backup_service.create_backup(include_user_data) | |
| # Log the backup creation | |
| audit_log = AuditLog( | |
| user_id=current_user.id, | |
| action='create_backup', | |
| details=f'Backup created: {os.path.basename(backup_path)}' | |
| ) | |
| db.add(audit_log) | |
| db.commit() | |
| filename = f'outline_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.zip' | |
| return FileResponse( | |
| path=backup_path, | |
| filename=filename, | |
| media_type='application/zip' | |
| ) | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=str(e) | |
| ) | |
| async def get_audit_logs( | |
| limit: int = 50, | |
| current_user: User = Depends(get_current_admin_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Get audit logs""" | |
| logs = db.query(AuditLog).order_by(AuditLog.timestamp.desc()).limit(limit).all() | |
| return logs | |
| async def get_alerts( | |
| status: Optional[str] = None, | |
| current_user: User = Depends(get_current_admin_user), | |
| db: Session = Depends(get_db) | |
| ): | |
| """Get system alerts""" | |
| query = db.query(Alert) | |
| if status: | |
| query = query.filter(Alert.status == status) | |
| alerts = query.order_by(Alert.created_at.desc()).all() | |
| return alerts | |