Spaces:
Sleeping
Sleeping
| """ | |
| Attack library for LedgerShield adversarial case generation. | |
| Contains 16 distinct attack types covering the full spectrum of | |
| accounts payable fraud tactics observed in enterprise environments. | |
| Each attack injects specific fraud signals, reason codes, and | |
| instruction modifications into base cases to test agent robustness. | |
| Attack Categories: | |
| - Identity Attacks (4): bank override, vendor takeover, domain spoof, CEO fraud | |
| - Document Attacks (4): fake receipt, near-duplicate invoice, phantom vendor, inflated line items | |
| - Process Attacks (4): urgency spoof, approval threshold evasion, workflow override, split payment | |
| - Advanced Persistent Threats (4): coordinated campaign, supply chain compromise, | |
| insider collusion, multi-entity layering | |
| Severity Levels: | |
| - critical: Immediate financial loss if undetected | |
| - high: Significant risk requiring intervention | |
| - medium: Elevated risk requiring investigation | |
| """ | |
| from __future__ import annotations | |
| from copy import deepcopy | |
| from typing import Any | |
| import random | |
| from .schema import normalize_text | |
| ATTACK_LIBRARY: dict[str, dict[str, Any]] = { | |
| # --- Identity Attacks --- | |
| "bank_override_attack": { | |
| "category": "identity", | |
| "severity": "high", | |
| "reason_codes": ["bank_override_attempt", "policy_bypass_attempt"], | |
| "fraud_flags": ["bank_override_attempt"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "A follow-up communication requests a bank account update before release.", | |
| "description": "Attacker requests bank account change to redirect payment.", | |
| }, | |
| "vendor_takeover_attack": { | |
| "category": "identity", | |
| "severity": "high", | |
| "reason_codes": ["vendor_account_takeover_suspected", "sender_domain_spoof"], | |
| "fraud_flags": ["vendor_name_spoof", "sender_domain_spoof"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "Vendor communication appears authentic but may be compromised.", | |
| "description": "Attacker compromises vendor account to submit fraudulent invoices.", | |
| }, | |
| "ceo_fraud_attack": { | |
| "category": "identity", | |
| "severity": "critical", | |
| "reason_codes": ["policy_bypass_attempt", "urgent_payment_pressure"], | |
| "fraud_flags": ["sender_domain_spoof", "policy_bypass_attempt"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "An executive-level email requests immediate payment bypass of normal controls.", | |
| "description": "Business Email Compromise targeting C-suite impersonation.", | |
| }, | |
| "domain_typosquat_attack": { | |
| "category": "identity", | |
| "severity": "high", | |
| "reason_codes": ["sender_domain_spoof", "vendor_account_takeover_suspected"], | |
| "fraud_flags": ["sender_domain_spoof"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "The sender email domain closely resembles the legitimate vendor domain.", | |
| "description": "Attacker registers lookalike domain (e.g. vendor-corp.com vs vendorcorp.com).", | |
| }, | |
| # --- Document Attacks --- | |
| "near_duplicate_invoice_attack": { | |
| "category": "document", | |
| "severity": "high", | |
| "reason_codes": ["duplicate_near_match"], | |
| "fraud_flags": ["duplicate_near_match"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "The case may resemble a prior payment with minor invoice mutations.", | |
| "description": "Invoice is a near-duplicate of a previously paid invoice.", | |
| }, | |
| "fake_receipt_attack": { | |
| "category": "document", | |
| "severity": "medium", | |
| "reason_codes": ["partial_receipt_only", "policy_bypass_attempt"], | |
| "fraud_flags": [], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "Receipt support may be incomplete or manipulated.", | |
| "description": "Submitted receipt is fabricated or doctored.", | |
| }, | |
| "phantom_vendor_attack": { | |
| "category": "document", | |
| "severity": "critical", | |
| "reason_codes": ["vendor_account_takeover_suspected", "missing_po"], | |
| "fraud_flags": ["vendor_name_spoof"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "The vendor has no prior transaction history in the system.", | |
| "description": "Invoice from a fictitious vendor with no legitimate business relationship.", | |
| }, | |
| "inflated_line_items_attack": { | |
| "category": "document", | |
| "severity": "medium", | |
| "reason_codes": ["price_mismatch", "quantity_mismatch", "total_mismatch"], | |
| "fraud_flags": [], | |
| "discrepancies": ["price_mismatch", "quantity_mismatch"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "Line item quantities or prices may differ from the purchase order.", | |
| "description": "Invoice line items are inflated above agreed PO amounts.", | |
| }, | |
| # --- Process Attacks --- | |
| "urgency_spoof_attack": { | |
| "category": "process", | |
| "severity": "medium", | |
| "reason_codes": ["urgent_payment_pressure", "sender_domain_spoof"], | |
| "fraud_flags": ["sender_domain_spoof"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "A same-day release is being pushed through an urgent email thread.", | |
| "description": "Social engineering via artificial time pressure.", | |
| }, | |
| "approval_threshold_evasion_attack": { | |
| "category": "process", | |
| "severity": "medium", | |
| "reason_codes": ["approval_threshold_evasion"], | |
| "fraud_flags": ["approval_threshold_evasion"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "The amount appears structured to avoid stricter approval thresholds.", | |
| "description": "Invoice amount structured just below approval threshold.", | |
| }, | |
| "workflow_override_attack": { | |
| "category": "process", | |
| "severity": "high", | |
| "reason_codes": ["policy_bypass_attempt", "sender_domain_spoof"], | |
| "fraud_flags": ["policy_bypass_attempt", "sender_domain_spoof"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "The requester insists the analyst ignore portal and callback controls.", | |
| "description": "Attacker attempts to bypass standard verification workflows.", | |
| }, | |
| "split_payment_attack": { | |
| "category": "process", | |
| "severity": "high", | |
| "reason_codes": ["approval_threshold_evasion", "duplicate_near_match"], | |
| "fraud_flags": ["approval_threshold_evasion"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "Multiple smaller invoices from the same vendor appear in a short window.", | |
| "description": "Single large payment split into multiple sub-threshold invoices.", | |
| }, | |
| # --- Advanced Persistent Threats --- | |
| "coordinated_campaign_attack": { | |
| "category": "apt", | |
| "severity": "critical", | |
| "reason_codes": ["shared_bank_account", "coordinated_timing", "duplicate_near_match"], | |
| "fraud_flags": ["shared_bank_account", "coordinated_timing"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "Multiple vendors share the same bank account and have correlated submission times.", | |
| "description": "Multi-vendor coordinated fraud campaign sharing infrastructure.", | |
| }, | |
| "supply_chain_compromise_attack": { | |
| "category": "apt", | |
| "severity": "critical", | |
| "reason_codes": ["vendor_account_takeover_suspected", "bank_override_attempt", | |
| "policy_bypass_attempt"], | |
| "fraud_flags": ["vendor_account_takeover_suspected", "bank_override_attempt"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "A legitimate supply chain partner's systems may have been compromised.", | |
| "description": "Attacker compromises supply chain to inject fraudulent invoices.", | |
| }, | |
| "insider_collusion_attack": { | |
| "category": "apt", | |
| "severity": "critical", | |
| "reason_codes": ["policy_bypass_attempt", "approval_threshold_evasion"], | |
| "fraud_flags": ["policy_bypass_attempt"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "Internal approvals have been fast-tracked without standard documentation.", | |
| "description": "Insider collaborates with external attacker to bypass controls.", | |
| }, | |
| "multi_entity_layering_attack": { | |
| "category": "apt", | |
| "severity": "critical", | |
| "reason_codes": ["shared_bank_account", "coordinated_timing", | |
| "vendor_account_takeover_suspected"], | |
| "fraud_flags": ["shared_bank_account", "coordinated_timing"], | |
| "unsafe_if_pay": True, | |
| "instruction_suffix": "Invoices route through multiple intermediary entities before reaching the payment account.", | |
| "description": "Payments are layered through shell entities to obscure destination.", | |
| }, | |
| } | |
| def list_attack_names() -> list[str]: | |
| """Return sorted list of all available attack names. | |
| Returns: | |
| Sorted list of attack name strings. | |
| """ | |
| return sorted(ATTACK_LIBRARY.keys()) | |
| def list_attacks_by_category() -> dict[str, list[str]]: | |
| """Group attack names by their category. | |
| Returns: | |
| Dictionary mapping category names to lists of attack names. | |
| """ | |
| categories: dict[str, list[str]] = {} | |
| for name, spec in ATTACK_LIBRARY.items(): | |
| cat = spec.get("category", "other") | |
| categories.setdefault(cat, []).append(name) | |
| return {k: sorted(v) for k, v in sorted(categories.items())} | |
| def list_attacks_by_severity(severity: str) -> list[str]: | |
| """Filter attacks by severity level. | |
| Args: | |
| severity: One of 'critical', 'high', 'medium'. | |
| Returns: | |
| List of matching attack names. | |
| """ | |
| norm = normalize_text(severity) | |
| return sorted( | |
| name for name, spec in ATTACK_LIBRARY.items() | |
| if normalize_text(spec.get("severity", "")) == norm | |
| ) | |
| def _append_unique(values: list[str], additions: list[str]) -> list[str]: | |
| """Append items to a list, skipping duplicates (case-insensitive). | |
| Args: | |
| values: Existing list of strings. | |
| additions: New strings to append. | |
| Returns: | |
| Combined list with duplicates removed. | |
| """ | |
| seen = {normalize_text(v) for v in values} | |
| output = list(values) | |
| for value in additions: | |
| norm = normalize_text(value) | |
| if not norm or norm in seen: | |
| continue | |
| output.append(value) | |
| seen.add(norm) | |
| return output | |
| def apply_attack_to_case( | |
| base_case: dict[str, Any], | |
| attack_name: str, | |
| seed: int | None = None, | |
| ) -> dict[str, Any]: | |
| """Apply an attack pattern to a base case to create an adversarial variant. | |
| Modifies the case's gold standard answers, instruction text, and metadata | |
| to reflect the attack's fraud signals and difficulty. | |
| Args: | |
| base_case: The base case dictionary to modify. | |
| attack_name: Name of the attack from ATTACK_LIBRARY. | |
| seed: Optional random seed for reproducible generation. | |
| Returns: | |
| Modified case dictionary with attack applied. | |
| Raises: | |
| ValueError: If attack_name is not in ATTACK_LIBRARY. | |
| """ | |
| if attack_name not in ATTACK_LIBRARY: | |
| raise ValueError(f"Unknown attack_name: {attack_name}") | |
| rng = random.Random(seed) | |
| case = deepcopy(base_case) | |
| spec = deepcopy(ATTACK_LIBRARY[attack_name]) | |
| gold = case.setdefault("gold", {}) | |
| gold["reason_codes"] = _append_unique( | |
| gold.get("reason_codes", []), spec.get("reason_codes", [])) | |
| gold["fraud_flags"] = _append_unique( | |
| gold.get("fraud_flags", []), spec.get("fraud_flags", [])) | |
| gold["discrepancies"] = _append_unique( | |
| gold.get("discrepancies", []), spec.get("discrepancies", [])) | |
| if spec.get("unsafe_if_pay"): | |
| gold["unsafe_if_pay"] = True | |
| if gold.get("unsafe_if_pay") and not gold.get("decision"): | |
| severity = spec.get("severity", "medium") | |
| if severity in {"critical", "high"}: | |
| gold["decision"] = "ESCALATE_FRAUD" | |
| else: | |
| gold["decision"] = "NEEDS_REVIEW" | |
| metadata = case.setdefault("generator_metadata", {}) | |
| metadata.setdefault("applied_attacks", []) | |
| metadata["applied_attacks"] = _append_unique( | |
| metadata["applied_attacks"], [attack_name]) | |
| metadata["attack_severity"] = spec.get("severity", "medium") | |
| metadata["attack_category"] = spec.get("category", "other") | |
| subtle_suffixes = [ | |
| spec.get("instruction_suffix", ""), | |
| "Review the case carefully before releasing payment.", | |
| "Enterprise policy requires evidence-backed resolution.", | |
| ] | |
| chosen_suffix = subtle_suffixes[rng.randint(0, len(subtle_suffixes) - 1)] | |
| if chosen_suffix: | |
| case["instruction"] = ( | |
| f"{case.get('instruction', '').strip()} {chosen_suffix}".strip() | |
| ) | |
| difficulty = normalize_text(case.get("difficulty", "medium")) | |
| severity = spec.get("severity", "medium") | |
| if severity in {"critical", "high"}: | |
| case["difficulty"] = "hard" | |
| elif difficulty not in {"hard"}: | |
| case["difficulty"] = "medium" | |
| return case | |