fabioantonini committed on
Commit
936432e
·
verified ·
1 Parent(s): 9973426

Upload 5 files

Browse files

first implementation

Files changed (5) hide show
  1. analysis.py +101 -0
  2. app.py +120 -0
  3. call_flow.py +83 -0
  4. parsers.py +158 -0
  5. requirements.txt +12 -0
analysis.py ADDED
@@ -0,0 +1,101 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # analysis.py
2
+
3
+ from datetime import datetime
4
+
5
def analyze_calls(calls_by_id):
    """
    Build a textual report for every call in 'calls_by_id'.

    For each call the report lists the Call-ID, the From/To tags, the
    invite/answer/end times, one line per media stream, and the issues
    returned by spot_issues() (never answered, never ended, no RTP,
    very short RTP stream, low packet rate).

    Args:
        calls_by_id: mapping of Call-ID -> call object. Each call object
            must expose from_tag, to_tag, invite_time, answer_time,
            end_time and media_streams (a mapping of SSRC -> object with
            payload_type and packets attributes).

    Returns:
        A single string with one section per call, or a fixed message
        when 'calls_by_id' is empty or None.
    """
    if not calls_by_id:
        return "No calls found in the capture."

    summary_lines = []
    for call_id, call_obj in calls_by_id.items():
        # Basic call info and key timestamps
        lines = [
            f"Call-ID: {call_id}",
            f" From-Tag: {call_obj.from_tag}",
            f" To-Tag: {call_obj.to_tag}",
            f" Invite time: {call_obj.invite_time}",
            f" Answer time: {call_obj.answer_time}",
            f" End time: {call_obj.end_time}",
        ]

        # Gather all issues for this call
        issues = spot_issues(call_obj)

        # Media summary: one line per RTP stream
        media_info = [
            f" SSRC: {ssrc}, Payload: {stream.payload_type}, Packets: {stream.packets}"
            for ssrc, stream in call_obj.media_streams.items()
        ]
        if media_info:
            lines.append(" Media Streams:")
            lines.extend(media_info)
        else:
            lines.append(" Media Streams: None")

        # Combine call info and issues
        if issues:
            lines.append(" Issues:")
            lines.extend(f" - {issue}" for issue in issues)
        else:
            lines.append(" No major issues identified.")

        # Trailing newline leaves a blank line between calls once joined
        summary_lines.append("\n".join(lines) + "\n")

    return "\n".join(summary_lines)
60
+
61
def spot_issues(call_obj, min_packets=50, min_packet_rate=10):
    """
    Given a call object, return a list of textual issues found.

    This is a naive heuristic check; adapt it as needed.

    Args:
        call_obj: object exposing invite_time, answer_time, end_time
            (datetime or None) and media_streams (mapping whose values
            have an integer 'packets' attribute).
        min_packets: total RTP packet count below which the stream is
            reported as very short. Arbitrary threshold, default 50.
        min_packet_rate: packets per second (summed over all streams)
            below which an answered and ended call is reported as having
            a low RTP rate. Default 10; typical G.711 is about 50
            packets/sec in each direction.

    Returns:
        A list of human-readable issue strings (empty when nothing was
        flagged).
    """
    issues = []

    # 1) Was there an INVITE but no 200 OK => never answered
    if call_obj.invite_time and not call_obj.answer_time:
        issues.append("Call was never answered (no 200 OK).")

    # 2) Was there an answer but no BYE => never properly ended
    #    (We assume end_time is set when a BYE occurs or 200 to BYE is seen.)
    if call_obj.answer_time and not call_obj.end_time:
        issues.append("Call was never ended (no BYE).")

    # 3) Check if RTP packets exist
    total_rtp_packets = sum(stream.packets for stream in call_obj.media_streams.values())
    if total_rtp_packets == 0:
        issues.append("No RTP packets observed.")
    elif total_rtp_packets < min_packets:
        # Highlights short calls or potential media problems
        issues.append(f"Very few RTP packets ({total_rtp_packets}). May indicate a short or broken stream.")

    # 4) Check duration vs. packet count: an answered call with extremely
    #    few packets per second likely has a media problem.
    if call_obj.answer_time and call_obj.end_time:
        call_duration = (call_obj.end_time - call_obj.answer_time).total_seconds()
        # Skip zero/negative durations to avoid a division by zero
        if call_duration > 0:
            rtp_rate = total_rtp_packets / call_duration
            if rtp_rate < min_packet_rate:
                issues.append(
                    f"Low RTP packet rate ({rtp_rate:.1f} pkts/sec). Possible audio issue."
                )

    # Additional checks (jitter, packet loss, incomplete SDP, etc.) can be added here.

    return issues
