SipRtpAnalyzer / sip_rtp_pcap_analyzer_with_visualization.py
fabioantonini's picture
Upload sip_rtp_pcap_analyzer_with_visualization.py
40d7b61 verified
#!/usr/bin/env python3
"""
SIP and RTP PCAP Analyzer with Visualization - A tool to analyze PCAP files and extract SIP messages
and RTP streams for VoIP call analysis, with graphical visualization capabilities.
This script can parse PCAP/PCAPNG files and identify SIP messages exchanged during VoIP call
establishments, including INVITE, ACK, BYE, and other SIP messages. It also analyzes RTP streams
to provide metrics on call quality including jitter, packet loss, and MOS score. Additionally,
it generates graphical visualizations of SIP signaling and RTP media flows.
"""
import sys
import os
import argparse
import re
import logging
import math
import subprocess
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any, Set, DefaultDict
from collections import defaultdict
# Logging setup for the whole script: timestamped records at INFO level and
# above, written by the root handler that basicConfig installs.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Named logger shared by every component of this analyzer.
logger = logging.getLogger('sip_rtp_pcap_analyzer')
# Try to import different packet analysis libraries
# We'll use the first one available in this priority order: scapy, pyshark, dpkt
LIBRARY = None
try:
import scapy.all as scapy
from scapy.layers.inet import IP, UDP, TCP
try:
from scapy.layers.rtp import RTP
HAS_RTP_LAYER = True
except ImportError:
HAS_RTP_LAYER = False
LIBRARY = "scapy"
logger.info("Using scapy for packet analysis")
except ImportError:
try:
import pyshark
LIBRARY = "pyshark"
logger.info("Using pyshark for packet analysis")
except ImportError:
try:
import dpkt
import socket
from dpkt.ethernet import Ethernet
LIBRARY = "dpkt"
logger.info("Using dpkt for packet analysis")
except ImportError:
logger.error("No packet analysis library found. Please install scapy, pyshark, or dpkt.")
sys.exit(1)
# Try to import visualization libraries
try:
import plantuml
HAS_PLANTUML = True
except ImportError:
HAS_PLANTUML = False
logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.")
try:
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
HAS_MATPLOTLIB = True
except ImportError:
HAS_MATPLOTLIB = False
logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.")
# SIP request methods the analyzer recognises.
SIP_METHODS = [
    "INVITE", "ACK", "BYE", "CANCEL", "REGISTER", "OPTIONS",
    "PRACK", "SUBSCRIBE", "NOTIFY", "PUBLISH", "INFO", "REFER", "MESSAGE", "UPDATE"
]
# SIP response classes, keyed by the first digit of the status code.
SIP_RESPONSES = {
    "1": "Informational",
    "2": "Success",
    "3": "Redirection",
    "4": "Client Error",
    "5": "Server Error",
    "6": "Global Failure"
}
# Common SIP ports
SIP_PORTS = {5060, 5061}
# Port range assumed to carry RTP. RTP itself has no fixed port range; this
# is a typical deployment choice, not a protocol rule.
RTP_PORT_RANGE = (10000, 65535)
# RTP payload type -> codec name.
# Static assignments follow the RFC 3551 payload-type tables. Types 96-127
# are dynamic and bound per session through SDP "a=rtpmap" lines, so the
# 110/111 -> OPUS entries are a common convention, not a standard mapping.
RTP_CODEC_MAP = {
    0: "PCMU/G.711u",
    3: "GSM",
    4: "G723",
    5: "DVI4",
    6: "DVI4",
    7: "LPC",
    8: "PCMA/G.711a",
    9: "G722",
    10: "L16 (stereo)",
    11: "L16 (mono)",
    12: "QCELP",
    13: "CN",  # comfort noise
    14: "MPA",
    15: "G728",
    16: "DVI4",
    17: "DVI4",
    18: "G729",
    25: "CelB",
    26: "JPEG",
    28: "nv",
    31: "H261",
    32: "MPV",
    33: "MP2T",
    34: "H263",
    96: "Dynamic",
    97: "Dynamic",
    98: "Dynamic",
    99: "Dynamic",
    100: "Dynamic",
    101: "Dynamic",
    110: "OPUS",
    111: "OPUS"
}
class RtpStream:
    """An RTP stream (one SSRC between two transport addresses) together with
    its packets and derived quality metrics.

    Packet loss follows RFC 3550 Appendix A.3: the number of expected packets
    is derived from the extended highest sequence number received, so
    reordered or duplicated packets are not reported as loss. Interarrival
    jitter follows RFC 3550 Section 6.4.1 and is reported in milliseconds.
    """

    # RTP timestamp clock rates (Hz) of the RFC 3551 static payload types.
    # G.722 (PT 9) uses an 8000 Hz RTP clock even though it samples at 16 kHz.
    STATIC_CLOCK_RATES = {
        0: 8000, 3: 8000, 4: 8000, 5: 8000, 6: 16000, 7: 8000, 8: 8000,
        9: 8000, 10: 44100, 11: 44100, 12: 8000, 13: 8000, 14: 90000,
        15: 8000, 16: 11025, 17: 22050, 18: 8000, 25: 90000, 26: 90000,
        28: 90000, 31: 90000, 32: 90000, 33: 90000, 34: 90000,
    }
    # Fallback for payload types not listed above (e.g. dynamic types
    # 96-127, whose rate is only announced in SDP).
    DEFAULT_CLOCK_RATE = 8000

    def __init__(self, ssrc: int, src_ip: str, src_port: int, dst_ip: str, dst_port: int,
                 clock_rate: Optional[int] = None):
        """Create an empty stream.

        Args:
            ssrc: RTP synchronization source identifier.
            src_ip, src_port: Source transport address.
            dst_ip, dst_port: Destination transport address.
            clock_rate: RTP timestamp clock rate in Hz. When omitted it is
                derived from the payload type (see ``get_clock_rate``); set
                it explicitly (e.g. from an SDP ``a=rtpmap`` line) for
                dynamic payload types.
        """
        self.ssrc = ssrc
        self.src_ip = src_ip
        self.src_port = src_port
        self.dst_ip = dst_ip
        self.dst_port = dst_port
        self.packets = []
        self.start_time = None
        self.end_time = None
        self.packet_count = 0
        self.expected_packets = 0
        self.lost_packets = 0
        self.jitter = 0.0
        self.jitter_values = []  # jitter after each packet, for plotting
        self.last_seq = None
        self.last_timestamp = None
        self.last_arrival = None
        self.codec = None
        self.payload_type = None
        self.mos_score = None
        self.rtt = None  # Round Trip Time in ms
        self.clock_rate = clock_rate
        # RFC 3550 A.3 sequence-number state.
        self._base_seq = None  # first sequence number received
        self._max_seq = None  # highest sequence number received (16-bit)
        self._seq_cycles = 0  # 65536 * number of sequence-number wraparounds

    def get_clock_rate(self) -> int:
        """Return the RTP clock rate (Hz) used to convert RTP timestamps.

        An explicit ``clock_rate`` takes precedence; otherwise the rate of
        the stream's payload type from ``STATIC_CLOCK_RATES`` is used, else
        ``DEFAULT_CLOCK_RATE``.
        """
        if self.clock_rate:
            return self.clock_rate
        return self.STATIC_CLOCK_RATES.get(self.payload_type, self.DEFAULT_CLOCK_RATE)

    def _track_sequence(self, seq: int) -> None:
        """Advance the highest-sequence tracker (RFC 3550 A.3) with ``seq``.

        Forward steps of less than half the 16-bit space advance the
        maximum (counting a wrap when the value decreases). Duplicates and
        late/reordered packets leave it unchanged.
        """
        seq &= 0xFFFF
        if self._max_seq is None:
            self._base_seq = seq
            self._max_seq = seq
            return
        delta = (seq - self._max_seq) & 0xFFFF
        if 0 < delta < 0x8000:
            if seq < self._max_seq:
                self._seq_cycles += 0x10000
            self._max_seq = seq

    def add_packet(self, timestamp: datetime, seq: int, rtp_timestamp: int,
                   payload_type: int, payload_size: int):
        """Record one received RTP packet and update loss and jitter metrics.

        Args:
            timestamp: Capture (arrival) time of the packet.
            seq: 16-bit RTP sequence number.
            rtp_timestamp: 32-bit RTP timestamp, in clock-rate units.
            payload_type: RTP payload type number.
            payload_size: Payload length in bytes.
        """
        if self.start_time is None or timestamp < self.start_time:
            self.start_time = timestamp
        if self.end_time is None or timestamp > self.end_time:
            self.end_time = timestamp
        # The stream's payload type is the one of its first packet.
        if self.payload_type is None:
            self.payload_type = payload_type

        self._track_sequence(seq)

        # Interarrival jitter (RFC 3550 6.4.1), in milliseconds:
        #   D(i-1,i) = (R_i - R_{i-1}) - (S_i - S_{i-1})
        #   J(i)     = J(i-1) + (|D(i-1,i)| - J(i-1)) / 16
        # The RTP timestamp difference must be converted from clock units to
        # ms before it can be compared with the arrival-time difference.
        if self.last_timestamp is not None and self.last_arrival is not None:
            arrival_delta_ms = (timestamp - self.last_arrival).total_seconds() * 1000.0
            # RTP timestamps are 32-bit and wrap; use the signed difference.
            ts_delta = (rtp_timestamp - self.last_timestamp) & 0xFFFFFFFF
            if ts_delta >= 0x80000000:
                ts_delta -= 0x100000000
            ts_delta_ms = ts_delta * 1000.0 / self.get_clock_rate()
            d = abs(arrival_delta_ms - ts_delta_ms)
            self.jitter += (d - self.jitter) / 16
            self.jitter_values.append(self.jitter)
        else:
            self.jitter_values.append(0.0)

        # Remember this packet for the next jitter computation.
        self.last_seq = seq
        self.last_timestamp = rtp_timestamp
        self.last_arrival = timestamp

        self.packets.append({
            "timestamp": timestamp,
            "seq": seq,
            "rtp_timestamp": rtp_timestamp,
            "payload_type": payload_type,
            "payload_size": payload_size
        })
        self.packet_count += 1

        expected = self._seq_cycles + self._max_seq - self._base_seq + 1
        # Duplicates can make received exceed expected; never report negative loss.
        self.lost_packets = max(0, expected - self.packet_count)
        self.expected_packets = self.packet_count + self.lost_packets
        # The metrics just changed, so a previously computed MOS is stale.
        self.mos_score = None

    def get_packet_loss_percentage(self) -> float:
        """Return lost packets as a percentage of expected packets (0.0 if none)."""
        if self.expected_packets == 0:
            return 0.0
        return (self.lost_packets / self.expected_packets) * 100

    def calculate_mos(self) -> float:
        """Estimate the Mean Opinion Score from jitter, packet loss and RTT.

        Uses a simplified ITU-T G.107 E-model: an R factor is derived from
        an effective latency (RTT, default 50 ms, + 2x jitter + 10 ms), then
        reduced by 2.5 per percent of packet loss, and mapped to MOS. The
        result is clamped to [1.0, 4.5] and cached in ``mos_score``.
        """
        rtt = self.rtt if self.rtt is not None else 50  # Default 50ms
        effective_latency = rtt + (self.jitter * 2) + 10  # 10ms for protocol latencies
        if effective_latency < 160:
            r = 93.2 - (effective_latency / 40)
        else:
            r = 93.2 - (effective_latency - 120) / 10
        r = r - (self.get_packet_loss_percentage() * 2.5)
        if r < 0:
            mos = 1.0
        elif r > 100:
            mos = 4.5
        else:
            mos = 1 + (0.035 * r) + (r * (r - 60) * (100 - r) * 7 * 10**-6)
        # The polynomial dips slightly below 1.0 for small R; clamp it.
        mos = max(1.0, min(4.5, mos))
        self.mos_score = mos
        return mos

    def get_duration(self) -> float:
        """Return seconds between the first and last packet (0.0 if unknown)."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

    def get_codec_name(self) -> str:
        """Return the codec name: explicit ``codec``, else the payload-type mapping."""
        if self.codec:
            return self.codec
        if self.payload_type in RTP_CODEC_MAP:
            return RTP_CODEC_MAP[self.payload_type]
        return f"Unknown ({self.payload_type})"

    def get_quality_rating(self) -> str:
        """Map the MOS score (computed on demand) to a quality label."""
        if self.mos_score is None:
            self.calculate_mos()
        if self.mos_score >= 4.3:
            return "Excellent"
        elif self.mos_score >= 4.0:
            return "Good"
        elif self.mos_score >= 3.6:
            return "Fair"
        elif self.mos_score >= 3.1:
            return "Poor"
        else:
            return "Bad"

    def __str__(self) -> str:
        """Return a multi-line, human-readable summary of the stream."""
        if self.mos_score is None:
            self.calculate_mos()
        result = f"RTP Stream: SSRC={self.ssrc}\n"
        result += f"Source: {self.src_ip}:{self.src_port}\n"
        result += f"Destination: {self.dst_ip}:{self.dst_port}\n"
        result += f"Codec: {self.get_codec_name()}\n"
        result += f"Start Time: {self.start_time}\n"
        result += f"End Time: {self.end_time}\n"
        result += f"Duration: {self.get_duration():.2f} seconds\n"
        result += f"Packets: {self.packet_count}\n"
        result += f"Lost Packets: {self.lost_packets}\n"
        result += f"Packet Loss: {self.get_packet_loss_percentage():.2f}%\n"
        result += f"Jitter: {self.jitter:.2f} ms\n"
        if self.rtt is not None:
            result += f"Round Trip Time: {self.rtt:.2f} ms\n"
        result += f"MOS Score: {self.mos_score:.2f} ({self.get_quality_rating()})\n"
        return result
class SipCall:
    """A SIP call identified by its Call-ID, with its messages, SDP media
    information and associated RTP streams."""

    # Response prefixes (4xx, 5xx, 6xx) that fail a call still being set up.
    _FAILURE_PREFIXES = ("SIP/2.0 4", "SIP/2.0 5", "SIP/2.0 6")
    # name-addr form: "Display Name" <sip:user@host>;tag=...
    _NAME_ADDR_RE = re.compile(r'<((?:sips?|tel):[^>]+)>')
    # addr-spec form without angle brackets: sip:user@host;tag=...
    # (without brackets, ';' starts header parameters, not URI parameters)
    _ADDR_SPEC_RE = re.compile(r'^\s*((?:sips?|tel):[^;,\s>]+)')
    # m=<media> <port>[/<count>] <proto> <fmt> <fmt> ...
    _MEDIA_RE = re.compile(r'm=(\w+)\s+(\d+)(?:/\d+)?(?:[ \t]+(\S+)((?:[ \t]+\S+)*))?')
    _RTPMAP_RE = re.compile(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)')
    _CONNECTION_RE = re.compile(r'c=IN\s+IP[46]\s+([^\s]+)')

    def __init__(self, call_id: str):
        self.call_id = call_id
        self.messages = []
        self.start_time = None
        self.end_time = None
        self.from_uri = None
        self.to_uri = None
        self.status = "Unknown"  # Can be "Setup", "Established", "Terminated", "Failed"
        self.call_direction = None  # Can be "Outgoing", "Incoming", or None
        self.media_info = {}  # Media information from SDP
        self.rtp_streams = {}  # RTP streams indexed by SSRC

    @classmethod
    def _extract_uri(cls, header_value: str) -> Optional[str]:
        """Return the SIP/SIPS/TEL URI of a From/To header value, or None."""
        match = cls._NAME_ADDR_RE.search(header_value) or cls._ADDR_SPEC_RE.search(header_value)
        return match.group(1) if match else None

    def add_message(self, timestamp: datetime, method: str, source_ip: str,
                    dest_ip: str, headers: Dict[str, str], content: Optional[str] = None):
        """Add a SIP message to this call and update the derived call state.

        Args:
            timestamp: Capture time of the message.
            method: Request method (e.g. "INVITE") or response status line
                (e.g. "SIP/2.0 200 OK").
            source_ip: Sender address.
            dest_ip: Receiver address.
            headers: Header name -> value mapping; "From"/"To" (or the
                compact forms "f"/"t") are used for the call's URIs.
            content: Optional message body; parsed as SDP when it contains "m=".
        """
        if self.start_time is None or timestamp < self.start_time:
            self.start_time = timestamp
        if self.end_time is None or timestamp > self.end_time:
            self.end_time = timestamp

        # The first message that carries a parsable From/To sets the URIs.
        if not self.from_uri:
            from_header = headers.get("From") or headers.get("f")
            if from_header:
                self.from_uri = self._extract_uri(from_header)
        if not self.to_uri:
            to_header = headers.get("To") or headers.get("t")
            if to_header:
                self.to_uri = self._extract_uri(to_header)

        if method == "INVITE":
            # A re-INVITE inside an established dialog does not restart setup.
            if self.status != "Established":
                self.status = "Setup"
        elif method == "ACK":
            if self.status == "Setup":
                self.status = "Established"
        elif method == "BYE":
            self.status = "Terminated"
        elif method.startswith(self._FAILURE_PREFIXES) and self.status in ("Unknown", "Setup"):
            # Only a call that is still being set up can fail. A rejected
            # in-dialog request (e.g. 491 to a re-INVITE, 481 to a BYE) does
            # not change an established or terminated call.
            self.status = "Failed"

        if content and "m=" in content:
            self._parse_sdp(content)

        self.messages.append({
            "timestamp": timestamp,
            "method": method,
            "source_ip": source_ip,
            "dest_ip": dest_ip,
            "headers": headers,
            "content": content
        })

    def _parse_sdp(self, sdp_content: str):
        """Store media type/port, codec and connection address from an SDP body.

        Only the first ``m=`` line is considered. The codec recorded is the
        ``a=rtpmap`` entry for the first payload format listed on that line
        (the preferred format). If that format has no rtpmap entry, the
        first rtpmap in the body is used. Later SDP bodies overwrite earlier
        values.
        """
        preferred_format = None
        media_match = self._MEDIA_RE.search(sdp_content)
        if media_match:
            self.media_info['type'] = media_match.group(1)
            self.media_info['port'] = int(media_match.group(2))
            formats = (media_match.group(4) or "").split()
            if formats:
                preferred_format = formats[0]

        rtpmaps = self._RTPMAP_RE.findall(sdp_content)
        if rtpmaps:
            codec_id, codec_name, codec_rate = next(
                (entry for entry in rtpmaps if entry[0] == preferred_format),
                rtpmaps[0],
            )
            self.media_info['codec'] = {
                'id': codec_id,
                'name': codec_name,
                'rate': codec_rate
            }

        conn_match = self._CONNECTION_RE.search(sdp_content)
        if conn_match:
            self.media_info['connection_address'] = conn_match.group(1)

    def add_rtp_stream(self, rtp_stream: "RtpStream"):
        """Attach an RTP stream to this call, keyed by its SSRC."""
        self.rtp_streams[rtp_stream.ssrc] = rtp_stream

    def get_duration(self) -> float:
        """Return seconds between the first and last SIP message (0.0 if unknown)."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

    def get_average_mos(self) -> float:
        """Return the mean MOS over all RTP streams (0.0 when there are none)."""
        if not self.rtp_streams:
            return 0.0
        total_mos = 0.0
        for stream in self.rtp_streams.values():
            if stream.mos_score is None:
                stream.calculate_mos()
            total_mos += stream.mos_score
        return total_mos / len(self.rtp_streams)

    def get_call_quality_summary(self) -> Dict[str, Any]:
        """Return average MOS, jitter, packet loss, a rating and the stream count."""
        if not self.rtp_streams:
            return {"status": "No RTP streams found"}
        avg_mos = self.get_average_mos()
        avg_jitter = sum(s.jitter for s in self.rtp_streams.values()) / len(self.rtp_streams)
        avg_packet_loss = sum(s.get_packet_loss_percentage() for s in self.rtp_streams.values()) / len(self.rtp_streams)
        if avg_mos >= 4.3:
            quality_rating = "Excellent"
        elif avg_mos >= 4.0:
            quality_rating = "Good"
        elif avg_mos >= 3.6:
            quality_rating = "Fair"
        elif avg_mos >= 3.1:
            quality_rating = "Poor"
        else:
            quality_rating = "Bad"
        return {
            "mos": avg_mos,
            "jitter": avg_jitter,
            "packet_loss": avg_packet_loss,
            "rating": quality_rating,
            "stream_count": len(self.rtp_streams)
        }

    def __str__(self) -> str:
        """Return a multi-line, human-readable description of the call."""
        result = f"Call ID: {self.call_id}\n"
        result += f"Status: {self.status}\n"
        result += f"From: {self.from_uri}\n"
        result += f"To: {self.to_uri}\n"
        result += f"Start Time: {self.start_time}\n"
        result += f"End Time: {self.end_time}\n"
        result += f"Duration: {self.get_duration():.2f} seconds\n"
        if self.media_info:
            result += "Media Information:\n"
            for key, value in self.media_info.items():
                if isinstance(value, dict):
                    result += f" {key}:\n"
                    for k, v in value.items():
                        result += f" {k}: {v}\n"
                else:
                    result += f" {key}: {value}\n"
        if self.rtp_streams:
            quality = self.get_call_quality_summary()
            result += "Call Quality:\n"
            result += f" MOS Score: {quality['mos']:.2f} ({quality['rating']})\n"
            result += f" Average Jitter: {quality['jitter']:.2f} ms\n"
            result += f" Average Packet Loss: {quality['packet_loss']:.2f}%\n"
            result += f" RTP Streams: {quality['stream_count']}\n"
        result += f"Message Count: {len(self.messages)}\n"
        result += "Message Flow:\n"
        for i, msg in enumerate(self.messages, 1):
            result += f" {i}. [{msg['timestamp']}] {msg['source_ip']} -> {msg['dest_ip']}: {msg['method']}\n"
        if self.rtp_streams:
            result += "RTP Streams:\n"
            for ssrc, stream in self.rtp_streams.items():
                # Do not rely on an earlier call having filled in mos_score.
                mos = stream.mos_score if stream.mos_score is not None else stream.calculate_mos()
                result += f" Stream SSRC: {ssrc}\n"
                result += f" Source: {stream.src_ip}:{stream.src_port}\n"
                result += f" Destination: {stream.dst_ip}:{stream.dst_port}\n"
                result += f" Codec: {stream.get_codec_name()}\n"
                result += f" Packets: {stream.packet_count}\n"
                result += f" Lost Packets: {stream.lost_packets}\n"
                result += f" Packet Loss: {stream.get_packet_loss_percentage():.2f}%\n"
                result += f" Jitter: {stream.jitter:.2f} ms\n"
                result += f" MOS Score: {mos:.2f} ({stream.get_quality_rating()})\n"
        return result
