# Debashis
# Initial commit: Security Incident Analyzer with LLM integration
# 0355450
"""Core security incident analysis logic."""
import json
import re
from typing import TYPE_CHECKING, Optional
from src.analyzer.models import RiskLevel, SecurityAnalysis
from src.utils import setup_logger
if TYPE_CHECKING:
from src.llm.provider import BaseLLMProvider
# Module-level logger for this file, created through the project's setup_logger helper.
logger = setup_logger(__name__)
class IncidentAnalyzer:
    """Analyzes security logs and generates structured insights.

    The raw log text is sent to an LLM provider; the free-form reply is then
    mined with regular expressions for a summary, a risk level, remediation
    text and bullet-point indicators. Every field has a fallback so that a
    reply in an unexpected format still yields a usable result.
    """

    # Text allowed between a section heading and its content: colons and
    # whitespace, optionally combined with a Markdown bold marker, so that
    # "**Summary:** text" and "**Summary**: text" parse like "Summary: text".
    _SEPARATOR = r"(?:\*\*)?[\s:]+(?:\*\*[\s:]+)?"

    # Headings that mark the start of another section and therefore end a
    # multi-line remediation block.
    _SECTION_HEADINGS = r"(?:Risk|Severity|Summary|Indicators|What Happened|Analysis)"

    # \b keeps the keywords from matching inside longer words.
    _SUMMARY_RE = re.compile(
        r"\b(?:Summary|What Happened|Analysis)" + _SEPARATOR + r"([^\n]+)",
        re.IGNORECASE,
    )

    # "Risk"/"Severity", optionally followed by "Level", then the candidate
    # word. Asterisks are skipped so bold headings/values are accepted.
    _RISK_RE = re.compile(
        r"\b(?:Risk|Severity)(?:\s+Level)?[\s:*]+(\w+)",
        re.IGNORECASE,
    )

    # Captures the rest of the heading line plus every following line until a
    # line that begins (after optional indentation / Markdown markers) with
    # another section heading. Deliberately NOT compiled with re.DOTALL and
    # written with [^\n]* so the lookahead is re-checked on every new line.
    _REMEDIATION_RE = re.compile(
        r"\b(?:Suggested\s+)?(?:Remediation|Actions?)"
        + _SEPARATOR
        + r"([^\n]*(?:\n(?![ \t*#]*" + _SECTION_HEADINGS + r")[^\n]*)*)",
        re.IGNORECASE,
    )

    # Lines starting with "-" or "•" (optionally indented) anywhere in the text.
    _INDICATOR_RE = re.compile(r"^\s*[-•]\s+(.+)$", re.MULTILINE)

    _DEFAULT_REMEDIATION = "Review and investigate further"
    _SUMMARY_FALLBACK_CHARS = 200

    def __init__(self, llm_provider: "BaseLLMProvider"):
        """
        Initialize the analyzer with an LLM provider.

        Args:
            llm_provider: LLM provider instance (OpenAI, local, mock, etc.)
        """
        self.llm = llm_provider
        logger.info(f"IncidentAnalyzer initialized with {self.llm.__class__.__name__}")

    async def analyze(self, log_text: str) -> SecurityAnalysis:
        """
        Analyze a security log or incident.

        Args:
            log_text: Raw log or incident description

        Returns:
            Structured SecurityAnalysis with summary, risk level, and remediation.
            The unparsed LLM reply is attached as ``raw_response``.
        """
        logger.info(f"Analyzing log input ({len(log_text)} chars)")

        # Get LLM analysis
        response = await self.llm.analyze(log_text)
        logger.debug(f"LLM response: {response[:200]}...")

        # Parse structured output from response
        analysis = self._parse_response(response)
        analysis.raw_response = response

        logger.info(f"Analysis complete: risk_level={analysis.risk_level.value}")
        return analysis

    def _parse_response(self, response: str) -> SecurityAnalysis:
        """
        Parse LLM response into structured SecurityAnalysis.

        Uses regex to extract key sections. Falls back to defaults for any
        section that cannot be found.

        Args:
            response: Free-form text returned by the LLM provider

        Returns:
            SecurityAnalysis with an empty ``raw_response`` (set by the caller)
        """
        return SecurityAnalysis(
            summary=self._extract_summary(response),
            risk_level=self._extract_risk_level(response),
            remediation=self._extract_remediation(response),
            indicators=self._extract_indicators(response),
            raw_response="",  # Set by caller
        )

    @classmethod
    def _extract_summary(cls, response: str) -> str:
        """Return the line after the first summary-style heading, else the first 200 chars."""
        match = cls._SUMMARY_RE.search(response)
        summary = match.group(1).strip() if match else ""
        # A heading followed only by whitespace is treated like a missing heading.
        return summary or response[: cls._SUMMARY_FALLBACK_CHARS].strip()

    @classmethod
    def _extract_risk_level(cls, response: str) -> RiskLevel:
        """Return the first recognizable risk level in the text, else RiskLevel.MEDIUM."""
        # Scan every candidate instead of only the first one: prose such as
        # "high risk login" matches the pattern with a word that is not a
        # level, and must not hide a later explicit "Risk Level: Critical".
        for match in cls._RISK_RE.finditer(response):
            try:
                return RiskLevel(match.group(1).lower())
            except ValueError:
                continue
        logger.warning(
            f"No valid risk level found in LLM response; defaulting to {RiskLevel.MEDIUM.value}"
        )
        return RiskLevel.MEDIUM

    @classmethod
    def _extract_remediation(cls, response: str) -> str:
        """Return the (possibly multi-line) remediation section, else a generic default."""
        match = cls._REMEDIATION_RE.search(response)
        remediation = match.group(1).strip() if match else ""
        return remediation or cls._DEFAULT_REMEDIATION

    @classmethod
    def _extract_indicators(cls, response: str) -> list:
        """Return the text of every "-" or "•" bullet line, stripped of surrounding whitespace."""
        stripped = (item.strip() for item in cls._INDICATOR_RE.findall(response))
        # Stripping also removes a trailing "\r" from CRLF replies; drop blanks.
        return [item for item in stripped if item]