#!/usr/bin/env python3 """ HKLM — MITRE ATT&CK Log Analyzer (Groq API Version) Uses Groq API for fast LLM inference. Everything else is identical: - Same prompts - Same JSON parsing - Same KB lookup - Same caching - Same output format Requires: pip install groq Set Groq token: export GROQ_API_KEY=gsk_xxxxx (or pass to constructor) IMPROVED: Better logging messages for UI display during processing """ import json import pandas as pd from pathlib import Path from typing import Dict, List, Tuple, Optional from dataclasses import dataclass import re from tqdm import tqdm import warnings import hashlib import os import time import sys import os from dotenv import load_dotenv # Load the variables from .env into the environment load_dotenv() try: from groq import Groq except ImportError: raise ImportError("Install groq: pip install groq") @dataclass class MITREPrediction: """Data class for MITRE predictions""" tactic: str technique: str technique_id: str technique_name: str confidence_score: float mitigation_strategies: List[Dict] detection_strategies: List[Dict] reasoning: str class MITRELogAnalyzerAPI: """ HKLM analyzer using Groq API. Same 3-stage pipeline: Stage 1: Raw log → Tactic identification (API call) Stage 2: Log + Tactic → Technique identification (API call) Stage 3: KB lookup for mitigation/detection strategies No GPU required. Just a Groq API key. """ SUPPORTED_MODELS = { "llama-3.3-70b-versatile": "Llama 3.3 70B — Best reasoning, fast", "llama-3.1-8b-instant": "Llama 3.1 8B — Fastest", "llama-3.2-90b-text-preview": "Llama 3.2 90B — Most capable", "mixtral-8x7b-32768": "Mixtral 8x7B — Good balance", } def __init__( self, mitre_kb_path: str, model_name: str = "llama-3.1-8b-instant", groq_api_key: str = None, use_caching: bool = True, verbose: bool = False, ): self.model_name = model_name self.use_caching = use_caching self.verbose = verbose # Resolve token api_key = groq_api_key or os.environ.get("GROQ_API_KEY") if not api_key: raise ValueError( "Groq API key required. Set GROQ_API_KEY env var or pass groq_api_key parameter.\n" "Get your key at: https://console.groq.com/keys" ) # Initialize Groq Client print(f"\n🤖 HKLM — Groq API Mode", flush=True) print(f" Model: {model_name}", flush=True) print(f" Caching: {'Enabled' if use_caching else 'Disabled'}", flush=True) print(f" Auth: API key provided ({'*' * 8}...{api_key[-4:]})", flush=True) print(f" Initializing Groq client...", flush=True) self.client = Groq(api_key=api_key) # Test the connection print(f" Testing API connection...", flush=True) try: test_response = self.client.chat.completions.create( model=self.model_name, messages=[{"role": "user", "content": "Hello"}], max_tokens=5, temperature=0.1, ) print(f" ✅ API connection successful", flush=True) except Exception as e: print(f" ⚠️ API test failed: {e}", flush=True) print(f" Will retry on actual requests", flush=True) # Load MITRE knowledge base print(f"\n📚 Loading MITRE knowledge base from {mitre_kb_path}...", flush=True) with open(mitre_kb_path, "r", encoding="utf-8") as f: self.mitre_kb = json.load(f) num_tactics = len(self.mitre_kb.get("tactics", {})) print(f" ✅ Loaded {num_tactics} tactics from knowledge base", flush=True) # Initialize cache self.cache = {} if use_caching else None # Stats self.stats = { "cache_hits": 0, "cache_misses": 0, "total_processed": 0, "api_calls": 0, "api_errors": 0, } print(f"\n✅ INITIALIZATION COMPLETE", flush=True) print(f" Ready to analyze logs using {model_name}", flush=True) # ────────────────────────────────────────────── # PROMPT CONSTRUCTION # ────────────────────────────────────────────── def _create_tactic_prompt(self, log_entry: str) -> str: tactic_summaries = [] for tactic_key, tactic_data in self.mitre_kb.get("tactics", {}).items(): tactic_summaries.append( f"- Name: ({tactic_data['shortname']}) " f"Description: {tactic_data['description']}." ) prompt = f""" You are a Senior Security Operations Center (SOC) Analyst and MITRE ATT&CK Specialist. Your task is to classify security logs into the most accurate Tactic. ### STEP 1: LOG DECONSTRUCTION Examine the log below. Even if field names are unfamiliar, identify: 1. **The Actor (Subject):** Which process, user, or service is initiating the action? 2. **The Action (Verb):** What is happening? (e.g., Is something being accessed, created, modified, executed, or moved?) 3. **The Target (Object):** What is being acted upon? (e.g., a registry key, a file path, a network socket, a memory address, or a configuration setting.) ### STEP 2: INTENT INFERENCE Based on the Subject-Verb-Object relationship: - If the Verb is 'Read/Query' and the Object is 'System Config/Registry', the intent is **Discovery**. - If the Verb is 'Execute/Start' and the Object is 'Binary/Script', the intent is **Execution**. - If the Verb is 'Modify/Write' and the Object is 'Auto-run location/Service', the intent is **Persistence**. ### LOG ENTRY: {log_entry} ### MITRE ATT&CK TACTIC DEFINITIONS: {chr(10).join(tactic_summaries)} ### ANALYSIS GUIDELINES: - **Do not rely on field labels:** Focus on the *content* of the values. - **Context Matters:** Remote access tools performing routine reads are likely in a 'Discovery' phase. - **Match the Goal:** Choose the Tactic whose definition most closely matches the *primary goal* of the action. ### OUTPUT FORMAT (JSON): {{ "tactic_name": "Exact name from provided list", "confidence": 0.0-1.0, "reasoning": "Explain in less than 50 words how the inferred action matches the specific MITRE description." }} JSON Response: """ return prompt def _create_technique_prompt( self, log_entry: str, tactic_shortname: str ) -> str: tactic_shortname = tactic_shortname.lower() tactic_data = self.mitre_kb.get("tactics", {}).get(tactic_shortname, {}) techniques = tactic_data.get("techniques", []) technique_summaries = [] for tech in techniques: tech_summary = f"- (technique_id: {tech['attack_id']}, Name: {tech['name']})" technique_summaries.append(tech_summary) if not technique_summaries: print("No techniques available for this tactic.", flush=True) prompt = f""" You are a Senior Security Operations Center (SOC) Analyst and MITRE ATT&CK Specialist. ### CONTEXT You have already identified the MITRE ATT&CK Tactic for this log: **{tactic_shortname}** Now, **select the most specific Technique** from the list below that best explains *how* the attacker achieved this goal. ### LOG ENTRY: {log_entry} ### AVAILABLE TECHNIQUES FOR "{tactic_shortname}" TACTIC: {chr(10).join(technique_summaries)} ### SELECTION CRITERIA: 1. **Match Technical Indicators:** - Does the log show a registry modification? → Likely T1547 (Boot/Logon Autostart) - Does it show scheduled task creation? → Likely T1053 (Scheduled Task/Job) - Does it access sensitive files (SAM, credentials)? → Likely T1003 (Credential Dumping) 2. **Be Specific:** - If the log shows "reading SAM database", choose T1003 (OS Credential Dumping) over generic Discovery techniques. 3. **Confidence = Specificity:** - If the log precisely matches a technique's definition → 0.8–1.0 confidence - If it's likely but ambiguous → 0.5–0.7 - If it's the best guess among poor fits → 0.3–0.5 ### OUTPUT FORMAT (JSON): {{ "technique_id": "Exact technique_id from the list", "technique_name": "Exact name from the list", "confidence": 0.0-1.0, "reasoning": "In less than 50 words, explain which specific indicators in the log match this technique." }} JSON Response: """ return prompt # ────────────────────────────────────────────── # JSON EXTRACTION # ────────────────────────────────────────────── def _extract_json_from_response(self, response: str, verbose: bool = False) -> Optional[Dict]: """ Extract JSON from LLM response using multiple strategies. Returns parsed JSON dict or None if extraction fails. """ try: # Strategy 1: Try direct JSON parse try: return json.loads(response) except json.JSONDecodeError: pass # Strategy 2: Extract JSON block from markdown json_pattern = r"```(?:json)?\s*(\{.*?\})\s*```" matches = re.findall(json_pattern, response, re.DOTALL) if matches: for match in matches: try: return json.loads(match) except json.JSONDecodeError: continue # Strategy 3: Find JSON object with regex json_obj_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}' matches = re.findall(json_obj_pattern, response, re.DOTALL) if matches: for match in matches: try: parsed = json.loads(match) if isinstance(parsed, dict): return parsed except json.JSONDecodeError: continue # Strategy 4: Try cleaning common issues cleaned = response.strip() if not cleaned.startswith('{'): first_brace = cleaned.find('{') if first_brace != -1: cleaned = cleaned[first_brace:] if not cleaned.endswith('}'): last_brace = cleaned.rfind('}') if last_brace != -1: cleaned = cleaned[:last_brace + 1] try: return json.loads(cleaned) except json.JSONDecodeError: pass if verbose: print(f" ✗ All JSON extraction strategies failed", flush=True) return None except Exception as e: if verbose: print(f" ✗ JSON extraction error: {e}", flush=True) return None # ────────────────────────────────────────────── # GROQ API INFERENCE # ────────────────────────────────────────────── def _api_generate(self, prompt: str, retries: int = 3) -> str: """ Generate a response via Groq API. """ for attempt in range(retries): try: self.stats["api_calls"] += 1 completion = self.client.chat.completions.create( model=self.model_name, messages=[{"role": "user", "content": prompt}], max_tokens=512, temperature=0.1, ) return completion.choices[0].message.content except Exception as e: error_str = str(e) self.stats["api_errors"] += 1 if "rate_limit" in error_str.lower() or "429" in error_str: wait = (attempt + 1) * 10 print(f" ⚠️ Rate limited, waiting {wait}s (attempt {attempt+1}/{retries})", flush=True) time.sleep(wait) elif "503" in error_str or "unavailable" in error_str.lower(): wait = 30 print(f" ⏳ Service unavailable, waiting {wait}s...", flush=True) time.sleep(wait) else: print(f" ⚠️ API error: {error_str}", flush=True) if attempt < retries - 1: time.sleep(5) else: raise return "" # ────────────────────────────────────────────── # KB LOOKUP # ────────────────────────────────────────────── def _get_strategies( self, tactic_shortname: str, technique_id: str ) -> Tuple[List[Dict], List[Dict]]: mitigations = [] detections = [] try: tactic_data = self.mitre_kb.get("tactics", {}).get(tactic_shortname, {}) techniques = tactic_data.get("techniques", []) for tech in techniques: if tech.get("attack_id") == technique_id: raw_mitigations = tech.get("mitigations", []) mitigations = [ {"name": m.get("name"), "strategy": m.get("description")} for m in raw_mitigations ] detections = tech.get("detection_strategies", []) break except Exception: pass return mitigations, detections def _get_cache_key(self, log_entry: str) -> str: normalized = log_entry.lower().strip() return hashlib.md5(normalized.encode()).hexdigest() # ────────────────────────────────────────────── # SINGLE EVENT ANALYSIS # ────────────────────────────────────────────── def _analyze_single(self, log_entry: str) -> Optional[MITREPrediction]: verbose = self.verbose if verbose: print(f"\n [Stage 1/3] Identifying Tactic...", flush=True) print(f" Log: {log_entry[:100]}...", flush=True) tactic_prompt = self._create_tactic_prompt(log_entry) tactic_response = self._api_generate(tactic_prompt) tactic_result = self._extract_json_from_response(tactic_response, verbose=verbose) if not tactic_result: if verbose: print(f" ✗ Tactic extraction failed", flush=True) return None if verbose: print(f" ✓ Tactic: {tactic_result.get('tactic_name')} (conf: {tactic_result.get('confidence', 0):.2f})", flush=True) print(f" [Stage 2/3] Identifying Technique...", flush=True) technique_prompt = self._create_technique_prompt( log_entry, tactic_result.get("tactic_name", ""), ) technique_response = self._api_generate(technique_prompt) technique_result = self._extract_json_from_response(technique_response, verbose=verbose) if not technique_result: if verbose: print(f" ✗ Technique extraction failed", flush=True) return None if verbose: print(f" ✓ Technique: {technique_result.get('technique_id')} — {technique_result.get('technique_name')} (conf: {technique_result.get('confidence', 0):.2f})", flush=True) print(f" [Stage 3/3] Retrieving mitigation strategies...", flush=True) mitigation_strategies, detection_strategies = self._get_strategies( tactic_result.get("tactic_name", ""), technique_result.get("technique_id", ""), ) if verbose: print(f" ✓ Retrieved {len(mitigation_strategies)} mitigations", flush=True) prediction = MITREPrediction( tactic=tactic_result.get("tactic_name", ""), technique=technique_result.get("technique_id", ""), technique_id=technique_result.get("technique_id", ""), technique_name=technique_result.get("technique_name", ""), confidence_score=min( tactic_result.get("confidence", 0), technique_result.get("confidence", 0), ), mitigation_strategies=mitigation_strategies, detection_strategies=detection_strategies, reasoning=f"Tactic: {tactic_result.get('reasoning', '')}; " f"Technique: {technique_result.get('reasoning', '')}", ) self.stats["total_processed"] += 1 return prediction def _create_result_dict(self, idx, row, prediction: MITREPrediction) -> Dict: return { "log_index": idx, "raw_text": row["raw_text"], "tactic": prediction.tactic.title(), "technique_id": prediction.technique_id, "technique_name": prediction.technique_name, "confidence_score": prediction.confidence_score, "reasoning": prediction.reasoning, "num_mitigations": len(prediction.mitigation_strategies), "mitigation_strategies": json.dumps(prediction.mitigation_strategies), } # ────────────────────────────────────────────── # MAIN PROCESSING # ────────────────────────────────────────────── def _process_batched(self, df: pd.DataFrame) -> List[Dict]: """ Process logs using Groq API inference. """ all_results = [] for idx, row in tqdm(df.iterrows(), desc="Processing events", total=len(df)): log_entry = row["raw_text"] if self.use_caching: cache_key = self._get_cache_key(log_entry) if cache_key in self.cache: self.stats["cache_hits"] += 1 prediction = self.cache[cache_key] if prediction: all_results.append(self._create_result_dict(idx, row, prediction)) continue self.stats["cache_misses"] += 1 print(f"\n📍 Event {idx + 1}/{len(df)}", flush=True) try: prediction = self._analyze_single(log_entry) except Exception as e: print(f" ❌ Failed: {e}", flush=True) prediction = None if self.use_caching: cache_key = self._get_cache_key(log_entry) self.cache[cache_key] = prediction if prediction: all_results.append(self._create_result_dict(idx, row, prediction)) return all_results