class SipRtpVisualizer:
"""Class for generating visualizations of SIP and RTP flows"""
def __init__(self, output_dir: Optional[str] = None,
             plantuml_server: str = "http://www.plantuml.com/plantuml"):
    """Prepare the output directory and, when available, a PlantUML client.

    Args:
        output_dir: Destination for generated files. When empty or None the
            current working directory is used. The directory is created if
            it does not exist.
        plantuml_server: URL of the PlantUML server that renders sequence
            diagrams (the diagram source is sent to this server).
    """
    self.output_dir = output_dir if output_dir else os.getcwd()
    self.plantuml_server = plantuml_server

    os.makedirs(self.output_dir, exist_ok=True)

    # Without the plantuml package there is no client to build.
    self.plantuml = None
    if not HAS_PLANTUML:
        return
    try:
        self.plantuml = plantuml.PlantUML(url=self.plantuml_server)
    except Exception as e:
        logger.warning(f"Error initializing PlantUML: {e}")
        self.plantuml = None
def generate_sip_sequence_diagram(self, calls: Dict[str, Any],
                                  output_file: Optional[str] = None) -> str:
    """Generate a PlantUML sequence diagram of one call's SIP messages.

    Only the first call in ``calls`` is drawn. The PlantUML source is always
    written next to the image (same base name, ``.puml`` extension). The
    image is rendered by the configured PlantUML server if possible, then
    by a local ``plantuml`` executable as a fallback.

    Args:
        calls: Mapping of Call-ID to call objects. Their ``messages``,
            ``from_uri``, ``to_uri`` and ``rtp_streams`` attributes are read.
        output_file: Image path. Defaults to ``sip_sequence_<call_id>.png``
            in ``output_dir``, with filename-unsafe characters of the
            Call-ID replaced by ``_``.

    Returns:
        The image path on success, the ``.puml`` source path if rendering
        failed, or "" if nothing could be produced.
    """
    if not calls:
        logger.warning("No SIP calls to visualize")
        return ""
    if not HAS_PLANTUML or not self.plantuml:
        logger.error("PlantUML not available. Cannot generate sequence diagram.")
        return ""

    call_id = next(iter(calls))
    call = calls[call_id]

    def uri_host(uri):
        """Return the host part of a sip:/sips:/tel: URI, or None."""
        if not uri:
            return None
        host = uri.split(":", 1)[-1].rsplit("@", 1)[-1]  # drop scheme and user
        if host.startswith("["):  # bracketed IPv6 reference
            return host[1:].split("]", 1)[0]
        return re.split(r"[:;?>]", host, maxsplit=1)[0]

    caller_host = uri_host(call.from_uri)
    callee_host = uri_host(call.to_uri)

    diagram = [
        "@startuml",
        "skinparam sequenceMessageAlign center",
        "skinparam sequenceArrowThickness 2",
        "skinparam roundcorner 5",
        "skinparam maxmessagesize 200",
        "",
    ]

    # Participants in order of first appearance, so the diagram is the same
    # on every run (iteration order of a set of strings is not).
    participants = list(dict.fromkeys(
        ip for msg in call.messages for ip in (msg['source_ip'], msg['dest_ip'])
    ))
    index_of = {ip: i for i, ip in enumerate(participants)}
    for i, participant in enumerate(participants):
        # Exact host comparison: a substring test would match 10.0.0.1
        # inside sip:x@10.0.0.10.
        if caller_host is not None and participant == caller_host:
            label = f"Caller\\n{participant}"
        elif callee_host is not None and participant == callee_host:
            label = f"Callee\\n{participant}"
        else:
            label = f"Endpoint {i+1}\\n{participant}"
        diagram.append(f'participant "{label}" as P{i}')
    diagram.append("")

    for msg in call.messages:
        src_index = index_of[msg['source_ip']]
        dst_index = index_of[msg['dest_ip']]
        method = msg['method']
        # Provisional (1xx) responses are drawn with a dotted arrow.
        parts = method.split(" ", 2)
        is_provisional = (method.startswith("SIP/2.0") and len(parts) >= 2
                          and parts[1].startswith("1"))
        arrow_type = "-->" if is_provisional else "->"
        diagram.append(f"P{src_index} {arrow_type} P{dst_index}: {method}")

        # Summarise an audio SDP body as a note next to the message.
        content = msg.get('content') or ''
        if 'm=audio' in content:
            sdp_note = []
            media_match = re.search(r'm=(\w+)\s+(\d+)', content)
            if media_match:
                sdp_note.append(f"SDP: {media_match.group(1)} {media_match.group(2)}")
            codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', content)
            if codec_match:
                sdp_note.append(f"RTP/AVP {codec_match.group(1)} ({codec_match.group(2)})")
            if sdp_note:
                # Compare participant indices numerically ("P10" < "P2" as strings).
                note_position = "right" if src_index < dst_index else "left"
                diagram.append(f"note {note_position}: {' '.join(sdp_note)}")

    rtp_duration = max((s.get_duration() for s in call.rtp_streams.values()), default=0)
    if rtp_duration > 0:
        diagram.append("")
        diagram.append(f"... RTP Media Stream ({rtp_duration:.2f} seconds) ...")
        diagram.append("")
    diagram.append("@enduml")
    diagram_str = "\n".join(diagram)

    if not output_file:
        # Call-IDs come from captured traffic and may contain '/' or shell
        # metacharacters; keep only filename-safe characters.
        safe_call_id = re.sub(r'[^\w.@-]', '_', str(call_id))
        output_file = os.path.join(self.output_dir, f"sip_sequence_{safe_call_id}.png")
    source_file = os.path.splitext(output_file)[0] + ".puml"

    try:
        with open(source_file, 'w') as f:
            f.write(diagram_str)
    except OSError as e:
        logger.error(f"Error generating sequence diagram: {str(e)}")
        return ""
    logger.info(f"PlantUML source saved: {source_file}")

    # python-plantuml's processes_file() reports an HTTP failure by
    # returning False (and writing an *_error.html file) instead of raising,
    # so the return value decides success.
    try:
        if self.plantuml.processes_file(source_file, outfile=output_file):
            logger.info(f"Sequence diagram generated: {output_file}")
            return output_file
        logger.warning("PlantUML server error: rendering failed")
    except Exception as e:
        logger.warning(f"PlantUML server error: {str(e)}")

    # Local fallback. Pass an argument list (no shell) because the path can
    # contain text taken from captured traffic. The CLI writes
    # <source base>.png next to the source file.
    try:
        subprocess.run(["plantuml", source_file], check=True)
        logger.info(f"Sequence diagram generated using local PlantUML: {output_file}")
        return output_file
    except (OSError, subprocess.SubprocessError) as e2:
        logger.warning(f"Local PlantUML failed: {str(e2)}")
        logger.info("Continuing without sequence diagram. PlantUML source is still available.")
        return source_file
