#!/usr/bin/env python3 """ SIP and RTP PCAP Analyzer with Visualization - A tool to analyze PCAP files and extract SIP messages and RTP streams for VoIP call analysis, with graphical visualization capabilities. This script can parse PCAP/PCAPNG files and identify SIP messages exchanged during VoIP call establishments, including INVITE, ACK, BYE, and other SIP messages. It also analyzes RTP streams to provide metrics on call quality including jitter, packet loss, and MOS score. Additionally, it generates graphical visualizations of SIP signaling and RTP media flows. """ import sys import os import argparse import re import logging import math import subprocess from datetime import datetime from typing import Dict, List, Tuple, Optional, Any, Set, DefaultDict from collections import defaultdict # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('sip_rtp_pcap_analyzer') # Try to import different packet analysis libraries # We'll use the first one available in this priority order: scapy, pyshark, dpkt LIBRARY = None try: import scapy.all as scapy from scapy.layers.inet import IP, UDP, TCP try: from scapy.layers.rtp import RTP HAS_RTP_LAYER = True except ImportError: HAS_RTP_LAYER = False LIBRARY = "scapy" logger.info("Using scapy for packet analysis") except ImportError: try: import pyshark LIBRARY = "pyshark" logger.info("Using pyshark for packet analysis") except ImportError: try: import dpkt import socket from dpkt.ethernet import Ethernet LIBRARY = "dpkt" logger.info("Using dpkt for packet analysis") except ImportError: logger.error("No packet analysis library found. Please install scapy, pyshark, or dpkt.") sys.exit(1) # Try to import visualization libraries try: import plantuml HAS_PLANTUML = True except ImportError: HAS_PLANTUML = False logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.") try: import matplotlib matplotlib.use('Agg') # Use non-interactive backend import matplotlib.pyplot as plt import matplotlib.dates as mdates import numpy as np HAS_MATPLOTLIB = True except ImportError: HAS_MATPLOTLIB = False logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.") # SIP message types we're interested in SIP_METHODS = [ "INVITE", "ACK", "BYE", "CANCEL", "REGISTER", "OPTIONS", "PRACK", "SUBSCRIBE", "NOTIFY", "PUBLISH", "INFO", "REFER", "MESSAGE", "UPDATE" ] # SIP response codes and their meanings SIP_RESPONSES = { "1": "Informational", "2": "Success", "3": "Redirection", "4": "Client Error", "5": "Server Error", "6": "Global Failure" } # Common SIP ports SIP_PORTS = {5060, 5061} # Common RTP port range RTP_PORT_RANGE = (10000, 65535) # Typical RTP port range # RTP payload type to codec mapping RTP_CODEC_MAP = { 0: "PCMU/G.711u", 3: "GSM", 4: "G723", 8: "PCMA/G.711a", 9: "G722", 10: "L16 (stereo)", 11: "L16 (mono)", 18: "G729", 26: "JPEG", 31: "H261", 32: "MPV", 33: "MP2T", 34: "H263", 96: "Dynamic", 97: "Dynamic", 98: "Dynamic", 99: "Dynamic", 100: "Dynamic", 101: "Dynamic", 110: "OPUS", 111: "OPUS" } class RtpStream: """Class to represent an RTP stream with its packets and metrics""" def __init__(self, ssrc: int, src_ip: str, src_port: int, dst_ip: str, dst_port: int): self.ssrc = ssrc self.src_ip = src_ip self.src_port = src_port self.dst_ip = dst_ip self.dst_port = dst_port self.packets = [] self.start_time = None self.end_time = None self.packet_count = 0 self.expected_packets = 0 self.lost_packets = 0 self.jitter = 0.0 self.jitter_values = [] # Store jitter values for each packet for visualization self.last_seq = None self.last_timestamp = None self.last_arrival = None self.codec = None self.payload_type = None self.mos_score = None self.rtt = None # Round Trip Time in ms def add_packet(self, timestamp: datetime, seq: int, rtp_timestamp: int, payload_type: int, payload_size: int): """Add an RTP packet to this stream and update metrics""" if not self.start_time or timestamp < self.start_time: self.start_time = timestamp if not self.end_time or timestamp > self.end_time: self.end_time = timestamp # Set payload type if not already set if self.payload_type is None: self.payload_type = payload_type # Calculate packet loss if self.last_seq is not None: expected_seq = (self.last_seq + 1) % 65536 # RTP sequence numbers are 16-bit if seq != expected_seq: # Account for sequence number wraparound if seq > expected_seq: self.lost_packets += seq - expected_seq else: self.lost_packets += (65536 - expected_seq) + seq # Calculate jitter (RFC 3550 algorithm) if self.last_timestamp is not None and self.last_arrival is not None: # Convert timestamps to milliseconds for easier calculation arrival_ms = timestamp.timestamp() * 1000 last_arrival_ms = self.last_arrival.timestamp() * 1000 # Calculate transit time difference transit = arrival_ms - rtp_timestamp last_transit = last_arrival_ms - self.last_timestamp # Calculate difference in transit times d = abs(transit - last_transit) # Update jitter using RFC 3550 formula: J(i) = J(i-1) + (|D(i-1,i)| - J(i-1))/16 self.jitter = self.jitter + (d - self.jitter) / 16 # Store jitter value for visualization self.jitter_values.append(self.jitter) else: self.jitter_values.append(0.0) # Store current values for next calculation self.last_seq = seq self.last_timestamp = rtp_timestamp self.last_arrival = timestamp # Add packet to list and increment counter self.packets.append({ "timestamp": timestamp, "seq": seq, "rtp_timestamp": rtp_timestamp, "payload_type": payload_type, "payload_size": payload_size }) self.packet_count += 1 self.expected_packets = self.packet_count + self.lost_packets def get_packet_loss_percentage(self) -> float: """Get packet loss as a percentage""" if self.expected_packets == 0: return 0.0 return (self.lost_packets / self.expected_packets) * 100 def calculate_mos(self) -> float: """Calculate Mean Opinion Score (MOS) based on packet loss and jitter""" # Default RTT if not available rtt = self.rtt if self.rtt is not None else 50 # Default 50ms # Calculate effective latency effective_latency = rtt + (self.jitter * 2) + 10 # Add 10ms for protocol latencies # Calculate R factor based on ITU-T G.107 E-model if effective_latency < 160: r = 93.2 - (effective_latency / 40) else: r = 93.2 - (effective_latency - 120) / 10 # Deduct for packet loss (2.5 R values per percentage) packet_loss = self.get_packet_loss_percentage() r = r - (packet_loss * 2.5) # Convert R to MOS using ITU-T G.107 formula if r < 0: mos = 1.0 elif r > 100: mos = 4.5 else: mos = 1 + (0.035 * r) + (r * (r - 60) * (100 - r) * 7 * 10**-6) # Clamp MOS between 1.0 and 4.5 mos = max(1.0, min(4.5, mos)) self.mos_score = mos return mos def get_duration(self) -> float: """Get stream duration in seconds""" if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return 0 def get_codec_name(self) -> str: """Get codec name based on payload type""" if self.codec: return self.codec if self.payload_type in RTP_CODEC_MAP: return RTP_CODEC_MAP[self.payload_type] return f"Unknown ({self.payload_type})" def get_quality_rating(self) -> str: """Get a human-readable quality rating based on MOS score""" if self.mos_score is None: self.calculate_mos() if self.mos_score >= 4.3: return "Excellent" elif self.mos_score >= 4.0: return "Good" elif self.mos_score >= 3.6: return "Fair" elif self.mos_score >= 3.1: return "Poor" else: return "Bad" def __str__(self) -> str: """String representation of the RTP stream""" if self.mos_score is None: self.calculate_mos() result = f"RTP Stream: SSRC={self.ssrc}\n" result += f"Source: {self.src_ip}:{self.src_port}\n" result += f"Destination: {self.dst_ip}:{self.dst_port}\n" result += f"Codec: {self.get_codec_name()}\n" result += f"Start Time: {self.start_time}\n" result += f"End Time: {self.end_time}\n" result += f"Duration: {self.get_duration():.2f} seconds\n" result += f"Packets: {self.packet_count}\n" result += f"Lost Packets: {self.lost_packets}\n" result += f"Packet Loss: {self.get_packet_loss_percentage():.2f}%\n" result += f"Jitter: {self.jitter:.2f} ms\n" if self.rtt is not None: result += f"Round Trip Time: {self.rtt:.2f} ms\n" result += f"MOS Score: {self.mos_score:.2f} ({self.get_quality_rating()})\n" return result class SipCall: """Class to represent a SIP call with its messages and RTP streams""" def __init__(self, call_id: str): self.call_id = call_id self.messages = [] self.start_time = None self.end_time = None self.from_uri = None self.to_uri = None self.status = "Unknown" # Can be "Setup", "Established", "Terminated", "Failed" self.call_direction = None # Can be "Outgoing", "Incoming", or None self.media_info = {} # Store media information from SDP self.rtp_streams = {} # Dictionary of RTP streams indexed by SSRC def add_message(self, timestamp: datetime, method: str, source_ip: str, dest_ip: str, headers: Dict[str, str], content: str = None): """Add a SIP message to this call""" if not self.start_time or timestamp < self.start_time: self.start_time = timestamp if not self.end_time or timestamp > self.end_time: self.end_time = timestamp # Extract From and To URIs from the first message if not self.from_uri and "From" in headers: from_match = re.search(r'<(sip:[^>]+)>', headers["From"]) if from_match: self.from_uri = from_match.group(1) if not self.to_uri and "To" in headers: to_match = re.search(r'<(sip:[^>]+)>', headers["To"]) if to_match: self.to_uri = to_match.group(1) # Update call status based on method if method == "INVITE": self.status = "Setup" elif method == "ACK" and self.status == "Setup": self.status = "Established" elif method == "BYE": self.status = "Terminated" elif method.startswith("SIP/2.0 4") or method.startswith("SIP/2.0 5") or method.startswith("SIP/2.0 6"): self.status = "Failed" # Extract media information from SDP if present if content and "m=" in content: self._parse_sdp(content) # Add the message to the list self.messages.append({ "timestamp": timestamp, "method": method, "source_ip": source_ip, "dest_ip": dest_ip, "headers": headers, "content": content }) def _parse_sdp(self, sdp_content: str): """Parse SDP content to extract media information""" # Extract media type and port media_match = re.search(r'm=(\w+)\s+(\d+)', sdp_content) if media_match: media_type = media_match.group(1) media_port = int(media_match.group(2)) self.media_info['type'] = media_type self.media_info['port'] = media_port # Extract codec information codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', sdp_content) if codec_match: codec_id = codec_match.group(1) codec_name = codec_match.group(2) codec_rate = codec_match.group(3) self.media_info['codec'] = { 'id': codec_id, 'name': codec_name, 'rate': codec_rate } # Extract connection information (IP) conn_match = re.search(r'c=IN\s+IP[46]\s+([^\s]+)', sdp_content) if conn_match: self.media_info['connection_address'] = conn_match.group(1) def add_rtp_stream(self, rtp_stream: RtpStream): """Add an RTP stream to this call""" self.rtp_streams[rtp_stream.ssrc] = rtp_stream def get_duration(self) -> float: """Get call duration in seconds""" if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return 0 def get_average_mos(self) -> float: """Get average MOS score across all RTP streams""" if not self.rtp_streams: return 0.0 total_mos = 0.0 for stream in self.rtp_streams.values(): if stream.mos_score is None: stream.calculate_mos() total_mos += stream.mos_score return total_mos / len(self.rtp_streams) def get_call_quality_summary(self) -> Dict[str, Any]: """Get a summary of call quality metrics""" if not self.rtp_streams: return {"status": "No RTP streams found"} avg_mos = self.get_average_mos() avg_jitter = sum(s.jitter for s in self.rtp_streams.values()) / len(self.rtp_streams) avg_packet_loss = sum(s.get_packet_loss_percentage() for s in self.rtp_streams.values()) / len(self.rtp_streams) quality_rating = "Unknown" if avg_mos >= 4.3: quality_rating = "Excellent" elif avg_mos >= 4.0: quality_rating = "Good" elif avg_mos >= 3.6: quality_rating = "Fair" elif avg_mos >= 3.1: quality_rating = "Poor" else: quality_rating = "Bad" return { "mos": avg_mos, "jitter": avg_jitter, "packet_loss": avg_packet_loss, "rating": quality_rating, "stream_count": len(self.rtp_streams) } def __str__(self) -> str: """String representation of the call""" result = f"Call ID: {self.call_id}\n" result += f"Status: {self.status}\n" result += f"From: {self.from_uri}\n" result += f"To: {self.to_uri}\n" result += f"Start Time: {self.start_time}\n" result += f"End Time: {self.end_time}\n" result += f"Duration: {self.get_duration():.2f} seconds\n" # Add media information if available if self.media_info: result += "Media Information:\n" for key, value in self.media_info.items(): if isinstance(value, dict): result += f" {key}:\n" for k, v in value.items(): result += f" {k}: {v}\n" else: result += f" {key}: {value}\n" # Add call quality summary if RTP streams are available if self.rtp_streams: quality = self.get_call_quality_summary() result += "Call Quality:\n" result += f" MOS Score: {quality['mos']:.2f} ({quality['rating']})\n" result += f" Average Jitter: {quality['jitter']:.2f} ms\n" result += f" Average Packet Loss: {quality['packet_loss']:.2f}%\n" result += f" RTP Streams: {quality['stream_count']}\n" result += f"Message Count: {len(self.messages)}\n" result += "Message Flow:\n" for i, msg in enumerate(self.messages, 1): result += f" {i}. [{msg['timestamp']}] {msg['source_ip']} -> {msg['dest_ip']}: {msg['method']}\n" # Add RTP stream details if self.rtp_streams: result += "RTP Streams:\n" for ssrc, stream in self.rtp_streams.items(): result += f" Stream SSRC: {ssrc}\n" result += f" Source: {stream.src_ip}:{stream.src_port}\n" result += f" Destination: {stream.dst_ip}:{stream.dst_port}\n" result += f" Codec: {stream.get_codec_name()}\n" result += f" Packets: {stream.packet_count}\n" result += f" Lost Packets: {stream.lost_packets}\n" result += f" Packet Loss: {stream.get_packet_loss_percentage():.2f}%\n" result += f" Jitter: {stream.jitter:.2f} ms\n" result += f" MOS Score: {stream.mos_score:.2f} ({stream.get_quality_rating()})\n" return result class SipRtpVisualizer: """Class for generating visualizations of SIP and RTP flows""" def __init__(self, output_dir: Optional[str] = None, plantuml_server: str = "http://www.plantuml.com/plantuml"): """Initialize the visualizer Args: output_dir: Directory to save visualization files (default: current directory) plantuml_server: PlantUML server URL for rendering diagrams """ self.output_dir = output_dir or os.getcwd() self.plantuml_server = plantuml_server # Create output directory if it doesn't exist os.makedirs(self.output_dir, exist_ok=True) # Initialize PlantUML if HAS_PLANTUML: try: self.plantuml = plantuml.PlantUML(url=self.plantuml_server) except Exception as e: logger.warning(f"Error initializing PlantUML: {e}") self.plantuml = None else: self.plantuml = None def generate_sip_sequence_diagram(self, calls: Dict[str, Any], output_file: Optional[str] = None) -> str: """Generate a sequence diagram for SIP calls Args: calls: Dictionary of SIP calls from the analyzer output_file: Output file path (default: sip_sequence.png in output_dir) Returns: Path to the generated diagram file """ if not calls: logger.warning("No SIP calls to visualize") return "" if not HAS_PLANTUML or not self.plantuml: logger.error("PlantUML not available. Cannot generate sequence diagram.") return "" # Use the first call if multiple calls exist # In future, could support multiple calls in separate diagrams call_id = next(iter(calls)) call = calls[call_id] # Start building PlantUML diagram diagram = ["@startuml"] diagram.append("skinparam sequenceMessageAlign center") diagram.append("skinparam sequenceArrowThickness 2") diagram.append("skinparam roundcorner 5") diagram.append("skinparam maxmessagesize 200") diagram.append("") # Extract unique participants participants = set() for msg in call.messages: participants.add(msg['source_ip']) participants.add(msg['dest_ip']) # Define participants for i, participant in enumerate(participants): # Try to determine if this is caller or callee if call.from_uri and participant in call.from_uri: label = f"Caller\\n{participant}" elif call.to_uri and participant in call.to_uri: label = f"Callee\\n{participant}" else: label = f"Endpoint {i+1}\\n{participant}" diagram.append(f'participant "{label}" as P{i}') diagram.append("") # Map IP addresses to participant IDs ip_to_participant = {ip: f"P{i}" for i, ip in enumerate(participants)} # Add messages for msg in call.messages: src = ip_to_participant[msg['source_ip']] dst = ip_to_participant[msg['dest_ip']] # Format the message text method = msg['method'] # For SIP responses, extract the status code if method.startswith("SIP/2.0"): parts = method.split(" ", 2) if len(parts) >= 2: status_code = parts[1] if status_code.startswith("1"): arrow_type = "-->" # Dotted arrow for provisional responses else: arrow_type = "->" else: arrow_type = "->" else: arrow_type = "->" # Add the message to the diagram diagram.append(f"{src} {arrow_type} {dst}: {method}") # Add SDP information as notes if present if msg.get('content') and 'm=audio' in msg.get('content', ''): sdp_note = [] # Extract media type and port media_match = re.search(r'm=(\w+)\s+(\d+)', msg['content']) if media_match: media_type = media_match.group(1) media_port = media_match.group(2) sdp_note.append(f"SDP: {media_type} {media_port}") # Extract codec information codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', msg['content']) if codec_match: codec_id = codec_match.group(1) codec_name = codec_match.group(2) sdp_note.append(f"RTP/AVP {codec_id} ({codec_name})") if sdp_note: note_position = "right" if src < dst else "left" diagram.append(f"note {note_position}: {' '.join(sdp_note)}") # Add RTP stream if present if call.rtp_streams: # Calculate total RTP duration rtp_duration = 0 for stream in call.rtp_streams.values(): duration = stream.get_duration() if duration > rtp_duration: rtp_duration = duration if rtp_duration > 0: diagram.append("") diagram.append(f"... RTP Media Stream ({rtp_duration:.2f} seconds) ...") diagram.append("") diagram.append("@enduml") # Join the diagram lines diagram_str = "\n".join(diagram) # Generate the diagram if not output_file: output_file = os.path.join(self.output_dir, f"sip_sequence_{call_id}.png") try: # Save the PlantUML source first source_file = os.path.splitext(output_file)[0] + ".puml" with open(source_file, 'w') as f: f.write(diagram_str) logger.info(f"PlantUML source saved: {source_file}") # Try to generate the diagram try: #self.plantuml.processes_file(diagram_str, outfile=output_file) self.plantuml.processes_file(source_file) logger.info(f"Sequence diagram generated: {output_file}") return output_file except Exception as e: # If PlantUML server fails, try using local command-line tool if available logger.warning(f"PlantUML server error: {str(e)}") try: cmd = f"plantuml {source_file}" subprocess.run(cmd, shell=True, check=True) logger.info(f"Sequence diagram generated using local PlantUML: {output_file}") return output_file except Exception as e2: logger.warning(f"Local PlantUML failed: {str(e2)}") logger.info("Continuing without sequence diagram. PlantUML source is still available.") return source_file except Exception as e: logger.error(f"Error generating sequence diagram: {str(e)}") return "" def generate_rtp_visualization(self, calls: Dict[str, Any], output_file: Optional[str] = None) -> str: """Generate visualization for RTP streams Args: calls: Dictionary of SIP calls from the analyzer output_file: Output file path (default: rtp_visualization.png in output_dir) Returns: Path to the generated visualization file """ if not calls or not HAS_MATPLOTLIB: logger.warning("No SIP calls to visualize or Matplotlib not available") return "" # Find calls with RTP streams calls_with_rtp = {call_id: call for call_id, call in calls.items() if call.rtp_streams} if not calls_with_rtp: logger.warning("No RTP streams to visualize") return "" # Use the first call with RTP streams call_id = next(iter(calls_with_rtp)) call = calls_with_rtp[call_id] # Get the first RTP stream stream_id = next(iter(call.rtp_streams)) stream = call.rtp_streams[stream_id] # Create figure with subplots fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), gridspec_kw={'height_ratios': [3, 1]}) # Extract packet data if not stream.packets: logger.warning("No packets in RTP stream") return "" # Create timestamps relative to the first packet first_timestamp = stream.packets[0]['timestamp'] timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in stream.packets] # Create jitter data (use actual jitter if available, otherwise simulate) if hasattr(stream, 'jitter_values') and stream.jitter_values: jitter_values = stream.jitter_values else: # Simulate jitter values based on the average jitter base_jitter = stream.jitter jitter_values = [base_jitter + np.random.normal(0, base_jitter/10) for _ in range(len(timestamps))] # Create packet loss data packet_loss = np.zeros(len(timestamps)) if stream.lost_packets > 0: # Simulate packet loss positions loss_indices = np.random.choice( range(len(timestamps)), size=min(stream.lost_packets, len(timestamps)//10), # Limit for visualization clarity replace=False ) packet_loss[loss_indices] = 1 # Plot jitter ax1.plot(timestamps, jitter_values, color='blue', alpha=0.7, label='Jitter (ms)') # Highlight packet loss if any lost_packets = np.where(packet_loss == 1)[0] if len(lost_packets) > 0: ax1.scatter( [timestamps[i] for i in lost_packets], [jitter_values[i] for i in lost_packets], color='red', s=50, label='Lost Packets' ) # Add MOS score indicator mos_score = stream.mos_score if stream.mos_score is not None else stream.calculate_mos() quality_rating = stream.get_quality_rating() # Color code based on MOS if mos_score >= 4.0: mos_color = 'green' elif mos_score >= 3.5: mos_color = 'orange' else: mos_color = 'red' ax1.text(0.02, 0.95, f'MOS Score: {mos_score:.2f} ({quality_rating})', transform=ax1.transAxes, bbox=dict(facecolor=mos_color, alpha=0.5)) # Formatting for jitter plot ax1.set_xlabel('Time (seconds)') ax1.set_ylabel('Jitter (ms)') ax1.set_title(f'RTP Stream Analysis - SSRC: {stream.ssrc}') ax1.grid(True, alpha=0.3) ax1.legend() # Add packet flow visualization in the second subplot packet_y = np.ones(len(timestamps)) ax2.scatter(timestamps, packet_y, marker='|', s=10, color='blue', alpha=0.7) # Highlight lost packets if len(lost_packets) > 0: # Calculate positions for lost packets lost_x = [] for i in range(len(timestamps)-1): if i+1 in lost_packets or i in lost_packets: # Find midpoint between packets if i+1 < len(timestamps): midpoint = (timestamps[i] + timestamps[i+1]) / 2 lost_x.append(midpoint) if lost_x: ax2.scatter(lost_x, np.ones(len(lost_x)), marker='x', s=50, color='red', label='Lost Packets') # Formatting for packet flow ax2.set_yticks([]) ax2.set_xlabel('Time (seconds)') ax2.set_title('Packet Flow') ax2.grid(True, alpha=0.3, axis='x') # Add stream information info_text = ( f"Source: {stream.src_ip}:{stream.src_port}\n" f"Destination: {stream.dst_ip}:{stream.dst_port}\n" f"Codec: {stream.get_codec_name()}\n" f"Packets: {stream.packet_count}\n" f"Lost Packets: {stream.lost_packets} ({stream.get_packet_loss_percentage():.2f}%)" ) ax2.text(0.02, 0.5, info_text, transform=ax2.transAxes, bbox=dict(facecolor='white', alpha=0.8)) plt.tight_layout() # Save the figure if not output_file: output_file = os.path.join(self.output_dir, f"rtp_visualization_{call_id}_{stream_id}.png") plt.savefig(output_file, dpi=100) plt.close(fig) logger.info(f"RTP visualization generated: {output_file}") return output_file def generate_call_quality_dashboard(self, calls: Dict[str, Any], output_file: Optional[str] = None) -> str: """Generate a dashboard with call quality metrics Args: calls: Dictionary of SIP calls from the analyzer output_file: Output file path (default: call_quality.png in output_dir) Returns: Path to the generated dashboard file """ if not calls or not HAS_MATPLOTLIB: logger.warning("No SIP calls to visualize or Matplotlib not available") return "" # Find calls with RTP streams calls_with_rtp = {call_id: call for call_id, call in calls.items() if call.rtp_streams} if not calls_with_rtp: logger.warning("No RTP streams to visualize") return "" # Create figure with subplots fig = plt.figure(figsize=(12, 8)) gs = fig.add_gridspec(2, 2) # Call summary subplot ax_summary = fig.add_subplot(gs[0, 0]) # MOS score gauge subplot ax_mos = fig.add_subplot(gs[0, 1]) # Jitter subplot ax_jitter = fig.add_subplot(gs[1, 0]) # Packet loss subplot ax_loss = fig.add_subplot(gs[1, 1]) # Process each call for i, (call_id, call) in enumerate(calls_with_rtp.items()): # Calculate average metrics across all streams avg_mos = call.get_average_mos() total_jitter = 0 total_packet_loss_pct = 0 total_packets = 0 total_lost_packets = 0 for stream in call.rtp_streams.values(): total_jitter += stream.jitter total_packets += stream.packet_count total_lost_packets += stream.lost_packets avg_jitter = total_jitter / len(call.rtp_streams) if call.rtp_streams else 0 if total_packets > 0: overall_packet_loss_pct = (total_lost_packets / (total_packets + total_lost_packets)) * 100 else: overall_packet_loss_pct = 0 # Call summary summary_text = ( f"Call ID: {call_id}\n" f"From: {call.from_uri}\n" f"To: {call.to_uri}\n" f"Duration: {call.get_duration():.2f} seconds\n" f"Status: {call.status}\n" f"RTP Streams: {len(call.rtp_streams)}\n" f"Total Packets: {total_packets}\n" f"Lost Packets: {total_lost_packets}\n" f"Packet Loss: {overall_packet_loss_pct:.2f}%\n" f"Average Jitter: {avg_jitter:.2f} ms\n" f"MOS Score: {avg_mos:.2f}" ) ax_summary.text(0.05, 0.95, summary_text, transform=ax_summary.transAxes, verticalalignment='top', fontsize=10) ax_summary.axis('off') ax_summary.set_title("Call Summary") # MOS score gauge self._draw_mos_gauge(ax_mos, avg_mos) # Get the first stream for detailed metrics if call.rtp_streams: stream_id = next(iter(call.rtp_streams)) stream = call.rtp_streams[stream_id] # Extract packet data if stream.packets: # Create timestamps relative to the first packet first_timestamp = stream.packets[0]['timestamp'] timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in stream.packets] # Create jitter data (use actual jitter if available, otherwise simulate) if hasattr(stream, 'jitter_values') and stream.jitter_values: jitter_values = stream.jitter_values else: # Simulate jitter values based on the average jitter base_jitter = stream.jitter jitter_values = [base_jitter + np.random.normal(0, base_jitter/10) for _ in range(len(timestamps))] # Plot jitter ax_jitter.plot(timestamps, jitter_values, color='blue', alpha=0.7) ax_jitter.set_xlabel('Time (seconds)') ax_jitter.set_ylabel('Jitter (ms)') ax_jitter.set_title('Jitter Over Time') ax_jitter.grid(True, alpha=0.3) # Add threshold lines ax_jitter.axhline(y=20, color='green', linestyle='--', alpha=0.5) ax_jitter.axhline(y=50, color='orange', linestyle='--', alpha=0.5) ax_jitter.axhline(y=100, color='red', linestyle='--', alpha=0.5) # Add threshold labels ax_jitter.text(timestamps[-1], 20, 'Excellent (<20ms)', verticalalignment='bottom', horizontalalignment='right', color='green') ax_jitter.text(timestamps[-1], 50, 'Good (<50ms)', verticalalignment='bottom', horizontalalignment='right', color='orange') ax_jitter.text(timestamps[-1], 100, 'Fair (<100ms)', verticalalignment='bottom', horizontalalignment='right', color='red') # Packet loss pie chart labels = ['Received', 'Lost'] sizes = [total_packets, total_lost_packets] colors = ['#66b3ff', '#ff6666'] if sum(sizes) > 0: # Avoid division by zero ax_loss.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90) ax_loss.axis('equal') ax_loss.set_title('Packet Loss') else: ax_loss.text(0.5, 0.5, "No packet data available", horizontalalignment='center', verticalalignment='center') ax_loss.axis('off') plt.tight_layout() # Save the figure if not output_file: output_file = os.path.join(self.output_dir, "call_quality_dashboard.png") plt.savefig(output_file, dpi=100) plt.close(fig) logger.info(f"Call quality dashboard generated: {output_file}") return output_file def _draw_mos_gauge(self, ax, mos_score): """Draw a gauge chart for MOS score Args: ax: Matplotlib axis to draw on mos_score: MOS score (1.0-4.5) """ # Normalize MOS score to 0-1 range norm_mos = (mos_score - 1.0) / 3.5 # MOS range is 1.0-4.5 norm_mos = max(0, min(1, norm_mos)) # Clamp to 0-1 # Create gauge gauge_angles = np.linspace(0, 180, 100) * np.pi / 180 gauge_radius = 0.8 gauge_width = 0.2 # Draw gauge background for i, angle in enumerate(gauge_angles[:-1]): next_angle = gauge_angles[i+1] # Determine color based on position pos = i / len(gauge_angles[:-1]) if pos < 0.3: # Bad to Poor (red) color = 'red' elif pos < 0.6: # Poor to Fair (orange) color = 'orange' else: # Fair to Excellent (green) color = 'green' # Draw gauge segment ax.add_patch(plt.matplotlib.patches.Wedge( (0, 0), gauge_radius, angle * 180 / np.pi, next_angle * 180 / np.pi, width=gauge_width, color=color, alpha=0.7 )) # Draw needle needle_angle = norm_mos * 180 * np.pi / 180 ax.plot([0, gauge_radius * np.cos(needle_angle)], [0, gauge_radius * np.sin(needle_angle)], color='black', linewidth=2) # Add center circle ax.add_patch(plt.matplotlib.patches.Circle((0, 0), 0.05, color='black')) # Add MOS value text quality_rating = "Unknown" if mos_score >= 4.3: quality_rating = "Excellent" elif mos_score >= 4.0: quality_rating = "Good" elif mos_score >= 3.6: quality_rating = "Fair" elif mos_score >= 3.1: quality_rating = "Poor" else: quality_rating = "Bad" ax.text(0, -0.2, f"MOS: {mos_score:.2f}", horizontalalignment='center', fontsize=12, fontweight='bold') ax.text(0, -0.3, f"({quality_rating})", horizontalalignment='center', fontsize=10) # Add scale labels ax.text(-gauge_radius-0.1, 0, "1.0", horizontalalignment='right', verticalalignment='center', fontsize=8) ax.text(0, gauge_radius+0.1, "3.0", horizontalalignment='center', verticalalignment='bottom', fontsize=8) ax.text(gauge_radius+0.1, 0, "4.5", horizontalalignment='left', verticalalignment='center', fontsize=8) # Set axis limits and turn off axis ax.set_xlim(-1, 1) ax.set_ylim(-0.5, 1) ax.axis('off') ax.set_title("Call Quality (MOS)") def generate_html_report(self, calls: Dict[str, Any], output_file: Optional[str] = None) -> str: """Generate a comprehensive HTML report with all visualizations Args: calls: Dictionary of SIP calls from the analyzer output_file: Output file path (default: sip_rtp_report.html in output_dir) Returns: Path to the generated HTML report """ if not calls: logger.warning("No SIP calls to visualize") return "" # Generate all visualizations visualization_files = {} for call_id, call in calls.items(): # Only process calls with RTP streams if not call.rtp_streams: continue # Generate sequence diagram seq_diagram = self.generate_sip_sequence_diagram( {call_id: call}, os.path.join(self.output_dir, f"sip_sequence_{call_id}.png") ) if seq_diagram: visualization_files[f"sequence_{call_id}"] = seq_diagram # Generate RTP visualization for each stream for stream_id in call.rtp_streams: rtp_viz = self.generate_rtp_visualization( {call_id: call}, os.path.join(self.output_dir, f"rtp_visualization_{call_id}_{stream_id}.png") ) if rtp_viz: visualization_files[f"rtp_{call_id}_{stream_id}"] = rtp_viz # Generate call quality dashboard dashboard = self.generate_call_quality_dashboard( calls, os.path.join(self.output_dir, "call_quality_dashboard.png") ) if dashboard: visualization_files["dashboard"] = dashboard # Create HTML report if not output_file: output_file = os.path.join(self.output_dir, "sip_rtp_report.html") with open(output_file, 'w', encoding='utf-8') as f: f.write(self._generate_html_content(calls, visualization_files)) logger.info(f"HTML report generated: {output_file}") return output_file def _generate_html_content(self, calls: Dict[str, Any], visualization_files: Dict[str, str]) -> str: """Generate HTML content for the report Args: calls: Dictionary of SIP calls from the analyzer visualization_files: Dictionary of visualization file paths Returns: HTML content as a string """ html = [] html.append("") html.append("") html.append("
") html.append(" ") html.append(" ") html.append("Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
") html.append("