| """
|
| VPN Authentication Manager
|
| Handles authentication across different VPN protocols
|
| """
|
| from typing import Optional, Tuple
|
| from sqlalchemy.orm import Session
|
| from .database import SessionLocal
|
| from .services.user_service import UserService
|
| from .models.user import User, UserStatus
|
|
|
| class VPNAuthManager:
|
| def __init__(self):
|
| self._db = SessionLocal()
|
| self._user_service = UserService(self._db)
|
|
|
| async def authenticate(self, username: str, password: str, protocol: str,
|
| ip_address: str, device_info: str = None) -> Tuple[bool, str, Optional[str]]:
|
| """
|
| Authenticate user for VPN access
|
| Returns: (success, message, session_token)
|
| """
|
|
|
| success, message, user = self._user_service.authenticate_user(username, password)
|
| if not success:
|
| return False, message, None
|
|
|
|
|
| session = self._user_service.create_session(user, ip_address, device_info)
|
|
|
| return True, "Authentication successful", session.token
|
|
|
| async def validate_session(self, token: str) -> Tuple[bool, str, Optional[User]]:
|
| """
|
| Validate a session token
|
| Returns: (success, message, user)
|
| """
|
| success, message, session = self._user_service.validate_session(token)
|
| if not success:
|
| return False, message, None
|
|
|
|
|
| user = self._db.query(User).filter(User.id == session.user_id).first()
|
| if not user or user.status != UserStatus.ACTIVE:
|
| return False, "User inactive or not found", None
|
|
|
| return True, "Session valid", user
|
|
|
| def close(self):
|
| """Close database connection"""
|
| self._db.close()
|
|
|