def generate_rtp_visualization(self, calls: Dict[str, Any],
                               output_file: Optional[str] = None,
                               stream_id: Optional[Any] = None) -> str:
    """Plot jitter over time and packet flow for one RTP stream.

    Lost packets are marked where the sequence numbers of consecutive
    packets (in arrival order) skip one or more values.

    Args:
        calls: Mapping of Call-ID to call objects. The first call that has
            RTP streams is used.
        output_file: Image path. Defaults to
            ``rtp_visualization_<call_id>_<stream_id>.png`` in
            ``output_dir``, with filename-unsafe characters of the Call-ID
            replaced by ``_``.
        stream_id: Key (SSRC) of the stream to plot. Defaults to the first
            stream of the selected call.

    Returns:
        Path of the PNG written, or "" when there is nothing to plot.
    """
    if not calls or not HAS_MATPLOTLIB:
        logger.warning("No SIP calls to visualize or Matplotlib not available")
        return ""

    calls_with_rtp = {cid: c for cid, c in calls.items() if c.rtp_streams}
    if not calls_with_rtp:
        logger.warning("No RTP streams to visualize")
        return ""

    call_id = next(iter(calls_with_rtp))
    call = calls_with_rtp[call_id]
    if stream_id is None:
        stream_id = next(iter(call.rtp_streams))
    elif stream_id not in call.rtp_streams:
        logger.warning(f"RTP stream {stream_id} not found in call {call_id}")
        return ""
    stream = call.rtp_streams[stream_id]

    # Check for data before creating a figure, so no figure is leaked.
    packets = stream.packets
    if not packets:
        logger.warning("No packets in RTP stream")
        return ""

    first_timestamp = packets[0]['timestamp']
    timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in packets]

    # Per-packet jitter history. Without one that matches the packets,
    # draw the stream's final jitter as a flat line rather than invent noise.
    jitter_values = list(getattr(stream, 'jitter_values', None) or [])
    if len(jitter_values) != len(timestamps):
        jitter_values = [stream.jitter] * len(timestamps)

    # i is in gap_indices when packets are missing between packet i and i+1
    # (16-bit sequence arithmetic; backward steps from reordering are ignored).
    gap_indices = [
        i for i, (prev, nxt) in enumerate(zip(packets, packets[1:]))
        if 1 < ((nxt['seq'] - prev['seq']) & 0xFFFF) < 0x8000
    ]

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), gridspec_kw={'height_ratios': [3, 1]})
    try:
        ax1.plot(timestamps, jitter_values, color='blue', alpha=0.7, label='Jitter (ms)')
        if gap_indices:
            # Mark the first packet received after each gap.
            ax1.scatter(
                [timestamps[i + 1] for i in gap_indices],
                [jitter_values[i + 1] for i in gap_indices],
                color='red', s=50, label='Lost Packets'
            )

        mos_score = stream.mos_score if stream.mos_score is not None else stream.calculate_mos()
        quality_rating = stream.get_quality_rating()
        if mos_score >= 4.0:
            mos_color = 'green'
        elif mos_score >= 3.5:
            mos_color = 'orange'
        else:
            mos_color = 'red'
        ax1.text(0.02, 0.95, f'MOS Score: {mos_score:.2f} ({quality_rating})',
                 transform=ax1.transAxes,
                 bbox=dict(facecolor=mos_color, alpha=0.5))

        ax1.set_xlabel('Time (seconds)')
        ax1.set_ylabel('Jitter (ms)')
        ax1.set_title(f'RTP Stream Analysis - SSRC: {stream.ssrc}')
        ax1.grid(True, alpha=0.3)
        ax1.legend()

        # Packet flow: one tick per received packet, an 'x' in each gap.
        ax2.scatter(timestamps, np.ones(len(timestamps)), marker='|', s=10, color='blue', alpha=0.7)
        if gap_indices:
            lost_x = [(timestamps[i] + timestamps[i + 1]) / 2 for i in gap_indices]
            ax2.scatter(lost_x, np.ones(len(lost_x)), marker='x', s=50, color='red',
                        label='Lost Packets')
        ax2.set_yticks([])
        ax2.set_xlabel('Time (seconds)')
        ax2.set_title('Packet Flow')
        ax2.grid(True, alpha=0.3, axis='x')

        info_text = (
            f"Source: {stream.src_ip}:{stream.src_port}\n"
            f"Destination: {stream.dst_ip}:{stream.dst_port}\n"
            f"Codec: {stream.get_codec_name()}\n"
            f"Packets: {stream.packet_count}\n"
            f"Lost Packets: {stream.lost_packets} ({stream.get_packet_loss_percentage():.2f}%)"
        )
        ax2.text(0.02, 0.5, info_text, transform=ax2.transAxes,
                 bbox=dict(facecolor='white', alpha=0.8))

        fig.tight_layout()
        if not output_file:
            safe_call_id = re.sub(r'[^\w.@-]', '_', str(call_id))
            output_file = os.path.join(self.output_dir, f"rtp_visualization_{safe_call_id}_{stream_id}.png")
        fig.savefig(output_file, dpi=100)
    finally:
        # Always release the figure, even if drawing or saving fails.
        plt.close(fig)

    logger.info(f"RTP visualization generated: {output_file}")
    return output_file