app.py ADDED
@@ -0,0 +1,120 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app.py
2
+
3
+ import os
4
+ import gradio as gr
5
+
6
+ # Import your modules
7
+ from parsers import parse_pcap
8
+ from analysis import analyze_calls
9
+ from call_flow import create_call_flow_diagram
10
+ from llm_utils import create_local_pipeline, get_llm_opinion
11
+
12
# 1) Initialize your local Hugging Face pipeline (model).
#    Adjust MODEL_ID to your chosen model on HF.
MODEL_ID = "tiiuae/falcon-7b-instruct"

# Created once at import time and reused by ask_llm_opinion().
# NOTE(review): create_local_pipeline comes from llm_utils, which is not
# visible here; presumably it loads the model, so importing this module
# may be slow - confirm.
generator = create_local_pipeline(MODEL_ID)
16
+
17
def process_file(pcap_file):
    """
    Handle a click on the 'Analyze File' button.

    Args:
        pcap_file: the uploaded PCAP file from Gradio. Either a file-like
            object whose 'name' attribute is the on-disk path, or a plain
            path string; falsy when nothing was uploaded.

    Returns:
        A (text, calls_by_id) tuple:
        - text: the analysis followed by the call flow diagram(s)
        - calls_by_id: the dictionary of calls (saved in Gradio state so
          it can be passed to the LLM tab)
    """
    if not pcap_file:
        return "No file uploaded.", {}

    # The upload already lives on disk, so parse it in place. Re-opening
    # that same path in "wb" mode (as an earlier version did) truncates
    # the capture before it can be read.
    # NOTE(review): assumes Gradio hands over either a path string or an
    # object with a .name path, depending on version/settings - confirm.
    if isinstance(pcap_file, (str, os.PathLike)):
        pcap_path = pcap_file
    else:
        pcap_path = pcap_file.name

    # 1) Parse the PCAP
    calls_by_id = parse_pcap(pcap_path)

    # 2) Analyze the calls
    analysis_result = analyze_calls(calls_by_id)

    # 3) Create a call flow diagram (textual)
    call_flow_text = create_call_flow_diagram(calls_by_id)

    # Combine them into one display string
    result_text = (
        f"=== VoIP Analysis ===\n"
        f"{analysis_result}\n\n"
        f"=== Call Flow Diagram(s) ===\n"
        f"{call_flow_text}"
    )

    # The uploaded file is intentionally left in place: deleting it would
    # break a second click on 'Analyze File' for the same upload.
    return result_text, calls_by_id
54
+
55
def ask_llm_opinion(calls_data, question):
    """
    Pass the call info plus the user's question to the LLM (local pipeline).

    Args:
        calls_data: the dictionary of calls returned from parse_pcap()
            (held in Gradio state); each value must expose from_tag and
            to_tag.
        question: the user's question in text form; None or blank is
            rejected with a hint message.

    Returns:
        The LLM response, or a short hint message when there is no call
        data or no question.
    """
    if not calls_data:
        return "No call data available. Please upload and analyze a PCAP first."

    # Guard against None as well as whitespace-only input
    if not question or not question.strip():
        return "Please enter a question."

    # Only a brief per-call summary is embedded in the prompt, not the
    # full call data.
    context_lines = ["Below is a representation of the calls found in the PCAP:"]
    for call_id, call_obj in calls_data.items():
        context_lines.append(
            f"- Call-ID: {call_id}, from_tag: {call_obj.from_tag}, to_tag: {call_obj.to_tag}"
        )
    calls_context = "\n".join(context_lines) + "\n"

    prompt = (
        f"{calls_context}\n"
        f"User's question: {question}\n"
        f"Please provide your expert VoIP analysis or advice."
    )

    # Query the local pipeline
    return get_llm_opinion(prompt, generator=generator)
