File size: 3,435 Bytes
0355450 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
"""Core security incident analysis logic."""
import json
import re
from typing import TYPE_CHECKING, Optional
from src.analyzer.models import RiskLevel, SecurityAnalysis
from src.utils import setup_logger
if TYPE_CHECKING:
from src.llm.provider import BaseLLMProvider
logger = setup_logger(__name__)
class IncidentAnalyzer:
    """Analyzes security logs and generates structured insights.

    Thin orchestration layer: sends raw log text to an injected LLM
    provider, then regex-parses the free-form response into a
    structured SecurityAnalysis model.
    """

    def __init__(self, llm_provider: "BaseLLMProvider"):
        """
        Initialize the analyzer with an LLM provider.

        Args:
            llm_provider: LLM provider instance (OpenAI, local, mock, etc.)
        """
        self.llm = llm_provider
        # Lazy %-style args are the logging-module convention (formatting is
        # skipped entirely when the level is disabled).
        logger.info(
            "IncidentAnalyzer initialized with %s", self.llm.__class__.__name__
        )

    async def analyze(self, log_text: str) -> SecurityAnalysis:
        """
        Analyze a security log or incident.

        Args:
            log_text: Raw log or incident description

        Returns:
            Structured SecurityAnalysis with summary, risk level, and remediation
        """
        logger.info("Analyzing log input (%d chars)", len(log_text))

        # Get LLM analysis
        response = await self.llm.analyze(log_text)
        # %.200s truncates to the first 200 chars, same as response[:200].
        logger.debug("LLM response: %.200s...", response)

        # Parse structured output from response
        analysis = self._parse_response(response)
        analysis.raw_response = response

        logger.info("Analysis complete: risk_level=%s", analysis.risk_level.value)
        return analysis

    def _parse_response(self, response: str) -> SecurityAnalysis:
        """
        Parse LLM response into structured SecurityAnalysis.

        Uses regex to extract key sections. Falls back to defaults for any
        section that cannot be located.
        """
        # Extract summary: first line after a "Summary"/"What Happened"/
        # "Analysis" header, else the first 200 chars of the raw response.
        summary_match = re.search(
            r"(?:Summary|What Happened|Analysis)[\s:]+([^\n]+)",
            response,
            re.IGNORECASE,
        )
        summary = summary_match.group(1).strip() if summary_match else response[:200]

        # Extract risk level — prefer "Risk Level: X" / "Severity Level: X",
        # fall back to the looser "Risk: X" / "Severity: X".
        risk_match = re.search(
            r"(?:Risk|Severity)\s+Level[\s:]+(\w+)|(?:Risk|Severity)[\s:]+(\w+)",
            response,
            re.IGNORECASE,
        )
        risk_text = (
            (risk_match.group(1) or risk_match.group(2)).lower()
            if risk_match
            else "medium"
        )
        try:
            risk_level = RiskLevel(risk_text)
        except ValueError:
            # Unknown label from the LLM — default rather than fail.
            risk_level = RiskLevel.MEDIUM

        # Extract remediation: capture multi-line content after a
        # "Suggested Actions" / "Remediation" header, stopping at the next
        # section header.
        #
        # BUGFIX: re.DOTALL removed. With DOTALL, the `.*` inside
        # `(?:\n(?!...).*)*` matched across newlines, so one non-header line
        # let it swallow the remainder of the response (including later
        # Risk/Indicators sections). Without DOTALL, `.*` stops at each
        # newline and the negative lookahead is checked per line, as intended.
        remediation_match = re.search(
            r"(?:Suggested\s+)?(?:Remediation|Actions?)[\s:]+([^\n]*(?:\n(?!(?:Risk|Severity|Summary|Indicators|What Happened|Analysis)).*)*)",
            response,
            re.IGNORECASE,
        )
        remediation = (
            remediation_match.group(1).strip()
            if remediation_match
            else "Review and investigate further"
        )

        # Extract indicators: any bullet line ("- ..." or "• ...", optional
        # leading whitespace) anywhere in the response.
        indicators = re.findall(
            r"^\s*[-•]\s+(.+)$",
            response,
            re.MULTILINE,
        )

        return SecurityAnalysis(
            summary=summary,
            risk_level=risk_level,
            remediation=remediation,
            indicators=indicators,
            raw_response="",  # Set by caller (analyze)
        )
|