|
|
""" |
|
|
DHCP Server Module |
|
|
|
|
|
Implements a user-space DHCP server that handles: |
|
|
- DHCP DISCOVER → OFFER → REQUEST → ACK sequence |
|
|
- IP lease management |
|
|
- Lease renewals and expiration |
|
|
""" |
|
|
|
|
|
import struct |
|
|
import time |
|
|
import socket |
|
|
import threading |
|
|
from typing import Dict, Optional, Tuple |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
|
|
|
|
|
|
class DHCPMessageType(Enum): |
|
|
DISCOVER = 1 |
|
|
OFFER = 2 |
|
|
REQUEST = 3 |
|
|
DECLINE = 4 |
|
|
ACK = 5 |
|
|
NAK = 6 |
|
|
RELEASE = 7 |
|
|
INFORM = 8 |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class DHCPLease: |
|
|
"""Represents a DHCP lease""" |
|
|
mac_address: str |
|
|
ip_address: str |
|
|
lease_time: int |
|
|
lease_start: float |
|
|
state: str = 'BOUND' |
|
|
|
|
|
@property |
|
|
def is_expired(self) -> bool: |
|
|
return time.time() > (self.lease_start + self.lease_time) |
|
|
|
|
|
@property |
|
|
def remaining_time(self) -> int: |
|
|
remaining = int((self.lease_start + self.lease_time) - time.time()) |
|
|
return max(0, remaining) |
|
|
|
|
|
|
|
|
class DHCPPacket: |
|
|
"""DHCP packet parser and builder""" |
|
|
|
|
|
def __init__(self): |
|
|
self.op = 0 |
|
|
self.htype = 1 |
|
|
self.hlen = 6 |
|
|
self.hops = 0 |
|
|
self.xid = 0 |
|
|
self.secs = 0 |
|
|
self.flags = 0 |
|
|
self.ciaddr = '0.0.0.0' |
|
|
self.yiaddr = '0.0.0.0' |
|
|
self.siaddr = '0.0.0.0' |
|
|
self.giaddr = '0.0.0.0' |
|
|
self.chaddr = b'\x00' * 16 |
|
|
self.sname = b'\x00' * 64 |
|
|
self.file = b'\x00' * 128 |
|
|
self.options = {} |
|
|
|
|
|
@classmethod |
|
|
def parse(cls, data: bytes) -> 'DHCPPacket': |
|
|
"""Parse DHCP packet from raw bytes""" |
|
|
packet = cls() |
|
|
|
|
|
|
|
|
if len(data) < 236: |
|
|
raise ValueError("DHCP packet too short") |
|
|
|
|
|
fields = struct.unpack('!BBBBIHH4s4s4s4s16s64s128s', data[:236]) |
|
|
packet.op = fields[0] |
|
|
packet.htype = fields[1] |
|
|
packet.hlen = fields[2] |
|
|
packet.hops = fields[3] |
|
|
packet.xid = fields[4] |
|
|
packet.secs = fields[5] |
|
|
packet.flags = fields[6] |
|
|
packet.ciaddr = socket.inet_ntoa(fields[7]) |
|
|
packet.yiaddr = socket.inet_ntoa(fields[8]) |
|
|
packet.siaddr = socket.inet_ntoa(fields[9]) |
|
|
packet.giaddr = socket.inet_ntoa(fields[10]) |
|
|
packet.chaddr = fields[11] |
|
|
packet.sname = fields[12] |
|
|
packet.file = fields[13] |
|
|
|
|
|
|
|
|
options_data = data[236:] |
|
|
if len(options_data) >= 4: |
|
|
magic = struct.unpack('!I', options_data[:4])[0] |
|
|
if magic == 0x63825363: |
|
|
packet.options = packet._parse_options(options_data[4:]) |
|
|
|
|
|
return packet |
|
|
|
|
|
def _parse_options(self, data: bytes) -> Dict[int, bytes]: |
|
|
"""Parse DHCP options""" |
|
|
options = {} |
|
|
i = 0 |
|
|
|
|
|
while i < len(data): |
|
|
if data[i] == 255: |
|
|
break |
|
|
elif data[i] == 0: |
|
|
i += 1 |
|
|
continue |
|
|
|
|
|
option_type = data[i] |
|
|
if i + 1 >= len(data): |
|
|
break |
|
|
|
|
|
option_length = data[i + 1] |
|
|
if i + 2 + option_length > len(data): |
|
|
break |
|
|
|
|
|
option_data = data[i + 2:i + 2 + option_length] |
|
|
options[option_type] = option_data |
|
|
i += 2 + option_length |
|
|
|
|
|
return options |
|
|
|
|
|
def build(self) -> bytes: |
|
|
"""Build DHCP packet as bytes""" |
|
|
|
|
|
packet_data = struct.pack( |
|
|
'!BBBBIHH4s4s4s4s16s64s128s', |
|
|
self.op, self.htype, self.hlen, self.hops, |
|
|
self.xid, self.secs, self.flags, |
|
|
socket.inet_aton(self.ciaddr), |
|
|
socket.inet_aton(self.yiaddr), |
|
|
socket.inet_aton(self.siaddr), |
|
|
socket.inet_aton(self.giaddr), |
|
|
self.chaddr, self.sname, self.file |
|
|
) |
|
|
|
|
|
|
|
|
packet_data += struct.pack('!I', 0x63825363) |
|
|
|
|
|
|
|
|
for option_type, option_data in self.options.items(): |
|
|
packet_data += struct.pack('!BB', option_type, len(option_data)) |
|
|
packet_data += option_data |
|
|
|
|
|
|
|
|
packet_data += b'\xff' |
|
|
|
|
|
|
|
|
while len(packet_data) < 300: |
|
|
packet_data += b'\x00' |
|
|
|
|
|
return packet_data |
|
|
|
|
|
def get_mac_address(self) -> str: |
|
|
"""Get client MAC address as string""" |
|
|
return ':'.join(f'{b:02x}' for b in self.chaddr[:6]) |
|
|
|
|
|
def get_message_type(self) -> Optional[DHCPMessageType]: |
|
|
"""Get DHCP message type from options""" |
|
|
if 53 in self.options and len(self.options[53]) == 1: |
|
|
msg_type = self.options[53][0] |
|
|
try: |
|
|
return DHCPMessageType(msg_type) |
|
|
except ValueError: |
|
|
return None |
|
|
return None |
|
|
|
|
|
|
|
|
class DHCPServer: |
|
|
"""User-space DHCP server implementation""" |
|
|
|
|
|
def __init__(self, config: Dict): |
|
|
self.config = config |
|
|
self.leases: Dict[str, DHCPLease] = {} |
|
|
self.ip_pool = self._build_ip_pool() |
|
|
self.running = False |
|
|
self.server_thread = None |
|
|
self.lock = threading.Lock() |
|
|
|
|
|
def _build_ip_pool(self) -> set: |
|
|
"""Build available IP address pool""" |
|
|
network = self.config['network'] |
|
|
start_ip = self.config['range_start'] |
|
|
end_ip = self.config['range_end'] |
|
|
|
|
|
|
|
|
start_int = struct.unpack('!I', socket.inet_aton(start_ip))[0] |
|
|
end_int = struct.unpack('!I', socket.inet_aton(end_ip))[0] |
|
|
|
|
|
pool = set() |
|
|
for ip_int in range(start_int, end_int + 1): |
|
|
ip_str = socket.inet_ntoa(struct.pack('!I', ip_int)) |
|
|
pool.add(ip_str) |
|
|
|
|
|
return pool |
|
|
|
|
|
def _get_available_ip(self) -> Optional[str]: |
|
|
"""Get next available IP address""" |
|
|
with self.lock: |
|
|
|
|
|
self._cleanup_expired_leases() |
|
|
|
|
|
|
|
|
used_ips = {lease.ip_address for lease in self.leases.values()} |
|
|
available_ips = self.ip_pool - used_ips |
|
|
|
|
|
if available_ips: |
|
|
return min(available_ips) |
|
|
return None |
|
|
|
|
|
def _cleanup_expired_leases(self): |
|
|
"""Remove expired leases""" |
|
|
expired_macs = [ |
|
|
mac for mac, lease in self.leases.items() |
|
|
if lease.is_expired |
|
|
] |
|
|
for mac in expired_macs: |
|
|
del self.leases[mac] |
|
|
|
|
|
def _create_dhcp_offer(self, discover_packet: DHCPPacket) -> DHCPPacket: |
|
|
"""Create DHCP OFFER response""" |
|
|
mac_address = discover_packet.get_mac_address() |
|
|
|
|
|
|
|
|
if mac_address in self.leases and not self.leases[mac_address].is_expired: |
|
|
offered_ip = self.leases[mac_address].ip_address |
|
|
else: |
|
|
offered_ip = self._get_available_ip() |
|
|
if not offered_ip: |
|
|
return None |
|
|
|
|
|
|
|
|
offer = DHCPPacket() |
|
|
offer.op = 2 |
|
|
offer.htype = discover_packet.htype |
|
|
offer.hlen = discover_packet.hlen |
|
|
offer.xid = discover_packet.xid |
|
|
offer.yiaddr = offered_ip |
|
|
offer.siaddr = self.config['gateway'] |
|
|
offer.chaddr = discover_packet.chaddr |
|
|
|
|
|
|
|
|
offer.options[53] = bytes([DHCPMessageType.OFFER.value]) |
|
|
offer.options[1] = socket.inet_aton('255.255.255.0') |
|
|
offer.options[3] = socket.inet_aton(self.config['gateway']) |
|
|
offer.options[6] = b''.join(socket.inet_aton(dns) for dns in self.config['dns_servers']) |
|
|
offer.options[51] = struct.pack('!I', self.config['lease_time']) |
|
|
offer.options[54] = socket.inet_aton(self.config['gateway']) |
|
|
|
|
|
return offer |
|
|
|
|
|
def _create_dhcp_ack(self, request_packet: DHCPPacket) -> DHCPPacket: |
|
|
"""Create DHCP ACK response""" |
|
|
mac_address = request_packet.get_mac_address() |
|
|
requested_ip = request_packet.ciaddr |
|
|
|
|
|
|
|
|
if requested_ip == '0.0.0.0' and 50 in request_packet.options: |
|
|
requested_ip = socket.inet_ntoa(request_packet.options[50]) |
|
|
|
|
|
|
|
|
if not self._validate_request(mac_address, requested_ip): |
|
|
return self._create_dhcp_nak(request_packet) |
|
|
|
|
|
|
|
|
lease = DHCPLease( |
|
|
mac_address=mac_address, |
|
|
ip_address=requested_ip, |
|
|
lease_time=self.config['lease_time'], |
|
|
lease_start=time.time() |
|
|
) |
|
|
|
|
|
with self.lock: |
|
|
self.leases[mac_address] = lease |
|
|
|
|
|
|
|
|
ack = DHCPPacket() |
|
|
ack.op = 2 |
|
|
ack.htype = request_packet.htype |
|
|
ack.hlen = request_packet.hlen |
|
|
ack.xid = request_packet.xid |
|
|
ack.yiaddr = requested_ip |
|
|
ack.siaddr = self.config['gateway'] |
|
|
ack.chaddr = request_packet.chaddr |
|
|
|
|
|
|
|
|
ack.options[53] = bytes([DHCPMessageType.ACK.value]) |
|
|
ack.options[1] = socket.inet_aton('255.255.255.0') |
|
|
ack.options[3] = socket.inet_aton(self.config['gateway']) |
|
|
ack.options[6] = b''.join(socket.inet_aton(dns) for dns in self.config['dns_servers']) |
|
|
ack.options[51] = struct.pack('!I', self.config['lease_time']) |
|
|
ack.options[54] = socket.inet_aton(self.config['gateway']) |
|
|
|
|
|
return ack |
|
|
|
|
|
def _create_dhcp_nak(self, request_packet: DHCPPacket) -> DHCPPacket: |
|
|
"""Create DHCP NAK response""" |
|
|
nak = DHCPPacket() |
|
|
nak.op = 2 |
|
|
nak.htype = request_packet.htype |
|
|
nak.hlen = request_packet.hlen |
|
|
nak.xid = request_packet.xid |
|
|
nak.chaddr = request_packet.chaddr |
|
|
|
|
|
|
|
|
nak.options[53] = bytes([DHCPMessageType.NAK.value]) |
|
|
nak.options[54] = socket.inet_aton(self.config['gateway']) |
|
|
|
|
|
return nak |
|
|
|
|
|
def _validate_request(self, mac_address: str, requested_ip: str) -> bool: |
|
|
"""Validate DHCP request""" |
|
|
|
|
|
if requested_ip not in self.ip_pool: |
|
|
return False |
|
|
|
|
|
|
|
|
with self.lock: |
|
|
for mac, lease in self.leases.items(): |
|
|
if lease.ip_address == requested_ip: |
|
|
if mac != mac_address and not lease.is_expired: |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
def process_packet(self, packet_data: bytes, client_addr: Tuple[str, int]) -> Optional[bytes]: |
|
|
"""Process incoming DHCP packet and return response""" |
|
|
try: |
|
|
packet = DHCPPacket.parse(packet_data) |
|
|
message_type = packet.get_message_type() |
|
|
|
|
|
if message_type == DHCPMessageType.DISCOVER: |
|
|
response = self._create_dhcp_offer(packet) |
|
|
elif message_type == DHCPMessageType.REQUEST: |
|
|
response = self._create_dhcp_ack(packet) |
|
|
elif message_type == DHCPMessageType.RELEASE: |
|
|
|
|
|
mac_address = packet.get_mac_address() |
|
|
with self.lock: |
|
|
if mac_address in self.leases: |
|
|
del self.leases[mac_address] |
|
|
return None |
|
|
else: |
|
|
return None |
|
|
|
|
|
if response: |
|
|
return response.build() |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error processing DHCP packet: {e}") |
|
|
return None |
|
|
|
|
|
def get_leases(self) -> Dict[str, Dict]: |
|
|
"""Get current lease table""" |
|
|
with self.lock: |
|
|
self._cleanup_expired_leases() |
|
|
return { |
|
|
mac: { |
|
|
'ip_address': lease.ip_address, |
|
|
'lease_time': lease.lease_time, |
|
|
'lease_start': lease.lease_start, |
|
|
'remaining_time': lease.remaining_time, |
|
|
'state': lease.state |
|
|
} |
|
|
for mac, lease in self.leases.items() |
|
|
} |
|
|
|
|
|
def release_lease(self, mac_address: str) -> bool: |
|
|
"""Manually release a lease""" |
|
|
with self.lock: |
|
|
if mac_address in self.leases: |
|
|
del self.leases[mac_address] |
|
|
return True |
|
|
return False |
|
|
|
|
|
def start(self): |
|
|
"""Start DHCP server (placeholder for integration with packet bridge)""" |
|
|
self.running = True |
|
|
print(f"DHCP server started - Pool: {self.config['range_start']} - {self.config['range_end']}") |
|
|
|
|
|
def stop(self): |
|
|
"""Stop DHCP server""" |
|
|
self.running = False |
|
|
print("DHCP server stopped") |
|
|
|
|
|
|