def generate_call_quality_dashboard(self, calls: Dict[str, Any],
                                    output_file: Optional[str] = None) -> str:
    """Generate a one-page quality dashboard for the first call with RTP.

    The page has four panels: a text summary, a MOS gauge, the jitter
    history of the call's first RTP stream, and a received/lost pie chart.
    When several calls carry RTP, only the first is drawn and the summary
    says how many were left out. Drawing them all on the same axes would
    overlay their texts and gauges.

    Args:
        calls: Mapping of Call-ID to call objects.
        output_file: Image path. Defaults to ``call_quality_dashboard.png``
            in ``output_dir``.

    Returns:
        Path of the PNG written, or "" when there is nothing to plot.
    """
    if not calls or not HAS_MATPLOTLIB:
        logger.warning("No SIP calls to visualize or Matplotlib not available")
        return ""

    calls_with_rtp = {cid: c for cid, c in calls.items() if c.rtp_streams}
    if not calls_with_rtp:
        logger.warning("No RTP streams to visualize")
        return ""

    call_id, call = next(iter(calls_with_rtp.items()))
    streams = list(call.rtp_streams.values())

    # Aggregate metrics across all of the call's streams.
    avg_mos = call.get_average_mos()
    total_packets = sum(s.packet_count for s in streams)
    total_lost_packets = sum(s.lost_packets for s in streams)
    avg_jitter = sum(s.jitter for s in streams) / len(streams)
    if total_packets > 0:
        overall_packet_loss_pct = (total_lost_packets / (total_packets + total_lost_packets)) * 100
    else:
        overall_packet_loss_pct = 0

    summary_text = (
        f"Call ID: {call_id}\n"
        f"From: {call.from_uri}\n"
        f"To: {call.to_uri}\n"
        f"Duration: {call.get_duration():.2f} seconds\n"
        f"Status: {call.status}\n"
        f"RTP Streams: {len(call.rtp_streams)}\n"
        f"Total Packets: {total_packets}\n"
        f"Lost Packets: {total_lost_packets}\n"
        f"Packet Loss: {overall_packet_loss_pct:.2f}%\n"
        f"Average Jitter: {avg_jitter:.2f} ms\n"
        f"MOS Score: {avg_mos:.2f}"
    )
    hidden_calls = len(calls_with_rtp) - 1
    if hidden_calls > 0:
        summary_text += f"\n({hidden_calls} more call(s) with RTP not shown)"

    fig = plt.figure(figsize=(12, 8))
    try:
        gs = fig.add_gridspec(2, 2)
        ax_summary = fig.add_subplot(gs[0, 0])
        ax_mos = fig.add_subplot(gs[0, 1])
        ax_jitter = fig.add_subplot(gs[1, 0])
        ax_loss = fig.add_subplot(gs[1, 1])

        ax_summary.text(0.05, 0.95, summary_text, transform=ax_summary.transAxes,
                        verticalalignment='top', fontsize=10)
        ax_summary.axis('off')
        ax_summary.set_title("Call Summary")

        self._draw_mos_gauge(ax_mos, avg_mos)

        # Jitter history of the call's first stream.
        stream = streams[0]
        if stream.packets:
            first_timestamp = stream.packets[0]['timestamp']
            timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in stream.packets]
            # Fall back to a flat line at the final jitter rather than noise.
            jitter_values = list(getattr(stream, 'jitter_values', None) or [])
            if len(jitter_values) != len(timestamps):
                jitter_values = [stream.jitter] * len(timestamps)

            ax_jitter.plot(timestamps, jitter_values, color='blue', alpha=0.7)
            ax_jitter.set_xlabel('Time (seconds)')
            ax_jitter.set_ylabel('Jitter (ms)')
            ax_jitter.set_title('Jitter Over Time')
            ax_jitter.grid(True, alpha=0.3)
            for level, color, text in ((20, 'green', 'Excellent (<20ms)'),
                                       (50, 'orange', 'Good (<50ms)'),
                                       (100, 'red', 'Fair (<100ms)')):
                ax_jitter.axhline(y=level, color=color, linestyle='--', alpha=0.5)
                ax_jitter.text(timestamps[-1], level, text,
                               verticalalignment='bottom', horizontalalignment='right', color=color)

        sizes = [total_packets, total_lost_packets]
        if sum(sizes) > 0:
            ax_loss.pie(sizes, labels=['Received', 'Lost'], colors=['#66b3ff', '#ff6666'],
                        autopct='%1.1f%%', startangle=90)
            ax_loss.axis('equal')
            ax_loss.set_title('Packet Loss')
        else:
            ax_loss.text(0.5, 0.5, "No packet data available",
                         horizontalalignment='center', verticalalignment='center')
            ax_loss.axis('off')

        fig.tight_layout()
        if not output_file:
            output_file = os.path.join(self.output_dir, "call_quality_dashboard.png")
        fig.savefig(output_file, dpi=100)
    finally:
        # Always release the figure, even if drawing or saving fails.
        plt.close(fig)

    logger.info(f"Call quality dashboard generated: {output_file}")
    return output_file
