| |
| """ |
| HKLM — MITRE ATT&CK Log Analyzer (Groq API Version) |
| |
| Uses Groq API for fast LLM inference. |
| Everything else is identical: |
| - Same prompts |
| - Same JSON parsing |
| - Same KB lookup |
| - Same caching |
| - Same output format |
| |
| Requires: pip install groq |
| Set Groq token: export GROQ_API_KEY=gsk_xxxxx (or pass to constructor) |
| |
| IMPROVED: Better logging messages for UI display during processing |
| """ |
|
|
| import json |
| import pandas as pd |
| from pathlib import Path |
| from typing import Dict, List, Tuple, Optional |
| from dataclasses import dataclass |
| import re |
| from tqdm import tqdm |
| import warnings |
| import hashlib |
| import os |
| import time |
| import sys |
| import os |
| from dotenv import load_dotenv |
|
|
| |
| load_dotenv() |
|
|
|
|
| try: |
| from groq import Groq |
| except ImportError: |
| raise ImportError("Install groq: pip install groq") |
|
|
|
|
@dataclass
class MITREPrediction:
    """One classified log entry: tactic, technique, and knowledge-base strategies.

    Field order is part of the interface because the dataclass-generated
    ``__init__`` accepts these values positionally.
    """

    # Tactic name as reported by the tactic-identification stage.
    tactic: str
    # Technique identifier; the analyzer fills this with the same value as
    # ``technique_id``.
    technique: str
    # ATT&CK technique identifier, e.g. "T1003".
    technique_id: str
    # Human-readable technique name.
    technique_name: str
    # Combined confidence for the whole prediction.
    confidence_score: float
    # Mitigation entries looked up in the knowledge base.
    mitigation_strategies: List[Dict]
    # Detection entries looked up in the knowledge base.
    detection_strategies: List[Dict]
    # Free-text justification assembled from both LLM stages.
    reasoning: str