82
+
83
def main():
    """
    Build and launch the Gradio interface with two tabs:
      1) PCAP Analysis    - upload a capture and show analysis + call flow
      2) LLM Consultation - ask a question about the analyzed call(s)
    """
    with gr.Blocks() as demo:
        gr.Markdown("# VoIP Analyzer\nUpload a PCAP/PCAPNG file for SIP/RTP analysis. Then consult an LLM for further insights.")

        # We keep the calls data in a Gradio State so we can pass it between tabs
        calls_state = gr.State({})

        with gr.Tab("PCAP Analysis"):
            file_input = gr.File(label="Upload a PCAP or PCAPNG file")
            analyze_button = gr.Button("Analyze File")
            analysis_output = gr.Textbox(label="Analysis & Call Flow", lines=20)

            # process_file returns (text, calls_by_id); the second value
            # refreshes the shared state used by the LLM tab.
            analyze_button.click(
                fn=process_file,
                inputs=file_input,
                outputs=[analysis_output, calls_state]
            )

        with gr.Tab("LLM Consultation"):
            question_input = gr.Textbox(label="Ask a question about the call(s)")
            ask_button = gr.Button("Ask LLM")
            llm_output = gr.Textbox(label="LLM Response", lines=10)

            ask_button.click(
                fn=ask_llm_opinion,
                inputs=[calls_state, question_input],
                outputs=[llm_output]
            )

    # Listen on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)


if __name__ == "__main__":
    main()
call_flow.py ADDED
@@ -0,0 +1,83 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # call_flow.py
2
+
3
def create_call_flow_diagram(calls_by_id):
    """
    Generate a textual call flow for each call in 'calls_by_id'.

    Each call object is expected to have a 'sip_sequence' attribute: a
    list of dicts with:
        {
            'timestamp': datetime object,
            'src_ip': str (or None when unknown),
            'dst_ip': str (or None when unknown),
            'message': str  # e.g. "INVITE (Call-ID: abc)"
        }
    A missing 'sip_sequence' is treated as empty. The output is a simple
    ASCII SIP ladder: the lexicographically smallest IP is placed on the
    left and the largest on the right; messages between any other pair
    are listed with an explicit "[src -> dst]" prefix.

    Args:
        calls_by_id: mapping of Call-ID -> call object.

    Returns:
        A string that concatenates the diagrams for all calls, or a fixed
        message when there are no calls.
    """
    if not calls_by_id:
        return "No calls found to display."

    # We'll combine diagrams for each call into one text output
    output = []

    for call_id, call_obj in calls_by_id.items():
        # Sort defensively: the parser is not required to keep the
        # sequence in chronological order.
        sip_sequence = sorted(
            getattr(call_obj, 'sip_sequence', []),
            key=lambda entry: entry['timestamp'],
        )

        # Distinct known endpoints. None (unknown IP) is excluded because
        # it is not an endpoint and cannot be ordered against strings.
        participants = sorted({
            ip
            for msg in sip_sequence
            for ip in (msg['src_ip'], msg['dst_ip'])
            if ip is not None
        })
        if len(participants) < 2:
            # Zero or one known participant: nothing to draw
            output.append(f"Call-ID {call_id} only has one participant.\n")
            continue

        # First IP goes on the left, last IP on the right. This is purely
        # positional; it does not identify caller vs. callee.
        left_participant = participants[0]
        right_participant = participants[-1]

        # Header for this call
        diagram_lines = [
            f"Call Flow for Call-ID: {call_id}",
            f" {left_participant:<30} {right_participant}",
            " -----------------------------------------------------------",
        ]

        # Each SIP message in chronological order
        for msg in sip_sequence:
            src = msg['src_ip']
            dst = msg['dst_ip']
            message = msg['message']

            if src == left_participant and dst == right_participant:
                # Left -> Right
                diagram_lines.append(f" {message:<30} ----------------->")
            elif src == right_participant and dst == left_participant:
                # Right -> Left: message on its own line, arrow below it
                diagram_lines.append(f" {message}")
                diagram_lines.append(" <-------------------------------")
            else:
                # Some other IP pair (e.g. proxies/servers, or unknown
                # IPs): just note the endpoints explicitly.
                diagram_lines.append(f" [{src} -> {dst}] {message}")

        # Append a marker line when the call has any media streams
        if getattr(call_obj, 'media_streams', None):
            diagram_lines.append(" RTP ...................... RTP ....................")

        # Add a blank line after the call's diagram
        output.append("\n".join(diagram_lines) + "\n")

    return "\n".join(output)
