| """
|
| L2TP/IPsec Server Implementation
|
| Handles L2TP tunneling with IPsec encryption
|
| """
|
|
|
| import asyncio
|
| import socket
|
| import struct
|
| from typing import Dict, Optional, Tuple
|
| from dataclasses import dataclass
|
| import os
|
| import hmac
|
| import hashlib
|
| from .ip_parser import IPv4Header, IPParser
|
| from .logger import Logger, LogCategory
|
|
|
@dataclass
class L2TPSession:
    """Bookkeeping record for a single L2TP session.

    The two byte counters default to zero; every other field must be
    supplied when the record is created.
    """

    # Identifiers the session is tracked under.
    tunnel_id: int
    session_id: int

    # Address of the remote peer and the address assigned to it.
    client_ip: str
    assigned_ip: str

    # Timestamps (floats) for creation and most recent activity.
    created_at: float
    last_seen: float

    # Running traffic counters, in bytes.
    bytes_in: int = 0
    bytes_out: int = 0
|
|
|
class L2TPServer:
    """L2TP/IPsec server front-end running on an asyncio UDP endpoint.

    Each datagram is classified as a control or data message from its first
    header word. Sessions are tracked in ``self.sessions`` keyed by
    ``(tunnel_id, session_id)`` and every new session receives an address
    from a simple sequential pool.
    """

    def __init__(self, logger: Logger, ip_pool_start: str = "10.10.0.2"):
        """Initialise empty server state.

        Args:
            logger: Project logger used for status and error messages.
            ip_pool_start: First dotted-quad address returned by
                :meth:`allocate_ip`; subsequent addresses increment its
                last octet.
        """
        self.logger = logger
        self.sessions: Dict[Tuple[int, int], L2TPSession] = {}
        self.next_tunnel_id = 1
        self.next_session_id = 1
        self.next_ip = ip_pool_start
        self._running = False
        self._transport = None
        self._ipsec = IPSecHandler(logger)

    async def start(self, host: str = "0.0.0.0", port: int = 1701):
        """Bind the UDP endpoint and mark the server as running.

        Args:
            host: Local address to bind.
            port: Local UDP port to bind.
        """
        loop = asyncio.get_running_loop()
        self._transport, _ = await loop.create_datagram_endpoint(
            lambda: L2TPProtocol(self),
            local_addr=(host, port),
        )
        self._running = True
        self.logger.info(LogCategory.SYSTEM, "l2tp_server", f"L2TP server started on {host}:{port}")

    async def stop(self):
        """Close the UDP endpoint (if any) and mark the server as stopped."""
        if self._transport:
            self._transport.close()
            # Drop the reference so a closed transport is never reused.
            self._transport = None
        self._running = False
        self.logger.info(LogCategory.SYSTEM, "l2tp_server", "L2TP server stopped")

    def allocate_ip(self) -> str:
        """Return the next free address and advance the pool cursor.

        The pool covers the last octet of ``self.next_ip`` up to and
        including 254; the network prefix of the configured start address
        is preserved.

        Returns:
            The allocated dotted-quad address.

        Raises:
            ValueError: If the last octet has moved past 254, i.e. the pool
                is exhausted. The cursor is left untouched in that case.
        """
        prefix, _, host_part = self.next_ip.rpartition(".")
        last_octet = int(host_part)
        if last_octet > 254:
            raise ValueError("IP pool exhausted")

        allocated = self.next_ip
        # Keep the prefix of the configured pool instead of a fixed network.
        self.next_ip = f"{prefix}.{last_octet + 1}"
        return allocated

    async def handle_packet(self, data: bytes, addr: Tuple[str, int]):
        """Parse one datagram and dispatch it as a control or data message.

        Packets shorter than the 6-byte header, and packets whose IPsec
        unwrapping yields nothing, are dropped silently. Any exception
        raised while processing is logged rather than propagated, so one
        bad datagram cannot take down the receive path.

        Args:
            data: Raw datagram payload.
            addr: ``(host, port)`` of the sender.
        """
        try:
            if self._ipsec.is_ipsec_packet(data):
                data = self._ipsec.decrypt_packet(data)
                if not data:
                    return

            if len(data) < 6:
                return

            # Header layout read here: flags, tunnel ID, session ID
            # (three big-endian 16-bit words).
            flags, tunnel_id, session_id = struct.unpack_from('!HHH', data)
            payload = data[6:]

            # Top bit of the flags word set -> control message.
            if flags & 0x8000:
                await self._handle_control(payload, tunnel_id, session_id, addr)
            else:
                await self._handle_data(payload, tunnel_id, session_id)

        except Exception as e:
            self.logger.error(LogCategory.SYSTEM, "l2tp_server", f"Error handling packet: {e}")

    async def _handle_control(self, data: bytes, tunnel_id: int, session_id: int, addr: Tuple[str, int]):
        """Process a control message and send the matching reply.

        Args:
            data: Control payload following the 6-byte header; its first two
                bytes are read as the message type.
            tunnel_id: Tunnel ID taken from the packet header.
            session_id: Session ID taken from the packet header.
            addr: ``(host, port)`` of the sender, used for the reply.
        """
        # A payload too short to carry a message type is dropped, matching
        # how handle_packet treats truncated headers.
        if len(data) < 2:
            return

        msg_type = struct.unpack_from('!H', data)[0]

        # NOTE(review): _build_sccrp, _build_icrp and _send_control are not
        # defined in this class as visible here -- confirm they are provided
        # elsewhere; otherwise these calls raise AttributeError, which
        # handle_packet logs.
        if msg_type == 1:
            # Tunnel setup request: answer with the reply from _build_sccrp.
            reply = self._build_sccrp(tunnel_id)
            await self._send_control(reply, addr)

        elif msg_type == 3:
            # Session setup request: register a session and acknowledge it.
            # NOTE(review): RFC 2661 numbers ICRQ as 10 and SCCCN as 3 --
            # confirm which value the peer actually sends here.
            now = asyncio.get_running_loop().time()
            session = L2TPSession(
                tunnel_id=tunnel_id,
                session_id=self.next_session_id,
                client_ip=addr[0],
                assigned_ip=self.allocate_ip(),
                created_at=now,
                last_seen=now,
            )
            # NOTE(review): the session is stored under the session ID from
            # the packet header, while the object records the locally
            # generated ID -- confirm which one later data packets carry.
            self.sessions[(tunnel_id, session_id)] = session
            self.next_session_id += 1

            reply = self._build_icrp(tunnel_id, session_id)
            await self._send_control(reply, addr)

    async def _handle_data(self, data: bytes, tunnel_id: int, session_id: int):
        """Process a data message for a known session.

        Messages for unknown sessions and payloads shorter than four bytes
        are ignored. Only frames whose protocol field is 0x0021 are passed
        on as IP packets.

        Args:
            data: Payload following the 6-byte header.
            tunnel_id: Tunnel ID taken from the packet header.
            session_id: Session ID taken from the packet header.
        """
        session = self.sessions.get((tunnel_id, session_id))
        if session is None:
            return

        session.last_seen = asyncio.get_running_loop().time()

        if len(data) < 4:
            return

        # The protocol field is read at offset 2; presumably the two bytes
        # before it are the PPP address/control bytes -- TODO confirm.
        ppp_protocol = struct.unpack_from('!H', data, 2)[0]
        if ppp_protocol == 0x0021:
            await self._handle_ip_packet(data[4:], session)

    async def _handle_ip_packet(self, data: bytes, session: L2TPSession):
        """Account for and forward an IP packet carried in a PPP frame.

        The packet length is added to ``session.bytes_in`` once the header
        parses. TCP and UDP packets are handed to the forwarding hooks;
        other protocols are ignored. Errors are logged, not raised.

        Args:
            data: Raw IP packet.
            session: Session the packet arrived on.
        """
        try:
            ip_header = IPParser.parse_ipv4_header(data)

            session.bytes_in += len(data)

            if ip_header.protocol == socket.IPPROTO_TCP:
                await self._forward_tcp(data, session)
            elif ip_header.protocol == socket.IPPROTO_UDP:
                await self._forward_udp(data, session)

        except Exception as e:
            self.logger.error(LogCategory.SYSTEM, "l2tp_server", f"Error handling IP packet: {e}")

    async def _forward_tcp(self, data: bytes, session: L2TPSession):
        """Forward a TCP packet (placeholder: currently does nothing)."""
        pass

    async def _forward_udp(self, data: bytes, session: L2TPSession):
        """Forward a UDP packet (placeholder: currently does nothing)."""
        pass
