""" IP Parser/Assembler Module Handles IPv4 packet parsing and construction: - Parse IPv4, UDP, and TCP headers - Calculate and verify checksums - Handle packet fragmentation and reassembly - Support various IP options """ import struct import socket from typing import Dict, List, Optional, Tuple from dataclasses import dataclass from enum import Enum class IPProtocol(Enum): ICMP = 1 TCP = 6 UDP = 17 @dataclass class IPv4Header: """IPv4 header structure""" version: int = 4 ihl: int = 5 # Internet Header Length (in 32-bit words) tos: int = 0 # Type of Service total_length: int = 0 identification: int = 0 flags: int = 0 # 3 bits: Reserved, Don't Fragment, More Fragments fragment_offset: int = 0 # 13 bits ttl: int = 64 # Time to Live protocol: int = 0 header_checksum: int = 0 source_ip: str = '0.0.0.0' dest_ip: str = '0.0.0.0' options: bytes = b'' @property def header_length(self) -> int: """Get header length in bytes""" return self.ihl * 4 @property def dont_fragment(self) -> bool: """Check if Don't Fragment flag is set""" return bool(self.flags & 0x2) @property def more_fragments(self) -> bool: """Check if More Fragments flag is set""" return bool(self.flags & 0x1) @property def is_fragment(self) -> bool: """Check if this is a fragment""" return self.more_fragments or self.fragment_offset > 0 @dataclass class TCPHeader: """TCP header structure""" source_port: int = 0 dest_port: int = 0 seq_num: int = 0 ack_num: int = 0 data_offset: int = 5 # Header length in 32-bit words reserved: int = 0 flags: int = 0 # 9 bits: NS, CWR, ECE, URG, ACK, PSH, RST, SYN, FIN window_size: int = 65535 checksum: int = 0 urgent_pointer: int = 0 options: bytes = b'' @property def header_length(self) -> int: """Get header length in bytes""" return self.data_offset * 4 # TCP Flag properties @property def fin(self) -> bool: return bool(self.flags & 0x01) @property def syn(self) -> bool: return bool(self.flags & 0x02) @property def rst(self) -> bool: return bool(self.flags & 0x04) @property def psh(self) -> bool: return bool(self.flags & 0x08) @property def ack(self) -> bool: return bool(self.flags & 0x10) @property def urg(self) -> bool: return bool(self.flags & 0x20) @property def ece(self) -> bool: return bool(self.flags & 0x40) @property def cwr(self) -> bool: return bool(self.flags & 0x80) @property def ns(self) -> bool: return bool(self.flags & 0x100) @dataclass class UDPHeader: """UDP header structure""" source_port: int = 0 dest_port: int = 0 length: int = 8 # Header length (8) + data length checksum: int = 0 class IPParser: """IP packet parser and assembler""" @staticmethod def parse_ipv4_header(packet: bytes) -> Tuple[IPv4Header, bytes]: """Parse IPv4 header and return header object and remaining data""" # Basic header (20 bytes) if len(packet) < 20: raise ValueError("Packet too short for IPv4 header") ver_ihl, tos, total_len, ident, flags_frag, ttl, proto, checksum, src, dst = \ struct.unpack('!BBHHHBBH4s4s', packet[:20]) version = ver_ihl >> 4 ihl = ver_ihl & 0x0F flags = flags_frag >> 13 frag_offset = flags_frag & 0x1FFF header = IPv4Header( version=version, ihl=ihl, tos=tos, total_length=total_len, identification=ident, flags=flags, fragment_offset=frag_offset, ttl=ttl, protocol=proto, header_checksum=checksum, source_ip=socket.inet_ntoa(src), dest_ip=socket.inet_ntoa(dst) ) # Extract options if present header_len = header.header_length if header_len > 20: header.options = packet[20:header_len] return header, packet[header_len:] @staticmethod def parse_tcp_header(packet: bytes) -> Tuple[TCPHeader, bytes]: """Parse TCP header and return header object and remaining data""" if len(packet) < 20: raise ValueError("Packet too short for TCP header") src_port, dst_port, seq, ack, offset_flags, window, checksum, urgent = \ struct.unpack('!HHIIHHH', packet[:20]) data_offset = offset_flags >> 12 flags = offset_flags & 0x1FF header = TCPHeader( source_port=src_port, dest_port=dst_port, seq_num=seq, ack_num=ack, data_offset=data_offset, flags=flags, window_size=window, checksum=checksum, urgent_pointer=urgent ) # Extract options if present header_len = header.header_length if header_len > 20: header.options = packet[20:header_len] return header, packet[header_len:] @staticmethod def parse_udp_header(packet: bytes) -> Tuple[UDPHeader, bytes]: """Parse UDP header and return header object and remaining data""" if len(packet) < 8: raise ValueError("Packet too short for UDP header") src_port, dst_port, length, checksum = struct.unpack('!HHHH', packet[:8]) header = UDPHeader( source_port=src_port, dest_port=dst_port, length=length, checksum=checksum ) return header, packet[8:] @staticmethod def calculate_checksum(data: bytes) -> int: """Calculate IP/TCP/UDP checksum""" if len(data) % 2 == 1: data += b'\0' words = struct.unpack('!%dH' % (len(data) // 2), data) checksum = sum(words) # Fold 32-bit sum into 16 bits while checksum >> 16: checksum = (checksum & 0xFFFF) + (checksum >> 16) return ~checksum & 0xFFFF