# Debashis
# Initial commit: Security Incident Analyzer with LLM integration
# 0355450
"""Main Gradio application for Security Incident Analyzer."""
import asyncio
import sys
from pathlib import Path

# Load environment variables from .env file.
# NOTE(review): this runs before the `src.*` imports below, presumably so that
# `src.utils.config` already sees the .env values when it is imported - confirm
# before reordering these statements.
from dotenv import load_dotenv
load_dotenv()

# Put the parent of this file's directory at the front of sys.path so the
# absolute `src.*` imports below can be resolved.
# NOTE(review): assumes this file lives directly inside the `src/` package
# directory - confirm against the repository layout.
sys.path.insert(0, str(Path(__file__).parent.parent))

# These imports must stay below the sys.path tweak above.
import gradio as gr
from src.analyzer import IncidentAnalyzer
from src.llm import create_provider
from src.utils import config, setup_logger

# Module-level logger; its debug flag is taken from the loaded configuration.
logger = setup_logger(__name__, debug=config.debug)
# Emoji are written as escapes so the source stays pure ASCII and cannot be
# corrupted again by an editor or tool that guesses the wrong file encoding.
_ICON_SEARCH = "\U0001F50D"  # magnifying glass
_ICON_WARNING = "\u26a0\ufe0f"  # warning sign + emoji variation selector
_ICON_ERROR = "\u274c"  # cross mark

_EMPTY_INPUT_MESSAGE = "Please provide a log or incident description to analyze."


def _format_analysis(analysis, provider_name: str) -> str:
    """Render an analysis result as a Markdown document for the UI."""
    if analysis.indicators:
        indicators = "\n".join(f"- {ind}" for ind in analysis.indicators)
    else:
        indicators = "- No specific indicators extracted"

    sections = [
        f"### {_ICON_SEARCH} Analysis Results",
        f"**Summary:**\n{analysis.summary}",
        f"**Risk Level:** {_ICON_WARNING} **{analysis.risk_level.value.upper()}**",
        f"**Suggested Actions:**\n{analysis.remediation}",
        f"**Key Indicators:**\n{indicators}",
        "---",
        f"*Analysis performed by {provider_name}*",
    ]
    # Blank lines between sections keep Markdown from merging them into a
    # single paragraph and from reading "---" as a heading underline.
    return "\n" + "\n\n".join(sections) + "\n"


async def analyze_incident(log_input: str) -> str:
    """
    Analyze a security incident log and return the result as Markdown.

    Args:
        log_input: Raw log or incident text from the user. ``None`` and
            whitespace-only text are both treated as empty input.

    Returns:
        The formatted analysis on success, a prompt asking for input when
        ``log_input`` is empty, or an error message when anything in the
        analysis or formatting raises.
    """
    # `not log_input` also covers None, which would otherwise raise
    # AttributeError on .strip() before the error handling below is reached.
    if not log_input or not log_input.strip():
        return _EMPTY_INPUT_MESSAGE

    try:
        # A fresh provider and analyzer are created for every request.
        provider = create_provider()
        analyzer = IncidentAnalyzer(provider)
        analysis = await analyzer.analyze(log_input)
        # Formatting stays inside the try so a malformed result is reported
        # to the user instead of propagating out of the handler.
        return _format_analysis(analysis, provider.__class__.__name__)
    except Exception as e:
        # Top-level UI boundary: log the traceback, show a readable message.
        logger.error("Analysis error: %s", e, exc_info=True)
        return (
            f"{_ICON_ERROR} Error during analysis: {e}\n\n"
            "Please check your configuration and try again."
        )
def analyze_incident_sync(log_input: str) -> str:
    """
    Blocking entry point used as the Gradio click handler.

    Builds the ``analyze_incident`` coroutine for ``log_input`` and drives it
    to completion with ``asyncio.run``, returning whatever the coroutine
    returns.
    """
    pending = analyze_incident(log_input)
    return asyncio.run(pending)
# Build the Gradio interface at import time so `demo` is available as a
# module-level name. Emoji are written as escapes so the labels stay intact
# regardless of how the source file is decoded.
with gr.Blocks(title="Security Incident Analyzer", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# \U0001F6E1\uFE0F AI Security Incident Analyzer")
    gr.Markdown(
        "Paste your security logs, alerts, or incident descriptions. "
        "Our LLM analyzes them and explains: what happened, severity, and next steps."
    )

    with gr.Row():
        # Left column: user input and the trigger button.
        with gr.Column(scale=1):
            log_input = gr.Textbox(
                label="Security Log / Incident",
                placeholder="Paste your log, alert, or incident description here...",
                lines=10,
                interactive=True,
            )
            analyze_btn = gr.Button("\U0001F50D Analyze", variant="primary", size="lg")

        # Right column: rendered Markdown result.
        with gr.Column(scale=1):
            output = gr.Markdown(
                label="Analysis",
                value="*Results will appear here...*",
            )

    # Clicking the button sends the textbox content through the synchronous
    # wrapper and renders the returned Markdown in the output pane.
    analyze_btn.click(
        fn=analyze_incident_sync,
        inputs=log_input,
        outputs=output,
    )

    gr.Examples(
        examples=[
            "Failed authentication attempts from 192.168.1.100: 15 attempts in 2 minutes",
            "Ransomware detected in backup storage - encryption in progress on 500 files",
            "Unusual outbound traffic to 10.0.0.1 on port 4444 detected",
        ],
        inputs=log_input,
        label="Example Incidents",
    )

    # Footer showing the configured provider and model. An f-string is used
    # instead of "+" concatenation so a non-str model name cannot raise
    # TypeError while the module is being imported.
    gr.Markdown(
        "\n---\n"
        f"**Provider:** {config.llm_provider.value.upper()}\n"
        f"| **Model:** {config.model_name}\n"
    )
def main() -> None:
    """
    Validate the configuration and launch the Gradio app.

    Exits the process with status 1 and a message on stderr when
    ``config.validate()`` raises ``ValueError``; otherwise logs the active
    provider and model and starts the server on port 7860.
    """
    try:
        config.validate()
    except ValueError as e:
        # sys.exit with a string writes it to stderr and exits with status 1,
        # keeping the error out of stdout.
        sys.exit(f"Configuration error: {e}")

    logger.info("Starting Security Incident Analyzer")
    logger.info("LLM Provider: %s", config.llm_provider.value)
    logger.info("Model: %s", config.model_name)

    # Launch the app. "0.0.0.0" binds every network interface, so the UI is
    # reachable from other hosts; share=False disables the public Gradio link.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
    )


if __name__ == "__main__":
    main()