""" Port Manager for VPN Server Handles port allocation, testing, and management """ import socket import asyncio import logging from typing import List, Optional, Tuple from contextlib import closing logger = logging.getLogger(__name__) class PortManager: # Common VPN ports to try DEFAULT_VPN_PORTS = [ 443, # HTTPS 8388, # Shadowsocks default 8443, # Alternative HTTPS 1194, # OpenVPN default 1984, # Outline default 8000, # Alternative 8080 # Alternative HTTP ] def __init__(self): self.bound_ports: List[int] = [] self.reserved_ports: List[int] = [] async def find_available_port(self, preferred_port: int, fallback_ports: List[int] = None, bind_address: str = '0.0.0.0') -> Tuple[int, bool]: """ Find an available port, starting with preferred_port. Returns: (port_number, is_preferred_port) """ # Try preferred port first if await self._test_port(preferred_port, bind_address): return preferred_port, True # Try fallback ports ports_to_try = (fallback_ports or []) + self.DEFAULT_VPN_PORTS for port in ports_to_try: if port != preferred_port and await self._test_port(port, bind_address): return port, False # If no predefined ports work, scan for any available port port = await self._scan_for_available_port(bind_address) return port, False async def _test_port(self, port: int, bind_address: str) -> bool: """Test if a port is available for both TCP and UDP""" if port in self.bound_ports or port in self.reserved_ports: return False try: # Test TCP tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) tcp_sock.bind((bind_address, port)) tcp_sock.close() # Test UDP udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) udp_sock.bind((bind_address, port)) udp_sock.close() return True except OSError: return False async def _scan_for_available_port(self, bind_address: str, start_port: int = 10000, end_port: int = 65535) -> int: """Scan for any available port in the given range""" for port in range(start_port, end_port): if await self._test_port(port, bind_address): return port raise RuntimeError("No available ports found") async def reserve_port(self, port: int): """Reserve a port for future use""" if port not in self.bound_ports and port not in self.reserved_ports: self.reserved_ports.append(port) async def release_port(self, port: int): """Release a reserved or bound port""" if port in self.bound_ports: self.bound_ports.remove(port) if port in self.reserved_ports: self.reserved_ports.remove(port) async def bind_port(self, port: int, bind_address: str = '0.0.0.0') -> bool: """ Attempt to bind to a port and mark it as bound if successful Returns True if binding was successful """ if await self._test_port(port, bind_address): self.bound_ports.append(port) if port in self.reserved_ports: self.reserved_ports.remove(port) return True return False def get_bound_ports(self) -> List[int]: """Get list of currently bound ports""" return self.bound_ports.copy() def get_reserved_ports(self) -> List[int]: """Get list of currently reserved ports""" return self.reserved_ports.copy()