CVE_Analysis_Agent / web_ui.py
ChienChung's picture
Upload 11 files
be9311f verified
import gradio as gr
import os
import time
from main import analyze_cve
from cve_validator import validate_cve_id
from search import search_cves
from storage import load_cve_analysis
from vector_storage import vector_storage
# Well-known CVE examples for quick testing
EXAMPLE_CVES = [
("CVE-2021-44228", "Log4Shell - Apache Log4j RCE"),
("CVE-2020-1472", "Zerologon - Windows Netlogon Vulnerability"),
("CVE-2022-22965", "Spring4Shell - Spring Framework RCE"),
("CVE-2021-34527", "PrintNightmare - Windows Print Spooler"),
("CVE-2020-0601", "CurveBall - Windows CryptoAPI Spoofing"),
]
def analyze_cve_handler(cve_input, progress=gr.Progress()):
"""Main CVE analysis function with progress tracking"""
if not cve_input.strip():
return "Please enter a CVE ID", ""
# Input processing
progress(0.1, desc="Processing CVE ID format...")
cve_id = cve_input.strip().upper()
if not cve_id.startswith('CVE-'):
# Auto-add CVE- prefix for formats like "2021-44228"
if '-' in cve_id and len(cve_id.split('-')) == 2:
cve_id = f"CVE-{cve_id}"
# Format validation
progress(0.2, desc="Validating CVE format...")
if not validate_cve_id(cve_id):
return f"Invalid CVE format: {cve_id}", ""
try:
# Check cache
progress(0.3, desc="Checking local cache...")
cached = load_cve_analysis(cve_id)
if cached:
# Load from cache (faster)
progress(0.8, desc="Loading from cache...")
cached_date = cached.get('analyzed_at', 'Unknown date')[:10]
source_info = f"**Source:** Local cache ({cached_date})"
result = cached
else:
# First time analysis (slower)
progress(0.4, desc="Fetching from NIST API...")
progress(0.6, desc="Generating AI security analysis...")
source_info = f"**Source:** NIST API (fresh data)"
result = analyze_cve(cve_id, use_cache=False)
progress(0.8, desc="Saving analysis results...")
# Format results
progress(0.9, desc="Formatting results...")
if result:
cve = result['cve_data']['cve']
# Format basic information
output = f"## {cve['id']}\n\n"
output += f"**Published:** {cve['published'][:10]}\n"
output += f"**Status:** {cve.get('vulnStatus', 'Unknown')}\n"
# CVSS information
if 'metrics' in cve and 'cvssMetricV31' in cve['metrics']:
cvss = cve['metrics']['cvssMetricV31'][0]['cvssData']
output += f"**CVSS Score:** {cvss['baseScore']}/10.0\n"
output += f"**Severity:** {cvss['baseSeverity']}\n"
# Description - show full description
description = cve['descriptions'][0]['value']
output += f"\n**Description:**\n{description}"
# Add source info at the bottom
output += f"\n\n{source_info}"
ai_summary = result.get('ai_summary', 'AI summary not available')
# Add similar CVE recommendations
similar_cves = vector_storage.find_similar_cves(cve_id, top_k=3)
if similar_cves:
ai_summary += f"\n\n---\n\n**Related Vulnerabilities:**\n\n"
for similar in similar_cves:
ai_summary += f"• **{similar['cve_id']}** (similarity: {similar['similarity']}) - {similar['description']}\n\n"
# Final step
progress(1.0, desc="Analysis complete!")
return output, ai_summary
else:
return "Failed to fetch CVE data", ""
except Exception as e:
return f"Analysis error: {str(e)}", ""
def quick_example_handler(example_choice, progress=gr.Progress()):
"""Handle quick example selection with progress"""
if example_choice:
progress(0.1, desc="Processing example selection...")
cve_id = example_choice.split(" - ")[0]
return analyze_cve_handler(cve_id, progress)
return "Please select an example", ""
def semantic_search_handler(query, progress=gr.Progress()):
"""AI-powered semantic search"""
if not query.strip():
return "Please enter a search query"
try:
progress(0.3, desc="Performing AI-powered search...")
results = vector_storage.search_by_text(query, top_k=5)
progress(0.8, desc="Formatting AI search results...")
if results:
output = f"## AI Search Results for '{query}'\n\n"
for i, r in enumerate(results, 1):
output += f"**{i}. {r['cve_id']}** (Relevance: {r['similarity']})\n"
output += f" Severity: {r['severity']}\n"
output += f" {r['description']}\n\n"
progress(1.0, desc="AI search complete!")
return output
else:
progress(1.0, desc="AI search complete!")
return f"No relevant CVEs found for '{query}'"
except Exception as e:
return f"AI search failed: {str(e)}"
def search_handler(keyword, progress=gr.Progress()):
"""Search previously analysed CVEs with progress"""
if not keyword.strip():
return "Please enter search keywords"
try:
progress(0.3, desc="Searching local database...")
results = search_cves(keyword)
progress(0.8, desc="Formatting search results...")
if results:
output = f"## Found {len(results)} results for '{keyword}'\n\n"
for i, r in enumerate(results[:8], 1):
output += f"**{i}. {r['cve_id']}**\n"
output += f" {r['description']}\n\n"
if len(results) > 8:
output += f"... and {len(results) - 8} more results"
progress(1.0, desc="Search complete!")
return output
else:
progress(1.0, desc="Search complete!")
return f"No results found for '{keyword}'"
except Exception as e:
return f"Search failed: {str(e)}"
# Main interface
with gr.Blocks(title="CVE Analysis Agent", theme=gr.themes.Default(primary_hue="violet")) as interface:
with gr.Column():
# Beautiful header
with gr.Row():
gr.Markdown("""
# CVE Analysis Agent
""")
# Main input card
with gr.Column():
gr.Markdown("### Enter CVE ID for Analysis")
with gr.Row():
cve_input = gr.Textbox(
label="CVE Identifier",
placeholder="CVE-2021-44228 or 2021-44228",
info="Smart format detection - accepts both full and short formats",
scale=4,
container=True
)
analyse_btn = gr.Button(
"Analyse",
variant="primary",
scale=1
)
# Quick examples section
with gr.Row():
gr.Markdown("**Quick Examples:**")
example_dropdown = gr.Dropdown(
choices=[f"{cve} - {desc}" for cve, desc in EXAMPLE_CVES],
label="Select a famous vulnerability",
value=None,
scale=3
)
example_btn = gr.Button("Try Example", variant="secondary", scale=1)
# Results container
with gr.Column():
gr.Markdown("### Analysis Results")
# CVE Information on top
basic_output = gr.Markdown(
label="CVE Information",
value="*CVE information will appear here after analysis*"
)
# AI Analysis below
ai_output = gr.Markdown(
label="AI Security Analysis",
value="*AI-generated security analysis will appear here*"
)
# Search section
with gr.Accordion("Search Previously Analysed CVEs", open=False):
with gr.Column():
gr.Markdown("**Search through your analysed vulnerabilities**")
with gr.Tab("Keyword Search"):
with gr.Row():
search_input = gr.Textbox(
label="Search Keywords",
placeholder="log4j, apache, spring, windows, print",
scale=3
)
search_btn = gr.Button("Search", scale=1)
search_output = gr.Markdown(
label="Search Results",
value="*Search results will appear here*"
)
with gr.Tab("Semantic Search"):
with gr.Row():
semantic_input = gr.Textbox(
label="Natural Language Search",
placeholder="remote code execution in Java applications",
scale=3
)
semantic_btn = gr.Button("AI Search", scale=1)
semantic_output = gr.Markdown(
label="AI Search Results",
value="*AI-powered search results will appear here*"
)
# Event handlers with progress support
analyse_btn.click(
analyze_cve_handler,
inputs=cve_input,
outputs=[basic_output, ai_output],
show_progress=True
)
example_btn.click(
quick_example_handler,
inputs=example_dropdown,
outputs=[basic_output, ai_output],
show_progress=True
)
search_btn.click(
search_handler,
inputs=search_input,
outputs=search_output,
show_progress=True
)
semantic_btn.click(
semantic_search_handler,
inputs=semantic_input,
outputs=semantic_output,
show_progress=True
)
if __name__ == "__main__":
interface.launch()