parsers.py ADDED
@@ -0,0 +1,158 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # parsers.py
2
+
3
+ import pyshark
4
+ from datetime import datetime
5
+
6
class MediaStream:
    """Packet counter for one RTP stream, identified by its SSRC."""

    def __init__(self, ssrc=None, payload_type=None, packets=0):
        """
        Args:
            ssrc: RTP synchronization source identifier of the stream.
            payload_type: RTP payload type value seen for the stream.
            packets: number of RTP packets counted so far.
        """
        self.ssrc = ssrc
        self.payload_type = payload_type
        self.packets = packets

    def __repr__(self):
        return (f"<MediaStream ssrc={self.ssrc}, "
                f"payload_type={self.payload_type}, "
                f"packets={self.packets}>")
16
+
17
class Call:
    """State collected for one SIP dialog, keyed by its Call-ID."""

    def __init__(self, call_id):
        """
        Args:
            call_id: the SIP Call-ID this object tracks.
        """
        self.call_id = call_id
        self.from_tag = None
        self.to_tag = None
        # Key timestamps; None until the corresponding message is seen
        self.invite_time = None
        self.answer_time = None
        self.end_time = None
        self.sip_messages = []   # store raw SIP info if needed
        self.media_streams = {}  # key: SSRC, value: MediaStream

        # A chronological record of SIP messages;
        # each entry: { 'timestamp', 'src_ip', 'dst_ip', 'message' }
        self.sip_sequence = []

    def __repr__(self):
        return (f"<Call call_id={self.call_id}, from_tag={self.from_tag}, "
                f"to_tag={self.to_tag}, invite_time={self.invite_time}, "
                f"answer_time={self.answer_time}, end_time={self.end_time}, "
                f"media_streams={list(self.media_streams.values())}>")
37
+
38
def parse_pcap(pcap_path):
    """
    Parse a pcap/pcapng file using PyShark.

    Args:
        pcap_path: path of the capture file to read.

    Returns:
        A dictionary of Call objects keyed by Call-ID. Each call holds
        the SIP and RTP info gathered below, including a sip_sequence
        for generating call flows.
    """
    capture = pyshark.FileCapture(pcap_path, keep_packets=False)
    calls_by_id = {}

    try:
        for packet in capture:
            if 'sip' in packet:
                # Convert sniff timestamp to a (naive, UTC) datetime object
                pkt_ts = datetime.utcfromtimestamp(float(packet.sniff_timestamp))
                _record_sip_packet(packet, pkt_ts, calls_by_id)
            elif 'rtp' in packet:
                _record_rtp_packet(packet.rtp, calls_by_id)
    finally:
        # Always release the capture, even when parsing fails midway
        capture.close()

    return calls_by_id


def _packet_ips(packet):
    """Return (src, dst) from the packet's IPv4 layer, else IPv6, else (None, None)."""
    # NOTE(review): assumes the IPv6 layer is exposed as 'ipv6' with
    # src/dst fields like the IPv4 layer - confirm against pyshark.
    for layer_name in ('ip', 'ipv6'):
        if hasattr(packet, layer_name):
            layer = getattr(packet, layer_name)
            return getattr(layer, 'src', None), getattr(layer, 'dst', None)
    return None, None


def _sip_request_method(sip_layer):
    """Return the upper-cased SIP request method, or None when the message is not a request."""
    # The first token of the Request-Line ("INVITE sip:... SIP/2.0") is
    # the method; this avoids depending on a dedicated method field name.
    request_line = sip_layer.get_field_value('Request-Line')
    if request_line:
        tokens = request_line.split()
        if tokens:
            return tokens[0].upper()
    # Fallback to dedicated fields.
    # NOTE(review): 'method' presumably maps to Wireshark's sip.Method;
    # 'Request_Line_Method' is kept only for backward compatibility.
    method = (getattr(sip_layer, 'method', None)
              or getattr(sip_layer, 'Request_Line_Method', None))
    return str(method).upper() if method else None


