Spaces:
Sleeping
Sleeping
File size: 14,963 Bytes
"""
PhishLens MITRE ATT&CK Technique Mapper.
Maps detected phishing indicators to MITRE ATT&CK Enterprise framework
techniques. This transforms PhishLens output from a binary verdict into
structured threat intelligence that maps to the adversary kill chain.
Primary technique: T1566 — Phishing (all phishing emails)
Sub-techniques:
- T1566.001 — Spearphishing Attachment (emails with malicious attachments)
- T1566.002 — Spearphishing Link (emails with malicious URLs)
- T1566.003 — Spearphishing via Service (via social media / messaging)
Secondary techniques (based on detected features):
- T1036 — Masquerading (brand impersonation, lookalike domains)
- T1204 — User Execution (calls to action: click link, open attachment)
- T1056 — Input Capture (credential harvesting forms)
- T1078 — Valid Accounts (credential theft)
- T1071.003 — Application Layer Protocol: Mail (email C2 communications)
- T1027 — Obfuscated Files or Information (base64 content, HTML obfuscation)
Security rationale: ATT&CK mapping enables:
1. Automated threat classification for SOC triage
2. Integration with threat intelligence platforms (MISP, OpenCTI)
3. Compliance reporting (NIST CSF, ISO 27001 requirement mapping)
4. Adversary technique trend analysis over time
"""
from __future__ import annotations
from typing import Dict, List, Optional
from src.utils.config import DEFAULT_CONFIG, ATTACK_TECHNIQUE_MAP
from src.utils.logger import get_logger
# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's name.
log = get_logger(__name__)
def _to_float(value: object) -> float:
    """Coerce a raw feature value to ``float``.

    Missing (``None``) and non-numeric values become ``0.0`` so that a single
    malformed feature cannot abort the whole mapping.
    """
    if value is None:
        return 0.0
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _build_technique(
    technique_id: str,
    technique_name: str,
    tactic: str,
    confidence: float,
    evidence: List[str],
) -> Dict:
    """Assemble one technique entry; the MITRE URL is derived from the ID."""
    return {
        "technique_id": technique_id,
        "technique_name": technique_name,
        "tactic": tactic,
        "confidence": confidence,
        "evidence": evidence,
        # Sub-technique IDs ("T1566.002") live under ".../T1566/002/".
        "mitre_url": f"https://attack.mitre.org/techniques/{technique_id.replace('.', '/')}/",
    }


def _calibrate_confidences(
    techniques: List[Dict],
    verdict: str,
    phishing_probability: float,
) -> List[Dict]:
    """Scale technique confidences so the map is proportionate to the verdict.

    Many features (having a URL, using HTML, base64 encoding) appear in
    perfectly legitimate business email, so confidences are reduced for
    LEGITIMATE and UNCERTAIN verdicts. Any other verdict (including PHISHING)
    returns the list unchanged.
    """
    phish_threshold = 0.65
    if verdict == "LEGITIMATE":
        # Direct phishing-entry techniques are false signals for legitimate
        # email and would mislead SOC analysts, so they are dropped.
        phish_entry_ids = {"T1566", "T1566.001", "T1566.002", "T1566.003"}
        # Scale factor is capped at 30%.
        scale = min(0.30, max(0.05, phishing_probability) * 3.0)
        calibrated = [
            {**t, "confidence": round(t["confidence"] * scale, 2)}
            for t in techniques
            if t["technique_id"] not in phish_entry_ids
        ]
    elif verdict == "UNCERTAIN":
        # Scale by how close the probability is to the phishing threshold,
        # and never report more than 80% for an uncertain verdict.
        scale = min(1.0, max(0.45, phishing_probability / phish_threshold))
        calibrated = [
            {**t, "confidence": round(min(t["confidence"] * scale, 0.80), 2)}
            for t in techniques
        ]
    else:
        return techniques
    # Near-zero entries add noise, not value.
    return [t for t in calibrated if t["confidence"] >= 0.05]


