Spaces:
Running
Running
| """AI Security Analyst — vLLM / OpenAI-compatible, Ollama, or cinematic fallback.""" | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| import re | |
| from typing import Any | |
| import httpx | |
| logger = logging.getLogger(__name__) | |
| from models.schemas import AnalystReport, Incident, RiskAssessment | |
async def generate_analyst_report(incident: Incident, risk: RiskAssessment) -> AnalystReport:
    """Produce an analyst briefing for *incident*, trying an OpenAI-compatible
    endpoint (vLLM) first, then a local Ollama daemon, then a deterministic
    cinematic fallback so the pipeline never stalls."""
    prompt = _build_prompt(incident, risk)

    raw: str | None = None

    # Preferred backend: any OpenAI-compatible endpoint (vLLM included).
    openai_base = (os.getenv("VLLM_BASE_URL") or os.getenv("OPENAI_BASE_URL") or "").strip()
    if openai_base:
        model_name = os.getenv("SENTINEL_LLM_MODEL", "meta-llama/Meta-Llama-3-8B-Instruct")
        raw = await _openai_compatible_chat(openai_base, model_name, prompt)

    # Second choice: local Ollama.
    if not raw:
        host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
        ollama_model = os.getenv("OLLAMA_MODEL", os.getenv("SENTINEL_LLM_MODEL", "llama3"))
        raw = await _ollama_generate(host, ollama_model, prompt)

    # Last resort: canned JSON, always parseable.
    if not raw:
        raw = _cinematic_fallback_json(incident, risk)

    fields = _parse_analyst_json(raw, incident, risk)
    return AnalystReport(
        incident_id=incident.id,
        executive_summary=fields["executive_summary"],
        technical_analysis=fields["technical_analysis"],
        investigation_notes=fields["investigation_notes"],
        indicators=_extract_iocs(incident),
        recommended_actions=fields["recommended_actions"],
    )
| def _build_prompt(incident: Incident, risk: RiskAssessment) -> str: | |
| tl = json.dumps(incident.timeline[:20], default=str) | |
| return f"""You are a senior SOC analyst writing an executive-ready incident briefing. | |
| Output ONLY valid JSON (no markdown fences) with exactly these string keys: | |
| - "narrative": 2-4 sentences. Opening line MUST start with "SentinelAI detected". Enterprise tone: technical, concise, security-focused. Reference SSH/auth abuse, suspicious IPs, privilege moves, or outbound retrieval when applicable. | |
| - "progression": numbered step-by-step attack progression (use \\n between steps). Map what likely happened chronologically. | |
| - "severity_rationale": 2-3 sentences explaining why severity is justified (risk score {risk.risk_score}, label {risk.severity.value}), confidence, and blast radius. | |
| - "recommended_actions": array of 4-7 short imperative strings (e.g. "Block offending IP at perimeter", "Rotate credentials for affected accounts", "Inspect shell history and authorized_keys", "Enable MFA on privileged users"). | |
| Incident title: {incident.title} | |
| Machine summary: {incident.summary} | |
| Risk: score={risk.risk_score} severity={risk.severity.value} | |
| Timeline JSON: {tl} | |
| """ | |
async def _openai_compatible_chat(base_url: str, model: str, prompt: str) -> str | None:
    """POST to an OpenAI-compatible ``/v1/chat/completions`` endpoint (vLLM etc.).

    Returns the normalized JSON text of the first choice, or ``None`` on any
    failure (non-200 response, transport error, malformed payload). Failures
    are logged so the caller's fallback chain is debuggable.
    """
    key = os.getenv("VLLM_API_KEY") or os.getenv("OPENAI_API_KEY") or ""
    headers: dict[str, str] = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    if key:
        headers["Authorization"] = f"Bearer {key}"

    max_tokens = int(os.getenv("LLM_MAX_TOKENS", "4096"))
    payload: dict[str, Any] = {
        "model": model,
        "max_tokens": max_tokens,
        "temperature": float(os.getenv("LLM_TEMPERATURE", "0.2")),
        "messages": [
            {
                "role": "system",
                "content": "You write incident reports as strict JSON only. No markdown.",
            },
            {"role": "user", "content": prompt},
        ],
    }
    # Optional sampling knobs — only included when explicitly configured.
    _top_p = os.getenv("LLM_TOP_P")
    if _top_p not in (None, ""):
        payload["top_p"] = float(_top_p)
    _top_k = os.getenv("LLM_TOP_K")
    if _top_k not in (None, ""):
        payload["top_k"] = int(_top_k)

    # Accept both ".../v1" bases and bare host bases.
    base = base_url.rstrip("/")
    chat_url = f"{base}/chat/completions" if base.endswith("/v1") else f"{base}/v1/chat/completions"
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            r = await client.post(
                chat_url,
                headers=headers,
                json=payload,
            )
            if r.status_code != 200:
                logger.warning(
                    "OpenAI-compatible chat failed: %s %s",
                    r.status_code,
                    (r.text or "")[:800],
                )
                return None
            data = r.json()
            choice = (data.get("choices") or [{}])[0]
            msg = choice.get("message") or {}
            content = (msg.get("content") or "").strip()
            return _normalize_llm_json(content)
    except Exception:  # noqa: BLE001
        # Fix: this previously returned None silently, hiding connection and
        # JSON-decoding errors; log with traceback so failures are visible.
        logger.warning("OpenAI-compatible chat request to %s failed", chat_url, exc_info=True)
        return None
