VoIPAnalyzer / parsers.py
fabioantonini's picture
Upload 5 files
936432e verified
# parsers.py
import pyshark
from datetime import datetime
class MediaStream:
    """Record for one RTP stream: its SSRC, payload type and packet count."""

    def __init__(self, ssrc=None, payload_type=None, packets=0):
        # Values are stored exactly as given; nothing is validated or converted.
        self.ssrc, self.payload_type, self.packets = ssrc, payload_type, packets

    def __repr__(self):
        """Return a compact ``<MediaStream ...>`` debug string."""
        details = ", ".join((
            f"ssrc={self.ssrc}",
            f"payload_type={self.payload_type}",
            f"packets={self.packets}",
        ))
        return f"<MediaStream {details}>"
class Call:
    """Container for everything collected about one SIP call (one Call-ID)."""

    def __init__(self, call_id):
        self.call_id = call_id

        # Dialog tags and milestone timestamps all start out unset.
        self.from_tag = self.to_tag = None
        self.invite_time = self.answer_time = self.end_time = None

        # Raw SIP info, kept in case it is needed.
        self.sip_messages = []
        # Maps SSRC -> MediaStream.
        self.media_streams = {}
        # Chronological record of SIP messages; each entry is a dict with the
        # keys 'timestamp', 'src_ip', 'dst_ip' and 'message'.
        self.sip_sequence = []

    def __repr__(self):
        """Return a ``<Call ...>`` debug string including the media streams."""
        streams = list(self.media_streams.values())
        fields = (
            f"call_id={self.call_id}",
            f"from_tag={self.from_tag}",
            f"to_tag={self.to_tag}",
            f"invite_time={self.invite_time}",
            f"answer_time={self.answer_time}",
            f"end_time={self.end_time}",
            f"media_streams={streams}",
        )
        return "<Call " + ", ".join(fields) + ">"
def parse_pcap(pcap_path):
    """
    Parse a pcap/pcapng file using PyShark.

    Args:
        pcap_path: Path of the capture file, passed straight to
            ``pyshark.FileCapture``.

    Returns:
        A dictionary of ``Call`` objects keyed by Call-ID. Each call holds
        the SIP and RTP info seen in the capture, including a
        ``sip_sequence`` list for generating call flows.

    Notes:
        * Timestamps are naive ``datetime`` objects expressed in UTC.
        * RTP is not matched to a call via SDP; every RTP packet is counted
          in every call known at the time the packet is seen.
        * The capture is always closed, even if parsing raises part-way.
    """
    capture = pyshark.FileCapture(pcap_path, keep_packets=False)
    calls_by_id = {}
    try:
        for packet in capture:
            if 'sip' in packet:
                # Naive UTC datetime, as downstream code has always received.
                pkt_ts = datetime.utcfromtimestamp(float(packet.sniff_timestamp))
                src_ip, dst_ip = _packet_endpoints(packet)
                _record_sip_packet(packet.sip, calls_by_id, pkt_ts, src_ip, dst_ip)
            elif 'rtp' in packet:
                _record_rtp_packet(packet.rtp, calls_by_id)
    finally:
        # Release the underlying capture resources on every exit path.
        capture.close()
    return calls_by_id


def _packet_endpoints(packet):
    """Return ``(src, dst)`` from the IPv4 layer, else the IPv6 layer, else Nones."""
    for layer_name in ('ip', 'ipv6'):
        layer = getattr(packet, layer_name, None)
        if layer is not None:
            return getattr(layer, 'src', None), getattr(layer, 'dst', None)
    return None, None


def _first_field_value(layer, *field_names):
    """Return the first non-empty value among ``field_names`` on ``layer``, else None."""
    for field_name in field_names:
        value = layer.get_field_value(field_name)
        if value:
            return value
    return None