def _draw_mos_gauge(self, ax, mos_score):
    """Draw a semicircular gauge for a MOS score on a Matplotlib axis.

    The scale runs from MOS 1.0 at the left end to 4.5 at the right end,
    matching the tick labels. The colour bands follow the rating thresholds
    applied below: red under 3.1 ("Bad"), orange from 3.1 to 4.0
    ("Poor"/"Fair"), green from 4.0 ("Good" and better).

    Args:
        ax: Matplotlib axis to draw on.
        mos_score: MOS score. Values outside 1.0-4.5 put the needle at the
            nearest end of the scale.
    """
    mos_min, mos_max = 1.0, 4.5
    span = mos_max - mos_min

    def to_degrees(value):
        # MOS 1.0 -> 180 degrees (left end), MOS 4.5 -> 0 degrees (right end).
        return 180.0 * (mos_max - value) / span

    gauge_radius = 0.8
    gauge_width = 0.2

    for low, high, color in ((mos_min, 3.1, 'red'), (3.1, 4.0, 'orange'), (4.0, mos_max, 'green')):
        # Wedge sweeps counter-clockwise from theta1 to theta2.
        ax.add_patch(plt.matplotlib.patches.Wedge(
            (0, 0), gauge_radius, to_degrees(high), to_degrees(low),
            width=gauge_width, color=color, alpha=0.7
        ))

    needle_angle = np.radians(to_degrees(max(mos_min, min(mos_max, mos_score))))
    ax.plot([0, gauge_radius * np.cos(needle_angle)],
            [0, gauge_radius * np.sin(needle_angle)],
            color='black', linewidth=2)
    ax.add_patch(plt.matplotlib.patches.Circle((0, 0), 0.05, color='black'))

    if mos_score >= 4.3:
        quality_rating = "Excellent"
    elif mos_score >= 4.0:
        quality_rating = "Good"
    elif mos_score >= 3.6:
        quality_rating = "Fair"
    elif mos_score >= 3.1:
        quality_rating = "Poor"
    else:
        quality_rating = "Bad"
    ax.text(0, -0.2, f"MOS: {mos_score:.2f}",
            horizontalalignment='center', fontsize=12, fontweight='bold')
    ax.text(0, -0.3, f"({quality_rating})",
            horizontalalignment='center', fontsize=10)

    # Scale labels at the left end, top (midpoint of the scale) and right end.
    ax.text(-gauge_radius - 0.1, 0, f"{mos_min:.1f}",
            horizontalalignment='right', verticalalignment='center', fontsize=8)
    ax.text(0, gauge_radius + 0.1, f"{(mos_min + mos_max) / 2:.2f}",
            horizontalalignment='center', verticalalignment='bottom', fontsize=8)
    ax.text(gauge_radius + 0.1, 0, f"{mos_max:.1f}",
            horizontalalignment='left', verticalalignment='center', fontsize=8)

    ax.set_xlim(-1, 1)
    ax.set_ylim(-0.5, 1)
    # Equal aspect keeps the semicircle from being stretched into an ellipse.
    ax.set_aspect('equal')
    ax.axis('off')
    ax.set_title("Call Quality (MOS)")
def generate_html_report(self, calls: Dict[str, Any],
                         output_file: Optional[str] = None) -> str:
    """Generate all visualizations and an HTML report that embeds them.

    For every call with RTP streams, a SIP sequence diagram is produced,
    plus one RTP plot per stream. One call-quality dashboard is produced
    for the whole set.

    Args:
        calls: Mapping of Call-ID to call objects.
        output_file: HTML path. Defaults to ``sip_rtp_report.html`` in
            ``output_dir``.

    Returns:
        Path of the HTML report, or "" when there are no calls.
    """
    import copy  # only needed to build single-stream views of a call

    if not calls:
        logger.warning("No SIP calls to visualize")
        return ""

    visualization_files = {}
    for call_id, call in calls.items():
        # Only process calls with RTP streams
        if not call.rtp_streams:
            continue
        # Call-IDs come from captured traffic; keep file names safe. The
        # dictionary keys keep the original Call-ID.
        safe_call_id = re.sub(r'[^\w.@-]', '_', str(call_id))

        seq_diagram = self.generate_sip_sequence_diagram(
            {call_id: call},
            os.path.join(self.output_dir, f"sip_sequence_{safe_call_id}.png")
        )
        if seq_diagram:
            visualization_files[f"sequence_{call_id}"] = seq_diagram

        for stream_id, stream in call.rtp_streams.items():
            # generate_rtp_visualization renders the first stream of the call
            # it receives. Pass a shallow copy holding only this stream, so
            # each file shows its own stream.
            single_stream_call = copy.copy(call)
            single_stream_call.rtp_streams = {stream_id: stream}
            rtp_viz = self.generate_rtp_visualization(
                {call_id: single_stream_call},
                os.path.join(self.output_dir, f"rtp_visualization_{safe_call_id}_{stream_id}.png")
            )
            if rtp_viz:
                visualization_files[f"rtp_{call_id}_{stream_id}"] = rtp_viz

    dashboard = self.generate_call_quality_dashboard(
        calls,
        os.path.join(self.output_dir, "call_quality_dashboard.png")
    )
    if dashboard:
        visualization_files["dashboard"] = dashboard

    if not output_file:
        output_file = os.path.join(self.output_dir, "sip_rtp_report.html")
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(self._generate_html_content(calls, visualization_files))
    logger.info(f"HTML report generated: {output_file}")
    return output_file
def _generate_html_content(self, calls: Dict[str, Any],
                           visualization_files: Dict[str, str]) -> str:
    """Generate HTML content for the report.

    Every value taken from the capture (Call-IDs, URIs, IPs, SIP methods,
    header values and message bodies) is HTML-escaped before being embedded,
    and the element ids used by the ``toggleDetails`` script are built from
    positional indexes instead of Call-IDs, so arbitrary Call-ID text cannot
    break the generated JavaScript.

    Args:
        calls: Dictionary of SIP calls from the analyzer, keyed by Call-ID.
        visualization_files: Mapping of visualization keys (``"dashboard"``,
            ``"sequence_<call_id>"``, ``"rtp_<call_id>_<stream_id>"``) to image
            paths. Images are referenced by file name only, so they are
            expected to sit next to the HTML file.

    Returns:
        HTML content as a string.
    """
    from html import escape
    from urllib.parse import quote

    def esc(value: Any) -> str:
        # Captured SIP data is untrusted and routinely contains markup-like
        # text (e.g. "<sip:alice@example.com>" in From/To); escape it so it is
        # displayed rather than parsed, and cannot inject tags or script.
        return escape(str(value), quote=True)

    def img_src(path: str) -> str:
        # Percent-encode the file name for use as a relative URL, then escape
        # it for the single-quoted attribute.
        return esc(quote(os.path.basename(path)))

    parts: List[str] = [
        "<!DOCTYPE html>",
        "<html lang='en'>",
        "<head>",
        " <meta charset='UTF-8'>",
        " <meta name='viewport' content='width=device-width, initial-scale=1.0'>",
        " <title>SIP and RTP Analysis Report</title>",
        " <style>",
        " body { font-family: Arial, sans-serif; margin: 0; padding: 20px; color: #333; }",
        " h1, h2, h3 { color: #2c3e50; }",
        " .container { max-width: 1200px; margin: 0 auto; }",
        " .header { background-color: #3498db; color: white; padding: 20px; margin-bottom: 20px; }",
        " .section { margin-bottom: 30px; border: 1px solid #ddd; padding: 20px; border-radius: 5px; }",
        " .call-info { display: flex; flex-wrap: wrap; }",
        " .call-info div { margin-right: 20px; margin-bottom: 10px; }",
        " .label { font-weight: bold; color: #7f8c8d; }",
        " .visualization { margin: 20px 0; text-align: center; }",
        " .visualization img { max-width: 100%; border: 1px solid #ddd; border-radius: 5px; }",
        " .message-flow { margin: 20px 0; }",
        " .message { padding: 10px; margin: 5px 0; background-color: #f9f9f9; border-left: 4px solid #3498db; }",
        " .message.request { border-left-color: #3498db; }",
        " .message.response { border-left-color: #2ecc71; }",
        " .message-details { font-family: monospace; white-space: pre-wrap; margin-top: 10px; padding: 10px; background-color: #f1f1f1; display: none; }",
        " .toggle-details { cursor: pointer; color: #3498db; }",
        " .dashboard { margin: 20px 0; text-align: center; }",
        " .dashboard img { max-width: 100%; border: 1px solid #ddd; border-radius: 5px; }",
        " .footer { margin-top: 30px; text-align: center; color: #7f8c8d; font-size: 0.9em; }",
        " </style>",
        " <script>",
        " function toggleDetails(id) {",
        " var details = document.getElementById(id);",
        " if (details.style.display === 'none' || !details.style.display) {",
        " details.style.display = 'block';",
        " } else {",
        " details.style.display = 'none';",
        " }",
        " }",
        " </script>",
        "</head>",
        "<body>",
        " <div class='container'>",
        " <div class='header'>",
        " <h1>SIP and RTP Analysis Report</h1>",
        f" <p>Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>",
        " </div>",
    ]

    # Summary section
    parts.append(" <div class='section'>")
    parts.append(" <h2>Summary</h2>")
    parts.append(" <div class='call-info'>")
    parts.append(f" <div><span class='label'>Total Calls:</span> {len(calls)}</div>")
    calls_with_rtp = sum(1 for call in calls.values() if call.rtp_streams)
    parts.append(f" <div><span class='label'>Calls with RTP:</span> {calls_with_rtp}</div>")
    total_streams = sum(len(call.rtp_streams) for call in calls.values())
    parts.append(f" <div><span class='label'>Total RTP Streams:</span> {total_streams}</div>")
    parts.append(" </div>")
    if "dashboard" in visualization_files:
        parts.append(" <div class='dashboard'>")
        parts.append(f" <img src='{img_src(visualization_files['dashboard'])}' alt='Call Quality Dashboard'>")
        parts.append(" </div>")
    parts.append(" </div>")

    # Call details sections
    for call_index, (call_id, call) in enumerate(calls.items()):
        safe_call_id = esc(call_id)
        parts.append(f" <div class='section' id='call-{safe_call_id}'>")
        parts.append(f" <h2>Call: {safe_call_id}</h2>")

        # Call information
        parts.append(" <div class='call-info'>")
        parts.append(f" <div><span class='label'>From:</span> {esc(call.from_uri)}</div>")
        parts.append(f" <div><span class='label'>To:</span> {esc(call.to_uri)}</div>")
        parts.append(f" <div><span class='label'>Status:</span> {esc(call.status)}</div>")
        parts.append(f" <div><span class='label'>Duration:</span> {call.get_duration():.2f} seconds</div>")
        parts.append(f" <div><span class='label'>Messages:</span> {len(call.messages)}</div>")
        parts.append(f" <div><span class='label'>RTP Streams:</span> {len(call.rtp_streams)}</div>")
        if call.rtp_streams:
            quality = call.get_call_quality_summary()
            parts.append(f" <div><span class='label'>MOS Score:</span> {quality['mos']:.2f} ({esc(quality['rating'])})</div>")
            parts.append(f" <div><span class='label'>Average Jitter:</span> {quality['jitter']:.2f} ms</div>")
            parts.append(f" <div><span class='label'>Packet Loss:</span> {quality['packet_loss']:.2f}%</div>")
        parts.append(" </div>")

        # Sequence diagram, if one was generated for this call
        seq_key = f"sequence_{call_id}"
        if seq_key in visualization_files:
            parts.append(" <div class='visualization'>")
            parts.append(" <h3>SIP Message Flow</h3>")
            parts.append(f" <img src='{img_src(visualization_files[seq_key])}' alt='SIP Sequence Diagram'>")
            parts.append(" </div>")

        # RTP visualizations, if generated
        for stream_id in call.rtp_streams:
            rtp_key = f"rtp_{call_id}_{stream_id}"
            if rtp_key in visualization_files:
                parts.append(" <div class='visualization'>")
                parts.append(f" <h3>RTP Stream Analysis (SSRC: {esc(stream_id)})</h3>")
                parts.append(f" <img src='{img_src(visualization_files[rtp_key])}' alt='RTP Visualization'>")
                parts.append(" </div>")

        # Message flow details
        parts.append(" <div class='message-flow'>")
        parts.append(" <h3>Message Details</h3>")
        for i, msg in enumerate(call.messages):
            # Responses carry their status line ("SIP/2.0 ...") as the method.
            is_request = not msg['method'].startswith("SIP/2.0")
            msg_class = "request" if is_request else "response"
            # Index-based id: always a valid HTML id and safe inside the JS string.
            detail_id = f"msg-{call_index}-{i}"
            parts.append(f" <div class='message {msg_class}'>")
            parts.append(
                f" <div><strong>{i+1}.</strong> [{esc(msg['timestamp'])}] "
                f"{esc(msg['source_ip'])} &rarr; {esc(msg['dest_ip'])}: {esc(msg['method'])}</div>"
            )
            parts.append(f" <div class='toggle-details' onclick=\"toggleDetails('{detail_id}')\">Show/Hide Details</div>")
            # Message details (hidden by default)
            parts.append(f" <div class='message-details' id='{detail_id}'>")
            parts.append(" <strong>Headers:</strong>")
            for header, value in msg['headers'].items():
                parts.append(f" {esc(header)}: {esc(value)}")
            if msg.get('content'):
                parts.append("\n <strong>Content:</strong>")
                parts.append(f" {esc(msg['content'])}")
            parts.append(" </div>")
            parts.append(" </div>")
        parts.append(" </div>")
        parts.append(" </div>")

    # Footer
    parts.append(" <div class='footer'>")
    parts.append(" <p>Generated by SIP and RTP PCAP Analyzer</p>")
    parts.append(" </div>")
    parts.append(" </div>")
    parts.append("</body>")
    parts.append("</html>")
    return "\n".join(parts)
