# AI_Personas/src/influence/population_network.py
# Author: Claude
# Commit e7dd723 (unverified): Fix population network distribution and add
# opinion-based node coloring
"""Population-scale opinion dynamics with homophily-based network assignment"""
import random
from typing import Any, Dict, List, Optional, Tuple

import networkx as nx

from ..personas.models import Persona
from ..population.variant_generator import VariantGenerator, VariationLevel
from .network import InfluenceNetwork, NetworkType
class PopulationNetwork:
    """
    Creates population-scale networks with homophily-based persona assignment.

    Combines Phase 2 (population variants) with Phase 3 (opinion networks).

    Attributes set by the constructor:
        variants: Generated persona variants.
        network_graph: The networkx graph describing the topology.
        node_to_persona: Mapping of graph node id -> variant ``persona_id``.
        influence_network: ``InfluenceNetwork`` built from the variants.
    """

    # Left/right positions used by the quick political-similarity measure.
    # Leanings that are not listed here are treated as 0.
    _POLITICAL_SCALE = {
        "very_progressive": -2,
        "progressive": -1,
        "moderate": 0,
        "independent": 0,
        "conservative": 1,
        "very_conservative": 2,
    }

    def __init__(
        self,
        base_personas: List[Persona],
        population_size: int,
        network_type: NetworkType = "scale_free",
        homophily: float = 0.5,
        variation_level: VariationLevel = VariationLevel.MODERATE,
        random_seed: Optional[int] = None,
        persona_weights: Optional[Dict[str, float]] = None,
    ):
        """
        Initialize population network.

        Args:
            base_personas: List of base personas to create variants from
            population_size: Total number of nodes in network
            network_type: Network topology
            homophily: Homophily parameter (0-1, higher = more clustering)
            variation_level: How much to vary persona characteristics
            random_seed: Random seed for reproducibility. When given it seeds
                the global ``random`` module.
            persona_weights: Optional custom distribution weights for
                personas, keyed by base ``persona_id``. Weights are
                normalised, so they do not have to sum to 1.

        Raises:
            ValueError: If a positive population is requested but there are
                no base personas, or no base persona has a positive weight.
        """
        self.base_personas = base_personas
        self.population_size = population_size
        self.network_type = network_type
        self.homophily = homophily
        self.variation_level = variation_level
        self.persona_weights = persona_weights

        if random_seed is not None:
            random.seed(random_seed)

        # Generate population variants
        self.variants = self._generate_population_variants()

        # Index variants by id so per-node lookups do not scan the whole
        # list. setdefault keeps the first variant if an id ever repeats.
        self._persona_by_id: Dict[str, Persona] = {}
        for variant in self.variants:
            self._persona_by_id.setdefault(variant.persona_id, variant)

        # Create network topology
        self.network_graph = self._create_network_topology()

        # Assign personas to nodes with homophily. This runs before
        # influence_network exists, so similarity falls back to
        # _quick_similarity during construction.
        self.node_to_persona = self._assign_personas_with_homophily()

        # Build influence network
        self.influence_network = InfluenceNetwork(
            personas=self.variants,
            network_type=network_type,
            homophily=homophily,
        )

    def _variant_counts(self) -> List[int]:
        """
        Decide how many variants each base persona receives.

        Returns a list aligned with ``self.base_personas`` whose entries sum
        to ``population_size`` (or are all zero for a non-positive size).
        """
        total = self.population_size
        n_bases = len(self.base_personas)

        if total <= 0:
            return [0] * n_bases
        if n_bases == 0:
            raise ValueError("base_personas must not be empty")

        if not self.persona_weights:
            # Even split; the first `extra` personas take one more each.
            per_base, extra = divmod(total, n_bases)
            return [per_base + (1 if i < extra else 0) for i in range(n_bases)]

        # Missing or non-positive weights mean "skip this persona".
        weights = [
            max(self.persona_weights.get(p.persona_id, 0), 0)
            for p in self.base_personas
        ]
        weight_sum = sum(weights)
        if weight_sum <= 0:
            raise ValueError(
                "persona_weights must give a positive weight to at least "
                "one base persona"
            )

        # Largest-remainder allocation: floor every share, then hand the
        # leftover slots to the largest fractional parts. Independent
        # rounding could produce a total different from population_size.
        shares = [total * w / weight_sum for w in weights]
        counts = [int(share) for share in shares]
        leftover = max(total - sum(counts), 0)
        by_fraction = sorted(
            range(n_bases),
            key=lambda i: shares[i] - counts[i],
            reverse=True,
        )
        for i in by_fraction[:leftover]:
            counts[i] += 1
        return counts

    def _generate_population_variants(self) -> List[Persona]:
        """Generate population variants from base personas"""
        variants: List[Persona] = []
        for base_persona, count in zip(self.base_personas, self._variant_counts()):
            if count <= 0:
                continue  # Nothing to generate for this persona
            generator = VariantGenerator(base_persona, self.variation_level)
            # Suffixes continue a single running index across all personas.
            offset = len(variants)
            variants.extend(
                generator.generate_variant(f"_v{offset + j}")
                for j in range(count)
            )
        return variants

    def _create_network_topology(self) -> nx.Graph:
        """
        Create network topology graph

        Raises:
            ValueError: If ``network_type`` is not one of the known types.
        """
        n = self.population_size

        if self.network_type == "fully_connected":
            return nx.complete_graph(n)

        if self.network_type == "scale_free":
            # Barabási-Albert
            m = max(2, min(5, n // 10))  # Edges to attach per new node
            return nx.barabasi_albert_graph(n, m)

        if self.network_type == "small_world":
            # Watts-Strogatz
            k = max(4, min(10, n // 5))  # Nearest neighbors
            if k % 2 != 0:
                k -= 1  # keep k even
            p = 0.1  # Rewiring probability
            return nx.watts_strogatz_graph(n, k, p)

        raise ValueError(f"Unknown network type: {self.network_type}")

    def _assign_personas_with_homophily(self) -> Dict[int, str]:
        """
        Assign persona variants to network nodes using homophily.

        Higher homophily = similar personas become neighbors.

        Returns:
            Mapping of node id -> ``persona_id`` of the assigned variant.
        """
        node_to_persona: Dict[int, str] = {}
        nodes = list(self.network_graph.nodes())
        if not nodes or not self.variants:
            return node_to_persona

        if self.homophily <= 0.1:
            # Random assignment (low homophily)
            shuffled_variants = random.sample(self.variants, len(self.variants))
            # Pair nodes with variants instead of indexing by node id.
            for node_id, variant in zip(nodes, shuffled_variants):
                node_to_persona[node_id] = variant.persona_id
            return node_to_persona

        # High homophily: grow outwards from one random node, preferring
        # personas similar to an already-assigned neighbor.
        unassigned_personas = {v.persona_id: v for v in self.variants}
        persona_at_node: Dict[int, Persona] = {}

        start_node = random.choice(nodes)
        start_persona = random.choice(list(unassigned_personas.values()))
        node_to_persona[start_node] = start_persona.persona_id
        persona_at_node[start_node] = start_persona
        del unassigned_personas[start_persona.persona_id]

        # Assigned nodes that may still have unassigned neighbors.
        frontier = {start_node}

        while frontier and unassigned_personas:
            current_node = random.choice(list(frontier))
            current_persona = persona_at_node[current_node]

            # Test against node_to_persona, not the frontier: nodes dropped
            # from the frontier are still assigned and must not be
            # overwritten.
            open_neighbors = [
                nb for nb in self.network_graph.neighbors(current_node)
                if nb not in node_to_persona
            ]
            if not open_neighbors:
                frontier.remove(current_node)
                continue

            neighbor = random.choice(open_neighbors)
            candidates = list(unassigned_personas.values())

            if random.random() < self.homophily:
                # High homophily: pick most similar persona
                chosen = self._find_most_similar_persona(
                    current_persona, candidates
                )
            else:
                # Random choice (reduces homophily effect)
                chosen = random.choice(candidates)

            node_to_persona[neighbor] = chosen.persona_id
            persona_at_node[neighbor] = chosen
            frontier.add(neighbor)
            del unassigned_personas[chosen.persona_id]

        # The walk only reaches the start node's connected component; give
        # any nodes it never reached whatever personas are left.
        remaining_personas = list(unassigned_personas.values())
        for node_id in nodes:
            if not remaining_personas:
                break
            if node_id not in node_to_persona:
                node_to_persona[node_id] = remaining_personas.pop().persona_id

        return node_to_persona

    def _find_most_similar_persona(
        self, reference: Persona, candidates: List[Persona]
    ) -> Optional[Persona]:
        """
        Find the most similar persona from candidates

        Returns ``None`` when ``candidates`` is empty. Ties go to the
        earliest candidate.
        """
        if not candidates:
            return None

        # Use the influence network's measure once it exists; otherwise
        # fall back to the quick measure.
        if hasattr(self, "influence_network"):
            score = self.influence_network.calculate_persona_similarity
        else:
            score = self._quick_similarity

        return max(candidates, key=lambda candidate: score(reference, candidate))

    def _quick_similarity(self, p1: Persona, p2: Persona) -> float:
        """Quick similarity calculation (when influence network not yet built)"""
        # Political alignment: positions span -2..2, so the distance is
        # divided by 4.
        pos1 = self._POLITICAL_SCALE.get(p1.psychographics.political_leaning, 0)
        pos2 = self._POLITICAL_SCALE.get(p2.psychographics.political_leaning, 0)
        political_sim = 1.0 - (abs(pos1 - pos2) / 4.0)

        # Age similarity: a gap of 100 or more counts as fully dissimilar.
        age_gap = abs(p1.demographics.age - p2.demographics.age)
        age_sim = 1.0 - min(age_gap / 100.0, 1.0)

        return political_sim * 0.6 + age_sim * 0.4

    def get_persona_for_node(self, node_id: int) -> Persona:
        """
        Get the persona assigned to a specific node

        Raises:
            KeyError: If the node has no assigned persona.
        """
        return self._persona_by_id[self.node_to_persona[node_id]]

    def get_base_type_for_node(self, node_id: int) -> str:
        """Get the base persona type for a node"""
        persona = self.get_persona_for_node(node_id)
        return InfluenceNetwork.get_persona_base_type(persona)

    def get_neighbors(self, node_id: int) -> List[int]:
        """Get neighboring nodes"""
        return list(self.network_graph.neighbors(node_id))

    def get_network_stats(self) -> Dict[str, Any]:
        """
        Get network statistics

        Returns a dict with keys ``nodes``, ``edges``, ``avg_degree``,
        ``density``, ``homophily`` and ``base_persona_distribution``.
        """
        node_count = self.network_graph.number_of_nodes()
        edge_count = self.network_graph.number_of_edges()
        # Each edge adds 2 to the degree sum; guard against an empty graph.
        avg_degree = (2 * edge_count / node_count) if node_count else 0.0

        return {
            "nodes": node_count,
            "edges": edge_count,
            "avg_degree": avg_degree,
            "density": nx.density(self.network_graph),
            "homophily": self.homophily,
            "base_persona_distribution": self._calculate_base_distribution(),
        }

    def _calculate_base_distribution(self) -> Dict[str, int]:
        """Calculate distribution of base persona types"""
        distribution: Dict[str, int] = {}
        for node_id in self.network_graph.nodes():
            base_type = self.get_base_type_for_node(node_id)
            distribution[base_type] = distribution.get(base_type, 0) + 1
        return distribution