Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| SIP and RTP PCAP Analyzer with Visualization - A tool to analyze PCAP files and extract SIP messages | |
| and RTP streams for VoIP call analysis, with graphical visualization capabilities. | |
| This script can parse PCAP/PCAPNG files and identify SIP messages exchanged during VoIP call | |
| establishments, including INVITE, ACK, BYE, and other SIP messages. It also analyzes RTP streams | |
| to provide metrics on call quality including jitter, packet loss, and MOS score. Additionally, | |
| it generates graphical visualizations of SIP signaling and RTP media flows. | |
| """ | |
| import sys | |
| import os | |
| import argparse | |
| import re | |
| import logging | |
| import math | |
| import subprocess | |
| from datetime import datetime | |
| from typing import Dict, List, Tuple, Optional, Any, Set, DefaultDict | |
| from collections import defaultdict | |
# Configure root logging once at import time: INFO and above, each line
# formatted as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Named logger shared by every component of the analyzer.
logger = logging.getLogger(name='sip_rtp_pcap_analyzer')
| # Try to import different packet analysis libraries | |
| # We'll use the first one available in this priority order: scapy, pyshark, dpkt | |
| LIBRARY = None | |
| try: | |
| import scapy.all as scapy | |
| from scapy.layers.inet import IP, UDP, TCP | |
| try: | |
| from scapy.layers.rtp import RTP | |
| HAS_RTP_LAYER = True | |
| except ImportError: | |
| HAS_RTP_LAYER = False | |
| LIBRARY = "scapy" | |
| logger.info("Using scapy for packet analysis") | |
| except ImportError: | |
| try: | |
| import pyshark | |
| LIBRARY = "pyshark" | |
| logger.info("Using pyshark for packet analysis") | |
| except ImportError: | |
| try: | |
| import dpkt | |
| import socket | |
| from dpkt.ethernet import Ethernet | |
| LIBRARY = "dpkt" | |
| logger.info("Using dpkt for packet analysis") | |
| except ImportError: | |
| logger.error("No packet analysis library found. Please install scapy, pyshark, or dpkt.") | |
| sys.exit(1) | |
| # Try to import visualization libraries | |
| try: | |
| import plantuml | |
| HAS_PLANTUML = True | |
| except ImportError: | |
| HAS_PLANTUML = False | |
| logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.") | |
| try: | |
| import matplotlib | |
| matplotlib.use('Agg') # Use non-interactive backend | |
| import matplotlib.pyplot as plt | |
| import matplotlib.dates as mdates | |
| import numpy as np | |
| HAS_MATPLOTLIB = True | |
| except ImportError: | |
| HAS_MATPLOTLIB = False | |
| logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.") | |
# SIP request methods recognised by the analyzer.
SIP_METHODS = [
    "INVITE", "ACK", "BYE", "CANCEL", "REGISTER", "OPTIONS",
    "PRACK", "SUBSCRIBE", "NOTIFY", "PUBLISH", "INFO", "REFER", "MESSAGE", "UPDATE"
]
# SIP response classes, keyed by the first digit of the status code.
SIP_RESPONSES = {
    "1": "Informational",
    "2": "Success",
    "3": "Redirection",
    "4": "Client Error",
    "5": "Server Error",
    "6": "Global Failure"
}
# Well-known SIP ports.
SIP_PORTS = {5060, 5061}
# Port range treated as candidate RTP traffic (a heuristic: RTP ports are
# negotiated in SDP and may fall anywhere).
RTP_PORT_RANGE = (10000, 65535)  # Typical RTP port range
# RTP payload type -> codec/encoding name. Entries 0-34 are the static
# assignments of RFC 3551 (tables 4 and 5); 96-127 are dynamic and only
# meaningful together with the SDP "a=rtpmap" of the call. 110/111 are
# labelled OPUS because that is their usual dynamic assignment.
RTP_CODEC_MAP = {
    0: "PCMU/G.711u",
    3: "GSM",
    4: "G723",
    5: "DVI4",
    6: "DVI4",
    7: "LPC",
    8: "PCMA/G.711a",
    9: "G722",
    10: "L16 (stereo)",
    11: "L16 (mono)",
    12: "QCELP",
    13: "CN",  # comfort noise, common during silence suppression
    14: "MPA",
    15: "G728",
    16: "DVI4",
    17: "DVI4",
    18: "G729",
    25: "CelB",
    26: "JPEG",
    28: "nv",
    31: "H261",
    32: "MPV",
    33: "MP2T",
    34: "H263",
    96: "Dynamic",
    97: "Dynamic",
    98: "Dynamic",
    99: "Dynamic",
    100: "Dynamic",
    101: "Dynamic",
    110: "OPUS",
    111: "OPUS"
}
class RtpStream:
    """An RTP stream (one SSRC) with its packets and running quality metrics.

    Packets are fed in arrival order through :meth:`add_packet`, which keeps
    packet-loss and interarrival-jitter statistics up to date following
    RFC 3550 (section 6.4.1, appendices A.1 and A.8).
    """

    # RTP timestamp clock rates (Hz) for the static payload types of RFC 3551
    # (tables 4 and 5), plus the dynamic types this module labels as OPUS,
    # whose RTP clock is always 48 kHz (RFC 7587). Used to convert RTP
    # timestamps into milliseconds when computing jitter.
    _CLOCK_RATES = {
        0: 8000, 3: 8000, 4: 8000, 5: 8000, 6: 16000, 7: 8000, 8: 8000,
        9: 8000,  # G.722 uses an 8 kHz RTP clock although it samples at 16 kHz
        10: 44100, 11: 44100, 12: 8000, 13: 8000, 14: 90000, 15: 8000,
        16: 11025, 17: 22050, 18: 8000,
        25: 90000, 26: 90000, 28: 90000, 31: 90000, 32: 90000, 33: 90000,
        34: 90000,
        110: 48000, 111: 48000,
    }
    # Fallback for payload types without a known clock rate.
    _DEFAULT_CLOCK_RATE = 8000
    _SEQ_MODULUS = 1 << 16  # RTP sequence numbers are 16-bit
    _TS_MODULUS = 1 << 32   # RTP timestamps are 32-bit

    def __init__(self, ssrc: int, src_ip: str, src_port: int, dst_ip: str, dst_port: int,
                 clock_rate: Optional[int] = None):
        """Create an empty stream.

        Args:
            ssrc: Synchronization source identifier of the stream.
            src_ip, src_port: Sender address.
            dst_ip, dst_port: Receiver address.
            clock_rate: RTP timestamp clock rate in Hz. When omitted it is
                derived from the payload type (see :meth:`get_clock_rate`).
        """
        self.ssrc = ssrc
        self.src_ip = src_ip
        self.src_port = src_port
        self.dst_ip = dst_ip
        self.dst_port = dst_port
        self.packets = []
        self.start_time = None
        self.end_time = None
        self.packet_count = 0
        self.expected_packets = 0
        self.lost_packets = 0
        self.jitter = 0.0            # running RFC 3550 jitter estimate, in ms
        self.jitter_values = []      # jitter after each packet, for visualization
        self.last_seq = None         # highest sequence number seen so far
        self.last_timestamp = None   # RTP timestamp of the previously arrived packet
        self.last_arrival = None     # arrival time of the previously arrived packet
        self.codec = None
        self.payload_type = None
        self.mos_score = None        # cached by calculate_mos(); reset by add_packet()
        self.rtt = None  # Round Trip Time in ms
        self.clock_rate = clock_rate
        self._base_seq = None        # first sequence number seen
        self._seq_cycles = 0         # number of sequence wrap-arounds * 2**16

    def get_clock_rate(self) -> int:
        """Return the RTP clock rate (Hz) used to turn timestamps into time."""
        if self.clock_rate:
            return self.clock_rate
        return self._CLOCK_RATES.get(self.payload_type, self._DEFAULT_CLOCK_RATE)

    def _track_sequence(self, seq: int) -> None:
        """Advance the extended highest sequence number with ``seq``.

        Forward steps (including a 16-bit wrap) move the maximum; duplicates
        and late (reordered) packets leave it untouched, so they are never
        mistaken for a wrap-around.
        """
        if self.last_seq is None:
            self._base_seq = seq
            self.last_seq = seq
            return
        delta = (seq - self.last_seq) % self._SEQ_MODULUS
        if 0 < delta < self._SEQ_MODULUS // 2:
            if seq < self.last_seq:
                self._seq_cycles += self._SEQ_MODULUS
            self.last_seq = seq

    def add_packet(self, timestamp: datetime, seq: int, rtp_timestamp: int,
                   payload_type: int, payload_size: int):
        """Add an RTP packet (in arrival order) and update the stream metrics.

        Args:
            timestamp: Capture/arrival time of the packet.
            seq: 16-bit RTP sequence number.
            rtp_timestamp: 32-bit RTP timestamp, in clock-rate units.
            payload_type: RTP payload type; the first one seen becomes the
                stream's payload type.
            payload_size: Payload length in bytes.
        """
        if self.start_time is None or timestamp < self.start_time:
            self.start_time = timestamp
        if self.end_time is None or timestamp > self.end_time:
            self.end_time = timestamp
        if self.payload_type is None:
            self.payload_type = payload_type

        self._track_sequence(seq)

        # Interarrival jitter (RFC 3550 6.4.1):
        # D = (Rj - Ri) - (Sj - Si), J += (|D| - J) / 16, all in milliseconds.
        if self.last_timestamp is not None and self.last_arrival is not None:
            arrival_delta_ms = (timestamp - self.last_arrival).total_seconds() * 1000.0
            # Signed 32-bit difference so a timestamp wrap-around is a small step.
            ts_delta = (rtp_timestamp - self.last_timestamp) % self._TS_MODULUS
            if ts_delta >= self._TS_MODULUS // 2:
                ts_delta -= self._TS_MODULUS
            rtp_delta_ms = ts_delta * 1000.0 / self.get_clock_rate()
            d = abs(arrival_delta_ms - rtp_delta_ms)
            self.jitter += (d - self.jitter) / 16
            self.jitter_values.append(self.jitter)
        else:
            self.jitter_values.append(0.0)

        self.last_timestamp = rtp_timestamp
        self.last_arrival = timestamp

        self.packets.append({
            "timestamp": timestamp,
            "seq": seq,
            "rtp_timestamp": rtp_timestamp,
            "payload_type": payload_type,
            "payload_size": payload_size
        })
        self.packet_count += 1

        # Loss = expected - received (RFC 3550 A.3). Duplicates can make the
        # difference negative, hence the clamp.
        extended_max = self._seq_cycles + self.last_seq
        self.expected_packets = extended_max - self._base_seq + 1
        self.lost_packets = max(0, self.expected_packets - self.packet_count)

        # New data invalidates any previously computed score.
        self.mos_score = None

    def get_packet_loss_percentage(self) -> float:
        """Return packet loss as a percentage of the expected packets."""
        if self.expected_packets == 0:
            return 0.0
        return (self.lost_packets / self.expected_packets) * 100

    def calculate_mos(self) -> float:
        """Estimate the Mean Opinion Score from latency, jitter and packet loss.

        Uses a simplified ITU-T G.107 E-model: the R factor is reduced by the
        effective latency (RTT, default 50 ms, plus twice the jitter plus
        10 ms) and by 2.5 per percent of packet loss, then mapped to MOS and
        clamped to [1.0, 4.5]. The result is cached in ``mos_score``.
        """
        rtt = self.rtt if self.rtt is not None else 50  # Default 50ms
        effective_latency = rtt + (self.jitter * 2) + 10  # 10ms for protocol latencies
        if effective_latency < 160:
            r = 93.2 - (effective_latency / 40)
        else:
            r = 93.2 - (effective_latency - 120) / 10
        r = r - (self.get_packet_loss_percentage() * 2.5)
        if r < 0:
            mos = 1.0
        elif r > 100:
            mos = 4.5
        else:
            mos = 1 + (0.035 * r) + (r * (r - 60) * (100 - r) * 7 * 10**-6)
        mos = max(1.0, min(4.5, mos))
        self.mos_score = mos
        return mos

    def get_duration(self) -> float:
        """Return the time between the first and last packet, in seconds."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

    def get_codec_name(self) -> str:
        """Return the codec name, falling back to the payload-type mapping."""
        if self.codec:
            return self.codec
        if self.payload_type in RTP_CODEC_MAP:
            return RTP_CODEC_MAP[self.payload_type]
        return f"Unknown ({self.payload_type})"

    def get_quality_rating(self) -> str:
        """Return a human-readable rating for the (computed if needed) MOS."""
        if self.mos_score is None:
            self.calculate_mos()
        if self.mos_score >= 4.3:
            return "Excellent"
        elif self.mos_score >= 4.0:
            return "Good"
        elif self.mos_score >= 3.6:
            return "Fair"
        elif self.mos_score >= 3.1:
            return "Poor"
        else:
            return "Bad"

    def __str__(self) -> str:
        """Return a multi-line, human-readable summary of the stream."""
        if self.mos_score is None:
            self.calculate_mos()
        result = f"RTP Stream: SSRC={self.ssrc}\n"
        result += f"Source: {self.src_ip}:{self.src_port}\n"
        result += f"Destination: {self.dst_ip}:{self.dst_port}\n"
        result += f"Codec: {self.get_codec_name()}\n"
        result += f"Start Time: {self.start_time}\n"
        result += f"End Time: {self.end_time}\n"
        result += f"Duration: {self.get_duration():.2f} seconds\n"
        result += f"Packets: {self.packet_count}\n"
        result += f"Lost Packets: {self.lost_packets}\n"
        result += f"Packet Loss: {self.get_packet_loss_percentage():.2f}%\n"
        result += f"Jitter: {self.jitter:.2f} ms\n"
        if self.rtt is not None:
            result += f"Round Trip Time: {self.rtt:.2f} ms\n"
        result += f"MOS Score: {self.mos_score:.2f} ({self.get_quality_rating()})\n"
        return result
