Spaces:
Sleeping
Sleeping
| # parsers.py | |
| import pyshark | |
| from datetime import datetime | |
class MediaStream:
    """Record describing one media stream.

    Holds the three values passed to the constructor:

    * ``ssrc`` -- stream identifier (defaults to ``None``)
    * ``payload_type`` -- payload type of the stream (defaults to ``None``)
    * ``packets`` -- packet counter (defaults to ``0``)
    """

    def __init__(self, ssrc=None, payload_type=None, packets=0):
        self.ssrc, self.payload_type, self.packets = ssrc, payload_type, packets

    def __repr__(self):
        # Render the attributes in a fixed order as "name=value" pairs.
        rendered = ", ".join(
            f"{attr}={getattr(self, attr)}"
            for attr in ("ssrc", "payload_type", "packets")
        )
        return f"<MediaStream {rendered}>"
class Call:
    """Mutable record of everything collected for one call.

    A new instance only knows its ``call_id``; every other attribute starts
    empty (``None``, ``[]`` or ``{}``) and is meant to be filled in by
    whoever builds the call.
    """

    def __init__(self, call_id):
        self.call_id = call_id

        # Tags and key timestamps are unknown until assigned.
        self.from_tag = self.to_tag = None
        self.invite_time = self.answer_time = self.end_time = None

        # Raw SIP info, stored if needed.
        self.sip_messages = []
        # Maps SSRC -> MediaStream.
        self.media_streams = {}
        # Chronological record of SIP messages; each entry is a dict with
        # the keys 'timestamp', 'src_ip', 'dst_ip' and 'message'.
        self.sip_sequence = []

    def __repr__(self):
        streams = list(self.media_streams.values())
        pieces = [
            f"call_id={self.call_id}",
            f"from_tag={self.from_tag}",
            f"to_tag={self.to_tag}",
            f"invite_time={self.invite_time}",
            f"answer_time={self.answer_time}",
            f"end_time={self.end_time}",
            f"media_streams={streams}",
        ]
        return "<Call " + ", ".join(pieces) + ">"
def _packet_addresses(packet):
    """Return ``(src, dst)`` from the packet's IPv4 layer, else its IPv6 layer.

    Yields ``(None, None)`` when the packet carries neither layer.
    """
    for layer_name in ('ip', 'ipv6'):
        layer = getattr(packet, layer_name, None)
        if layer is not None:
            return getattr(layer, 'src', None), getattr(layer, 'dst', None)
    return None, None


def _sip_request_method(sip_layer):
    """Return the upper-cased SIP request method, or ``None`` for non-requests."""
    method = getattr(sip_layer, 'Request_Line_Method', None)
    if not method:
        # NOTE(review): the dissector may not expose a field under that name
        # (Wireshark documents the method as ``sip.Method``) -- confirm.
        # The first token of the Request-Line is the method either way.
        request_line = sip_layer.get_field_value('Request-Line')
        tokens = str(request_line).split() if request_line else []
        method = tokens[0] if tokens else None
    return str(method).upper() if method else None


def _summarize_sip(sip_layer, call_id):
    """Classify a SIP message.

    Returns ``(method, status_code, summary)``. ``method`` is set for
    requests, ``status_code`` (a digit string) for parseable responses, and
    ``summary`` is the short text stored in ``Call.sip_sequence``, e.g.
    ``"INVITE (Call-ID: abc)"`` or ``"200 (Call-ID: abc)"``.
    """
    method = _sip_request_method(sip_layer)
    if method:
        return method, None, f"{method} (Call-ID: {call_id})"

    status_line = sip_layer.get_field_value('Status-Line')
    if not status_line:
        return None, None, f"UNKNOWN SIP MESSAGE (Call-ID: {call_id})"

    # e.g. "SIP/2.0 200 OK" -> ["SIP/2.0", "200", "OK"]
    parts = status_line.split(None, 2)
    if len(parts) >= 2 and parts[1].isdigit():
        code = str(parts[1])
        return None, code, f"{code} (Call-ID: {call_id})"
    return None, None, f"UNKNOWN RESPONSE (Call-ID: {call_id})"