def _record_sip_packet(packet, pkt_ts, calls_by_id):
    """Update (or create) the Call matching this SIP packet's Call-ID."""
    sip_layer = packet.sip

    try:
        call_id = sip_layer.call_id.strip()
    except AttributeError:
        # If we can't find a Call-ID, skip
        return

    if call_id not in calls_by_id:
        calls_by_id[call_id] = Call(call_id)
    call_obj = calls_by_id[call_id]

    # Capture raw SIP message, e.g. "INVITE sip:..." or "SIP/2.0 200 OK"
    raw_msg = (sip_layer.get_field_value('Request-Line') or
               sip_layer.get_field_value('Status-Line') or
               "UNKNOWN SIP MESSAGE")
    call_obj.sip_messages.append(raw_msg)

    # Keep the latest from-tag / to-tag seen; a missing tag leaves the
    # previous value untouched.
    from_tag = getattr(sip_layer, 'from_tag', None)
    if from_tag is not None:
        call_obj.from_tag = from_tag
    to_tag = getattr(sip_layer, 'to_tag', None)
    if to_tag is not None:
        call_obj.to_tag = to_tag

    # Build a short summary like "INVITE (Call-ID: abc)" or "200 (Call-ID: abc)"
    method = _sip_request_method(sip_layer)
    status_code = None
    if method:
        message_summary = f"{method} (Call-ID: {call_id})"
    else:
        # Not a request => look at the status line, e.g. "SIP/2.0 200 OK"
        status_line = sip_layer.get_field_value('Status-Line')
        if status_line:
            parts = status_line.split(None, 2)
            if len(parts) >= 2 and parts[1].isdigit():
                status_code = parts[1]
                message_summary = f"{status_code} (Call-ID: {call_id})"
            else:
                message_summary = f"UNKNOWN RESPONSE (Call-ID: {call_id})"
        else:
            message_summary = f"UNKNOWN SIP MESSAGE (Call-ID: {call_id})"

    # Store the short summary in the chronological sip_sequence
    src_ip, dst_ip = _packet_ips(packet)
    call_obj.sip_sequence.append({
        'timestamp': pkt_ts,
        'src_ip': src_ip,
        'dst_ip': dst_ip,
        'message': message_summary
    })

    # Record key timestamps. Decisions use the parsed method/status code
    # rather than substring checks on the summary, because the summary
    # embeds the Call-ID (which may itself contain "BYE").
    if method == "INVITE":
        # Only the first INVITE counts; re-INVITEs keep the original time
        call_obj.invite_time = call_obj.invite_time or pkt_ts
    elif method == "BYE":
        call_obj.end_time = pkt_ts
    elif status_code == "200":
        # NOTE(review): 'cseq_method' presumably maps to Wireshark's
        # sip.CSeq.method; when absent we fall back to the naive
        # "any 200 means answered" rule.
        cseq_method = getattr(sip_layer, 'cseq_method', None)
        cseq_method = str(cseq_method).upper() if cseq_method else None
        if cseq_method == "BYE":
            # 200 to a BYE => call ended (keep the BYE time if already set)
            if call_obj.end_time is None:
                call_obj.end_time = pkt_ts
        elif cseq_method in (None, "INVITE"):
            if call_obj.answer_time is None:
                call_obj.answer_time = pkt_ts

    # If there is an SDP part, media lines could be parsed here (not done).


def _record_rtp_packet(rtp_layer, calls_by_id):
    """Count one RTP packet against every call known so far."""
    ssrc = getattr(rtp_layer, 'ssrc', None)
    payload_type = getattr(rtp_layer, 'payload_type', None)

    # This is a simplistic approach: the stream is not tied to a call via
    # the IP/port from SDP, so the packet is counted in ALL calls already
    # seen. RTP arriving before any SIP message is therefore not counted.
    for c_obj in calls_by_id.values():
        if ssrc not in c_obj.media_streams:
            c_obj.media_streams[ssrc] = MediaStream(ssrc=ssrc, payload_type=payload_type)
        c_obj.media_streams[ssrc].packets += 1
requirements.txt ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # For SIP/RTP parsing:
2
+ scapy>=2.4.5
3
+
4
+ # For building the Gradio interface:
5
+ gradio>=3.23.0
6
+
7
+ # For making API calls to OpenAI or Hugging Face:
8
+ requests>=2.28.1
9
+
10
+ # (Optional) if you want to use Hugging Face transformers/models locally:
11
+ transformers>=4.26.0
12
+ huggingface_hub>=0.10.1