def map_attack_techniques(
    features: Dict,
    iocs: Dict,
    gemini_result: Optional[Dict] = None,
    phishing_probability: float = 0.5,
    verdict: str = "UNCERTAIN",
) -> List[Dict]:
    """Map extracted email features to MITRE ATT&CK techniques.

    Feature values that are missing, ``None`` or non-numeric are treated as
    ``0`` everywhere, so a sparse feature dict never raises.

    Args:
        features: Dict of feature names -> values from the feature pipeline.
        iocs: IOC dict; the ``"urls"`` and ``"attachment_hashes"`` lists are read.
        gemini_result: Optional AI analysis dict; only the
            ``"gemini_impersonated_brand"`` key is read.
        phishing_probability: ML model probability (0-1).
        verdict: "PHISHING", "LEGITIMATE", or "UNCERTAIN".

    Returns:
        List of ATT&CK technique dicts, each with:
        - technique_id: MITRE ATT&CK technique ID (e.g., "T1566.002")
        - technique_name: Human-readable technique name
        - tactic: ATT&CK tactic (e.g., "Initial Access")
        - confidence: Float 0-1 for technique detection confidence
        - evidence: List of strings describing what triggered this mapping
        - mitre_url: Link to the technique page on attack.mitre.org
    """
    features = features or {}
    iocs = iocs or {}

    techniques: List[Dict] = []
    seen_ids = set()

    def feature(name: str) -> float:
        return _to_float(features.get(name))

    def add(technique_id: str, name: str, tactic: str, confidence: float, evidence: List[str]) -> None:
        techniques.append(_build_technique(technique_id, name, tactic, confidence, evidence))
        seen_ids.add(technique_id)

    # ---- T1566: Phishing (only when ML verdict is PHISHING or UNCERTAIN) --
    # For LEGITIMATE emails T1566 is suppressed entirely: mapping phishing
    # techniques when the model decided "not phishing" would be misleading.
    if verdict in ("PHISHING", "UNCERTAIN"):
        add(
            "T1566",
            "Phishing",
            "Initial Access",
            round(max(0.0, min(phishing_probability, 1.0)), 2),
            [f"PhishLens ML verdict: {phishing_probability:.1%} phishing probability"],
        )

    # ---- T1566.001: Spearphishing Attachment ----------------------------
    # Prefer the parsed attachment count; fall back to the number of hashes.
    attachment_count = features.get("parsed_attachments_count") or len(
        iocs.get("attachment_hashes") or []
    )
    if _to_float(attachment_count) > 0:
        add(
            "T1566.001",
            "Spearphishing Attachment",
            "Initial Access",
            0.85,
            [f"attachment_count={attachment_count}"],
        )

    # ---- T1566.002: Spearphishing Link ---------------------------------
    url_count = len(iocs.get("urls") or [])
    if url_count > 0:
        add(
            "T1566.002",
            "Spearphishing Link",
            "Initial Access",
            # Confidence grows with the number of URLs, capped at 0.95.
            min(0.5 + 0.1 * url_count, 0.95),
            [f"url_count={url_count}"],
        )

    # ---- T1036: Masquerading (brand impersonation) ----------------------
    brand_evidence = []
    impersonated_brand = gemini_result.get("gemini_impersonated_brand") if gemini_result else None
    if impersonated_brand:
        brand_evidence.append(f"gemini_brand={impersonated_brand}")
    if feature("url_brand_in_subdomain_max") > 0:
        brand_evidence.append("brand_in_subdomain=True")
    if feature("url_cert_brand_mismatch_max") > 0:
        brand_evidence.append("cert_brand_mismatch=True")
    if feature("url_punycode_detected_max") > 0:
        brand_evidence.append("punycode_domain=True")
    if brand_evidence:
        add("T1036", "Masquerading", "Defense Evasion", 0.80, brand_evidence)

    # ---- T1204: User Execution (urgency-based social engineering) -------
    urgency = feature("txt_urgency_score_normalised")
    if urgency > 0.3:
        add(
            "T1204",
            "User Execution",
            "Execution",
            min(urgency, 0.9),
            [f"urgency_score={urgency:.3f}"],
        )

    # ---- T1056: Input Capture (credential harvesting forms) ------------
    has_external_form = feature("html_external_form_action") > 0
    if has_external_form:
        add("T1056", "Input Capture", "Collection", 0.75, ["external_form_action=True"])

    # ---- T1027: Obfuscated Files / Information -------------------------
    obfuscation_evidence = []
    if feature("html_base64_content_count") > 0:
        obfuscation_evidence.append("base64_html_content=True")
    if feature("html_hidden_text_count") > 0:
        obfuscation_evidence.append("hidden_text=True")
    if feature("html_javascript_count") > 2:
        obfuscation_evidence.append("javascript_obfuscation=True")
    if feature("url_url_entropy_max") > 4.5:
        obfuscation_evidence.append("high_url_entropy=True")
    if obfuscation_evidence:
        add(
            "T1027",
            "Obfuscated Files or Information",
            "Defense Evasion",
            0.70,
            obfuscation_evidence,
        )

    # ---- T1078: Valid Accounts (credential theft phishing) -------------
    # The raw value is kept for the evidence string; the comparison uses the
    # coerced float.
    keywords_count = features.get("url_suspicious_keywords_in_url_max") or 0
    has_suspicious_keywords = _to_float(keywords_count) > 0
    if has_suspicious_keywords:
        add(
            "T1078",
            "Valid Accounts",
            "Persistence",
            0.60,
            [f"suspicious_url_keywords={keywords_count}"],
        )

    # ---- Authentication bypass / SPF-DKIM-DMARC failures ---------------
    # A negative header result is read as a failed check.
    # NOTE(review): ATT&CK defines T1071.003 as C2 over mail protocols; using
    # it for sender-authentication failures is a heuristic -- confirm intent.
    auth_checks = (
        ("hdr_spf_result", "spf_fail=True"),
        ("hdr_dkim_result", "dkim_fail=True"),
        ("hdr_dmarc_result", "dmarc_fail=True"),
    )
    auth_evidence = [label for key, label in auth_checks if feature(key) < 0]
    if auth_evidence:
        add(
            "T1071.003",
            "Application Layer Protocol: Mail Protocols",
            "Command and Control",
            0.65,
            auth_evidence,
        )

    # ---- T1598: Phishing for Information (form + suspicious URL) --------
    if has_external_form and has_suspicious_keywords:
        add(
            "T1598",
            "Phishing for Information",
            "Reconnaissance",
            0.72,
            ["external_form_action=True", "suspicious_url_keywords=True"],
        )

    # ---- T1539: Steal Web Session Cookie (form + urgency) ---------------
    if has_external_form and urgency > 0.5:
        add(
            "T1539",
            "Steal Web Session Cookie",
            "Credential Access",
            0.68,
            ["external_form_action=True", "high_urgency=True"],
        )

    # ---- ATTACK_TECHNIQUE_MAP: config-driven feature -> technique mapping ---
    for feature_name, tech_info in ATTACK_TECHNIQUE_MAP.items():
        if not feature(feature_name) > 0:
            continue
        is_mapping = isinstance(tech_info, dict)
        tech_id = str((tech_info.get("technique_id") if is_mapping else tech_info) or "")
        # Skip entries without an ID (they would yield a malformed entry and
        # URL) and techniques already mapped by the rules above.
        if not tech_id or tech_id in seen_ids:
            continue
        if is_mapping:
            tech_name = tech_info.get("technique_name", _technique_name_lookup(tech_id))
            tech_tactic = tech_info.get("tactic", _technique_tactic_lookup(tech_id))
        else:
            tech_name = _technique_name_lookup(tech_id)
            tech_tactic = _technique_tactic_lookup(tech_id)
        add(
            tech_id,
            tech_name,
            tech_tactic,
            0.65,
            [f"{feature_name}={features.get(feature_name, 0)}"],
        )

    # ---- Verdict-based confidence calibration ---------------------------
    # For a PHISHING verdict all techniques keep their computed confidence.
    techniques = _calibrate_confidences(techniques, verdict, phishing_probability)

    log.debug(f"Mapped {len(techniques)} ATT&CK techniques (verdict={verdict}, prob={phishing_probability:.2f})")
    return techniques
