#!/usr/bin/env python3 """ SIP and RTP PCAP Analyzer with Visualization - A tool to analyze PCAP files and extract SIP messages and RTP streams for VoIP call analysis, with graphical visualization capabilities. This script can parse PCAP/PCAPNG files and identify SIP messages exchanged during VoIP call establishments, including INVITE, ACK, BYE, and other SIP messages. It also analyzes RTP streams to provide metrics on call quality including jitter, packet loss, and MOS score. Additionally, it generates graphical visualizations of SIP signaling and RTP media flows. """ import sys import os import argparse import re import logging import math import subprocess from datetime import datetime from typing import Dict, List, Tuple, Optional, Any, Set, DefaultDict from collections import defaultdict # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('sip_rtp_pcap_analyzer') # Try to import different packet analysis libraries # We'll use the first one available in this priority order: scapy, pyshark, dpkt LIBRARY = None try: import scapy.all as scapy from scapy.layers.inet import IP, UDP, TCP try: from scapy.layers.rtp import RTP HAS_RTP_LAYER = True except ImportError: HAS_RTP_LAYER = False LIBRARY = "scapy" logger.info("Using scapy for packet analysis") except ImportError: try: import pyshark LIBRARY = "pyshark" logger.info("Using pyshark for packet analysis") except ImportError: try: import dpkt import socket from dpkt.ethernet import Ethernet LIBRARY = "dpkt" logger.info("Using dpkt for packet analysis") except ImportError: logger.error("No packet analysis library found. Please install scapy, pyshark, or dpkt.") sys.exit(1) # Try to import visualization libraries try: import plantuml HAS_PLANTUML = True except ImportError: HAS_PLANTUML = False logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.") try: import matplotlib matplotlib.use('Agg') # Use non-interactive backend import matplotlib.pyplot as plt import matplotlib.dates as mdates import numpy as np HAS_MATPLOTLIB = True except ImportError: HAS_MATPLOTLIB = False logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.") # SIP message types we're interested in SIP_METHODS = [ "INVITE", "ACK", "BYE", "CANCEL", "REGISTER", "OPTIONS", "PRACK", "SUBSCRIBE", "NOTIFY", "PUBLISH", "INFO", "REFER", "MESSAGE", "UPDATE" ] # SIP response codes and their meanings SIP_RESPONSES = { "1": "Informational", "2": "Success", "3": "Redirection", "4": "Client Error", "5": "Server Error", "6": "Global Failure" } # Common SIP ports SIP_PORTS = {5060, 5061} # Common RTP port range RTP_PORT_RANGE = (10000, 65535) # Typical RTP port range # RTP payload type to codec mapping RTP_CODEC_MAP = { 0: "PCMU/G.711u", 3: "GSM", 4: "G723", 8: "PCMA/G.711a", 9: "G722", 10: "L16 (stereo)", 11: "L16 (mono)", 18: "G729", 26: "JPEG", 31: "H261", 32: "MPV", 33: "MP2T", 34: "H263", 96: "Dynamic", 97: "Dynamic", 98: "Dynamic", 99: "Dynamic", 100: "Dynamic", 101: "Dynamic", 110: "OPUS", 111: "OPUS" } class RtpStream: """Class to represent an RTP stream with its packets and metrics""" def __init__(self, ssrc: int, src_ip: str, src_port: int, dst_ip: str, dst_port: int): self.ssrc = ssrc self.src_ip = src_ip self.src_port = src_port self.dst_ip = dst_ip self.dst_port = dst_port self.packets = [] self.start_time = None self.end_time = None self.packet_count = 0 self.expected_packets = 0 self.lost_packets = 0 self.jitter = 0.0 self.jitter_values = [] # Store jitter values for each packet for visualization self.last_seq = None self.last_timestamp = None self.last_arrival = None self.codec = None self.payload_type = None self.mos_score = None self.rtt = None # Round Trip Time in ms def add_packet(self, timestamp: datetime, seq: int, rtp_timestamp: int, payload_type: int, payload_size: int): """Add an RTP packet to this stream and update metrics""" if not self.start_time or timestamp < self.start_time: self.start_time = timestamp if not self.end_time or timestamp > self.end_time: self.end_time = timestamp # Set payload type if not already set if self.payload_type is None: self.payload_type = payload_type # Calculate packet loss if self.last_seq is not None: expected_seq = (self.last_seq + 1) % 65536 # RTP sequence numbers are 16-bit if seq != expected_seq: # Account for sequence number wraparound if seq > expected_seq: self.lost_packets += seq - expected_seq else: self.lost_packets += (65536 - expected_seq) + seq # Calculate jitter (RFC 3550 algorithm) if self.last_timestamp is not None and self.last_arrival is not None: # Convert timestamps to milliseconds for easier calculation arrival_ms = timestamp.timestamp() * 1000 last_arrival_ms = self.last_arrival.timestamp() * 1000 # Calculate transit time difference transit = arrival_ms - rtp_timestamp last_transit = last_arrival_ms - self.last_timestamp # Calculate difference in transit times d = abs(transit - last_transit) # Update jitter using RFC 3550 formula: J(i) = J(i-1) + (|D(i-1,i)| - J(i-1))/16 self.jitter = self.jitter + (d - self.jitter) / 16 # Store jitter value for visualization self.jitter_values.append(self.jitter) else: self.jitter_values.append(0.0) # Store current values for next calculation self.last_seq = seq self.last_timestamp = rtp_timestamp self.last_arrival = timestamp # Add packet to list and increment counter self.packets.append({ "timestamp": timestamp, "seq": seq, "rtp_timestamp": rtp_timestamp, "payload_type": payload_type, "payload_size": payload_size }) self.packet_count += 1 self.expected_packets = self.packet_count + self.lost_packets def get_packet_loss_percentage(self) -> float: """Get packet loss as a percentage""" if self.expected_packets == 0: return 0.0 return (self.lost_packets / self.expected_packets) * 100 def calculate_mos(self) -> float: """Calculate Mean Opinion Score (MOS) based on packet loss and jitter""" # Default RTT if not available rtt = self.rtt if self.rtt is not None else 50 # Default 50ms # Calculate effective latency effective_latency = rtt + (self.jitter * 2) + 10 # Add 10ms for protocol latencies # Calculate R factor based on ITU-T G.107 E-model if effective_latency < 160: r = 93.2 - (effective_latency / 40) else: r = 93.2 - (effective_latency - 120) / 10 # Deduct for packet loss (2.5 R values per percentage) packet_loss = self.get_packet_loss_percentage() r = r - (packet_loss * 2.5) # Convert R to MOS using ITU-T G.107 formula if r < 0: mos = 1.0 elif r > 100: mos = 4.5 else: mos = 1 + (0.035 * r) + (r * (r - 60) * (100 - r) * 7 * 10**-6) # Clamp MOS between 1.0 and 4.5 mos = max(1.0, min(4.5, mos)) self.mos_score = mos return mos def get_duration(self) -> float: """Get stream duration in seconds""" if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return 0 def get_codec_name(self) -> str: """Get codec name based on payload type""" if self.codec: return self.codec if self.payload_type in RTP_CODEC_MAP: return RTP_CODEC_MAP[self.payload_type] return f"Unknown ({self.payload_type})" def get_quality_rating(self) -> str: """Get a human-readable quality rating based on MOS score""" if self.mos_score is None: self.calculate_mos() if self.mos_score >= 4.3: return "Excellent" elif self.mos_score >= 4.0: return "Good" elif self.mos_score >= 3.6: return "Fair" elif self.mos_score >= 3.1: return "Poor" else: return "Bad" def __str__(self) -> str: """String representation of the RTP stream""" if self.mos_score is None: self.calculate_mos() result = f"RTP Stream: SSRC={self.ssrc}\n" result += f"Source: {self.src_ip}:{self.src_port}\n" result += f"Destination: {self.dst_ip}:{self.dst_port}\n" result += f"Codec: {self.get_codec_name()}\n" result += f"Start Time: {self.start_time}\n" result += f"End Time: {self.end_time}\n" result += f"Duration: {self.get_duration():.2f} seconds\n" result += f"Packets: {self.packet_count}\n" result += f"Lost Packets: {self.lost_packets}\n" result += f"Packet Loss: {self.get_packet_loss_percentage():.2f}%\n" result += f"Jitter: {self.jitter:.2f} ms\n" if self.rtt is not None: result += f"Round Trip Time: {self.rtt:.2f} ms\n" result += f"MOS Score: {self.mos_score:.2f} ({self.get_quality_rating()})\n" return result class SipCall: """Class to represent a SIP call with its messages and RTP streams""" def __init__(self, call_id: str): self.call_id = call_id self.messages = [] self.start_time = None self.end_time = None self.from_uri = None self.to_uri = None self.status = "Unknown" # Can be "Setup", "Established", "Terminated", "Failed" self.call_direction = None # Can be "Outgoing", "Incoming", or None self.media_info = {} # Store media information from SDP self.rtp_streams = {} # Dictionary of RTP streams indexed by SSRC def add_message(self, timestamp: datetime, method: str, source_ip: str, dest_ip: str, headers: Dict[str, str], content: str = None): """Add a SIP message to this call""" if not self.start_time or timestamp < self.start_time: self.start_time = timestamp if not self.end_time or timestamp > self.end_time: self.end_time = timestamp # Extract From and To URIs from the first message if not self.from_uri and "From" in headers: from_match = re.search(r'<(sip:[^>]+)>', headers["From"]) if from_match: self.from_uri = from_match.group(1) if not self.to_uri and "To" in headers: to_match = re.search(r'<(sip:[^>]+)>', headers["To"]) if to_match: self.to_uri = to_match.group(1) # Update call status based on method if method == "INVITE": self.status = "Setup" elif method == "ACK" and self.status == "Setup": self.status = "Established" elif method == "BYE": self.status = "Terminated" elif method.startswith("SIP/2.0 4") or method.startswith("SIP/2.0 5") or method.startswith("SIP/2.0 6"): self.status = "Failed" # Extract media information from SDP if present if content and "m=" in content: self._parse_sdp(content) # Add the message to the list self.messages.append({ "timestamp": timestamp, "method": method, "source_ip": source_ip, "dest_ip": dest_ip, "headers": headers, "content": content }) def _parse_sdp(self, sdp_content: str): """Parse SDP content to extract media information""" # Extract media type and port media_match = re.search(r'm=(\w+)\s+(\d+)', sdp_content) if media_match: media_type = media_match.group(1) media_port = int(media_match.group(2)) self.media_info['type'] = media_type self.media_info['port'] = media_port # Extract codec information codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', sdp_content) if codec_match: codec_id = codec_match.group(1) codec_name = codec_match.group(2) codec_rate = codec_match.group(3) self.media_info['codec'] = { 'id': codec_id, 'name': codec_name, 'rate': codec_rate } # Extract connection information (IP) conn_match = re.search(r'c=IN\s+IP[46]\s+([^\s]+)', sdp_content) if conn_match: self.media_info['connection_address'] = conn_match.group(1) def add_rtp_stream(self, rtp_stream: RtpStream): """Add an RTP stream to this call""" self.rtp_streams[rtp_stream.ssrc] = rtp_stream def get_duration(self) -> float: """Get call duration in seconds""" if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return 0 def get_average_mos(self) -> float: """Get average MOS score across all RTP streams""" if not self.rtp_streams: return 0.0 total_mos = 0.0 for stream in self.rtp_streams.values(): if stream.mos_score is None: stream.calculate_mos() total_mos += stream.mos_score return total_mos / len(self.rtp_streams) def get_call_quality_summary(self) -> Dict[str, Any]: """Get a summary of call quality metrics""" if not self.rtp_streams: return {"status": "No RTP streams found"} avg_mos = self.get_average_mos() avg_jitter = sum(s.jitter for s in self.rtp_streams.values()) / len(self.rtp_streams) avg_packet_loss = sum(s.get_packet_loss_percentage() for s in self.rtp_streams.values()) / len(self.rtp_streams) quality_rating = "Unknown" if avg_mos >= 4.3: quality_rating = "Excellent" elif avg_mos >= 4.0: quality_rating = "Good" elif avg_mos >= 3.6: quality_rating = "Fair" elif avg_mos >= 3.1: quality_rating = "Poor" else: quality_rating = "Bad" return { "mos": avg_mos, "jitter": avg_jitter, "packet_loss": avg_packet_loss, "rating": quality_rating, "stream_count": len(self.rtp_streams) } def __str__(self) -> str: """String representation of the call""" result = f"Call ID: {self.call_id}\n" result += f"Status: {self.status}\n" result += f"From: {self.from_uri}\n" result += f"To: {self.to_uri}\n" result += f"Start Time: {self.start_time}\n" result += f"End Time: {self.end_time}\n" result += f"Duration: {self.get_duration():.2f} seconds\n" # Add media information if available if self.media_info: result += "Media Information:\n" for key, value in self.media_info.items(): if isinstance(value, dict): result += f" {key}:\n" for k, v in value.items(): result += f" {k}: {v}\n" else: result += f" {key}: {value}\n" # Add call quality summary if RTP streams are available if self.rtp_streams: quality = self.get_call_quality_summary() result += "Call Quality:\n" result += f" MOS Score: {quality['mos']:.2f} ({quality['rating']})\n" result += f" Average Jitter: {quality['jitter']:.2f} ms\n" result += f" Average Packet Loss: {quality['packet_loss']:.2f}%\n" result += f" RTP Streams: {quality['stream_count']}\n" result += f"Message Count: {len(self.messages)}\n" result += "Message Flow:\n" for i, msg in enumerate(self.messages, 1): result += f" {i}. [{msg['timestamp']}] {msg['source_ip']} -> {msg['dest_ip']}: {msg['method']}\n" # Add RTP stream details if self.rtp_streams: result += "RTP Streams:\n" for ssrc, stream in self.rtp_streams.items(): result += f" Stream SSRC: {ssrc}\n" result += f" Source: {stream.src_ip}:{stream.src_port}\n" result += f" Destination: {stream.dst_ip}:{stream.dst_port}\n" result += f" Codec: {stream.get_codec_name()}\n" result += f" Packets: {stream.packet_count}\n" result += f" Lost Packets: {stream.lost_packets}\n" result += f" Packet Loss: {stream.get_packet_loss_percentage():.2f}%\n" result += f" Jitter: {stream.jitter:.2f} ms\n" result += f" MOS Score: {stream.mos_score:.2f} ({stream.get_quality_rating()})\n" return result class SipRtpVisualizer: """Class for generating visualizations of SIP and RTP flows""" def __init__(self, output_dir: Optional[str] = None, plantuml_server: str = "http://www.plantuml.com/plantuml"): """Initialize the visualizer Args: output_dir: Directory to save visualization files (default: current directory) plantuml_server: PlantUML server URL for rendering diagrams """ self.output_dir = output_dir or os.getcwd() self.plantuml_server = plantuml_server # Create output directory if it doesn't exist os.makedirs(self.output_dir, exist_ok=True) # Initialize PlantUML if HAS_PLANTUML: try: self.plantuml = plantuml.PlantUML(url=self.plantuml_server) except Exception as e: logger.warning(f"Error initializing PlantUML: {e}") self.plantuml = None else: self.plantuml = None def generate_sip_sequence_diagram(self, calls: Dict[str, Any], output_file: Optional[str] = None) -> str: """Generate a sequence diagram for SIP calls Args: calls: Dictionary of SIP calls from the analyzer output_file: Output file path (default: sip_sequence.png in output_dir) Returns: Path to the generated diagram file """ if not calls: logger.warning("No SIP calls to visualize") return "" if not HAS_PLANTUML or not self.plantuml: logger.error("PlantUML not available. Cannot generate sequence diagram.") return "" # Use the first call if multiple calls exist # In future, could support multiple calls in separate diagrams call_id = next(iter(calls)) call = calls[call_id] # Start building PlantUML diagram diagram = ["@startuml"] diagram.append("skinparam sequenceMessageAlign center") diagram.append("skinparam sequenceArrowThickness 2") diagram.append("skinparam roundcorner 5") diagram.append("skinparam maxmessagesize 200") diagram.append("") # Extract unique participants participants = set() for msg in call.messages: participants.add(msg['source_ip']) participants.add(msg['dest_ip']) # Define participants for i, participant in enumerate(participants): # Try to determine if this is caller or callee if call.from_uri and participant in call.from_uri: label = f"Caller\\n{participant}" elif call.to_uri and participant in call.to_uri: label = f"Callee\\n{participant}" else: label = f"Endpoint {i+1}\\n{participant}" diagram.append(f'participant "{label}" as P{i}') diagram.append("") # Map IP addresses to participant IDs ip_to_participant = {ip: f"P{i}" for i, ip in enumerate(participants)} # Add messages for msg in call.messages: src = ip_to_participant[msg['source_ip']] dst = ip_to_participant[msg['dest_ip']] # Format the message text method = msg['method'] # For SIP responses, extract the status code if method.startswith("SIP/2.0"): parts = method.split(" ", 2) if len(parts) >= 2: status_code = parts[1] if status_code.startswith("1"): arrow_type = "-->" # Dotted arrow for provisional responses else: arrow_type = "->" else: arrow_type = "->" else: arrow_type = "->" # Add the message to the diagram diagram.append(f"{src} {arrow_type} {dst}: {method}") # Add SDP information as notes if present if msg.get('content') and 'm=audio' in msg.get('content', ''): sdp_note = [] # Extract media type and port media_match = re.search(r'm=(\w+)\s+(\d+)', msg['content']) if media_match: media_type = media_match.group(1) media_port = media_match.group(2) sdp_note.append(f"SDP: {media_type} {media_port}") # Extract codec information codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', msg['content']) if codec_match: codec_id = codec_match.group(1) codec_name = codec_match.group(2) sdp_note.append(f"RTP/AVP {codec_id} ({codec_name})") if sdp_note: note_position = "right" if src < dst else "left" diagram.append(f"note {note_position}: {' '.join(sdp_note)}") # Add RTP stream if present if call.rtp_streams: # Calculate total RTP duration rtp_duration = 0 for stream in call.rtp_streams.values(): duration = stream.get_duration() if duration > rtp_duration: rtp_duration = duration if rtp_duration > 0: diagram.append("") diagram.append(f"... RTP Media Stream ({rtp_duration:.2f} seconds) ...") diagram.append("") diagram.append("@enduml") # Join the diagram lines diagram_str = "\n".join(diagram) # Generate the diagram if not output_file: output_file = os.path.join(self.output_dir, f"sip_sequence_{call_id}.png") try: # Save the PlantUML source first source_file = os.path.splitext(output_file)[0] + ".puml" with open(source_file, 'w') as f: f.write(diagram_str) logger.info(f"PlantUML source saved: {source_file}") # Try to generate the diagram try: #self.plantuml.processes_file(diagram_str, outfile=output_file) self.plantuml.processes_file(source_file) logger.info(f"Sequence diagram generated: {output_file}") return output_file except Exception as e: # If PlantUML server fails, try using local command-line tool if available logger.warning(f"PlantUML server error: {str(e)}") try: cmd = f"plantuml {source_file}" subprocess.run(cmd, shell=True, check=True) logger.info(f"Sequence diagram generated using local PlantUML: {output_file}") return output_file except Exception as e2: logger.warning(f"Local PlantUML failed: {str(e2)}") logger.info("Continuing without sequence diagram. PlantUML source is still available.") return source_file except Exception as e: logger.error(f"Error generating sequence diagram: {str(e)}") return "" def generate_rtp_visualization(self, calls: Dict[str, Any], output_file: Optional[str] = None) -> str: """Generate visualization for RTP streams Args: calls: Dictionary of SIP calls from the analyzer output_file: Output file path (default: rtp_visualization.png in output_dir) Returns: Path to the generated visualization file """ if not calls or not HAS_MATPLOTLIB: logger.warning("No SIP calls to visualize or Matplotlib not available") return "" # Find calls with RTP streams calls_with_rtp = {call_id: call for call_id, call in calls.items() if call.rtp_streams} if not calls_with_rtp: logger.warning("No RTP streams to visualize") return "" # Use the first call with RTP streams call_id = next(iter(calls_with_rtp)) call = calls_with_rtp[call_id] # Get the first RTP stream stream_id = next(iter(call.rtp_streams)) stream = call.rtp_streams[stream_id] # Create figure with subplots fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), gridspec_kw={'height_ratios': [3, 1]}) # Extract packet data if not stream.packets: logger.warning("No packets in RTP stream") return "" # Create timestamps relative to the first packet first_timestamp = stream.packets[0]['timestamp'] timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in stream.packets] # Create jitter data (use actual jitter if available, otherwise simulate) if hasattr(stream, 'jitter_values') and stream.jitter_values: jitter_values = stream.jitter_values else: # Simulate jitter values based on the average jitter base_jitter = stream.jitter jitter_values = [base_jitter + np.random.normal(0, base_jitter/10) for _ in range(len(timestamps))] # Create packet loss data packet_loss = np.zeros(len(timestamps)) if stream.lost_packets > 0: # Simulate packet loss positions loss_indices = np.random.choice( range(len(timestamps)), size=min(stream.lost_packets, len(timestamps)//10), # Limit for visualization clarity replace=False ) packet_loss[loss_indices] = 1 # Plot jitter ax1.plot(timestamps, jitter_values, color='blue', alpha=0.7, label='Jitter (ms)') # Highlight packet loss if any lost_packets = np.where(packet_loss == 1)[0] if len(lost_packets) > 0: ax1.scatter( [timestamps[i] for i in lost_packets], [jitter_values[i] for i in lost_packets], color='red', s=50, label='Lost Packets' ) # Add MOS score indicator mos_score = stream.mos_score if stream.mos_score is not None else stream.calculate_mos() quality_rating = stream.get_quality_rating() # Color code based on MOS if mos_score >= 4.0: mos_color = 'green' elif mos_score >= 3.5: mos_color = 'orange' else: mos_color = 'red' ax1.text(0.02, 0.95, f'MOS Score: {mos_score:.2f} ({quality_rating})', transform=ax1.transAxes, bbox=dict(facecolor=mos_color, alpha=0.5)) # Formatting for jitter plot ax1.set_xlabel('Time (seconds)') ax1.set_ylabel('Jitter (ms)') ax1.set_title(f'RTP Stream Analysis - SSRC: {stream.ssrc}') ax1.grid(True, alpha=0.3) ax1.legend() # Add packet flow visualization in the second subplot packet_y = np.ones(len(timestamps)) ax2.scatter(timestamps, packet_y, marker='|', s=10, color='blue', alpha=0.7) # Highlight lost packets if len(lost_packets) > 0: # Calculate positions for lost packets lost_x = [] for i in range(len(timestamps)-1): if i+1 in lost_packets or i in lost_packets: # Find midpoint between packets if i+1 < len(timestamps): midpoint = (timestamps[i] + timestamps[i+1]) / 2 lost_x.append(midpoint) if lost_x: ax2.scatter(lost_x, np.ones(len(lost_x)), marker='x', s=50, color='red', label='Lost Packets') # Formatting for packet flow ax2.set_yticks([]) ax2.set_xlabel('Time (seconds)') ax2.set_title('Packet Flow') ax2.grid(True, alpha=0.3, axis='x') # Add stream information info_text = ( f"Source: {stream.src_ip}:{stream.src_port}\n" f"Destination: {stream.dst_ip}:{stream.dst_port}\n" f"Codec: {stream.get_codec_name()}\n" f"Packets: {stream.packet_count}\n" f"Lost Packets: {stream.lost_packets} ({stream.get_packet_loss_percentage():.2f}%)" ) ax2.text(0.02, 0.5, info_text, transform=ax2.transAxes, bbox=dict(facecolor='white', alpha=0.8)) plt.tight_layout() # Save the figure if not output_file: output_file = os.path.join(self.output_dir, f"rtp_visualization_{call_id}_{stream_id}.png") plt.savefig(output_file, dpi=100) plt.close(fig) logger.info(f"RTP visualization generated: {output_file}") return output_file def generate_call_quality_dashboard(self, calls: Dict[str, Any], output_file: Optional[str] = None) -> str: """Generate a dashboard with call quality metrics Args: calls: Dictionary of SIP calls from the analyzer output_file: Output file path (default: call_quality.png in output_dir) Returns: Path to the generated dashboard file """ if not calls or not HAS_MATPLOTLIB: logger.warning("No SIP calls to visualize or Matplotlib not available") return "" # Find calls with RTP streams calls_with_rtp = {call_id: call for call_id, call in calls.items() if call.rtp_streams} if not calls_with_rtp: logger.warning("No RTP streams to visualize") return "" # Create figure with subplots fig = plt.figure(figsize=(12, 8)) gs = fig.add_gridspec(2, 2) # Call summary subplot ax_summary = fig.add_subplot(gs[0, 0]) # MOS score gauge subplot ax_mos = fig.add_subplot(gs[0, 1]) # Jitter subplot ax_jitter = fig.add_subplot(gs[1, 0]) # Packet loss subplot ax_loss = fig.add_subplot(gs[1, 1]) # Process each call for i, (call_id, call) in enumerate(calls_with_rtp.items()): # Calculate average metrics across all streams avg_mos = call.get_average_mos() total_jitter = 0 total_packet_loss_pct = 0 total_packets = 0 total_lost_packets = 0 for stream in call.rtp_streams.values(): total_jitter += stream.jitter total_packets += stream.packet_count total_lost_packets += stream.lost_packets avg_jitter = total_jitter / len(call.rtp_streams) if call.rtp_streams else 0 if total_packets > 0: overall_packet_loss_pct = (total_lost_packets / (total_packets + total_lost_packets)) * 100 else: overall_packet_loss_pct = 0 # Call summary summary_text = ( f"Call ID: {call_id}\n" f"From: {call.from_uri}\n" f"To: {call.to_uri}\n" f"Duration: {call.get_duration():.2f} seconds\n" f"Status: {call.status}\n" f"RTP Streams: {len(call.rtp_streams)}\n" f"Total Packets: {total_packets}\n" f"Lost Packets: {total_lost_packets}\n" f"Packet Loss: {overall_packet_loss_pct:.2f}%\n" f"Average Jitter: {avg_jitter:.2f} ms\n" f"MOS Score: {avg_mos:.2f}" ) ax_summary.text(0.05, 0.95, summary_text, transform=ax_summary.transAxes, verticalalignment='top', fontsize=10) ax_summary.axis('off') ax_summary.set_title("Call Summary") # MOS score gauge self._draw_mos_gauge(ax_mos, avg_mos) # Get the first stream for detailed metrics if call.rtp_streams: stream_id = next(iter(call.rtp_streams)) stream = call.rtp_streams[stream_id] # Extract packet data if stream.packets: # Create timestamps relative to the first packet first_timestamp = stream.packets[0]['timestamp'] timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in stream.packets] # Create jitter data (use actual jitter if available, otherwise simulate) if hasattr(stream, 'jitter_values') and stream.jitter_values: jitter_values = stream.jitter_values else: # Simulate jitter values based on the average jitter base_jitter = stream.jitter jitter_values = [base_jitter + np.random.normal(0, base_jitter/10) for _ in range(len(timestamps))] # Plot jitter ax_jitter.plot(timestamps, jitter_values, color='blue', alpha=0.7) ax_jitter.set_xlabel('Time (seconds)') ax_jitter.set_ylabel('Jitter (ms)') ax_jitter.set_title('Jitter Over Time') ax_jitter.grid(True, alpha=0.3) # Add threshold lines ax_jitter.axhline(y=20, color='green', linestyle='--', alpha=0.5) ax_jitter.axhline(y=50, color='orange', linestyle='--', alpha=0.5) ax_jitter.axhline(y=100, color='red', linestyle='--', alpha=0.5) # Add threshold labels ax_jitter.text(timestamps[-1], 20, 'Excellent (<20ms)', verticalalignment='bottom', horizontalalignment='right', color='green') ax_jitter.text(timestamps[-1], 50, 'Good (<50ms)', verticalalignment='bottom', horizontalalignment='right', color='orange') ax_jitter.text(timestamps[-1], 100, 'Fair (<100ms)', verticalalignment='bottom', horizontalalignment='right', color='red') # Packet loss pie chart labels = ['Received', 'Lost'] sizes = [total_packets, total_lost_packets] colors = ['#66b3ff', '#ff6666'] if sum(sizes) > 0: # Avoid division by zero ax_loss.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90) ax_loss.axis('equal') ax_loss.set_title('Packet Loss') else: ax_loss.text(0.5, 0.5, "No packet data available", horizontalalignment='center', verticalalignment='center') ax_loss.axis('off') plt.tight_layout() # Save the figure if not output_file: output_file = os.path.join(self.output_dir, "call_quality_dashboard.png") plt.savefig(output_file, dpi=100) plt.close(fig) logger.info(f"Call quality dashboard generated: {output_file}") return output_file def _draw_mos_gauge(self, ax, mos_score): """Draw a gauge chart for MOS score Args: ax: Matplotlib axis to draw on mos_score: MOS score (1.0-4.5) """ # Normalize MOS score to 0-1 range norm_mos = (mos_score - 1.0) / 3.5 # MOS range is 1.0-4.5 norm_mos = max(0, min(1, norm_mos)) # Clamp to 0-1 # Create gauge gauge_angles = np.linspace(0, 180, 100) * np.pi / 180 gauge_radius = 0.8 gauge_width = 0.2 # Draw gauge background for i, angle in enumerate(gauge_angles[:-1]): next_angle = gauge_angles[i+1] # Determine color based on position pos = i / len(gauge_angles[:-1]) if pos < 0.3: # Bad to Poor (red) color = 'red' elif pos < 0.6: # Poor to Fair (orange) color = 'orange' else: # Fair to Excellent (green) color = 'green' # Draw gauge segment ax.add_patch(plt.matplotlib.patches.Wedge( (0, 0), gauge_radius, angle * 180 / np.pi, next_angle * 180 / np.pi, width=gauge_width, color=color, alpha=0.7 )) # Draw needle needle_angle = norm_mos * 180 * np.pi / 180 ax.plot([0, gauge_radius * np.cos(needle_angle)], [0, gauge_radius * np.sin(needle_angle)], color='black', linewidth=2) # Add center circle ax.add_patch(plt.matplotlib.patches.Circle((0, 0), 0.05, color='black')) # Add MOS value text quality_rating = "Unknown" if mos_score >= 4.3: quality_rating = "Excellent" elif mos_score >= 4.0: quality_rating = "Good" elif mos_score >= 3.6: quality_rating = "Fair" elif mos_score >= 3.1: quality_rating = "Poor" else: quality_rating = "Bad" ax.text(0, -0.2, f"MOS: {mos_score:.2f}", horizontalalignment='center', fontsize=12, fontweight='bold') ax.text(0, -0.3, f"({quality_rating})", horizontalalignment='center', fontsize=10) # Add scale labels ax.text(-gauge_radius-0.1, 0, "1.0", horizontalalignment='right', verticalalignment='center', fontsize=8) ax.text(0, gauge_radius+0.1, "3.0", horizontalalignment='center', verticalalignment='bottom', fontsize=8) ax.text(gauge_radius+0.1, 0, "4.5", horizontalalignment='left', verticalalignment='center', fontsize=8) # Set axis limits and turn off axis ax.set_xlim(-1, 1) ax.set_ylim(-0.5, 1) ax.axis('off') ax.set_title("Call Quality (MOS)") def generate_html_report(self, calls: Dict[str, Any], output_file: Optional[str] = None) -> str: """Generate a comprehensive HTML report with all visualizations Args: calls: Dictionary of SIP calls from the analyzer output_file: Output file path (default: sip_rtp_report.html in output_dir) Returns: Path to the generated HTML report """ if not calls: logger.warning("No SIP calls to visualize") return "" # Generate all visualizations visualization_files = {} for call_id, call in calls.items(): # Only process calls with RTP streams if not call.rtp_streams: continue # Generate sequence diagram seq_diagram = self.generate_sip_sequence_diagram( {call_id: call}, os.path.join(self.output_dir, f"sip_sequence_{call_id}.png") ) if seq_diagram: visualization_files[f"sequence_{call_id}"] = seq_diagram # Generate RTP visualization for each stream for stream_id in call.rtp_streams: rtp_viz = self.generate_rtp_visualization( {call_id: call}, os.path.join(self.output_dir, f"rtp_visualization_{call_id}_{stream_id}.png") ) if rtp_viz: visualization_files[f"rtp_{call_id}_{stream_id}"] = rtp_viz # Generate call quality dashboard dashboard = self.generate_call_quality_dashboard( calls, os.path.join(self.output_dir, "call_quality_dashboard.png") ) if dashboard: visualization_files["dashboard"] = dashboard # Create HTML report if not output_file: output_file = os.path.join(self.output_dir, "sip_rtp_report.html") with open(output_file, 'w', encoding='utf-8') as f: f.write(self._generate_html_content(calls, visualization_files)) logger.info(f"HTML report generated: {output_file}") return output_file def _generate_html_content(self, calls: Dict[str, Any], visualization_files: Dict[str, str]) -> str: """Generate HTML content for the report Args: calls: Dictionary of SIP calls from the analyzer visualization_files: Dictionary of visualization file paths Returns: HTML content as a string """ html = [] html.append("") html.append("") html.append("") html.append(" ") html.append(" ") html.append(" SIP and RTP Analysis Report") html.append(" ") html.append(" ") html.append("") html.append("") html.append("
") html.append("
") html.append("

