VoIPAnalyzer / app.py
fabioantonini's picture
Upload app.py
7e4ef99 verified
# app.py
import os
import gradio as gr
# Import your modules
from parsers import parse_pcap
from analysis import analyze_calls
from call_flow import create_call_flow_diagram
# We'll adapt llm_utils to load flan-t5-base locally
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
def create_local_pipeline(model_id="google/flan-t5-base"):
"""
Create a local pipeline for Flan-T5, which is a seq2seq model.
This should run within ~16GB RAM for 'flan-t5-base'.
"""
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForSeq2SeqLM.from_pretrained(model_id, device_map="auto")
# Use text2text-generation for T5-based models
return pipeline("text2text-generation", model=model, tokenizer=tokenizer)
# Initialize model pipeline once
MODEL_ID = "google/flan-t5-base"
generator = create_local_pipeline(MODEL_ID)
def get_llm_opinion(prompt, generator):
"""
Generate text from the local Flan-T5 pipeline.
We use text2text-generation to handle the seq2seq nature of T5.
"""
outputs = generator(prompt, max_length=256, do_sample=True, temperature=0.5)
return outputs[0]["generated_text"]
def process_file(pcap_file):
"""
This function is called when user clicks the 'Analyze File' button.
- pcap_file: A dictionary returned by Gradio's File component.
Returns:
- a textual summary (analysis + call flow)
- the dictionary of calls_by_id (saved in Gradio state so we can pass them to the LLM)
"""
print("DEBUG pcap_file =", pcap_file, type(pcap_file)) # <--- debug line
if not pcap_file:
return "No file uploaded.", {}
# pcap_file is a string path to the temp file, e.g. "/tmp/gradio/xyz/somefile.pcap"
temp_filename = pcap_file
# 1) Parse the PCAP
calls_by_id = parse_pcap(temp_filename)
# 2) Analyze the calls
analysis_result = analyze_calls(calls_by_id)
# 3) Create a call flow diagram (textual)
call_flow_text = create_call_flow_diagram(calls_by_id)
# Combine them into one display string
result_text = (
f"=== VoIP Analysis ===\n"
f"{analysis_result}\n\n"
f"=== Call Flow Diagram(s) ===\n"
f"{call_flow_text}"
)
# Delete the temp file if desired
os.remove(temp_filename)
return result_text, calls_by_id
def ask_llm_opinion(calls_data, question):
"""
This function passes the call analysis info + user question to the LLM (local pipeline).
- calls_data: The dictionary of calls returned from parse_pcap() [Gradio state]
- question: The user’s question in text form
"""
if not calls_data:
return "No call data available. Please upload and analyze a PCAP first."
if not question.strip():
return "Please enter a question."
# Summarize the calls for context
calls_context = "Below is a brief description of the calls found in the PCAP:\n"
for call_id, call_obj in calls_data.items():
calls_context += f"- Call-ID: {call_id}, from_tag: {call_obj.from_tag}, to_tag: {call_obj.to_tag}\n"
# Build a prompt for T5
prompt = (
f"{calls_context}\n"
f"User's question: {question}\n\n"
"Please provide your expert VoIP analysis or advice."
)
# Query the local pipeline
llm_response = get_llm_opinion(prompt, generator=generator)
return llm_response
def main():
"""
Build the Gradio interface with two tabs:
1) PCAP Analysis
2) LLM Consultation
"""
with gr.Blocks() as demo:
gr.Markdown("# VoIP Analyzer\nUpload a PCAP/PCAPNG file for SIP/RTP analysis. Then consult Flan-T5 for further insights.")
# We keep the calls data in a Gradio State so we can pass it between tabs
calls_state = gr.State({})
with gr.Tab("PCAP Analysis"):
file_input = gr.File(label="Upload a PCAP or PCAPNG file")
analyze_button = gr.Button("Analyze File")
analysis_output = gr.Textbox(label="Analysis & Call Flow", lines=20)
analyze_button.click(
fn=process_file,
inputs=file_input,
outputs=[analysis_output, calls_state]
)
with gr.Tab("LLM Consultation"):
question_input = gr.Textbox(label="Ask a question about the call(s)")
ask_button = gr.Button("Ask LLM")
llm_output = gr.Textbox(label="LLM Response", lines=10)
ask_button.click(
fn=ask_llm_opinion,
inputs=[calls_state, question_input],
outputs=[llm_output]
)
demo.launch(server_name="0.0.0.0", server_port=7860, share=True)
if __name__ == "__main__":
main()