""" AdaptiveAuth Admin Router Administrative endpoints for user and security management. """ from fastapi import APIRouter, Depends, HTTPException, status, Query from sqlalchemy.orm import Session from sqlalchemy import func from datetime import datetime, timedelta from typing import Optional from ..core.database import get_db from ..core.dependencies import require_admin, get_current_user from ..models import ( User, UserSession, LoginAttempt, RiskEvent, AnomalyPattern, UserRole, SessionStatus, RiskLevel ) from ..risk.monitor import SessionMonitor, AnomalyDetector from .. import schemas router = APIRouter(prefix="/admin", tags=["Admin"]) @router.get("/email-status") async def email_status( current_user: User = Depends(require_admin()), ): """Check email service configuration status (admin only).""" from ..auth.email import get_email_service from ..config import get_settings svc = get_email_service() s = get_settings() return { "configured": svc.is_configured, "fields": { "MAIL_USERNAME": bool(s.MAIL_USERNAME), "MAIL_PASSWORD": bool(s.MAIL_PASSWORD), "MAIL_SERVER": bool(s.MAIL_SERVER), "MAIL_FROM": bool(s.MAIL_FROM), }, "mail_port": s.MAIL_PORT, "starttls": s.MAIL_STARTTLS, "env_prefix": "ADAPTIVEAUTH_", "setup_instructions": ( "Create a .env file in the project root with: " "ADAPTIVEAUTH_MAIL_USERNAME, ADAPTIVEAUTH_MAIL_PASSWORD, " "ADAPTIVEAUTH_MAIL_SERVER, ADAPTIVEAUTH_MAIL_FROM, " "ADAPTIVEAUTH_MAIL_PORT=587, ADAPTIVEAUTH_MAIL_STARTTLS=True" ) } @router.get("/users", response_model=schemas.AdminUserList) async def list_users( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), role: Optional[str] = None, is_active: Optional[bool] = None, current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """List all users (admin only).""" query = db.query(User) if role: query = query.filter(User.role == role) if is_active is not None: query = query.filter(User.is_active == is_active) total = query.count() users = query.offset((page - 1) * page_size).limit(page_size).all() return schemas.AdminUserList( users=[schemas.UserResponse.model_validate(u) for u in users], total=total, page=page, page_size=page_size ) @router.get("/users/{user_id}", response_model=schemas.UserResponse) async def get_user( user_id: int, current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """Get user details (admin only).""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) return user @router.post("/users/{user_id}/block") async def block_user( user_id: int, reason: str = "Administrative action", duration_hours: Optional[int] = None, current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """Block a user (admin only).""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user.is_locked = True if duration_hours: user.locked_until = datetime.utcnow() + timedelta(hours=duration_hours) else: user.locked_until = None # Permanent # Revoke all sessions session_monitor = SessionMonitor(db) session_monitor.revoke_all_sessions(user) db.commit() return {"message": f"User {user.email} has been blocked"} @router.post("/users/{user_id}/unblock") async def unblock_user( user_id: int, current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """Unblock a user (admin only).""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user.is_locked = False user.locked_until = None user.failed_login_attempts = 0 db.commit() return {"message": f"User {user.email} has been unblocked"} @router.get("/sessions", response_model=schemas.SessionListResponse) async def list_sessions( status_filter: Optional[str] = None, risk_level: Optional[str] = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """List all active sessions (admin only).""" query = db.query(UserSession) if status_filter: query = query.filter(UserSession.status == status_filter) else: query = query.filter(UserSession.status == SessionStatus.ACTIVE.value) if risk_level: query = query.filter(UserSession.current_risk_level == risk_level) total = query.count() sessions = query.order_by( UserSession.last_activity.desc() ).offset((page - 1) * page_size).limit(page_size).all() session_list = [ schemas.SessionInfo( id=s.id, ip_address=s.ip_address, user_agent=s.user_agent or "", country=s.country, city=s.city, risk_level=s.current_risk_level, status=s.status, last_activity=s.last_activity, created_at=s.created_at ) for s in sessions ] return schemas.SessionListResponse(sessions=session_list, total=total) @router.post("/sessions/{session_id}/revoke") async def revoke_session( session_id: int, reason: str = "Administrative action", current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """Revoke a specific session (admin only).""" session_monitor = SessionMonitor(db) session_monitor.revoke_session(session_id, reason) return {"message": "Session revoked"} @router.get("/risk-events", response_model=schemas.RiskEventList) async def list_risk_events( risk_level: Optional[str] = None, event_type: Optional[str] = None, user_id: Optional[int] = None, page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=100), current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """List risk events (admin only).""" query = db.query(RiskEvent) if risk_level: query = query.filter(RiskEvent.risk_level == risk_level) if event_type: query = query.filter(RiskEvent.event_type == event_type) if user_id: query = query.filter(RiskEvent.user_id == user_id) total = query.count() events = query.order_by( RiskEvent.created_at.desc() ).offset((page - 1) * page_size).limit(page_size).all() event_list = [ schemas.RiskEventResponse( id=e.id, event_type=e.event_type, risk_score=e.risk_score, risk_level=e.risk_level, ip_address=e.ip_address, risk_factors=e.risk_factors or {}, action_taken=e.action_taken, created_at=e.created_at, resolved=e.resolved ) for e in events ] return schemas.RiskEventList( events=event_list, total=total, page=page, page_size=page_size ) @router.get("/anomalies", response_model=schemas.AnomalyListResponse) async def list_anomalies( active_only: bool = True, current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """List detected anomaly patterns (admin only).""" anomaly_detector = AnomalyDetector(db) if active_only: anomalies = anomaly_detector.get_active_anomalies() else: anomalies = db.query(AnomalyPattern).order_by( AnomalyPattern.last_detected.desc() ).limit(100).all() anomaly_list = [ schemas.AnomalyPatternResponse( id=a.id, pattern_type=a.pattern_type, severity=a.severity, confidence=a.confidence, is_active=a.is_active, first_detected=a.first_detected, last_detected=a.last_detected, pattern_data=a.pattern_data or {} ) for a in anomalies ] return schemas.AnomalyListResponse(anomalies=anomaly_list, total=len(anomaly_list)) @router.post("/anomalies/{anomaly_id}/resolve") async def resolve_anomaly( anomaly_id: int, false_positive: bool = False, current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """Resolve an anomaly pattern (admin only).""" anomaly_detector = AnomalyDetector(db) anomaly_detector.resolve_anomaly(anomaly_id, false_positive) return {"message": "Anomaly resolved"} @router.get("/statistics", response_model=schemas.AdminStatistics) async def get_statistics( current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """Get admin dashboard statistics.""" today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) total_users = db.query(User).count() active_users = db.query(User).filter(User.is_active == True).count() blocked_users = db.query(User).filter(User.is_locked == True).count() active_sessions = db.query(UserSession).filter( UserSession.status == SessionStatus.ACTIVE.value ).count() high_risk_events = db.query(RiskEvent).filter( RiskEvent.created_at >= today, RiskEvent.risk_level.in_([RiskLevel.HIGH.value, RiskLevel.CRITICAL.value]) ).count() failed_logins = db.query(LoginAttempt).filter( LoginAttempt.attempted_at >= today, LoginAttempt.success == False ).count() new_users = db.query(User).filter( User.created_at >= today ).count() return schemas.AdminStatistics( total_users=total_users, active_users=active_users, blocked_users=blocked_users, active_sessions=active_sessions, high_risk_events_today=high_risk_events, failed_logins_today=failed_logins, new_users_today=new_users ) @router.get("/risk-statistics", response_model=schemas.RiskStatistics) async def get_risk_statistics( period: str = Query("day", pattern="^(day|week|month)$"), current_user: User = Depends(require_admin()), db: Session = Depends(get_db) ): """Get risk statistics for a period.""" if period == "day": since = datetime.utcnow() - timedelta(days=1) elif period == "week": since = datetime.utcnow() - timedelta(weeks=1) else: since = datetime.utcnow() - timedelta(days=30) # Login statistics total_logins = db.query(LoginAttempt).filter( LoginAttempt.attempted_at >= since ).count() successful_logins = db.query(LoginAttempt).filter( LoginAttempt.attempted_at >= since, LoginAttempt.success == True ).count() failed_logins = db.query(LoginAttempt).filter( LoginAttempt.attempted_at >= since, LoginAttempt.success == False ).count() # Risk distribution risk_distribution = {} for level in RiskLevel: count = db.query(LoginAttempt).filter( LoginAttempt.attempted_at >= since, LoginAttempt.risk_level == level.value ).count() risk_distribution[level.value] = count # Average risk score avg_score_result = db.query(func.avg(LoginAttempt.risk_score)).filter( LoginAttempt.attempted_at >= since ).scalar() avg_score = float(avg_score_result) if avg_score_result else 0.0 # Blocked attempts blocked = db.query(LoginAttempt).filter( LoginAttempt.attempted_at >= since, LoginAttempt.security_level >= 4 ).count() return schemas.RiskStatistics( period=period, total_logins=total_logins, successful_logins=successful_logins, failed_logins=failed_logins, blocked_attempts=blocked, average_risk_score=round(avg_score, 2), risk_distribution=risk_distribution )