AI_Personas / src /influence /equilibrium.py
Claude
Complete Phase 3: Multi-Persona Opinion Equilibria with Scale-Free Networks
3934b26 unverified
"""Equilibrium detection and analysis for opinion dynamics"""
from typing import List, Dict
import statistics
from collections import Counter
from .models import (
RoundResult,
EquilibriumState,
PersonaOpinion,
OpinionPosition,
)
from .network import InfluenceNetwork
class EquilibriumDetector:
"""
Analyzes opinion dynamics results to detect equilibrium and extract insights.
Provides:
- Equilibrium detection
- Consensus metrics
- Opinion cluster analysis
- Opinion leader identification
- Evolution timeline analysis
"""
def __init__(self, convergence_threshold: float = 0.1):
self.convergence_threshold = convergence_threshold
def analyze_equilibrium(
self,
results: List[RoundResult],
influence_network: InfluenceNetwork,
max_rounds: int,
) -> EquilibriumState:
"""
Analyze final equilibrium state.
Args:
results: Complete history of opinion dynamics
influence_network: Influence network used
max_rounds: Maximum rounds configured
Returns:
EquilibriumState with comprehensive analysis
"""
if not results:
raise ValueError("No results to analyze")
final_round = results[-1]
final_opinions = final_round.opinions
# Check if equilibrium was reached
reached_equilibrium = (
len(results) < max_rounds
and final_round.total_change < self.convergence_threshold
)
# Calculate consensus metrics
consensus_strength = self._calculate_consensus_strength(final_opinions)
majority_position, majority_pct = self._find_majority_position(
final_opinions
)
# Position distribution
position_dist = self._calculate_position_distribution(final_opinions)
# Identify opinion clusters
opinion_clusters = self._identify_opinion_clusters(
final_opinions, results
)
# Identify opinion leaders
opinion_leaders = self._identify_opinion_leaders(
final_opinions, results, influence_network
)
return EquilibriumState(
reached_equilibrium=reached_equilibrium,
total_rounds=len(results),
final_opinions=final_opinions,
consensus_strength=consensus_strength,
majority_position=majority_position,
majority_percentage=majority_pct,
position_distribution=position_dist,
opinion_clusters=opinion_clusters,
opinion_leaders=opinion_leaders,
evolution_timeline=results,
)
def _calculate_consensus_strength(
self, opinions: List[PersonaOpinion]
) -> float:
"""
Calculate how much consensus exists (0-1).
1 = Everyone agrees perfectly
0 = Maximum disagreement
"""
if len(opinions) <= 1:
return 1.0
# Calculate variance in positions
positions = [op.position_score for op in opinions]
variance = statistics.variance(positions)
# Max variance is when half are at -3, half at +3
max_variance = 9.0 # (3 - (-3))^2 / 4
# Normalize: high variance = low consensus
consensus = 1.0 - (variance / max_variance)
return max(0.0, min(1.0, consensus))
def _find_majority_position(
self, opinions: List[PersonaOpinion]
) -> tuple:
"""Find the majority position and percentage"""
position_counts = Counter(op.position.value for op in opinions)
majority_pos, majority_count = position_counts.most_common(1)[0]
majority_pct = (majority_count / len(opinions)) * 100
majority_position = OpinionPosition(majority_pos)
return majority_position, majority_pct
def _calculate_position_distribution(
self, opinions: List[PersonaOpinion]
) -> Dict[str, int]:
"""Count personas at each position"""
distribution = {}
for opinion in opinions:
pos = opinion.position.value
distribution[pos] = distribution.get(pos, 0) + 1
return distribution
def _identify_opinion_clusters(
self, final_opinions: List[PersonaOpinion], results: List[RoundResult]
) -> List[Dict[str, any]]:
"""
Identify stable opinion clusters.
Clusters are groups of personas with:
- Similar final positions
- Similar evolution patterns
"""
# Group by final position (within 1 point)
clusters = []
used_personas = set()
sorted_opinions = sorted(
final_opinions, key=lambda x: x.position_score
)
for opinion in sorted_opinions:
if opinion.persona_id in used_personas:
continue
# Find similar personas
cluster_members = [opinion]
used_personas.add(opinion.persona_id)
for other in sorted_opinions:
if other.persona_id in used_personas:
continue
if abs(other.position_score - opinion.position_score) <= 1.0:
cluster_members.append(other)
used_personas.add(other.persona_id)
if cluster_members:
avg_position = statistics.mean(
[m.position_score for m in cluster_members]
)
clusters.append({
"size": len(cluster_members),
"members": [m.persona_name for m in cluster_members],
"member_ids": [m.persona_id for m in cluster_members],
"average_position": avg_position,
"position_label": OpinionPosition.from_score(avg_position).value,
"stability": self._calculate_cluster_stability(
cluster_members, results
),
})
return sorted(clusters, key=lambda x: x["size"], reverse=True)
def _calculate_cluster_stability(
self, members: List[PersonaOpinion], results: List[RoundResult]
) -> float:
"""
Calculate how stable this cluster is (0-1).
High stability = members stayed together throughout
"""
if len(results) <= 1:
return 1.0
member_ids = {m.persona_id for m in members}
# Track how many rounds members stayed close
stable_rounds = 0
for result in results[1:]: # Skip first round
# Get positions for members in this round
member_positions = [
op.position_score
for op in result.opinions
if op.persona_id in member_ids
]
if len(member_positions) <= 1:
stable_rounds += 1
continue
# Check variance within cluster
variance = statistics.variance(member_positions)
if variance < 1.0: # Stayed close
stable_rounds += 1
return stable_rounds / (len(results) - 1)
def _identify_opinion_leaders(
self,
final_opinions: List[PersonaOpinion],
results: List[RoundResult],
influence_network: InfluenceNetwork,
) -> List[Dict[str, any]]:
"""
Identify opinion leaders - personas who influenced the outcome most.
Criteria:
1. High influence centrality in network
2. How many people moved toward their position
3. Consistency (didn't change position much)
"""
leaders = []
network_metrics = influence_network.calculate_network_metrics()
centrality = network_metrics.centrality_scores
for opinion in final_opinions:
persona_id = opinion.persona_id
# 1. Network centrality
centrality_score = centrality.get(persona_id, 0.0)
# 2. Influence impact: how many moved toward them
impact_score = self._calculate_influence_impact(
persona_id, results, opinion.position_score
)
# 3. Consistency: didn't change much
consistency_score = self._calculate_consistency(persona_id, results)
# Combined leadership score
leadership_score = (
centrality_score * 0.4
+ impact_score * 0.4
+ consistency_score * 0.2
)
leaders.append({
"persona_id": persona_id,
"persona_name": opinion.persona_name,
"leadership_score": leadership_score,
"centrality": centrality_score,
"influence_impact": impact_score,
"consistency": consistency_score,
"final_position": opinion.position.value,
})
# Sort by leadership score
leaders = sorted(
leaders, key=lambda x: x["leadership_score"], reverse=True
)
return leaders[:5] # Top 5 leaders
def _calculate_influence_impact(
self, persona_id: str, results: List[RoundResult], final_position: float
) -> float:
"""
Calculate how much others moved toward this persona's position.
Higher score = more people converged to their view
"""
if len(results) <= 1:
return 0.0
# Get this persona's position in round 1
initial_position = None
for op in results[0].opinions:
if op.persona_id == persona_id:
initial_position = op.position_score
break
if initial_position is None:
return 0.0
# Count how many others moved toward their position
convergence_count = 0
total_others = 0
for other_id in [op.persona_id for op in results[0].opinions]:
if other_id == persona_id:
continue
total_others += 1
# Get initial and final position for this other persona
other_initial = None
other_final = None
for op in results[0].opinions:
if op.persona_id == other_id:
other_initial = op.position_score
for op in results[-1].opinions:
if op.persona_id == other_id:
other_final = op.position_score
if other_initial is None or other_final is None:
continue
# Did they move toward this persona?
initial_distance = abs(other_initial - initial_position)
final_distance = abs(other_final - final_position)
if final_distance < initial_distance:
convergence_count += 1
return convergence_count / total_others if total_others > 0 else 0.0
def _calculate_consistency(
self, persona_id: str, results: List[RoundResult]
) -> float:
"""
Calculate how consistent this persona was (0-1).
Higher score = didn't change position much
"""
positions = []
for result in results:
for op in result.opinions:
if op.persona_id == persona_id:
positions.append(op.position_score)
if len(positions) <= 1:
return 1.0
# Calculate variance
variance = statistics.variance(positions)
# Normalize (max variance is 9 if going from -3 to +3)
consistency = 1.0 - (variance / 9.0)
return max(0.0, min(1.0, consistency))
def get_convergence_summary(
self, equilibrium: EquilibriumState
) -> str:
"""Generate human-readable summary of convergence"""
summary = []
if equilibrium.reached_equilibrium:
summary.append(
f"✓ Equilibrium reached after {equilibrium.total_rounds} rounds"
)
else:
summary.append(
f"⚠ No equilibrium reached in {equilibrium.total_rounds} rounds"
)
summary.append(
f"📊 Consensus strength: {equilibrium.consensus_strength:.0%}"
)
summary.append(
f"🎯 Majority position: {equilibrium.majority_position.value} "
f"({equilibrium.majority_percentage:.0f}%)"
)
summary.append(
f"👥 Opinion clusters: {len(equilibrium.opinion_clusters)}"
)
if equilibrium.opinion_leaders:
leader = equilibrium.opinion_leaders[0]
summary.append(
f"⭐ Top opinion leader: {leader['persona_name']}"
)
return "\n".join(summary)