VoIPAnalyzer / analysis.py
fabioantonini's picture
Upload 5 files
936432e verified
# analysis.py
from datetime import datetime
def analyze_calls(calls_by_id):
"""
Inspect each Call object in 'calls_by_id' and identify issues such as:
- Call never answered
- Call never ended
- No RTP packets
- Very short RTP stream
Return a textual summary of the calls and any identified issues.
"""
if not calls_by_id:
return "No calls found in the capture."
summary_lines = []
for call_id, call_obj in calls_by_id.items():
# Basic call info
call_info = f"Call-ID: {call_id}"
call_info += f"\n From-Tag: {call_obj.from_tag}"
call_info += f"\n To-Tag: {call_obj.to_tag}"
# Times
invite_time = call_obj.invite_time
answer_time = call_obj.answer_time
end_time = call_obj.end_time
call_info += f"\n Invite time: {invite_time}"
call_info += f"\n Answer time: {answer_time}"
call_info += f"\n End time: {end_time}"
# Gather all issues for this call
issues = spot_issues(call_obj)
# Media summary
media_info = []
for ssrc, media_stream in call_obj.media_streams.items():
media_info.append(
f" SSRC: {ssrc}, Payload: {media_stream.payload_type}, Packets: {media_stream.packets}"
)
if media_info:
call_info += "\n Media Streams:\n" + "\n".join(media_info)
else:
call_info += "\n Media Streams: None"
# Combine call info and issues
if issues:
call_info += "\n Issues:"
for i in issues:
call_info += f"\n - {i}"
else:
call_info += "\n No major issues identified."
summary_lines.append(call_info + "\n")
return "\n".join(summary_lines)
def spot_issues(call_obj):
"""
Given a Call object, return a list of textual issues found.
This is a naive example – adapt it as needed.
"""
issues = []
# 1) Was there an INVITE but no 200 OK => never answered
if call_obj.invite_time and not call_obj.answer_time:
issues.append("Call was never answered (no 200 OK).")
# 2) Was there an answer but no BYE => never properly ended
# (We assume end_time is set when a BYE occurs or 200 to BYE is seen.)
if call_obj.answer_time and not call_obj.end_time:
issues.append("Call was never ended (no BYE).")
# 3) Check if RTP packets exist
total_rtp_packets = sum(stream.packets for stream in call_obj.media_streams.values())
if total_rtp_packets == 0:
issues.append("No RTP packets observed.")
elif total_rtp_packets < 50:
# Arbitrary threshold just to highlight short calls or potential media problems
issues.append(f"Very few RTP packets ({total_rtp_packets}). May indicate a short or broken stream.")
# 4) Optional: check duration vs. packet count
# If call was answered but we see extremely few packets, there's likely an issue
if call_obj.answer_time and call_obj.end_time:
call_duration = (call_obj.end_time - call_obj.answer_time).total_seconds()
if call_duration > 0:
rtp_rate = total_rtp_packets / call_duration
# E.g., if we consider typical G.711 at 50 packets/sec in each direction,
# and we see less than 10 pkts/sec => potential audio problem
if rtp_rate < 10:
issues.append(
f"Low RTP packet rate ({rtp_rate:.1f} pkts/sec). Possible audio issue."
)
# Additional checks (jitter, packet loss, incomplete SDP, etc.) can be added here.
return issues