| """
|
| Port Manager for VPN Server
|
| Handles port allocation, testing, and management
|
| """
|
|
|
| import socket
|
| import asyncio
|
| import logging
|
| from typing import List, Optional, Tuple
|
| from contextlib import closing
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
| class PortManager:
|
|
|
| DEFAULT_VPN_PORTS = [
|
| 443,
|
| 8388,
|
| 8443,
|
| 1194,
|
| 1984,
|
| 8000,
|
| 8080
|
| ]
|
|
|
| def __init__(self):
|
| self.bound_ports: List[int] = []
|
| self.reserved_ports: List[int] = []
|
|
|
| async def find_available_port(self, preferred_port: int,
|
| fallback_ports: List[int] = None,
|
| bind_address: str = '0.0.0.0') -> Tuple[int, bool]:
|
| """
|
| Find an available port, starting with preferred_port.
|
| Returns: (port_number, is_preferred_port)
|
| """
|
|
|
| if await self._test_port(preferred_port, bind_address):
|
| return preferred_port, True
|
|
|
|
|
| ports_to_try = (fallback_ports or []) + self.DEFAULT_VPN_PORTS
|
| for port in ports_to_try:
|
| if port != preferred_port and await self._test_port(port, bind_address):
|
| return port, False
|
|
|
|
|
| port = await self._scan_for_available_port(bind_address)
|
| return port, False
|
|
|
| async def _test_port(self, port: int, bind_address: str) -> bool:
|
| """Test if a port is available for both TCP and UDP"""
|
| if port in self.bound_ports or port in self.reserved_ports:
|
| return False
|
|
|
| try:
|
|
|
| tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
| tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
| tcp_sock.bind((bind_address, port))
|
| tcp_sock.close()
|
|
|
|
|
| udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
| udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
| udp_sock.bind((bind_address, port))
|
| udp_sock.close()
|
|
|
| return True
|
|
|
| except OSError:
|
| return False
|
|
|
| async def _scan_for_available_port(self, bind_address: str,
|
| start_port: int = 10000,
|
| end_port: int = 65535) -> int:
|
| """Scan for any available port in the given range"""
|
| for port in range(start_port, end_port):
|
| if await self._test_port(port, bind_address):
|
| return port
|
| raise RuntimeError("No available ports found")
|
|
|
| async def reserve_port(self, port: int):
|
| """Reserve a port for future use"""
|
| if port not in self.bound_ports and port not in self.reserved_ports:
|
| self.reserved_ports.append(port)
|
|
|
| async def release_port(self, port: int):
|
| """Release a reserved or bound port"""
|
| if port in self.bound_ports:
|
| self.bound_ports.remove(port)
|
| if port in self.reserved_ports:
|
| self.reserved_ports.remove(port)
|
|
|
| async def bind_port(self, port: int, bind_address: str = '0.0.0.0') -> bool:
|
| """
|
| Attempt to bind to a port and mark it as bound if successful
|
| Returns True if binding was successful
|
| """
|
| if await self._test_port(port, bind_address):
|
| self.bound_ports.append(port)
|
| if port in self.reserved_ports:
|
| self.reserved_ports.remove(port)
|
| return True
|
| return False
|
|
|
| def get_bound_ports(self) -> List[int]:
|
| """Get list of currently bound ports"""
|
| return self.bound_ports.copy()
|
|
|
| def get_reserved_ports(self) -> List[int]:
|
| """Get list of currently reserved ports"""
|
| return self.reserved_ports.copy()
|
|
|