class SipRtpPcapAnalyzer:
    """Analyze a PCAP/PCAPNG capture and extract SIP calls and RTP streams.

    Packets are decoded with the backend chosen at import time (``LIBRARY``:
    scapy, pyshark or dpkt). SIP messages are grouped into ``SipCall`` objects
    keyed by Call-ID; RTP packets are grouped into ``RtpStream`` objects keyed
    by SSRC and are afterwards associated with calls, either through the SDP
    media endpoint or, as a fallback, through the IP pair used for signaling.

    Typical use::

        analyzer = SipRtpPcapAnalyzer("capture.pcap")
        analyzer.analyze()
        analyzer.print_results()
    """

    # Size of the fixed RTP header in bytes (RFC 3550, section 5.1).
    _RTP_FIXED_HEADER_LEN = 12
    # pcap link-layer type numbers handled by the dpkt backend besides
    # Ethernet: Linux "cooked" capture (e.g. tcpdump -i any) and raw IP.
    _LINKTYPE_LINUX_SLL = 113
    _LINKTYPE_RAW = (12, 14, 101)

    def __init__(self, pcap_file: str, output_file: Optional[str] = None, verbose: bool = False,
                 custom_sip_ports: Optional[List[int]] = None, rtp_port_range: Optional[Tuple[int, int]] = None,
                 visualize: bool = False, output_dir: Optional[str] = None):
        """Store the analysis options.

        Args:
            pcap_file: Path of the capture file to analyze.
            output_file: Optional path where ``print_results`` writes its text.
            verbose: Enable DEBUG logging and per-packet error messages.
            custom_sip_ports: Extra ports (besides ``SIP_PORTS``) inspected for SIP.
            rtp_port_range: Inclusive (min, max) UDP port range treated as
                potential RTP; defaults to ``RTP_PORT_RANGE``.
            visualize: Generate visualizations and the HTML report in ``analyze``.
            output_dir: Directory handed to ``SipRtpVisualizer``.
        """
        self.pcap_file = pcap_file
        self.output_file = output_file
        self.verbose = verbose
        self.calls: Dict[str, SipCall] = {}  # Dictionary of calls indexed by Call-ID
        self.messages_count = 0
        self.sip_ports: Set[int] = set(SIP_PORTS)  # Copy default SIP ports
        self.rtp_streams: Dict[int, RtpStream] = {}  # Dictionary of RTP streams indexed by SSRC
        self.potential_rtp_ports: Set[int] = set()  # Ports identified from SDP
        # Byte prefixes identifying a SIP message (status line or request
        # method), encoded once instead of once per packet.
        self._sip_prefixes: Tuple[bytes, ...] = (b'SIP/2.0',) + tuple(
            method.encode() for method in SIP_METHODS)
        # Visualization options
        self.visualize = visualize
        self.output_dir = output_dir
        # Set custom RTP port range if provided
        self.rtp_port_range = rtp_port_range if rtp_port_range else RTP_PORT_RANGE
        # Add custom SIP ports if provided
        if custom_sip_ports:
            self.sip_ports.update(custom_sip_ports)
        # Set logging level based on verbose flag
        if verbose:
            logger.setLevel(logging.DEBUG)

    def analyze(self) -> None:
        """Analyze the capture with the available library, then post-process.

        After decoding, RTP streams are associated with SIP calls, MOS is
        calculated for every stream and, if requested, visualizations are made.
        """
        if not os.path.exists(self.pcap_file):
            logger.error(f"Error: File {self.pcap_file} does not exist.")
            return
        if LIBRARY == "scapy":
            self._analyze_with_scapy()
        elif LIBRARY == "pyshark":
            self._analyze_with_pyshark()
        elif LIBRARY == "dpkt":
            self._analyze_with_dpkt()
        # After analyzing, associate RTP streams with SIP calls
        self._associate_rtp_streams_with_calls()
        # Calculate final metrics for all RTP streams
        for stream in self.rtp_streams.values():
            stream.calculate_mos()
        # Generate visualizations if requested
        if self.visualize:
            self._generate_visualizations()

    # ------------------------------------------------------------------
    # Shared packet helpers
    # ------------------------------------------------------------------

    def _looks_like_sip(self, payload: bytes) -> bool:
        """Return True if *payload* starts like a SIP response or request."""
        return payload.startswith(self._sip_prefixes)

    def _is_potential_rtp(self, protocol: str, src_port: int, dst_port: int) -> bool:
        """Heuristic RTP check: UDP on an SDP-announced port, or both ports in range."""
        if protocol != "UDP":
            return False
        if src_port in self.potential_rtp_ports or dst_port in self.potential_rtp_ports:
            return True
        low, high = self.rtp_port_range
        return low <= src_port <= high and low <= dst_port <= high

    @classmethod
    def _parse_rtp_header(cls, data: bytes) -> Optional[Tuple[int, int, int, int, int]]:
        """Parse an RTP header as laid out in RFC 3550, section 5.1.

        Args:
            data: UDP payload that may contain an RTP packet.

        Returns:
            ``(ssrc, seq, rtp_timestamp, payload_type, payload_size)``, or None
            when *data* is not a well-formed RTP version 2 packet. The payload
            size excludes the fixed header, the CSRC list, any header
            extension and trailing padding.
        """
        header_len = cls._RTP_FIXED_HEADER_LEN
        if len(data) < header_len:
            return None
        first = data[0]
        if (first >> 6) & 0x03 != 2:  # version field must be 2
            return None
        has_padding = bool(first & 0x20)
        has_extension = bool(first & 0x10)
        csrc_count = first & 0x0F
        payload_type = data[1] & 0x7F  # low 7 bits; the top bit is the marker
        seq = int.from_bytes(data[2:4], 'big')
        rtp_timestamp = int.from_bytes(data[4:8], 'big')
        ssrc = int.from_bytes(data[8:12], 'big')

        header_len += 4 * csrc_count  # each CSRC identifier is 32 bits
        if has_extension:
            # Extension header: 16-bit profile, 16-bit length in 32-bit words.
            if len(data) < header_len + 4:
                return None
            ext_words = int.from_bytes(data[header_len + 2:header_len + 4], 'big')
            header_len += 4 + 4 * ext_words
        # With the P bit set, the last octet holds the number of padding octets.
        padding = data[-1] if has_padding and len(data) > header_len else 0
        payload_size = len(data) - header_len - padding
        if payload_size < 0:
            return None
        return ssrc, seq, rtp_timestamp, payload_type, payload_size

    # ------------------------------------------------------------------
    # scapy backend
    # ------------------------------------------------------------------

    def _analyze_with_scapy(self) -> None:
        """Analyze the capture using scapy, isolating errors per packet."""
        logger.info(f"Analyzing {self.pcap_file} with scapy...")
        try:
            # Read the pcap file
            packets = scapy.rdpcap(self.pcap_file)
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with scapy: {e}")
            return
        for packet in packets:
            try:
                self._handle_scapy_packet(packet)
            except Exception as e:
                # One malformed packet must not abort the whole analysis.
                if self.verbose:
                    logger.debug(f"Error processing packet with scapy: {e}")
        logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")

    def _handle_scapy_packet(self, packet: Any) -> None:
        """Route one scapy packet (IPv4 over UDP/TCP) to SIP or RTP processing."""
        if not packet.haslayer(IP):
            return
        if packet.haslayer(UDP):
            transport_layer, protocol = packet[UDP], "UDP"
        elif packet.haslayer(TCP):
            transport_layer, protocol = packet[TCP], "TCP"
        else:
            return
        src_ip, dst_ip = packet[IP].src, packet[IP].dst
        src_port, dst_port = transport_layer.sport, transport_layer.dport

        # SIP: configured SIP ports, or any TCP segment (needs payload inspection).
        if src_port in self.sip_ports or dst_port in self.sip_ports or protocol == "TCP":
            if not getattr(transport_layer, 'payload', None):
                return
            payload = bytes(transport_layer.payload)
            if self._looks_like_sip(payload):
                timestamp = datetime.fromtimestamp(float(packet.time))
                self._process_sip_message(timestamp, src_ip, dst_ip,
                                          payload.decode('utf-8', errors='ignore'))
                return

        if not self._is_potential_rtp(protocol, src_port, dst_port):
            return
        if HAS_RTP_LAYER and packet.haslayer(RTP):
            # Scapy has built-in RTP layer
            rtp_layer = packet[RTP]
            ssrc = rtp_layer.sourcesync
            seq = rtp_layer.sequence
            timestamp_rtp = rtp_layer.timestamp
            payload_type = rtp_layer.payload_type
            payload_size = len(bytes(rtp_layer.payload))
        else:
            parsed = self._parse_rtp_header(bytes(transport_layer.payload))
            if parsed is None:
                return
            ssrc, seq, timestamp_rtp, payload_type, payload_size = parsed
        timestamp = datetime.fromtimestamp(float(packet.time))
        self._process_rtp_packet(timestamp, src_ip, src_port, dst_ip, dst_port,
                                 ssrc, seq, timestamp_rtp, payload_type, payload_size)

    # ------------------------------------------------------------------
    # pyshark backend
    # ------------------------------------------------------------------

    def _analyze_with_pyshark(self) -> None:
        """Analyze the capture using pyshark, always closing the capture."""
        logger.info(f"Analyzing {self.pcap_file} with pyshark...")
        cap = None
        try:
            # Open the pcap file
            cap = pyshark.FileCapture(self.pcap_file)
            for packet in cap:
                try:
                    self._handle_pyshark_packet(packet)
                except Exception as e:
                    if self.verbose:
                        logger.debug(f"Error processing packet with pyshark: {e}")
            logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with pyshark: {e}")
        finally:
            # FileCapture drives an external tshark process; release it.
            if cap is not None:
                try:
                    cap.close()
                except Exception as e:
                    if self.verbose:
                        logger.debug(f"Error closing pyshark capture: {e}")

    @staticmethod
    def _pyshark_field_names(layer: Any) -> List[str]:
        """Return the raw field names (e.g. 'sip.Call-ID') of a pyshark layer.

        In current pyshark releases ``Layer._all_fields`` is a dict keyed by
        field name, so iterating it yields strings; objects exposing a
        ``name`` attribute are accepted as well for other shapes.
        """
        fields = getattr(layer, '_all_fields', None) or {}
        if isinstance(fields, dict):
            return list(fields.keys())
        return [getattr(field, 'name', str(field)) for field in fields]

    @staticmethod
    def _pyshark_bytes_len(value: Any) -> int:
        """Return the byte count of a pyshark bytes field rendered as hex text.

        Byte fields are rendered as hex, typically colon-separated
        ("80:00:1a"), so ``len()`` of the string is not the number of bytes.
        """
        hex_digits = str(value).replace(':', '').strip()
        return len(hex_digits) // 2

    def _handle_pyshark_packet(self, packet: Any) -> None:
        """Route one pyshark packet to SIP or RTP processing."""
        # Check if packet has IP layer
        if not hasattr(packet, 'ip'):
            return
        src_ip = packet.ip.src
        dst_ip = packet.ip.dst

        if hasattr(packet, 'sip'):
            sip = packet.sip
            timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))
            # Reconstruct the SIP message from the dissected fields.
            lines: List[str] = []
            if hasattr(sip, 'request_line'):
                lines.append(str(sip.request_line))
            elif hasattr(sip, 'status_line'):
                lines.append(str(sip.status_line))
            for field_name in self._pyshark_field_names(sip):
                if (field_name.startswith('sip.')
                        and not field_name.startswith('sip.request')
                        and not field_name.startswith('sip.status')):
                    header_name = field_name[len('sip.'):]
                    if hasattr(sip, header_name):
                        lines.append(f"{header_name}: {getattr(sip, header_name)}")
            sip_message = "".join(f"{line}\r\n" for line in lines)
            # Add body if present
            if hasattr(sip, 'msg_body'):
                sip_message += "\r\n" + sip.msg_body
            self._process_sip_message(timestamp, src_ip, dst_ip, sip_message)
            return

        if hasattr(packet, 'rtp'):
            if not hasattr(packet, 'udp'):
                return  # RTP should be over UDP
            src_port = int(packet.udp.srcport)
            dst_port = int(packet.udp.dstport)
            rtp = packet.rtp
            ssrc = int(rtp.ssrc, 16)  # rendered as "0x...."
            seq = int(rtp.seq)
            timestamp_rtp = int(rtp.timestamp)
            payload_type = int(rtp.p_type)
            payload_size = self._pyshark_bytes_len(rtp.payload) if hasattr(rtp, 'payload') else 0
            timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))
            self._process_rtp_packet(timestamp, src_ip, src_port, dst_ip, dst_port,
                                     ssrc, seq, timestamp_rtp, payload_type, payload_size)

    # ------------------------------------------------------------------
    # dpkt backend
    # ------------------------------------------------------------------

    def _analyze_with_dpkt(self) -> None:
        """Analyze the capture using dpkt (classic pcap or pcapng)."""
        logger.info(f"Analyzing {self.pcap_file} with dpkt...")
        try:
            with open(self.pcap_file, 'rb') as f:
                pcap_reader = self._open_dpkt_reader(f)
                datalink = pcap_reader.datalink()
                for timestamp, buf in pcap_reader:
                    try:
                        self._handle_dpkt_frame(timestamp, buf, datalink)
                    except Exception as e:
                        if self.verbose:
                            logger.debug(f"Error processing packet with dpkt: {e}")
            logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with dpkt: {e}")

    @staticmethod
    def _open_dpkt_reader(f: Any) -> Any:
        """Return a dpkt reader for the open binary file *f* (pcap or pcapng)."""
        try:
            return dpkt.pcap.Reader(f)
        except ValueError:
            # dpkt.pcap.Reader rejects unknown magic numbers; retry as pcapng.
            from dpkt import pcapng as dpkt_pcapng
            f.seek(0)
            return dpkt_pcapng.Reader(f)

    def _dpkt_ip_packet(self, buf: bytes, datalink: int) -> Any:
        """Strip the link layer of *buf* and return its network-layer payload."""
        if datalink == self._LINKTYPE_LINUX_SLL:
            from dpkt import sll as dpkt_sll
            return dpkt_sll.SLL(buf).data
        if datalink in self._LINKTYPE_RAW:
            # Raw IP: only IPv4 is analyzed, like the other backends.
            if not buf or (buf[0] >> 4) != 4:
                return None
            return dpkt.ip.IP(buf)
        # Default (and historical behaviour): Ethernet framing.
        return Ethernet(buf).data

    def _handle_dpkt_frame(self, timestamp: float, buf: bytes, datalink: int) -> None:
        """Route one captured frame to SIP or RTP processing."""
        ip = self._dpkt_ip_packet(buf, datalink)
        if not isinstance(ip, dpkt.ip.IP):
            return
        src_ip = socket.inet_ntoa(ip.src)
        dst_ip = socket.inet_ntoa(ip.dst)
        transport = ip.data
        if isinstance(transport, dpkt.udp.UDP):
            protocol = "UDP"
        elif isinstance(transport, dpkt.tcp.TCP):
            protocol = "TCP"
        else:
            return
        src_port, dst_port = transport.sport, transport.dport
        transport_data = bytes(transport.data)

        # SIP: configured SIP ports, or any TCP segment (needs payload inspection).
        if src_port in self.sip_ports or dst_port in self.sip_ports or protocol == "TCP":
            if self._looks_like_sip(transport_data):
                self._process_sip_message(datetime.fromtimestamp(timestamp), src_ip, dst_ip,
                                          transport_data.decode('utf-8', errors='ignore'))
                return

        if not self._is_potential_rtp(protocol, src_port, dst_port):
            return
        parsed = self._parse_rtp_header(transport_data)
        if parsed is None:
            return
        ssrc, seq, timestamp_rtp, payload_type, payload_size = parsed
        self._process_rtp_packet(datetime.fromtimestamp(timestamp), src_ip, src_port, dst_ip, dst_port,
                                 ssrc, seq, timestamp_rtp, payload_type, payload_size)

    # ------------------------------------------------------------------
    # SIP / RTP bookkeeping
    # ------------------------------------------------------------------

    @staticmethod
    def _find_call_id(headers: Dict[str, str]) -> Optional[str]:
        """Return the Call-ID header value, matching names case-insensitively.

        SIP header names are case-insensitive; the full ``Call-ID`` form is
        preferred over the compact ``i`` form when both are present.
        """
        by_lower: Dict[str, str] = {}
        for name, value in headers.items():
            by_lower.setdefault(name.lower(), value)
        return by_lower.get('call-id', by_lower.get('i'))

    def _process_sip_message(self, timestamp: datetime, source_ip: str, dest_ip: str, sip_message: str) -> None:
        """Parse a SIP message and add it to the call identified by its Call-ID.

        Messages with an empty first line or without a Call-ID are ignored.
        Ports from ``m=audio`` lines in the body are recorded as potential
        RTP ports.
        """
        if not sip_message:
            return
        lines = sip_message.splitlines()
        if not lines:
            return
        first_line = lines[0].strip()
        if not first_line:
            return
        # Responses keep the whole status line ("SIP/2.0 200 OK") as their
        # method; requests keep only the method token ("INVITE").
        if first_line.startswith('SIP/2.0 '):
            method = first_line
        else:
            method = first_line.split(' ')[0]

        headers: Dict[str, str] = {}
        body_lines: List[str] = []
        in_body = False
        for line in lines[1:]:
            if in_body:
                body_lines.append(line)
            elif not line.strip():
                in_body = True  # first empty line separates headers from body
            elif ':' in line:
                header_name, header_value = line.split(':', 1)
                headers[header_name.strip()] = header_value.strip()
        content = "".join(f"{line}\n" for line in body_lines)

        call_id = self._find_call_id(headers)
        if not call_id:
            return
        if call_id not in self.calls:
            self.calls[call_id] = SipCall(call_id)
        self.calls[call_id].add_message(
            timestamp,
            method,
            source_ip,
            dest_ip,
            headers,
            content
        )
        self.messages_count += 1

        # Extract potential RTP ports from SDP
        if content and "m=audio" in content:
            for port_str in re.findall(r'm=audio\s+(\d+)', content):
                port = int(port_str)
                if port == 0:
                    continue  # port 0 marks a rejected/disabled stream (RFC 3264)
                self.potential_rtp_ports.add(port)
                if self.verbose:
                    logger.debug(f"Found potential RTP port {port} in SDP")

    def _process_rtp_packet(self, timestamp: datetime, src_ip: str, src_port: int,
                            dst_ip: str, dst_port: int, ssrc: int, seq: int,
                            rtp_timestamp: int, payload_type: int, payload_size: int) -> None:
        """Add an RTP packet to the stream with its SSRC, creating the stream if needed.

        On stream creation, the codec name is taken from the first call whose
        SDP codec id equals the packet's payload type.
        """
        stream = self.rtp_streams.get(ssrc)
        if stream is None:
            stream = RtpStream(ssrc, src_ip, src_port, dst_ip, dst_port)
            self.rtp_streams[ssrc] = stream
            for call in self.calls.values():
                codec = call.media_info.get('codec') if 'codec' in call.media_info else None
                if not codec or 'id' not in codec:
                    continue
                try:
                    if int(codec['id']) == payload_type:
                        stream.codec = codec['name']
                        break
                except (ValueError, TypeError):
                    pass
        stream.add_packet(
            timestamp,
            seq,
            rtp_timestamp,
            payload_type,
            payload_size
        )

    def _associate_rtp_streams_with_calls(self) -> None:
        """Associate RTP streams with SIP calls.

        A stream is first matched on its source, then its destination,
        against the SDP media endpoint (connection address, port) of each
        call. Otherwise it is attached to the first call that exchanged a SIP
        message between the same two IP addresses (in either direction).
        """
        media_endpoints: Dict[Tuple[Any, int], str] = {}
        for call_id, call in self.calls.items():
            media_info = call.media_info
            if 'connection_address' not in media_info or 'port' not in media_info:
                continue
            try:
                # Normalize to int so it compares equal to packet port numbers.
                port = int(media_info['port'])
            except (TypeError, ValueError):
                continue
            media_endpoints[(media_info['connection_address'], port)] = call_id

        for stream in self.rtp_streams.values():
            call_id = media_endpoints.get((stream.src_ip, stream.src_port))
            if call_id is None:
                call_id = media_endpoints.get((stream.dst_ip, stream.dst_port))
            if call_id is not None:
                self.calls[call_id].add_rtp_stream(stream)
                continue
            # Fallback: match on the IP pair used for SIP signaling.
            for call in self.calls.values():
                if any((stream.src_ip == msg['source_ip'] and stream.dst_ip == msg['dest_ip']) or
                       (stream.src_ip == msg['dest_ip'] and stream.dst_ip == msg['source_ip'])
                       for msg in call.messages):
                    call.add_rtp_stream(stream)
                    break

    def _generate_visualizations(self) -> None:
        """Generate the HTML report (with diagrams) via ``SipRtpVisualizer``.

        The visualizer logs the report path itself, so it is not logged again here.
        """
        if not HAS_PLANTUML and not HAS_MATPLOTLIB:
            logger.warning("Visualization libraries not available. Install plantuml and matplotlib for visualizations.")
            return
        visualizer = SipRtpVisualizer(output_dir=self.output_dir)
        visualizer.generate_html_report(self.calls)

    def print_results(self) -> None:
        """Print the analysis summary, or write it to ``self.output_file``.

        The file is written as UTF-8 because SIP text may contain non-ASCII
        characters; if writing fails the summary is printed to stdout instead.
        """
        output = [
            f"SIP and RTP Analysis Results for {self.pcap_file}",
            "=" * 80,
            f"Total SIP Messages: {self.messages_count}",
            f"Total SIP Calls: {len(self.calls)}",
            f"Total RTP Streams: {len(self.rtp_streams)}",
            "",
            "Call Details:",
            "-" * 80,
        ]
        for call in self.calls.values():
            output.append(str(call))
            output.append("-" * 80)

        # RTP streams that weren't associated with any call
        unassociated_streams = [
            stream for ssrc, stream in self.rtp_streams.items()
            if not any(ssrc in call.rtp_streams for call in self.calls.values())
        ]
        if unassociated_streams:
            output.append("Unassociated RTP Streams:")
            output.append("-" * 80)
            for stream in unassociated_streams:
                output.append(str(stream))
                output.append("-" * 80)

        result = "\n".join(output)
        if self.output_file:
            try:
                with open(self.output_file, 'w', encoding='utf-8') as f:
                    f.write(result)
                logger.info(f"Results written to {self.output_file}")
            except Exception as e:
                logger.error(f"Error writing to output file: {e}")
                print(result)
        else:
            print(result)

    def get_call_quality_metrics(self) -> Dict[str, Dict[str, Any]]:
        """Return per-call quality metrics keyed by Call-ID.

        Calls with RTP streams carry MOS, jitter, packet loss, rating and
        stream count; calls without streams carry a "status" note instead.
        """
        metrics: Dict[str, Dict[str, Any]] = {}
        for call_id, call in self.calls.items():
            entry: Dict[str, Any] = {
                "from": call.from_uri,
                "to": call.to_uri,
                "duration": call.get_duration(),
            }
            if call.rtp_streams:
                quality = call.get_call_quality_summary()
                entry.update({
                    "mos": quality["mos"],
                    "jitter": quality["jitter"],
                    "packet_loss": quality["packet_loss"],
                    "rating": quality["rating"],
                    "stream_count": quality["stream_count"],
                })
            else:
                entry["status"] = "No RTP streams found"
            metrics[call_id] = entry
        return metrics
