| """
|
| Mutation Engine
|
|
|
| Main engine for prompt mutation with:
|
| - Strategy selection and execution
|
| - Multi-hop mutation
|
| - Diversity scoring
|
| - Lineage tracking
|
| - Reproducibility controls
|
| """
|
|
|
| import hashlib
|
| import random
|
| from typing import Any, Dict, List, Optional
|
|
|
| from agents.mutation.diversity import DiversityScorer, get_diversity_scorer
|
| from agents.mutation.registry import get_mutation_strategy, list_mutation_strategies
|
| from agents.mutation.schemas import (
|
| MutationLog,
|
| MutationRequest,
|
| MutationResponse,
|
| )
|
| from backend.logging.logger import get_logger
|
|
|
|
|
|
|
| DEFAULT_STRATEGIES = [
|
| "synonym_replacement",
|
| "paraphrase",
|
| "role_swap",
|
| "context_obfuscation",
|
| ]
|
|
|
|
|
| class MutationEngine:
|
| """
|
| Main mutation engine for prompt mutation.
|
|
|
| Handles:
|
| - Strategy selection and execution
|
| - Multi-hop mutation
|
| - Diversity scoring
|
| - Lineage tracking
|
| - Reproducibility via deterministic seeding
|
| """
|
|
|
| def __init__(
|
| self,
|
| embedding_model: str = "all-MiniLM-L6-v2",
|
| min_diversity_threshold: float = 0.1,
|
| min_similarity_threshold: float = 0.5,
|
| max_retries: int = 3
|
| ):
|
| """
|
| Initialize the mutation engine.
|
|
|
| Args:
|
| embedding_model: Model for diversity scoring
|
| min_diversity_threshold: Minimum diversity to accept
|
| min_similarity_threshold: Minimum similarity to preserve intent
|
| max_retries: Maximum retries for low diversity
|
| """
|
| self.logger = get_logger(__name__)
|
| self._diversity_scorer: Optional[DiversityScorer] = None
|
| self._embedding_model_name = embedding_model
|
| self._min_diversity_threshold = min_diversity_threshold
|
| self._min_similarity_threshold = min_similarity_threshold
|
| self._max_retries = max_retries
|
|
|
| @property
|
| def diversity_scorer(self) -> DiversityScorer:
|
| """Lazy load the diversity scorer."""
|
| if self._diversity_scorer is None:
|
| self._diversity_scorer = get_diversity_scorer()
|
| return self._diversity_scorer
|
|
|
| def _compute_seed(
|
| self,
|
| run_id: str,
|
| sample_id: str,
|
| attack_type: str,
|
| depth: int = 0
|
| ) -> int:
|
| """
|
| Compute deterministic seed for reproducibility.
|
|
|
| seed = hash(run_id + sample_id + attack_type + depth)
|
|
|
| Args:
|
| run_id: Run identifier
|
| sample_id: Sample identifier
|
| attack_type: Attack type
|
| depth: Mutation depth
|
|
|
| Returns:
|
| Deterministic seed
|
| """
|
| hash_input = f"{run_id}{sample_id}{attack_type}{depth}"
|
| hash_bytes = hashlib.sha256(hash_input.encode()).digest()
|
| return int.from_bytes(hash_bytes[:4], byteorder="big")
|
|
|
| def _select_strategies(
|
| self,
|
| mutation_depth: int,
|
| seed: int,
|
| attack_type: str
|
| ) -> List[str]:
|
| """
|
| Select strategies for mutation based on depth.
|
|
|
| Args:
|
| mutation_depth: Number of mutations
|
| seed: Random seed
|
| attack_type: Type of attack
|
|
|
| Returns:
|
| List of strategy names
|
| """
|
| random.seed(seed)
|
|
|
| strategies = []
|
| available = DEFAULT_STRATEGIES.copy()
|
|
|
| for i in range(mutation_depth):
|
| if not available:
|
| available = DEFAULT_STRATEGIES.copy()
|
|
|
|
|
| strategy = random.choice(available)
|
| strategies.append(strategy)
|
|
|
|
|
| available.remove(strategy)
|
|
|
| return strategies
|
|
|
| def _validate_mutation(
|
| self,
|
| base_prompt: str,
|
| mutated_prompt: str
|
| ) -> tuple[bool, str, float, float]:
|
| """
|
| Validate that a mutation preserves attack intent.
|
|
|
| Args:
|
| base_prompt: Original prompt
|
| mutated_prompt: Mutated prompt
|
|
|
| Returns:
|
| Tuple of (is_valid, reason, diversity, similarity)
|
| """
|
| diversity, similarity = self.diversity_scorer.compute_step_diversity(
|
| base_prompt,
|
| mutated_prompt
|
| )
|
|
|
| if diversity < self._min_diversity_threshold:
|
| return False, f"Diversity {diversity:.3f} below threshold", diversity, similarity
|
|
|
| if similarity < self._min_similarity_threshold:
|
| return False, f"Similarity {similarity:.3f} below threshold", diversity, similarity
|
|
|
| return True, "Valid", diversity, similarity
|
|
|
| async def mutate(self, request: MutationRequest) -> MutationResponse:
|
| """
|
| Execute prompt mutation based on the request.
|
|
|
| Args:
|
| request: Mutation request with parameters
|
|
|
| Returns:
|
| Mutation response with mutated prompt and metadata
|
| """
|
| self.logger.info(
|
| "Executing mutation",
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| attack_type=request.attack_type,
|
| mutation_depth=request.mutation_depth
|
| )
|
|
|
| try:
|
|
|
| if request.random_seed is not None:
|
| base_seed = request.random_seed
|
| else:
|
| base_seed = self._compute_seed(
|
| str(request.run_id),
|
| request.sample_id,
|
| request.attack_type
|
| )
|
|
|
|
|
| strategies = self._select_strategies(
|
| request.mutation_depth,
|
| base_seed,
|
| request.attack_type
|
| )
|
|
|
|
|
| mutated_prompt = request.base_prompt
|
| mutation_trace: List[str] = []
|
| prompt_history = [request.base_prompt]
|
|
|
| for depth in range(request.mutation_depth):
|
| seed = self._compute_seed(
|
| str(request.run_id),
|
| request.sample_id,
|
| request.attack_type,
|
| depth
|
| )
|
|
|
|
|
| strategy = get_mutation_strategy(strategies[depth])
|
|
|
| if strategy is None:
|
| self.logger.warning(
|
| "Strategy not found, using default",
|
| strategy=strategies[depth]
|
| )
|
| strategy = get_mutation_strategy("synonym_replacement")
|
|
|
|
|
| mutated_prompt = strategy.apply(mutated_prompt, seed)
|
| mutation_trace.append(strategy.name)
|
| prompt_history.append(mutated_prompt)
|
|
|
|
|
| is_valid, reason, diversity, similarity = self._validate_mutation(
|
| request.base_prompt,
|
| mutated_prompt
|
| )
|
|
|
|
|
| cumulative_diversity = self.diversity_scorer.compute_cumulative_diversity(
|
| prompt_history
|
| )
|
|
|
|
|
| mutation_metadata: Dict[str, Any] = {
|
| "strategies_used": strategies,
|
| "cumulative_diversity": cumulative_diversity,
|
| "final_similarity": similarity,
|
| "is_valid": is_valid,
|
| "validation_reason": reason,
|
| "seed_used": base_seed,
|
| "prompt_history": prompt_history,
|
| }
|
|
|
|
|
| if not is_valid:
|
| self.logger.warning(
|
| "Mutation validation failed",
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| reason=reason,
|
| diversity=diversity,
|
| similarity=similarity
|
| )
|
|
|
|
|
| self._log_mutation(
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| attack_type=request.attack_type,
|
| mutation_depth=request.mutation_depth,
|
| strategies_used=mutation_trace,
|
| diversity_score=diversity,
|
| cumulative_diversity=cumulative_diversity,
|
| success=True
|
| )
|
|
|
| return MutationResponse(
|
| mutated_prompt=mutated_prompt,
|
| mutation_trace=mutation_trace,
|
| diversity_score=diversity,
|
| cumulative_diversity=cumulative_diversity,
|
| mutation_depth=request.mutation_depth,
|
| mutation_metadata=mutation_metadata,
|
| run_id=request.run_id,
|
| sample_id=request.sample_id
|
| )
|
|
|
| except Exception as e:
|
| self.logger.error(
|
| "Mutation execution failed",
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| error=str(e)
|
| )
|
|
|
|
|
| self._log_mutation(
|
| run_id=str(request.run_id),
|
| sample_id=request.sample_id,
|
| attack_type=request.attack_type,
|
| mutation_depth=request.mutation_depth,
|
| strategies_used=[],
|
| diversity_score=0.0,
|
| cumulative_diversity=0.0,
|
| success=False,
|
| error=str(e)
|
| )
|
|
|
| raise
|
|
|
| def _log_mutation(
|
| self,
|
| run_id: str,
|
| sample_id: str,
|
| attack_type: str,
|
| mutation_depth: int,
|
| strategies_used: List[str],
|
| diversity_score: float,
|
| cumulative_diversity: float,
|
| success: bool,
|
| error: Optional[str] = None
|
| ) -> None:
|
| """
|
| Log mutation execution details.
|
|
|
| Args:
|
| run_id: Run identifier
|
| sample_id: Sample identifier
|
| attack_type: Attack type
|
| mutation_depth: Depth of mutation
|
| strategies_used: List of strategies applied
|
| diversity_score: Final diversity score
|
| cumulative_diversity: Cumulative diversity
|
| success: Whether mutation succeeded
|
| error: Error message if failed
|
| """
|
| log_data = {
|
| "run_id": run_id,
|
| "sample_id": sample_id,
|
| "attack_type": attack_type,
|
| "mutation_depth": mutation_depth,
|
| "strategies_used": strategies_used,
|
| "diversity_score": diversity_score,
|
| "cumulative_diversity": cumulative_diversity,
|
| "success": success,
|
| "error": error
|
| }
|
|
|
| if success:
|
| if diversity_score < self._min_diversity_threshold:
|
| self.logger.warning("Mutation diversity below threshold", **log_data)
|
| else:
|
| self.logger.info("Mutation executed successfully", **log_data)
|
| else:
|
| self.logger.error("Mutation execution failed", **log_data)
|
|
|
| def get_available_strategies(self) -> List[str]:
|
| """
|
| Get list of available mutation strategies.
|
|
|
| Returns:
|
| List of strategy names
|
| """
|
| return list_mutation_strategies()
|
|
|
| def clear_cache(self) -> None:
|
| """Clear the diversity scorer cache."""
|
| if self._diversity_scorer:
|
| self._diversity_scorer.clear_cache()
|
| self.logger.info("Mutation engine cache cleared")
|
|
|
|
|
|
|
| _mutation_engine: Optional[MutationEngine] = None
|
|
|
|
|
| def get_mutation_engine() -> MutationEngine:
|
| """
|
| Get the global mutation engine instance.
|
|
|
| Returns:
|
| MutationEngine singleton
|
| """
|
| global _mutation_engine
|
| if _mutation_engine is None:
|
| _mutation_engine = MutationEngine()
|
| return _mutation_engine
|
|
|
|
|
| __all__ = [
|
| "MutationEngine",
|
| "get_mutation_engine",
|
| "DEFAULT_STRATEGIES",
|
| ]
|
|
|