SIP and RTP Analysis Report

") html.append(f"

Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

") html.append("
") # Summary section html.append("
") html.append("

Summary

") html.append("
") html.append(f"
Total Calls: {len(calls)}
") # Count calls with RTP calls_with_rtp = sum(1 for call in calls.values() if call.rtp_streams) html.append(f"
Calls with RTP: {calls_with_rtp}
") # Count total RTP streams total_streams = sum(len(call.rtp_streams) for call in calls.values()) html.append(f"
Total RTP Streams: {total_streams}
") html.append("
") # Add dashboard if available if "dashboard" in visualization_files: html.append("
") html.append(f" Call Quality Dashboard") html.append("
") html.append("
") # Call details sections for call_id, call in calls.items(): html.append(f"
") html.append(f"

Call: {call_id}

") # Call information html.append("
") html.append(f"
From: {call.from_uri}
") html.append(f"
To: {call.to_uri}
") html.append(f"
Status: {call.status}
") html.append(f"
Duration: {call.get_duration():.2f} seconds
") html.append(f"
Messages: {len(call.messages)}
") html.append(f"
RTP Streams: {len(call.rtp_streams)}
") # Add quality metrics if available if call.rtp_streams: quality = call.get_call_quality_summary() html.append(f"
MOS Score: {quality['mos']:.2f} ({quality['rating']})
") html.append(f"
Average Jitter: {quality['jitter']:.2f} ms
") html.append(f"
Packet Loss: {quality['packet_loss']:.2f}%
") html.append("
") # Add sequence diagram if available seq_key = f"sequence_{call_id}" if seq_key in visualization_files: html.append("
") html.append("