def main():
    """Command-line entry point: parse arguments, analyze the capture, report.

    ``--visualize`` and ``--report`` both enable visualization; when enabled,
    ``SipRtpPcapAnalyzer.analyze`` generates the HTML report itself. Exits
    with status 2 (argparse usage error) if the capture file does not exist
    or the RTP port range is invalid.
    """
    parser = argparse.ArgumentParser(description='Analyze PCAP files for SIP messages and RTP streams in VoIP calls')
    parser.add_argument('pcap_file', help='Path to the PCAP file to analyze')
    parser.add_argument('-o', '--output', help='Output file for analysis results')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('-p', '--ports', type=int, nargs='+', help='Additional SIP ports to check')
    parser.add_argument('-r', '--rtp-range', type=int, nargs=2, metavar=('MIN', 'MAX'),
                        help='Custom RTP port range (default: 10000-65535)')
    parser.add_argument('--visualize', action='store_true', help='Generate visualizations')
    parser.add_argument('--output-dir', help='Output directory for visualizations')
    parser.add_argument('--report', action='store_true', help='Generate HTML report')
    args = parser.parse_args()

    # Fail fast with a non-zero exit status instead of printing an empty report.
    if not os.path.isfile(args.pcap_file):
        parser.error(f"PCAP file not found: {args.pcap_file}")

    rtp_range = None
    if args.rtp_range:
        rtp_min, rtp_max = args.rtp_range
        if not 0 <= rtp_min <= rtp_max <= 65535:
            parser.error("--rtp-range requires 0 <= MIN <= MAX <= 65535")
        rtp_range = (rtp_min, rtp_max)

    # The HTML report is produced as part of visualization, so either flag enables it.
    want_visuals = args.visualize or args.report
    if want_visuals:
        if not HAS_PLANTUML:
            logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.")
        if not HAS_MATPLOTLIB:
            logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.")

    analyzer = SipRtpPcapAnalyzer(
        args.pcap_file,
        args.output,
        args.verbose,
        args.ports,
        rtp_range,
        want_visuals,
        args.output_dir
    )
    # analyze() also generates the report when want_visuals is set.
    analyzer.analyze()
    analyzer.print_results()
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()