import gradio as gr import os import time from main import analyze_cve from cve_validator import validate_cve_id from search import search_cves from storage import load_cve_analysis from vector_storage import vector_storage # Well-known CVE examples for quick testing EXAMPLE_CVES = [ ("CVE-2021-44228", "Log4Shell - Apache Log4j RCE"), ("CVE-2020-1472", "Zerologon - Windows Netlogon Vulnerability"), ("CVE-2022-22965", "Spring4Shell - Spring Framework RCE"), ("CVE-2021-34527", "PrintNightmare - Windows Print Spooler"), ("CVE-2020-0601", "CurveBall - Windows CryptoAPI Spoofing"), ] def analyze_cve_handler(cve_input, progress=gr.Progress()): """Main CVE analysis function with progress tracking""" if not cve_input.strip(): return "Please enter a CVE ID", "" # Input processing progress(0.1, desc="Processing CVE ID format...") cve_id = cve_input.strip().upper() if not cve_id.startswith('CVE-'): # Auto-add CVE- prefix for formats like "2021-44228" if '-' in cve_id and len(cve_id.split('-')) == 2: cve_id = f"CVE-{cve_id}" # Format validation progress(0.2, desc="Validating CVE format...") if not validate_cve_id(cve_id): return f"Invalid CVE format: {cve_id}", "" try: # Check cache progress(0.3, desc="Checking local cache...") cached = load_cve_analysis(cve_id) if cached: # Load from cache (faster) progress(0.8, desc="Loading from cache...") cached_date = cached.get('analyzed_at', 'Unknown date')[:10] source_info = f"**Source:** Local cache ({cached_date})" result = cached else: # First time analysis (slower) progress(0.4, desc="Fetching from NIST API...") progress(0.6, desc="Generating AI security analysis...") source_info = f"**Source:** NIST API (fresh data)" result = analyze_cve(cve_id, use_cache=False) progress(0.8, desc="Saving analysis results...") # Format results progress(0.9, desc="Formatting results...") if result: cve = result['cve_data']['cve'] # Format basic information output = f"## {cve['id']}\n\n" output += f"**Published:** {cve['published'][:10]}\n" output += f"**Status:** {cve.get('vulnStatus', 'Unknown')}\n" # CVSS information if 'metrics' in cve and 'cvssMetricV31' in cve['metrics']: cvss = cve['metrics']['cvssMetricV31'][0]['cvssData'] output += f"**CVSS Score:** {cvss['baseScore']}/10.0\n" output += f"**Severity:** {cvss['baseSeverity']}\n" # Description - show full description description = cve['descriptions'][0]['value'] output += f"\n**Description:**\n{description}" # Add source info at the bottom output += f"\n\n{source_info}" ai_summary = result.get('ai_summary', 'AI summary not available') # Add similar CVE recommendations similar_cves = vector_storage.find_similar_cves(cve_id, top_k=3) if similar_cves: ai_summary += f"\n\n---\n\n**Related Vulnerabilities:**\n\n" for similar in similar_cves: ai_summary += f"• **{similar['cve_id']}** (similarity: {similar['similarity']}) - {similar['description']}\n\n" # Final step progress(1.0, desc="Analysis complete!") return output, ai_summary else: return "Failed to fetch CVE data", "" except Exception as e: return f"Analysis error: {str(e)}", "" def quick_example_handler(example_choice, progress=gr.Progress()): """Handle quick example selection with progress""" if example_choice: progress(0.1, desc="Processing example selection...") cve_id = example_choice.split(" - ")[0] return analyze_cve_handler(cve_id, progress) return "Please select an example", "" def semantic_search_handler(query, progress=gr.Progress()): """AI-powered semantic search""" if not query.strip(): return "Please enter a search query" try: progress(0.3, desc="Performing AI-powered search...") results = vector_storage.search_by_text(query, top_k=5) progress(0.8, desc="Formatting AI search results...") if results: output = f"## AI Search Results for '{query}'\n\n" for i, r in enumerate(results, 1): output += f"**{i}. {r['cve_id']}** (Relevance: {r['similarity']})\n" output += f" Severity: {r['severity']}\n" output += f" {r['description']}\n\n" progress(1.0, desc="AI search complete!") return output else: progress(1.0, desc="AI search complete!") return f"No relevant CVEs found for '{query}'" except Exception as e: return f"AI search failed: {str(e)}" def search_handler(keyword, progress=gr.Progress()): """Search previously analysed CVEs with progress""" if not keyword.strip(): return "Please enter search keywords" try: progress(0.3, desc="Searching local database...") results = search_cves(keyword) progress(0.8, desc="Formatting search results...") if results: output = f"## Found {len(results)} results for '{keyword}'\n\n" for i, r in enumerate(results[:8], 1): output += f"**{i}. {r['cve_id']}**\n" output += f" {r['description']}\n\n" if len(results) > 8: output += f"... and {len(results) - 8} more results" progress(1.0, desc="Search complete!") return output else: progress(1.0, desc="Search complete!") return f"No results found for '{keyword}'" except Exception as e: return f"Search failed: {str(e)}" # Main interface with gr.Blocks(title="CVE Analysis Agent", theme=gr.themes.Default(primary_hue="violet")) as interface: with gr.Column(): # Beautiful header with gr.Row(): gr.Markdown(""" # CVE Analysis Agent """) # Main input card with gr.Column(): gr.Markdown("### Enter CVE ID for Analysis") with gr.Row(): cve_input = gr.Textbox( label="CVE Identifier", placeholder="CVE-2021-44228 or 2021-44228", info="Smart format detection - accepts both full and short formats", scale=4, container=True ) analyse_btn = gr.Button( "Analyse", variant="primary", scale=1 ) # Quick examples section with gr.Row(): gr.Markdown("**Quick Examples:**") example_dropdown = gr.Dropdown( choices=[f"{cve} - {desc}" for cve, desc in EXAMPLE_CVES], label="Select a famous vulnerability", value=None, scale=3 ) example_btn = gr.Button("Try Example", variant="secondary", scale=1) # Results container with gr.Column(): gr.Markdown("### Analysis Results") # CVE Information on top basic_output = gr.Markdown( label="CVE Information", value="*CVE information will appear here after analysis*" ) # AI Analysis below ai_output = gr.Markdown( label="AI Security Analysis", value="*AI-generated security analysis will appear here*" ) # Search section with gr.Accordion("Search Previously Analysed CVEs", open=False): with gr.Column(): gr.Markdown("**Search through your analysed vulnerabilities**") with gr.Tab("Keyword Search"): with gr.Row(): search_input = gr.Textbox( label="Search Keywords", placeholder="log4j, apache, spring, windows, print", scale=3 ) search_btn = gr.Button("Search", scale=1) search_output = gr.Markdown( label="Search Results", value="*Search results will appear here*" ) with gr.Tab("Semantic Search"): with gr.Row(): semantic_input = gr.Textbox( label="Natural Language Search", placeholder="remote code execution in Java applications", scale=3 ) semantic_btn = gr.Button("AI Search", scale=1) semantic_output = gr.Markdown( label="AI Search Results", value="*AI-powered search results will appear here*" ) # Event handlers with progress support analyse_btn.click( analyze_cve_handler, inputs=cve_input, outputs=[basic_output, ai_output], show_progress=True ) example_btn.click( quick_example_handler, inputs=example_dropdown, outputs=[basic_output, ai_output], show_progress=True ) search_btn.click( search_handler, inputs=search_input, outputs=search_output, show_progress=True ) semantic_btn.click( semantic_search_handler, inputs=semantic_input, outputs=semantic_output, show_progress=True ) if __name__ == "__main__": interface.launch()