SIP Message Flow

") html.append(f" SIP Sequence Diagram") html.append("
") # Add RTP visualizations if available for stream_id in call.rtp_streams: rtp_key = f"rtp_{call_id}_{stream_id}" if rtp_key in visualization_files: html.append("
") html.append(f"

RTP Stream Analysis (SSRC: {stream_id})

") html.append(f" RTP Visualization") html.append("
") # Message flow details html.append("
") html.append("

Message Details

") for i, msg in enumerate(call.messages): # Determine if this is a request or response is_request = not msg['method'].startswith("SIP/2.0") msg_class = "request" if is_request else "response" html.append(f"
") html.append(f"
{i+1}. [{msg['timestamp']}] {msg['source_ip']} → {msg['dest_ip']}: {msg['method']}
") html.append(f"
Show/Hide Details
") # Message details (hidden by default) html.append(f"
") # Headers html.append(" Headers:") for header, value in msg['headers'].items(): html.append(f" {header}: {value}") # Content (SDP) if msg.get('content'): html.append("\n Content:") html.append(f" {msg['content']}") html.append("
") html.append("
") html.append("
") html.append("
") # Footer html.append(" ") html.append("
") html.append("") html.append("") return "\n".join(html) class SipRtpPcapAnalyzer: """Main class for analyzing PCAP files and extracting SIP messages and RTP streams""" def __init__(self, pcap_file: str, output_file: Optional[str] = None, verbose: bool = False, custom_sip_ports: Optional[List[int]] = None, rtp_port_range: Optional[Tuple[int, int]] = None, visualize: bool = False, output_dir: Optional[str] = None): self.pcap_file = pcap_file self.output_file = output_file self.verbose = verbose self.calls: Dict[str, SipCall] = {} # Dictionary of calls indexed by Call-ID self.messages_count = 0 self.sip_ports: Set[int] = set(SIP_PORTS) # Copy default SIP ports self.rtp_streams: Dict[int, RtpStream] = {} # Dictionary of RTP streams indexed by SSRC self.potential_rtp_ports: Set[int] = set() # Ports identified from SDP # Visualization options self.visualize = visualize self.output_dir = output_dir # Set custom RTP port range if provided self.rtp_port_range = rtp_port_range if rtp_port_range else RTP_PORT_RANGE # Add custom SIP ports if provided if custom_sip_ports: for port in custom_sip_ports: self.sip_ports.add(port) # Set logging level based on verbose flag if verbose: logger.setLevel(logging.DEBUG) def analyze(self) -> None: """Analyze the PCAP file based on the available library""" if not os.path.exists(self.pcap_file): logger.error(f"Error: File {self.pcap_file} does not exist.") return if LIBRARY == "scapy": self._analyze_with_scapy() elif LIBRARY == "pyshark": self._analyze_with_pyshark() elif LIBRARY == "dpkt": self._analyze_with_dpkt() # After analyzing, associate RTP streams with SIP calls self._associate_rtp_streams_with_calls() # Calculate final metrics for all RTP streams for stream in self.rtp_streams.values(): stream.calculate_mos() # Generate visualizations if requested if self.visualize: self._generate_visualizations() def _analyze_with_scapy(self) -> None: """Analyze PCAP file using scapy""" logger.info(f"Analyzing {self.pcap_file} with scapy...") try: # Read the pcap file packets = scapy.rdpcap(self.pcap_file) for packet in packets: # Check if packet has IP layer if not packet.haslayer(IP): continue # Check if packet has UDP or TCP layer if packet.haslayer(UDP): transport_layer = packet[UDP] protocol = "UDP" elif packet.haslayer(TCP): transport_layer = packet[TCP] protocol = "TCP" else: continue # Get source and destination IP addresses src_ip = packet[IP].src dst_ip = packet[IP].dst src_port = transport_layer.sport dst_port = transport_layer.dport # Check if this might be a SIP packet (common ports or payload signature) is_sip_port = (src_port in self.sip_ports or dst_port in self.sip_ports) if is_sip_port or protocol == "TCP": # TCP needs further inspection # Try to check payload for SIP signature if not hasattr(transport_layer, 'payload') or not transport_layer.payload: continue payload = bytes(transport_layer.payload) if (payload.startswith(b'SIP/2.0') or any(payload.startswith(method.encode()) for method in SIP_METHODS)): # This is a SIP packet timestamp = datetime.fromtimestamp(float(packet.time)) sip_message = payload.decode('utf-8', errors='ignore') self._process_sip_message(timestamp, src_ip, dst_ip, sip_message) continue # Check if this might be an RTP packet is_potential_rtp = ( protocol == "UDP" and (src_port in self.potential_rtp_ports or dst_port in self.potential_rtp_ports or (self.rtp_port_range[0] <= src_port <= self.rtp_port_range[1] and self.rtp_port_range[0] <= dst_port <= self.rtp_port_range[1])) ) if is_potential_rtp: # Try to parse as RTP try: if HAS_RTP_LAYER and packet.haslayer(RTP): # Scapy has built-in RTP layer rtp_layer = packet[RTP] ssrc = rtp_layer.sourcesync seq = rtp_layer.sequence timestamp_rtp = rtp_layer.timestamp payload_type = rtp_layer.payload_type payload_size = len(bytes(rtp_layer.payload)) else: # Manual parsing of RTP header rtp_data = bytes(transport_layer.payload) if len(rtp_data) < 12: # Minimum RTP header size continue # Check RTP version (first 2 bits should be 2) version = (rtp_data[0] >> 6) & 0x03 if version != 2: continue # Extract RTP header fields payload_type = rtp_data[1] & 0x7F seq = (rtp_data[2] << 8) | rtp_data[3] timestamp_rtp = ((rtp_data[4] << 24) | (rtp_data[5] << 16) | (rtp_data[6] << 8) | rtp_data[7]) ssrc = ((rtp_data[8] << 24) | (rtp_data[9] << 16) | (rtp_data[10] << 8) | rtp_data[11]) payload_size = len(rtp_data) - 12 # Process RTP packet timestamp = datetime.fromtimestamp(float(packet.time)) self._process_rtp_packet(timestamp, src_ip, src_port, dst_ip, dst_port, ssrc, seq, timestamp_rtp, payload_type, payload_size) except Exception as e: if self.verbose: logger.debug(f"Error parsing potential RTP packet: {e}") continue logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.") except Exception as e: logger.error(f"Error analyzing PCAP file with scapy: {e}") def _analyze_with_pyshark(self) -> None: """Analyze PCAP file using pyshark""" logger.info(f"Analyzing {self.pcap_file} with pyshark...") try: # Open the pcap file cap = pyshark.FileCapture(self.pcap_file) for packet in cap: try: # Check if packet has IP layer if not hasattr(packet, 'ip'): continue # Get source and destination IP addresses src_ip = packet.ip.src dst_ip = packet.ip.dst # Check for SIP packets if hasattr(packet, 'sip'): # This is a SIP packet timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp)) # Reconstruct SIP message sip_message = "" if hasattr(packet.sip, 'request_line'): sip_message += packet.sip.request_line + "\r\n" elif hasattr(packet.sip, 'status_line'): sip_message += packet.sip.status_line + "\r\n" # Add headers for field in packet.sip._all_fields: field_name = field.name if field_name.startswith('sip.') and not field_name.startswith('sip.request') and not field_name.startswith('sip.status'): header_name = field_name.replace('sip.', '') if hasattr(packet.sip, header_name): header_value = getattr(packet.sip, header_name) sip_message += f"{header_name}: {header_value}\r\n" # Add body if present if hasattr(packet.sip, 'msg_body'): sip_message += "\r\n" + packet.sip.msg_body self._process_sip_message(timestamp, src_ip, dst_ip, sip_message) continue # Check for RTP packets if hasattr(packet, 'rtp'): # Get transport layer info if hasattr(packet, 'udp'): src_port = int(packet.udp.srcport) dst_port = int(packet.udp.dstport) else: continue # RTP should be over UDP # Extract RTP fields ssrc = int(packet.rtp.ssrc, 16) seq = int(packet.rtp.seq) timestamp_rtp = int(packet.rtp.timestamp) payload_type = int(packet.rtp.p_type) # Get payload size if hasattr(packet.rtp, 'payload'): payload_size = len(packet.rtp.payload) else: payload_size = 0 # Process RTP packet timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp)) self._process_rtp_packet(timestamp, src_ip, src_port, dst_ip, dst_port, ssrc, seq, timestamp_rtp, payload_type, payload_size) except Exception as e: if self.verbose: logger.debug(f"Error processing packet with pyshark: {e}") continue logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.") except Exception as e: logger.error(f"Error analyzing PCAP file with pyshark: {e}") def _analyze_with_dpkt(self) -> None: """Analyze PCAP file using dpkt""" logger.info(f"Analyzing {self.pcap_file} with dpkt...") try: # Open the pcap file with open(self.pcap_file, 'rb') as f: pcap_reader = dpkt.pcap.Reader(f) for timestamp, buf in pcap_reader: try: # Parse Ethernet frame eth = Ethernet(buf) # Check if packet has IP layer if not isinstance(eth.data, dpkt.ip.IP): continue ip = eth.data src_ip = socket.inet_ntoa(ip.src) dst_ip = socket.inet_ntoa(ip.dst) # Check if packet has UDP or TCP layer if isinstance(ip.data, dpkt.udp.UDP): udp = ip.data src_port = udp.sport dst_port = udp.dport protocol = "UDP" transport_data = udp.data elif isinstance(ip.data, dpkt.tcp.TCP): tcp = ip.data src_port = tcp.sport dst_port = tcp.dport protocol = "TCP" transport_data = tcp.data else: continue # Check for SIP packets is_sip_port = (src_port in self.sip_ports or dst_port in self.sip_ports) if is_sip_port or protocol == "TCP": # TCP needs further inspection # Check if payload looks like SIP if (transport_data.startswith(b'SIP/2.0') or any(transport_data.startswith(method.encode()) for method in SIP_METHODS)): # This is a SIP packet timestamp_dt = datetime.fromtimestamp(timestamp) sip_message = transport_data.decode('utf-8', errors='ignore') self._process_sip_message(timestamp_dt, src_ip, dst_ip, sip_message) continue # Check for RTP packets is_potential_rtp = ( protocol == "UDP" and (src_port in self.potential_rtp_ports or dst_port in self.potential_rtp_ports or (self.rtp_port_range[0] <= src_port <= self.rtp_port_range[1] and self.rtp_port_range[0] <= dst_port <= self.rtp_port_range[1])) ) if is_potential_rtp and len(transport_data) >= 12: # Try to parse as RTP try: # Check RTP version (first 2 bits should be 2) version = (transport_data[0] >> 6) & 0x03 if version != 2: continue # Extract RTP header fields payload_type = transport_data[1] & 0x7F seq = (transport_data[2] << 8) | transport_data[3] timestamp_rtp = ((transport_data[4] << 24) | (transport_data[5] << 16) | (transport_data[6] << 8) | transport_data[7]) ssrc = ((transport_data[8] << 24) | (transport_data[9] << 16) | (transport_data[10] << 8) | transport_data[11]) payload_size = len(transport_data) - 12 # Process RTP packet timestamp_dt = datetime.fromtimestamp(timestamp) self._process_rtp_packet(timestamp_dt, src_ip, src_port, dst_ip, dst_port, ssrc, seq, timestamp_rtp, payload_type, payload_size) except Exception as e: if self.verbose: logger.debug(f"Error parsing potential RTP packet: {e}") continue except Exception as e: if self.verbose: logger.debug(f"Error processing packet with dpkt: {e}") continue logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.") except Exception as e: logger.error(f"Error analyzing PCAP file with dpkt: {e}") def _process_sip_message(self, timestamp: datetime, source_ip: str, dest_ip: str, sip_message: str) -> None: """Process a SIP message and add it to the appropriate call""" if not sip_message: return # Split message into lines lines = sip_message.splitlines() if not lines: return # Parse first line to determine if it's a request or response first_line = lines[0].strip() if first_line.startswith('SIP/2.0 '): # This is a response method = first_line else: # This is a request parts = first_line.split(' ') if len(parts) >= 1: method = parts[0] else: return # Parse headers headers = {} content = "" content_started = False for line in lines[1:]: if content_started: content += line + "\n" continue if not line.strip(): content_started = True continue if ':' in line: header_name, header_value = line.split(':', 1) headers[header_name.strip()] = header_value.strip() # Get the Call-ID call_id = headers.get("Call-ID", headers.get("i", None)) if not call_id: return # Add message to the appropriate call if call_id not in self.calls: self.calls[call_id] = SipCall(call_id) self.calls[call_id].add_message( timestamp, method, source_ip, dest_ip, headers, content ) self.messages_count += 1 # Extract potential RTP ports from SDP if content and "m=audio" in content: media_matches = re.findall(r'm=audio\s+(\d+)', content) for port_str in media_matches: try: port = int(port_str) self.potential_rtp_ports.add(port) if self.verbose: logger.debug(f"Found potential RTP port {port} in SDP") except ValueError: continue def _process_rtp_packet(self, timestamp: datetime, src_ip: str, src_port: int, dst_ip: str, dst_port: int, ssrc: int, seq: int, rtp_timestamp: int, payload_type: int, payload_size: int) -> None: """Process an RTP packet and add it to the appropriate stream""" # Create a unique stream identifier stream_key = ssrc # Add to existing stream or create new one if stream_key not in self.rtp_streams: self.rtp_streams[stream_key] = RtpStream(ssrc, src_ip, src_port, dst_ip, dst_port) # Set codec based on payload type if available in SDP for call in self.calls.values(): if 'codec' in call.media_info and 'id' in call.media_info['codec']: try: codec_id = int(call.media_info['codec']['id']) if codec_id == payload_type: self.rtp_streams[stream_key].codec = call.media_info['codec']['name'] break except (ValueError, TypeError): pass # Add packet to stream self.rtp_streams[stream_key].add_packet( timestamp, seq, rtp_timestamp, payload_type, payload_size ) def _associate_rtp_streams_with_calls(self) -> None: """Associate RTP streams with SIP calls based on IP addresses and ports""" # Create a mapping of media endpoints to call IDs media_endpoints = {} # First pass: extract media endpoints from SDP in SIP messages for call_id, call in self.calls.items(): if 'connection_address' in call.media_info and 'port' in call.media_info: endpoint = (call.media_info['connection_address'], call.media_info['port']) media_endpoints[endpoint] = call_id # Second pass: try to match RTP streams to calls for ssrc, stream in self.rtp_streams.items(): matched = False # Check source endpoint src_endpoint = (stream.src_ip, stream.src_port) if src_endpoint in media_endpoints: call_id = media_endpoints[src_endpoint] self.calls[call_id].add_rtp_stream(stream) matched = True continue # Check destination endpoint dst_endpoint = (stream.dst_ip, stream.dst_port) if dst_endpoint in media_endpoints: call_id = media_endpoints[dst_endpoint] self.calls[call_id].add_rtp_stream(stream) matched = True continue # If no exact match, try matching just by IP address if not matched: for call_id, call in self.calls.items(): for msg in call.messages: if (stream.src_ip == msg['source_ip'] and stream.dst_ip == msg['dest_ip']) or \ (stream.src_ip == msg['dest_ip'] and stream.dst_ip == msg['source_ip']): call.add_rtp_stream(stream) matched = True break if matched: break def _generate_visualizations(self) -> None: """Generate visualizations for SIP calls and RTP streams""" if not HAS_PLANTUML and not HAS_MATPLOTLIB: logger.warning("Visualization libraries not available. Install plantuml and matplotlib for visualizations.") return # Create visualizer visualizer = SipRtpVisualizer(output_dir=self.output_dir) # Generate HTML report report_file = visualizer.generate_html_report(self.calls) if report_file: logger.info(f"HTML report generated: {report_file}") def print_results(self) -> None: """Print analysis results to console or file""" output = [] output.append(f"SIP and RTP Analysis Results for {self.pcap_file}") output.append("=" * 80) output.append(f"Total SIP Messages: {self.messages_count}") output.append(f"Total SIP Calls: {len(self.calls)}") output.append(f"Total RTP Streams: {len(self.rtp_streams)}") output.append("") # Print call details output.append("Call Details:") output.append("-" * 80) for call_id, call in self.calls.items(): output.append(str(call)) output.append("-" * 80) # Print RTP streams that weren't associated with any call unassociated_streams = [] for ssrc, stream in self.rtp_streams.items(): is_associated = False for call in self.calls.values(): if ssrc in call.rtp_streams: is_associated = True break if not is_associated: unassociated_streams.append(stream) if unassociated_streams: output.append("Unassociated RTP Streams:") output.append("-" * 80) for stream in unassociated_streams: output.append(str(stream)) output.append("-" * 80) # Join all output lines result = "\n".join(output) # Print to console or file if self.output_file: try: with open(self.output_file, 'w') as f: f.write(result) logger.info(f"Results written to {self.output_file}") except Exception as e: logger.error(f"Error writing to output file: {e}") print(result) else: print(result) def get_call_quality_metrics(self) -> Dict[str, Dict[str, Any]]: """Get quality metrics for all calls in a structured format""" metrics = {} for call_id, call in self.calls.items(): if call.rtp_streams: quality = call.get_call_quality_summary() metrics[call_id] = { "from": call.from_uri, "to": call.to_uri, "duration": call.get_duration(), "mos": quality["mos"], "jitter": quality["jitter"], "packet_loss": quality["packet_loss"], "rating": quality["rating"], "stream_count": quality["stream_count"] } else: metrics[call_id] = { "from": call.from_uri, "to": call.to_uri, "duration": call.get_duration(), "status": "No RTP streams found" } return metrics def main(): """Main function""" parser = argparse.ArgumentParser(description='Analyze PCAP files for SIP messages and RTP streams in VoIP calls') parser.add_argument('pcap_file', help='Path to the PCAP file to analyze') parser.add_argument('-o', '--output', help='Output file for analysis results') parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') parser.add_argument('-p', '--ports', type=int, nargs='+', help='Additional SIP ports to check') parser.add_argument('-r', '--rtp-range', type=int, nargs=2, metavar=('MIN', 'MAX'), help='Custom RTP port range (default: 10000-65535)') parser.add_argument('--visualize', action='store_true', help='Generate visualizations') parser.add_argument('--output-dir', help='Output directory for visualizations') parser.add_argument('--report', action='store_true', help='Generate HTML report') args = parser.parse_args() rtp_range = (args.rtp_range[0], args.rtp_range[1]) if args.rtp_range else None # Check if visualization libraries are available if args.visualize or args.report: if not HAS_PLANTUML: logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.") if not HAS_MATPLOTLIB: logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.") # Create analyzer analyzer = SipRtpPcapAnalyzer( args.pcap_file, args.output, args.verbose, args.ports, rtp_range, args.visualize or args.report, args.output_dir ) # Analyze PCAP file analyzer.analyze() # Print results analyzer.print_results() # Generate report if requested if args.report and not analyzer.visualize: visualizer = SipRtpVisualizer(output_dir=args.output_dir) report_file = visualizer.generate_html_report(analyzer.calls) if report_file: logger.info(f"HTML report generated: {report_file}") if __name__ == "__main__": main()