| def _normalize_llm_json(content: str) -> str: | |
| s = content.strip() | |
| fence = re.match(r"^```(?:json)?\s*([\s\S]*?)```$", s, re.IGNORECASE) | |
| if fence: | |
| s = fence.group(1).strip() | |
| try: | |
| json.loads(s) | |
| return s | |
| except json.JSONDecodeError: | |
| m = re.search(r"\{[\s\S]*\}", s) | |
| if m: | |
| return m.group(0).strip() | |
| return s | |
async def _ollama_generate(host: str, model: str, prompt: str) -> str | None:
    """Call Ollama's non-streaming ``/api/generate`` endpoint.

    Returns the stripped response text, or ``None`` on any failure. Failures
    are logged (previously they were swallowed silently, unlike the
    OpenAI-compatible path which at least logged non-200 responses).
    """
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            r = await client.post(
                f"{host.rstrip('/')}/api/generate",
                json={"model": model, "prompt": prompt, "stream": False},
            )
            if r.status_code != 200:
                # Fix: surface the status/body instead of failing silently.
                logger.warning(
                    "Ollama generate failed: %s %s",
                    r.status_code,
                    (r.text or "")[:800],
                )
                return None
            return (r.json().get("response") or "").strip()
    except Exception:  # noqa: BLE001
        logger.warning("Ollama generate request to %s failed", host, exc_info=True)
        return None
| def _parse_analyst_json(blob: str, incident: Incident, risk: RiskAssessment) -> dict[str, Any]: | |
| try: | |
| data = json.loads(blob) | |
| except json.JSONDecodeError: | |
| return _cinematic_fallback_dict(incident, risk) | |
| narrative = str(data.get("narrative") or data.get("executive") or "").strip() | |
| progression = str(data.get("progression") or data.get("technical") or "").strip() | |
| sev = str(data.get("severity_rationale") or data.get("notes") or "").strip() | |
| actions = data.get("recommended_actions") or data.get("actions") or [] | |
| if isinstance(actions, str): | |
| actions = [x.strip("- •\t ") for x in actions.split("\n") if x.strip()] | |
| if not isinstance(actions, list): | |
| actions = [] | |
| actions = [str(a).strip() for a in actions if str(a).strip()][:12] | |
| if not narrative: | |
| return _cinematic_fallback_dict(incident, risk) | |
| if not progression: | |
| progression = _default_progression(incident) | |
| if not sev: | |
| sev = _default_severity_rationale(risk) | |
| if not actions: | |
| actions = _default_actions() | |
| return { | |
| "executive_summary": narrative, | |
| "technical_analysis": progression, | |
| "investigation_notes": sev, | |
| "recommended_actions": actions, | |
| } | |
def _cinematic_fallback_json(incident: Incident, risk: RiskAssessment) -> str:
    """Serialize the canned fallback report using the model-reply key names,
    so it flows through the same parser as real LLM output."""
    fallback = _cinematic_fallback_dict(incident, risk)
    payload = {
        "narrative": fallback["executive_summary"],
        "progression": fallback["technical_analysis"],
        "severity_rationale": fallback["investigation_notes"],
        "recommended_actions": fallback["recommended_actions"],
    }
    return json.dumps(payload)
