#!/usr/bin/env python3
"""
SIP and RTP PCAP Analyzer with Visualization - A tool to analyze PCAP files and extract SIP messages
and RTP streams for VoIP call analysis, with graphical visualization capabilities.
This script can parse PCAP/PCAPNG files and identify SIP messages exchanged during VoIP call
establishments, including INVITE, ACK, BYE, and other SIP messages. It also analyzes RTP streams
to provide metrics on call quality including jitter, packet loss, and MOS score. Additionally,
it generates graphical visualizations of SIP signaling and RTP media flows.
"""
import sys
import os
import argparse
import re
import logging
import math
import subprocess
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any, Set, DefaultDict
from collections import defaultdict
# Configure logging
# Root logging is configured at import time: INFO level, with each record
# rendered as "<time> - <level> - <message>".
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-wide logger used by the classes defined further down in this file.
logger = logging.getLogger('sip_rtp_pcap_analyzer')
# Try to import different packet analysis libraries
# We'll use the first one available in this priority order: scapy, pyshark, dpkt
LIBRARY = None
try:
import scapy.all as scapy
from scapy.layers.inet import IP, UDP, TCP
try:
from scapy.layers.rtp import RTP
HAS_RTP_LAYER = True
except ImportError:
HAS_RTP_LAYER = False
LIBRARY = "scapy"
logger.info("Using scapy for packet analysis")
except ImportError:
try:
import pyshark
LIBRARY = "pyshark"
logger.info("Using pyshark for packet analysis")
except ImportError:
try:
import dpkt
import socket
from dpkt.ethernet import Ethernet
LIBRARY = "dpkt"
logger.info("Using dpkt for packet analysis")
except ImportError:
logger.error("No packet analysis library found. Please install scapy, pyshark, or dpkt.")
sys.exit(1)
# Try to import visualization libraries
try:
import plantuml
HAS_PLANTUML = True
except ImportError:
HAS_PLANTUML = False
logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.")
try:
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
HAS_MATPLOTLIB = True
except ImportError:
HAS_MATPLOTLIB = False
logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.")
# SIP message types we're interested in
# NOTE(review): the code that consults this list is not visible in this part
# of the file; it is presumably used to recognise SIP request lines.
SIP_METHODS = [
"INVITE", "ACK", "BYE", "CANCEL", "REGISTER", "OPTIONS",
"PRACK", "SUBSCRIBE", "NOTIFY", "PUBLISH", "INFO", "REFER", "MESSAGE", "UPDATE"
]
# SIP response codes and their meanings
# Keys are one-character strings "1".."6"; presumably the leading digit of a
# status code (e.g. "4" for 404) - verify against the lookup site.
SIP_RESPONSES = {
"1": "Informational",
"2": "Success",
"3": "Redirection",
"4": "Client Error",
"5": "Server Error",
"6": "Global Failure"
}
# Common SIP ports
SIP_PORTS = {5060, 5061}
# Common RTP port range
# Stored as a (low, high) tuple; whether the bounds are treated as inclusive
# depends on the consumer, which is not visible here.
RTP_PORT_RANGE = (10000, 65535) # Typical RTP port range
# RTP payload type to codec mapping
# Keys are integer RTP payload type numbers, values are display names.
# RtpStream.get_codec_name() looks payload types up here and falls back to
# "Unknown (<payload type>)" for any number that is not listed.
RTP_CODEC_MAP = {
0: "PCMU/G.711u",
3: "GSM",
4: "G723",
8: "PCMA/G.711a",
9: "G722",
10: "L16 (stereo)",
11: "L16 (mono)",
18: "G729",
26: "JPEG",
31: "H261",
32: "MPV",
33: "MP2T",
34: "H263",
96: "Dynamic",
97: "Dynamic",
98: "Dynamic",
99: "Dynamic",
100: "Dynamic",
101: "Dynamic",
110: "OPUS",
111: "OPUS"
}
class RtpStream:
    """One RTP stream (a single SSRC in one direction) and its quality metrics.

    Packets are fed in arrival order through :meth:`add_packet`, which keeps
    running totals for packet loss and interarrival jitter.  A MOS estimate
    can then be derived with :meth:`calculate_mos`.

    Attributes set by callers:
        codec: Optional codec name that overrides the payload-type lookup.
        clock_rate: Optional RTP clock rate in Hz.  When left as ``None`` the
            rate is chosen from the payload type (8000 Hz unless listed in
            ``_CLOCK_RATES``).
        rtt: Optional round trip time in milliseconds used by the MOS estimate.
    """

    # RTP clock rates (Hz) for static payload types that do not use 8 kHz.
    _CLOCK_RATES = {
        10: 44100,  # L16 stereo
        11: 44100,  # L16 mono
        26: 90000,  # JPEG
        31: 90000,  # H261
        32: 90000,  # MPV
        33: 90000,  # MP2T
        34: 90000,  # H263
    }
    _DEFAULT_CLOCK_RATE = 8000
    _SEQ_MOD = 1 << 16  # RTP sequence numbers are 16-bit
    _TS_MOD = 1 << 32   # RTP timestamps are 32-bit

    def __init__(self, ssrc: int, src_ip: str, src_port: int, dst_ip: str, dst_port: int):
        self.ssrc = ssrc
        self.src_ip = src_ip
        self.src_port = src_port
        self.dst_ip = dst_ip
        self.dst_port = dst_port
        self.packets = []
        self.start_time = None
        self.end_time = None
        self.packet_count = 0
        self.expected_packets = 0
        self.lost_packets = 0
        self.jitter = 0.0
        self.jitter_values = []  # Jitter after each packet, kept for visualization
        self.last_seq = None  # Highest sequence number seen so far (modulo 2**16)
        self.last_timestamp = None
        self.last_arrival = None
        self.codec = None
        self.payload_type = None
        self.clock_rate = None  # RTP clock rate in Hz; None = derive from payload type
        self.mos_score = None
        self.rtt = None  # Round Trip Time in ms

    def _get_clock_rate(self) -> int:
        """Return the RTP clock rate (Hz) used to convert timestamps to ms."""
        if self.clock_rate:
            return self.clock_rate
        return self._CLOCK_RATES.get(self.payload_type, self._DEFAULT_CLOCK_RATE)

    def add_packet(self, timestamp: datetime, seq: int, rtp_timestamp: int,
                   payload_type: int, payload_size: int):
        """Record one RTP packet and update loss and jitter statistics.

        Args:
            timestamp: Capture (arrival) time of the packet.
            seq: 16-bit RTP sequence number.
            rtp_timestamp: 32-bit RTP media timestamp, in clock-rate units.
            payload_type: RTP payload type number.
            payload_size: Payload length in bytes.
        """
        if self.start_time is None or timestamp < self.start_time:
            self.start_time = timestamp
        if self.end_time is None or timestamp > self.end_time:
            self.end_time = timestamp

        # The first packet decides the payload type of the stream.
        if self.payload_type is None:
            self.payload_type = payload_type

        # Packet loss: measure the forward distance from the highest sequence
        # number seen so far, modulo 2**16.  A distance in the upper half of
        # the sequence space cannot be a genuine gap; it is a late (reordered
        # or duplicated) packet and must not be counted as ~65k losses.
        if self.last_seq is None:
            self.last_seq = seq
        else:
            missing = (seq - self.last_seq - 1) % self._SEQ_MOD
            if missing < self._SEQ_MOD // 2:
                self.lost_packets += missing
                self.last_seq = seq
            elif self.lost_packets > 0:
                # A late packet fills a gap that was counted earlier.  As in
                # RFC 3550's expected-minus-received accounting, a duplicate
                # also lowers the count.
                self.lost_packets -= 1

        # Interarrival jitter (RFC 3550):
        #   D = (arrival_i - arrival_j) - (rtp_ts_i - rtp_ts_j)
        # with both terms expressed in milliseconds.  The RTP timestamp is in
        # clock ticks, so it has to be scaled by the clock rate first.
        if self.last_timestamp is not None and self.last_arrival is not None:
            arrival_delta_ms = (timestamp - self.last_arrival).total_seconds() * 1000.0
            ticks = (rtp_timestamp - self.last_timestamp) % self._TS_MOD
            if ticks >= self._TS_MOD // 2:
                ticks -= self._TS_MOD  # timestamp moved backwards (or wrapped)
            media_delta_ms = ticks * 1000.0 / self._get_clock_rate()
            d = abs(arrival_delta_ms - media_delta_ms)
            # J(i) = J(i-1) + (|D(i-1,i)| - J(i-1))/16
            self.jitter = self.jitter + (d - self.jitter) / 16
            self.jitter_values.append(self.jitter)
        else:
            self.jitter_values.append(0.0)

        # Reference values for the next jitter calculation.
        self.last_timestamp = rtp_timestamp
        self.last_arrival = timestamp

        self.packets.append({
            "timestamp": timestamp,
            "seq": seq,
            "rtp_timestamp": rtp_timestamp,
            "payload_type": payload_type,
            "payload_size": payload_size
        })
        self.packet_count += 1
        self.expected_packets = self.packet_count + self.lost_packets

    def get_packet_loss_percentage(self) -> float:
        """Return lost packets as a percentage of expected packets (0.0 if none expected)."""
        if self.expected_packets == 0:
            return 0.0
        return (self.lost_packets / self.expected_packets) * 100

    def calculate_mos(self) -> float:
        """Estimate the Mean Opinion Score from RTT, jitter and packet loss.

        The result is stored in ``mos_score`` and returned.  It is always
        within 1.0 .. 4.5.
        """
        # Default RTT if not available
        rtt = self.rtt if self.rtt is not None else 50  # Default 50ms
        # Effective latency: RTT plus twice the jitter plus 10 ms protocol latency
        effective_latency = rtt + (self.jitter * 2) + 10
        # R factor, penalised more steeply once latency reaches 160 ms
        if effective_latency < 160:
            r = 93.2 - (effective_latency / 40)
        else:
            r = 93.2 - (effective_latency - 120) / 10
        # Deduct 2.5 R per percent of packet loss
        r -= self.get_packet_loss_percentage() * 2.5
        # Convert R to MOS
        if r < 0:
            mos = 1.0
        elif r > 100:
            mos = 4.5
        else:
            mos = 1 + (0.035 * r) + (r * (r - 60) * (100 - r) * 7 * 10**-6)
        # Clamp MOS between 1.0 and 4.5
        mos = max(1.0, min(4.5, mos))
        self.mos_score = mos
        return mos

    def get_duration(self) -> float:
        """Return the time between first and last packet in seconds (0 if unknown)."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0

    def get_codec_name(self) -> str:
        """Return ``codec`` if set, else the name mapped from the payload type."""
        if self.codec:
            return self.codec
        if self.payload_type in RTP_CODEC_MAP:
            return RTP_CODEC_MAP[self.payload_type]
        return f"Unknown ({self.payload_type})"

    def get_quality_rating(self) -> str:
        """Return a human-readable rating for the MOS score (computing it if needed)."""
        if self.mos_score is None:
            self.calculate_mos()
        if self.mos_score >= 4.3:
            return "Excellent"
        if self.mos_score >= 4.0:
            return "Good"
        if self.mos_score >= 3.6:
            return "Fair"
        if self.mos_score >= 3.1:
            return "Poor"
        return "Bad"

    def __str__(self) -> str:
        """Return a multi-line, human-readable summary of the stream."""
        if self.mos_score is None:
            self.calculate_mos()
        lines = [
            f"RTP Stream: SSRC={self.ssrc}\n",
            f"Source: {self.src_ip}:{self.src_port}\n",
            f"Destination: {self.dst_ip}:{self.dst_port}\n",
            f"Codec: {self.get_codec_name()}\n",
            f"Start Time: {self.start_time}\n",
            f"End Time: {self.end_time}\n",
            f"Duration: {self.get_duration():.2f} seconds\n",
            f"Packets: {self.packet_count}\n",
            f"Lost Packets: {self.lost_packets}\n",
            f"Packet Loss: {self.get_packet_loss_percentage():.2f}%\n",
            f"Jitter: {self.jitter:.2f} ms\n",
        ]
        if self.rtt is not None:
            lines.append(f"Round Trip Time: {self.rtt:.2f} ms\n")
        lines.append(f"MOS Score: {self.mos_score:.2f} ({self.get_quality_rating()})\n")
        return "".join(lines)
class SipCall:
    """A SIP dialog identified by its Call-ID, with its messages and RTP streams.

    Messages are appended with :meth:`add_message`, which also tracks the
    call's time span, the From/To URIs, a coarse call status and the media
    description found in SDP bodies.
    """

    # URI inside angle brackets, e.g. '"Bob" <sips:bob@example.com>;tag=1'
    _BRACKETED_URI = re.compile(r'<(sips?:[^>]+)>')
    # Bare addr-spec form, e.g. 'sip:alice@example.com;tag=1'.  Without angle
    # brackets everything after ';' is a header parameter, not part of the URI.
    _BARE_URI = re.compile(r'(sips?:[^\s;>]+)')
    # Status-line prefixes of final failure responses (4xx, 5xx, 6xx).
    _FAILURE_PREFIXES = ("SIP/2.0 4", "SIP/2.0 5", "SIP/2.0 6")

    def __init__(self, call_id: str):
        self.call_id = call_id
        self.messages = []
        self.start_time = None
        self.end_time = None
        self.from_uri = None
        self.to_uri = None
        self.status = "Unknown"  # Can be "Setup", "Established", "Terminated", "Failed"
        self.call_direction = None  # Can be "Outgoing", "Incoming", or None
        self.media_info = {}  # Store media information from SDP
        self.rtp_streams = {}  # Dictionary of RTP streams indexed by SSRC

    @classmethod
    def _extract_uri(cls, header_value: str) -> Optional[str]:
        """Return the SIP/SIPS URI from a From/To header value, or None."""
        found = cls._BRACKETED_URI.search(header_value) or cls._BARE_URI.search(header_value)
        return found.group(1) if found else None

    @staticmethod
    def _rating_for_mos(mos: float) -> str:
        """Map a MOS value to the rating label used in summaries."""
        if mos >= 4.3:
            return "Excellent"
        if mos >= 4.0:
            return "Good"
        if mos >= 3.6:
            return "Fair"
        if mos >= 3.1:
            return "Poor"
        return "Bad"

    def add_message(self, timestamp: datetime, method: str, source_ip: str,
                    dest_ip: str, headers: Dict[str, str], content: Optional[str] = None):
        """Append a SIP message to the call and update derived state.

        Args:
            timestamp: Capture time of the message.
            method: Request method (e.g. "INVITE") or the response status
                line (e.g. "SIP/2.0 200 OK").
            source_ip: Sender address.
            dest_ip: Receiver address.
            headers: Parsed SIP headers; "From" and "To" are read when present.
            content: Message body, if any.  A body containing "m=" is parsed
                as SDP.
        """
        if not self.start_time or timestamp < self.start_time:
            self.start_time = timestamp
        if not self.end_time or timestamp > self.end_time:
            self.end_time = timestamp

        # From/To are taken from the first message that yields a URI.
        if not self.from_uri and "From" in headers:
            uri = self._extract_uri(headers["From"])
            if uri:
                self.from_uri = uri
        if not self.to_uri and "To" in headers:
            uri = self._extract_uri(headers["To"])
            if uri:
                self.to_uri = uri

        # Coarse call state machine driven by the method / status line.
        if method == "INVITE":
            self.status = "Setup"
        elif method == "ACK" and self.status == "Setup":
            self.status = "Established"
        elif method == "BYE":
            self.status = "Terminated"
        elif method.startswith(self._FAILURE_PREFIXES):
            self.status = "Failed"

        # Extract media information from SDP if present
        if content and "m=" in content:
            self._parse_sdp(content)

        self.messages.append({
            "timestamp": timestamp,
            "method": method,
            "source_ip": source_ip,
            "dest_ip": dest_ip,
            "headers": headers,
            "content": content
        })

    def _parse_sdp(self, sdp_content: str):
        """Store the first media line, rtpmap and connection address of an SDP body."""
        # Media type and port from the first "m=" line
        media_match = re.search(r'm=(\w+)\s+(\d+)', sdp_content)
        if media_match:
            self.media_info['type'] = media_match.group(1)
            self.media_info['port'] = int(media_match.group(2))
        # Codec from the first "a=rtpmap" attribute
        codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', sdp_content)
        if codec_match:
            self.media_info['codec'] = {
                'id': codec_match.group(1),
                'name': codec_match.group(2),
                'rate': codec_match.group(3)
            }
        # Connection address from the first "c=" line
        conn_match = re.search(r'c=IN\s+IP[46]\s+([^\s]+)', sdp_content)
        if conn_match:
            self.media_info['connection_address'] = conn_match.group(1)

    def add_rtp_stream(self, rtp_stream: "RtpStream"):
        """Attach an RTP stream to this call, keyed by its SSRC."""
        self.rtp_streams[rtp_stream.ssrc] = rtp_stream

    def get_duration(self) -> float:
        """Return the time between first and last message in seconds (0 if unknown)."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0

    def get_average_mos(self) -> float:
        """Return the mean MOS over all RTP streams (0.0 when there are none)."""
        if not self.rtp_streams:
            return 0.0
        total_mos = 0.0
        for stream in self.rtp_streams.values():
            # Streams that have not been scored yet are scored on demand.
            if stream.mos_score is None:
                stream.calculate_mos()
            total_mos += stream.mos_score
        return total_mos / len(self.rtp_streams)

    def get_call_quality_summary(self) -> Dict[str, Any]:
        """Return averaged quality metrics, or a status note when no RTP was seen."""
        if not self.rtp_streams:
            return {"status": "No RTP streams found"}
        streams = list(self.rtp_streams.values())
        avg_mos = self.get_average_mos()
        avg_jitter = sum(s.jitter for s in streams) / len(streams)
        avg_packet_loss = sum(s.get_packet_loss_percentage() for s in streams) / len(streams)
        return {
            "mos": avg_mos,
            "jitter": avg_jitter,
            "packet_loss": avg_packet_loss,
            "rating": self._rating_for_mos(avg_mos),
            "stream_count": len(streams)
        }

    def __str__(self) -> str:
        """Return a multi-line, human-readable report of the call."""
        parts = [
            f"Call ID: {self.call_id}\n",
            f"Status: {self.status}\n",
            f"From: {self.from_uri}\n",
            f"To: {self.to_uri}\n",
            f"Start Time: {self.start_time}\n",
            f"End Time: {self.end_time}\n",
            f"Duration: {self.get_duration():.2f} seconds\n",
        ]
        # Add media information if available
        if self.media_info:
            parts.append("Media Information:\n")
            for key, value in self.media_info.items():
                if isinstance(value, dict):
                    parts.append(f" {key}:\n")
                    parts.extend(f" {k}: {v}\n" for k, v in value.items())
                else:
                    parts.append(f" {key}: {value}\n")
        # Add call quality summary if RTP streams are available.  Computing the
        # summary also scores every stream, which the per-stream section relies on.
        if self.rtp_streams:
            quality = self.get_call_quality_summary()
            parts.append("Call Quality:\n")
            parts.append(f" MOS Score: {quality['mos']:.2f} ({quality['rating']})\n")
            parts.append(f" Average Jitter: {quality['jitter']:.2f} ms\n")
            parts.append(f" Average Packet Loss: {quality['packet_loss']:.2f}%\n")
            parts.append(f" RTP Streams: {quality['stream_count']}\n")
        parts.append(f"Message Count: {len(self.messages)}\n")
        parts.append("Message Flow:\n")
        for i, msg in enumerate(self.messages, 1):
            parts.append(
                f" {i}. [{msg['timestamp']}] {msg['source_ip']} -> {msg['dest_ip']}: {msg['method']}\n"
            )
        # Add RTP stream details
        if self.rtp_streams:
            parts.append("RTP Streams:\n")
            for ssrc, stream in self.rtp_streams.items():
                parts.append(f" Stream SSRC: {ssrc}\n")
                parts.append(f" Source: {stream.src_ip}:{stream.src_port}\n")
                parts.append(f" Destination: {stream.dst_ip}:{stream.dst_port}\n")
                parts.append(f" Codec: {stream.get_codec_name()}\n")
                parts.append(f" Packets: {stream.packet_count}\n")
                parts.append(f" Lost Packets: {stream.lost_packets}\n")
                parts.append(f" Packet Loss: {stream.get_packet_loss_percentage():.2f}%\n")
                parts.append(f" Jitter: {stream.jitter:.2f} ms\n")
                parts.append(f" MOS Score: {stream.mos_score:.2f} ({stream.get_quality_rating()})\n")
        return "".join(parts)