def _summarize_sip(sip_layer, call_id):
    """Return ``(summary, request_method, status_code)`` for one SIP message.

    ``request_method`` is upper-cased and set only for requests;
    ``status_code`` is the numeric string and set only for parseable responses.
    """
    # Wireshark names the request method field ``sip.Method``; the historical
    # ``Request_Line_Method`` spelling is still tried first for compatibility.
    method = _first_field_value(sip_layer, 'Request_Line_Method', 'Method')
    if method:
        method = str(method).upper()
        return f"{method} (Call-ID: {call_id})", method, None

    status_line = sip_layer.get_field_value('Status-Line')
    if not status_line:
        return f"UNKNOWN SIP MESSAGE (Call-ID: {call_id})", None, None

    # e.g. "SIP/2.0 200 OK" -> ["SIP/2.0", "200", "OK"]
    parts = status_line.split(None, 2)
    if len(parts) >= 2 and parts[1].isdigit():
        return f"{parts[1]} (Call-ID: {call_id})", None, parts[1]
    return f"UNKNOWN RESPONSE (Call-ID: {call_id})", None, None


def _record_sip_packet(sip_layer, calls_by_id, pkt_ts, src_ip, dst_ip):
    """Fold one SIP message into its ``Call`` (created on first sight)."""
    try:
        call_id = sip_layer.call_id.strip()
    except AttributeError:
        # Without a Call-ID the message cannot be attributed to a call.
        return

    call_obj = calls_by_id.get(call_id)
    if call_obj is None:
        call_obj = calls_by_id[call_id] = Call(call_id)

    # Raw first line, e.g. "INVITE sip:..." or "SIP/2.0 200 OK".
    raw_msg = (_first_field_value(sip_layer, 'Request-Line', 'Status-Line')
               or "UNKNOWN SIP MESSAGE")
    call_obj.sip_messages.append(raw_msg)

    # Tags are only overwritten when the message actually carries them
    # (an initial INVITE has no to-tag yet).
    for tag_attr in ('from_tag', 'to_tag'):
        tag = getattr(sip_layer, tag_attr, None)
        if tag is not None:
            setattr(call_obj, tag_attr, tag)

    message_summary, method, status_code = _summarize_sip(sip_layer, call_id)
    call_obj.sip_sequence.append({
        'timestamp': pkt_ts,
        'src_ip': src_ip,
        'dst_ip': dst_ip,
        'message': message_summary
    })

    # --- Key timestamps -------------------------------------------------
    # Decisions use the parsed method/status rather than searching the
    # summary text, which embeds the Call-ID and could match by accident.
    if method == "INVITE" and call_obj.invite_time is None:
        call_obj.invite_time = pkt_ts

    cseq_method = None
    if status_code == "200":
        cseq_method = _first_field_value(sip_layer, 'CSeq.method')
        if cseq_method:
            cseq_method = str(cseq_method).upper()
        # Only a 200 answering the INVITE marks the call as answered; a 200
        # to PRACK/UPDATE/OPTIONS/BYE must not. If the CSeq method is not
        # available, fall back to the old "any 200" heuristic.
        if cseq_method in (None, "INVITE") and call_obj.answer_time is None:
            call_obj.answer_time = pkt_ts

    if method == "BYE":
        call_obj.end_time = pkt_ts
    elif cseq_method == "BYE" and call_obj.end_time is None:
        # The BYE itself was not captured; its 200 still proves the call ended.
        call_obj.end_time = pkt_ts


def _record_rtp_packet(rtp_layer, calls_by_id):
    """Count one RTP packet against its SSRC in every known call.

    This is a simplistic approach: streams are not tied to a call by matching
    IP/port against SDP, so each RTP packet is stored in all calls.
    """
    ssrc = getattr(rtp_layer, 'ssrc', None)
    # Wireshark names the payload type field ``rtp.p_type``; the previous
    # ``payload_type`` spelling is kept as a fallback.
    payload_type = getattr(rtp_layer, 'p_type', None)
    if payload_type is None:
        payload_type = getattr(rtp_layer, 'payload_type', None)

    for c_obj in calls_by_id.values():
        stream = c_obj.media_streams.get(ssrc)
        if stream is None:
            stream = MediaStream(ssrc=ssrc, payload_type=payload_type)
            c_obj.media_streams[ssrc] = stream
        stream.packets += 1