File size: 4,134 Bytes
6a5b8d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""

Port Manager for VPN Server

Handles port allocation, testing, and management

"""

import socket
import asyncio
import logging
from typing import List, Optional, Tuple
from contextlib import closing

logger = logging.getLogger(__name__)

class PortManager:
    # Common VPN ports to try
    DEFAULT_VPN_PORTS = [
        443,    # HTTPS
        8388,   # Shadowsocks default
        8443,   # Alternative HTTPS
        1194,   # OpenVPN default
        1984,   # Outline default
        8000,   # Alternative
        8080    # Alternative HTTP
    ]

    def __init__(self):
        self.bound_ports: List[int] = []
        self.reserved_ports: List[int] = []

    async def find_available_port(self, preferred_port: int,

                                fallback_ports: List[int] = None,

                                bind_address: str = '0.0.0.0') -> Tuple[int, bool]:
        """

        Find an available port, starting with preferred_port.

        Returns: (port_number, is_preferred_port)

        """
        # Try preferred port first
        if await self._test_port(preferred_port, bind_address):
            return preferred_port, True

        # Try fallback ports
        ports_to_try = (fallback_ports or []) + self.DEFAULT_VPN_PORTS
        for port in ports_to_try:
            if port != preferred_port and await self._test_port(port, bind_address):
                return port, False

        # If no predefined ports work, scan for any available port
        port = await self._scan_for_available_port(bind_address)
        return port, False

    async def _test_port(self, port: int, bind_address: str) -> bool:
        """Test if a port is available for both TCP and UDP"""
        if port in self.bound_ports or port in self.reserved_ports:
            return False

        try:
            # Test TCP
            tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            tcp_sock.bind((bind_address, port))
            tcp_sock.close()

            # Test UDP
            udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            udp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            udp_sock.bind((bind_address, port))
            udp_sock.close()

            return True

        except OSError:
            return False

    async def _scan_for_available_port(self, bind_address: str,

                                     start_port: int = 10000,

                                     end_port: int = 65535) -> int:
        """Scan for any available port in the given range"""
        for port in range(start_port, end_port):
            if await self._test_port(port, bind_address):
                return port
        raise RuntimeError("No available ports found")

    async def reserve_port(self, port: int):
        """Reserve a port for future use"""
        if port not in self.bound_ports and port not in self.reserved_ports:
            self.reserved_ports.append(port)

    async def release_port(self, port: int):
        """Release a reserved or bound port"""
        if port in self.bound_ports:
            self.bound_ports.remove(port)
        if port in self.reserved_ports:
            self.reserved_ports.remove(port)

    async def bind_port(self, port: int, bind_address: str = '0.0.0.0') -> bool:
        """

        Attempt to bind to a port and mark it as bound if successful

        Returns True if binding was successful

        """
        if await self._test_port(port, bind_address):
            self.bound_ports.append(port)
            if port in self.reserved_ports:
                self.reserved_ports.remove(port)
            return True
        return False

    def get_bound_ports(self) -> List[int]:
        """Get list of currently bound ports"""
        return self.bound_ports.copy()

    def get_reserved_ports(self) -> List[int]:
        """Get list of currently reserved ports"""
        return self.reserved_ports.copy()