def format_attack_mapping_report(techniques: List[Dict]) -> str:
    """Format the ATT&CK mapping as a readable text report.

    Args:
        techniques: Output of map_attack_techniques(). Each entry must carry
            the keys technique_id, technique_name, tactic, confidence,
            evidence and mitre_url.

    Returns:
        Multi-line string report suitable for display in Streamlit or terminal.
        A fixed one-line message is returned when ``techniques`` is empty.
    """
    if not techniques:
        return "No ATT&CK techniques mapped (email classified as legitimate)."

    # U+2588 FULL BLOCK, written as an escape so the glyph survives any
    # re-encoding of this source file.
    bar_glyph = "\u2588"
    max_cells = 10

    lines = ["MITRE ATT&CK Technique Mapping\n" + "=" * 40]
    for t in techniques:
        # One cell per 10% of confidence, clamped so an out-of-range value
        # cannot produce an over-long bar.
        cells = max(0, min(max_cells, int(t["confidence"] * max_cells)))
        conf_bar = bar_glyph * cells
        lines.append(
            f"\n[{t['technique_id']}] {t['technique_name']}\n"
            f" Tactic: {t['tactic']}\n"
            f" Confidence: {conf_bar} {t['confidence']:.0%}\n"
            f" Evidence: {', '.join(t['evidence'])}\n"
            f" Reference: {t['mitre_url']}"
        )
    return "\n".join(lines)
