JRNET / core /ip_parser.py
Factor Studios
Upload 96 files
6a5b8d8 verified
raw
history blame
6.64 kB
"""
IP Parser/Assembler Module
Handles IPv4 packet parsing and construction:
- Parse IPv4, UDP, and TCP headers
- Calculate and verify checksums
- Handle packet fragmentation and reassembly
- Support various IP options
"""
import struct
import socket
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class IPProtocol(Enum):
    """Numeric protocol identifiers for the protocols this module names."""

    ICMP = 1
    TCP = 6
    UDP = 17
@dataclass
class IPv4Header:
    """Field-by-field representation of an IPv4 header.

    Every field has a default, so ``IPv4Header()`` yields a minimal
    20-byte header description (``ihl == 5``) with no options.
    """

    version: int = 4
    ihl: int = 5  # header length, counted in 32-bit words
    tos: int = 0  # Type of Service
    total_length: int = 0
    identification: int = 0
    flags: int = 0  # 3 bits: Reserved, Don't Fragment, More Fragments
    fragment_offset: int = 0  # 13 bits
    ttl: int = 64  # Time to Live
    protocol: int = 0
    header_checksum: int = 0
    source_ip: str = '0.0.0.0'
    dest_ip: str = '0.0.0.0'
    options: bytes = b''

    @property
    def header_length(self) -> int:
        """Header size in bytes (``ihl`` is expressed in 4-byte words)."""
        return 4 * self.ihl

    @property
    def dont_fragment(self) -> bool:
        """True when the Don't Fragment bit (0x2) of ``flags`` is set."""
        return (self.flags & 0x2) != 0

    @property
    def more_fragments(self) -> bool:
        """True when the More Fragments bit (0x1) of ``flags`` is set."""
        return (self.flags & 0x1) != 0

    @property
    def is_fragment(self) -> bool:
        """True when MF is set or the fragment offset is greater than zero."""
        if self.more_fragments:
            return True
        return self.fragment_offset > 0
@dataclass
class TCPHeader:
    """Field-by-field representation of a TCP header.

    The individual control bits packed into ``flags`` are exposed as
    read-only boolean properties (``fin``, ``syn``, ... ``ns``).
    """

    source_port: int = 0
    dest_port: int = 0
    seq_num: int = 0
    ack_num: int = 0
    data_offset: int = 5  # header length, counted in 32-bit words
    reserved: int = 0
    flags: int = 0  # 9 bits: NS, CWR, ECE, URG, ACK, PSH, RST, SYN, FIN
    window_size: int = 65535
    checksum: int = 0
    urgent_pointer: int = 0
    options: bytes = b''

    def _has_flag(self, mask: int) -> bool:
        """Report whether any bit selected by ``mask`` is set in ``flags``."""
        return (self.flags & mask) != 0

    @property
    def header_length(self) -> int:
        """Header size in bytes (``data_offset`` is in 4-byte words)."""
        return 4 * self.data_offset

    # One property per control bit, lowest bit first.
    @property
    def fin(self) -> bool:
        return self._has_flag(0x001)

    @property
    def syn(self) -> bool:
        return self._has_flag(0x002)

    @property
    def rst(self) -> bool:
        return self._has_flag(0x004)

    @property
    def psh(self) -> bool:
        return self._has_flag(0x008)

    @property
    def ack(self) -> bool:
        return self._has_flag(0x010)

    @property
    def urg(self) -> bool:
        return self._has_flag(0x020)

    @property
    def ece(self) -> bool:
        return self._has_flag(0x040)

    @property
    def cwr(self) -> bool:
        return self._has_flag(0x080)

    @property
    def ns(self) -> bool:
        return self._has_flag(0x100)
@dataclass
class UDPHeader:
    """Field-by-field representation of the 8-byte UDP header."""

    source_port: int = 0
    dest_port: int = 0
    length: int = 8  # header length (8) plus data length
    checksum: int = 0