class SipRtpVisualizer:
"""Class for generating visualizations of SIP and RTP flows"""
def __init__(self, output_dir: Optional[str] = None,
             plantuml_server: str = "http://www.plantuml.com/plantuml"):
    """Initialize the visualizer

    Args:
        output_dir: Directory to save visualization files (default: current directory).
            It is created if it does not exist.
        plantuml_server: PlantUML server URL for rendering diagrams.  Either the
            server base URL or a full rendering endpoint ending in
            ``/img``, ``/png``, ``/svg`` or ``/txt`` is accepted.
    """
    self.output_dir = output_dir or os.getcwd()
    self.plantuml_server = plantuml_server
    # Create output directory if it doesn't exist
    os.makedirs(self.output_dir, exist_ok=True)

    # self.plantuml stays None whenever the client is missing or cannot be built;
    # the diagram generator checks for that before rendering.
    self.plantuml = None
    if HAS_PLANTUML:
        # The python-plantuml client appends the encoded diagram directly to
        # the URL it is given, so the URL must end in a rendering endpoint
        # such as ".../plantuml/img/".  A bare server base URL is completed here.
        render_url = self.plantuml_server.rstrip("/")
        if not re.search(r"/(?:img|png|svg|txt)$", render_url):
            render_url += "/img"
        try:
            self.plantuml = plantuml.PlantUML(url=render_url + "/")
        except Exception as e:
            logger.warning(f"Error initializing PlantUML: {e}")
            self.plantuml = None
