|
|
"""Main Gradio application for Security Incident Analyzer.""" |
|
|
|
|
|
import asyncio |
|
|
import sys |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
from dotenv import load_dotenv |
|
|
# Load variables from a .env file into the process environment before the
# project modules are imported further down (presumably src.utils.config reads
# them at import time — TODO confirm).
load_dotenv()

# Put the directory two levels above this file on sys.path so the absolute
# `src.*` imports that follow resolve when this file is run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
from src.analyzer import IncidentAnalyzer |
|
|
from src.llm import create_provider |
|
|
from src.utils import config, setup_logger |
|
|
|
|
|
|
|
|
# Module-level logger named after this module; the debug flag comes from the
# project config (its exact effect is defined by setup_logger in src.utils).
logger = setup_logger(__name__, debug=config.debug)
|
|
|
|
|
|
|
|
def _format_analysis(analysis, provider_name: str) -> str:
    """Render an analysis result as the Markdown shown in the results panel.

    The text is assembled from explicit lines rather than an indented
    triple-quoted literal, so the Markdown never depends on how this source
    file happens to be indented.

    Args:
        analysis: Result object exposing ``summary``, ``risk_level`` (with a
            string ``value``), ``remediation`` and ``indicators``.
        provider_name: Class name of the provider that produced the analysis.

    Returns:
        The Markdown report, with a leading and a trailing newline.
    """
    if analysis.indicators:
        indicator_lines = "\n".join(f"- {ind}" for ind in analysis.indicators)
    else:
        indicator_lines = "- No specific indicators extracted"

    # NOTE(review): the emoji below were restored from mis-decoded text in the
    # previous literals — confirm they are the intended glyphs.
    report_lines = [
        "",
        "### 🔍 Analysis Results",
        "",
        "**Summary:**",
        f"{analysis.summary}",
        "",
        f"**Risk Level:** ⚠️ **{analysis.risk_level.value.upper()}**",
        "",
        "**Suggested Actions:**",
        f"{analysis.remediation}",
        "",
        "**Key Indicators:**",
        indicator_lines,
        "",
        "---",
        f"*Analysis performed by {provider_name}*",
        "",
    ]
    return "\n".join(report_lines)


async def analyze_incident(log_input: str) -> str:
    """Analyze a security incident log and return the result as Markdown.

    Args:
        log_input: Raw log or incident text from the user. ``None``, empty,
            or whitespace-only input is answered with a prompt message and is
            never sent to the provider.

    Returns:
        A Markdown string: the formatted analysis on success, a prompt when
        no input was given, or an error message when analysis failed.
    """
    # Guard against None as well as blank text; calling .strip() on None
    # would raise before the user sees anything useful.
    if not log_input or not log_input.strip():
        return "Please provide a log or incident description to analyze."

    try:
        provider = create_provider()
        analyzer = IncidentAnalyzer(provider)
        analysis = await analyzer.analyze(log_input)
        # Formatting stays inside the try so a malformed result is reported
        # to the user like any other analysis failure.
        return _format_analysis(analysis, provider.__class__.__name__)
    except Exception as e:
        # UI boundary: log the full traceback and show a readable message
        # instead of letting the exception reach the Gradio handler.
        logger.error("Analysis error: %s", e, exc_info=True)
        return f"❌ Error during analysis: {e}\n\nPlease check your configuration and try again."
|
|
|
|
|
|
|
|
def analyze_incident_sync(log_input: str) -> str:
    """Blocking wrapper around ``analyze_incident`` for use as a Gradio handler.

    Creates the coroutine for the given input and drives it to completion
    with ``asyncio.run``, returning whatever the coroutine returns.
    """
    pending_analysis = analyze_incident(log_input)
    return asyncio.run(pending_analysis)
|
|
|
|
|
|
|
|
|
|
|
# Build the Gradio UI. `demo` is the Blocks app launched by the __main__ guard.
# NOTE(review): the emoji in the heading and button label were restored from
# mis-decoded text in the previous literals — confirm the intended glyphs.
with gr.Blocks(title="Security Incident Analyzer", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🛡️ AI Security Incident Analyzer")
    gr.Markdown(
        "Paste your security logs, alerts, or incident descriptions. "
        "Our LLM analyzes them and explains: what happened, severity, and next steps."
    )

    with gr.Row():
        # Left column: free-text input and the button that triggers analysis.
        with gr.Column(scale=1):
            log_input = gr.Textbox(
                label="Security Log / Incident",
                placeholder="Paste your log, alert, or incident description here...",
                lines=10,
                interactive=True,
            )
            analyze_btn = gr.Button("🔍 Analyze", variant="primary", size="lg")

        # Right column: Markdown panel that receives the analysis report.
        with gr.Column(scale=1):
            output = gr.Markdown(
                label="Analysis",
                value="*Results will appear here...*",
            )

    # Clicking the button sends the textbox content through the synchronous
    # wrapper and writes the returned Markdown into the output panel.
    analyze_btn.click(
        fn=analyze_incident_sync,
        inputs=log_input,
        outputs=output,
    )

    # Sample incidents that fill the input textbox when selected.
    gr.Examples(
        examples=[
            "Failed authentication attempts from 192.168.1.100: 15 attempts in 2 minutes",
            "Ransomware detected in backup storage - encryption in progress on 500 files",
            "Unusual outbound traffic to 10.0.0.1 on port 4444 detected",
        ],
        inputs=log_input,
        label="Example Incidents",
    )

    # Footer showing the provider and model read from config at import time.
    # An f-string replaces `+` concatenation so a non-str value cannot raise.
    gr.Markdown(
        "\n---\n"
        f"**Provider:** {config.llm_provider.value.upper()}\n"
        f"| **Model:** {config.model_name}\n"
    )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Fail fast with a readable message (on stderr, exit status 1) instead of
    # a traceback when the configuration is rejected.
    try:
        config.validate()
    except ValueError as e:
        print(f"Configuration error: {e}", file=sys.stderr)
        sys.exit(1)

    logger.info("Starting Security Incident Analyzer")
    logger.info("LLM Provider: %s", config.llm_provider.value)
    logger.info("Model: %s", config.model_name)

    demo.launch(
        server_name="0.0.0.0",  # listen on all network interfaces
        server_port=7860,
        share=False,  # do not create a public Gradio share link
    )
|
|
|