""" DHCP Server Module Implements a user-space DHCP server that handles: - DHCP DISCOVER → OFFER → REQUEST → ACK sequence - IP lease management - Lease renewals and expiration """ import struct import time import socket import threading from typing import Dict, Optional, Tuple from dataclasses import dataclass from enum import Enum class DHCPMessageType(Enum): DISCOVER = 1 OFFER = 2 REQUEST = 3 DECLINE = 4 ACK = 5 NAK = 6 RELEASE = 7 INFORM = 8 @dataclass class DHCPLease: """Represents a DHCP lease""" mac_address: str ip_address: str lease_time: int lease_start: float state: str = 'BOUND' @property def is_expired(self) -> bool: return time.time() > (self.lease_start + self.lease_time) @property def remaining_time(self) -> int: remaining = int((self.lease_start + self.lease_time) - time.time()) return max(0, remaining) class DHCPPacket: """DHCP packet parser and builder""" def __init__(self): self.op = 0 # Message op code / message type self.htype = 1 # Hardware address type (Ethernet = 1) self.hlen = 6 # Hardware address length self.hops = 0 # Hops self.xid = 0 # Transaction ID self.secs = 0 # Seconds elapsed self.flags = 0 # Flags self.ciaddr = '0.0.0.0' # Client IP address self.yiaddr = '0.0.0.0' # Your IP address self.siaddr = '0.0.0.0' # Server IP address self.giaddr = '0.0.0.0' # Gateway IP address self.chaddr = b'\x00' * 16 # Client hardware address self.sname = b'\x00' * 64 # Server name self.file = b'\x00' * 128 # Boot file name self.options = {} # DHCP options @classmethod def parse(cls, data: bytes) -> 'DHCPPacket': """Parse DHCP packet from raw bytes""" packet = cls() # Parse fixed fields (first 236 bytes) if len(data) < 236: raise ValueError("DHCP packet too short") fields = struct.unpack('!BBBBIHH4s4s4s4s16s64s128s', data[:236]) packet.op = fields[0] packet.htype = fields[1] packet.hlen = fields[2] packet.hops = fields[3] packet.xid = fields[4] packet.secs = fields[5] packet.flags = fields[6] packet.ciaddr = socket.inet_ntoa(fields[7]) packet.yiaddr = socket.inet_ntoa(fields[8]) packet.siaddr = socket.inet_ntoa(fields[9]) packet.giaddr = socket.inet_ntoa(fields[10]) packet.chaddr = fields[11] packet.sname = fields[12] packet.file = fields[13] # Parse options (after magic cookie) options_data = data[236:] if len(options_data) >= 4: magic = struct.unpack('!I', options_data[:4])[0] if magic == 0x63825363: # DHCP magic cookie packet.options = packet._parse_options(options_data[4:]) return packet def _parse_options(self, data: bytes) -> Dict[int, bytes]: """Parse DHCP options""" options = {} i = 0 while i < len(data): if data[i] == 255: # End option break elif data[i] == 0: # Pad option i += 1 continue option_type = data[i] if i + 1 >= len(data): break option_length = data[i + 1] if i + 2 + option_length > len(data): break option_data = data[i + 2:i + 2 + option_length] options[option_type] = option_data i += 2 + option_length return options def build(self) -> bytes: """Build DHCP packet as bytes""" # Build fixed fields packet_data = struct.pack( '!BBBBIHH4s4s4s4s16s64s128s', self.op, self.htype, self.hlen, self.hops, self.xid, self.secs, self.flags, socket.inet_aton(self.ciaddr), socket.inet_aton(self.yiaddr), socket.inet_aton(self.siaddr), socket.inet_aton(self.giaddr), self.chaddr, self.sname, self.file ) # Add magic cookie packet_data += struct.pack('!I', 0x63825363) # Add options for option_type, option_data in self.options.items(): packet_data += struct.pack('!BB', option_type, len(option_data)) packet_data += option_data # Add end option packet_data += b'\xff' # Pad to minimum size while len(packet_data) < 300: packet_data += b'\x00' return packet_data def get_mac_address(self) -> str: """Get client MAC address as string""" return ':'.join(f'{b:02x}' for b in self.chaddr[:6]) def get_message_type(self) -> Optional[DHCPMessageType]: """Get DHCP message type from options""" if 53 in self.options and len(self.options[53]) == 1: msg_type = self.options[53][0] try: return DHCPMessageType(msg_type) except ValueError: return None return None class DHCPServer: """User-space DHCP server implementation""" def __init__(self, config: Dict): self.config = config self.leases: Dict[str, DHCPLease] = {} # MAC -> Lease self.ip_pool = self._build_ip_pool() self.running = False self.server_thread = None self.lock = threading.Lock() def _build_ip_pool(self) -> set: """Build available IP address pool""" network = self.config['network'] start_ip = self.config['range_start'] end_ip = self.config['range_end'] # Convert IP addresses to integers for range calculation start_int = struct.unpack('!I', socket.inet_aton(start_ip))[0] end_int = struct.unpack('!I', socket.inet_aton(end_ip))[0] pool = set() for ip_int in range(start_int, end_int + 1): ip_str = socket.inet_ntoa(struct.pack('!I', ip_int)) pool.add(ip_str) return pool def _get_available_ip(self) -> Optional[str]: """Get next available IP address""" with self.lock: # Remove expired leases self._cleanup_expired_leases() # Find available IP used_ips = {lease.ip_address for lease in self.leases.values()} available_ips = self.ip_pool - used_ips if available_ips: return min(available_ips) # Return lowest available IP return None def _cleanup_expired_leases(self): """Remove expired leases""" expired_macs = [ mac for mac, lease in self.leases.items() if lease.is_expired ] for mac in expired_macs: del self.leases[mac] def _create_dhcp_offer(self, discover_packet: DHCPPacket) -> DHCPPacket: """Create DHCP OFFER response""" mac_address = discover_packet.get_mac_address() # Check for existing lease if mac_address in self.leases and not self.leases[mac_address].is_expired: offered_ip = self.leases[mac_address].ip_address else: offered_ip = self._get_available_ip() if not offered_ip: return None # No available IPs # Create OFFER packet offer = DHCPPacket() offer.op = 2 # BOOTREPLY offer.htype = discover_packet.htype offer.hlen = discover_packet.hlen offer.xid = discover_packet.xid offer.yiaddr = offered_ip offer.siaddr = self.config['gateway'] offer.chaddr = discover_packet.chaddr # Add DHCP options offer.options[53] = bytes([DHCPMessageType.OFFER.value]) # Message type offer.options[1] = socket.inet_aton('255.255.255.0') # Subnet mask offer.options[3] = socket.inet_aton(self.config['gateway']) # Router offer.options[6] = b''.join(socket.inet_aton(dns) for dns in self.config['dns_servers']) # DNS offer.options[51] = struct.pack('!I', self.config['lease_time']) # Lease time offer.options[54] = socket.inet_aton(self.config['gateway']) # DHCP server identifier return offer def _create_dhcp_ack(self, request_packet: DHCPPacket) -> DHCPPacket: """Create DHCP ACK response""" mac_address = request_packet.get_mac_address() requested_ip = request_packet.ciaddr # If no requested IP in ciaddr, check option 50 if requested_ip == '0.0.0.0' and 50 in request_packet.options: requested_ip = socket.inet_ntoa(request_packet.options[50]) # Validate request if not self._validate_request(mac_address, requested_ip): return self._create_dhcp_nak(request_packet) # Create or update lease lease = DHCPLease( mac_address=mac_address, ip_address=requested_ip, lease_time=self.config['lease_time'], lease_start=time.time() ) with self.lock: self.leases[mac_address] = lease # Create ACK packet ack = DHCPPacket() ack.op = 2 # BOOTREPLY ack.htype = request_packet.htype ack.hlen = request_packet.hlen ack.xid = request_packet.xid ack.yiaddr = requested_ip ack.siaddr = self.config['gateway'] ack.chaddr = request_packet.chaddr # Add DHCP options ack.options[53] = bytes([DHCPMessageType.ACK.value]) # Message type ack.options[1] = socket.inet_aton('255.255.255.0') # Subnet mask ack.options[3] = socket.inet_aton(self.config['gateway']) # Router ack.options[6] = b''.join(socket.inet_aton(dns) for dns in self.config['dns_servers']) # DNS ack.options[51] = struct.pack('!I', self.config['lease_time']) # Lease time ack.options[54] = socket.inet_aton(self.config['gateway']) # DHCP server identifier return ack def _create_dhcp_nak(self, request_packet: DHCPPacket) -> DHCPPacket: """Create DHCP NAK response""" nak = DHCPPacket() nak.op = 2 # BOOTREPLY nak.htype = request_packet.htype nak.hlen = request_packet.hlen nak.xid = request_packet.xid nak.chaddr = request_packet.chaddr # Add DHCP options nak.options[53] = bytes([DHCPMessageType.NAK.value]) # Message type nak.options[54] = socket.inet_aton(self.config['gateway']) # DHCP server identifier return nak def _validate_request(self, mac_address: str, requested_ip: str) -> bool: """Validate DHCP request""" # Check if IP is in our pool if requested_ip not in self.ip_pool: return False # Check if IP is available or already assigned to this MAC with self.lock: for mac, lease in self.leases.items(): if lease.ip_address == requested_ip: if mac != mac_address and not lease.is_expired: return False # IP already assigned to different MAC return True def process_packet(self, packet_data: bytes, client_addr: Tuple[str, int]) -> Optional[bytes]: """Process incoming DHCP packet and return response""" try: packet = DHCPPacket.parse(packet_data) message_type = packet.get_message_type() if message_type == DHCPMessageType.DISCOVER: response = self._create_dhcp_offer(packet) elif message_type == DHCPMessageType.REQUEST: response = self._create_dhcp_ack(packet) elif message_type == DHCPMessageType.RELEASE: # Handle lease release mac_address = packet.get_mac_address() with self.lock: if mac_address in self.leases: del self.leases[mac_address] return None else: return None if response: return response.build() except Exception as e: print(f"Error processing DHCP packet: {e}") return None def get_leases(self) -> Dict[str, Dict]: """Get current lease table""" with self.lock: self._cleanup_expired_leases() return { mac: { 'ip_address': lease.ip_address, 'lease_time': lease.lease_time, 'lease_start': lease.lease_start, 'remaining_time': lease.remaining_time, 'state': lease.state } for mac, lease in self.leases.items() } def release_lease(self, mac_address: str) -> bool: """Manually release a lease""" with self.lock: if mac_address in self.leases: del self.leases[mac_address] return True return False def start(self): """Start DHCP server (placeholder for integration with packet bridge)""" self.running = True print(f"DHCP server started - Pool: {self.config['range_start']} - {self.config['range_end']}") def stop(self): """Stop DHCP server""" self.running = False print("DHCP server stopped")