def generate_sip_sequence_diagram(self, calls: Dict[str, Any],
                                  output_file: Optional[str] = None) -> str:
    """Generate a sequence diagram for SIP calls

    Only the first call in ``calls`` is drawn.  The PlantUML source is always
    written next to the image (same name, ``.puml`` extension).  Rendering is
    attempted through the PlantUML server first and then through a local
    ``plantuml`` command.

    Args:
        calls: Dictionary of SIP calls from the analyzer
        output_file: Output file path (default: sip_sequence_<call id>.png in
            output_dir, with characters unsafe for file names replaced by "_")

    Returns:
        Path to the generated diagram file; the path of the ``.puml`` source
        when rendering failed; or an empty string when nothing could be
        written.
    """
    if not calls:
        logger.warning("No SIP calls to visualize")
        return ""
    if not HAS_PLANTUML or not self.plantuml:
        logger.error("PlantUML not available. Cannot generate sequence diagram.")
        return ""

    # Use the first call if multiple calls exist
    call_id = next(iter(calls))
    call = calls[call_id]
    diagram_str = self._build_sequence_diagram_source(call)

    if not output_file:
        # The Call-ID comes from the capture and may contain path separators
        # or other characters that are not safe in a file name.
        safe_id = re.sub(r'[^A-Za-z0-9._@-]', '_', str(call_id))
        output_file = os.path.join(self.output_dir, f"sip_sequence_{safe_id}.png")
    source_file = os.path.splitext(output_file)[0] + ".puml"

    try:
        # Save the PlantUML source first
        with open(source_file, 'w') as f:
            f.write(diagram_str)
        logger.info(f"PlantUML source saved: {source_file}")
    except Exception as e:
        logger.error(f"Error generating sequence diagram: {str(e)}")
        return ""

    # First choice: render through the PlantUML server.  processes_file()
    # reports server-side errors by returning False rather than raising.
    try:
        if not self.plantuml.processes_file(source_file, outfile=output_file):
            raise RuntimeError("server could not render the diagram")
        logger.info(f"Sequence diagram generated: {output_file}")
        return output_file
    except Exception as e:
        logger.warning(f"PlantUML server error: {str(e)}")

    # Fallback: local command-line tool.  The argument list form keeps the
    # file name away from any shell interpretation.
    try:
        subprocess.run(["plantuml", source_file], check=True)
        # The CLI writes the image next to the source, using a .png extension.
        local_output = os.path.splitext(source_file)[0] + ".png"
        logger.info(f"Sequence diagram generated using local PlantUML: {local_output}")
        return local_output
    except Exception as e2:
        logger.warning(f"Local PlantUML failed: {str(e2)}")

    logger.info("Continuing without sequence diagram. PlantUML source is still available.")
    return source_file

def _build_sequence_diagram_source(self, call: Any) -> str:
    """Return the PlantUML text describing the message flow of one call."""
    diagram = [
        "@startuml",
        "skinparam sequenceMessageAlign center",
        "skinparam sequenceArrowThickness 2",
        "skinparam roundcorner 5",
        "skinparam maxmessagesize 200",
        "",
    ]

    # Participants in order of first appearance, so the layout is the same
    # on every run (set iteration order varies between processes).
    participants = list(dict.fromkeys(
        ip for msg in call.messages for ip in (msg['source_ip'], msg['dest_ip'])
    ))
    index_of = {ip: i for i, ip in enumerate(participants)}

    for i, participant in enumerate(participants):
        # Try to determine if this is caller or callee
        if call.from_uri and participant in call.from_uri:
            label = f"Caller\\n{participant}"
        elif call.to_uri and participant in call.to_uri:
            label = f"Callee\\n{participant}"
        else:
            label = f"Endpoint {i+1}\\n{participant}"
        diagram.append(f'participant "{label}" as P{i}')
    diagram.append("")

    for msg in call.messages:
        src_idx = index_of[msg['source_ip']]
        dst_idx = index_of[msg['dest_ip']]
        method = msg['method']

        # Provisional (1xx) responses get a dotted arrow, everything else a solid one
        arrow_type = "->"
        if method.startswith("SIP/2.0"):
            parts = method.split(" ", 2)
            if len(parts) >= 2 and parts[1].startswith("1"):
                arrow_type = "-->"
        diagram.append(f"P{src_idx} {arrow_type} P{dst_idx}: {method}")

        # Add SDP information as notes if present
        content = msg.get('content')
        if content and 'm=audio' in content:
            sdp_note = []
            media_match = re.search(r'm=(\w+)\s+(\d+)', content)
            if media_match:
                sdp_note.append(f"SDP: {media_match.group(1)} {media_match.group(2)}")
            codec_match = re.search(r'a=rtpmap:(\d+)\s+([^\s/]+)/(\d+)', content)
            if codec_match:
                sdp_note.append(f"RTP/AVP {codec_match.group(1)} ({codec_match.group(2)})")
            if sdp_note:
                # Compare positions numerically; comparing the "P<n>" aliases
                # as strings would order P10 before P2.
                note_position = "right" if src_idx < dst_idx else "left"
                diagram.append(f"note {note_position}: {' '.join(sdp_note)}")

    # Add RTP stream if present, annotated with the longest stream duration
    if call.rtp_streams:
        rtp_duration = max(
            (stream.get_duration() for stream in call.rtp_streams.values()),
            default=0,
        )
        if rtp_duration > 0:
            diagram.append("")
            diagram.append(f"... RTP Media Stream ({rtp_duration:.2f} seconds) ...")
            diagram.append("")

    diagram.append("@enduml")
    return "\n".join(diagram)