class SipCall:
    """A SIP call (one Call-ID) with its signalling messages and RTP streams."""

    # URI in name-addr form, e.g. '"Alice" <sip:alice@example.com>;tag=1'.
    _NAME_ADDR_URI_RE = re.compile(r'<((?:sips?|tel):[^>]+)>')
    # URI in addr-spec form without angle brackets, e.g.
    # 'sip:alice@example.com;tag=1'. Per RFC 3261 section 20, parameters after
    # ';' then belong to the header, not to the URI, so they are excluded.
    _ADDR_SPEC_URI_RE = re.compile(r'((?:sips?|tel):[^;>\s,]+)')
    # Status-line prefixes of 4xx/5xx/6xx final responses.
    _ERROR_RESPONSE_PREFIXES = ("SIP/2.0 4", "SIP/2.0 5", "SIP/2.0 6")

    def __init__(self, call_id: str):
        self.call_id = call_id
        self.messages = []
        self.start_time = None
        self.end_time = None
        self.from_uri = None
        self.to_uri = None
        self.status = "Unknown"  # "Unknown", "Setup", "Established", "Terminated" or "Failed"
        self.call_direction = None  # Can be "Outgoing", "Incoming", or None
        self.media_info = {}  # Store media information from SDP
        self.rtp_streams = {}  # Dictionary of RTP streams indexed by SSRC

    @classmethod
    def _extract_uri(cls, header_value: str) -> Optional[str]:
        """Return the SIP/SIPS/TEL URI of a From/To header value, or None."""
        match = (cls._NAME_ADDR_URI_RE.search(header_value)
                 or cls._ADDR_SPEC_URI_RE.search(header_value))
        return match.group(1) if match else None

    def add_message(self, timestamp: datetime, method: str, source_ip: str,
                    dest_ip: str, headers: Dict[str, str], content: Optional[str] = None):
        """Record one SIP message and update the call's metadata and status.

        Args:
            timestamp: Capture time of the message.
            method: Request method (e.g. ``"INVITE"``) or the response status
                line (e.g. ``"SIP/2.0 200 OK"``).
            source_ip: Sender address.
            dest_ip: Receiver address.
            headers: Parsed header fields keyed by header name.
            content: Message body; parsed as SDP when it contains ``m=``.
        """
        if self.start_time is None or timestamp < self.start_time:
            self.start_time = timestamp
        if self.end_time is None or timestamp > self.end_time:
            self.end_time = timestamp

        # From/To URIs come from the first message with a parseable header;
        # "f"/"t" are the RFC 3261 compact header names.
        if not self.from_uri:
            from_header = headers.get("From") or headers.get("f")
            if from_header:
                self.from_uri = self._extract_uri(from_header)
        if not self.to_uri:
            to_header = headers.get("To") or headers.get("t")
            if to_header:
                self.to_uri = self._extract_uri(to_header)

        self._update_status(method)

        if content and "m=" in content:
            self._parse_sdp(content)

        self.messages.append({
            "timestamp": timestamp,
            "method": method,
            "source_ip": source_ip,
            "dest_ip": dest_ip,
            "headers": headers,
            "content": content
        })

    def _update_status(self, method: str) -> None:
        """Advance the call status for one request method / response line."""
        if method == "INVITE":
            # A re-INVITE inside an established dialog does not restart setup.
            if self.status != "Established":
                self.status = "Setup"
        elif method == "ACK":
            if self.status == "Setup":
                self.status = "Established"
        elif method == "BYE":
            self.status = "Terminated"
        elif method.startswith(self._ERROR_RESPONSE_PREFIXES):
            # Error responses after the dialog was confirmed or torn down
            # (e.g. 481 to a BYE, 491 to a re-INVITE) do not fail the call.
            if self.status not in ("Established", "Terminated"):
                self.status = "Failed"

    def _parse_sdp(self, sdp_content: str):
        """Store the first media line, rtpmap codec and connection address of an SDP body."""
        media_match = re.search(r'm=(\w+)\s+(\d+)', sdp_content)
        if media_match:
            self.media_info['type'] = media_match.group(1)
            self.media_info['port'] = int(media_match.group(2))
        codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', sdp_content)
        if codec_match:
            self.media_info['codec'] = {
                'id': codec_match.group(1),
                'name': codec_match.group(2),
                'rate': codec_match.group(3)
            }
        conn_match = re.search(r'c=IN\s+IP[46]\s+([^\s]+)', sdp_content)
        if conn_match:
            self.media_info['connection_address'] = conn_match.group(1)

    def add_rtp_stream(self, rtp_stream: "RtpStream"):
        """Attach an RTP stream to this call, keyed by its SSRC."""
        self.rtp_streams[rtp_stream.ssrc] = rtp_stream

    def get_duration(self) -> float:
        """Return the time between the first and last SIP message, in seconds."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

    def get_average_mos(self) -> float:
        """Return the mean MOS over all RTP streams (0.0 when there are none)."""
        if not self.rtp_streams:
            return 0.0
        total_mos = 0.0
        for stream in self.rtp_streams.values():
            if stream.mos_score is None:
                stream.calculate_mos()
            total_mos += stream.mos_score
        return total_mos / len(self.rtp_streams)

    def get_call_quality_summary(self) -> Dict[str, Any]:
        """Return averaged MOS, jitter and packet loss plus a quality rating."""
        if not self.rtp_streams:
            return {"status": "No RTP streams found"}
        streams = list(self.rtp_streams.values())
        avg_mos = self.get_average_mos()
        avg_jitter = sum(s.jitter for s in streams) / len(streams)
        avg_packet_loss = sum(s.get_packet_loss_percentage() for s in streams) / len(streams)
        if avg_mos >= 4.3:
            quality_rating = "Excellent"
        elif avg_mos >= 4.0:
            quality_rating = "Good"
        elif avg_mos >= 3.6:
            quality_rating = "Fair"
        elif avg_mos >= 3.1:
            quality_rating = "Poor"
        else:
            quality_rating = "Bad"
        return {
            "mos": avg_mos,
            "jitter": avg_jitter,
            "packet_loss": avg_packet_loss,
            "rating": quality_rating,
            "stream_count": len(self.rtp_streams)
        }

    def __str__(self) -> str:
        """Return a multi-line, human-readable summary of the call."""
        result = f"Call ID: {self.call_id}\n"
        result += f"Status: {self.status}\n"
        result += f"From: {self.from_uri}\n"
        result += f"To: {self.to_uri}\n"
        result += f"Start Time: {self.start_time}\n"
        result += f"End Time: {self.end_time}\n"
        result += f"Duration: {self.get_duration():.2f} seconds\n"
        if self.media_info:
            result += "Media Information:\n"
            for key, value in self.media_info.items():
                if isinstance(value, dict):
                    result += f" {key}:\n"
                    for k, v in value.items():
                        result += f" {k}: {v}\n"
                else:
                    result += f" {key}: {value}\n"
        if self.rtp_streams:
            quality = self.get_call_quality_summary()
            result += "Call Quality:\n"
            result += f" MOS Score: {quality['mos']:.2f} ({quality['rating']})\n"
            result += f" Average Jitter: {quality['jitter']:.2f} ms\n"
            result += f" Average Packet Loss: {quality['packet_loss']:.2f}%\n"
            result += f" RTP Streams: {quality['stream_count']}\n"
        result += f"Message Count: {len(self.messages)}\n"
        result += "Message Flow:\n"
        for i, msg in enumerate(self.messages, 1):
            result += f" {i}. [{msg['timestamp']}] {msg['source_ip']} -> {msg['dest_ip']}: {msg['method']}\n"
        if self.rtp_streams:
            result += "RTP Streams:\n"
            for ssrc, stream in self.rtp_streams.items():
                # Computes mos_score first if it is missing.
                rating = stream.get_quality_rating()
                result += f" Stream SSRC: {ssrc}\n"
                result += f" Source: {stream.src_ip}:{stream.src_port}\n"
                result += f" Destination: {stream.dst_ip}:{stream.dst_port}\n"
                result += f" Codec: {stream.get_codec_name()}\n"
                result += f" Packets: {stream.packet_count}\n"
                result += f" Lost Packets: {stream.lost_packets}\n"
                result += f" Packet Loss: {stream.get_packet_loss_percentage():.2f}%\n"
                result += f" Jitter: {stream.jitter:.2f} ms\n"
                result += f" MOS Score: {stream.mos_score:.2f} ({rating})\n"
        return result
| class SipRtpVisualizer: | |
| """Class for generating visualizations of SIP and RTP flows""" | |
| def __init__(self, output_dir: Optional[str] = None, | |
| plantuml_server: str = "http://www.plantuml.com/plantuml"): | |
| """Initialize the visualizer | |
| Args: | |
| output_dir: Directory to save visualization files (default: current directory) | |
| plantuml_server: PlantUML server URL for rendering diagrams | |
| """ | |
| self.output_dir = output_dir or os.getcwd() | |
| self.plantuml_server = plantuml_server | |
| # Create output directory if it doesn't exist | |
| os.makedirs(self.output_dir, exist_ok=True) | |
| # Initialize PlantUML | |
| if HAS_PLANTUML: | |
| try: | |
| self.plantuml = plantuml.PlantUML(url=self.plantuml_server) | |
| except Exception as e: | |
| logger.warning(f"Error initializing PlantUML: {e}") | |
| self.plantuml = None | |
| else: | |
| self.plantuml = None | |
| def generate_sip_sequence_diagram(self, calls: Dict[str, Any], | |
| output_file: Optional[str] = None) -> str: | |
| """Generate a sequence diagram for SIP calls | |
| Args: | |
| calls: Dictionary of SIP calls from the analyzer | |
| output_file: Output file path (default: sip_sequence.png in output_dir) | |
| Returns: | |
| Path to the generated diagram file | |
| """ | |
| if not calls: | |
| logger.warning("No SIP calls to visualize") | |
| return "" | |
| if not HAS_PLANTUML or not self.plantuml: | |
| logger.error("PlantUML not available. Cannot generate sequence diagram.") | |
| return "" | |
| # Use the first call if multiple calls exist | |
| # In future, could support multiple calls in separate diagrams | |
| call_id = next(iter(calls)) | |
| call = calls[call_id] | |
| # Start building PlantUML diagram | |
| diagram = ["@startuml"] | |
| diagram.append("skinparam sequenceMessageAlign center") | |
| diagram.append("skinparam sequenceArrowThickness 2") | |
| diagram.append("skinparam roundcorner 5") | |
| diagram.append("skinparam maxmessagesize 200") | |
| diagram.append("") | |
| # Extract unique participants | |
| participants = set() | |
| for msg in call.messages: | |
| participants.add(msg['source_ip']) | |
| participants.add(msg['dest_ip']) | |
| # Define participants | |
| for i, participant in enumerate(participants): | |
| # Try to determine if this is caller or callee | |
| if call.from_uri and participant in call.from_uri: | |
| label = f"Caller\\n{participant}" | |
| elif call.to_uri and participant in call.to_uri: | |
| label = f"Callee\\n{participant}" | |
| else: | |
| label = f"Endpoint {i+1}\\n{participant}" | |
| diagram.append(f'participant "{label}" as P{i}') | |
| diagram.append("") | |
| # Map IP addresses to participant IDs | |
| ip_to_participant = {ip: f"P{i}" for i, ip in enumerate(participants)} | |
| # Add messages | |
| for msg in call.messages: | |
| src = ip_to_participant[msg['source_ip']] | |
| dst = ip_to_participant[msg['dest_ip']] | |
| # Format the message text | |
| method = msg['method'] | |
| # For SIP responses, extract the status code | |
| if method.startswith("SIP/2.0"): | |
| parts = method.split(" ", 2) | |
| if len(parts) >= 2: | |
| status_code = parts[1] | |
| if status_code.startswith("1"): | |
| arrow_type = "-->" # Dotted arrow for provisional responses | |
| else: | |
| arrow_type = "->" | |
| else: | |
| arrow_type = "->" | |
| else: | |
| arrow_type = "->" | |
| # Add the message to the diagram | |
| diagram.append(f"{src} {arrow_type} {dst}: {method}") | |
| # Add SDP information as notes if present | |
| if msg.get('content') and 'm=audio' in msg.get('content', ''): | |
| sdp_note = [] | |
| # Extract media type and port | |
| media_match = re.search(r'm=(\w+)\s+(\d+)', msg['content']) | |
| if media_match: | |
| media_type = media_match.group(1) | |
| media_port = media_match.group(2) | |
| sdp_note.append(f"SDP: {media_type} {media_port}") | |
| # Extract codec information | |
| codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', msg['content']) | |
| if codec_match: | |
| codec_id = codec_match.group(1) | |
| codec_name = codec_match.group(2) | |
| sdp_note.append(f"RTP/AVP {codec_id} ({codec_name})") | |
| if sdp_note: | |
| note_position = "right" if src < dst else "left" | |
| diagram.append(f"note {note_position}: {' '.join(sdp_note)}") | |
| # Add RTP stream if present | |
| if call.rtp_streams: | |
| # Calculate total RTP duration | |
| rtp_duration = 0 | |
| for stream in call.rtp_streams.values(): | |
| duration = stream.get_duration() | |
| if duration > rtp_duration: | |
| rtp_duration = duration | |
| if rtp_duration > 0: | |
| diagram.append("") | |
| diagram.append(f"... RTP Media Stream ({rtp_duration:.2f} seconds) ...") | |
| diagram.append("") | |
| diagram.append("@enduml") | |
| # Join the diagram lines | |
| diagram_str = "\n".join(diagram) | |
| # Generate the diagram | |
| if not output_file: | |
| output_file = os.path.join(self.output_dir, f"sip_sequence_{call_id}.png") | |
| try: | |
| # Save the PlantUML source first | |
| source_file = os.path.splitext(output_file)[0] + ".puml" | |
| with open(source_file, 'w') as f: | |
| f.write(diagram_str) | |
| logger.info(f"PlantUML source saved: {source_file}") | |
| # Try to generate the diagram | |
| try: | |
| #self.plantuml.processes_file(diagram_str, outfile=output_file) | |
| self.plantuml.processes_file(source_file) | |
| logger.info(f"Sequence diagram generated: {output_file}") | |
| return output_file | |
| except Exception as e: | |
| # If PlantUML server fails, try using local command-line tool if available | |
| logger.warning(f"PlantUML server error: {str(e)}") | |
| try: | |
| cmd = f"plantuml {source_file}" | |
| subprocess.run(cmd, shell=True, check=True) | |
| logger.info(f"Sequence diagram generated using local PlantUML: {output_file}") | |
| return output_file | |
| except Exception as e2: | |
| logger.warning(f"Local PlantUML failed: {str(e2)}") | |
| logger.info("Continuing without sequence diagram. PlantUML source is still available.") | |
| return source_file | |
| except Exception as e: | |
| logger.error(f"Error generating sequence diagram: {str(e)}") | |
| return "" | |
| def generate_rtp_visualization(self, calls: Dict[str, Any], | |
| output_file: Optional[str] = None) -> str: | |
| """Generate visualization for RTP streams | |
| Args: | |
| calls: Dictionary of SIP calls from the analyzer | |
| output_file: Output file path (default: rtp_visualization.png in output_dir) | |
| Returns: | |
| Path to the generated visualization file | |
| """ | |
| if not calls or not HAS_MATPLOTLIB: | |
| logger.warning("No SIP calls to visualize or Matplotlib not available") | |
| return "" | |
| # Find calls with RTP streams | |
| calls_with_rtp = {call_id: call for call_id, call in calls.items() if call.rtp_streams} | |
| if not calls_with_rtp: | |
| logger.warning("No RTP streams to visualize") | |
| return "" | |
| # Use the first call with RTP streams | |
| call_id = next(iter(calls_with_rtp)) | |
| call = calls_with_rtp[call_id] | |
| # Get the first RTP stream | |
| stream_id = next(iter(call.rtp_streams)) | |
| stream = call.rtp_streams[stream_id] | |
| # Create figure with subplots | |
| fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), gridspec_kw={'height_ratios': [3, 1]}) | |
| # Extract packet data | |
| if not stream.packets: | |
| logger.warning("No packets in RTP stream") | |
| return "" | |
| # Create timestamps relative to the first packet | |
| first_timestamp = stream.packets[0]['timestamp'] | |
| timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in stream.packets] | |
| # Create jitter data (use actual jitter if available, otherwise simulate) | |
| if hasattr(stream, 'jitter_values') and stream.jitter_values: | |
| jitter_values = stream.jitter_values | |
| else: | |
| # Simulate jitter values based on the average jitter | |
| base_jitter = stream.jitter | |
| jitter_values = [base_jitter + np.random.normal(0, base_jitter/10) for _ in range(len(timestamps))] | |
| # Create packet loss data | |
| packet_loss = np.zeros(len(timestamps)) | |
| if stream.lost_packets > 0: | |
| # Simulate packet loss positions | |
| loss_indices = np.random.choice( | |
| range(len(timestamps)), | |
| size=min(stream.lost_packets, len(timestamps)//10), # Limit for visualization clarity | |
| replace=False | |
| ) | |
| packet_loss[loss_indices] = 1 | |
| # Plot jitter | |
| ax1.plot(timestamps, jitter_values, color='blue', alpha=0.7, label='Jitter (ms)') | |
| # Highlight packet loss if any | |
| lost_packets = np.where(packet_loss == 1)[0] | |
| if len(lost_packets) > 0: | |
| ax1.scatter( | |
| [timestamps[i] for i in lost_packets], | |
| [jitter_values[i] for i in lost_packets], | |
| color='red', s=50, label='Lost Packets' | |
| ) | |
| # Add MOS score indicator | |
| mos_score = stream.mos_score if stream.mos_score is not None else stream.calculate_mos() | |
| quality_rating = stream.get_quality_rating() | |
| # Color code based on MOS | |
| if mos_score >= 4.0: | |
| mos_color = 'green' | |
| elif mos_score >= 3.5: | |
| mos_color = 'orange' | |
| else: | |
| mos_color = 'red' | |
| ax1.text(0.02, 0.95, f'MOS Score: {mos_score:.2f} ({quality_rating})', | |
| transform=ax1.transAxes, | |
| bbox=dict(facecolor=mos_color, alpha=0.5)) | |
| # Formatting for jitter plot | |
| ax1.set_xlabel('Time (seconds)') | |
| ax1.set_ylabel('Jitter (ms)') | |
| ax1.set_title(f'RTP Stream Analysis - SSRC: {stream.ssrc}') | |
| ax1.grid(True, alpha=0.3) | |
| ax1.legend() | |
| # Add packet flow visualization in the second subplot | |
| packet_y = np.ones(len(timestamps)) | |
| ax2.scatter(timestamps, packet_y, marker='|', s=10, color='blue', alpha=0.7) | |
| # Highlight lost packets | |
| if len(lost_packets) > 0: | |
| # Calculate positions for lost packets | |
| lost_x = [] | |
| for i in range(len(timestamps)-1): | |
| if i+1 in lost_packets or i in lost_packets: | |
| # Find midpoint between packets | |
| if i+1 < len(timestamps): | |
| midpoint = (timestamps[i] + timestamps[i+1]) / 2 | |
| lost_x.append(midpoint) | |
| if lost_x: | |
| ax2.scatter(lost_x, np.ones(len(lost_x)), marker='x', s=50, color='red', | |
| label='Lost Packets') | |
| # Formatting for packet flow | |
| ax2.set_yticks([]) | |
| ax2.set_xlabel('Time (seconds)') | |
| ax2.set_title('Packet Flow') | |
| ax2.grid(True, alpha=0.3, axis='x') | |
| # Add stream information | |
| info_text = ( | |
| f"Source: {stream.src_ip}:{stream.src_port}\n" | |
| f"Destination: {stream.dst_ip}:{stream.dst_port}\n" | |
| f"Codec: {stream.get_codec_name()}\n" | |
| f"Packets: {stream.packet_count}\n" | |
| f"Lost Packets: {stream.lost_packets} ({stream.get_packet_loss_percentage():.2f}%)" | |
| ) | |
| ax2.text(0.02, 0.5, info_text, transform=ax2.transAxes, | |
| bbox=dict(facecolor='white', alpha=0.8)) | |
| plt.tight_layout() | |
| # Save the figure | |
| if not output_file: | |
| output_file = os.path.join(self.output_dir, f"rtp_visualization_{call_id}_{stream_id}.png") | |
| plt.savefig(output_file, dpi=100) | |
| plt.close(fig) | |
| logger.info(f"RTP visualization generated: {output_file}") | |
| return output_file | |
| def generate_call_quality_dashboard(self, calls: Dict[str, Any], | |
| output_file: Optional[str] = None) -> str: | |
| """Generate a dashboard with call quality metrics | |
| Args: | |
| calls: Dictionary of SIP calls from the analyzer | |
| output_file: Output file path (default: call_quality.png in output_dir) | |
| Returns: | |
| Path to the generated dashboard file | |
| """ | |
| if not calls or not HAS_MATPLOTLIB: | |
| logger.warning("No SIP calls to visualize or Matplotlib not available") | |
| return "" | |
| # Find calls with RTP streams | |
| calls_with_rtp = {call_id: call for call_id, call in calls.items() if call.rtp_streams} | |
| if not calls_with_rtp: | |
| logger.warning("No RTP streams to visualize") | |
| return "" | |
| # Create figure with subplots | |
| fig = plt.figure(figsize=(12, 8)) | |
| gs = fig.add_gridspec(2, 2) | |
| # Call summary subplot | |
| ax_summary = fig.add_subplot(gs[0, 0]) | |
| # MOS score gauge subplot | |
| ax_mos = fig.add_subplot(gs[0, 1]) | |
| # Jitter subplot | |
| ax_jitter = fig.add_subplot(gs[1, 0]) | |
| # Packet loss subplot | |
| ax_loss = fig.add_subplot(gs[1, 1]) | |
| # Process each call | |
| for i, (call_id, call) in enumerate(calls_with_rtp.items()): | |
| # Calculate average metrics across all streams | |
| avg_mos = call.get_average_mos() | |
| total_jitter = 0 | |
| total_packet_loss_pct = 0 | |
| total_packets = 0 | |
| total_lost_packets = 0 | |
| for stream in call.rtp_streams.values(): | |
| total_jitter += stream.jitter | |
| total_packets += stream.packet_count | |
| total_lost_packets += stream.lost_packets | |
| avg_jitter = total_jitter / len(call.rtp_streams) if call.rtp_streams else 0 | |
| if total_packets > 0: | |
| overall_packet_loss_pct = (total_lost_packets / (total_packets + total_lost_packets)) * 100 | |
| else: | |
| overall_packet_loss_pct = 0 | |
| # Call summary | |
| summary_text = ( | |
| f"Call ID: {call_id}\n" | |
| f"From: {call.from_uri}\n" | |
| f"To: {call.to_uri}\n" | |
| f"Duration: {call.get_duration():.2f} seconds\n" | |
| f"Status: {call.status}\n" | |
| f"RTP Streams: {len(call.rtp_streams)}\n" | |
| f"Total Packets: {total_packets}\n" | |
| f"Lost Packets: {total_lost_packets}\n" | |
| f"Packet Loss: {overall_packet_loss_pct:.2f}%\n" | |
| f"Average Jitter: {avg_jitter:.2f} ms\n" | |
| f"MOS Score: {avg_mos:.2f}" | |
| ) | |
| ax_summary.text(0.05, 0.95, summary_text, transform=ax_summary.transAxes, | |
| verticalalignment='top', fontsize=10) | |
| ax_summary.axis('off') | |
| ax_summary.set_title("Call Summary") | |
| # MOS score gauge | |
| self._draw_mos_gauge(ax_mos, avg_mos) | |
| # Get the first stream for detailed metrics | |
| if call.rtp_streams: | |
| stream_id = next(iter(call.rtp_streams)) | |
| stream = call.rtp_streams[stream_id] | |
| # Extract packet data | |
| if stream.packets: | |
| # Create timestamps relative to the first packet | |
| first_timestamp = stream.packets[0]['timestamp'] | |
| timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in stream.packets] | |
| # Create jitter data (use actual jitter if available, otherwise simulate) | |
| if hasattr(stream, 'jitter_values') and stream.jitter_values: | |
| jitter_values = stream.jitter_values | |
| else: | |
| # Simulate jitter values based on the average jitter | |
| base_jitter = stream.jitter | |
| jitter_values = [base_jitter + np.random.normal(0, base_jitter/10) for _ in range(len(timestamps))] | |
| # Plot jitter | |
| ax_jitter.plot(timestamps, jitter_values, color='blue', alpha=0.7) | |
| ax_jitter.set_xlabel('Time (seconds)') | |
| ax_jitter.set_ylabel('Jitter (ms)') | |
| ax_jitter.set_title('Jitter Over Time') | |
| ax_jitter.grid(True, alpha=0.3) | |
| # Add threshold lines | |
| ax_jitter.axhline(y=20, color='green', linestyle='--', alpha=0.5) | |
| ax_jitter.axhline(y=50, color='orange', linestyle='--', alpha=0.5) | |
| ax_jitter.axhline(y=100, color='red', linestyle='--', alpha=0.5) | |
| # Add threshold labels | |
| ax_jitter.text(timestamps[-1], 20, 'Excellent (<20ms)', | |
| verticalalignment='bottom', horizontalalignment='right', color='green') | |
| ax_jitter.text(timestamps[-1], 50, 'Good (<50ms)', | |
| verticalalignment='bottom', horizontalalignment='right', color='orange') | |
| ax_jitter.text(timestamps[-1], 100, 'Fair (<100ms)', | |
| verticalalignment='bottom', horizontalalignment='right', color='red') | |
| # Packet loss pie chart | |
| labels = ['Received', 'Lost'] | |
| sizes = [total_packets, total_lost_packets] | |
| colors = ['#66b3ff', '#ff6666'] | |
| if sum(sizes) > 0: # Avoid division by zero | |
| ax_loss.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90) | |
| ax_loss.axis('equal') | |
| ax_loss.set_title('Packet Loss') | |
| else: | |
| ax_loss.text(0.5, 0.5, "No packet data available", | |
| horizontalalignment='center', verticalalignment='center') | |
| ax_loss.axis('off') | |
| plt.tight_layout() | |
| # Save the figure | |
| if not output_file: | |
| output_file = os.path.join(self.output_dir, "call_quality_dashboard.png") | |
| plt.savefig(output_file, dpi=100) | |
| plt.close(fig) | |
| logger.info(f"Call quality dashboard generated: {output_file}") | |
| return output_file | |
def _draw_mos_gauge(self, ax, mos_score):
    """Draw a semicircular gauge chart for a MOS score.

    The dial runs from MOS 1.0 on the left, through the midpoint at the top,
    to MOS 4.5 on the right, matching the scale labels drawn around it. The
    coloured bands use the same thresholds as the text rating printed under
    the dial: red below 3.1 ("Bad"), orange from 3.1 to 3.6 ("Poor") and
    green from 3.6 upwards ("Fair" and better).

    Args:
        ax: Matplotlib axis to draw on
        mos_score: MOS score; values outside 1.0-4.5 pin the needle to the
            nearest end of the dial
    """
    mos_min, mos_max = 1.0, 4.5
    gauge_radius = 0.8
    gauge_width = 0.2
    segment_count = 100

    def mos_to_degrees(mos):
        # mos_min -> 180 deg (left end), mos_max -> 0 deg (right end)
        norm = (mos - mos_min) / (mos_max - mos_min)
        norm = max(0.0, min(1.0, norm))  # clamp out-of-range scores to the dial ends
        return 180.0 * (1.0 - norm)

    def band_color(mos):
        # Boundaries match the "Poor" (3.1) and "Fair" (3.6) ratings below
        if mos < 3.1:
            return 'red'
        if mos < 3.6:
            return 'orange'
        return 'green'

    # Draw gauge background as thin wedges, each coloured by the MOS at its centre
    edges = np.linspace(mos_min, mos_max, segment_count + 1)
    for low, high in zip(edges[:-1], edges[1:]):
        # Wedge angles run counter-clockwise, so the higher MOS (smaller angle) is theta1
        ax.add_patch(plt.matplotlib.patches.Wedge(
            (0, 0), gauge_radius, mos_to_degrees(high), mos_to_degrees(low),
            width=gauge_width, color=band_color((low + high) / 2), alpha=0.7
        ))

    # Draw needle
    needle_angle = np.radians(mos_to_degrees(mos_score))
    ax.plot([0, gauge_radius * np.cos(needle_angle)],
            [0, gauge_radius * np.sin(needle_angle)],
            color='black', linewidth=2)

    # Add center circle
    ax.add_patch(plt.matplotlib.patches.Circle((0, 0), 0.05, color='black'))

    # Add MOS value text
    if mos_score >= 4.3:
        quality_rating = "Excellent"
    elif mos_score >= 4.0:
        quality_rating = "Good"
    elif mos_score >= 3.6:
        quality_rating = "Fair"
    elif mos_score >= 3.1:
        quality_rating = "Poor"
    else:
        quality_rating = "Bad"
    ax.text(0, -0.2, f"MOS: {mos_score:.2f}",
            horizontalalignment='center', fontsize=12, fontweight='bold')
    ax.text(0, -0.3, f"({quality_rating})",
            horizontalalignment='center', fontsize=10)

    # Add scale labels: left end, top (true midpoint of the range), right end
    mos_mid = (mos_min + mos_max) / 2
    ax.text(-gauge_radius - 0.1, 0, f"{mos_min:.1f}",
            horizontalalignment='right', verticalalignment='center', fontsize=8)
    ax.text(0, gauge_radius + 0.1, f"{mos_mid:.2f}",
            horizontalalignment='center', verticalalignment='bottom', fontsize=8)
    ax.text(gauge_radius + 0.1, 0, f"{mos_max:.1f}",
            horizontalalignment='left', verticalalignment='center', fontsize=8)

    # Set axis limits and turn off axis
    ax.set_xlim(-1, 1)
    ax.set_ylim(-0.5, 1)
    ax.axis('off')
    ax.set_title("Call Quality (MOS)")
def generate_html_report(self, calls: Dict[str, Any],
                         output_file: Optional[str] = None) -> str:
    """Generate a comprehensive HTML report with all visualizations

    Image file names embed the Call-ID and the stream id. A Call-ID is copied
    verbatim from the capture and may legally contain characters such as
    '/', ':', '<', '>', '?' or '\\' (RFC 3261 ``word``). It is therefore
    reduced to a filesystem-safe token before being used in a path; otherwise
    it could escape ``output_dir`` or produce an invalid file name.

    Args:
        calls: Dictionary of SIP calls from the analyzer
        output_file: Output file path (default: sip_rtp_report.html in output_dir)

    Returns:
        Path to the generated HTML report, or "" when there are no calls
    """
    import hashlib

    if not calls:
        logger.warning("No SIP calls to visualize")
        return ""

    def file_token(value: Any) -> str:
        # Keep already-safe, reasonably short identifiers unchanged. Otherwise replace
        # unsafe characters and append a short digest of the original value, so distinct
        # IDs that sanitize to the same text still get distinct file names.
        text = str(value)
        safe = re.sub(r'[^A-Za-z0-9._-]', '_', text)
        if safe == text and len(safe) <= 100:
            return safe
        digest = hashlib.sha1(text.encode('utf-8', 'replace')).hexdigest()[:10]
        return f"{safe[:100]}_{digest}"

    # Generate all visualizations; keys keep the raw IDs because the HTML builder looks them up that way
    visualization_files = {}
    for call_id, call in calls.items():
        # Only process calls with RTP streams
        if not call.rtp_streams:
            continue
        call_token = file_token(call_id)

        # Generate sequence diagram
        seq_diagram = self.generate_sip_sequence_diagram(
            {call_id: call},
            os.path.join(self.output_dir, f"sip_sequence_{call_token}.png")
        )
        if seq_diagram:
            visualization_files[f"sequence_{call_id}"] = seq_diagram

        # Generate RTP visualization for each stream
        for stream_id in call.rtp_streams:
            rtp_viz = self.generate_rtp_visualization(
                {call_id: call},
                os.path.join(self.output_dir,
                             f"rtp_visualization_{call_token}_{file_token(stream_id)}.png")
            )
            if rtp_viz:
                visualization_files[f"rtp_{call_id}_{stream_id}"] = rtp_viz

    # Generate call quality dashboard
    dashboard = self.generate_call_quality_dashboard(
        calls,
        os.path.join(self.output_dir, "call_quality_dashboard.png")
    )
    if dashboard:
        visualization_files["dashboard"] = dashboard

    # Create HTML report
    if not output_file:
        output_file = os.path.join(self.output_dir, "sip_rtp_report.html")
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(self._generate_html_content(calls, visualization_files))
    logger.info(f"HTML report generated: {output_file}")
    return output_file
def _generate_html_content(self, calls: Dict[str, Any],
                           visualization_files: Dict[str, str]) -> str:
    """Generate HTML content for the report

    Every value that originates from the capture is HTML-escaped before it
    is emitted. This covers Call-IDs, URIs, SIP headers, SDP bodies and
    addresses. SIP URIs are routinely written as ``<sip:user@host>``, which a
    browser would otherwise swallow as an unknown tag, and a crafted capture
    could inject script into the report.

    Args:
        calls: Dictionary of SIP calls from the analyzer
        visualization_files: Dictionary of visualization file paths

    Returns:
        HTML content as a string
    """
    from html import escape
    from urllib.parse import quote

    def esc(value: Any) -> str:
        # Safe for element text and for single-quoted attribute values
        return escape(str(value), quote=True)

    def img_src(path: str) -> str:
        # Images sit next to the report and are referenced by file name. Percent-encode
        # the name so characters such as '#', '?' or '%' cannot break the URL.
        return esc(quote(os.path.basename(path)))

    out = [
        "<!DOCTYPE html>",
        "<html lang='en'>",
        "<head>",
        " <meta charset='UTF-8'>",
        " <meta name='viewport' content='width=device-width, initial-scale=1.0'>",
        " <title>SIP and RTP Analysis Report</title>",
        " <style>",
        " body { font-family: Arial, sans-serif; margin: 0; padding: 20px; color: #333; }",
        " h1, h2, h3 { color: #2c3e50; }",
        " .container { max-width: 1200px; margin: 0 auto; }",
        " .header { background-color: #3498db; color: white; padding: 20px; margin-bottom: 20px; }",
        " .section { margin-bottom: 30px; border: 1px solid #ddd; padding: 20px; border-radius: 5px; }",
        " .call-info { display: flex; flex-wrap: wrap; }",
        " .call-info div { margin-right: 20px; margin-bottom: 10px; }",
        " .label { font-weight: bold; color: #7f8c8d; }",
        " .visualization { margin: 20px 0; text-align: center; }",
        " .visualization img { max-width: 100%; border: 1px solid #ddd; border-radius: 5px; }",
        " .message-flow { margin: 20px 0; }",
        " .message { padding: 10px; margin: 5px 0; background-color: #f9f9f9; border-left: 4px solid #3498db; }",
        " .message.request { border-left-color: #3498db; }",
        " .message.response { border-left-color: #2ecc71; }",
        " .message-details { font-family: monospace; white-space: pre-wrap; margin-top: 10px; padding: 10px; background-color: #f1f1f1; display: none; }",
        " .toggle-details { cursor: pointer; color: #3498db; }",
        " .dashboard { margin: 20px 0; text-align: center; }",
        " .dashboard img { max-width: 100%; border: 1px solid #ddd; border-radius: 5px; }",
        " .footer { margin-top: 30px; text-align: center; color: #7f8c8d; font-size: 0.9em; }",
        " </style>",
        " <script>",
        " function toggleDetails(id) {",
        " var details = document.getElementById(id);",
        " if (details.style.display === 'none' || !details.style.display) {",
        " details.style.display = 'block';",
        " } else {",
        " details.style.display = 'none';",
        " }",
        " }",
        " </script>",
        "</head>",
        "<body>",
        " <div class='container'>",
        " <div class='header'>",
        " <h1>SIP and RTP Analysis Report</h1>",
        f" <p>Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>",
        " </div>",
    ]

    # Summary section
    calls_with_rtp = sum(1 for call in calls.values() if call.rtp_streams)
    total_streams = sum(len(call.rtp_streams) for call in calls.values())
    out.append(" <div class='section'>")
    out.append(" <h2>Summary</h2>")
    out.append(" <div class='call-info'>")
    out.append(f" <div><span class='label'>Total Calls:</span> {len(calls)}</div>")
    out.append(f" <div><span class='label'>Calls with RTP:</span> {calls_with_rtp}</div>")
    out.append(f" <div><span class='label'>Total RTP Streams:</span> {total_streams}</div>")
    out.append(" </div>")
    if "dashboard" in visualization_files:
        out.append(" <div class='dashboard'>")
        out.append(f" <img src='{img_src(visualization_files['dashboard'])}' alt='Call Quality Dashboard'>")
        out.append(" </div>")
    out.append(" </div>")

    # Call details sections
    for call_index, (call_id, call) in enumerate(calls.items()):
        out.append(f" <div class='section' id='call-{esc(call_id)}'>")
        out.append(f" <h2>Call: {esc(call_id)}</h2>")

        # Call information
        out.append(" <div class='call-info'>")
        out.append(f" <div><span class='label'>From:</span> {esc(call.from_uri)}</div>")
        out.append(f" <div><span class='label'>To:</span> {esc(call.to_uri)}</div>")
        out.append(f" <div><span class='label'>Status:</span> {esc(call.status)}</div>")
        out.append(f" <div><span class='label'>Duration:</span> {call.get_duration():.2f} seconds</div>")
        out.append(f" <div><span class='label'>Messages:</span> {len(call.messages)}</div>")
        out.append(f" <div><span class='label'>RTP Streams:</span> {len(call.rtp_streams)}</div>")
        if call.rtp_streams:
            quality = call.get_call_quality_summary()
            out.append(f" <div><span class='label'>MOS Score:</span> {quality['mos']:.2f} ({esc(quality['rating'])})</div>")
            out.append(f" <div><span class='label'>Average Jitter:</span> {quality['jitter']:.2f} ms</div>")
            out.append(f" <div><span class='label'>Packet Loss:</span> {quality['packet_loss']:.2f}%</div>")
        out.append(" </div>")

        # Add sequence diagram if available
        seq_key = f"sequence_{call_id}"
        if seq_key in visualization_files:
            out.append(" <div class='visualization'>")
            out.append(" <h3>SIP Message Flow</h3>")
            out.append(f" <img src='{img_src(visualization_files[seq_key])}' alt='SIP Sequence Diagram'>")
            out.append(" </div>")

        # Add RTP visualizations if available
        for stream_id in call.rtp_streams:
            rtp_key = f"rtp_{call_id}_{stream_id}"
            if rtp_key in visualization_files:
                out.append(" <div class='visualization'>")
                out.append(f" <h3>RTP Stream Analysis (SSRC: {esc(stream_id)})</h3>")
                out.append(f" <img src='{img_src(visualization_files[rtp_key])}' alt='RTP Visualization'>")
                out.append(" </div>")

        # Message flow details
        out.append(" <div class='message-flow'>")
        out.append(" <h3>Message Details</h3>")
        for msg_index, msg in enumerate(call.messages):
            is_request = not msg['method'].startswith("SIP/2.0")
            msg_class = "request" if is_request else "response"
            # Element ids use numeric indexes: a raw Call-ID may contain quotes that
            # would break the inline JavaScript string in the onclick handler.
            detail_id = f"msg-{call_index}-{msg_index}"
            out.append(f" <div class='message {msg_class}'>")
            out.append(f" <div><strong>{msg_index + 1}.</strong> [{esc(msg['timestamp'])}] "
                       f"{esc(msg['source_ip'])} → {esc(msg['dest_ip'])}: {esc(msg['method'])}</div>")
            out.append(f" <div class='toggle-details' onclick=\"toggleDetails('{detail_id}')\">Show/Hide Details</div>")

            # Details box (hidden by default). It uses white-space: pre-wrap, so its
            # text is emitted without source-code indentation.
            details = ["<strong>Headers:</strong>"]
            details.extend(f"{esc(name)}: {esc(value)}" for name, value in msg['headers'].items())
            if msg.get('content'):
                details.append("")
                details.append("<strong>Content:</strong>")
                details.append(esc(str(msg['content']).rstrip('\n')))
            out.append(f" <div class='message-details' id='{detail_id}'>" + "\n".join(details) + "</div>")
            out.append(" </div>")
        out.append(" </div>")
        out.append(" </div>")

    # Footer
    out.append(" <div class='footer'>")
    out.append(" <p>Generated by SIP and RTP PCAP Analyzer</p>")
    out.append(" </div>")
    out.append(" </div>")
    out.append("</body>")
    out.append("</html>")
    return "\n".join(out)
class SipRtpPcapAnalyzer:
    """Main class for analyzing PCAP files and extracting SIP messages and RTP streams

    Packet decoding is delegated to whichever backend the module imported
    (``LIBRARY``: scapy, pyshark or dpkt). SIP messages are grouped into
    ``SipCall`` objects keyed by Call-ID, and RTP packets into ``RtpStream``
    objects keyed by SSRC. Streams are then attached to calls using the SDP
    media endpoints, falling back to matching the IP pair of a SIP message.
    """

    def __init__(self, pcap_file: str, output_file: Optional[str] = None, verbose: bool = False,
                 custom_sip_ports: Optional[List[int]] = None, rtp_port_range: Optional[Tuple[int, int]] = None,
                 visualize: bool = False, output_dir: Optional[str] = None):
        self.pcap_file = pcap_file
        self.output_file = output_file
        self.verbose = verbose
        self.calls: Dict[str, SipCall] = {}  # Dictionary of calls indexed by Call-ID
        self.messages_count = 0
        self.sip_ports: Set[int] = set(SIP_PORTS)  # Copy so the module-level default is never mutated
        self.rtp_streams: Dict[int, RtpStream] = {}  # Dictionary of RTP streams indexed by SSRC
        self.potential_rtp_ports: Set[int] = set()  # Media ports announced in SDP bodies

        # Visualization options
        self.visualize = visualize
        self.output_dir = output_dir

        # Custom RTP port range; accept MIN/MAX in either order so a swapped range still matches
        if rtp_port_range:
            low, high = rtp_port_range
            self.rtp_port_range = (min(low, high), max(low, high))
        else:
            self.rtp_port_range = RTP_PORT_RANGE

        # Add custom SIP ports if provided
        if custom_sip_ports:
            self.sip_ports.update(custom_sip_ports)

        # Set logging level based on verbose flag
        if verbose:
            logger.setLevel(logging.DEBUG)

    # ------------------------------------------------------------------
    # Top-level driver
    # ------------------------------------------------------------------

    def analyze(self) -> None:
        """Analyze the PCAP file with the available backend, then post-process the results.

        After decoding, RTP streams are associated with calls, MOS is computed
        for every stream, and visualizations are produced if requested.
        """
        if not os.path.exists(self.pcap_file):
            logger.error(f"Error: File {self.pcap_file} does not exist.")
            return

        backends = {
            "scapy": self._analyze_with_scapy,
            "pyshark": self._analyze_with_pyshark,
            "dpkt": self._analyze_with_dpkt,
        }
        run_backend = backends.get(LIBRARY)
        if run_backend is None:
            logger.error("No packet analysis library available; cannot analyze the capture.")
            return
        run_backend()

        # After analyzing, associate RTP streams with SIP calls
        self._associate_rtp_streams_with_calls()

        # Calculate final metrics for all RTP streams
        for stream in self.rtp_streams.values():
            stream.calculate_mos()

        # Generate visualizations if requested
        if self.visualize:
            self._generate_visualizations()

    # ------------------------------------------------------------------
    # Backend: scapy
    # ------------------------------------------------------------------

    def _analyze_with_scapy(self) -> None:
        """Analyze PCAP file using scapy (``rdpcap`` reads both pcap and pcapng)."""
        logger.info(f"Analyzing {self.pcap_file} with scapy...")
        try:
            packets = scapy.rdpcap(self.pcap_file)
            for packet in packets:
                # Isolate per-packet failures, as the pyshark and dpkt backends do
                try:
                    self._handle_scapy_packet(packet)
                except Exception as e:
                    logger.debug(f"Error processing packet with scapy: {e}")
            logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with scapy: {e}")

    def _handle_scapy_packet(self, packet) -> None:
        """Extract the IPv4 UDP/TCP payload of one scapy packet and route it."""
        if not packet.haslayer(IP):
            return
        if packet.haslayer(UDP):
            transport, protocol = packet[UDP], "UDP"
        elif packet.haslayer(TCP):
            transport, protocol = packet[TCP], "TCP"
        else:
            return

        # Use scapy's own RTP dissection when it produced one; otherwise the header is parsed manually
        rtp_fields = None
        if HAS_RTP_LAYER and packet.haslayer(RTP):
            rtp_layer = packet[RTP]
            rtp_fields = (rtp_layer.sourcesync, rtp_layer.sequence, rtp_layer.timestamp,
                          rtp_layer.payload_type, len(bytes(rtp_layer.payload)))

        self._handle_payload(
            datetime.fromtimestamp(float(packet.time)), protocol,
            packet[IP].src, transport.sport, packet[IP].dst, transport.dport,
            bytes(transport.payload), rtp_fields,
        )

    # ------------------------------------------------------------------
    # Backend: pyshark
    # ------------------------------------------------------------------

    def _analyze_with_pyshark(self) -> None:
        """Analyze PCAP file using pyshark (tshark dissectors)."""
        logger.info(f"Analyzing {self.pcap_file} with pyshark...")
        cap = None
        try:
            # keep_packets=False: packets are processed once, so don't accumulate them all in memory
            cap = pyshark.FileCapture(self.pcap_file, keep_packets=False)
            for packet in cap:
                try:
                    self._handle_pyshark_packet(packet)
                except Exception as e:
                    logger.debug(f"Error processing packet with pyshark: {e}")
            logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with pyshark: {e}")
        finally:
            # FileCapture drives a tshark subprocess; always shut it down
            if cap is not None:
                try:
                    cap.close()
                except Exception as e:
                    logger.debug(f"Error closing pyshark capture: {e}")

    def _handle_pyshark_packet(self, packet) -> None:
        """Route one pyshark packet to SIP or RTP processing based on its dissected layers."""
        if not hasattr(packet, 'ip'):
            return
        src_ip = packet.ip.src
        dst_ip = packet.ip.dst
        timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))

        if hasattr(packet, 'sip'):
            self._process_sip_message(timestamp, src_ip, dst_ip, self._rebuild_pyshark_sip(packet.sip))
            return

        # RTP is expected over UDP only
        if hasattr(packet, 'rtp') and hasattr(packet, 'udp'):
            rtp = packet.rtp
            payload_size = self._hex_field_length(rtp.payload) if hasattr(rtp, 'payload') else 0
            self._process_rtp_packet(
                timestamp, src_ip, int(packet.udp.srcport), dst_ip, int(packet.udp.dstport),
                int(rtp.ssrc, 16), int(rtp.seq), int(rtp.timestamp), int(rtp.p_type), payload_size,
            )

    @staticmethod
    def _rebuild_pyshark_sip(sip_layer) -> str:
        """Reassemble approximate SIP message text from a pyshark ``sip`` layer.

        In pyshark's (default, XML-based) layers ``_all_fields`` is a dict keyed
        by the raw Wireshark field name, e.g. ``sip.Call-ID``. Iterating it
        yields those name strings. Only top-level header fields are kept.
        Sub-fields such as ``sip.from.user`` (an extra dot) and the
        start-line, body and aggregate-header entries are skipped.
        """
        parts = []
        if hasattr(sip_layer, 'request_line'):
            parts.append(f"{sip_layer.request_line}\r\n")
        elif hasattr(sip_layer, 'status_line'):
            parts.append(f"{sip_layer.status_line}\r\n")

        for field_name in list(sip_layer._all_fields):
            if not field_name.startswith('sip.'):
                continue
            header_name = field_name[len('sip.'):]
            if '.' in header_name or header_name.lower().startswith(('request', 'status', 'msg_')):
                continue
            if hasattr(sip_layer, header_name):
                parts.append(f"{header_name}: {getattr(sip_layer, header_name)}\r\n")

        if hasattr(sip_layer, 'msg_body'):
            parts.append(f"\r\n{sip_layer.msg_body}")
        return "".join(parts)

    @staticmethod
    def _hex_field_length(value) -> int:
        """Return the byte count of a hex-dump field such as ``'80:00:1a'`` (colons optional)."""
        digits = str(value).replace(':', '').strip()
        return len(digits) // 2

    # ------------------------------------------------------------------
    # Backend: dpkt
    # ------------------------------------------------------------------

    def _analyze_with_dpkt(self) -> None:
        """Analyze PCAP or PCAPNG file using dpkt."""
        logger.info(f"Analyzing {self.pcap_file} with dpkt...")
        try:
            with open(self.pcap_file, 'rb') as f:
                pcap_reader = self._open_dpkt_reader(f)
                datalink = pcap_reader.datalink()
                for timestamp, buf in pcap_reader:
                    try:
                        self._handle_dpkt_frame(datalink, timestamp, buf)
                    except Exception as e:
                        logger.debug(f"Error processing packet with dpkt: {e}")
            logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with dpkt: {e}")

    @staticmethod
    def _open_dpkt_reader(file_obj):
        """Return a dpkt reader for ``file_obj``, accepting classic pcap or pcapng."""
        try:
            return dpkt.pcap.Reader(file_obj)
        except ValueError:
            # dpkt.pcap.Reader rejects files without a libpcap magic number; retry as pcapng
            file_obj.seek(0)
            return dpkt.pcapng.Reader(file_obj)

    @staticmethod
    def _dpkt_network_layer(datalink: int, frame: bytes):
        """Decode ``frame`` according to the capture link type and return its network-layer object."""
        if datalink == 113:  # LINKTYPE_LINUX_SLL ("any"-interface captures on Linux)
            return dpkt.sll.SLL(frame).data
        if datalink in (12, 14, 101):  # raw IP link types; only IPv4 is analyzed
            if frame and frame[0] >> 4 == 4:
                return dpkt.ip.IP(frame)
            return None
        return Ethernet(frame).data

    def _handle_dpkt_frame(self, datalink: int, timestamp: float, buf: bytes) -> None:
        """Extract the IPv4 UDP/TCP payload of one captured frame and route it."""
        ip = self._dpkt_network_layer(datalink, buf)
        if not isinstance(ip, dpkt.ip.IP):
            return
        transport = ip.data
        if isinstance(transport, dpkt.udp.UDP):
            protocol = "UDP"
        elif isinstance(transport, dpkt.tcp.TCP):
            protocol = "TCP"
        else:
            return
        self._handle_payload(
            datetime.fromtimestamp(timestamp), protocol,
            socket.inet_ntoa(ip.src), transport.sport,
            socket.inet_ntoa(ip.dst), transport.dport,
            bytes(transport.data),
        )

    # ------------------------------------------------------------------
    # Shared classification helpers
    # ------------------------------------------------------------------

    def _handle_payload(self, timestamp: datetime, protocol: str, src_ip: str, src_port: int,
                        dst_ip: str, dst_port: int, payload: bytes,
                        rtp_fields: Optional[Tuple[int, int, int, int, int]] = None) -> None:
        """Route one transport payload to SIP or RTP processing.

        ``rtp_fields`` is ``(ssrc, seq, rtp_timestamp, payload_type, payload_size)``
        when a dissector already decoded RTP. Otherwise the RTP header is parsed
        from ``payload``.
        """
        if not payload:
            return

        # SIP: UDP on a known SIP port, or any TCP segment (SIP over TCP may use other ports)
        on_sip_port = src_port in self.sip_ports or dst_port in self.sip_ports
        if (on_sip_port or protocol == "TCP") and self._looks_like_sip(payload):
            self._process_sip_message(timestamp, src_ip, dst_ip, payload.decode('utf-8', errors='ignore'))
            return

        if not self._is_potential_rtp(protocol, src_port, dst_port):
            return
        if rtp_fields is None:
            rtp_fields = self._parse_rtp_header(payload)
            if rtp_fields is None:
                return
        self._process_rtp_packet(timestamp, src_ip, src_port, dst_ip, dst_port, *rtp_fields)

    @staticmethod
    def _looks_like_sip(payload: bytes, methods=None) -> bool:
        """Return True if ``payload`` starts with a SIP status line or request line.

        A request line must be ``METHOD SP Request-URI SP SIP/2.0``. Checking the
        trailing version keeps HTTP requests such as ``OPTIONS / HTTP/1.1``
        from being mistaken for SIP.

        Args:
            payload: Raw transport payload
            methods: SIP method names to accept (default: module-level SIP_METHODS)
        """
        if not payload:
            return False
        if methods is None:
            methods = SIP_METHODS
        first_line = payload.split(b'\n', 1)[0].rstrip()
        upper = first_line.upper()
        if upper.startswith(b'SIP/2.0 '):
            return True
        if not upper.endswith(b' SIP/2.0'):
            return False
        return any(first_line.startswith(method.encode() + b' ') for method in methods)

    def _is_potential_rtp(self, protocol: str, src_port: int, dst_port: int) -> bool:
        """UDP traffic on an SDP-announced port, or with both ports inside the RTP range."""
        if protocol != "UDP":
            return False
        if src_port in self.potential_rtp_ports or dst_port in self.potential_rtp_ports:
            return True
        low, high = self.rtp_port_range
        return low <= src_port <= high and low <= dst_port <= high

    @staticmethod
    def _parse_rtp_header(data: bytes) -> Optional[Tuple[int, int, int, int, int]]:
        """Parse an RTP fixed header (RFC 3550 §5.1).

        Returns ``(ssrc, seq, rtp_timestamp, payload_type, payload_size)``, or
        None when ``data`` is not plausible RTP. That covers a short packet,
        a version other than 2, an RTCP packet type, or a header that is
        longer than the packet. The payload size excludes CSRC entries, any
        header extension and trailing padding.
        """
        if len(data) < 12:  # Minimum RTP header size
            return None
        first = data[0]
        if first >> 6 != 2:
            return None
        payload_type = data[1] & 0x7F
        # RTCP packet types 192-223 appear here as 64-95 (RFC 5761 §4); RTCP commonly runs on
        # RTP port + 1, which also falls inside the RTP port range.
        if 64 <= payload_type <= 95:
            return None
        seq = int.from_bytes(data[2:4], 'big')
        rtp_timestamp = int.from_bytes(data[4:8], 'big')
        ssrc = int.from_bytes(data[8:12], 'big')

        header_len = 12 + 4 * (first & 0x0F)  # CSRC list
        if first & 0x10:  # header extension: 16-bit profile id, 16-bit length in 32-bit words
            if len(data) < header_len + 4:
                return None
            header_len += 4 + 4 * int.from_bytes(data[header_len + 2:header_len + 4], 'big')
        padding = data[-1] if first & 0x20 else 0  # last octet holds the padding count
        payload_size = len(data) - header_len - padding
        if payload_size < 0:
            return None
        return ssrc, seq, rtp_timestamp, payload_type, payload_size

    # ------------------------------------------------------------------
    # SIP processing
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_sip_message(sip_message: str) -> Optional[Tuple[str, Dict[str, str], str]]:
        """Split SIP text into ``(method, headers, content)``.

        ``method`` is the request method, or the whole status line for
        responses. Folded header lines (continuations starting with
        whitespace) are joined to the previous header with a single space.
        ``content`` holds every line after the first blank line, each
        terminated by ``\\n``. Returns None for an empty message or start line.
        """
        lines = sip_message.splitlines()
        if not lines:
            return None
        first_line = lines[0].strip()
        if first_line.startswith('SIP/2.0 '):
            method = first_line
        else:
            method = first_line.split(' ')[0]
        if not method:
            return None

        headers: Dict[str, str] = {}
        body_lines: List[str] = []
        last_header = None
        in_body = False
        for line in lines[1:]:
            if in_body:
                body_lines.append(line)
            elif not line.strip():
                in_body = True  # the first blank line separates headers from the body
            elif line[0] in ' \t' and last_header is not None:
                headers[last_header] += ' ' + line.strip()
            elif ':' in line:
                name, value = line.split(':', 1)
                last_header = name.strip()
                headers[last_header] = value.strip()
        content = ''.join(f"{line}\n" for line in body_lines)
        return method, headers, content

    @staticmethod
    def _get_header(headers: Dict[str, str], *names: str) -> Optional[str]:
        """Return the first non-empty value among ``names``, matched case-insensitively (SIP header names are case-insensitive)."""
        lowered = {key.lower(): value for key, value in headers.items()}
        for name in names:
            value = lowered.get(name.lower())
            if value:
                return value
        return None

    @staticmethod
    def _extract_sdp_audio_ports(content: str) -> List[int]:
        """Return the usable ports of all ``m=audio`` lines in an SDP body.

        Port 0 marks a disabled/rejected stream and is skipped, as are values
        outside the TCP/UDP port range.
        """
        ports = []
        for port_str in re.findall(r'm=audio\s+(\d+)', content or ""):
            port = int(port_str)
            if 0 < port <= 65535:
                ports.append(port)
        return ports

    def _process_sip_message(self, timestamp: datetime, source_ip: str, dest_ip: str, sip_message: str) -> None:
        """Process a SIP message and add it to the appropriate call.

        Messages without a Call-ID (long form or compact ``i`` form) are
        ignored. Audio ports found in an SDP body are remembered so later UDP
        traffic on them is checked for RTP.
        """
        if not sip_message:
            return
        parsed = self._parse_sip_message(sip_message)
        if parsed is None:
            return
        method, headers, content = parsed

        call_id = self._get_header(headers, "Call-ID", "i")
        if not call_id:
            return

        call = self.calls.get(call_id)
        if call is None:
            call = SipCall(call_id)
            self.calls[call_id] = call
        call.add_message(timestamp, method, source_ip, dest_ip, headers, content)
        self.messages_count += 1

        for port in self._extract_sdp_audio_ports(content):
            self.potential_rtp_ports.add(port)
            logger.debug(f"Found potential RTP port {port} in SDP")

    # ------------------------------------------------------------------
    # RTP processing and association
    # ------------------------------------------------------------------

    def _codec_for_payload_type(self, payload_type: int) -> Optional[str]:
        """Return the codec name that some call's SDP media info maps to ``payload_type``, if any."""
        for call in self.calls.values():
            media_info = call.media_info
            if 'codec' not in media_info or 'id' not in media_info['codec']:
                continue
            codec = media_info['codec']
            try:
                codec_id = int(codec['id'])
            except (ValueError, TypeError):
                continue
            if codec_id == payload_type:
                return codec.get('name')
        return None

    def _process_rtp_packet(self, timestamp: datetime, src_ip: str, src_port: int,
                            dst_ip: str, dst_port: int, ssrc: int, seq: int,
                            rtp_timestamp: int, payload_type: int, payload_size: int) -> None:
        """Process an RTP packet and add it to the stream identified by its SSRC."""
        stream = self.rtp_streams.get(ssrc)
        if stream is None:
            stream = RtpStream(ssrc, src_ip, src_port, dst_ip, dst_port)
            # Set codec based on payload type if it was announced in SDP
            codec_name = self._codec_for_payload_type(payload_type)
            if codec_name is not None:
                stream.codec = codec_name
            self.rtp_streams[ssrc] = stream

        stream.add_packet(timestamp, seq, rtp_timestamp, payload_type, payload_size)

    def _find_call_by_ip_pair(self, ip_a: str, ip_b: str):
        """Return the first call with a SIP message exchanged between ``ip_a`` and ``ip_b`` (either direction)."""
        for call in self.calls.values():
            for msg in call.messages:
                endpoints = (msg['source_ip'], msg['dest_ip'])
                if endpoints == (ip_a, ip_b) or endpoints == (ip_b, ip_a):
                    return call
        return None

    def _associate_rtp_streams_with_calls(self) -> None:
        """Associate RTP streams with SIP calls based on IP addresses and ports.

        A stream is matched first by its source, then its destination,
        against each call's SDP ``(connection_address, port)``. Failing that,
        it is matched by IP pair against the call's SIP messages.
        """
        # Map SDP media endpoints to call IDs. The port is normalized to int so it compares
        # equal to the integer ports carried by RtpStream.
        media_endpoints = {}
        for call_id, call in self.calls.items():
            media_info = call.media_info
            if 'connection_address' not in media_info or 'port' not in media_info:
                continue
            try:
                port = int(media_info['port'])
            except (TypeError, ValueError):
                continue
            media_endpoints[(media_info['connection_address'], port)] = call_id

        for stream in self.rtp_streams.values():
            call_id = media_endpoints.get((stream.src_ip, stream.src_port))
            if call_id is None:
                call_id = media_endpoints.get((stream.dst_ip, stream.dst_port))
            if call_id is not None:
                self.calls[call_id].add_rtp_stream(stream)
                continue

            # No exact endpoint match: fall back to matching just by IP address pair
            call = self._find_call_by_ip_pair(stream.src_ip, stream.dst_ip)
            if call is not None:
                call.add_rtp_stream(stream)

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def _generate_visualizations(self) -> None:
        """Generate visualizations for SIP calls and RTP streams"""
        if not HAS_PLANTUML and not HAS_MATPLOTLIB:
            logger.warning("Visualization libraries not available. Install plantuml and matplotlib for visualizations.")
            return

        visualizer = SipRtpVisualizer(output_dir=self.output_dir)
        report_file = visualizer.generate_html_report(self.calls)
        if report_file:
            logger.info(f"HTML report generated: {report_file}")

    def print_results(self) -> None:
        """Print analysis results to the console and, if configured, write them to ``output_file`` (UTF-8)."""
        separator = "-" * 80
        output = [
            f"SIP and RTP Analysis Results for {self.pcap_file}",
            "=" * 80,
            f"Total SIP Messages: {self.messages_count}",
            f"Total SIP Calls: {len(self.calls)}",
            f"Total RTP Streams: {len(self.rtp_streams)}",
            "",
            "Call Details:",
            separator,
        ]
        for call in self.calls.values():
            output.append(str(call))
            output.append(separator)

        # RTP streams that weren't associated with any call
        associated = set()
        for call in self.calls.values():
            associated.update(call.rtp_streams)
        unassociated_streams = [stream for ssrc, stream in self.rtp_streams.items()
                                if ssrc not in associated]
        if unassociated_streams:
            output.append("Unassociated RTP Streams:")
            output.append(separator)
            for stream in unassociated_streams:
                output.append(str(stream))
                output.append(separator)

        result = "\n".join(output)

        if self.output_file:
            try:
                # Explicit UTF-8: SIP display names/URIs may contain non-ASCII text
                with open(self.output_file, 'w', encoding='utf-8') as f:
                    f.write(result)
                logger.info(f"Results written to {self.output_file}")
            except OSError as e:
                logger.error(f"Error writing to output file: {e}")
        print(result)

    def get_call_quality_metrics(self) -> Dict[str, Dict[str, Any]]:
        """Get quality metrics for all calls in a structured format.

        Every entry has ``from``, ``to`` and ``duration``. Calls with RTP add
        ``mos``, ``jitter``, ``packet_loss``, ``rating`` and ``stream_count``.
        Calls without RTP get a ``status`` message instead.
        """
        metrics = {}
        for call_id, call in self.calls.items():
            entry = {
                "from": call.from_uri,
                "to": call.to_uri,
                "duration": call.get_duration(),
            }
            if call.rtp_streams:
                quality = call.get_call_quality_summary()
                for key in ("mos", "jitter", "packet_loss", "rating", "stream_count"):
                    entry[key] = quality[key]
            else:
                entry["status"] = "No RTP streams found"
            metrics[call_id] = entry
        return metrics
