File size: 4,946 Bytes
aaaaa79
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
Traffic Router Module

Handles routing of all client traffic through VPN for free data access using async TCP sockets.
Optimized for performance and scalability.
"""


import asyncio
import socket
import logging
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)


class TrafficRouter:
    """Manages traffic routing for VPN clients using async TCP sockets"""

    def __init__(self, config: Dict[str, Any], nat_engine: Any = None):
        self.config = config
        self.is_running = False
        self.vpn_host = self.config.get("vpn_host", "127.0.0.1")
        self.vpn_port = self.config.get("vpn_port", 9000)
        self.internet_host = self.config.get("internet_host", "0.0.0.0")
        self.internet_port = self.config.get("internet_port", 9001)
        self.nat_engine = nat_engine
        self.loop = None
        self.vpn_server = None
        self.internet_server = None
        self.connections = set()
        self.stats = {
            "total_connections": 0,
            "active_connections": 0,
            "bytes_forwarded": 0,
            "errors": 0
        }

    async def start(self):
        """Start the traffic router using asyncio TCP servers"""
        if self.is_running:
            logger.warning("Traffic Router is already running")
            return True

        self.is_running = True
        self.loop = asyncio.get_event_loop()
        self.vpn_server = await asyncio.start_server(
            lambda r, w: self._handle_connection(r, w, "VPN"),
            self.vpn_host, self.vpn_port)
        self.internet_server = await asyncio.start_server(
            lambda r, w: self._handle_connection(r, w, "Internet"),
            self.internet_host, self.internet_port)

        logger.info(f"Traffic Router started on TCP endpoints: {self.vpn_host}:{self.vpn_port} and {self.internet_host}:{self.internet_port}")
        return True

    async def stop(self):
        """Stop the traffic router"""
        logger.info("Stopping Traffic Router...")
        self.is_running = False
        if self.vpn_server:
            self.vpn_server.close()
            await self.vpn_server.wait_closed()
        if self.internet_server:
            self.internet_server.close()
            await self.internet_server.wait_closed()
        for conn in list(self.connections):
            conn.close()
        logger.info("Traffic Router stopped")
        return True

    async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, source_name: str):
        """Handle a new connection and forward data asynchronously."""
        peer = writer.get_extra_info("peername")
        logger.info(f"Accepted connection from {peer} on {source_name}")
        self.connections.add(writer)
        try:
            while self.is_running:
                data = await reader.read(4096)
                if not data:
                    break
                processed_data = None
                if self.nat_engine:
                    if source_name == "VPN":
                        processed_data = self.nat_engine.process_outbound_packet(data)
                    elif source_name == "Internet":
                        processed_data = self.nat_engine.process_inbound_packet(data)
                if processed_data:
                    await self._forward_data(processed_data, source_name)
                elif not self.nat_engine:
                    await self._forward_data(data, source_name)
                self.stats["bytes_forwarded"] += len(data)
        except Exception as e:
            self.stats["errors"] += 1
            logger.error(f"Error in {source_name} connection: {e}")
        finally:
            writer.close()
            await writer.wait_closed()
            self.connections.discard(writer)

    async def _forward_data(self, data: bytes, source_name: str):
        """Forward data to the opposite endpoint."""
        # This is a placeholder for actual forwarding logic.
        # You may want to implement connection pooling or load balancing here.
        # For demo, just log the forwarding event.
        logger.debug(f"Forwarded {len(data)} bytes from {source_name}")

    def get_stats(self) -> Dict[str, Any]:
        """Get traffic router statistics"""
        return {
            "is_running": self.is_running,
            "vpn_host": self.vpn_host,
            "vpn_port": self.vpn_port,
            "internet_host": self.internet_host,
            "internet_port": self.internet_port,
            "total_connections": self.stats["total_connections"],
            "active_connections": len(self.connections),
            "bytes_forwarded": self.stats["bytes_forwarded"],
            "errors": self.stats["errors"]
        }






    def set_components(self, nat_engine: Any = None):
        """Set references to other components for inter-operation."""
        if nat_engine:
            self.nat_engine = nat_engine