def generate_rtp_visualization(self, calls: Dict[str, Any],
                               output_file: Optional[str] = None) -> str:
    """Generate visualization for RTP streams

    Draws the first RTP stream of the first call that has any: a jitter
    curve on top and a packet-flow strip below.  Lost packets are marked at
    the gaps found in the recorded sequence numbers.

    Args:
        calls: Dictionary of SIP calls from the analyzer
        output_file: Output file path (default:
            rtp_visualization_<call id>_<ssrc>.png in output_dir, with
            characters unsafe for file names replaced by "_")

    Returns:
        Path to the generated visualization file, or an empty string when
        there is nothing to draw.
    """
    if not calls or not HAS_MATPLOTLIB:
        logger.warning("No SIP calls to visualize or Matplotlib not available")
        return ""
    # Find calls with RTP streams
    calls_with_rtp = {cid: call for cid, call in calls.items() if call.rtp_streams}
    if not calls_with_rtp:
        logger.warning("No RTP streams to visualize")
        return ""
    # Use the first call with RTP streams, and its first stream
    call_id = next(iter(calls_with_rtp))
    call = calls_with_rtp[call_id]
    stream_id = next(iter(call.rtp_streams))
    stream = call.rtp_streams[stream_id]

    # Check for data before allocating a figure so nothing is leaked on the
    # early return.
    if not stream.packets:
        logger.warning("No packets in RTP stream")
        return ""

    # Timestamps relative to the first packet
    first_timestamp = stream.packets[0]['timestamp']
    timestamps = [(p['timestamp'] - first_timestamp).total_seconds() for p in stream.packets]

    # Per-packet jitter; fall back to a flat line at the stream average when
    # the per-packet series is missing or does not line up with the packets.
    jitter_values = list(getattr(stream, 'jitter_values', None) or [])
    if len(jitter_values) != len(timestamps):
        jitter_values = [stream.jitter] * len(timestamps)

    # Index i is listed when packets are missing between packet i-1 and i.
    # A forward step in the upper half of the 16-bit sequence space is a
    # reordered or duplicated packet, not a gap.
    gap_indices = []
    for i in range(1, len(stream.packets)):
        seq_step = (stream.packets[i]['seq'] - stream.packets[i - 1]['seq']) % 65536
        if 1 < seq_step < 32768:
            gap_indices.append(i)

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), gridspec_kw={'height_ratios': [3, 1]})
    try:
        # Plot jitter
        ax1.plot(timestamps, jitter_values, color='blue', alpha=0.7, label='Jitter (ms)')
        # Highlight the first packet received after each loss
        if gap_indices:
            ax1.scatter(
                [timestamps[i] for i in gap_indices],
                [jitter_values[i] for i in gap_indices],
                color='red', s=50, label='Lost Packets'
            )

        # Add MOS score indicator
        mos_score = stream.mos_score if stream.mos_score is not None else stream.calculate_mos()
        quality_rating = stream.get_quality_rating()
        # Color code based on MOS
        if mos_score >= 4.0:
            mos_color = 'green'
        elif mos_score >= 3.5:
            mos_color = 'orange'
        else:
            mos_color = 'red'
        ax1.text(0.02, 0.95, f'MOS Score: {mos_score:.2f} ({quality_rating})',
                 transform=ax1.transAxes,
                 bbox=dict(facecolor=mos_color, alpha=0.5))

        # Formatting for jitter plot
        ax1.set_xlabel('Time (seconds)')
        ax1.set_ylabel('Jitter (ms)')
        ax1.set_title(f'RTP Stream Analysis - SSRC: {stream.ssrc}')
        ax1.grid(True, alpha=0.3)
        ax1.legend()

        # Packet flow: one tick per received packet
        ax2.scatter(timestamps, np.ones(len(timestamps)), marker='|', s=10,
                    color='blue', alpha=0.7)
        # Mark each loss halfway between the packets on either side of the gap
        if gap_indices:
            lost_x = [(timestamps[i - 1] + timestamps[i]) / 2 for i in gap_indices]
            ax2.scatter(lost_x, np.ones(len(lost_x)), marker='x', s=50, color='red',
                        label='Lost Packets')

        # Formatting for packet flow
        ax2.set_yticks([])
        ax2.set_xlabel('Time (seconds)')
        ax2.set_title('Packet Flow')
        ax2.grid(True, alpha=0.3, axis='x')

        # Add stream information
        info_text = (
            f"Source: {stream.src_ip}:{stream.src_port}\n"
            f"Destination: {stream.dst_ip}:{stream.dst_port}\n"
            f"Codec: {stream.get_codec_name()}\n"
            f"Packets: {stream.packet_count}\n"
            f"Lost Packets: {stream.lost_packets} ({stream.get_packet_loss_percentage():.2f}%)"
        )
        ax2.text(0.02, 0.5, info_text, transform=ax2.transAxes,
                 bbox=dict(facecolor='white', alpha=0.8))
        fig.tight_layout()

        # Save the figure
        if not output_file:
            # The Call-ID comes from the capture and may not be a safe file name.
            safe_id = re.sub(r'[^A-Za-z0-9._@-]', '_', str(call_id))
            output_file = os.path.join(
                self.output_dir, f"rtp_visualization_{safe_id}_{stream_id}.png")
        fig.savefig(output_file, dpi=100)
    finally:
        # Release the figure even when plotting or saving fails.
        plt.close(fig)

    logger.info(f"RTP visualization generated: {output_file}")
    return output_file
def generate_call_quality_dashboard(self, calls: Dict[str, Any],
                                    output_file: Optional[str] = None) -> str:
    """Generate a dashboard image with call quality metrics.

    The figure is a 2x2 grid: a text summary, a MOS gauge, a jitter-over-time
    plot (taken from the first RTP stream of a call) and a received/lost
    packet pie chart.

    Args:
        calls: Dictionary of SIP calls from the analyzer
        output_file: Output file path (default: call_quality_dashboard.png
            in output_dir)

    Returns:
        Path to the generated dashboard file, or an empty string when there
        is nothing to draw (no calls, no RTP streams, or no Matplotlib).
    """
    if not calls or not HAS_MATPLOTLIB:
        logger.warning("No SIP calls to visualize or Matplotlib not available")
        return ""

    # Only calls that carry at least one RTP stream have quality metrics.
    calls_with_rtp = {call_id: call for call_id, call in calls.items() if call.rtp_streams}
    if not calls_with_rtp:
        logger.warning("No RTP streams to visualize")
        return ""

    fig = plt.figure(figsize=(12, 8))
    try:
        gs = fig.add_gridspec(2, 2)
        ax_summary = fig.add_subplot(gs[0, 0])
        ax_mos = fig.add_subplot(gs[0, 1])
        ax_jitter = fig.add_subplot(gs[1, 0])
        ax_loss = fig.add_subplot(gs[1, 1])

        # NOTE(review): every call is drawn onto the same four axes, so with
        # more than one call the summary text, gauge needle and pie chart are
        # painted on top of each other. Kept as-is; confirm whether one
        # dashboard per call (or an aggregate) is what is actually wanted.
        for call_id, call in calls_with_rtp.items():
            streams = list(call.rtp_streams.values())

            # Aggregate metrics across all streams of this call.
            avg_mos = call.get_average_mos()
            total_jitter = sum(stream.jitter for stream in streams)
            total_packets = sum(stream.packet_count for stream in streams)
            total_lost_packets = sum(stream.lost_packets for stream in streams)
            avg_jitter = total_jitter / len(streams)
            if total_packets > 0:
                overall_packet_loss_pct = (
                    total_lost_packets / (total_packets + total_lost_packets)
                ) * 100
            else:
                overall_packet_loss_pct = 0

            # Call summary
            summary_text = (
                f"Call ID: {call_id}\n"
                f"From: {call.from_uri}\n"
                f"To: {call.to_uri}\n"
                f"Duration: {call.get_duration():.2f} seconds\n"
                f"Status: {call.status}\n"
                f"RTP Streams: {len(call.rtp_streams)}\n"
                f"Total Packets: {total_packets}\n"
                f"Lost Packets: {total_lost_packets}\n"
                f"Packet Loss: {overall_packet_loss_pct:.2f}%\n"
                f"Average Jitter: {avg_jitter:.2f} ms\n"
                f"MOS Score: {avg_mos:.2f}"
            )
            ax_summary.text(0.05, 0.95, summary_text, transform=ax_summary.transAxes,
                            verticalalignment='top', fontsize=10)
            ax_summary.axis('off')
            ax_summary.set_title("Call Summary")

            # MOS score gauge
            self._draw_mos_gauge(ax_mos, avg_mos)

            # Detailed jitter plot uses the first stream of the call only.
            stream = streams[0]
            if stream.packets:
                # Timestamps relative to the first packet of the stream.
                first_timestamp = stream.packets[0]['timestamp']
                timestamps = [(p['timestamp'] - first_timestamp).total_seconds()
                              for p in stream.packets]

                # Use measured per-packet jitter when the stream exposes it,
                # otherwise synthesize values around the stream average.
                jitter_values = getattr(stream, 'jitter_values', None)
                if not jitter_values:
                    base_jitter = stream.jitter
                    # abs(): a standard deviation must not be negative, or
                    # np.random.normal raises ValueError.
                    jitter_values = base_jitter + np.random.normal(
                        0, abs(base_jitter) / 10, size=len(timestamps))

                ax_jitter.plot(timestamps, jitter_values, color='blue', alpha=0.7)
                ax_jitter.set_xlabel('Time (seconds)')
                ax_jitter.set_ylabel('Jitter (ms)')
                ax_jitter.set_title('Jitter Over Time')
                ax_jitter.grid(True, alpha=0.3)

                # Threshold lines with their labels anchored at the right edge.
                thresholds = (
                    (20, 'green', 'Excellent (<20ms)'),
                    (50, 'orange', 'Good (<50ms)'),
                    (100, 'red', 'Fair (<100ms)'),
                )
                for level, color, label in thresholds:
                    ax_jitter.axhline(y=level, color=color, linestyle='--', alpha=0.5)
                for level, color, label in thresholds:
                    ax_jitter.text(timestamps[-1], level, label,
                                   verticalalignment='bottom',
                                   horizontalalignment='right', color=color)

            # Packet loss pie chart
            labels = ['Received', 'Lost']
            sizes = [total_packets, total_lost_packets]
            colors = ['#66b3ff', '#ff6666']
            if sum(sizes) > 0:  # Avoid division by zero inside pie()
                ax_loss.pie(sizes, labels=labels, colors=colors,
                            autopct='%1.1f%%', startangle=90)
                ax_loss.axis('equal')
                ax_loss.set_title('Packet Loss')
            else:
                ax_loss.text(0.5, 0.5, "No packet data available",
                             horizontalalignment='center', verticalalignment='center')
                ax_loss.axis('off')

        fig.tight_layout()

        # Save the figure
        if not output_file:
            output_file = os.path.join(self.output_dir, "call_quality_dashboard.png")
        fig.savefig(output_file, dpi=100)
    finally:
        # Always release the figure, even when plotting or saving fails.
        plt.close(fig)

    logger.info(f"Call quality dashboard generated: {output_file}")
    return output_file
