| """
|
| PPTP Server Implementation
|
| Handles PPTP tunneling and packet forwarding
|
| """
|
|
|
| import asyncio
|
| import socket
|
| import struct
|
| from typing import Dict, Optional, Tuple
|
| from dataclasses import dataclass
|
| import os
|
| import hmac
|
| import hashlib
|
| from .ip_parser import IPv4Header, IPParser
|
| from .logger import Logger, LogCategory
|
|
|
@dataclass
class PPTPSession:
    """Per-call state the PPTP server keeps for one connected client.

    The server creates one instance for every Outgoing-Call-Request it
    accepts and updates it as tunnelled traffic for that call arrives.
    """

    # --- identifiers -----------------------------------------------------
    # Call ID read from the client's Outgoing-Call-Request; also the key
    # under which the server stores the session.
    call_id: int
    # Call ID taken from the server's own counter for this call.
    peer_call_id: int

    # --- addressing ------------------------------------------------------
    # Source address of the client that opened the call.
    client_ip: str
    # Tunnel address handed to the client from the server's pool.
    assigned_ip: str

    # --- timing (event-loop clock values, not wall-clock timestamps) ------
    created_at: float
    # Refreshed whenever a GRE packet for this call is received.
    last_seen: float

    # --- traffic counters --------------------------------------------------
    # Total bytes of IP packets received from the client through the tunnel.
    bytes_in: int = 0
    # Reserved for traffic sent to the client.
    bytes_out: int = 0
