"""Population-scale opinion dynamics with homophily-based network assignment"""

import random
from typing import Any, Dict, List, Optional, Tuple

import networkx as nx

from ..personas.models import Persona
from ..population.variant_generator import VariantGenerator, VariationLevel
from .network import InfluenceNetwork, NetworkType


class PopulationNetwork:
    """
    Creates population-scale networks with homophily-based persona assignment.

    Combines Phase 2 (population variants) with Phase 3 (opinion networks).

    Construction runs four steps in order: generate persona variants, build
    the graph topology, map variants onto graph nodes, and finally build the
    ``InfluenceNetwork``. Because the influence network is built last, the
    node assignment step scores similarity with ``_quick_similarity``.
    """

    # Positions on a left/right axis used by ``_quick_similarity``.
    # "moderate" and "independent" share the centre; unknown labels are
    # treated as centre as well (see the ``.get(..., 0)`` lookups).
    _POLITICAL_SCALE = {
        "very_progressive": -2,
        "progressive": -1,
        "moderate": 0,
        "independent": 0,
        "conservative": 1,
        "very_conservative": 2,
    }

    def __init__(
        self,
        base_personas: List[Persona],
        population_size: int,
        network_type: NetworkType = "scale_free",
        homophily: float = 0.5,
        variation_level: VariationLevel = VariationLevel.MODERATE,
        random_seed: Optional[int] = None,
        persona_weights: Optional[Dict[str, float]] = None,
    ):
        """
        Initialize population network.

        Args:
            base_personas: List of base personas to create variants from
            population_size: Total number of nodes in network
            network_type: Network topology ("fully_connected", "scale_free"
                or "small_world"; anything else raises ``ValueError``)
            homophily: Homophily parameter (0-1, higher = more clustering).
                Values <= 0.1 select a purely random assignment.
            variation_level: How much to vary persona characteristics
            random_seed: Random seed for reproducibility. When given, the
                module-level ``random`` generator is seeded, which affects
                every other user of that global generator in the process.
            persona_weights: Optional custom distribution weights, keyed by
                ``persona_id``. Personas missing from the mapping get weight 0.
                An empty or ``None`` mapping means an even split.

        Raises:
            ValueError: If ``network_type`` is unknown, or if an even split is
                requested with no base personas.
        """
        self.base_personas = base_personas
        self.population_size = population_size
        self.network_type = network_type
        self.homophily = homophily
        self.variation_level = variation_level
        self.persona_weights = persona_weights

        if random_seed is not None:
            random.seed(random_seed)

        # Generate population variants
        self.variants = self._generate_population_variants()

        # Create network topology
        self.network_graph = self._create_network_topology()

        # Assign personas to nodes with homophily
        self.node_to_persona = self._assign_personas_with_homophily()

        # Build influence network.
        # NOTE(review): InfluenceNetwork receives the variants, topology name
        # and homophily, but neither ``network_graph`` nor ``node_to_persona``;
        # confirm whether it is expected to build its own graph.
        self.influence_network = InfluenceNetwork(
            personas=self.variants,
            network_type=network_type,
            homophily=homophily,
        )

    def _generate_population_variants(self) -> List[Persona]:
        """
        Generate population variants from base personas.

        Each variant is produced by a ``VariantGenerator`` for its base
        persona and receives the suffix ``_v<k>``, where ``k`` is the
        variant's running index in the returned list.

        Returns:
            All generated variants, grouped by base persona in input order.
        """
        if self.persona_weights:
            allocation = self._weighted_allocation()
        else:
            allocation = self._even_allocation()

        variants: List[Persona] = []
        for base_persona, count in allocation:
            generator = VariantGenerator(base_persona, self.variation_level)
            # Capture the offset first so suffixes stay globally unique.
            offset = len(variants)
            variants.extend(
                generator.generate_variant(f"_v{offset + j}")
                for j in range(count)
            )
        return variants

    def _even_allocation(self) -> List[Tuple[Persona, int]]:
        """Split the population evenly; the first personas absorb the remainder."""
        if not self.base_personas:
            raise ValueError(
                "base_personas must not be empty when no persona_weights are given"
            )
        per_base, remainder = divmod(self.population_size, len(self.base_personas))
        return [
            (persona, per_base + (1 if index < remainder else 0))
            for index, persona in enumerate(self.base_personas)
        ]

    def _weighted_allocation(self) -> List[Tuple[Persona, int]]:
        """
        Split the population according to ``persona_weights``.

        Plain rounding of ``weight * population_size`` is used whenever the
        rounded counts already add up to ``population_size``. Otherwise the
        counts would leave the graph with more or fewer nodes than variants,
        so the weights are normalized and the population is distributed with
        the largest-remainder method, which always sums exactly.

        Personas with a non-positive weight, or with a resulting count of
        zero, are omitted.
        """
        weighted = [
            (persona, self.persona_weights.get(persona.persona_id, 0))
            for persona in self.base_personas
        ]
        weighted = [(persona, weight) for persona, weight in weighted if weight > 0]
        if not weighted:
            return []

        counts = [int(round(weight * self.population_size)) for _, weight in weighted]

        if sum(counts) != self.population_size:
            total_weight = sum(weight for _, weight in weighted)
            shares = [
                weight / total_weight * self.population_size for _, weight in weighted
            ]
            counts = [int(share) for share in shares]  # floor (shares are >= 0)
            shortfall = self.population_size - sum(counts)
            by_remainder = sorted(
                range(len(shares)),
                key=lambda i: shares[i] - counts[i],
                reverse=True,
            )
            for i in by_remainder[:shortfall]:
                counts[i] += 1

        return [
            (persona, count)
            for (persona, _), count in zip(weighted, counts)
            if count > 0
        ]

    def _create_network_topology(self) -> nx.Graph:
        """
        Create network topology graph with ``population_size`` nodes.

        Raises:
            ValueError: If ``network_type`` is not one of the supported names.
        """
        n = self.population_size

        if self.network_type == "fully_connected":
            return nx.complete_graph(n)

        if self.network_type == "scale_free":
            # Barabási-Albert: edges to attach per new node, kept within [2, 5]
            m = max(2, min(5, n // 10))
            return nx.barabasi_albert_graph(n, m)

        if self.network_type == "small_world":
            # Watts-Strogatz: nearest neighbours kept within [4, 10], forced even
            k = max(4, min(10, n // 5))
            if k % 2 != 0:
                k -= 1
            p = 0.1  # Rewiring probability
            return nx.watts_strogatz_graph(n, k, p)

        raise ValueError(f"Unknown network type: {self.network_type}")

    def _assign_personas_with_homophily(self) -> Dict[int, str]:
        """
        Assign persona variants to network nodes using homophily.

        Higher homophily = similar personas become neighbors.

        With ``homophily <= 0.1`` the variants are shuffled and paired with
        the nodes in order. Otherwise the assignment grows outward from a
        random start node: a random already-assigned node is picked, one of
        its unassigned neighbours is chosen, and that neighbour receives
        either the most similar remaining persona (with probability
        ``homophily``) or a random remaining persona.

        Returns:
            Mapping of node id to ``persona_id``. Nodes stay unmapped only if
            there are fewer variants than nodes.
        """
        node_to_persona: Dict[int, str] = {}
        nodes = list(self.network_graph.nodes())
        if not nodes or not self.variants:
            return node_to_persona

        if self.homophily <= 0.1:
            # Random assignment (low homophily). zip() stops at the shorter
            # side, so a size mismatch cannot raise IndexError.
            shuffled = random.sample(self.variants, len(self.variants))
            for node_id, variant in zip(nodes, shuffled):
                node_to_persona[node_id] = variant.persona_id
            return node_to_persona

        variants_by_id = {v.persona_id: v for v in self.variants}
        unassigned = dict(variants_by_id)

        # Pick random starting node and persona
        start_node = random.choice(nodes)
        start_persona = random.choice(list(unassigned.values()))
        node_to_persona[start_node] = start_persona.persona_id
        del unassigned[start_persona.persona_id]

        # Assigned nodes that may still have unassigned neighbours. This is
        # deliberately separate from ``node_to_persona``: a node dropped from
        # the frontier is still assigned and must never be handed a second
        # persona by one of its neighbours.
        frontier = [start_node]

        while frontier and unassigned:
            slot = random.randrange(len(frontier))
            current_node = frontier[slot]

            open_neighbors = [
                nb
                for nb in self.network_graph.neighbors(current_node)
                if nb not in node_to_persona
            ]
            if not open_neighbors:
                # Exhausted node: swap-remove keeps the removal O(1).
                frontier[slot] = frontier[-1]
                frontier.pop()
                continue

            neighbor = random.choice(open_neighbors)
            current_persona = variants_by_id[node_to_persona[current_node]]
            candidates = list(unassigned.values())

            if random.random() < self.homophily:
                # High homophily: pick most similar persona
                chosen = self._find_most_similar_persona(current_persona, candidates)
            else:
                # Random choice (reduces homophily effect)
                chosen = random.choice(candidates)

            node_to_persona[neighbor] = chosen.persona_id
            del unassigned[chosen.persona_id]
            frontier.append(neighbor)

        # Nodes unreachable from the start node (disconnected graph) get the
        # leftover personas in arbitrary order.
        leftovers = list(unassigned.values())
        for node_id in nodes:
            if not leftovers:
                break
            if node_id not in node_to_persona:
                node_to_persona[node_id] = leftovers.pop().persona_id

        return node_to_persona

    def _find_most_similar_persona(
        self, reference: Persona, candidates: List[Persona]
    ) -> Optional[Persona]:
        """
        Find the most similar persona from candidates.

        Uses the influence network's similarity once it exists on the
        instance, and ``_quick_similarity`` before that. Ties go to the
        earliest candidate.

        Returns:
            The best-scoring candidate, or ``None`` if ``candidates`` is empty.
        """
        if not candidates:
            return None

        if hasattr(self, "influence_network"):
            score = self.influence_network.calculate_persona_similarity
        else:
            score = self._quick_similarity

        return max(candidates, key=lambda persona: score(reference, persona))

    def _quick_similarity(self, p1: Persona, p2: Persona) -> float:
        """
        Quick similarity calculation (when influence network not yet built).

        Combines political closeness (weight 0.6) and age closeness
        (weight 0.4). Political distance is measured on a -2..2 scale and
        divided by 4; the age gap is divided by 100 and capped at 1.
        """
        scale = self._POLITICAL_SCALE

        # Political alignment
        pos1 = scale.get(p1.psychographics.political_leaning, 0)
        pos2 = scale.get(p2.psychographics.political_leaning, 0)
        political_sim = 1.0 - (abs(pos1 - pos2) / 4.0)

        # Age similarity
        age_gap = abs(p1.demographics.age - p2.demographics.age)
        age_sim = 1.0 - min(age_gap / 100.0, 1.0)

        return political_sim * 0.6 + age_sim * 0.4

    def get_persona_for_node(self, node_id: int) -> Persona:
        """
        Get the persona assigned to a specific node.

        Raises:
            KeyError: If the node has no assignment.
            StopIteration: If the assigned id matches no entry in ``variants``.
        """
        persona_id = self.node_to_persona[node_id]
        return next(v for v in self.variants if v.persona_id == persona_id)

    def get_base_type_for_node(self, node_id: int) -> str:
        """Get the base persona type for a node"""
        persona = self.get_persona_for_node(node_id)
        return InfluenceNetwork.get_persona_base_type(persona)

    def get_neighbors(self, node_id: int) -> List[int]:
        """Get neighboring nodes"""
        return list(self.network_graph.neighbors(node_id))

    def get_network_stats(self) -> Dict[str, Any]:
        """
        Get network statistics.

        Returns:
            Dict with node and edge counts, average degree, density, the
            configured homophily, and the per-base-type node counts.
        """
        node_count = self.network_graph.number_of_nodes()
        degree_total = sum(degree for _, degree in self.network_graph.degree())
        return {
            "nodes": node_count,
            "edges": self.network_graph.number_of_edges(),
            # Guard the empty graph instead of dividing by zero.
            "avg_degree": degree_total / node_count if node_count else 0.0,
            "density": nx.density(self.network_graph),
            "homophily": self.homophily,
            "base_persona_distribution": self._calculate_base_distribution(),
        }

    def _calculate_base_distribution(self) -> Dict[str, int]:
        """
        Calculate distribution of base persona types.

        Nodes without an assigned persona are skipped rather than raising.
        """
        distribution: Dict[str, int] = {}
        for node_id in self.network_graph.nodes():
            if node_id not in self.node_to_persona:
                continue
            base_type = self.get_base_type_for_node(node_id)
            distribution[base_type] = distribution.get(base_type, 0) + 1
        return distribution