def _draw_mos_gauge(self, ax, mos_score):
    """Draw a half-circle gauge chart for a MOS score.

    The scale reads left to right: the left end is MOS 1.0, the top is the
    midpoint and the right end is MOS 4.5. The needle and the colored bands
    follow the same orientation as the printed scale labels.

    Args:
        ax: Matplotlib axis to draw on
        mos_score: MOS score (1.0-4.5); values outside the range are clamped
            for the needle position only, the printed value is unchanged
    """
    mos_min, mos_max = 1.0, 4.5

    # Normalize MOS score to 0-1 and clamp so the needle stays on the gauge.
    norm_mos = (mos_score - mos_min) / (mos_max - mos_min)
    norm_mos = max(0.0, min(1.0, norm_mos))

    gauge_radius = 0.8
    gauge_width = 0.2
    patches = plt.matplotlib.patches

    def to_degrees(fraction):
        # Fraction 0 sits at 180 degrees (left), fraction 1 at 0 degrees (right).
        return (1.0 - fraction) * 180.0

    # Colored background bands as (start fraction, end fraction, color).
    bands = (
        (0.0, 0.3, 'red'),      # Bad to Poor
        (0.3, 0.6, 'orange'),   # Poor to Fair
        (0.6, 1.0, 'green'),    # Fair to Excellent
    )
    for start, end, color in bands:
        # Wedge sweeps counter-clockwise, so the higher fraction (smaller
        # angle) is the first angle argument.
        ax.add_patch(patches.Wedge(
            (0, 0), gauge_radius, to_degrees(end), to_degrees(start),
            width=gauge_width, color=color, alpha=0.7
        ))

    # Draw needle
    needle_angle = np.deg2rad(to_degrees(norm_mos))
    ax.plot([0, gauge_radius * np.cos(needle_angle)],
            [0, gauge_radius * np.sin(needle_angle)],
            color='black', linewidth=2)

    # Add center circle
    ax.add_patch(patches.Circle((0, 0), 0.05, color='black'))

    # Textual rating for the score
    if mos_score >= 4.3:
        quality_rating = "Excellent"
    elif mos_score >= 4.0:
        quality_rating = "Good"
    elif mos_score >= 3.6:
        quality_rating = "Fair"
    elif mos_score >= 3.1:
        quality_rating = "Poor"
    else:
        quality_rating = "Bad"

    ax.text(0, -0.2, f"MOS: {mos_score:.2f}",
            horizontalalignment='center', fontsize=12, fontweight='bold')
    ax.text(0, -0.3, f"({quality_rating})",
            horizontalalignment='center', fontsize=10)

    # Scale labels: left end, midpoint (top) and right end of the range.
    ax.text(-gauge_radius - 0.1, 0, f"{mos_min:.1f}",
            horizontalalignment='right', verticalalignment='center', fontsize=8)
    ax.text(0, gauge_radius + 0.1, f"{(mos_min + mos_max) / 2:.2f}",
            horizontalalignment='center', verticalalignment='bottom', fontsize=8)
    ax.text(gauge_radius + 0.1, 0, f"{mos_max:.1f}",
            horizontalalignment='left', verticalalignment='center', fontsize=8)

    # Set axis limits and turn off axis
    ax.set_xlim(-1, 1)
    ax.set_ylim(-0.5, 1)
    ax.axis('off')
    ax.set_title("Call Quality (MOS)")
def generate_html_report(self, calls: Dict[str, Any],
                         output_file: Optional[str] = None) -> str:
    """Generate a comprehensive HTML report with all visualizations.

    For every call that has RTP streams a sequence diagram and one RTP
    visualization per stream are rendered; a single quality dashboard is
    rendered for all calls. The resulting image paths are handed to
    ``_generate_html_content`` and the HTML is written to ``output_file``.

    Args:
        calls: Dictionary of SIP calls from the analyzer
        output_file: Output file path (default: sip_rtp_report.html in output_dir)

    Returns:
        Path to the generated HTML report, or an empty string when ``calls``
        is empty.
    """
    import hashlib
    import re

    if not calls:
        logger.warning("No SIP calls to visualize")
        return ""

    def file_token(value: Any) -> str:
        # Call-IDs and stream ids come straight from the capture and may
        # contain path separators, "..", quotes, etc. Reduce them to a safe
        # file-name fragment; when anything had to be replaced, append a
        # short digest so two different ids cannot end up in the same file.
        text = str(value)
        cleaned = re.sub(r'[^A-Za-z0-9._@-]', '_', text)
        if cleaned != text:
            digest = hashlib.sha1(text.encode('utf-8', errors='replace')).hexdigest()[:8]
            cleaned = f"{cleaned}_{digest}"
        return cleaned

    # Maps "sequence_<call>", "rtp_<call>_<stream>" and "dashboard" to image
    # paths. Keys keep the raw ids because the HTML builder looks them up by
    # the raw ids; only the file names are sanitized.
    visualization_files = {}

    for call_id, call in calls.items():
        # Only process calls with RTP streams
        if not call.rtp_streams:
            continue

        call_token = file_token(call_id)

        # Generate sequence diagram
        seq_diagram = self.generate_sip_sequence_diagram(
            {call_id: call},
            os.path.join(self.output_dir, f"sip_sequence_{call_token}.png")
        )
        if seq_diagram:
            visualization_files[f"sequence_{call_id}"] = seq_diagram

        # Generate RTP visualization for each stream
        # NOTE(review): the stream id is only encoded in the file name, it is
        # not passed to generate_rtp_visualization, whose signature is not
        # visible from here -- confirm the second positional argument really
        # is the output path and that the right stream gets drawn.
        for stream_id in call.rtp_streams:
            rtp_viz = self.generate_rtp_visualization(
                {call_id: call},
                os.path.join(
                    self.output_dir,
                    f"rtp_visualization_{call_token}_{file_token(stream_id)}.png"
                )
            )
            if rtp_viz:
                visualization_files[f"rtp_{call_id}_{stream_id}"] = rtp_viz

    # Generate call quality dashboard
    dashboard = self.generate_call_quality_dashboard(
        calls,
        os.path.join(self.output_dir, "call_quality_dashboard.png")
    )
    if dashboard:
        visualization_files["dashboard"] = dashboard

    # Create HTML report
    if not output_file:
        output_file = os.path.join(self.output_dir, "sip_rtp_report.html")
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(self._generate_html_content(calls, visualization_files))

    logger.info(f"HTML report generated: {output_file}")
    return output_file