|
|
|
class PPTPServer:
    """PPTP server that tracks call sessions and dispatches tunnelled packets.

    Control messages create sessions and allocate tunnel addresses; GRE
    packets are unwrapped down to the inner IPv4 packet and handed to the
    (currently unimplemented) TCP/UDP forwarders.

    Attributes:
        logger: Project logger used for lifecycle and error messages.
        sessions: Active sessions keyed by the call ID read from the
            client's Outgoing-Call-Request.
        next_call_id: Counter supplying the server-side call ID of the next
            session.
        next_ip: Next address ``allocate_ip`` will hand out.
    """

    # Bits of the 16-bit flags/version word of the enhanced GRE header
    # used by PPTP (RFC 2637).
    _GRE_FLAG_SEQ = 0x1000  # S bit: a 4-byte sequence number is present
    _GRE_FLAG_ACK = 0x0080  # A bit: a 4-byte acknowledgment number is present
    _GRE_BASE_HEADER_LEN = 8  # flags, protocol, payload length, call ID

    def __init__(self, logger: "Logger", ip_pool_start: str = "10.20.0.2"):
        """Initialise an idle server.

        Args:
            logger: Logger receiving lifecycle and error messages.
            ip_pool_start: First dotted-quad address of the client pool;
                later addresses are produced by incrementing its last octet.
        """
        self.logger = logger
        self.sessions: Dict[int, "PPTPSession"] = {}
        self.next_call_id = 1
        self.next_ip = ip_pool_start
        self._running = False
        # Holds the asyncio server object while listening (name kept for
        # compatibility with existing references).
        self._transport: Optional[asyncio.AbstractServer] = None

    async def start(self, host: str = "0.0.0.0", port: int = 1723):
        """Start listening for PPTP control connections.

        Args:
            host: Interface address to bind.
            port: TCP port to bind (1723 by default).
        """
        loop = asyncio.get_running_loop()
        # create_server() resolves to a single asyncio.Server object, not a
        # (transport, protocol) pair, so it must not be tuple-unpacked.
        # PPTPProtocol is not defined in this part of the file; it is
        # presumably provided elsewhere in the module.
        self._transport = await loop.create_server(
            lambda: PPTPProtocol(self),
            host, port
        )
        self._running = True
        self.logger.info(LogCategory.SYSTEM, "pptp_server", f"PPTP server started on {host}:{port}")

    async def stop(self):
        """Stop listening and mark the server as not running."""
        if self._transport:
            self._transport.close()
        self._running = False
        self.logger.info(LogCategory.SYSTEM, "pptp_server", "PPTP server stopped")

    def allocate_ip(self) -> str:
        """Hand out the next address of the pool and advance the pool cursor.

        The network prefix of the current address is preserved, so pools
        outside the default ``10.20.0.x`` range keep their own prefix.

        Returns:
            The allocated dotted-quad address.

        Raises:
            ValueError: If the pool cursor has reached host number 254.
        """
        allocated = self.next_ip

        prefix, _, host_part = allocated.rpartition('.')
        last_octet = int(host_part)
        if last_octet >= 254:
            raise ValueError("IP pool exhausted")
        self.next_ip = f"{prefix}.{last_octet + 1}"
        return allocated

    async def handle_packet(self, data: bytes, addr: Tuple[str, int]):
        """Dispatch one incoming packet to the control or GRE handler.

        Packets shorter than 8 bytes are dropped. The big-endian 16-bit value
        at bytes 2-3 selects the handler: 1 means a control message, anything
        else is treated as GRE. Any exception is logged, never propagated.

        Args:
            data: Raw packet bytes.
            addr: ``(host, port)`` of the sender.
        """
        try:
            if len(data) < 8:
                return

            message_type = struct.unpack_from('!H', data, 2)[0]

            if message_type == 1:
                await self._handle_control(data, addr)
            else:
                await self._handle_gre(data, addr)

        except Exception as e:
            self.logger.error(LogCategory.SYSTEM, "pptp_server", f"Error handling packet: {e}")

    async def _handle_control(self, data: bytes, addr: Tuple[str, int]):
        """Handle a PPTP control message.

        Control type 1 is answered with a start-control reply. Control type 7
        creates a session for the caller, allocates it an address and is
        answered with an outgoing-call reply. Other types, and messages too
        short to contain the fields being read, are ignored.

        Args:
            data: Raw control message (at least 8 bytes).
            addr: ``(host, port)`` of the sender.
        """
        # The control message type lives at bytes 8-9; drop truncated input
        # instead of letting struct raise.
        if len(data) < 10:
            return
        control_type = struct.unpack_from('!H', data, 8)[0]

        if control_type == 1:
            reply = self._build_start_control_reply()
            await self._send_control(reply, addr)

        elif control_type == 7:
            # The caller's call ID lives at bytes 12-13.
            if len(data) < 14:
                return
            call_id = struct.unpack_from('!H', data, 12)[0]

            # Read the loop clock once so both timestamps are identical.
            now = asyncio.get_running_loop().time()
            session = PPTPSession(
                call_id=call_id,
                peer_call_id=self.next_call_id,
                client_ip=addr[0],
                assigned_ip=self.allocate_ip(),
                created_at=now,
                last_seen=now
            )
            self.sessions[call_id] = session
            self.next_call_id += 1

            reply = self._build_outgoing_call_reply(session)
            await self._send_control(reply, addr)

    async def _handle_gre(self, data: bytes, addr: Tuple[str, int]):
        """Handle a GRE packet carrying tunnelled PPP data.

        The fixed 8-byte header (flags, protocol, payload length, call ID)
        may be followed by a 4-byte sequence number and a 4-byte
        acknowledgment number, depending on the S and A flag bits; the PPP
        frame starts after whichever of them are present. Packets for
        unknown call IDs or other protocols are ignored. Any exception is
        logged, never propagated.

        Args:
            data: Raw GRE packet.
            addr: ``(host, port)`` of the sender (unused here).
        """
        try:
            if len(data) < 12:
                return

            flags, protocol, payload_len, call_id = struct.unpack_from('!HHHH', data, 0)

            # NOTE(review): sessions are keyed by the call ID the client sent
            # in its Outgoing-Call-Request, while RFC 2637 says a GRE packet
            # carries the receiver's call ID -- confirm the keys line up.
            session = self.sessions.get(call_id)
            if session is None:
                return

            session.last_seen = asyncio.get_running_loop().time()

            if protocol != 0x880B:
                return

            # Skip the optional sequence/acknowledgment fields so the PPP
            # frame is not parsed starting at the sequence number.
            offset = self._GRE_BASE_HEADER_LEN
            if flags & self._GRE_FLAG_SEQ:
                offset += 4
            if flags & self._GRE_FLAG_ACK:
                offset += 4

            # Bound the payload by the advertised length; an ack-only packet
            # (length 0) yields an empty frame that _handle_ppp drops.
            payload = data[offset:offset + payload_len]
            await self._handle_ppp(payload, session)

        except Exception as e:
            self.logger.error(LogCategory.SYSTEM, "pptp_server", f"Error handling GRE packet: {e}")

    async def _handle_ppp(self, data: bytes, session: "PPTPSession"):
        """Handle one PPP frame extracted from a GRE packet.

        Frames shorter than 4 bytes are dropped. The protocol number is read
        from bytes 2-3, so the frame is assumed to start with the two-byte
        address/control field. Only protocol 0x0021 (IPv4) is processed; the
        rest of the frame is passed on as the IP packet. Any exception is
        logged, never propagated.

        Args:
            data: PPP frame bytes.
            session: Session the frame belongs to.
        """
        try:
            if len(data) < 4:
                return

            protocol = struct.unpack_from('!H', data, 2)[0]

            if protocol == 0x0021:
                ip_packet = data[4:]
                await self._handle_ip_packet(ip_packet, session)

        except Exception as e:
            self.logger.error(LogCategory.SYSTEM, "pptp_server", f"Error handling PPP packet: {e}")

    async def _handle_ip_packet(self, data: bytes, session: "PPTPSession"):
        """Account for and route an IPv4 packet received through the tunnel.

        The header is parsed first; only if that succeeds is the packet
        length added to ``session.bytes_in``. TCP and UDP packets go to their
        forwarders; other protocols are ignored. Any exception is logged,
        never propagated.

        Args:
            data: Raw IPv4 packet.
            session: Session the packet arrived on.
        """
        try:
            ip_header = IPParser.parse_ipv4_header(data)

            session.bytes_in += len(data)

            if ip_header.protocol == socket.IPPROTO_TCP:
                await self._forward_tcp(data, session)
            elif ip_header.protocol == socket.IPPROTO_UDP:
                await self._forward_udp(data, session)

        except Exception as e:
            self.logger.error(LogCategory.SYSTEM, "pptp_server", f"Error handling IP packet: {e}")

    async def _forward_tcp(self, data: bytes, session: "PPTPSession"):
        """Forward a TCP packet (placeholder: currently does nothing)."""
        pass

    async def _forward_udp(self, data: bytes, session: "PPTPSession"):
        """Forward a UDP packet (placeholder: currently does nothing)."""
        pass
|
|
|