def main():
    """Command-line entry point: parse arguments, analyze the PCAP, report.

    Parses the CLI options and validates the port-related ones. It then builds
    a ``SipRtpPcapAnalyzer``, runs the analysis and prints the results. An
    HTML report is optionally generated at the end.
    Invalid ``--ports`` / ``--rtp-range`` values are rejected through
    ``parser.error`` (prints usage and exits with status 2).
    """
    parser = argparse.ArgumentParser(description='Analyze PCAP files for SIP messages and RTP streams in VoIP calls')
    parser.add_argument('pcap_file', help='Path to the PCAP file to analyze')
    parser.add_argument('-o', '--output', help='Output file for analysis results')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('-p', '--ports', type=int, nargs='+', help='Additional SIP ports to check')
    parser.add_argument('-r', '--rtp-range', type=int, nargs=2, metavar=('MIN', 'MAX'),
                        help='Custom RTP port range (default: 10000-65535)')
    parser.add_argument('--visualize', action='store_true', help='Generate visualizations')
    parser.add_argument('--output-dir', help='Output directory for visualizations')
    parser.add_argument('--report', action='store_true', help='Generate HTML report')
    args = parser.parse_args()

    # argparse only checks that these are integers. A reversed or
    # out-of-range port window is almost certainly a typo, so fail fast
    # instead of analyzing with it.
    if args.ports:
        bad_ports = [p for p in args.ports if not 1 <= p <= 65535]
        if bad_ports:
            parser.error(f"invalid SIP port(s) {bad_ports}: ports must be in 1-65535")
    rtp_range = None
    if args.rtp_range:
        rtp_min, rtp_max = args.rtp_range
        if not 1 <= rtp_min <= rtp_max <= 65535:
            parser.error(f"invalid --rtp-range {rtp_min} {rtp_max}: "
                         f"expected 1 <= MIN <= MAX <= 65535")
        rtp_range = (rtp_min, rtp_max)

    # Check if visualization libraries are available
    if args.visualize or args.report:
        if not HAS_PLANTUML:
            logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.")
        if not HAS_MATPLOTLIB:
            logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.")

    # Create analyzer; visualization is enabled for either --visualize or --report.
    analyzer = SipRtpPcapAnalyzer(
        args.pcap_file,
        args.output,
        args.verbose,
        args.ports,
        rtp_range,
        args.visualize or args.report,
        args.output_dir
    )

    analyzer.analyze()
    analyzer.print_results()

    # Generate report if requested.
    # NOTE(review): the analyzer above receives visualize=True whenever
    # --report is given, so this branch only runs if the analyzer itself
    # resets `visualize` (e.g. when libraries are missing) — confirm.
    if args.report and not analyzer.visualize:
        visualizer = SipRtpVisualizer(output_dir=args.output_dir)
        report_file = visualizer.generate_html_report(analyzer.calls)
        if report_file:
            logger.info(f"HTML report generated: {report_file}")
# Run the command-line interface only when executed directly, so importing
# this module (e.g. to reuse the analyzer classes) has no side effects.
if __name__ == "__main__":
    main()