|
|
|
|
| class MITRELogAnalyzerAPI: |
| """ |
| HKLM analyzer using Groq API. |
| |
| Same 3-stage pipeline: |
| Stage 1: Raw log → Tactic identification (API call) |
| Stage 2: Log + Tactic → Technique identification (API call) |
| Stage 3: KB lookup for mitigation/detection strategies |
| |
| No GPU required. Just a Groq API key. |
| """ |
|
|
# Groq model identifiers mapped to a short human-readable description.
# NOTE(review): the separator was mojibake ("β") in the source; an em dash is
# the presumed original — confirm against the upstream file.
SUPPORTED_MODELS = {
    "llama-3.3-70b-versatile": "Llama 3.3 70B — Best reasoning, fast",
    "llama-3.1-8b-instant": "Llama 3.1 8B — Fastest",
    "llama-3.2-90b-text-preview": "Llama 3.2 90B — Most capable",
    "mixtral-8x7b-32768": "Mixtral 8x7B — Good balance",
}
|
|
| def __init__( |
| self, |
| mitre_kb_path: str, |
| model_name: str = "llama-3.1-8b-instant", |
| groq_api_key: str = None, |
| use_caching: bool = True, |
| verbose: bool = False, |
| ): |
| self.model_name = model_name |
| self.use_caching = use_caching |
| self.verbose = verbose |
|
|
| |
| api_key = groq_api_key or os.environ.get("GROQ_API_KEY") |
| if not api_key: |
| raise ValueError( |
| "Groq API key required. Set GROQ_API_KEY env var or pass groq_api_key parameter.\n" |
| "Get your key at: https://console.groq.com/keys" |
| ) |
|
|
| |
| print(f"\nπ€ HKLM β Groq API Mode", flush=True) |
| print(f" Model: {model_name}", flush=True) |
| print(f" Caching: {'Enabled' if use_caching else 'Disabled'}", flush=True) |
| print(f" Auth: API key provided ({'*' * 8}...{api_key[-4:]})", flush=True) |
|
|
| print(f" Initializing Groq client...", flush=True) |
| self.client = Groq(api_key=api_key) |
|
|
| |
| print(f" Testing API connection...", flush=True) |
| try: |
| test_response = self.client.chat.completions.create( |
| model=self.model_name, |
| messages=[{"role": "user", "content": "Hello"}], |
| max_tokens=5, |
| temperature=0.1, |
| ) |
| print(f" β
API connection successful", flush=True) |
| except Exception as e: |
| print(f" β οΈ API test failed: {e}", flush=True) |
| print(f" Will retry on actual requests", flush=True) |
|
|
| |
| print(f"\nπ Loading MITRE knowledge base from {mitre_kb_path}...", flush=True) |
| with open(mitre_kb_path, "r", encoding="utf-8") as f: |
| self.mitre_kb = json.load(f) |
|
|
| num_tactics = len(self.mitre_kb.get("tactics", {})) |
| print(f" β
Loaded {num_tactics} tactics from knowledge base", flush=True) |
|
|
| |
| self.cache = {} if use_caching else None |
|
|
| |
| self.stats = { |
| "cache_hits": 0, |
| "cache_misses": 0, |
| "total_processed": 0, |
| "api_calls": 0, |
| "api_errors": 0, |
| } |
|
|
| print(f"\nβ
INITIALIZATION COMPLETE", flush=True) |
| print(f" Ready to analyze logs using {model_name}", flush=True) |
|
|
| |
| |
| |
|
|
| def _create_tactic_prompt(self, log_entry: str) -> str: |
| tactic_summaries = [] |
| for tactic_key, tactic_data in self.mitre_kb.get("tactics", {}).items(): |
| tactic_summaries.append( |
| f"- Name: ({tactic_data['shortname']}) " |
| f"Description: {tactic_data['description']}." |
| ) |
| |
| prompt = f""" |
| You are a Senior Security Operations Center (SOC) Analyst and MITRE ATT&CK Specialist. Your task is to classify security logs into the most accurate Tactic. |
| |
| ### STEP 1: LOG DECONSTRUCTION |
| Examine the log below. Even if field names are unfamiliar, identify: |
| 1. **The Actor (Subject):** Which process, user, or service is initiating the action? |
| 2. **The Action (Verb):** What is happening? (e.g., Is something being accessed, created, modified, executed, or moved?) |
| 3. **The Target (Object):** What is being acted upon? (e.g., a registry key, a file path, a network socket, a memory address, or a configuration setting.) |
| |
| ### STEP 2: INTENT INFERENCE |
| Based on the Subject-Verb-Object relationship: |
| - If the Verb is 'Read/Query' and the Object is 'System Config/Registry', the intent is **Discovery**. |
| - If the Verb is 'Execute/Start' and the Object is 'Binary/Script', the intent is **Execution**. |
| - If the Verb is 'Modify/Write' and the Object is 'Auto-run location/Service', the intent is **Persistence**. |
| |
| ### LOG ENTRY: |
| {log_entry} |
| |
| ### MITRE ATT&CK TACTIC DEFINITIONS: |
| {chr(10).join(tactic_summaries)} |
| |
| ### ANALYSIS GUIDELINES: |
| - **Do not rely on field labels:** Focus on the *content* of the values. |
| - **Context Matters:** Remote access tools performing routine reads are likely in a 'Discovery' phase. |
| - **Match the Goal:** Choose the Tactic whose definition most closely matches the *primary goal* of the action. |
| |
| ### OUTPUT FORMAT (JSON): |
| {{ |
| "tactic_name": "Exact name from provided list", |
| "confidence": 0.0-1.0, |
| "reasoning": "Explain in less than 50 words how the inferred action matches the specific MITRE description." |
| }} |
| |
| JSON Response: |
| """ |
| return prompt |
|
|
| def _create_technique_prompt( |
| self, log_entry: str, tactic_shortname: str |
| ) -> str: |
| tactic_shortname = tactic_shortname.lower() |
| tactic_data = self.mitre_kb.get("tactics", {}).get(tactic_shortname, {}) |
| techniques = tactic_data.get("techniques", []) |
|
|
| technique_summaries = [] |
| for tech in techniques: |
| tech_summary = f"- (technique_id: {tech['attack_id']}, Name: {tech['name']})" |
| technique_summaries.append(tech_summary) |
|
|
| if not technique_summaries: |
| print("No techniques available for this tactic.", flush=True) |
| |
| prompt = f""" |
| You are a Senior Security Operations Center (SOC) Analyst and MITRE ATT&CK Specialist. |
| |
| ### CONTEXT |
| You have already identified the MITRE ATT&CK Tactic for this log: **{tactic_shortname}** |
| |
| Now, **select the most specific Technique** from the list below that best explains *how* the attacker achieved this goal. |
| |
| ### LOG ENTRY: |
| {log_entry} |
| |
| ### AVAILABLE TECHNIQUES FOR "{tactic_shortname}" TACTIC: |
| {chr(10).join(technique_summaries)} |
| |
| ### SELECTION CRITERIA: |
| 1. **Match Technical Indicators:** |
| - Does the log show a registry modification? β Likely T1547 (Boot/Logon Autostart) |
| - Does it show scheduled task creation? β Likely T1053 (Scheduled Task/Job) |
| - Does it access sensitive files (SAM, credentials)? β Likely T1003 (Credential Dumping) |
| |
| 2. **Be Specific:** |
| - If the log shows "reading SAM database", choose T1003 (OS Credential Dumping) over generic Discovery techniques. |
| |
| 3. **Confidence = Specificity:** |
| - If the log precisely matches a technique's definition β 0.8β1.0 confidence |
| - If it's likely but ambiguous β 0.5β0.7 |
| - If it's the best guess among poor fits β 0.3β0.5 |
| |
| ### OUTPUT FORMAT (JSON): |
| {{ |
| "technique_id": "Exact technique_id from the list", |
| "technique_name": "Exact name from the list", |
| "confidence": 0.0-1.0, |
| "reasoning": "In less than 50 words, explain which specific indicators in the log match this technique." |
| }} |
| |
| JSON Response: |
| """ |
| return prompt |
|
|
| |
| |
| |
|
|
| def _extract_json_from_response(self, response: str, verbose: bool = False) -> Optional[Dict]: |
| """ |
| Extract JSON from LLM response using multiple strategies. |
| Returns parsed JSON dict or None if extraction fails. |
| """ |
| try: |
| |
| try: |
| return json.loads(response) |
| except json.JSONDecodeError: |
| pass |
|
|
| |
| json_pattern = r"```(?:json)?\s*(\{.*?\})\s*```" |
| matches = re.findall(json_pattern, response, re.DOTALL) |
| if matches: |
| for match in matches: |
| try: |
| return json.loads(match) |
| except json.JSONDecodeError: |
| continue |
|
|
| |
| json_obj_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}' |
| matches = re.findall(json_obj_pattern, response, re.DOTALL) |
| if matches: |
| for match in matches: |
| try: |
| parsed = json.loads(match) |
| if isinstance(parsed, dict): |
| return parsed |
| except json.JSONDecodeError: |
| continue |
|
|
| |
| cleaned = response.strip() |
| if not cleaned.startswith('{'): |
| first_brace = cleaned.find('{') |
| if first_brace != -1: |
| cleaned = cleaned[first_brace:] |
| if not cleaned.endswith('}'): |
| last_brace = cleaned.rfind('}') |
| if last_brace != -1: |
| cleaned = cleaned[:last_brace + 1] |
| try: |
| return json.loads(cleaned) |
| except json.JSONDecodeError: |
| pass |
|
|
| if verbose: |
| print(f" β All JSON extraction strategies failed", flush=True) |
| return None |
|
|
| except Exception as e: |
| if verbose: |
| print(f" β JSON extraction error: {e}", flush=True) |
| return None |
|
|
| |
| |
| |
|
|
| def _api_generate(self, prompt: str, retries: int = 3) -> str: |
| """ |
| Generate a response via Groq API. |
| """ |
| for attempt in range(retries): |
| try: |
| self.stats["api_calls"] += 1 |
|
|
| completion = self.client.chat.completions.create( |
| model=self.model_name, |
| messages=[{"role": "user", "content": prompt}], |
| max_tokens=512, |
| temperature=0.1, |
| ) |
| return completion.choices[0].message.content |
|
|
| except Exception as e: |
| error_str = str(e) |
| self.stats["api_errors"] += 1 |
|
|
| if "rate_limit" in error_str.lower() or "429" in error_str: |
| wait = (attempt + 1) * 10 |
| print(f" β οΈ Rate limited, waiting {wait}s (attempt {attempt+1}/{retries})", flush=True) |
| time.sleep(wait) |
| elif "503" in error_str or "unavailable" in error_str.lower(): |
| wait = 30 |
| print(f" β³ Service unavailable, waiting {wait}s...", flush=True) |
| time.sleep(wait) |
| else: |
| print(f" β οΈ API error: {error_str}", flush=True) |
| if attempt < retries - 1: |
| time.sleep(5) |
| else: |
| raise |
|
|
| return "" |
|
|
| |
| |
| |
|
|
| def _get_strategies( |
| self, tactic_shortname: str, technique_id: str |
| ) -> Tuple[List[Dict], List[Dict]]: |
| mitigations = [] |
| detections = [] |
| try: |
| tactic_data = self.mitre_kb.get("tactics", {}).get(tactic_shortname, {}) |
| techniques = tactic_data.get("techniques", []) |
| for tech in techniques: |
| if tech.get("attack_id") == technique_id: |
| raw_mitigations = tech.get("mitigations", []) |
| mitigations = [ |
| {"name": m.get("name"), "strategy": m.get("description")} |
| for m in raw_mitigations |
| ] |
| detections = tech.get("detection_strategies", []) |
| break |
| except Exception: |
| pass |
| return mitigations, detections |
|
|
| def _get_cache_key(self, log_entry: str) -> str: |
| normalized = log_entry.lower().strip() |
| return hashlib.md5(normalized.encode()).hexdigest() |
|
|
| |
| |
| |
|
|
def _analyze_single(self, log_entry: str) -> Optional[MITREPrediction]:
    """Run the three-stage pipeline on one log entry.

    Stage 1 asks the model for a tactic, stage 2 asks for a technique within
    that tactic, and stage 3 looks up mitigation/detection strategies in the
    knowledge base. Values taken from the model's JSON are treated as
    untrusted: confidences are coerced to a float in [0, 1] (0.0 when
    missing or unparseable) and names/IDs are coerced to strings.

    Args:
        log_entry: Raw log text.

    Returns:
        A ``MITREPrediction``, or ``None`` when either LLM stage does not
        yield a usable JSON object. ``stats["total_processed"]`` is only
        incremented for successful predictions.

    Raises:
        Exception: Whatever ``_api_generate`` raises after its retries.
    """
    # NOTE(review): the tick/cross glyphs were mojibake in the source; the
    # characters used here are presumed originals.
    verbose = self.verbose

    def _confidence(result: Dict) -> float:
        # LLMs sometimes return "0.8", null, or out-of-range numbers.
        try:
            value = float(result.get("confidence", 0))
        except (TypeError, ValueError):
            return 0.0
        if value != value:  # NaN
            return 0.0
        return min(max(value, 0.0), 1.0)

    def _text(result: Dict, key: str) -> str:
        # A JSON null must not leak through as None.
        value = result.get(key)
        return "" if value is None else str(value)

    if verbose:
        print("\n [Stage 1/3] Identifying Tactic...", flush=True)
        print(f" Log: {log_entry[:100]}...", flush=True)

    tactic_prompt = self._create_tactic_prompt(log_entry)
    tactic_response = self._api_generate(tactic_prompt)
    tactic_result = self._extract_json_from_response(tactic_response, verbose=verbose)

    if not tactic_result or not isinstance(tactic_result, dict):
        if verbose:
            print(" ❌ Tactic extraction failed", flush=True)
        return None

    tactic_name = _text(tactic_result, "tactic_name")
    tactic_confidence = _confidence(tactic_result)

    if verbose:
        print(f" ✓ Tactic: {tactic_name} (conf: {tactic_confidence:.2f})", flush=True)
        print(" [Stage 2/3] Identifying Technique...", flush=True)

    technique_prompt = self._create_technique_prompt(log_entry, tactic_name)
    technique_response = self._api_generate(technique_prompt)
    technique_result = self._extract_json_from_response(technique_response, verbose=verbose)

    if not technique_result or not isinstance(technique_result, dict):
        if verbose:
            print(" ❌ Technique extraction failed", flush=True)
        return None

    technique_id = _text(technique_result, "technique_id")
    technique_name = _text(technique_result, "technique_name")
    technique_confidence = _confidence(technique_result)

    if verbose:
        print(
            f" ✓ Technique: {technique_id} — {technique_name} (conf: {technique_confidence:.2f})",
            flush=True,
        )
        print(" [Stage 3/3] Retrieving mitigation strategies...", flush=True)

    # The technique prompt resolves the tactic by its stripped, lower-cased
    # name; use the same form here so both lookups hit the same KB entry.
    mitigation_strategies, detection_strategies = self._get_strategies(
        tactic_name.strip().lower(),
        technique_id,
    )

    if verbose:
        print(f" ✓ Retrieved {len(mitigation_strategies)} mitigations", flush=True)

    prediction = MITREPrediction(
        tactic=tactic_name,
        technique=technique_id,
        technique_id=technique_id,
        technique_name=technique_name,
        # The chain is only as strong as its weaker stage.
        confidence_score=min(tactic_confidence, technique_confidence),
        mitigation_strategies=mitigation_strategies,
        detection_strategies=detection_strategies,
        reasoning=f"Tactic: {_text(tactic_result, 'reasoning')}; "
        f"Technique: {_text(technique_result, 'reasoning')}",
    )

    self.stats["total_processed"] += 1
    return prediction
|
|
def _create_result_dict(self, idx, row, prediction: MITREPrediction) -> Dict:
    """Flatten one prediction into the row dict used for tabular output.

    The tactic is title-cased and the mitigation list is serialized to a JSON
    string; ``num_mitigations`` is the length of that list.
    """
    mitigations = prediction.mitigation_strategies
    return dict(
        log_index=idx,
        raw_text=row["raw_text"],
        tactic=prediction.tactic.title(),
        technique_id=prediction.technique_id,
        technique_name=prediction.technique_name,
        confidence_score=prediction.confidence_score,
        reasoning=prediction.reasoning,
        num_mitigations=len(mitigations),
        mitigation_strategies=json.dumps(mitigations),
    )
|
|
| |
| |
| |
|
|
def _process_batched(self, df: pd.DataFrame) -> List[Dict]:
    """Analyze every row of *df* and collect the successful results.

    Rows are read from the ``raw_text`` column. When caching is enabled, a
    log whose cache key was already seen reuses the stored outcome —
    including a stored failure (``None``), which is skipped without a new
    API call. Exceptions raised while analyzing a row are reported and
    treated as a failed prediction so one bad row does not abort the run.

    Args:
        df: Frame with a ``raw_text`` column.

    Returns:
        One result dict per successfully classified row, in input order.
    """
    # NOTE(review): status glyphs were restored from mojibake in the source;
    # the magnifier in particular is a guess.
    all_results = []
    total = len(df)

    rows = tqdm(df.iterrows(), desc="Processing events", total=total)
    # ``idx`` is the DataFrame index label (it may be non-numeric or
    # non-contiguous), so progress is reported from the running position.
    for position, (idx, row) in enumerate(rows, start=1):
        log_entry = row["raw_text"]

        if not isinstance(log_entry, str):
            is_missing = (
                log_entry is None
                or log_entry is pd.NA
                or (isinstance(log_entry, float) and log_entry != log_entry)
            )
            if is_missing:
                print(f"\n ⚠️ Skipping event {position}/{total}: empty raw_text", flush=True)
                continue
            log_entry = str(log_entry)

        cache_key = self._get_cache_key(log_entry) if self.use_caching else None

        if cache_key is not None:
            if cache_key in self.cache:
                self.stats["cache_hits"] += 1
                prediction = self.cache[cache_key]
                if prediction:
                    all_results.append(self._create_result_dict(idx, row, prediction))
                continue
            self.stats["cache_misses"] += 1

        print(f"\n🔍 Event {position}/{total}", flush=True)
        try:
            prediction = self._analyze_single(log_entry)
        except Exception as e:
            print(f" ❌ Failed: {e}", flush=True)
            prediction = None

        if cache_key is not None:
            self.cache[cache_key] = prediction

        if prediction:
            all_results.append(self._create_result_dict(idx, row, prediction))

    return all_results