Spaces:
Runtime error
Runtime error
| """ | |
| L2TP/IPsec Server Implementation | |
| Handles L2TP tunneling with IPsec encryption | |
| """ | |
| import asyncio | |
| import socket | |
| import struct | |
| from typing import Dict, Optional, Tuple | |
| from dataclasses import dataclass | |
| import os | |
| import hmac | |
| import hashlib | |
| from .ip_parser import IPv4Header, IPParser | |
| from .logger import Logger, LogCategory | |
| class L2TPSession: | |
| tunnel_id: int | |
| session_id: int | |
| client_ip: str | |
| assigned_ip: str | |
| created_at: float | |
| last_seen: float | |
| bytes_in: int = 0 | |
| bytes_out: int = 0 | |
| class L2TPServer: | |
| """L2TP/IPsec server implementation""" | |
| def __init__(self, logger: Logger, ip_pool_start: str = "10.10.0.2"): | |
| self.logger = logger | |
| self.sessions: Dict[Tuple[int, int], L2TPSession] = {} # (tunnel_id, session_id) -> session | |
| self.next_tunnel_id = 1 | |
| self.next_session_id = 1 | |
| self.next_ip = ip_pool_start | |
| self._running = False | |
| self._transport = None | |
| self._ipsec = IPSecHandler(logger) | |
| async def start(self, host: str = "0.0.0.0", port: int = 1701): | |
| """Start L2TP server""" | |
| loop = asyncio.get_running_loop() | |
| self._transport, _ = await loop.create_datagram_endpoint( | |
| lambda: L2TPProtocol(self), | |
| local_addr=(host, port) | |
| ) | |
| self._running = True | |
| self.logger.info(LogCategory.SYSTEM, "l2tp_server", f"L2TP server started on {host}:{port}") | |
| async def stop(self): | |
| """Stop L2TP server""" | |
| if self._transport: | |
| self._transport.close() | |
| self._running = False | |
| self.logger.info(LogCategory.SYSTEM, "l2tp_server", "L2TP server stopped") | |
| def allocate_ip(self) -> str: | |
| """Allocate next available IP from pool""" | |
| allocated = self.next_ip | |
| # Increment last octet, handling rollover | |
| last_octet = int(self.next_ip.split('.')[-1]) | |
| if last_octet >= 254: | |
| raise ValueError("IP pool exhausted") | |
| self.next_ip = f"10.10.0.{last_octet + 1}" | |
| return allocated | |
| async def handle_packet(self, data: bytes, addr: Tuple[str, int]): | |
| """Handle incoming L2TP packet""" | |
| try: | |
| # Decrypt IPsec if present | |
| if self._ipsec.is_ipsec_packet(data): | |
| data = self._ipsec.decrypt_packet(data) | |
| if not data: | |
| return | |
| # Parse L2TP header | |
| if len(data) < 6: | |
| return | |
| flags = struct.unpack('!H', data[0:2])[0] | |
| tunnel_id = struct.unpack('!H', data[2:4])[0] | |
| session_id = struct.unpack('!H', data[4:6])[0] | |
| # Handle control messages | |
| if flags & 0x8000: # Control message | |
| await self._handle_control(data[6:], tunnel_id, session_id, addr) | |
| else: # Data message | |
| await self._handle_data(data[6:], tunnel_id, session_id) | |
| except Exception as e: | |
| self.logger.error(LogCategory.SYSTEM, "l2tp_server", f"Error handling packet: {e}") | |
| async def _handle_control(self, data: bytes, tunnel_id: int, session_id: int, addr: Tuple[str, int]): | |
| """Handle L2TP control message""" | |
| msg_type = struct.unpack('!H', data[0:2])[0] | |
| if msg_type == 1: # SCCRQ - Start-Control-Connection-Request | |
| # Send SCCRP - Start-Control-Connection-Reply | |
| reply = self._build_sccrp(tunnel_id) | |
| await self._send_control(reply, addr) | |
| elif msg_type == 3: # ICRQ - Incoming-Call-Request | |
| # Create new session | |
| session = L2TPSession( | |
| tunnel_id=tunnel_id, | |
| session_id=self.next_session_id, | |
| client_ip=addr[0], | |
| assigned_ip=self.allocate_ip(), | |
| created_at=asyncio.get_running_loop().time(), | |
| last_seen=asyncio.get_running_loop().time() | |
| ) | |
| self.sessions[(tunnel_id, session_id)] = session | |
| self.next_session_id += 1 | |
| # Send ICRP - Incoming-Call-Reply | |
| reply = self._build_icrp(tunnel_id, session_id) | |
| await self._send_control(reply, addr) | |
| async def _handle_data(self, data: bytes, tunnel_id: int, session_id: int): | |
| """Handle L2TP data message""" | |
| session_key = (tunnel_id, session_id) | |
| if session_key not in self.sessions: | |
| return | |
| session = self.sessions[session_key] | |
| session.last_seen = asyncio.get_running_loop().time() | |
| # Parse PPP frame | |
| if len(data) < 4: | |
| return | |
| ppp_protocol = struct.unpack('!H', data[2:4])[0] | |
| if ppp_protocol == 0x0021: # IP | |
| ip_packet = data[4:] | |
| await self._handle_ip_packet(ip_packet, session) | |
| async def _handle_ip_packet(self, data: bytes, session: L2TPSession): | |
| """Handle IP packet inside PPP frame""" | |
| try: | |
| # Parse IP header | |
| ip_header = IPParser.parse_ipv4_header(data) | |
| # Update statistics | |
| session.bytes_in += len(data) | |
| # Forward packet to destination | |
| if ip_header.protocol == socket.IPPROTO_TCP: | |
| await self._forward_tcp(data, session) | |
| elif ip_header.protocol == socket.IPPROTO_UDP: | |
| await self._forward_udp(data, session) | |
| except Exception as e: | |
| self.logger.error(LogCategory.SYSTEM, "l2tp_server", f"Error handling IP packet: {e}") | |
| async def _forward_tcp(self, data: bytes, session: L2TPSession): | |
| """Forward TCP packet""" | |
| # This will be handled by the TCP forwarding engine | |
| # Just a placeholder for now | |
| pass | |
| async def _forward_udp(self, data: bytes, session: L2TPSession): | |
| """Forward UDP packet""" | |
| # This will be handled by the UDP forwarding engine | |
| # Just a placeholder for now | |
| pass | |
| class IPSecHandler: | |
| """Handles IPsec encryption/decryption""" | |
| def __init__(self, logger: Logger): | |
| self.logger = logger | |
| self.security_associations: Dict[str, Dict] = {} | |
| def is_ipsec_packet(self, data: bytes) -> bool: | |
| """Check if packet is IPsec encrypted""" | |
| if len(data) < 8: | |
| return False | |
| # Check for ESP or AH protocol | |
| return data[0] == 50 or data[0] == 51 | |
| def decrypt_packet(self, data: bytes) -> Optional[bytes]: | |
| """Decrypt IPsec packet""" | |
| try: | |
| # Implementation depends on the encryption method | |
| # This is a placeholder for actual decryption | |
| return data[8:] # Skip ESP header for now | |
| except Exception as e: | |
| self.logger.error(LogCategory.SYSTEM, "ipsec_handler", f"Decryption error: {e}") | |
| return None | |
| def encrypt_packet(self, data: bytes, sa_id: str) -> bytes: | |
| """Encrypt packet using IPsec""" | |
| try: | |
| # Implementation depends on the encryption method | |
| # This is a placeholder for actual encryption | |
| esp_header = struct.pack('!II', 0, 0) # SPI and Sequence Number | |
| return esp_header + data | |
| except Exception as e: | |
| self.logger.error(LogCategory.SYSTEM, "ipsec_handler", f"Encryption error: {e}") | |
| return data | |