"""Main Gradio application for Security Incident Analyzer.""" import asyncio import sys from pathlib import Path # Load environment variables from .env file from dotenv import load_dotenv load_dotenv() # Add src to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) import gradio as gr from src.analyzer import IncidentAnalyzer from src.llm import create_provider from src.utils import config, setup_logger # Setup logging logger = setup_logger(__name__, debug=config.debug) async def analyze_incident(log_input: str) -> str: """ Analyze a security incident log. Args: log_input: Raw log or incident text from user Returns: Formatted analysis result for display """ if not log_input.strip(): return "Please provide a log or incident description to analyze." try: # Create provider and analyzer provider = create_provider() analyzer = IncidentAnalyzer(provider) # Run analysis analysis = await analyzer.analyze(log_input) # Format output output = f""" ### 🔍 Analysis Results **Summary:** {analysis.summary} **Risk Level:** ⚠️ **{analysis.risk_level.value.upper()}** **Suggested Actions:** {analysis.remediation} **Key Indicators:** {chr(10).join(f'- {ind}' for ind in analysis.indicators) if analysis.indicators else '- No specific indicators extracted'} --- *Analysis performed by {provider.__class__.__name__}* """ return output except Exception as e: logger.error(f"Analysis error: {e}", exc_info=True) return f"❌ Error during analysis: {str(e)}\n\nPlease check your configuration and try again." def analyze_incident_sync(log_input: str) -> str: """Synchronous wrapper for Gradio (which doesn't support async directly in all modes).""" return asyncio.run(analyze_incident(log_input)) # Create Gradio interface with gr.Blocks(title="Security Incident Analyzer", theme=gr.themes.Soft()) as demo: gr.Markdown("# 🛡️ AI Security Incident Analyzer") gr.Markdown( "Paste your security logs, alerts, or incident descriptions. " "Our LLM analyzes them and explains: what happened, severity, and next steps." ) with gr.Row(): with gr.Column(scale=1): log_input = gr.Textbox( label="Security Log / Incident", placeholder="Paste your log, alert, or incident description here...", lines=10, interactive=True, ) analyze_btn = gr.Button("🔍 Analyze", variant="primary", size="lg") with gr.Column(scale=1): output = gr.Markdown( label="Analysis", value="*Results will appear here...*", ) analyze_btn.click( fn=analyze_incident_sync, inputs=log_input, outputs=output, ) gr.Examples( examples=[ "Failed authentication attempts from 192.168.1.100: 15 attempts in 2 minutes", "Ransomware detected in backup storage - encryption in progress on 500 files", "Unusual outbound traffic to 10.0.0.1 on port 4444 detected", ], inputs=log_input, label="Example Incidents", ) gr.Markdown( """ --- **Provider:** """ + config.llm_provider.value.upper() + """ | **Model:** """ + config.model_name + """ """ ) if __name__ == "__main__": # Validate configuration try: config.validate() except ValueError as e: print(f"Configuration error: {e}") sys.exit(1) logger.info(f"Starting Security Incident Analyzer") logger.info(f"LLM Provider: {config.llm_provider.value}") logger.info(f"Model: {config.model_name}") # Launch the app demo.launch( server_name="0.0.0.0", server_port=7860, share=False, )