def _cinematic_fallback_dict(incident: Incident, risk: RiskAssessment) -> dict[str, Any]:
    """Deterministic, LLM-free report body used when no backend is reachable."""
    summary = (
        f"SentinelAI detected correlated authentication and host telemetry consistent with a targeted intrusion "
        f"chain against assets tied to “{incident.title}”. "
        f"Repeated SSH authentication failures from a concentrated source were followed by successful session "
        f"establishment and privileged execution patterns indicative of post-compromise activity. "
        f"Outbound retrieval-style commands suggest possible payload staging or command-and-control preparation."
    )
    return {
        "executive_summary": summary,
        "technical_analysis": _default_progression(incident),
        "investigation_notes": _default_severity_rationale(risk),
        "recommended_actions": _default_actions(),
    }
| def _default_progression(incident: Incident) -> str: | |
| lines = [ | |
| "1. Reconnaissance / credential spray against SSH surface from a high-velocity source IP.", | |
| "2. Brute-force or password-spray phase producing clustered authentication failures.", | |
| "3. Successful authentication — pivot from noise to confirmed access.", | |
| "4. Privilege escalation via sudo or equivalent administrative channel.", | |
| "5. Potential exfil or staging via scripted download utilities (e.g. curl/wget) to non-standard paths.", | |
| ] | |
| if incident.timeline: | |
| lines.append(f"6. Correlated timeline contains {len(incident.timeline)} normalized events for graph reconstruction.") | |
| return "\n".join(lines) | |
| def _default_severity_rationale(risk: RiskAssessment) -> str: | |
| return ( | |
| f"Severity is driven by a composite risk score of {risk.risk_score}/100 with label {risk.severity.value}. " | |
| f"The sequence combines authentication abuse with privilege boundary crossing, elevating impact beyond " | |
| f"nuisance scanning. Confidence reflects rule-and-window correlation across multiple telemetry stages; " | |
| f"treat as incident-grade until disproven by host forensics." | |
| ) | |
def _default_actions() -> list[str]:
    """Baseline containment/remediation checklist for SSH-abuse incidents.

    Used whenever the LLM reply omits recommended actions and by the
    cinematic fallback report.
    """
    return [
        "Block offending IP at perimeter firewall and WAF allowlists",
        "Rotate credentials and invalidate active sessions for implicated accounts",
        "Inspect shell history, authorized_keys, and cron for persistence",
        "Enable or enforce MFA on all break-glass and sudo-capable users",
        "Isolate affected host to a quarantine VLAN for memory and disk capture",
        "Review outbound DNS and proxy logs for matching IOC time windows",
    ]
| def _extract_iocs(incident: Incident) -> list[str]: | |
| iocs: list[str] = [] | |
| for row in incident.timeline: | |
| msg = str(row.get("msg", "")) | |
| for token in msg.split(): | |
| if token.count(".") == 3 and token.replace(".", "").isdigit(): | |
| iocs.append(token) | |
| return list(dict.fromkeys(iocs))[:16] | |