File size: 6,638 Bytes
6a5b8d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""

IP Parser/Assembler Module



Handles IPv4 packet parsing and construction:

- Parse IPv4, UDP, and TCP headers

- Calculate and verify checksums

- Handle packet fragmentation and reassembly

- Support various IP options

"""

import struct
import socket
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum


class IPProtocol(Enum):
    """Named protocol identifiers and their integer values.

    NOTE(review): presumably compared against ``IPv4Header.protocol``; no use
    of this enum is visible in this part of the file -- confirm with callers.
    """
    ICMP = 1
    TCP = 6
    UDP = 17


@dataclass
class IPv4Header:
    """Decoded fields of an IPv4 header.

    The field order and defaults below define the generated ``__init__``
    signature, so they must not be reordered.
    """
    version: int = 4
    ihl: int = 5  # header length, counted in 32-bit words
    tos: int = 0  # Type of Service
    total_length: int = 0
    identification: int = 0
    flags: int = 0  # 3-bit field: Reserved, Don't Fragment, More Fragments
    fragment_offset: int = 0  # 13-bit field
    ttl: int = 64  # Time to Live
    protocol: int = 0
    header_checksum: int = 0
    source_ip: str = '0.0.0.0'
    dest_ip: str = '0.0.0.0'
    options: bytes = b''

    @property
    def header_length(self) -> int:
        """Header size in bytes, i.e. ``ihl`` words of four bytes each."""
        bytes_per_word = 4
        return bytes_per_word * self.ihl

    @property
    def dont_fragment(self) -> bool:
        """True when bit 0x2 (Don't Fragment) of ``flags`` is set."""
        return (self.flags & 0x2) != 0

    @property
    def more_fragments(self) -> bool:
        """True when bit 0x1 (More Fragments) of ``flags`` is set."""
        return (self.flags & 0x1) != 0

    @property
    def is_fragment(self) -> bool:
        """True when More Fragments is set or the fragment offset is non-zero."""
        if self.more_fragments:
            return True
        return self.fragment_offset > 0


@dataclass
class TCPHeader:
    """Decoded fields of a TCP header.

    The field order and defaults below define the generated ``__init__``
    signature, so they must not be reordered.
    """
    source_port: int = 0
    dest_port: int = 0
    seq_num: int = 0
    ack_num: int = 0
    data_offset: int = 5  # header length, counted in 32-bit words
    reserved: int = 0
    flags: int = 0  # 9 bits: NS, CWR, ECE, URG, ACK, PSH, RST, SYN, FIN
    window_size: int = 65535
    checksum: int = 0
    urgent_pointer: int = 0
    options: bytes = b''

    def _flag_set(self, mask: int) -> bool:
        """Report whether the bit selected by *mask* is set in ``flags``."""
        return (self.flags & mask) != 0

    @property
    def header_length(self) -> int:
        """Header size in bytes, i.e. ``data_offset`` words of four bytes."""
        bytes_per_word = 4
        return bytes_per_word * self.data_offset

    # One read-only property per flag bit, lowest bit first.
    @property
    def fin(self) -> bool:
        """True when bit 0x01 of ``flags`` is set."""
        return self._flag_set(0x01)

    @property
    def syn(self) -> bool:
        """True when bit 0x02 of ``flags`` is set."""
        return self._flag_set(0x02)

    @property
    def rst(self) -> bool:
        """True when bit 0x04 of ``flags`` is set."""
        return self._flag_set(0x04)

    @property
    def psh(self) -> bool:
        """True when bit 0x08 of ``flags`` is set."""
        return self._flag_set(0x08)

    @property
    def ack(self) -> bool:
        """True when bit 0x10 of ``flags`` is set."""
        return self._flag_set(0x10)

    @property
    def urg(self) -> bool:
        """True when bit 0x20 of ``flags`` is set."""
        return self._flag_set(0x20)

    @property
    def ece(self) -> bool:
        """True when bit 0x40 of ``flags`` is set."""
        return self._flag_set(0x40)

    @property
    def cwr(self) -> bool:
        """True when bit 0x80 of ``flags`` is set."""
        return self._flag_set(0x80)

    @property
    def ns(self) -> bool:
        """True when bit 0x100 of ``flags`` is set."""
        return self._flag_set(0x100)


@dataclass
class UDPHeader:
    """UDP header structure.

    Holds the four 16-bit values that ``IPParser.parse_udp_header`` unpacks
    from the first 8 bytes of its input, in the same order.
    """
    source_port: int = 0
    dest_port: int = 0
    length: int = 8  # Header length (8) + data length
    checksum: int = 0  # stored as read; nothing in this class verifies it


class IPParser:
    """IP packet parser and assembler"""
    
    @staticmethod
    def parse_ipv4_header(packet: bytes) -> Tuple[IPv4Header, bytes]:
        """Parse IPv4 header and return header object and remaining data"""
        # Basic header (20 bytes)
        if len(packet) < 20:
            raise ValueError("Packet too short for IPv4 header")
            
        ver_ihl, tos, total_len, ident, flags_frag, ttl, proto, checksum, src, dst = \
            struct.unpack('!BBHHHBBH4s4s', packet[:20])
            
        version = ver_ihl >> 4
        ihl = ver_ihl & 0x0F
        flags = flags_frag >> 13
        frag_offset = flags_frag & 0x1FFF
        
        header = IPv4Header(
            version=version,
            ihl=ihl,
            tos=tos,
            total_length=total_len,
            identification=ident,
            flags=flags,
            fragment_offset=frag_offset,
            ttl=ttl,
            protocol=proto,
            header_checksum=checksum,
            source_ip=socket.inet_ntoa(src),
            dest_ip=socket.inet_ntoa(dst)
        )
        
        # Extract options if present
        header_len = header.header_length
        if header_len > 20:
            header.options = packet[20:header_len]
            
        return header, packet[header_len:]
    
    @staticmethod
    def parse_tcp_header(packet: bytes) -> Tuple[TCPHeader, bytes]:
        """Parse TCP header and return header object and remaining data"""
        if len(packet) < 20:
            raise ValueError("Packet too short for TCP header")
            
        src_port, dst_port, seq, ack, offset_flags, window, checksum, urgent = \
            struct.unpack('!HHIIHHH', packet[:20])
            
        data_offset = offset_flags >> 12
        flags = offset_flags & 0x1FF
        
        header = TCPHeader(
            source_port=src_port,
            dest_port=dst_port,
            seq_num=seq,
            ack_num=ack,
            data_offset=data_offset,
            flags=flags,
            window_size=window,
            checksum=checksum,
            urgent_pointer=urgent
        )
        
        # Extract options if present
        header_len = header.header_length
        if header_len > 20:
            header.options = packet[20:header_len]
            
        return header, packet[header_len:]
    
    @staticmethod
    def parse_udp_header(packet: bytes) -> Tuple[UDPHeader, bytes]:
        """Parse UDP header and return header object and remaining data"""
        if len(packet) < 8:
            raise ValueError("Packet too short for UDP header")
            
        src_port, dst_port, length, checksum = struct.unpack('!HHHH', packet[:8])
        
        header = UDPHeader(
            source_port=src_port,
            dest_port=dst_port,
            length=length,
            checksum=checksum
        )
        
        return header, packet[8:]
    
    @staticmethod
    def calculate_checksum(data: bytes) -> int:
        """Calculate IP/TCP/UDP checksum"""
        if len(data) % 2 == 1:
            data += b'\0'
            
        words = struct.unpack('!%dH' % (len(data) // 2), data)
        checksum = sum(words)
        
        # Fold 32-bit sum into 16 bits
        while checksum >> 16:
            checksum = (checksum & 0xFFFF) + (checksum >> 16)
            
        return ~checksum & 0xFFFF