"""Core security incident analysis logic."""

import json
import re
from typing import TYPE_CHECKING, Optional

from src.analyzer.models import RiskLevel, SecurityAnalysis
from src.utils import setup_logger

if TYPE_CHECKING:
    from src.llm.provider import BaseLLMProvider

logger = setup_logger(__name__)

# Text following a "Summary" / "What Happened" / "Analysis" header, up to the
# end of that line.
_SUMMARY_RE = re.compile(
    r"(?:Summary|What Happened|Analysis)[\s:]+([^\n]+)",
    re.IGNORECASE,
)

# Candidate risk word after "Risk", "Severity", "Risk Level" or "Severity Level".
# The whole pattern sits inside a lookahead so matches are zero-width and may
# overlap: a false hit such as "risk\nRisk Level: high" cannot consume the real
# header that follows it. "*" is accepted as a separator so markdown bold
# ("**Risk Level:** High") still yields the value.
_RISK_RE = re.compile(
    r"(?=(?:Risk|Severity)(?:\s+Level)?[\s:*]+(\w+))",
    re.IGNORECASE,
)

# Remediation text: the rest of the header line plus every following line that
# does not start with another section header. re.DOTALL must NOT be set here:
# with it, ".*" runs across newlines to the end of the response and the
# negative lookahead never gets a chance to stop the capture.
_REMEDIATION_RE = re.compile(
    r"(?:Suggested\s+)?(?:Remediation|Actions?)[\s:]+"
    r"([^\n]*(?:\n(?!(?:Risk|Severity|Summary|Indicators|What Happened|Analysis)).*)*)",
    re.IGNORECASE,
)

# Any line that starts (after optional whitespace) with a "-" or "•" bullet.
_INDICATOR_RE = re.compile(r"^\s*[-•]\s+(.+)$", re.MULTILINE)


class IncidentAnalyzer:
    """Analyzes security logs and generates structured insights."""

    def __init__(self, llm_provider: "BaseLLMProvider"):
        """
        Initialize the analyzer with an LLM provider.

        Args:
            llm_provider: LLM provider instance (OpenAI, local, mock, etc.)
        """
        self.llm = llm_provider
        logger.info(f"IncidentAnalyzer initialized with {self.llm.__class__.__name__}")

    async def analyze(self, log_text: str) -> SecurityAnalysis:
        """
        Analyze a security log or incident.

        Args:
            log_text: Raw log or incident description

        Returns:
            Structured SecurityAnalysis with summary, risk level, and
            remediation; ``raw_response`` holds the unparsed LLM output.
        """
        logger.info(f"Analyzing log input ({len(log_text)} chars)")

        # Get LLM analysis
        response = await self.llm.analyze(log_text)
        logger.debug(f"LLM response: {response[:200]}...")

        # Parse structured output from response
        analysis = self._parse_response(response)
        analysis.raw_response = response

        logger.info(f"Analysis complete: risk_level={analysis.risk_level.value}")
        return analysis

    @staticmethod
    def _extract_risk_level(response: str) -> RiskLevel:
        """
        Return the first valid risk level mentioned in the response.

        Every "Risk ..." / "Severity ..." occurrence is examined in order, and
        the first following word that is a valid ``RiskLevel`` value (compared
        in lowercase) wins. Words that are not valid levels are skipped rather
        than ending the search.

        Args:
            response: Raw LLM response text

        Returns:
            The matched RiskLevel, or RiskLevel.MEDIUM when none is found.
        """
        for candidate in _RISK_RE.finditer(response):
            try:
                return RiskLevel(candidate.group(1).lower())
            except ValueError:
                # Not a known level (e.g. "Level", "to"); keep scanning.
                continue
        return RiskLevel.MEDIUM

    def _parse_response(self, response: str) -> SecurityAnalysis:
        """
        Parse LLM response into structured SecurityAnalysis.

        Uses regex to extract key sections. Fallback to defaults if parsing fails:

        - summary: rest of the line after a "Summary" / "What Happened" /
          "Analysis" header, else the first 200 characters of the response.
        - risk_level: first valid level after a "Risk" / "Severity" header,
          else MEDIUM.
        - remediation: text after a "Remediation" / "Action(s)" header up to
          the next line that starts with a section header, else a generic
          "Review and investigate further".
        - indicators: every bulleted ("-" or "•") line anywhere in the
          response, which includes bullets inside the remediation section.

        Args:
            response: Raw LLM response text

        Returns:
            SecurityAnalysis with ``raw_response`` left empty (set by caller).
        """
        summary_match = _SUMMARY_RE.search(response)
        summary = summary_match.group(1).strip() if summary_match else response[:200]

        risk_level = self._extract_risk_level(response)

        remediation_match = _REMEDIATION_RE.search(response)
        remediation = (
            remediation_match.group(1).strip()
            if remediation_match
            else "Review and investigate further"
        )

        indicators = _INDICATOR_RE.findall(response)

        return SecurityAnalysis(
            summary=summary,
            risk_level=risk_level,
            remediation=remediation,
            indicators=indicators,
            raw_response="",  # Set by caller
        )