|
|
|
class IPSecHandler:
    """Framing helper for the IPsec layer.

    NOTE: no cryptographic transformation is performed by this class.
    "Decrypting" strips an 8-byte header and "encrypting" prepends eight
    zero bytes; payloads travel unmodified.
    """

    def __init__(self, logger: Logger):
        """Store the logger and start with no security associations."""
        self.logger = logger
        self.security_associations: Dict[str, Dict] = {}

    def is_ipsec_packet(self, data: bytes) -> bool:
        """Return True when ``data`` has at least 8 bytes and starts with 50 or 51.

        NOTE(review): 50 and 51 are presumably the ESP/AH protocol numbers --
        confirm that the first payload byte is the right place to look.
        """
        return len(data) >= 8 and data[0] in (50, 51)

    def decrypt_packet(self, data: bytes) -> Optional[bytes]:
        """Strip the leading 8-byte header and return the remainder.

        Returns:
            The bytes after the first eight, or ``None`` if slicing fails
            (the failure is logged).
        """
        try:
            payload = data[8:]
        except Exception as exc:
            self.logger.error(LogCategory.SYSTEM, "ipsec_handler", f"Decryption error: {exc}")
            return None
        return payload

    def encrypt_packet(self, data: bytes, sa_id: str) -> bytes:
        """Prefix ``data`` with an all-zero 8-byte header.

        Args:
            data: Payload to wrap.
            sa_id: Security-association identifier; not used by the current
                implementation.

        Returns:
            Header plus payload, or the unmodified ``data`` if wrapping
            fails (the failure is logged).
        """
        try:
            # Two zeroed big-endian 32-bit fields (presumably SPI and
            # sequence number -- TODO confirm).
            return struct.pack('!II', 0, 0) + data
        except Exception as exc:
            self.logger.error(LogCategory.SYSTEM, "ipsec_handler", f"Encryption error: {exc}")
            return data
|
|
|