| """
|
| IP Parser/Assembler Module
|
|
|
| Handles IPv4 packet parsing and construction:
|
| - Parse IPv4, UDP, and TCP headers
|
| - Calculate and verify checksums
|
| - Handle packet fragmentation and reassembly
|
| - Support various IP options
|
| """
|
|
|
| import struct
|
| import socket
|
| from typing import Dict, List, Optional, Tuple
|
| from dataclasses import dataclass
|
| from enum import Enum
|
|
|
|
|
| class IPProtocol(Enum):
|
| ICMP = 1
|
| TCP = 6
|
| UDP = 17
|
|
|
|
|
| @dataclass
|
| class IPv4Header:
|
| """IPv4 header structure"""
|
| version: int = 4
|
| ihl: int = 5
|
| tos: int = 0
|
| total_length: int = 0
|
| identification: int = 0
|
| flags: int = 0
|
| fragment_offset: int = 0
|
| ttl: int = 64
|
| protocol: int = 0
|
| header_checksum: int = 0
|
| source_ip: str = '0.0.0.0'
|
| dest_ip: str = '0.0.0.0'
|
| options: bytes = b''
|
|
|
| @property
|
| def header_length(self) -> int:
|
| """Get header length in bytes"""
|
| return self.ihl * 4
|
|
|
| @property
|
| def dont_fragment(self) -> bool:
|
| """Check if Don't Fragment flag is set"""
|
| return bool(self.flags & 0x2)
|
|
|
| @property
|
| def more_fragments(self) -> bool:
|
| """Check if More Fragments flag is set"""
|
| return bool(self.flags & 0x1)
|
|
|
| @property
|
| def is_fragment(self) -> bool:
|
| """Check if this is a fragment"""
|
| return self.more_fragments or self.fragment_offset > 0
|
|
|
|
|
| @dataclass
|
| class TCPHeader:
|
| """TCP header structure"""
|
| source_port: int = 0
|
| dest_port: int = 0
|
| seq_num: int = 0
|
| ack_num: int = 0
|
| data_offset: int = 5
|
| reserved: int = 0
|
| flags: int = 0
|
| window_size: int = 65535
|
| checksum: int = 0
|
| urgent_pointer: int = 0
|
| options: bytes = b''
|
|
|
| @property
|
| def header_length(self) -> int:
|
| """Get header length in bytes"""
|
| return self.data_offset * 4
|
|
|
|
|
| @property
|
| def fin(self) -> bool:
|
| return bool(self.flags & 0x01)
|
|
|
| @property
|
| def syn(self) -> bool:
|
| return bool(self.flags & 0x02)
|
|
|
| @property
|
| def rst(self) -> bool:
|
| return bool(self.flags & 0x04)
|
|
|
| @property
|
| def psh(self) -> bool:
|
| return bool(self.flags & 0x08)
|
|
|
| @property
|
| def ack(self) -> bool:
|
| return bool(self.flags & 0x10)
|
|
|
| @property
|
| def urg(self) -> bool:
|
| return bool(self.flags & 0x20)
|
|
|
| @property
|
| def ece(self) -> bool:
|
| return bool(self.flags & 0x40)
|
|
|
| @property
|
| def cwr(self) -> bool:
|
| return bool(self.flags & 0x80)
|
|
|
| @property
|
| def ns(self) -> bool:
|
| return bool(self.flags & 0x100)
|
|
|
|
|
| @dataclass
|
| class UDPHeader:
|
| """UDP header structure"""
|
| source_port: int = 0
|
| dest_port: int = 0
|
| length: int = 8
|
| checksum: int = 0
|
|
|
|
|
| class IPParser:
|
| """IP packet parser and assembler"""
|
|
|
| @staticmethod
|
| def parse_ipv4_header(packet: bytes) -> Tuple[IPv4Header, bytes]:
|
| """Parse IPv4 header and return header object and remaining data"""
|
|
|
| if len(packet) < 20:
|
| raise ValueError("Packet too short for IPv4 header")
|
|
|
| ver_ihl, tos, total_len, ident, flags_frag, ttl, proto, checksum, src, dst = \
|
| struct.unpack('!BBHHHBBH4s4s', packet[:20])
|
|
|
| version = ver_ihl >> 4
|
| ihl = ver_ihl & 0x0F
|
| flags = flags_frag >> 13
|
| frag_offset = flags_frag & 0x1FFF
|
|
|
| header = IPv4Header(
|
| version=version,
|
| ihl=ihl,
|
| tos=tos,
|
| total_length=total_len,
|
| identification=ident,
|
| flags=flags,
|
| fragment_offset=frag_offset,
|
| ttl=ttl,
|
| protocol=proto,
|
| header_checksum=checksum,
|
| source_ip=socket.inet_ntoa(src),
|
| dest_ip=socket.inet_ntoa(dst)
|
| )
|
|
|
|
|
| header_len = header.header_length
|
| if header_len > 20:
|
| header.options = packet[20:header_len]
|
|
|
| return header, packet[header_len:]
|
|
|
| @staticmethod
|
| def parse_tcp_header(packet: bytes) -> Tuple[TCPHeader, bytes]:
|
| """Parse TCP header and return header object and remaining data"""
|
| if len(packet) < 20:
|
| raise ValueError("Packet too short for TCP header")
|
|
|
| src_port, dst_port, seq, ack, offset_flags, window, checksum, urgent = \
|
| struct.unpack('!HHIIHHH', packet[:20])
|
|
|
| data_offset = offset_flags >> 12
|
| flags = offset_flags & 0x1FF
|
|
|
| header = TCPHeader(
|
| source_port=src_port,
|
| dest_port=dst_port,
|
| seq_num=seq,
|
| ack_num=ack,
|
| data_offset=data_offset,
|
| flags=flags,
|
| window_size=window,
|
| checksum=checksum,
|
| urgent_pointer=urgent
|
| )
|
|
|
|
|
| header_len = header.header_length
|
| if header_len > 20:
|
| header.options = packet[20:header_len]
|
|
|
| return header, packet[header_len:]
|
|
|
| @staticmethod
|
| def parse_udp_header(packet: bytes) -> Tuple[UDPHeader, bytes]:
|
| """Parse UDP header and return header object and remaining data"""
|
| if len(packet) < 8:
|
| raise ValueError("Packet too short for UDP header")
|
|
|
| src_port, dst_port, length, checksum = struct.unpack('!HHHH', packet[:8])
|
|
|
| header = UDPHeader(
|
| source_port=src_port,
|
| dest_port=dst_port,
|
| length=length,
|
| checksum=checksum
|
| )
|
|
|
| return header, packet[8:]
|
|
|
| @staticmethod
|
| def calculate_checksum(data: bytes) -> int:
|
| """Calculate IP/TCP/UDP checksum"""
|
| if len(data) % 2 == 1:
|
| data += b'\0'
|
|
|
| words = struct.unpack('!%dH' % (len(data) // 2), data)
|
| checksum = sum(words)
|
|
|
|
|
| while checksum >> 16:
|
| checksum = (checksum & 0xFFFF) + (checksum >> 16)
|
|
|
| return ~checksum & 0xFFFF
|
|
|