Spaces:
Runtime error
Runtime error
| """ | |
| IP Parser/Assembler Module | |
| Handles IPv4 packet parsing and construction: | |
| - Parse IPv4, UDP, and TCP headers | |
| - Calculate and verify checksums | |
| - Handle packet fragmentation and reassembly | |
| - Support various IP options | |
| """ | |
| import struct | |
| import socket | |
| from typing import Dict, List, Optional, Tuple | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| class IPProtocol(Enum): | |
| ICMP = 1 | |
| TCP = 6 | |
| UDP = 17 | |
| class IPv4Header: | |
| """IPv4 header structure""" | |
| version: int = 4 | |
| ihl: int = 5 # Internet Header Length (in 32-bit words) | |
| tos: int = 0 # Type of Service | |
| total_length: int = 0 | |
| identification: int = 0 | |
| flags: int = 0 # 3 bits: Reserved, Don't Fragment, More Fragments | |
| fragment_offset: int = 0 # 13 bits | |
| ttl: int = 64 # Time to Live | |
| protocol: int = 0 | |
| header_checksum: int = 0 | |
| source_ip: str = '0.0.0.0' | |
| dest_ip: str = '0.0.0.0' | |
| options: bytes = b'' | |
| def header_length(self) -> int: | |
| """Get header length in bytes""" | |
| return self.ihl * 4 | |
| def dont_fragment(self) -> bool: | |
| """Check if Don't Fragment flag is set""" | |
| return bool(self.flags & 0x2) | |
| def more_fragments(self) -> bool: | |
| """Check if More Fragments flag is set""" | |
| return bool(self.flags & 0x1) | |
| def is_fragment(self) -> bool: | |
| """Check if this is a fragment""" | |
| return self.more_fragments or self.fragment_offset > 0 | |
| class TCPHeader: | |
| """TCP header structure""" | |
| source_port: int = 0 | |
| dest_port: int = 0 | |
| seq_num: int = 0 | |
| ack_num: int = 0 | |
| data_offset: int = 5 # Header length in 32-bit words | |
| reserved: int = 0 | |
| flags: int = 0 # 9 bits: NS, CWR, ECE, URG, ACK, PSH, RST, SYN, FIN | |
| window_size: int = 65535 | |
| checksum: int = 0 | |
| urgent_pointer: int = 0 | |
| options: bytes = b'' | |
| def header_length(self) -> int: | |
| """Get header length in bytes""" | |
| return self.data_offset * 4 | |
| # TCP Flag properties | |
| def fin(self) -> bool: | |
| return bool(self.flags & 0x01) | |
| def syn(self) -> bool: | |
| return bool(self.flags & 0x02) | |
| def rst(self) -> bool: | |
| return bool(self.flags & 0x04) | |
| def psh(self) -> bool: | |
| return bool(self.flags & 0x08) | |
| def ack(self) -> bool: | |
| return bool(self.flags & 0x10) | |
| def urg(self) -> bool: | |
| return bool(self.flags & 0x20) | |
| def ece(self) -> bool: | |
| return bool(self.flags & 0x40) | |
| def cwr(self) -> bool: | |
| return bool(self.flags & 0x80) | |
| def ns(self) -> bool: | |
| return bool(self.flags & 0x100) | |
| class UDPHeader: | |
| """UDP header structure""" | |
| source_port: int = 0 | |
| dest_port: int = 0 | |
| length: int = 8 # Header length (8) + data length | |
| checksum: int = 0 | |
| class IPParser: | |
| """IP packet parser and assembler""" | |
| def parse_ipv4_header(packet: bytes) -> Tuple[IPv4Header, bytes]: | |
| """Parse IPv4 header and return header object and remaining data""" | |
| # Basic header (20 bytes) | |
| if len(packet) < 20: | |
| raise ValueError("Packet too short for IPv4 header") | |
| ver_ihl, tos, total_len, ident, flags_frag, ttl, proto, checksum, src, dst = \ | |
| struct.unpack('!BBHHHBBH4s4s', packet[:20]) | |
| version = ver_ihl >> 4 | |
| ihl = ver_ihl & 0x0F | |
| flags = flags_frag >> 13 | |
| frag_offset = flags_frag & 0x1FFF | |
| header = IPv4Header( | |
| version=version, | |
| ihl=ihl, | |
| tos=tos, | |
| total_length=total_len, | |
| identification=ident, | |
| flags=flags, | |
| fragment_offset=frag_offset, | |
| ttl=ttl, | |
| protocol=proto, | |
| header_checksum=checksum, | |
| source_ip=socket.inet_ntoa(src), | |
| dest_ip=socket.inet_ntoa(dst) | |
| ) | |
| # Extract options if present | |
| header_len = header.header_length | |
| if header_len > 20: | |
| header.options = packet[20:header_len] | |
| return header, packet[header_len:] | |
| def parse_tcp_header(packet: bytes) -> Tuple[TCPHeader, bytes]: | |
| """Parse TCP header and return header object and remaining data""" | |
| if len(packet) < 20: | |
| raise ValueError("Packet too short for TCP header") | |
| src_port, dst_port, seq, ack, offset_flags, window, checksum, urgent = \ | |
| struct.unpack('!HHIIHHH', packet[:20]) | |
| data_offset = offset_flags >> 12 | |
| flags = offset_flags & 0x1FF | |
| header = TCPHeader( | |
| source_port=src_port, | |
| dest_port=dst_port, | |
| seq_num=seq, | |
| ack_num=ack, | |
| data_offset=data_offset, | |
| flags=flags, | |
| window_size=window, | |
| checksum=checksum, | |
| urgent_pointer=urgent | |
| ) | |
| # Extract options if present | |
| header_len = header.header_length | |
| if header_len > 20: | |
| header.options = packet[20:header_len] | |
| return header, packet[header_len:] | |
| def parse_udp_header(packet: bytes) -> Tuple[UDPHeader, bytes]: | |
| """Parse UDP header and return header object and remaining data""" | |
| if len(packet) < 8: | |
| raise ValueError("Packet too short for UDP header") | |
| src_port, dst_port, length, checksum = struct.unpack('!HHHH', packet[:8]) | |
| header = UDPHeader( | |
| source_port=src_port, | |
| dest_port=dst_port, | |
| length=length, | |
| checksum=checksum | |
| ) | |
| return header, packet[8:] | |
| def calculate_checksum(data: bytes) -> int: | |
| """Calculate IP/TCP/UDP checksum""" | |
| if len(data) % 2 == 1: | |
| data += b'\0' | |
| words = struct.unpack('!%dH' % (len(data) // 2), data) | |
| checksum = sum(words) | |
| # Fold 32-bit sum into 16 bits | |
| while checksum >> 16: | |
| checksum = (checksum & 0xFFFF) + (checksum >> 16) | |
| return ~checksum & 0xFFFF | |