""" PPTP Server Implementation Handles PPTP tunneling and packet forwarding """ import asyncio import socket import struct from typing import Dict, Optional, Tuple from dataclasses import dataclass import os import hmac import hashlib from .ip_parser import IPv4Header, IPParser from .logger import Logger, LogCategory @dataclass class PPTPSession: call_id: int peer_call_id: int client_ip: str assigned_ip: str created_at: float last_seen: float bytes_in: int = 0 bytes_out: int = 0 class PPTPServer: """PPTP server implementation""" def __init__(self, logger: Logger, ip_pool_start: str = "10.20.0.2"): self.logger = logger self.sessions: Dict[int, PPTPSession] = {} # call_id -> session self.next_call_id = 1 self.next_ip = ip_pool_start self._running = False self._transport = None async def start(self, host: str = "0.0.0.0", port: int = 1723): """Start PPTP server""" loop = asyncio.get_running_loop() self._transport, _ = await loop.create_server( lambda: PPTPProtocol(self), host, port ) self._running = True self.logger.info(LogCategory.SYSTEM, "pptp_server", f"PPTP server started on {host}:{port}") async def stop(self): """Stop PPTP server""" if self._transport: self._transport.close() self._running = False self.logger.info(LogCategory.SYSTEM, "pptp_server", "PPTP server stopped") def allocate_ip(self) -> str: """Allocate next available IP from pool""" allocated = self.next_ip # Increment last octet, handling rollover last_octet = int(self.next_ip.split('.')[-1]) if last_octet >= 254: raise ValueError("IP pool exhausted") self.next_ip = f"10.20.0.{last_octet + 1}" return allocated async def handle_packet(self, data: bytes, addr: Tuple[str, int]): """Handle incoming PPTP packet""" try: if len(data) < 8: return # Parse PPTP header message_type = struct.unpack('!H', data[2:4])[0] if message_type == 1: # Control Message await self._handle_control(data, addr) else: # Data Message (GRE) await self._handle_gre(data, addr) except Exception as e: self.logger.error(LogCategory.SYSTEM, "pptp_server", f"Error handling packet: {e}") async def _handle_control(self, data: bytes, addr: Tuple[str, int]): """Handle PPTP control message""" control_type = struct.unpack('!H', data[8:10])[0] if control_type == 1: # Start-Control-Connection-Request # Send Start-Control-Connection-Reply reply = self._build_start_control_reply() await self._send_control(reply, addr) elif control_type == 7: # Outgoing-Call-Request call_id = struct.unpack('!H', data[12:14])[0] # Create new session session = PPTPSession( call_id=call_id, peer_call_id=self.next_call_id, client_ip=addr[0], assigned_ip=self.allocate_ip(), created_at=asyncio.get_running_loop().time(), last_seen=asyncio.get_running_loop().time() ) self.sessions[call_id] = session self.next_call_id += 1 # Send Outgoing-Call-Reply reply = self._build_outgoing_call_reply(session) await self._send_control(reply, addr) async def _handle_gre(self, data: bytes, addr: Tuple[str, int]): """Handle GRE encapsulated data""" try: if len(data) < 12: # GRE v1 header size return # Parse GRE header flags = struct.unpack('!H', data[0:2])[0] protocol = struct.unpack('!H', data[2:4])[0] payload_len = struct.unpack('!H', data[4:6])[0] call_id = struct.unpack('!H', data[6:8])[0] if call_id not in self.sessions: return session = self.sessions[call_id] session.last_seen = asyncio.get_running_loop().time() # Handle PPP payload if protocol == 0x880B: # PPP await self._handle_ppp(data[8:], session) except Exception as e: self.logger.error(LogCategory.SYSTEM, "pptp_server", f"Error handling GRE packet: {e}") async def _handle_ppp(self, data: bytes, session: PPTPSession): """Handle PPP payload""" try: if len(data) < 4: return protocol = struct.unpack('!H', data[2:4])[0] if protocol == 0x0021: # IP ip_packet = data[4:] await self._handle_ip_packet(ip_packet, session) except Exception as e: self.logger.error(LogCategory.SYSTEM, "pptp_server", f"Error handling PPP packet: {e}") async def _handle_ip_packet(self, data: bytes, session: PPTPSession): """Handle IP packet inside PPP frame""" try: # Parse IP header ip_header = IPParser.parse_ipv4_header(data) # Update statistics session.bytes_in += len(data) # Forward packet to destination if ip_header.protocol == socket.IPPROTO_TCP: await self._forward_tcp(data, session) elif ip_header.protocol == socket.IPPROTO_UDP: await self._forward_udp(data, session) except Exception as e: self.logger.error(LogCategory.SYSTEM, "pptp_server", f"Error handling IP packet: {e}") async def _forward_tcp(self, data: bytes, session: PPTPSession): """Forward TCP packet""" # This will be handled by the TCP forwarding engine # Just a placeholder for now pass async def _forward_udp(self, data: bytes, session: PPTPSession): """Forward UDP packet""" # This will be handled by the UDP forwarding engine # Just a placeholder for now pass