def _generate_html_content(self, calls: Dict[str, Any],
                           visualization_files: Dict[str, str]) -> str:
    """Generate HTML content for the report.

    All values that originate from the capture (Call-ID, URIs, status,
    message lines, header names/values, message bodies, stream ids) are
    HTML-escaped before being written, and image file names are
    percent-quoted, so crafted SIP traffic cannot inject markup or script
    into the report.

    Args:
        calls: Dictionary of SIP calls from the analyzer
        visualization_files: Dictionary of visualization file paths, keyed by
            "dashboard", "sequence_<call_id>" and "rtp_<call_id>_<stream_id>"

    Returns:
        HTML content as a string
    """
    # Imported locally; the output buffer is deliberately not named ``html``
    # so it cannot shadow the standard-library module.
    from html import escape
    from urllib.parse import quote

    def esc(value: Any) -> str:
        # quote=True also escapes ' and ", which matters inside attributes.
        return escape(str(value), quote=True)

    def img_src(key: str) -> str:
        # The report references images by bare file name, relative to itself.
        return quote(os.path.basename(visualization_files[key]))

    parts = [
        "<!DOCTYPE html>",
        "<html lang='en'>",
        "<head>",
        " <meta charset='UTF-8'>",
        " <meta name='viewport' content='width=device-width, initial-scale=1.0'>",
        " <title>SIP and RTP Analysis Report</title>",
        " <style>",
        " body { font-family: Arial, sans-serif; margin: 0; padding: 20px; color: #333; }",
        " h1, h2, h3 { color: #2c3e50; }",
        " .container { max-width: 1200px; margin: 0 auto; }",
        " .header { background-color: #3498db; color: white; padding: 20px; margin-bottom: 20px; }",
        " .section { margin-bottom: 30px; border: 1px solid #ddd; padding: 20px; border-radius: 5px; }",
        " .call-info { display: flex; flex-wrap: wrap; }",
        " .call-info div { margin-right: 20px; margin-bottom: 10px; }",
        " .label { font-weight: bold; color: #7f8c8d; }",
        " .visualization { margin: 20px 0; text-align: center; }",
        " .visualization img { max-width: 100%; border: 1px solid #ddd; border-radius: 5px; }",
        " .message-flow { margin: 20px 0; }",
        " .message { padding: 10px; margin: 5px 0; background-color: #f9f9f9; border-left: 4px solid #3498db; }",
        " .message.request { border-left-color: #3498db; }",
        " .message.response { border-left-color: #2ecc71; }",
        " .message-details { font-family: monospace; white-space: pre-wrap; margin-top: 10px; padding: 10px; background-color: #f1f1f1; display: none; }",
        " .toggle-details { cursor: pointer; color: #3498db; }",
        " .dashboard { margin: 20px 0; text-align: center; }",
        " .dashboard img { max-width: 100%; border: 1px solid #ddd; border-radius: 5px; }",
        " .footer { margin-top: 30px; text-align: center; color: #7f8c8d; font-size: 0.9em; }",
        " </style>",
        " <script>",
        " function toggleDetails(id) {",
        " var details = document.getElementById(id);",
        " if (details.style.display === 'none' || !details.style.display) {",
        " details.style.display = 'block';",
        " } else {",
        " details.style.display = 'none';",
        " }",
        " }",
        " </script>",
        "</head>",
        "<body>",
        " <div class='container'>",
        " <div class='header'>",
        " <h1>SIP and RTP Analysis Report</h1>",
        f" <p>Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>",
        " </div>",
    ]

    # Summary section
    calls_with_rtp = sum(1 for call in calls.values() if call.rtp_streams)
    total_streams = sum(len(call.rtp_streams) for call in calls.values())
    parts.extend([
        " <div class='section'>",
        " <h2>Summary</h2>",
        " <div class='call-info'>",
        f" <div><span class='label'>Total Calls:</span> {len(calls)}</div>",
        f" <div><span class='label'>Calls with RTP:</span> {calls_with_rtp}</div>",
        f" <div><span class='label'>Total RTP Streams:</span> {total_streams}</div>",
        " </div>",
    ])
    if "dashboard" in visualization_files:
        parts.extend([
            " <div class='dashboard'>",
            f" <img src='{img_src('dashboard')}' alt='Call Quality Dashboard'>",
            " </div>",
        ])
    parts.append(" </div>")

    # Call details sections
    for call_index, (call_id, call) in enumerate(calls.items()):
        parts.append(f" <div class='section' id='call-{esc(call_id)}'>")
        parts.append(f" <h2>Call: {esc(call_id)}</h2>")

        # Call information
        parts.extend([
            " <div class='call-info'>",
            f" <div><span class='label'>From:</span> {esc(call.from_uri)}</div>",
            f" <div><span class='label'>To:</span> {esc(call.to_uri)}</div>",
            f" <div><span class='label'>Status:</span> {esc(call.status)}</div>",
            f" <div><span class='label'>Duration:</span> {call.get_duration():.2f} seconds</div>",
            f" <div><span class='label'>Messages:</span> {len(call.messages)}</div>",
            f" <div><span class='label'>RTP Streams:</span> {len(call.rtp_streams)}</div>",
        ])
        # Quality metrics only exist for calls with RTP streams.
        if call.rtp_streams:
            quality = call.get_call_quality_summary()
            parts.extend([
                f" <div><span class='label'>MOS Score:</span> {quality['mos']:.2f} ({esc(quality['rating'])})</div>",
                f" <div><span class='label'>Average Jitter:</span> {quality['jitter']:.2f} ms</div>",
                f" <div><span class='label'>Packet Loss:</span> {quality['packet_loss']:.2f}%</div>",
            ])
        parts.append(" </div>")

        # Add sequence diagram if available
        seq_key = f"sequence_{call_id}"
        if seq_key in visualization_files:
            parts.extend([
                " <div class='visualization'>",
                " <h3>SIP Message Flow</h3>",
                f" <img src='{img_src(seq_key)}' alt='SIP Sequence Diagram'>",
                " </div>",
            ])

        # Add RTP visualizations if available
        for stream_id in call.rtp_streams:
            rtp_key = f"rtp_{call_id}_{stream_id}"
            if rtp_key in visualization_files:
                parts.extend([
                    " <div class='visualization'>",
                    f" <h3>RTP Stream Analysis (SSRC: {esc(stream_id)})</h3>",
                    f" <img src='{img_src(rtp_key)}' alt='RTP Visualization'>",
                    " </div>",
                ])

        # Message flow details
        parts.append(" <div class='message-flow'>")
        parts.append(" <h3>Message Details</h3>")
        for i, msg in enumerate(call.messages):
            # Responses carry the full status line ("SIP/2.0 ...") as method.
            is_request = not msg['method'].startswith("SIP/2.0")
            msg_class = "request" if is_request else "response"
            # The element id is built from indexes only: it is also embedded
            # in an inline JavaScript string, where a raw Call-ID would be
            # an injection point.
            details_id = f"msg-{call_index}-{i}"

            parts.append(f" <div class='message {msg_class}'>")
            parts.append(
                f" <div><strong>{i+1}.</strong> [{esc(msg['timestamp'])}] "
                f"{esc(msg['source_ip'])} → {esc(msg['dest_ip'])}: {esc(msg['method'])}</div>"
            )
            parts.append(
                f" <div class='toggle-details' "
                f"onclick=\"toggleDetails('{details_id}')\">Show/Hide Details</div>"
            )

            # Message details (hidden by default)
            parts.append(f" <div class='message-details' id='{details_id}'>")
            parts.append(" <strong>Headers:</strong>")
            for header, value in msg['headers'].items():
                parts.append(f" {esc(header)}: {esc(value)}")
            # Content (SDP)
            if msg.get('content'):
                parts.append("\n <strong>Content:</strong>")
                parts.append(f" {esc(msg['content'])}")
            parts.append(" </div>")
            parts.append(" </div>")
        parts.append(" </div>")
        parts.append(" </div>")

    # Footer
    parts.extend([
        " <div class='footer'>",
        " <p>Generated by SIP and RTP PCAP Analyzer</p>",
        " </div>",
        " </div>",
        "</body>",
        "</html>",
    ])
    return "\n".join(parts)
