| """
|
| Conflict Detection and Classification Engine
|
|
|
| Identifies conflicting claims across agent pairs using token-level confidence scores.
|
| Classifies conflicts by type (contradiction, emphasis, framework) and scores strength
|
| weighted by agent confidence.
|
|
|
| Author: Claude Code
|
| """
|
|
|
| import re
|
| import logging
|
| import math
|
| from dataclasses import dataclass, asdict
|
| from typing import Dict, List, Tuple, Optional
|
| from collections import defaultdict
|
|
|
| logging.basicConfig(level=logging.INFO)
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| def adjust_conflict_strength_with_memory(conflict, memory_weighting=None):
|
| """
|
| Enhance conflict strength using historical adapter performance.
|
|
|
| Makes conflict importance adaptive: conflicts involving high-performing
|
| adapters are weighted heavier, enabling experience-aware reasoning.
|
|
|
| Args:
|
| conflict: Conflict object with agent_a, agent_b, conflict_strength
|
| memory_weighting: MemoryWeighting instance (or None for no adjustment)
|
|
|
| Returns:
|
| Adjusted conflict strength (same type as input)
|
| """
|
| if not memory_weighting:
|
| return conflict.conflict_strength
|
|
|
| try:
|
|
|
| weight_a = memory_weighting.adapter_weights.get(conflict.agent_a.lower(), None)
|
| weight_b = memory_weighting.adapter_weights.get(conflict.agent_b.lower(), None)
|
|
|
| if not weight_a or not weight_b:
|
| return conflict.conflict_strength
|
|
|
|
|
| avg_weight = (weight_a.weight + weight_b.weight) / 2.0
|
|
|
|
|
|
|
|
|
|
|
| modifier = 0.5 + (avg_weight / 2.0) * 0.5
|
| modifier = max(0.5, min(1.5, modifier))
|
|
|
| adjusted = conflict.conflict_strength * modifier
|
|
|
| return adjusted
|
|
|
| except Exception as e:
|
| logger.debug(f"Error adjusting conflict strength: {e}")
|
| return conflict.conflict_strength
|
|
|
|
|
|
|
| @dataclass
|
| class Conflict:
|
| """A detected conflict between two agents on a specific claim."""
|
|
|
| agent_a: str
|
| agent_b: str
|
| claim_a: str
|
| claim_b: str
|
| conflict_type: str
|
| conflict_strength: float
|
| confidence_a: float
|
| confidence_b: float
|
| semantic_overlap: float
|
| opposition_score: float
|
|
|
| def to_dict(self) -> Dict:
|
| """Serialize for storage."""
|
| return asdict(self)
|
|
|
|
|
| class ConflictEngine:
|
| """Detects and scores conflicts between agent responses."""
|
|
|
| def __init__(
|
| self,
|
| token_confidence_engine: Optional[object] = None,
|
| contradiction_threshold: float = 0.7,
|
| overlap_threshold: float = 0.3,
|
| semantic_tension_engine: Optional[object] = None,
|
| max_conflicts_per_pair: int = 2,
|
| max_total_conflicts: int = 12,
|
| ):
|
| """
|
| Initialize conflict detection engine.
|
|
|
| Args:
|
| token_confidence_engine: TokenConfidenceEngine for scoring claims
|
| contradiction_threshold: Semantic overlap needed to consider claims related
|
| overlap_threshold: Threshold for identifying same-claim conflicts
|
| semantic_tension_engine: (Phase 6) SemanticTensionEngine for embedding-based tension
|
| max_conflicts_per_pair: Max conflicts to generate per agent pair (default: 2)
|
| max_total_conflicts: Max total conflicts allowed (default: 12)
|
| """
|
| self.token_confidence = token_confidence_engine
|
| self.contradiction_threshold = contradiction_threshold
|
| self.overlap_threshold = overlap_threshold
|
| self.semantic_tension_engine = semantic_tension_engine
|
| self.max_conflicts_per_pair = max_conflicts_per_pair
|
| self.max_total_conflicts = max_total_conflicts
|
|
|
|
|
| self.negation_patterns = [
|
| (r"\b(no|not|none|neither|never|cannot|doesn['\"]t)\b", "negation"),
|
| (r"\b(must|should|always|only)\b", "necessity"),
|
| (r"\b(reject|disagree|oppose|deny|false|wrong)\b", "rejection"),
|
| ]
|
|
|
| def detect_conflicts(
|
| self, agent_analyses: Dict[str, str]
|
| ) -> List[Conflict]:
|
| """
|
| Detect conflicts across agent pairs.
|
|
|
| Args:
|
| agent_analyses: Dict {agent_name: response_text}
|
|
|
| Returns:
|
| List of Conflicts sorted by strength (descending)
|
| """
|
| conflicts = []
|
|
|
|
|
| agent_scores = {}
|
| agent_names = list(agent_analyses.keys())
|
|
|
| for agent_name in agent_names:
|
| response = agent_analyses[agent_name]
|
| if self.token_confidence:
|
| peer_responses = {
|
| a: agent_analyses[a]
|
| for a in agent_names
|
| if a != agent_name
|
| }
|
| scores = self.token_confidence.score_tokens(
|
| response, agent_name, peer_responses=peer_responses
|
| )
|
| agent_scores[agent_name] = scores
|
| else:
|
| logger.warning(
|
| "No token_confidence engine provided; using fallback scoring"
|
| )
|
|
|
|
|
| for i, agent_a in enumerate(agent_names):
|
| for agent_b in agent_names[i + 1 :]:
|
| claims_a = (
|
| agent_scores[agent_a].claims
|
| if agent_a in agent_scores
|
| else self._extract_simple_claims(agent_analyses[agent_a])
|
| )
|
| claims_b = (
|
| agent_scores[agent_b].claims
|
| if agent_b in agent_scores
|
| else self._extract_simple_claims(agent_analyses[agent_b])
|
| )
|
|
|
|
|
| pair_conflicts = []
|
|
|
|
|
| for claim_a in claims_a:
|
| for claim_b in claims_b:
|
|
|
| if len(pair_conflicts) >= self.max_conflicts_per_pair:
|
| break
|
|
|
|
|
| overlap = self._compute_semantic_overlap(claim_a.text, claim_b.text)
|
|
|
|
|
| if overlap > self.overlap_threshold:
|
| conflict_type, opposition_score = self._classify_conflict(
|
| claim_a.text, claim_b.text, overlap
|
| )
|
|
|
| if opposition_score > 0:
|
|
|
| conflict_strength = (
|
| claim_a.confidence
|
| * claim_b.confidence
|
| * opposition_score
|
| )
|
|
|
| conflict = Conflict(
|
| agent_a=agent_a,
|
| agent_b=agent_b,
|
| claim_a=claim_a.text,
|
| claim_b=claim_b.text,
|
| conflict_type=conflict_type,
|
| conflict_strength=conflict_strength,
|
| confidence_a=claim_a.confidence,
|
| confidence_b=claim_b.confidence,
|
| semantic_overlap=overlap,
|
| opposition_score=opposition_score,
|
| )
|
| pair_conflicts.append(conflict)
|
|
|
|
|
| if len(pair_conflicts) >= self.max_conflicts_per_pair:
|
| break
|
|
|
|
|
| conflicts.extend(pair_conflicts)
|
|
|
|
|
| conflicts.sort(key=lambda c: c.conflict_strength, reverse=True)
|
|
|
|
|
|
|
| for conflict in conflicts:
|
| memory_weighting = getattr(self, "memory_weighting", None)
|
| conflict.conflict_strength = adjust_conflict_strength_with_memory(
|
| conflict, memory_weighting
|
| )
|
|
|
|
|
| conflicts.sort(key=lambda c: c.conflict_strength, reverse=True)
|
|
|
|
|
|
|
| if len(conflicts) > self.max_total_conflicts:
|
| logger.info(
|
| f"Capping conflicts: {len(conflicts)} → {self.max_total_conflicts} "
|
| f"(per-pair cap: {self.max_conflicts_per_pair}, total budget: {self.max_total_conflicts})"
|
| )
|
| conflicts = conflicts[: self.max_total_conflicts]
|
|
|
| return conflicts
|
|
|
| def _extract_simple_claims(self, response: str) -> List[object]:
|
| """
|
| Fallback: extract simple sentence-based claims without token scoring.
|
|
|
| Returns:
|
| List of simple claim objects with text and neutral confidence
|
| """
|
| claim_pattern = re.compile(r"[.!?]+")
|
| sentences = claim_pattern.split(response)
|
|
|
| claims = []
|
| for sentence in sentences:
|
| if not sentence.strip():
|
| continue
|
|
|
|
|
| class SimpleClaim:
|
| def __init__(self, text):
|
| self.text = text
|
| self.confidence = 0.5
|
| self.agent_name = ""
|
|
|
| claims.append(SimpleClaim(sentence.strip()))
|
|
|
| return claims
|
|
|
| def _compute_semantic_overlap(self, claim_a: str, claim_b: str) -> float:
|
| """
|
| Compute semantic overlap between two claims via cosine similarity.
|
|
|
| Simple implementation: word overlap ratio.
|
|
|
| Returns:
|
| Similarity [0, 1]
|
| """
|
| words_a = set(claim_a.lower().split())
|
| words_b = set(claim_b.lower().split())
|
|
|
|
|
| stop_words = {
|
| "the",
|
| "a",
|
| "an",
|
| "is",
|
| "are",
|
| "and",
|
| "or",
|
| "of",
|
| "to",
|
| "in",
|
| "that",
|
| "it",
|
| "for",
|
| "with",
|
| }
|
| words_a = words_a - stop_words
|
| words_b = words_b - stop_words
|
|
|
| if not words_a or not words_b:
|
| return 0.0
|
|
|
|
|
| intersection = len(words_a & words_b)
|
| union = len(words_a | words_b)
|
|
|
| if union == 0:
|
| return 0.0
|
|
|
| return intersection / union
|
|
|
| def _classify_conflict(
|
| self, claim_a: str, claim_b: str, overlap: float
|
| ) -> Tuple[str, float]:
|
| """
|
| Classify the type of conflict and compute opposition score.
|
|
|
| Phase 6 Enhancement: Blends heuristic opposition_score (discrete 0.4/0.7/1.0)
|
| with embedding-based semantic tension (continuous [0, 1]) for nuanced conflicts.
|
|
|
| Returns:
|
| (conflict_type, opposition_score) where:
|
| - conflict_type: "contradiction" | "emphasis" | "framework" | "paraphrase"
|
| - opposition_score: [0, 1] how directly opposed are the claims
|
| (0 = paraphrase/same, 1 = maximum opposition)
|
| """
|
| claim_a_lower = claim_a.lower()
|
| claim_b_lower = claim_b.lower()
|
|
|
|
|
|
|
| negation_in_a = any(
|
| re.search(pattern, claim_a_lower) for pattern, _ in self.negation_patterns
|
| )
|
| negation_in_b = any(
|
| re.search(pattern, claim_b_lower) for pattern, _ in self.negation_patterns
|
| )
|
|
|
|
|
| heuristic_opposition = 1.0
|
| heuristic_type = "contradiction"
|
| if negation_in_a != negation_in_b:
|
| logger.debug(f"Direct contradiction detected:\n A: {claim_a}\n B: {claim_b}")
|
| heuristic_opposition = 1.0
|
| heuristic_type = "contradiction"
|
| else:
|
|
|
| key_noun_pattern = re.compile(r"\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b")
|
| nouns_a = set(m.group() for m in key_noun_pattern.finditer(claim_a))
|
| nouns_b = set(m.group() for m in key_noun_pattern.finditer(claim_b))
|
|
|
|
|
| if nouns_a and nouns_b and nouns_a != nouns_b:
|
| heuristic_opposition = 0.4
|
| heuristic_type = "framework"
|
| else:
|
|
|
|
|
| emphasis_words = ["important", "prioritize", "focus", "emphasize", "weight", "prefer", "favor"]
|
| emphasis_pattern = "|".join(emphasis_words)
|
|
|
| has_emphasis_a = bool(re.search(emphasis_pattern, claim_a_lower))
|
| has_emphasis_b = bool(re.search(emphasis_pattern, claim_b_lower))
|
|
|
| if has_emphasis_a and has_emphasis_b:
|
|
|
| logger.debug(f"Emphasis conflict detected:\n A: {claim_a}\n B: {claim_b}")
|
| heuristic_opposition = 0.7
|
| heuristic_type = "emphasis"
|
| else:
|
|
|
| heuristic_opposition = 0.4
|
| heuristic_type = "framework"
|
|
|
|
|
| semantic_opposition = 0.4
|
| semantic_type = "framework"
|
|
|
| if self.semantic_tension_engine:
|
| try:
|
| semantic_opposition = self.semantic_tension_engine.compute_semantic_tension(claim_a, claim_b)
|
| semantic_type = self.semantic_tension_engine.compute_polarity(claim_a, claim_b)
|
| logger.debug(f"Semantic tension: {semantic_opposition:.3f} ({semantic_type})")
|
| except Exception as e:
|
| logger.debug(f"Semantic tension computation failed: {e}, using heuristic only")
|
|
|
|
|
|
|
|
|
| if self.semantic_tension_engine:
|
| final_opposition = 0.6 * semantic_opposition + 0.4 * heuristic_opposition
|
| final_type = semantic_type
|
| else:
|
| final_opposition = heuristic_opposition
|
| final_type = heuristic_type
|
|
|
| return (final_type, float(final_opposition))
|
|
|
| def resolve_conflict_round(
|
| self,
|
| conflict: Conflict,
|
| agent_a_response_round2: str,
|
| agent_b_response_round2: str,
|
| ) -> Dict:
|
| """
|
| Score whether agents moved towards resolving a conflict in next round.
|
|
|
| Args:
|
| conflict: The original conflict
|
| agent_a_response_round2: Agent A's response in round 2
|
| agent_b_response_round2: Agent B's response in round 2
|
|
|
| Returns:
|
| Dict with resolution metrics
|
| """
|
|
|
| addressed_by_a = self._is_claim_addressed(conflict.claim_b, agent_a_response_round2)
|
| addressed_by_b = self._is_claim_addressed(conflict.claim_a, agent_b_response_round2)
|
|
|
|
|
| softened_a = self._is_claim_softened(conflict.claim_a, agent_a_response_round2)
|
| softened_b = self._is_claim_softened(conflict.claim_b, agent_b_response_round2)
|
|
|
| resolution_score = 0.0
|
| if addressed_by_a and addressed_by_b:
|
| resolution_score += 0.4
|
| if softened_a and softened_b:
|
| resolution_score += 0.3
|
| if addressed_by_a or addressed_by_b:
|
| resolution_score += 0.1
|
|
|
| resolution_score = min(1.0, resolution_score)
|
|
|
| return {
|
| "engaged_with_conflict": addressed_by_a or addressed_by_b,
|
| "both_addressed": addressed_by_a and addressed_by_b,
|
| "softened_positions": softened_a or softened_b,
|
| "resolution_score": resolution_score,
|
| }
|
|
|
| def _is_claim_addressed(self, claim: str, response: str) -> bool:
|
| """
|
| Check if a claim is explicitly addressed in response.
|
|
|
| Detects pronoun references, direct quotes, or semantic restatement.
|
| """
|
| response_lower = response.lower()
|
| claim_lower = claim.lower()
|
|
|
|
|
| if claim_lower in response_lower:
|
| return True
|
|
|
|
|
| key_words = [
|
| w
|
| for w in claim.split()
|
| if len(w) > 4 and w.lower() not in ["this", "that", "these", "other"]
|
| ]
|
|
|
| matching_words = sum(1 for w in key_words if w.lower() in response_lower)
|
| return matching_words >= 2
|
|
|
| def _is_claim_softened(self, original_claim: str, followup_response: str) -> bool:
|
| """
|
| Check if an agent has softened their original claim in follow-up.
|
|
|
| Detects addition of qualifiers, exceptions, or concessions.
|
| """
|
| softening_words = [
|
| "however",
|
| "though",
|
| "but",
|
| "perhaps",
|
| "maybe",
|
| "could",
|
| "might",
|
| "arguably",
|
| "in some cases",
|
| "exception",
|
| "qualify",
|
| "depends",
|
| ]
|
|
|
| response_lower = followup_response.lower()
|
|
|
|
|
| has_softening = any(word in response_lower for word in softening_words)
|
|
|
|
|
| has_concession = bool(re.search(r"\b(granted|acknowledge|admit|agree)\b", response_lower))
|
|
|
| return has_softening or has_concession
|
|
|
| def group_conflicts_by_pair(self, conflicts: List[Conflict]) -> Dict[str, List[Conflict]]:
|
| """
|
| Group conflicts by agent pair.
|
|
|
| Returns:
|
| Dict {agent_pair_key: List[Conflict]}
|
| """
|
| grouped = defaultdict(list)
|
| for conflict in conflicts:
|
| pair_key = f"{conflict.agent_a}_vs_{conflict.agent_b}"
|
| grouped[pair_key].append(conflict)
|
| return dict(grouped)
|
|
|
| def summarize_conflicts(self, conflicts: List[Conflict]) -> Dict:
|
| """
|
| Generate summary statistics for conflicts.
|
|
|
| Returns:
|
| Dict with count, average strength, distribution by type
|
| """
|
| if not conflicts:
|
| return {
|
| "total_conflicts": 0,
|
| "avg_conflict_strength": 0.0,
|
| "by_type": {},
|
| "top_conflicts": [],
|
| }
|
|
|
| by_type = defaultdict(list)
|
| for c in conflicts:
|
| by_type[c.conflict_type].append(c)
|
|
|
| return {
|
| "total_conflicts": len(conflicts),
|
| "avg_conflict_strength": sum(c.conflict_strength for c in conflicts) / len(conflicts),
|
| "by_type": {
|
| ctype: len(clist) for ctype, clist in by_type.items()
|
| },
|
| "type_avg_strength": {
|
| ctype: sum(c.conflict_strength for c in clist) / len(clist)
|
| for ctype, clist in by_type.items()
|
| },
|
| "top_conflicts": [
|
| {
|
| "agent_a": c.agent_a,
|
| "agent_b": c.agent_b,
|
| "type": c.conflict_type,
|
| "strength": c.conflict_strength,
|
| "claim_a_excerpt": c.claim_a[:100],
|
| "claim_b_excerpt": c.claim_b[:100],
|
| }
|
| for c in conflicts[:5]
|
| ],
|
| }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| @dataclass
|
| class ConflictEvolution:
|
| """Track how a conflict changes across multiple debate rounds."""
|
|
|
| original_conflict: Conflict
|
| round_trajectories: Dict[int, Dict]
|
| resolution_rate: float = 0.0
|
| resolution_type: str = "new"
|
| resolved_in_round: int = -1
|
|
|
| def _compute_resolution_rate(self) -> float:
|
| """Calculate (initial - final) / initial."""
|
| if not self.round_trajectories or 0 not in self.round_trajectories:
|
| return 0.0
|
|
|
| initial_strength = self.round_trajectories[0].get("strength", 0)
|
| if initial_strength == 0:
|
| return 0.0
|
|
|
| final_strength = min(
|
| (s.get("strength", float('inf')) for s in self.round_trajectories.values()),
|
| default=initial_strength
|
| )
|
|
|
| return (initial_strength - final_strength) / initial_strength
|
|
|
|
|
| class ConflictTracker:
|
| """Track conflicts across multiple debate rounds (Phase 3)."""
|
|
|
| def __init__(self, conflict_engine):
|
| """Initialize tracker with reference to ConflictEngine."""
|
| self.conflict_engine = conflict_engine
|
| self.evolution_data: Dict[str, ConflictEvolution] = {}
|
|
|
| def track_round(self, round_num: int, agent_analyses: Dict[str, str],
|
| previous_round_conflicts: List[Conflict]) -> List[ConflictEvolution]:
|
| """Track conflicts across rounds."""
|
| current_round_conflicts = self.conflict_engine.detect_conflicts(agent_analyses)
|
|
|
| evolutions = []
|
|
|
|
|
| for prev_conflict in previous_round_conflicts:
|
| matches = self._find_matching_conflicts(prev_conflict, current_round_conflicts)
|
|
|
| if matches:
|
| current_conflict = matches[0]
|
| evolution = self._compute_evolution(
|
| prev_conflict, current_conflict, round_num, agent_analyses
|
| )
|
| else:
|
| evolution = self._mark_resolved(prev_conflict, round_num)
|
|
|
| evolutions.append(evolution)
|
|
|
|
|
| new_conflicts = self._find_new_conflicts(previous_round_conflicts, current_round_conflicts)
|
| for new_conflict in new_conflicts:
|
| evolution = ConflictEvolution(
|
| original_conflict=new_conflict,
|
| round_trajectories={round_num: {
|
| "strength": new_conflict.conflict_strength,
|
| "addressing_score": 0.0,
|
| "softening_score": 0.0,
|
| }},
|
| resolution_rate=0.0,
|
| resolution_type="new",
|
| resolved_in_round=-1,
|
| )
|
| evolutions.append(evolution)
|
|
|
| return evolutions
|
|
|
| def _find_matching_conflicts(self, conflict: Conflict,
|
| candidates: List[Conflict]) -> List[Conflict]:
|
| """Find conflicts that likely match across rounds."""
|
| matches = []
|
| for candidate in candidates:
|
|
|
| same_pair = (
|
| (conflict.agent_a == candidate.agent_a and conflict.agent_b == candidate.agent_b) or
|
| (conflict.agent_a == candidate.agent_b and conflict.agent_b == candidate.agent_a)
|
| )
|
|
|
| if same_pair:
|
|
|
| overlap = self.conflict_engine._compute_semantic_overlap(
|
| conflict.claim_a, candidate.claim_a
|
| )
|
| if overlap > 0.5:
|
| matches.append(candidate)
|
|
|
| return matches
|
|
|
| def _compute_evolution(self, prev_conflict: Conflict, current_conflict: Conflict,
|
| round_num: int, agent_analyses: Dict[str, str]) -> ConflictEvolution:
|
| """Compute how conflict evolved between rounds."""
|
|
|
| addressing_a = self.conflict_engine._is_claim_addressed(
|
| prev_conflict.claim_b, agent_analyses.get(current_conflict.agent_a, "")
|
| )
|
| addressing_b = self.conflict_engine._is_claim_addressed(
|
| prev_conflict.claim_a, agent_analyses.get(current_conflict.agent_b, "")
|
| )
|
| addressing_score = (float(addressing_a) + float(addressing_b)) / 2.0
|
|
|
|
|
| softening_a = self.conflict_engine._is_claim_softened(
|
| prev_conflict.claim_a, agent_analyses.get(current_conflict.agent_a, "")
|
| )
|
| softening_b = self.conflict_engine._is_claim_softened(
|
| prev_conflict.claim_b, agent_analyses.get(current_conflict.agent_b, "")
|
| )
|
| softening_score = (float(softening_a) + float(softening_b)) / 2.0
|
|
|
|
|
| strength_delta = prev_conflict.conflict_strength - current_conflict.conflict_strength
|
| if strength_delta > prev_conflict.conflict_strength * 0.5:
|
| resolution_type = "hard_victory"
|
| elif strength_delta > 0.05:
|
| resolution_type = "soft_consensus"
|
| elif abs(strength_delta) < 0.05:
|
| resolution_type = "stalled"
|
| else:
|
| resolution_type = "worsened"
|
|
|
|
|
| key = f"{prev_conflict.agent_a}_vs_{prev_conflict.agent_b}"
|
| if key not in self.evolution_data:
|
| self.evolution_data[key] = ConflictEvolution(
|
| original_conflict=prev_conflict,
|
| round_trajectories={0: {
|
| "strength": prev_conflict.conflict_strength,
|
| "addressing_score": 0.0,
|
| "softening_score": 0.0,
|
| }},
|
| resolution_rate=0.0,
|
| resolution_type="new",
|
| resolved_in_round=-1,
|
| )
|
|
|
| self.evolution_data[key].round_trajectories[round_num] = {
|
| "strength": current_conflict.conflict_strength,
|
| "addressing_score": addressing_score,
|
| "softening_score": softening_score,
|
| }
|
|
|
| self.evolution_data[key].resolution_rate = self.evolution_data[key]._compute_resolution_rate()
|
| self.evolution_data[key].resolution_type = resolution_type
|
|
|
| return self.evolution_data[key]
|
|
|
| def _mark_resolved(self, conflict: Conflict, round_num: int) -> ConflictEvolution:
|
| """Mark conflict as resolved (no longer detected)."""
|
| key = f"{conflict.agent_a}_vs_{conflict.agent_b}"
|
| if key not in self.evolution_data:
|
| self.evolution_data[key] = ConflictEvolution(
|
| original_conflict=conflict,
|
| round_trajectories={0: {
|
| "strength": conflict.conflict_strength,
|
| "addressing_score": 0.0,
|
| "softening_score": 0.0,
|
| }},
|
| resolution_rate=1.0,
|
| resolution_type="resolved",
|
| resolved_in_round=round_num,
|
| )
|
| self.evolution_data[key].round_trajectories[round_num] = {
|
| "strength": 0.0,
|
| "addressing_score": 1.0,
|
| "softening_score": 1.0,
|
| }
|
|
|
| return self.evolution_data[key]
|
|
|
| def _find_new_conflicts(self, previous: List[Conflict],
|
| current: List[Conflict]) -> List[Conflict]:
|
| """Find conflicts that are new."""
|
| prev_pairs = {(c.agent_a, c.agent_b) for c in previous}
|
| new = []
|
| for conflict in current:
|
| pair = (conflict.agent_a, conflict.agent_b)
|
| if pair not in prev_pairs:
|
| new.append(conflict)
|
| return new
|
|
|
| def get_summary(self) -> Dict:
|
| """Get summary of all conflict evolutions."""
|
| if not self.evolution_data:
|
| return {"total_tracked": 0, "message": "No conflicts tracked yet"}
|
|
|
| resolved = [e for e in self.evolution_data.values() if e.resolution_type == "resolved"]
|
| hard_victory = [e for e in self.evolution_data.values() if e.resolution_type == "hard_victory"]
|
| soft_consensus = [e for e in self.evolution_data.values() if e.resolution_type == "soft_consensus"]
|
| stalled = [e for e in self.evolution_data.values() if e.resolution_type == "stalled"]
|
| worsened = [e for e in self.evolution_data.values() if e.resolution_type == "worsened"]
|
|
|
| avg_resolution = sum(e.resolution_rate for e in self.evolution_data.values()) / len(self.evolution_data)
|
|
|
| return {
|
| "total_tracked": len(self.evolution_data),
|
| "resolved": len(resolved),
|
| "hard_victory": len(hard_victory),
|
| "soft_consensus": len(soft_consensus),
|
| "stalled": len(stalled),
|
| "worsened": len(worsened),
|
| "avg_resolution_rate": avg_resolution,
|
| "by_type": {
|
| "resolved": len(resolved),
|
| "hard_victory": len(hard_victory),
|
| "soft_consensus": len(soft_consensus),
|
| "stalled": len(stalled),
|
| "worsened": len(worsened),
|
| },
|
| }
|
|
|