"""Equilibrium detection and analysis for opinion dynamics""" from typing import List, Dict import statistics from collections import Counter from .models import ( RoundResult, EquilibriumState, PersonaOpinion, OpinionPosition, ) from .network import InfluenceNetwork class EquilibriumDetector: """ Analyzes opinion dynamics results to detect equilibrium and extract insights. Provides: - Equilibrium detection - Consensus metrics - Opinion cluster analysis - Opinion leader identification - Evolution timeline analysis """ def __init__(self, convergence_threshold: float = 0.1): self.convergence_threshold = convergence_threshold def analyze_equilibrium( self, results: List[RoundResult], influence_network: InfluenceNetwork, max_rounds: int, ) -> EquilibriumState: """ Analyze final equilibrium state. Args: results: Complete history of opinion dynamics influence_network: Influence network used max_rounds: Maximum rounds configured Returns: EquilibriumState with comprehensive analysis """ if not results: raise ValueError("No results to analyze") final_round = results[-1] final_opinions = final_round.opinions # Check if equilibrium was reached reached_equilibrium = ( len(results) < max_rounds and final_round.total_change < self.convergence_threshold ) # Calculate consensus metrics consensus_strength = self._calculate_consensus_strength(final_opinions) majority_position, majority_pct = self._find_majority_position( final_opinions ) # Position distribution position_dist = self._calculate_position_distribution(final_opinions) # Identify opinion clusters opinion_clusters = self._identify_opinion_clusters( final_opinions, results ) # Identify opinion leaders opinion_leaders = self._identify_opinion_leaders( final_opinions, results, influence_network ) return EquilibriumState( reached_equilibrium=reached_equilibrium, total_rounds=len(results), final_opinions=final_opinions, consensus_strength=consensus_strength, majority_position=majority_position, majority_percentage=majority_pct, position_distribution=position_dist, opinion_clusters=opinion_clusters, opinion_leaders=opinion_leaders, evolution_timeline=results, ) def _calculate_consensus_strength( self, opinions: List[PersonaOpinion] ) -> float: """ Calculate how much consensus exists (0-1). 1 = Everyone agrees perfectly 0 = Maximum disagreement """ if len(opinions) <= 1: return 1.0 # Calculate variance in positions positions = [op.position_score for op in opinions] variance = statistics.variance(positions) # Max variance is when half are at -3, half at +3 max_variance = 9.0 # (3 - (-3))^2 / 4 # Normalize: high variance = low consensus consensus = 1.0 - (variance / max_variance) return max(0.0, min(1.0, consensus)) def _find_majority_position( self, opinions: List[PersonaOpinion] ) -> tuple: """Find the majority position and percentage""" position_counts = Counter(op.position.value for op in opinions) majority_pos, majority_count = position_counts.most_common(1)[0] majority_pct = (majority_count / len(opinions)) * 100 majority_position = OpinionPosition(majority_pos) return majority_position, majority_pct def _calculate_position_distribution( self, opinions: List[PersonaOpinion] ) -> Dict[str, int]: """Count personas at each position""" distribution = {} for opinion in opinions: pos = opinion.position.value distribution[pos] = distribution.get(pos, 0) + 1 return distribution def _identify_opinion_clusters( self, final_opinions: List[PersonaOpinion], results: List[RoundResult] ) -> List[Dict[str, any]]: """ Identify stable opinion clusters. Clusters are groups of personas with: - Similar final positions - Similar evolution patterns """ # Group by final position (within 1 point) clusters = [] used_personas = set() sorted_opinions = sorted( final_opinions, key=lambda x: x.position_score ) for opinion in sorted_opinions: if opinion.persona_id in used_personas: continue # Find similar personas cluster_members = [opinion] used_personas.add(opinion.persona_id) for other in sorted_opinions: if other.persona_id in used_personas: continue if abs(other.position_score - opinion.position_score) <= 1.0: cluster_members.append(other) used_personas.add(other.persona_id) if cluster_members: avg_position = statistics.mean( [m.position_score for m in cluster_members] ) clusters.append({ "size": len(cluster_members), "members": [m.persona_name for m in cluster_members], "member_ids": [m.persona_id for m in cluster_members], "average_position": avg_position, "position_label": OpinionPosition.from_score(avg_position).value, "stability": self._calculate_cluster_stability( cluster_members, results ), }) return sorted(clusters, key=lambda x: x["size"], reverse=True) def _calculate_cluster_stability( self, members: List[PersonaOpinion], results: List[RoundResult] ) -> float: """ Calculate how stable this cluster is (0-1). High stability = members stayed together throughout """ if len(results) <= 1: return 1.0 member_ids = {m.persona_id for m in members} # Track how many rounds members stayed close stable_rounds = 0 for result in results[1:]: # Skip first round # Get positions for members in this round member_positions = [ op.position_score for op in result.opinions if op.persona_id in member_ids ] if len(member_positions) <= 1: stable_rounds += 1 continue # Check variance within cluster variance = statistics.variance(member_positions) if variance < 1.0: # Stayed close stable_rounds += 1 return stable_rounds / (len(results) - 1) def _identify_opinion_leaders( self, final_opinions: List[PersonaOpinion], results: List[RoundResult], influence_network: InfluenceNetwork, ) -> List[Dict[str, any]]: """ Identify opinion leaders - personas who influenced the outcome most. Criteria: 1. High influence centrality in network 2. How many people moved toward their position 3. Consistency (didn't change position much) """ leaders = [] network_metrics = influence_network.calculate_network_metrics() centrality = network_metrics.centrality_scores for opinion in final_opinions: persona_id = opinion.persona_id # 1. Network centrality centrality_score = centrality.get(persona_id, 0.0) # 2. Influence impact: how many moved toward them impact_score = self._calculate_influence_impact( persona_id, results, opinion.position_score ) # 3. Consistency: didn't change much consistency_score = self._calculate_consistency(persona_id, results) # Combined leadership score leadership_score = ( centrality_score * 0.4 + impact_score * 0.4 + consistency_score * 0.2 ) leaders.append({ "persona_id": persona_id, "persona_name": opinion.persona_name, "leadership_score": leadership_score, "centrality": centrality_score, "influence_impact": impact_score, "consistency": consistency_score, "final_position": opinion.position.value, }) # Sort by leadership score leaders = sorted( leaders, key=lambda x: x["leadership_score"], reverse=True ) return leaders[:5] # Top 5 leaders def _calculate_influence_impact( self, persona_id: str, results: List[RoundResult], final_position: float ) -> float: """ Calculate how much others moved toward this persona's position. Higher score = more people converged to their view """ if len(results) <= 1: return 0.0 # Get this persona's position in round 1 initial_position = None for op in results[0].opinions: if op.persona_id == persona_id: initial_position = op.position_score break if initial_position is None: return 0.0 # Count how many others moved toward their position convergence_count = 0 total_others = 0 for other_id in [op.persona_id for op in results[0].opinions]: if other_id == persona_id: continue total_others += 1 # Get initial and final position for this other persona other_initial = None other_final = None for op in results[0].opinions: if op.persona_id == other_id: other_initial = op.position_score for op in results[-1].opinions: if op.persona_id == other_id: other_final = op.position_score if other_initial is None or other_final is None: continue # Did they move toward this persona? initial_distance = abs(other_initial - initial_position) final_distance = abs(other_final - final_position) if final_distance < initial_distance: convergence_count += 1 return convergence_count / total_others if total_others > 0 else 0.0 def _calculate_consistency( self, persona_id: str, results: List[RoundResult] ) -> float: """ Calculate how consistent this persona was (0-1). Higher score = didn't change position much """ positions = [] for result in results: for op in result.opinions: if op.persona_id == persona_id: positions.append(op.position_score) if len(positions) <= 1: return 1.0 # Calculate variance variance = statistics.variance(positions) # Normalize (max variance is 9 if going from -3 to +3) consistency = 1.0 - (variance / 9.0) return max(0.0, min(1.0, consistency)) def get_convergence_summary( self, equilibrium: EquilibriumState ) -> str: """Generate human-readable summary of convergence""" summary = [] if equilibrium.reached_equilibrium: summary.append( f"✓ Equilibrium reached after {equilibrium.total_rounds} rounds" ) else: summary.append( f"⚠ No equilibrium reached in {equilibrium.total_rounds} rounds" ) summary.append( f"📊 Consensus strength: {equilibrium.consensus_strength:.0%}" ) summary.append( f"🎯 Majority position: {equilibrium.majority_position.value} " f"({equilibrium.majority_percentage:.0f}%)" ) summary.append( f"👥 Opinion clusters: {len(equilibrium.opinion_clusters)}" ) if equilibrium.opinion_leaders: leader = equilibrium.opinion_leaders[0] summary.append( f"⭐ Top opinion leader: {leader['persona_name']}" ) return "\n".join(summary)