# ---------------------------------------------------------------------------
# Lookup helpers
# ---------------------------------------------------------------------------
# Rows of (technique ID, technique name, tactic). Both lookup tables below
# are derived from these rows, so every ID has a name and a tactic.
_TECHNIQUE_CATALOG = (
    ("T1566", "Phishing", "Initial Access"),
    ("T1566.001", "Spearphishing Attachment", "Initial Access"),
    ("T1566.002", "Spearphishing Link", "Initial Access"),
    ("T1566.003", "Spearphishing via Service", "Initial Access"),
    ("T1036", "Masquerading", "Defense Evasion"),
    ("T1204", "User Execution", "Execution"),
    ("T1056", "Input Capture", "Collection"),
    ("T1078", "Valid Accounts", "Persistence"),
    ("T1071.003", "Application Layer Protocol: Mail Protocols", "Command and Control"),
    ("T1027", "Obfuscated Files or Information", "Defense Evasion"),
    ("T1598", "Phishing for Information", "Reconnaissance"),
    ("T1539", "Steal Web Session Cookie", "Credential Access"),
)

# Technique ID -> human-readable technique name.
_TECHNIQUE_NAMES = {tech_id: name for tech_id, name, _ in _TECHNIQUE_CATALOG}

# Technique ID -> tactic label.
_TECHNIQUE_TACTICS = {tech_id: tactic for tech_id, _, tactic in _TECHNIQUE_CATALOG}
def _technique_name_lookup(technique_id: str) -> str:
    """Return the name stored in ``_TECHNIQUE_NAMES`` for *technique_id*.

    IDs that are not in the table yield ``"Unknown Technique"``.
    """
    if technique_id in _TECHNIQUE_NAMES:
        return _TECHNIQUE_NAMES[technique_id]
    return "Unknown Technique"
def _technique_tactic_lookup(technique_id: str) -> str:
    """Return the tactic stored in ``_TECHNIQUE_TACTICS`` for *technique_id*.

    IDs that are not in the table yield ``"Unknown Tactic"``.
    """
    if technique_id in _TECHNIQUE_TACTICS:
        return _TECHNIQUE_TACTICS[technique_id]
    return "Unknown Tactic"
|