|
|
"""Core security incident analysis logic.""" |
|
|
|
|
|
import json |
|
|
import re |
|
|
from typing import TYPE_CHECKING, Optional |
|
|
|
|
|
from src.analyzer.models import RiskLevel, SecurityAnalysis |
|
|
from src.utils import setup_logger |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from src.llm.provider import BaseLLMProvider |
|
|
|
|
|
# Module-level logger configured through the project's shared setup helper.
logger = setup_logger(__name__)
|
|
|
|
|
|
|
|
class IncidentAnalyzer:
    """Analyzes security logs and generates structured insights."""

    # Patterns used by _parse_response, compiled once at class-definition time
    # instead of being rebuilt/looked up on every call.
    _SUMMARY_RE = re.compile(
        r"(?:Summary|What Happened|Analysis)[\s:]+([^\n]+)",
        re.IGNORECASE,
    )
    # Alternative 1 matches "Risk Level: X" / "Severity Level: X" (group 1);
    # alternative 2 matches bare "Risk: X" / "Severity: X" (group 2).
    _RISK_RE = re.compile(
        r"(?:Risk|Severity)\s+Level[\s:]+(\w+)|(?:Risk|Severity)[\s:]+(\w+)",
        re.IGNORECASE,
    )
    # Heading plus its continuation lines, stopping before the next known section.
    _REMEDIATION_RE = re.compile(
        r"(?:Suggested\s+)?(?:Remediation|Actions?)[\s:]+([^\n]*(?:\n(?!(?:Risk|Severity|Summary|Indicators|What Happened|Analysis)).*)*)",
        re.IGNORECASE | re.DOTALL,
    )
    # Bullet lines beginning with "-" or "•".
    _INDICATOR_RE = re.compile(r"^\s*[-•]\s+(.+)$", re.MULTILINE)

    def __init__(self, llm_provider: "BaseLLMProvider"):
        """
        Initialize the analyzer with an LLM provider.

        Args:
            llm_provider: LLM provider instance (OpenAI, local, mock, etc.)
        """
        self.llm = llm_provider
        # Lazy %-args: the message is only formatted if the level is enabled.
        logger.info("IncidentAnalyzer initialized with %s", self.llm.__class__.__name__)

    async def analyze(self, log_text: str) -> SecurityAnalysis:
        """
        Analyze a security log or incident.

        Args:
            log_text: Raw log or incident description

        Returns:
            Structured SecurityAnalysis with summary, risk level, and remediation
        """
        logger.info("Analyzing log input (%d chars)", len(log_text))

        response = await self.llm.analyze(log_text)
        # "%.200s" truncates to the first 200 chars, matching the old slice.
        logger.debug("LLM response: %.200s...", response)

        analysis = self._parse_response(response)
        # Attach the full, untruncated model output for auditing/debugging.
        analysis.raw_response = response

        logger.info("Analysis complete: risk_level=%s", analysis.risk_level.value)
        return analysis

    def _parse_response(self, response: str) -> SecurityAnalysis:
        """
        Parse LLM response into structured SecurityAnalysis.

        Uses regex to extract key sections. Falls back to safe defaults when
        a section cannot be found: truncated raw text for the summary,
        MEDIUM for the risk level, and a generic remediation message.
        """
        # Summary: the line following a recognized heading, else the first
        # 200 chars of the raw response.
        summary_match = self._SUMMARY_RE.search(response)
        summary = summary_match.group(1).strip() if summary_match else response[:200]

        # Risk level: exactly one of the two capture groups is non-None when
        # the pattern matches; default to "medium" when nothing matches.
        risk_match = self._RISK_RE.search(response)
        risk_text = (risk_match.group(1) or risk_match.group(2)).lower() if risk_match else "medium"
        try:
            risk_level = RiskLevel(risk_text)
        except ValueError:
            # Model produced a label outside the enum — use a middle rating.
            risk_level = RiskLevel.MEDIUM

        remediation_match = self._REMEDIATION_RE.search(response)
        remediation = (
            remediation_match.group(1).strip()
            if remediation_match
            else "Review and investigate further"
        )

        # Strip surrounding whitespace from each bullet and drop bullets that
        # contained nothing but whitespace (the pattern's backtracking can
        # otherwise yield all-space captures).
        indicators = [
            item.strip()
            for item in self._INDICATOR_RE.findall(response)
            if item.strip()
        ]

        return SecurityAnalysis(
            summary=summary,
            risk_level=risk_level,
            remediation=remediation,
            indicators=indicators,
            # Filled in by analyze() with the full response after parsing.
            raw_response="",
        )
|
|
|