# analysis.py

from datetime import datetime

# Default heuristics used by spot_issues(). Both values are rough rules of
# thumb rather than protocol constants, so callers may override them.
_DEFAULT_MIN_RTP_PACKETS = 50   # fewer packets than this => "very few RTP packets"
_DEFAULT_MIN_RTP_RATE = 10      # fewer packets/second than this => "low RTP packet rate"


def analyze_calls(
    calls_by_id,
    *,
    min_rtp_packets=_DEFAULT_MIN_RTP_PACKETS,
    min_rtp_rate=_DEFAULT_MIN_RTP_RATE,
):
    """
    Build a textual report for every call in 'calls_by_id'.

    For each call the report lists the Call-ID, the From/To tags, the
    invite/answer/end times, the observed media streams and any issues
    reported by spot_issues(), such as:
      - Call never answered
      - Call never ended
      - No RTP packets
      - Very short RTP stream

    Parameters:
        calls_by_id: mapping of Call-ID -> call object. Each call object must
            expose from_tag, to_tag, invite_time, answer_time, end_time and
            media_streams (a mapping of SSRC -> stream object exposing
            payload_type and packets).
        min_rtp_packets: forwarded to spot_issues(); total packet count below
            which a call is flagged as having very few RTP packets.
        min_rtp_rate: forwarded to spot_issues(); packets-per-second rate
            below which an answered and ended call is flagged.

    Returns:
        A single string holding one section per call, or the fixed message
        "No calls found in the capture." when 'calls_by_id' is empty or None.
    """
    if not calls_by_id:
        return "No calls found in the capture."

    reports = []
    for call_id, call_obj in calls_by_id.items():
        issues = spot_issues(
            call_obj,
            min_rtp_packets=min_rtp_packets,
            min_rtp_rate=min_rtp_rate,
        )
        reports.append(_format_call(call_id, call_obj, issues))

    # Each section already ends with a newline, so joining with "\n" leaves a
    # blank line between consecutive calls.
    return "\n".join(reports)


def _format_call(call_id, call_obj, issues):
    """Render one call (header, times, media streams, issues) as a text section."""
    lines = [
        f"Call-ID: {call_id}",
        f" From-Tag: {call_obj.from_tag}",
        f" To-Tag: {call_obj.to_tag}",
        f" Invite time: {call_obj.invite_time}",
        f" Answer time: {call_obj.answer_time}",
        f" End time: {call_obj.end_time}",
    ]

    # Media summary: one line per SSRC, or an explicit "None" marker.
    stream_lines = [
        f" SSRC: {ssrc}, Payload: {stream.payload_type}, Packets: {stream.packets}"
        for ssrc, stream in call_obj.media_streams.items()
    ]
    if stream_lines:
        lines.append(" Media Streams:")
        lines.extend(stream_lines)
    else:
        lines.append(" Media Streams: None")

    # Issue list, or a reassuring line when nothing was flagged.
    if issues:
        lines.append(" Issues:")
        lines.extend(f" - {issue}" for issue in issues)
    else:
        lines.append(" No major issues identified.")

    return "\n".join(lines) + "\n"


def spot_issues(
    call_obj,
    *,
    min_rtp_packets=_DEFAULT_MIN_RTP_PACKETS,
    min_rtp_rate=_DEFAULT_MIN_RTP_RATE,
):
    """
    Return a list of human-readable issues detected for one call object.

    The checks are deliberately simple heuristics; adapt them as needed.

    Parameters:
        call_obj: object exposing invite_time, answer_time, end_time and
            media_streams (a mapping whose values expose a numeric 'packets'
            attribute). answer_time and end_time must support subtraction
            yielding an object with total_seconds() when both are set.
        min_rtp_packets: a non-zero total packet count below this value is
            reported as "very few RTP packets" (default 50).
        min_rtp_rate: for calls with both an answer and an end time, an
            average rate (packets / second) below this value is reported as
            a low packet rate (default 10).

    Returns:
        A list of issue strings, in check order; empty when nothing is flagged.
    """
    issues = []

    # 1) INVITE seen but no 200 OK => the call was never answered.
    if call_obj.invite_time and not call_obj.answer_time:
        issues.append("Call was never answered (no 200 OK).")

    # 2) Answered but no end time => the call was never properly ended.
    #    (We assume end_time is set when a BYE occurs or 200 to BYE is seen.)
    if call_obj.answer_time and not call_obj.end_time:
        issues.append("Call was never ended (no BYE).")

    # 3) RTP presence: sum the packet counts over every stream of the call.
    total_rtp_packets = sum(
        stream.packets for stream in call_obj.media_streams.values()
    )
    if total_rtp_packets == 0:
        issues.append("No RTP packets observed.")
    elif total_rtp_packets < min_rtp_packets:
        # Threshold is arbitrary; it highlights short calls or media problems.
        issues.append(
            f"Very few RTP packets ({total_rtp_packets}). "
            "May indicate a short or broken stream."
        )

    # 4) Packet rate over the answered part of the call.
    if call_obj.answer_time and call_obj.end_time:
        call_duration = (call_obj.end_time - call_obj.answer_time).total_seconds()
        # Skip zero (division by zero) and negative (inconsistent timestamps)
        # durations: no meaningful rate can be derived from them.
        if call_duration > 0:
            rtp_rate = total_rtp_packets / call_duration
            # E.g., typical G.711 runs at about 50 packets/sec in each
            # direction, so a rate far below that suggests an audio problem.
            if rtp_rate < min_rtp_rate:
                issues.append(
                    f"Low RTP packet rate ({rtp_rate:.1f} pkts/sec). Possible audio issue."
                )

    # Additional checks (jitter, packet loss, incomplete SDP, etc.) can be added here.
    return issues