class SipRtpPcapAnalyzer:
    """Main class for analyzing PCAP files and extracting SIP messages and RTP streams.

    Typical use: construct with a capture path, call ``analyze()`` and then
    ``print_results()`` or ``get_call_quality_metrics()``. Parsing is
    delegated to whichever backend the module-level ``LIBRARY`` names
    (scapy, pyshark or dpkt).
    """

    # Size in bytes of the fixed RTP header (CSRC list and extensions excluded).
    _RTP_HEADER_SIZE = 12

    def __init__(self, pcap_file: str, output_file: Optional[str] = None, verbose: bool = False,
                 custom_sip_ports: Optional[List[int]] = None,
                 rtp_port_range: Optional[Tuple[int, int]] = None,
                 visualize: bool = False, output_dir: Optional[str] = None):
        """Store the analysis options.

        Args:
            pcap_file: Path of the capture to analyze
            output_file: Optional file that ``print_results`` writes to
            verbose: Enable debug logging
            custom_sip_ports: Extra ports to treat as SIP, added to SIP_PORTS
            rtp_port_range: (min, max) port range considered for RTP;
                defaults to RTP_PORT_RANGE
            visualize: Generate the HTML report at the end of ``analyze``
            output_dir: Directory handed to the visualizer
        """
        self.pcap_file = pcap_file
        self.output_file = output_file
        self.verbose = verbose
        self.calls: Dict[str, SipCall] = {}  # Calls indexed by Call-ID
        self.messages_count = 0
        self.sip_ports: Set[int] = set(SIP_PORTS)  # Copy of the default SIP ports
        self.rtp_streams: Dict[int, RtpStream] = {}  # RTP streams indexed by SSRC
        self.potential_rtp_ports: Set[int] = set()  # Ports announced in SDP

        # Visualization options
        self.visualize = visualize
        self.output_dir = output_dir

        self.rtp_port_range = rtp_port_range if rtp_port_range else RTP_PORT_RANGE

        if custom_sip_ports:
            self.sip_ports.update(custom_sip_ports)

        # NOTE: this changes the level of the shared module logger.
        if verbose:
            logger.setLevel(logging.DEBUG)

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _looks_like_sip(payload: bytes) -> bool:
        """Return True when the payload starts like a SIP response or request."""
        if payload.startswith(b'SIP/2.0'):
            return True
        return any(payload.startswith(method.encode()) for method in SIP_METHODS)

    def _is_potential_rtp(self, protocol: str, src_port: int, dst_port: int) -> bool:
        """Return True when a packet may carry RTP, judged by protocol and ports.

        A UDP packet qualifies when either port was announced in SDP, or when
        both ports fall inside the configured RTP port range.
        """
        if protocol != "UDP":
            return False
        if src_port in self.potential_rtp_ports or dst_port in self.potential_rtp_ports:
            return True
        low, high = self.rtp_port_range[0], self.rtp_port_range[1]
        return low <= src_port <= high and low <= dst_port <= high

    @classmethod
    def _parse_rtp_header(cls, data: bytes) -> Optional[Tuple[int, int, int, int, int]]:
        """Decode the fixed 12-byte RTP header.

        Returns:
            ``(payload_type, sequence, rtp_timestamp, ssrc, payload_size)``,
            or None when the data is shorter than the header or the version
            field is not 2. ``payload_size`` is simply the length after the
            fixed header.
        """
        if len(data) < cls._RTP_HEADER_SIZE:
            return None
        version = (data[0] >> 6) & 0x03
        if version != 2:
            return None
        payload_type = data[1] & 0x7F
        seq = int.from_bytes(data[2:4], 'big')
        rtp_timestamp = int.from_bytes(data[4:8], 'big')
        ssrc = int.from_bytes(data[8:12], 'big')
        return payload_type, seq, rtp_timestamp, ssrc, len(data) - cls._RTP_HEADER_SIZE

    @staticmethod
    def _as_port(value: Any) -> Any:
        """Return the value as an int port when possible, unchanged otherwise."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return value

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def analyze(self) -> None:
        """Analyze the PCAP file based on the available library"""
        if not os.path.exists(self.pcap_file):
            logger.error(f"Error: File {self.pcap_file} does not exist.")
            return

        if LIBRARY == "scapy":
            self._analyze_with_scapy()
        elif LIBRARY == "pyshark":
            self._analyze_with_pyshark()
        elif LIBRARY == "dpkt":
            self._analyze_with_dpkt()

        # After analyzing, associate RTP streams with SIP calls
        self._associate_rtp_streams_with_calls()

        # Calculate final metrics for all RTP streams
        for stream in self.rtp_streams.values():
            stream.calculate_mos()

        # Generate visualizations if requested
        if self.visualize:
            self._generate_visualizations()

    # ------------------------------------------------------------------
    # Backends
    # ------------------------------------------------------------------
    def _analyze_with_scapy(self) -> None:
        """Analyze PCAP file using scapy"""
        logger.info(f"Analyzing {self.pcap_file} with scapy...")
        try:
            packets = scapy.rdpcap(self.pcap_file)
            for packet in packets:
                if not packet.haslayer(IP):
                    continue

                if packet.haslayer(UDP):
                    transport_layer = packet[UDP]
                    protocol = "UDP"
                elif packet.haslayer(TCP):
                    transport_layer = packet[TCP]
                    protocol = "TCP"
                else:
                    continue

                src_ip = packet[IP].src
                dst_ip = packet[IP].dst
                src_port = transport_layer.sport
                dst_port = transport_layer.dport

                # SIP candidates: known SIP ports, or any TCP segment (TCP
                # needs payload inspection to decide).
                is_sip_port = src_port in self.sip_ports or dst_port in self.sip_ports
                if is_sip_port or protocol == "TCP":
                    if not hasattr(transport_layer, 'payload') or not transport_layer.payload:
                        continue
                    payload = bytes(transport_layer.payload)
                    if self._looks_like_sip(payload):
                        timestamp = datetime.fromtimestamp(float(packet.time))
                        sip_message = payload.decode('utf-8', errors='ignore')
                        self._process_sip_message(timestamp, src_ip, dst_ip, sip_message)
                        continue

                if not self._is_potential_rtp(protocol, src_port, dst_port):
                    continue

                try:
                    if HAS_RTP_LAYER and packet.haslayer(RTP):
                        # Scapy already dissected the RTP layer
                        rtp_layer = packet[RTP]
                        ssrc = rtp_layer.sourcesync
                        seq = rtp_layer.sequence
                        timestamp_rtp = rtp_layer.timestamp
                        payload_type = rtp_layer.payload_type
                        payload_size = len(bytes(rtp_layer.payload))
                    else:
                        parsed = self._parse_rtp_header(bytes(transport_layer.payload))
                        if parsed is None:
                            continue
                        payload_type, seq, timestamp_rtp, ssrc, payload_size = parsed

                    timestamp = datetime.fromtimestamp(float(packet.time))
                    self._process_rtp_packet(timestamp, src_ip, src_port, dst_ip, dst_port,
                                             ssrc, seq, timestamp_rtp, payload_type, payload_size)
                except Exception as e:
                    # Best effort: a malformed candidate must not stop the run.
                    if self.verbose:
                        logger.debug(f"Error parsing potential RTP packet: {e}")
                    continue

            logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with scapy: {e}")

    def _analyze_with_pyshark(self) -> None:
        """Analyze PCAP file using pyshark"""
        logger.info(f"Analyzing {self.pcap_file} with pyshark...")
        try:
            cap = pyshark.FileCapture(self.pcap_file)
            try:
                for packet in cap:
                    try:
                        self._handle_pyshark_packet(packet)
                    except Exception as e:
                        # Best effort: skip packets pyshark cannot describe.
                        if self.verbose:
                            logger.debug(f"Error processing packet with pyshark: {e}")
                        continue
            finally:
                # Release the capture (and its tshark process) in every case.
                cap.close()

            logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with pyshark: {e}")

    def _handle_pyshark_packet(self, packet: Any) -> None:
        """Dispatch one pyshark packet to the SIP or RTP handler."""
        if not hasattr(packet, 'ip'):
            return
        src_ip = packet.ip.src
        dst_ip = packet.ip.dst

        if hasattr(packet, 'sip'):
            timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))
            sip_message = self._rebuild_sip_message(packet.sip)
            self._process_sip_message(timestamp, src_ip, dst_ip, sip_message)
            return

        if hasattr(packet, 'rtp'):
            if not hasattr(packet, 'udp'):
                return  # RTP should be over UDP
            src_port = int(packet.udp.srcport)
            dst_port = int(packet.udp.dstport)

            ssrc = int(packet.rtp.ssrc, 16)
            seq = int(packet.rtp.seq)
            timestamp_rtp = int(packet.rtp.timestamp)
            payload_type = int(packet.rtp.p_type)
            payload_size = len(packet.rtp.payload) if hasattr(packet.rtp, 'payload') else 0

            timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))
            self._process_rtp_packet(timestamp, src_ip, src_port, dst_ip, dst_port,
                                     ssrc, seq, timestamp_rtp, payload_type, payload_size)

    @staticmethod
    def _rebuild_sip_message(sip_layer: Any) -> str:
        """Reassemble a textual SIP message from a pyshark SIP layer."""
        pieces = []
        if hasattr(sip_layer, 'request_line'):
            pieces.append(sip_layer.request_line + "\r\n")
        elif hasattr(sip_layer, 'status_line'):
            pieces.append(sip_layer.status_line + "\r\n")

        # Every "sip.*" field except the request/status line parts is
        # emitted as a "name: value" header line.
        for field in sip_layer._all_fields:
            field_name = field.name
            if not field_name.startswith('sip.'):
                continue
            if field_name.startswith('sip.request') or field_name.startswith('sip.status'):
                continue
            header_name = field_name.replace('sip.', '')
            if hasattr(sip_layer, header_name):
                header_value = getattr(sip_layer, header_name)
                pieces.append(f"{header_name}: {header_value}\r\n")

        if hasattr(sip_layer, 'msg_body'):
            pieces.append("\r\n" + sip_layer.msg_body)
        return "".join(pieces)

    def _analyze_with_dpkt(self) -> None:
        """Analyze PCAP file using dpkt"""
        logger.info(f"Analyzing {self.pcap_file} with dpkt...")
        try:
            with open(self.pcap_file, 'rb') as f:
                pcap_reader = dpkt.pcap.Reader(f)
                for timestamp, buf in pcap_reader:
                    try:
                        self._handle_dpkt_frame(timestamp, buf)
                    except Exception as e:
                        # Best effort: skip frames dpkt cannot decode.
                        if self.verbose:
                            logger.debug(f"Error processing packet with dpkt: {e}")
                        continue

            logger.info(f"Analysis complete. Found {len(self.calls)} SIP calls and {len(self.rtp_streams)} RTP streams.")
        except Exception as e:
            logger.error(f"Error analyzing PCAP file with dpkt: {e}")

    def _handle_dpkt_frame(self, timestamp: float, buf: bytes) -> None:
        """Decode one captured frame with dpkt and dispatch SIP / RTP content."""
        eth = Ethernet(buf)
        if not isinstance(eth.data, dpkt.ip.IP):
            return
        ip = eth.data
        src_ip = socket.inet_ntoa(ip.src)
        dst_ip = socket.inet_ntoa(ip.dst)

        if isinstance(ip.data, dpkt.udp.UDP):
            protocol = "UDP"
        elif isinstance(ip.data, dpkt.tcp.TCP):
            protocol = "TCP"
        else:
            return
        segment = ip.data
        src_port = segment.sport
        dst_port = segment.dport
        transport_data = segment.data

        # SIP candidates: known SIP ports, or any TCP segment.
        is_sip_port = src_port in self.sip_ports or dst_port in self.sip_ports
        if is_sip_port or protocol == "TCP":
            if self._looks_like_sip(transport_data):
                timestamp_dt = datetime.fromtimestamp(timestamp)
                sip_message = transport_data.decode('utf-8', errors='ignore')
                self._process_sip_message(timestamp_dt, src_ip, dst_ip, sip_message)
                return

        if not self._is_potential_rtp(protocol, src_port, dst_port):
            return

        try:
            parsed = self._parse_rtp_header(transport_data)
            if parsed is None:
                return
            payload_type, seq, timestamp_rtp, ssrc, payload_size = parsed
            timestamp_dt = datetime.fromtimestamp(timestamp)
            self._process_rtp_packet(timestamp_dt, src_ip, src_port, dst_ip, dst_port,
                                     ssrc, seq, timestamp_rtp, payload_type, payload_size)
        except Exception as e:
            if self.verbose:
                logger.debug(f"Error parsing potential RTP packet: {e}")

    # ------------------------------------------------------------------
    # Message / packet processing
    # ------------------------------------------------------------------
    def _process_sip_message(self, timestamp: datetime, source_ip: str, dest_ip: str, sip_message: str) -> None:
        """Process a SIP message and add it to the appropriate call.

        The message is split into start line, headers and body. Messages
        without a Call-ID (or its compact form ``i``) are ignored. Ports of
        ``m=audio`` lines in the body are remembered as potential RTP ports.
        """
        if not sip_message:
            return
        lines = sip_message.splitlines()
        if not lines:
            return

        # Responses keep the whole status line as "method"; requests keep
        # only the first token of the request line.
        first_line = lines[0].strip()
        if first_line.startswith('SIP/2.0 '):
            method = first_line
        else:
            method = first_line.split(' ')[0]

        # Headers run until the first blank line; everything after is body.
        header_lines = lines[1:]
        body_lines: List[str] = []
        for index, line in enumerate(header_lines):
            if not line.strip():
                body_lines = header_lines[index + 1:]
                header_lines = header_lines[:index]
                break

        headers = {}
        for line in header_lines:
            if ':' in line:
                header_name, header_value = line.split(':', 1)
                headers[header_name.strip()] = header_value.strip()
        content = "".join(line + "\n" for line in body_lines)

        # SIP header names are case-insensitive, so look the Call-ID up on a
        # lower-cased view; the stored headers keep their original spelling.
        lowered = {name.lower(): value for name, value in headers.items()}
        call_id = lowered.get("call-id", lowered.get("i"))
        if not call_id:
            return

        if call_id not in self.calls:
            self.calls[call_id] = SipCall(call_id)
        self.calls[call_id].add_message(
            timestamp,
            method,
            source_ip,
            dest_ip,
            headers,
            content
        )
        self.messages_count += 1

        # Extract potential RTP ports from SDP
        if content and "m=audio" in content:
            for port_str in re.findall(r'm=audio\s+(\d+)', content):
                try:
                    port = int(port_str)
                except ValueError:
                    continue
                self.potential_rtp_ports.add(port)
                if self.verbose:
                    logger.debug(f"Found potential RTP port {port} in SDP")

    def _process_rtp_packet(self, timestamp: datetime, src_ip: str, src_port: int,
                            dst_ip: str, dst_port: int, ssrc: int, seq: int,
                            rtp_timestamp: int, payload_type: int, payload_size: int) -> None:
        """Process an RTP packet and add it to the appropriate stream.

        Streams are keyed by SSRC. When a stream is first seen, its codec
        name is taken from the first call whose SDP codec id equals the
        packet's payload type.
        """
        stream_key = ssrc

        if stream_key not in self.rtp_streams:
            stream = RtpStream(ssrc, src_ip, src_port, dst_ip, dst_port)
            self.rtp_streams[stream_key] = stream

            for call in self.calls.values():
                codec = call.media_info.get('codec') if 'codec' in call.media_info else None
                if not codec or 'id' not in codec:
                    continue
                try:
                    codec_id = int(codec['id'])
                except (ValueError, TypeError):
                    continue
                if codec_id == payload_type:
                    stream.codec = codec['name']
                    break

        self.rtp_streams[stream_key].add_packet(
            timestamp,
            seq,
            rtp_timestamp,
            payload_type,
            payload_size
        )

    def _associate_rtp_streams_with_calls(self) -> None:
        """Associate RTP streams with SIP calls based on IP addresses and ports.

        A stream is attached to the call whose SDP media endpoint equals the
        stream's source or destination endpoint. Failing that, it is attached
        to the first call that has a SIP message exchanged between the same
        two IP addresses.
        """
        # (address, port) announced in SDP -> Call-ID
        media_endpoints = {}
        for call_id, call in self.calls.items():
            info = call.media_info
            if 'connection_address' in info and 'port' in info:
                # Packet ports are ints; normalise the SDP value so the two
                # compare equal even when it was stored as text (the codec id
                # is converted with int() for the same reason elsewhere).
                endpoint = (info['connection_address'], self._as_port(info['port']))
                media_endpoints[endpoint] = call_id

        for stream in self.rtp_streams.values():
            candidates = (
                (stream.src_ip, stream.src_port),
                (stream.dst_ip, stream.dst_port),
            )
            for endpoint in candidates:
                if endpoint in media_endpoints:
                    self.calls[media_endpoints[endpoint]].add_rtp_stream(stream)
                    break
            else:
                # No exact endpoint match: fall back to matching by IP pair.
                call = self._find_call_by_ip_pair(stream)
                if call is not None:
                    call.add_rtp_stream(stream)

    def _find_call_by_ip_pair(self, stream: Any) -> Optional[Any]:
        """Return the first call with a SIP message between the stream's two IPs."""
        pair = {(stream.src_ip, stream.dst_ip), (stream.dst_ip, stream.src_ip)}
        for call in self.calls.values():
            for msg in call.messages:
                if (msg['source_ip'], msg['dest_ip']) in pair:
                    return call
        return None

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------
    def _generate_visualizations(self) -> None:
        """Generate visualizations for SIP calls and RTP streams"""
        if not HAS_PLANTUML and not HAS_MATPLOTLIB:
            logger.warning("Visualization libraries not available. Install plantuml and matplotlib for visualizations.")
            return

        visualizer = SipRtpVisualizer(output_dir=self.output_dir)
        report_file = visualizer.generate_html_report(self.calls)
        if report_file:
            logger.info(f"HTML report generated: {report_file}")

    def print_results(self) -> None:
        """Print analysis results to console or file.

        When an output file is configured the text is written there (UTF-8);
        if that fails, the error is logged and the text is printed instead.
        """
        rule = "-" * 80
        output = [
            f"SIP and RTP Analysis Results for {self.pcap_file}",
            "=" * 80,
            f"Total SIP Messages: {self.messages_count}",
            f"Total SIP Calls: {len(self.calls)}",
            f"Total RTP Streams: {len(self.rtp_streams)}",
            "",
            "Call Details:",
            rule,
        ]
        for call in self.calls.values():
            output.append(str(call))
            output.append(rule)

        # RTP streams that weren't associated with any call
        unassociated_streams = [
            stream for ssrc, stream in self.rtp_streams.items()
            if not any(ssrc in call.rtp_streams for call in self.calls.values())
        ]
        if unassociated_streams:
            output.append("Unassociated RTP Streams:")
            output.append(rule)
            for stream in unassociated_streams:
                output.append(str(stream))
                output.append(rule)

        result = "\n".join(output)

        if not self.output_file:
            print(result)
            return
        try:
            with open(self.output_file, 'w', encoding='utf-8') as f:
                f.write(result)
            logger.info(f"Results written to {self.output_file}")
        except Exception as e:
            logger.error(f"Error writing to output file: {e}")
            print(result)

    def get_call_quality_metrics(self) -> Dict[str, Dict[str, Any]]:
        """Get quality metrics for all calls in a structured format.

        Returns:
            A dict keyed by Call-ID. Every entry has ``from``, ``to`` and
            ``duration``; calls with RTP streams add ``mos``, ``jitter``,
            ``packet_loss``, ``rating`` and ``stream_count``, the others a
            ``status`` note.
        """
        metrics = {}
        for call_id, call in self.calls.items():
            entry = {
                "from": call.from_uri,
                "to": call.to_uri,
                "duration": call.get_duration(),
            }
            if call.rtp_streams:
                quality = call.get_call_quality_summary()
                for key in ("mos", "jitter", "packet_loss", "rating", "stream_count"):
                    entry[key] = quality[key]
            else:
                entry["status"] = "No RTP streams found"
            metrics[call_id] = entry
        return metrics
