| """
|
| Attack Engine
|
|
|
| Multi-turn attack simulation engine with:
|
| - Chaining logic
|
| - Temperature variation
|
| - Diversity scoring
|
| - Reproducibility via seed
|
| - Mutation engine integration
|
| """
|
|
|
| import hashlib
|
| import random
|
| from typing import Any, Dict, List, Optional
|
|
|
| import numpy as np
|
| from sentence_transformers import SentenceTransformer
|
|
|
| from agents.attacker.registry import get_attack_strategy
|
| from agents.attacker.schemas import AttackRequest, AttackResponse, AttackLog
|
| from backend.logging.logger import get_logger
|
|
|
|
|
|
|
| TEMPERATURE_VALUES = [0.2, 0.5, 0.8, 1.0]
|
|
|
|
|
| class AttackEngine:
|
| """
|
| Multi-turn attack simulation engine.
|
|
|
| Handles:
|
| - Attack generation with various strategies
|
| - Multi-turn attack chaining
|
| - Temperature variation
|
| - Diversity scoring using embeddings
|
| - Reproducibility via deterministic seeding
|
| """
|
|
|
| def __init__(self, embedding_model: str = "all-MiniLM-L6-v2"):
|
| """
|
| Initialize the attack engine.
|
|
|
| Args:
|
| embedding_model: Model to use for diversity scoring
|
| """
|
| self.logger = get_logger(__name__)
|
| self._embedding_model: Optional[SentenceTransformer] = None
|
| self._embedding_model_name = embedding_model
|
| self._prompt_cache: Dict[str, str] = {}
|
|
|
| @property
|
| def embedding_model(self) -> SentenceTransformer:
|
| """Lazy load the embedding model."""
|
| if self._embedding_model is None:
|
| self.logger.info(
|
| "Loading embedding model for diversity scoring",
|
| model=self._embedding_model_name
|
| )
|
| self._embedding_model = SentenceTransformer(self._embedding_model_name)
|
| return self._embedding_model
|
|
|
| def _compute_seed(self, run_id: str, sample_id: str) -> int:
|
| """
|
| Compute deterministic seed from run_id and sample_id.
|
|
|
| This ensures reproducibility across runs.
|
|
|
| Args:
|
| run_id: Unique run identifier
|
| sample_id: Sample identifier
|
|
|
| Returns:
|
| Deterministic seed integer
|
| """
|
| hash_input = f"{run_id}{sample_id}"
|
| hash_bytes = hashlib.sha256(hash_input.encode()).digest()
|
| return int.from_bytes(hash_bytes[:4], byteorder="big")
|
|
|
| def _compute_diversity_score(
|
| self,
|
| base_prompt: str,
|
| mutated_prompt: str
|
| ) -> float:
|
| """
|
| Compute diversity score between base and mutated prompts.
|
|
|
| D = 1 - sim(e_base, e_mutated)
|
|
|
| Args:
|
| base_prompt: Original prompt
|
| mutated_prompt: Adversarial prompt
|
|
|
| Returns:
|
| Diversity score between 0 and 1
|
| """
|
| try:
|
| embeddings = self.embedding_model.encode(
|
| [base_prompt, mutated_prompt],
|
| convert_to_numpy=True
|
| )
|
|
|
|
|
| base_embedding = embeddings[0]
|
| mutated_embedding = embeddings[1]
|
|
|
|
|
| base_norm = base_embedding / np.linalg.norm(base_embedding)
|
| mutated_norm = mutated_embedding / np.linalg.norm(mutated_embedding)
|
|
|
|
|
| similarity = np.dot(base_norm, mutated_norm)
|
|
|
|
|
| diversity = 1.0 - float(similarity)
|
|
|
| return max(0.0, min(1.0, diversity))
|
|
|
| except Exception as e:
|
| self.logger.warning(
|
| "Failed to compute diversity score",
|
| error=str(e)
|
| )
|
| return 0.0
|
|
|
| def _select_temperature(
|
| self,
|
| attack_type: str,
|
| base_temperature: float,
|
| seed: int
|
| ) -> float:
|
| """
|
| Select temperature based on attack type and configuration.
|
|
|
| Args:
|
| attack_type: Type of attack
|
| base_temperature: Base temperature from request
|
| seed: Random seed for deterministic selection
|
|
|
| Returns:
|
| Selected temperature
|
| """
|
| random.seed(seed)
|
|
|
|
|
| if base_temperature not in TEMPERATURE_VALUES:
|
| return base_temperature
|
|
|
|
|
| if attack_type in ["jailbreak", "chaining"]:
|
|
|
| return random.choice(TEMPERATURE_VALUES)
|
|
|
| return base_temperature
|
|
|
| async def execute(self, request: AttackRequest) -> AttackResponse:
|
| """
|
| Execute an attack based on the request.
|
|
|
| Args:
|
| request: Attack request with parameters
|
|
|
| Returns:
|
| Attack response with mutated prompt and metadata
|
| """
|
| self.logger.info(
|
| "Executing attack",
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| attack_type=request.attack_type,
|
| chain_depth=request.chain_depth,
|
| temperature=request.temperature
|
| )
|
|
|
| try:
|
|
|
| seed = self._compute_seed(
|
| str(request.run_id),
|
| request.sample_id
|
| )
|
| random.seed(seed)
|
|
|
|
|
| temperature = self._select_temperature(
|
| request.attack_type,
|
| request.temperature,
|
| seed
|
| )
|
|
|
|
|
| strategy = get_attack_strategy(request.attack_type)
|
|
|
|
|
| base_attack_prompt = request.base_prompt
|
| chain_history: List[Dict[str, Any]] = []
|
|
|
| for chain_step in range(request.chain_depth):
|
|
|
| base_attack_prompt = strategy.apply(
|
| base_attack_prompt,
|
| chain_step=chain_step,
|
| total_steps=request.chain_depth,
|
| protected_attributes=request.protected_attributes
|
| )
|
|
|
| chain_history.append({
|
| "step": chain_step,
|
| "strategy": strategy.name,
|
| "prompt_length": len(base_attack_prompt)
|
| })
|
|
|
|
|
|
|
| from agents.mutation.engine import get_mutation_engine
|
| from agents.mutation.schemas import MutationRequest
|
|
|
| mutation_engine = get_mutation_engine()
|
|
|
| mutation_request = MutationRequest(
|
| run_id=request.run_id,
|
| sample_id=request.sample_id,
|
| base_prompt=base_attack_prompt,
|
| attack_type=request.attack_type,
|
| mutation_depth=request.chain_depth,
|
| random_seed=seed
|
| )
|
|
|
|
|
| mutation_response = await mutation_engine.mutate(mutation_request)
|
|
|
|
|
| mutated_prompt = mutation_response.mutated_prompt
|
| diversity_score = mutation_response.diversity_score
|
|
|
|
|
| attack_metadata: Dict[str, Any] = {
|
| "chain_history": chain_history,
|
| "injection_detected_markers": [],
|
| "override_attempt": False,
|
| "seed_used": seed,
|
| "mutation_trace": mutation_response.mutation_trace,
|
| "mutation_depth": mutation_response.mutation_depth,
|
| "cumulative_diversity": mutation_response.cumulative_diversity,
|
| "mutation_applied": True,
|
| }
|
|
|
|
|
| if request.attack_type == "injection":
|
| attack_metadata["injection_detected_markers"] = [
|
| "ignore previous instructions",
|
| "new system instructions"
|
| ]
|
| attack_metadata["override_attempt"] = True
|
|
|
|
|
| self._log_attack(
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| attack_type=request.attack_type,
|
| chain_depth=request.chain_depth,
|
| temperature=temperature,
|
| diversity_score=diversity_score,
|
| success=True
|
| )
|
|
|
| return AttackResponse(
|
| mutated_prompt=mutated_prompt,
|
| attack_type=request.attack_type,
|
| temperature=temperature,
|
| chain_depth=request.chain_depth,
|
| attack_metadata=attack_metadata,
|
| diversity_score=diversity_score,
|
| run_id=request.run_id,
|
| sample_id=request.sample_id
|
| )
|
|
|
| except Exception as e:
|
| self.logger.error(
|
| "Attack execution failed",
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| error=str(e)
|
| )
|
|
|
|
|
| self._log_attack(
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| attack_type=request.attack_type,
|
| chain_depth=request.chain_depth,
|
| temperature=request.temperature,
|
| diversity_score=None,
|
| success=False,
|
| error=str(e)
|
| )
|
|
|
| raise
|
|
|
| def _log_attack(
|
| self,
|
| run_id: str,
|
| sample_id: str,
|
| attack_type: str,
|
| chain_depth: int,
|
| temperature: float,
|
| diversity_score: Optional[float],
|
| success: bool,
|
| error: Optional[str] = None
|
| ) -> None:
|
| """
|
| Log attack execution details.
|
|
|
| Args:
|
| run_id: Run identifier
|
| sample_id: Sample identifier
|
| attack_type: Type of attack
|
| chain_depth: Chain depth applied
|
| temperature: Temperature used
|
| diversity_score: Diversity score computed
|
| success: Whether attack succeeded
|
| error: Error message if failed
|
| """
|
| log_data = {
|
| "run_id": run_id,
|
| "sample_id": sample_id,
|
| "attack_type": attack_type,
|
| "chain_depth": chain_depth,
|
| "temperature": temperature,
|
| "diversity_score": diversity_score,
|
| "success": success,
|
| "error": error
|
| }
|
|
|
| if success:
|
| self.logger.info(
|
| "Attack executed successfully",
|
| **log_data
|
| )
|
| else:
|
| self.logger.error(
|
| "Attack execution failed",
|
| **log_data
|
| )
|
|
|
| def clear_cache(self) -> None:
|
| """Clear the prompt cache."""
|
| self._prompt_cache.clear()
|
| self.logger.info("Prompt cache cleared")
|
|
|
| def get_available_strategies(self) -> List[str]:
|
| """
|
| Get list of available attack strategies.
|
|
|
| Returns:
|
| List of strategy names
|
| """
|
| from agents.attacker.registry import list_attack_strategies
|
| return list_attack_strategies()
|
|
|
|
|
|
|
| _attack_engine: Optional[AttackEngine] = None
|
|
|
|
|
| def get_attack_engine() -> AttackEngine:
|
| """
|
| Get the global attack engine instance.
|
|
|
| Returns:
|
| AttackEngine singleton
|
| """
|
| global _attack_engine
|
| if _attack_engine is None:
|
| _attack_engine = AttackEngine()
|
| return _attack_engine
|
|
|
|
|
| __all__ = [
|
| "AttackEngine",
|
| "get_attack_engine",
|
| "TEMPERATURE_VALUES",
|
| ]
|
|
|