# Gradio Web Interface for VAPT Agent
"""
Modern Gradio UI for the VAPT (Vulnerability Assessment and Penetration Testing) agent.

Features:
- Input API endpoint, HTTP method, and optional API key
- Real-time progress streaming
- Downloadable Markdown report
- Visual Dashboard (risk gauge & severity pie chart)
- AI Security Tutor (interactive Q&A about the report)
"""

import asyncio
import os
import threading
import time
from datetime import datetime
from typing import Generator, List, Optional, Tuple

import gradio as gr

from ai_tutor import get_tutor
from config import VAPTConfig
from dashboard_utils import (
    calculate_risk_score,
    create_risk_gauge,
    create_severity_chart,
    parse_vapt_report,
)
from vapt_agent import run_vapt_agent_with_callback

CSS_PATH = os.path.join(os.path.dirname(__file__), "vapt_styles.css")

# Pre-filled endpoint shown in the form and restored by the Reset button.
DEFAULT_API_ENDPOINT = (
    "https://sandbox.api.sap.com/SAPCALM/calm-tasks/v1/tasks?projectId=111"
)

# Optional pre-filled API key. Read from the environment so that no credential
# lives in source control; empty when the variable is unset.
# NOTE(review): a literal key used to be embedded as the textbox default --
# treat that key as leaked and rotate it.
DEFAULT_API_KEY = os.environ.get("VAPT_DEFAULT_API_KEY", "")

# Text shown in the report pane before/after a run (also used by Reset).
REPORT_PLACEHOLDER = "Security report will appear here after the test completes..."

# How often the UI generator checks the worker thread for new progress.
POLL_INTERVAL_SECONDS = 0.5

# Shape of every value yielded by ``run_security_test``:
# (progress log, report markdown, report file path or None, button update).
ProgressUpdate = Tuple[str, str, Optional[str], gr.Button]