def main():
    """Command-line entry point: parse arguments, analyze the capture, print results.

    ``--visualize`` and ``--report`` both make the analyzer produce the HTML
    report as the last step of ``analyze()``, so no separate report step is
    needed here.
    """
    parser = argparse.ArgumentParser(description='Analyze PCAP files for SIP messages and RTP streams in VoIP calls')
    parser.add_argument('pcap_file', help='Path to the PCAP file to analyze')
    parser.add_argument('-o', '--output', help='Output file for analysis results')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('-p', '--ports', type=int, nargs='+', help='Additional SIP ports to check')
    parser.add_argument('-r', '--rtp-range', type=int, nargs=2, metavar=('MIN', 'MAX'),
                        help='Custom RTP port range (default: 10000-65535)')
    parser.add_argument('--visualize', action='store_true', help='Generate visualizations')
    parser.add_argument('--output-dir', help='Output directory for visualizations')
    parser.add_argument('--report', action='store_true', help='Generate HTML report')
    args = parser.parse_args()

    rtp_range = None
    if args.rtp_range:
        range_min, range_max = args.rtp_range
        # A reversed range would silently match no packet at all.
        if range_min > range_max:
            parser.error("--rtp-range: MIN must not be greater than MAX")
        rtp_range = (range_min, range_max)

    want_visuals = args.visualize or args.report

    # Tell the user up front which optional visualization backends are missing.
    if want_visuals:
        if not HAS_PLANTUML:
            logger.warning("PlantUML library not found. Install with 'pip install plantuml' for sequence diagrams.")
        if not HAS_MATPLOTLIB:
            logger.warning("Matplotlib library not found. Install with 'pip install matplotlib' for RTP visualizations.")

    analyzer = SipRtpPcapAnalyzer(
        pcap_file=args.pcap_file,
        output_file=args.output,
        verbose=args.verbose,
        custom_sip_ports=args.ports,
        rtp_port_range=rtp_range,
        visualize=want_visuals,
        output_dir=args.output_dir,
    )

    # The HTML report (if requested) is generated inside analyze().
    analyzer.analyze()
    analyzer.print_results()
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|