class IPParser:
    """Static helpers for parsing IPv4, TCP and UDP headers.

    Each ``parse_*`` method takes raw bytes starting at the first byte of
    the header and returns ``(header_object, remaining_bytes)``.
    ``calculate_checksum`` computes the 16-bit one's-complement checksum
    over an arbitrary byte string.
    """

    @staticmethod
    def parse_ipv4_header(packet: bytes) -> "Tuple[IPv4Header, bytes]":
        """Parse an IPv4 header from the start of ``packet``.

        Args:
            packet: Raw bytes beginning at the first byte of the IPv4 header.

        Returns:
            ``(header, payload)``. ``payload`` is every byte after the
            header (``IHL * 4`` bytes); it is not trimmed to
            ``total_length``. The version field is stored as parsed and
            is not validated.

        Raises:
            ValueError: If ``packet`` is shorter than the 20-byte fixed
                header, if IHL is below 5, or if ``packet`` is shorter
                than the header length declared by IHL.
        """
        if len(packet) < 20:
            raise ValueError("Packet too short for IPv4 header")

        (ver_ihl, tos, total_len, ident, flags_frag,
         ttl, proto, checksum, src, dst) = struct.unpack(
            '!BBHHHBBH4s4s', packet[:20])

        # First byte: version in the high nibble, IHL in the low nibble.
        version = ver_ihl >> 4
        ihl = ver_ihl & 0x0F
        header_len = ihl * 4

        # An IHL below 5 would make the "payload" slice start inside the
        # fixed header; an IHL beyond the buffer would truncate options.
        if ihl < 5:
            raise ValueError("Invalid IPv4 header length (IHL < 5)")
        if len(packet) < header_len:
            raise ValueError("Packet too short for IPv4 options")

        header = IPv4Header(
            version=version,
            ihl=ihl,
            tos=tos,
            total_length=total_len,
            identification=ident,
            # 16-bit word: top 3 bits are flags, low 13 the fragment offset.
            flags=flags_frag >> 13,
            fragment_offset=flags_frag & 0x1FFF,
            ttl=ttl,
            protocol=proto,
            header_checksum=checksum,
            source_ip=socket.inet_ntoa(src),
            dest_ip=socket.inet_ntoa(dst),
        )

        # Anything between the fixed 20 bytes and the declared header end
        # is option data.
        if header_len > 20:
            header.options = packet[20:header_len]

        return header, packet[header_len:]

    @staticmethod
    def parse_tcp_header(packet: bytes) -> "Tuple[TCPHeader, bytes]":
        """Parse a TCP header from the start of ``packet``.

        Args:
            packet: Raw bytes beginning at the first byte of the TCP header.

        Returns:
            ``(header, payload)``. ``payload`` is every byte after the
            header (``data_offset * 4`` bytes).

        Raises:
            ValueError: If ``packet`` is shorter than the 20-byte fixed
                header, if the data offset is below 5, or if ``packet``
                is shorter than the header length declared by the data
                offset.
        """
        if len(packet) < 20:
            raise ValueError("Packet too short for TCP header")

        # Eight fields, 20 bytes: ports (2+2), seq (4), ack (4),
        # offset/flags (2), window (2), checksum (2), urgent pointer (2).
        (src_port, dst_port, seq, ack, offset_flags,
         window, checksum, urgent) = struct.unpack(
            '!HHIIHHHH', packet[:20])

        # 16-bit word: top 4 bits are the data offset, low 9 the flags.
        data_offset = offset_flags >> 12
        flags = offset_flags & 0x1FF
        header_len = data_offset * 4

        if data_offset < 5:
            raise ValueError("Invalid TCP data offset (< 5)")
        if len(packet) < header_len:
            raise ValueError("Packet too short for TCP options")

        header = TCPHeader(
            source_port=src_port,
            dest_port=dst_port,
            seq_num=seq,
            ack_num=ack,
            data_offset=data_offset,
            flags=flags,
            window_size=window,
            checksum=checksum,
            urgent_pointer=urgent,
        )

        # Anything between the fixed 20 bytes and the declared header end
        # is option data.
        if header_len > 20:
            header.options = packet[20:header_len]

        return header, packet[header_len:]

    @staticmethod
    def parse_udp_header(packet: bytes) -> "Tuple[UDPHeader, bytes]":
        """Parse a UDP header from the start of ``packet``.

        Args:
            packet: Raw bytes beginning at the first byte of the UDP header.

        Returns:
            ``(header, payload)``. ``payload`` is every byte after the
            8-byte header; the ``length`` field is stored as parsed and
            is not checked against the buffer.

        Raises:
            ValueError: If ``packet`` is shorter than 8 bytes.
        """
        if len(packet) < 8:
            raise ValueError("Packet too short for UDP header")

        src_port, dst_port, length, checksum = struct.unpack(
            '!HHHH', packet[:8])

        header = UDPHeader(
            source_port=src_port,
            dest_port=dst_port,
            length=length,
            checksum=checksum,
        )
        return header, packet[8:]

    @staticmethod
    def calculate_checksum(data: bytes) -> int:
        """Compute the 16-bit one's-complement checksum of ``data``.

        The input is read as big-endian 16-bit words (odd-length input is
        padded with one trailing zero byte), the words are summed, carries
        above 16 bits are folded back in, and the result is complemented.

        Args:
            data: Bytes to checksum. The caller's buffer is never modified.

        Returns:
            The checksum as an integer in ``0..0xFFFF``.
        """
        if len(data) % 2 == 1:
            # Build a new padded buffer instead of using ``+=``, which
            # would append to a caller-owned bytearray in place.
            data = bytes(data) + b'\0'

        words = struct.unpack('!%dH' % (len(data) // 2), data)
        total = sum(words)

        # Fold any carry above bit 15 back into the low 16 bits.
        while total >> 16:
            total = (total & 0xFFFF) + (total >> 16)

        return ~total & 0xFFFF