def load_custom_css(path: str = CSS_PATH) -> str:
    """Return the contents of the stylesheet at *path*, or "" if it is missing.

    Args:
        path: Location of the CSS file; defaults to ``vapt_styles.css`` next
            to this module.

    Returns:
        The stylesheet text, or an empty string when the file does not exist.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        # Fail silently; app will still run without custom CSS
        return ""


# ---------------------------------------------------------------------------
# Helper: run the VAPT agent and stream updates to Gradio
# ---------------------------------------------------------------------------
def run_security_test(
    api_endpoint: str,
    http_method: str,
    api_key: Optional[str] = None,
) -> Generator[ProgressUpdate, None, None]:
    """Run the VAPT agent against an endpoint and stream progress to Gradio.

    The function validates its inputs, starts the agent in a background
    thread, and periodically yields any new progress messages. The submit
    button is disabled while the test runs and re-enabled when it finishes.

    Args:
        api_endpoint: Full URL of the API endpoint to test. Must start with
            ``http://`` or ``https://`` (surrounding whitespace is ignored).
        http_method: HTTP method to use for the endpoint.
        api_key: Optional key; when non-blank it is sent as a Bearer token in
            the ``Authorization`` header.

    Yields:
        Tuples of (progress log text, report markdown, report file path or
        ``None``, button update).
    """
    # ---------- Validation ----------
    endpoint = (api_endpoint or "").strip()
    if not endpoint:
        yield (
            "❌ Error: API endpoint is required",
            "## Error\n\nPlease provide a valid API endpoint URL.",
            None,
            gr.Button(interactive=True),
        )
        return

    # URL schemes are case-insensitive, so compare on a lowered copy.
    if not endpoint.lower().startswith(("http://", "https://")):
        yield (
            "❌ Error: Invalid URL format",
            "## Error\n\nAPI endpoint must start with `http://` or `https://`.",
            None,
            gr.Button(interactive=True),
        )
        return

    # ---------- Progress handling ----------
    progress_messages: List[str] = []
    lock = threading.Lock()

    def progress_callback(msg: str) -> None:
        """Append a timestamped message; called from the worker thread too."""
        with lock:
            progress_messages.append(
                f"[{datetime.now().strftime('%H:%M:%S')}] {msg}"
            )

    def add_progress(msg: str) -> str:
        """Record *msg* and return the full progress log as one string."""
        progress_callback(msg)
        with lock:
            return "\n".join(progress_messages)

    # Initial message
    yield (
        add_progress("🚀 Initializing VAPT Agent..."),
        "## Starting Security Test\n\nPlease wait while we assess your API endpoint...",
        None,
        gr.Button(interactive=False),
    )

    # Prepare request headers
    headers = {"Content-Type": "application/json", "User-Agent": "VAPT-Agent/1.0"}
    if api_key and api_key.strip():
        headers["Authorization"] = f"Bearer {api_key.strip()}"
        yield (
            add_progress("🔑 API key provided – will test authenticated endpoints"),
            "## Starting Security Test\n\nPreparing to test with authentication...",
            None,
            gr.Button(interactive=False),
        )

    # ---------- Run agent in background thread ----------
    result = {
        "report_content": None,
        "report_file_path": None,
        "error": None,
        "timed_out": False,
        "done": False,
    }

    def agent_worker() -> None:
        """Run the async agent on a private event loop and store the outcome."""
        try:
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                content, path = loop.run_until_complete(
                    run_vapt_agent_with_callback(
                        api_endpoint=endpoint,
                        method=http_method,
                        headers=headers,
                        progress_callback=progress_callback,
                    )
                )
            finally:
                # Close the loop on failure as well, so it is never leaked.
                loop.close()
            result["report_content"] = content
            result["report_file_path"] = path
        except asyncio.TimeoutError:
            result["timed_out"] = True
            result["error"] = "Timeout: Security test took too long"
        except Exception as exc:
            # str(exc) can be empty; fall back to the class name so the
            # failure is not mistaken for success further down.
            result["error"] = str(exc) or exc.__class__.__name__
        finally:
            result["done"] = True

    # Connect to LLM backend – just a placeholder progress update
    yield (
        add_progress("🔌 Connecting to security engine..."),
        "## Starting Security Test\n\nConnecting...",
        None,
        gr.Button(interactive=False),
    )

    threading.Thread(target=agent_worker, daemon=True).start()

    # ---------- Poll for updates ----------
    last_len = 0
    while not result["done"]:
        time.sleep(POLL_INTERVAL_SECONDS)
        # Snapshot under the lock, but yield outside it: a generator that
        # suspends while holding the lock would block the worker's
        # progress_callback until Gradio resumes us.
        with lock:
            current_len = len(progress_messages)
            snapshot = (
                "\n".join(progress_messages) if current_len > last_len else None
            )
        if snapshot is not None:
            last_len = current_len
            yield (
                snapshot,
                "## Security Test in Progress\n\nPlease wait while the agent performs testing...",
                None,
                gr.Button(interactive=False),
            )

    # ---------- Final handling ----------
    if result["error"]:
        err = result["error"]
        if result["timed_out"]:
            yield (
                add_progress(f"⏱️ {err}"),
                "## Error\n\n**Timeout Error**\n\nThe assessment exceeded the allowed time.",
                None,
                gr.Button(interactive=True),
            )
        else:
            yield (
                add_progress(f"❌ Error: {err}"),
                f"## Error\n\n**Exception Occurred**\n\n```\n{err}\n```\n\nPlease check configuration and retry.",
                None,
                gr.Button(interactive=True),
            )
    else:
        # Success – return report and file path
        yield (
            add_progress("✅ Security assessment completed successfully!"),
            result["report_content"] or "## Error\n\nNo report was generated.",
            result["report_file_path"],
            gr.Button(interactive=True),
        )


# ---------------------------------------------------------------------------
# Gradio UI construction
# ---------------------------------------------------------------------------
def create_gradio_interface() -> gr.Blocks:
    """Build the Gradio Blocks app: header, config form, result tabs, events.

    Returns:
        The assembled ``gr.Blocks`` interface, not yet launched.
    """
    # NOTE(review): several gr.Markdown / gr.HTML strings below contain only
    # plain text; presumably they were once wrapped in HTML markup (classes,
    # links) that is not present in this copy -- confirm against the design.
    with gr.Blocks(title="VAPT Agent") as iface:
        # Inject custom CSS – light theme, white cards, improved readability
        custom_css = load_custom_css()
        if custom_css:
            gr.HTML(f"<style>{custom_css}</style>")

        # --- Header ---
        with gr.Row(elem_id="app-header"):
            with gr.Column(scale=6):
                gr.Markdown(
                    "\nAgentic VAPT • API Security\n\n"
                    "VAPT Agent Dashboard\n\n"
                    "Generate API specs, run automated vulnerability tests, "
                    "and explore results with an AI security tutor. Gain "
                    "clear insights into API risks, misconfigurations, and "
                    "recommended remediation steps in dashboard.\n\n",
                    elem_id="header-title",
                )
            with gr.Column(scale=4, elem_id="workflow-overview-card"):
                # One item per line so Markdown renders a numbered list.
                gr.Markdown(
                    "**Workflow Overview**\n\n"
                    "1. Provide an API endpoint and method\n"
                    "2. Agent discovers endpoints and builds the API spec "
                    "using Postman MCP\n"
                    "3. Customized VAPT MCP tools run automated security tests\n"
                    "4. Dashboard + Tutor help you interpret and fix issues\n",
                )

        with gr.Row(elem_id="demo-video-row"):
            gr.HTML(
                "\n📺 New to VAPT Agent? Watch our DEMO VIDEO to learn how to "
                "use this tool effectively.\n"
            )

        # --- Main two-column layout ---
        with gr.Row():
            # Left: API configuration
            with gr.Column(scale=5):
                with gr.Group(elem_id="config-card", elem_classes=["section-card"]):
                    gr.Markdown("### 📋 API Configuration")
                    api_endpoint = gr.Textbox(
                        label="API Endpoint URL",
                        placeholder=DEFAULT_API_ENDPOINT,
                        value=DEFAULT_API_ENDPOINT,
                        info="Full URL of the API endpoint to test",
                    )
                    http_method = gr.Dropdown(
                        label="HTTP Method",
                        choices=["GET", "POST", "PUT", "DELETE", "PATCH"],
                        value="GET",
                        info="Select the HTTP method for the endpoint",
                    )
                    api_key = gr.Textbox(
                        label="API Key (Optional)",
                        placeholder="Enter your API key or Bearer token",
                        type="password",
                        value=DEFAULT_API_KEY,
                        info="If the API requires authentication, provide the key here",
                    )
                    with gr.Row():
                        submit_btn = gr.Button(
                            "🚀 Start Security Test", variant="primary", size="lg"
                        )
                        clear_btn = gr.Button(
                            "🔄 Reset", variant="secondary", elem_id="reset-btn"
                        )
                    gr.HTML(
                        "\n⚠️ Authorized use only: Run this tool only against "
                        "systems and APIs you are explicitly allowed to test.\n"
                    )

            # Right: Results area
            with gr.Column(scale=7):
                with gr.Group(elem_id="results-card", elem_classes=["section-card"]):
                    gr.Markdown("### 📊 Security Assessment Results")

                    with gr.Tab("Live Progress"):
                        progress_output = gr.Textbox(
                            label="Agent Activity Log",
                            lines=15,
                            max_lines=20,
                            interactive=False,
                            show_label=False,
                            placeholder="Agent activity will appear here...",
                        )

                    with gr.Tab("Security Report"):
                        report_file = gr.File(
                            label="📥 Download Report (.md)",
                            interactive=False,
                            visible=True,
                        )
                        report_output = gr.Markdown(
                            value=REPORT_PLACEHOLDER,
                            label="VAPT Report",
                            elem_id="security-report-md",
                        )

                    with gr.Tab("Dashboard"):
                        # Row that we'll center via CSS
                        with gr.Row(elem_id="dashboard-row"):
                            with gr.Column(scale=1, elem_id="risk-col"):
                                risk_gauge = gr.Plot(label="Risk Score")
                            with gr.Column(scale=1, elem_id="severity-col"):
                                severity_pie = gr.Plot(
                                    label="Vulnerability Distribution"
                                )
                        top_findings = gr.Markdown(
                            "Run a security test to see summarized key findings..."
                        )

                    with gr.Tab("Security Tutor"):
                        with gr.Column(elem_id="tutor-section"):
                            gr.Markdown(
                                "\nAsk questions about your security report.\n"
                                "Get clear explanations, remediation guidance, "
                                "and best-practice advice.\n"
                            )
                            chatbot = gr.Chatbot(
                                label="Security Tutor",
                                height=380,
                                elem_id="tutor-chat",
                            )
                            with gr.Row(elem_id="tutor-input-row"):
                                tutor_input = gr.Textbox(
                                    label="Your Question",
                                    placeholder="e.g., What is SQL injection and how do I fix it?",
                                    lines=2,
                                    scale=4,
                                    show_label=True,
                                )
                                tutor_btn = gr.Button(
                                    "Ask", variant="primary", scale=1
                                )
                            gr.HTML("\nExample questions you can ask:\n")

        # -------------------------------------------------------------------
        # Event bindings
        # -------------------------------------------------------------------
        # VAPT run
        submit_btn.click(
            fn=run_security_test,
            inputs=[api_endpoint, http_method, api_key],
            outputs=[progress_output, report_output, report_file, submit_btn],
            show_progress=True,
        )

        # Reset
        clear_btn.click(
            fn=lambda: (
                DEFAULT_API_ENDPOINT,
                "GET",
                "",
                "",
                REPORT_PLACEHOLDER,
                None,
            ),
            inputs=[],
            outputs=[
                api_endpoint,
                http_method,
                api_key,
                progress_output,
                report_output,
                report_file,
            ],
        )

        # Dashboard updates – triggered whenever the report markdown changes
        def update_dashboard(report_md: str):
            """Rebuild the risk gauge, severity chart, and top findings.

            Args:
                report_md: Markdown text currently shown in the report pane.

            Returns:
                Tuple of (risk gauge plot, severity chart plot, markdown with
                up to five findings or a "no findings" message).
            """
            data = parse_vapt_report(report_md)
            sev = data["severities"]
            risk = calculate_risk_score(sev)
            findings = data.get("findings") or []
            summary = "\n".join(findings[:5]) if findings else "No findings detected."
            return create_risk_gauge(risk), create_severity_chart(sev), summary

        report_output.change(
            fn=update_dashboard,
            inputs=[report_output],
            outputs=[risk_gauge, severity_pie, top_findings],
        )

        # AI Tutor interaction
        # Gradio Chatbot (v6) uses "messages" format: list of {"role": ..., "content": ...}
        # We convert that to (user, assistant) pairs for the tutor, then convert back.
        def tutor_respond(question, history, report_md: str):
            """Answer a question about the report and extend the chat history.

            Args:
                question: Text typed by the user.
                history: Chat history in "messages" format (list of dicts
                    with ``role`` and ``content``); may be ``None``.
                report_md: Markdown of the current security report.

            Returns:
                Tuple of (updated history, "" to clear the input box, button
                update that keeps the Ask button enabled).
            """
            new_history = list(history or [])

            # Nothing to ask: leave the conversation untouched instead of
            # sending a blank prompt to the tutor.
            if not question or not question.strip():
                return new_history, "", gr.Button(interactive=True)

            # 1) Convert "messages" history -> list of (user, assistant) pairs.
            #    Non-dict entries (e.g. old tuple-style history) are skipped.
            pairs: List[Tuple[str, str]] = []
            pending_user = None
            for msg in new_history:
                if not isinstance(msg, dict):
                    continue
                role = msg.get("role")
                content = msg.get("content", "")
                if role == "user":
                    pending_user = content
                elif role == "assistant":
                    # An assistant turn without a preceding user turn is
                    # paired with an empty user message.
                    pairs.append(
                        (pending_user if pending_user is not None else "", content)
                    )
                    pending_user = None

            # 2) Call the tutor with pairs
            tutor = get_tutor()
            answer = tutor.chat(question, report_md, pairs)

            # 3) Append new user + assistant messages in Gradio's messages format
            new_history.append({"role": "user", "content": question})
            new_history.append({"role": "assistant", "content": answer})

            # Clear the input textbox and re-enable the button
            return new_history, "", gr.Button(interactive=True)

        tutor_btn.click(
            fn=tutor_respond,
            inputs=[tutor_input, chatbot, report_output],
            outputs=[chatbot, tutor_input, tutor_btn],
        )

    return iface


# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Validate configuration, build the interface, and start the server."""
    banner = "=" * 80
    print(banner)
    print("VAPT Agent - Gradio Web Interface")
    print(banner)

    try:
        cfg = VAPTConfig()
        print("✓ Configuration loaded successfully")
        print(f" Provider: {'AWS Bedrock' if cfg.use_bedrock else 'Anthropic API'}")
        if cfg.use_bedrock:
            print(f" Region: {cfg.aws_region}")
        print()
    except Exception as exc:
        print(f"❌ Configuration error: {exc}")
        print("Please check your .env file and ensure all required variables are set.")
        return

    iface = create_gradio_interface()
    print("Starting Gradio server...")
    print(banner)
    # Listens on all interfaces and also exposes the app as an MCP server.
    iface.launch(server_name="0.0.0.0", mcp_server=True)


if __name__ == "__main__":
    main()