Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import time | |
| from main import analyze_cve | |
| from cve_validator import validate_cve_id | |
| from search import search_cves | |
| from storage import load_cve_analysis | |
| from vector_storage import vector_storage | |
| # Well-known CVE examples for quick testing | |
| EXAMPLE_CVES = [ | |
| ("CVE-2021-44228", "Log4Shell - Apache Log4j RCE"), | |
| ("CVE-2020-1472", "Zerologon - Windows Netlogon Vulnerability"), | |
| ("CVE-2022-22965", "Spring4Shell - Spring Framework RCE"), | |
| ("CVE-2021-34527", "PrintNightmare - Windows Print Spooler"), | |
| ("CVE-2020-0601", "CurveBall - Windows CryptoAPI Spoofing"), | |
| ] | |
| def analyze_cve_handler(cve_input, progress=gr.Progress()): | |
| """Main CVE analysis function with progress tracking""" | |
| if not cve_input.strip(): | |
| return "Please enter a CVE ID", "" | |
| # Input processing | |
| progress(0.1, desc="Processing CVE ID format...") | |
| cve_id = cve_input.strip().upper() | |
| if not cve_id.startswith('CVE-'): | |
| # Auto-add CVE- prefix for formats like "2021-44228" | |
| if '-' in cve_id and len(cve_id.split('-')) == 2: | |
| cve_id = f"CVE-{cve_id}" | |
| # Format validation | |
| progress(0.2, desc="Validating CVE format...") | |
| if not validate_cve_id(cve_id): | |
| return f"Invalid CVE format: {cve_id}", "" | |
| try: | |
| # Check cache | |
| progress(0.3, desc="Checking local cache...") | |
| cached = load_cve_analysis(cve_id) | |
| if cached: | |
| # Load from cache (faster) | |
| progress(0.8, desc="Loading from cache...") | |
| cached_date = cached.get('analyzed_at', 'Unknown date')[:10] | |
| source_info = f"**Source:** Local cache ({cached_date})" | |
| result = cached | |
| else: | |
| # First time analysis (slower) | |
| progress(0.4, desc="Fetching from NIST API...") | |
| progress(0.6, desc="Generating AI security analysis...") | |
| source_info = f"**Source:** NIST API (fresh data)" | |
| result = analyze_cve(cve_id, use_cache=False) | |
| progress(0.8, desc="Saving analysis results...") | |
| # Format results | |
| progress(0.9, desc="Formatting results...") | |
| if result: | |
| cve = result['cve_data']['cve'] | |
| # Format basic information | |
| output = f"## {cve['id']}\n\n" | |
| output += f"**Published:** {cve['published'][:10]}\n" | |
| output += f"**Status:** {cve.get('vulnStatus', 'Unknown')}\n" | |
| # CVSS information | |
| if 'metrics' in cve and 'cvssMetricV31' in cve['metrics']: | |
| cvss = cve['metrics']['cvssMetricV31'][0]['cvssData'] | |
| output += f"**CVSS Score:** {cvss['baseScore']}/10.0\n" | |
| output += f"**Severity:** {cvss['baseSeverity']}\n" | |
| # Description - show full description | |
| description = cve['descriptions'][0]['value'] | |
| output += f"\n**Description:**\n{description}" | |
| # Add source info at the bottom | |
| output += f"\n\n{source_info}" | |
| ai_summary = result.get('ai_summary', 'AI summary not available') | |
| # Add similar CVE recommendations | |
| similar_cves = vector_storage.find_similar_cves(cve_id, top_k=3) | |
| if similar_cves: | |
| ai_summary += f"\n\n---\n\n**Related Vulnerabilities:**\n\n" | |
| for similar in similar_cves: | |
| ai_summary += f"• **{similar['cve_id']}** (similarity: {similar['similarity']}) - {similar['description']}\n\n" | |
| # Final step | |
| progress(1.0, desc="Analysis complete!") | |
| return output, ai_summary | |
| else: | |
| return "Failed to fetch CVE data", "" | |
| except Exception as e: | |
| return f"Analysis error: {str(e)}", "" | |
| def quick_example_handler(example_choice, progress=gr.Progress()): | |
| """Handle quick example selection with progress""" | |
| if example_choice: | |
| progress(0.1, desc="Processing example selection...") | |
| cve_id = example_choice.split(" - ")[0] | |
| return analyze_cve_handler(cve_id, progress) | |
| return "Please select an example", "" | |
| def semantic_search_handler(query, progress=gr.Progress()): | |
| """AI-powered semantic search""" | |
| if not query.strip(): | |
| return "Please enter a search query" | |
| try: | |
| progress(0.3, desc="Performing AI-powered search...") | |
| results = vector_storage.search_by_text(query, top_k=5) | |
| progress(0.8, desc="Formatting AI search results...") | |
| if results: | |
| output = f"## AI Search Results for '{query}'\n\n" | |
| for i, r in enumerate(results, 1): | |
| output += f"**{i}. {r['cve_id']}** (Relevance: {r['similarity']})\n" | |
| output += f" Severity: {r['severity']}\n" | |
| output += f" {r['description']}\n\n" | |
| progress(1.0, desc="AI search complete!") | |
| return output | |
| else: | |
| progress(1.0, desc="AI search complete!") | |
| return f"No relevant CVEs found for '{query}'" | |
| except Exception as e: | |
| return f"AI search failed: {str(e)}" | |
| def search_handler(keyword, progress=gr.Progress()): | |
| """Search previously analysed CVEs with progress""" | |
| if not keyword.strip(): | |
| return "Please enter search keywords" | |
| try: | |
| progress(0.3, desc="Searching local database...") | |
| results = search_cves(keyword) | |
| progress(0.8, desc="Formatting search results...") | |
| if results: | |
| output = f"## Found {len(results)} results for '{keyword}'\n\n" | |
| for i, r in enumerate(results[:8], 1): | |
| output += f"**{i}. {r['cve_id']}**\n" | |
| output += f" {r['description']}\n\n" | |
| if len(results) > 8: | |
| output += f"... and {len(results) - 8} more results" | |
| progress(1.0, desc="Search complete!") | |
| return output | |
| else: | |
| progress(1.0, desc="Search complete!") | |
| return f"No results found for '{keyword}'" | |
| except Exception as e: | |
| return f"Search failed: {str(e)}" | |
| # Main interface | |
| with gr.Blocks(title="CVE Analysis Agent", theme=gr.themes.Default(primary_hue="violet")) as interface: | |
| with gr.Column(): | |
| # Beautiful header | |
| with gr.Row(): | |
| gr.Markdown(""" | |
| # CVE Analysis Agent | |
| """) | |
| # Main input card | |
| with gr.Column(): | |
| gr.Markdown("### Enter CVE ID for Analysis") | |
| with gr.Row(): | |
| cve_input = gr.Textbox( | |
| label="CVE Identifier", | |
| placeholder="CVE-2021-44228 or 2021-44228", | |
| info="Smart format detection - accepts both full and short formats", | |
| scale=4, | |
| container=True | |
| ) | |
| analyse_btn = gr.Button( | |
| "Analyse", | |
| variant="primary", | |
| scale=1 | |
| ) | |
| # Quick examples section | |
| with gr.Row(): | |
| gr.Markdown("**Quick Examples:**") | |
| example_dropdown = gr.Dropdown( | |
| choices=[f"{cve} - {desc}" for cve, desc in EXAMPLE_CVES], | |
| label="Select a famous vulnerability", | |
| value=None, | |
| scale=3 | |
| ) | |
| example_btn = gr.Button("Try Example", variant="secondary", scale=1) | |
| # Results container | |
| with gr.Column(): | |
| gr.Markdown("### Analysis Results") | |
| # CVE Information on top | |
| basic_output = gr.Markdown( | |
| label="CVE Information", | |
| value="*CVE information will appear here after analysis*" | |
| ) | |
| # AI Analysis below | |
| ai_output = gr.Markdown( | |
| label="AI Security Analysis", | |
| value="*AI-generated security analysis will appear here*" | |
| ) | |
| # Search section | |
| with gr.Accordion("Search Previously Analysed CVEs", open=False): | |
| with gr.Column(): | |
| gr.Markdown("**Search through your analysed vulnerabilities**") | |
| with gr.Tab("Keyword Search"): | |
| with gr.Row(): | |
| search_input = gr.Textbox( | |
| label="Search Keywords", | |
| placeholder="log4j, apache, spring, windows, print", | |
| scale=3 | |
| ) | |
| search_btn = gr.Button("Search", scale=1) | |
| search_output = gr.Markdown( | |
| label="Search Results", | |
| value="*Search results will appear here*" | |
| ) | |
| with gr.Tab("Semantic Search"): | |
| with gr.Row(): | |
| semantic_input = gr.Textbox( | |
| label="Natural Language Search", | |
| placeholder="remote code execution in Java applications", | |
| scale=3 | |
| ) | |
| semantic_btn = gr.Button("AI Search", scale=1) | |
| semantic_output = gr.Markdown( | |
| label="AI Search Results", | |
| value="*AI-powered search results will appear here*" | |
| ) | |
| # Event handlers with progress support | |
| analyse_btn.click( | |
| analyze_cve_handler, | |
| inputs=cve_input, | |
| outputs=[basic_output, ai_output], | |
| show_progress=True | |
| ) | |
| example_btn.click( | |
| quick_example_handler, | |
| inputs=example_dropdown, | |
| outputs=[basic_output, ai_output], | |
| show_progress=True | |
| ) | |
| search_btn.click( | |
| search_handler, | |
| inputs=search_input, | |
| outputs=search_output, | |
| show_progress=True | |
| ) | |
| semantic_btn.click( | |
| semantic_search_handler, | |
| inputs=semantic_input, | |
| outputs=semantic_output, | |
| show_progress=True | |
| ) | |
| if __name__ == "__main__": | |
| interface.launch() |