def _record_sip_packet(packet, calls_by_id, pkt_ts, src_ip, dst_ip):
    """Fold one SIP packet into ``calls_by_id``; packets without a Call-ID are ignored."""
    sip_layer = packet.sip
    try:
        call_id = sip_layer.call_id.strip()
    except AttributeError:
        # No Call-ID => the message cannot be attributed to a call.
        return

    if call_id not in calls_by_id:
        calls_by_id[call_id] = Call(call_id)
    call_obj = calls_by_id[call_id]

    # Raw first line, e.g. "INVITE sip:..." or "SIP/2.0 200 OK".
    raw_msg = (sip_layer.get_field_value('Request-Line') or
               sip_layer.get_field_value('Status-Line') or
               "UNKNOWN SIP MESSAGE")
    call_obj.sip_messages.append(raw_msg)

    # Tags are optional (e.g. no to-tag on an initial request); keep the
    # previously seen value when this message does not carry one.
    for tag_attr in ('from_tag', 'to_tag'):
        tag = getattr(sip_layer, tag_attr, None)
        if tag is not None:
            setattr(call_obj, tag_attr, tag)

    method, status_code, message_summary = _summarize_sip(sip_layer, call_id)
    call_obj.sip_sequence.append({
        'timestamp': pkt_ts,
        'src_ip': src_ip,
        'dst_ip': dst_ip,
        'message': message_summary
    })

    # Method named in the CSeq header tells which request a response answers.
    # NOTE(review): assumes the dissector exposes it as ``cseq_method``
    # (Wireshark ``sip.CSeq.method``); when absent we fall back to treating
    # any 200 as the answer, which is the previous naive behaviour.
    cseq_method = getattr(sip_layer, 'cseq_method', None)
    cseq_method = str(cseq_method).upper() if cseq_method else None

    # First INVITE marks the start of the call.
    if method == 'INVITE' and call_obj.invite_time is None:
        call_obj.invite_time = pkt_ts

    if status_code == '200':
        # First 200 to an INVITE (or to an unknown request) => answered.
        if cseq_method in (None, 'INVITE') and call_obj.answer_time is None:
            call_obj.answer_time = pkt_ts
        # 200 to a BYE => call ended.
        if cseq_method == 'BYE':
            call_obj.end_time = pkt_ts

    # Compare the parsed method, not the summary text: the summary embeds the
    # Call-ID, which may itself contain the letters "BYE".
    if method == 'BYE':
        call_obj.end_time = pkt_ts

    # If there is an SDP part, media lines could be parsed here (not done).


def _record_rtp_packet(packet, calls_by_id):
    """Count one RTP packet against every call known so far.

    This is a simplistic approach: the stream is not tied to a specific call
    by matching IP/port from SDP, so the packet is stored in all calls.
    """
    rtp_layer = packet.rtp
    ssrc = getattr(rtp_layer, 'ssrc', None)
    payload_type = getattr(rtp_layer, 'payload_type', None)
    if payload_type is None:
        # NOTE(review): Wireshark names this field ``rtp.p_type`` -- confirm
        # which attribute name the installed pyshark exposes.
        payload_type = getattr(rtp_layer, 'p_type', None)

    for c_obj in calls_by_id.values():
        if ssrc not in c_obj.media_streams:
            c_obj.media_streams[ssrc] = MediaStream(ssrc=ssrc, payload_type=payload_type)
        c_obj.media_streams[ssrc].packets += 1


def parse_pcap(pcap_path):
    """
    Parse a pcap/pcapng file using PyShark.

    Args:
        pcap_path: Path of the capture file, passed to ``pyshark.FileCapture``.

    Returns:
        A dictionary of ``Call`` objects keyed by Call-ID. Each call holds the
        raw SIP first lines (``sip_messages``), a chronological
        ``sip_sequence`` for generating call flows, the key timestamps
        (``invite_time``, ``answer_time``, ``end_time``) and the RTP streams
        seen after the call was first observed (``media_streams``).

    Timestamps are naive ``datetime`` objects in UTC. The capture is always
    closed, even when parsing raises part-way through the file.
    """
    capture = pyshark.FileCapture(pcap_path, keep_packets=False)
    calls_by_id = {}
    try:
        for packet in capture:
            # Convert the sniff timestamp (epoch seconds) to a datetime.
            pkt_ts = datetime.utcfromtimestamp(float(packet.sniff_timestamp))
            src_ip, dst_ip = _packet_addresses(packet)

            if 'sip' in packet:
                _record_sip_packet(packet, calls_by_id, pkt_ts, src_ip, dst_ip)
            elif 'rtp' in packet:
                _record_rtp_packet(packet, calls_by_id)
    finally:
        # Release the underlying capture even if iteration failed.
        capture.close()
    return calls_by_id