# -*- coding: utf-8 -*- """revolutions_exploration.ipynb Automatically generated by Colaboratory. Original file is located at https://colab.research.google.com/drive/1omNn2hrbDL_s1qwCOr7ViaIjrRW61YDt """ import random import pandas as pd from mesa import Agent, Model import networkx as nx from mesa.time import RandomActivation from mesa.datacollection import DataCollector import numpy as np import seaborn as sns import matplotlib.pyplot as plt import matplotlib as mpl import cmocean import tqdm import scipy as sp from scipy.stats import beta import opinionated import PIL plt.style.use("opinionated_rc") # ----------------------------- # Decayed Beta helpers # ----------------------------- def apply_half_life_decay(data_list, half_life, decay_factors=None): steps = len(data_list) if decay_factors is None or len(decay_factors) < steps: decay_factors = [0.5 ** (i / half_life) for i in range(steps)] return [data_list[i] * decay_factors[steps - 1 - i] for i in range(steps)] def get_beta_mean_from_experience_dict(experiences, half_life=20, decay_factors=None): eta = 1e-10 a = sum(apply_half_life_decay(experiences['dissident_experiences'], half_life, decay_factors)) + eta b = sum(apply_half_life_decay(experiences['supporter_experiences'], half_life, decay_factors)) + eta return beta.mean(a, b) def get_beta_sample_from_experience_dict(experiences, half_life=20, decay_factors=None): eta = 1e-10 a = sum(apply_half_life_decay(experiences['dissident_experiences'], half_life, decay_factors)) + eta b = sum(apply_half_life_decay(experiences['supporter_experiences'], half_life, decay_factors)) + eta return beta.rvs(a, b, size=1)[0] # ----------------------------- # Network helpers # ----------------------------- def generate_community_points(num_communities, total_nodes, powerlaw_exponent=2.0, sigma=0.05, plot=False): sequence = nx.utils.powerlaw_sequence(num_communities, powerlaw_exponent) probabilities = sequence / np.sum(sequence) community_assignments = np.random.choice(num_communities, size=total_nodes, p=probabilities) community_sizes = np.bincount(community_assignments) if len(community_sizes) < num_communities: community_sizes = np.pad(community_sizes, (0, num_communities - len(community_sizes)), 'constant') points, community_centers = [], [] for i in range(num_communities): center = np.random.rand(2) community_centers.append(center) community_points = np.random.normal(center, sigma, (community_sizes[i], 2)) points.append(community_points) points = np.concatenate(points) if plot: plt.figure(figsize=(8, 8)) plt.scatter(points[:, 0], points[:, 1], alpha=0.5) sns.kdeplot(x=points[:, 0], y=points[:, 1], levels=5, color="k", linewidths=1) plt.show() return points def graph_from_coordinates(coords, radius): kdtree = sp.spatial.cKDTree(coords) edge_indexes = kdtree.query_pairs(radius) g = nx.Graph() g.add_nodes_from(list(range(len(coords)))) g.add_edges_from(edge_indexes) return g def ensure_neighbors(graph): nodes = list(graph.nodes()) for node in nodes: if graph.degree(node) == 0: other = random.choice(nodes) while other == node: other = random.choice(nodes) graph.add_edge(node, other) return graph def compute_homophily(G, attr_name='attr'): same = sum(G.nodes[n1][attr_name] == G.nodes[n2][attr_name] for n1, n2 in G.edges()) m = G.number_of_edges() return same / m if m > 0 else 0 def assign_initial_attributes(G, ratio, attr_name='attr'): nodes = list(G.nodes) random.shuffle(nodes) k = int(ratio * len(nodes)) for i, node in enumerate(nodes): G.nodes[node][attr_name] = 0 if i < k else 1 return G def distribute_attributes(G, target_homophily, seed=None, max_iter=10000, cooling_factor=0.9995, attr_name='attr'): random.seed(seed) current = compute_homophily(G, attr_name) temp = 1.0 for _ in range(max_iter): nodes = list(G.nodes) random.shuffle(nodes) for n1, n2 in zip(nodes[::2], nodes[1::2]): if G.nodes[n1][attr_name] != G.nodes[n2][attr_name]: G.nodes[n1][attr_name], G.nodes[n2][attr_name] = G.nodes[n2][attr_name], G.nodes[n1][attr_name] break new = compute_homophily(G, attr_name) delta = new - current dir_factor = np.sign(target_homophily - current) if abs(new - target_homophily) < abs(current - target_homophily) or \ (delta / temp < 700 and random.random() < np.exp(dir_factor * delta / temp)): current = new else: G.nodes[n1][attr_name], G.nodes[n2][attr_name] = G.nodes[n2][attr_name], G.nodes[n1][attr_name] temp *= cooling_factor return G def reindex_graph_to_match_attributes(G1, G2, attr_name): g1_sorted = sorted(G1.nodes(data=True), key=lambda x: x[1][attr_name]) g2_sorted = sorted(G2.nodes(data=True), key=lambda x: x[1][attr_name]) mapping = {g2[0]: g1[0] for g2, g1 in zip(g2_sorted, g1_sorted)} return nx.relabel_nodes(G2, mapping) # ----------------------------- # Reporters # ----------------------------- def compute_mean(model): return np.mean([a.estimation for a in model.schedule.agents]) def compute_median(model): return np.median([a.estimation for a in model.schedule.agents]) def compute_std(model): return np.std([a.estimation for a in model.schedule.agents]) # ----------------------------- # Agent and Model # ----------------------------- class PoliticalAgent(Agent): def __init__(self, unique_id, model, dissident): # Mesa versions differ here. Try the new signature, then fall back. try: super().__init__(unique_id, model) except TypeError: super().__init__() # object.__init__ without args self.unique_id = unique_id self.model = model # provide .random like classic Mesa Agent did if hasattr(model, "random"): self.random = model.random self.experiences = { 'dissident_experiences': [1], 'supporter_experiences': [1], } self.estimations = [] self.estimation = 0.5 self.experiments = [] self.dissident = dissident def update_estimation(self, network_id): partners = [self.model.id2agent[n] for n in self.model.networks[network_id]['network'].neighbors(self.unique_id)] current_estimate = get_beta_mean_from_experience_dict( self.experiences, half_life=self.model.half_life, decay_factors=self.model.decay_factors) self.estimations.append(current_estimate) self.estimation = current_estimate current_experiment = get_beta_sample_from_experience_dict( self.experiences, half_life=self.model.half_life, decay_factors=self.model.decay_factors) self.experiments.append(current_experiment) if not partners: return partner = random.choice(partners) ntype = self.model.networks[network_id]['type'] if ntype == 'physical': if current_experiment >= self.model.threshold: if partner.dissident: self.experiences['dissident_experiences'].append(1) self.experiences['supporter_experiences'].append(0) else: self.experiences['dissident_experiences'].append(0) self.experiences['supporter_experiences'].append(1) partner.experiences['dissident_experiences'].append(1 * self.model.social_learning_factor) partner.experiences['supporter_experiences'].append(0) else: partner.experiences['dissident_experiences'].append(0) partner.experiences['supporter_experiences'].append(1 * self.model.social_learning_factor) elif ntype == 'social_media': if partner.dissident: self.experiences['dissident_experiences'].append(1 * self.model.social_media_factor) self.experiences['supporter_experiences'].append(0) else: self.experiences['dissident_experiences'].append(0) self.experiences['supporter_experiences'].append(1 * self.model.social_media_factor) def combine_estimations(self): # Bounded confidence placeholder; keep harmless if not hasattr(self, "current_estimations"): return values = [list(d.values())[0] for d in self.current_estimations] if len(values) > 0: within = [v for v in values if abs(self.estimation - v) <= self.model.bounded_confidence_range] if len(within) > 0: self.estimation = np.mean(within) def step(self): if not hasattr(self, 'current_estimations'): self.current_estimations = [] for net_id in self.model.networks.keys(): self.update_estimation(net_id) self.combine_estimations() del self.current_estimations class PoliticalModel(Model): def __init__( self, n_agents, networks, share_regime_supporters, threshold, social_learning_factor=1, social_media_factor=1, half_life=20, print_agents=False, print_frequency=30, early_stopping_steps=20, early_stopping_range=0.01, agent_reporters=True, intervention_list=None, rng_seed=None, ): # Ensure Mesa creates self.random try: super().__init__(rng_seed=rng_seed) # Mesa >= 3.0 except TypeError: super().__init__(seed=rng_seed) # Mesa < 3.0 if intervention_list is None: intervention_list = [] self.num_agents = n_agents self.threshold = threshold self.social_learning_factor = social_learning_factor self.social_media_factor = social_media_factor self.print_agents_state = print_agents self.half_life = half_life self.intervention_list = intervention_list self.print_frequency = print_frequency self.early_stopping_steps = early_stopping_steps self.early_stopping_range = early_stopping_range self.bounded_confidence_range = 1.0 # harmless default self.mean_estimations = [] self.decay_factors = [0.5 ** (i / self.half_life) for i in range(500)] self.running = True self.share_regime_supporters = share_regime_supporters self.schedule = RandomActivation(self) self.networks = networks # Align attributes across networks and compute homophilies for i, this_network in enumerate(self.networks): self.networks[this_network]["network"] = assign_initial_attributes( self.networks[this_network]["network"], self.share_regime_supporters, attr_name='dissident' ) if 'homophily' in self.networks[this_network]: self.networks[this_network]["network"] = distribute_attributes( self.networks[this_network]["network"], self.networks[this_network]['homophily'], max_iter=5000, cooling_factor=0.995, attr_name='dissident' ) self.networks[this_network].setdefault('network_data_to_keep', {}) self.networks[this_network]['network_data_to_keep']['actual_homophily'] = compute_homophily( self.networks[this_network]["network"], attr_name='dissident' ) if i > 0: first_key = next(iter(self.networks)) self.networks[this_network]["network"] = reindex_graph_to_match_attributes( self.networks[first_key]["network"], self.networks[this_network]["network"], 'dissident' ) # Create agents and id -> agent map self.id2agent = {} first_key = next(iter(self.networks)) for i in range(self.num_agents): dissident_flag = self.networks[first_key]["network"].nodes[i]['dissident'] agent = PoliticalAgent(i, self, dissident_flag) self.schedule.add(agent) self.id2agent[i] = agent # Model reporters model_reporters = { "Mean": compute_mean, "Median": compute_median, "STD": compute_std } for this_network in self.networks: if 'network_data_to_keep' in self.networks[this_network]: for key, value in self.networks[this_network]['network_data_to_keep'].items(): attr_name = this_network + '_' + key setattr(self, attr_name, value) def reporter(model, attr_name=attr_name): return getattr(model, attr_name) model_reporters[attr_name] = reporter if agent_reporters: self.datacollector = DataCollector( model_reporters=model_reporters, agent_reporters={"Estimation": "estimation", "Dissident": "dissident"} ) else: self.datacollector = DataCollector(model_reporters=model_reporters) def step(self): self.datacollector.collect(self) # Interventions for this_intervention in self.intervention_list: if this_intervention['time'] == len(self.mean_estimations): if this_intervention['type'] == 'threshold_adjustment': self.threshold = max(0, min(1, self.threshold + this_intervention['strength'])) if this_intervention['type'] == 'share_adjustment': target_supporter_share = max(0, min(1, self.share_regime_supporters + this_intervention['strength'])) agents = list(self.schedule.agents) current_supporters = sum(not a.dissident for a in agents) total_agents = len(agents) required_supporters = int(target_supporter_share * total_agents) to_change = abs(required_supporters - current_supporters) if current_supporters / total_agents < target_supporter_share: pool = [a for a in agents if a.dissident] for a in random.sample(pool, min(to_change, len(pool))): a.dissident = False else: pool = [a for a in agents if not a.dissident] for a in random.sample(pool, min(to_change, len(pool))): a.dissident = True if this_intervention['type'] == 'social_media_adjustment': self.social_media_factor = max(0, min(1, self.social_media_factor + this_intervention['strength'])) self.schedule.step() self.mean_estimations.append(compute_mean(self)) if len(self.mean_estimations) >= self.early_stopping_steps: recent = self.mean_estimations[-self.early_stopping_steps:] if max(recent) - min(recent) < self.early_stopping_range: self.running = False # ----------------------------- # Runner and plotting # ----------------------------- def run_and_plot_simulation( separate_agent_types=False, n_agents=300, share_regime_supporters=0.4, threshold=0.5, social_learning_factor=1, simulation_steps=40, half_life=20, phys_network_radius=.06, powerlaw_exponent=3, physical_network_type='Fully Connected', introduce_physical_homophily_true_false=False, physical_homophily=.5, introduce_social_media_homophily_true_false=False, social_media_homophily=.5, social_media_network_type_random_geometric_radius=.07, social_media_network_type_powerlaw_exponent=3, social_media_network_type='Powerlaw', use_social_media_network=False, social_media_factor=1.0, rng_seed=None ): print(physical_network_type) networks = {} # Physical network if physical_network_type == 'Fully Connected': G = nx.complete_graph(n_agents) networks['physical'] = {"network": G, "type": "physical", "positions": nx.circular_layout(G)} elif physical_network_type == "Powerlaw": s = nx.utils.powerlaw_sequence(n_agents, powerlaw_exponent) G = nx.expected_degree_graph(s, selfloops=False) G = nx.convert_node_labels_to_integers(ensure_neighbors(G)) networks['physical'] = {"network": G, "type": "physical", "positions": nx.kamada_kawai_layout(G)} elif physical_network_type == "Random Geometric": pts = np.random.rand(n_agents, 2) G = graph_from_coordinates(pts, phys_network_radius) G = nx.convert_node_labels_to_integers(ensure_neighbors(G)) networks['physical'] = {"network": G, "type": "physical", "positions": pts} if introduce_physical_homophily_true_false: networks['physical']['homophily'] = physical_homophily networks['physical'].setdefault('network_data_to_keep', {}) # Social media network if use_social_media_network: if social_media_network_type == 'Fully Connected': G = nx.complete_graph(n_agents) networks['social_media'] = {"network": G, "type": "social_media", "positions": nx.circular_layout(G)} elif social_media_network_type == "Powerlaw": s = nx.utils.powerlaw_sequence(n_agents, social_media_network_type_powerlaw_exponent) G = nx.expected_degree_graph(s, selfloops=False) G = nx.convert_node_labels_to_integers(ensure_neighbors(G)) networks['social_media'] = {"network": G, "type": "social_media", "positions": nx.kamada_kawai_layout(G)} elif social_media_network_type == "Random Geometric": pts = np.random.rand(n_agents, 2) G = graph_from_coordinates(pts, social_media_network_type_random_geometric_radius) G = nx.convert_node_labels_to_integers(ensure_neighbors(G)) networks['social_media'] = {"network": G, "type": "social_media", "positions": pts} if introduce_social_media_homophily_true_false: networks['social_media']['homophily'] = social_media_homophily networks['social_media'].setdefault('network_data_to_keep', {}) model = PoliticalModel( n_agents, networks, share_regime_supporters, threshold, social_learning_factor=social_learning_factor, social_media_factor=social_media_factor, half_life=half_life, print_agents=False, print_frequency=50, agent_reporters=True, intervention_list=[], rng_seed=rng_seed ) for _ in tqdm.tqdm(range(simulation_steps)): model.step() agent_df = model.datacollector.get_agent_vars_dataframe().reset_index() agent_df_pivot = agent_df.pivot(index='Step', columns='AgentID', values='Estimation') # Time series plot fig1, ax = plt.subplots(figsize=(12, 8)) if not separate_agent_types: for col in agent_df_pivot.columns: plt.plot(agent_df_pivot.index, agent_df_pivot[col], color='gray', alpha=0.1) mean_est = agent_df_pivot.mean(axis=1) plt.plot(mean_est.index, mean_est, color='black', linewidth=2) else: colors = {1: '#d6a44b', 0: '#1b4968'} for aid in agent_df_pivot.columns: typ = agent_df.loc[agent_df['AgentID'] == aid, 'Dissident'].iloc[0] plt.plot(agent_df_pivot.index, agent_df_pivot[aid], color=colors[typ], alpha=0.1) for typ, color in colors.items(): mean_est = agent_df_pivot.loc[:, agent_df[agent_df['Dissident'] == typ]['AgentID']].mean(axis=1) plt.plot(mean_est.index, mean_est, color=color, linewidth=2, label='Dissident' if typ == 1 else 'Supporter') plt.legend(loc='lower right') plt.title('Agent Estimation Over Time', loc='right') plt.xlabel('Time step') plt.ylabel('Estimation') plt.savefig('run_plot.png', bbox_inches='tight', dpi=400, transparent=True) run_plot = PIL.Image.open('run_plot.png').convert('RGBA') # Network plot n_networks = len(networks) fig2, axs = plt.subplots(1, n_networks, figsize=(9.5 * n_networks, 8)) if n_networks == 1: axs = [axs] estimations = {a.unique_id: a.estimation for a in model.schedule.agents} for idx, (net_id, net_dict) in enumerate(networks.items()): net = net_dict['network'] nx.set_node_attributes(net, estimations, 'estimation') pos = net_dict.get('positions', nx.kamada_kawai_layout(net)) node_colors = [estimations[node] for node in net.nodes] axs[idx].set_title(f'Network: {net_id}', loc='right') nx.draw_networkx_nodes( net, pos, node_size=50, node_color=node_colors, cmap=cmocean.tools.crop_by_percent(cmocean.cm.curl, 20, which='both', N=None), vmin=0, vmax=1, ax=axs[idx] ) nx.draw_networkx_edges(net, pos, alpha=0.3, ax=axs[idx]) sm = mpl.cm.ScalarMappable( cmap=cmocean.tools.crop_by_percent(cmocean.cm.curl, 20, which='both', N=None), norm=plt.Normalize(vmin=0, vmax=1) ) sm.set_array([]) fig2.colorbar(sm, ax=axs[idx]) plt.savefig('network_plot.png', bbox_inches='tight', dpi=400, transparent=True) network_plot = PIL.Image.open('network_plot.png').convert('RGBA') return run_plot, network_plot # ----------------------------- # Gradio UI # ----------------------------- import gradio as gr with gr.Blocks(theme=gr.themes.Monochrome()) as demo: with gr.Column(): gr.Markdown("""# Simulate the emergence of social movements Vary the parameters below, and click 'Run Simulation' to run. """) with gr.Row(): with gr.Column(): with gr.Group(): separate_agent_types = gr.Checkbox(value=False, label="Separate agent types in plot") n_agents_slider = gr.Slider(100, 500, step=10, label="Number of Agents", value=150) share_regime_slider = gr.Slider(0.0, 1.0, step=0.01, label="Share of Regime Supporters", value=0.4) threshold_slider = gr.Slider(0.0, 1.0, step=0.01, label="Threshold", value=0.5) social_learning_slider = gr.Slider(0.0, 2.0, step=0.1, label="Social Learning Factor", value=1.0) steps_slider = gr.Slider(10, 100, step=5, label="Simulation Steps", value=40) half_life_slider = gr.Slider(5, 50, step=5, label="Half-Life", value=20) # Physical network settings with gr.Group(): gr.Markdown("""**Physical Network Settings:**""") introduce_physical_homophily_true_false = gr.Checkbox(value=False, label="Stipulate Homophily") with gr.Group(visible=False) as homophily_group: physical_homophily = gr.Slider(0, 1, label="Homophily", info='How much homophily to stipulate.') def update_homophily_group_visibility(checkbox_state): return {homophily_group: gr.Group(visible=checkbox_state)} introduce_physical_homophily_true_false.change( update_homophily_group_visibility, inputs=introduce_physical_homophily_true_false, outputs=homophily_group ) physical_network_type = gr.Dropdown(label="Physical Network Type", value="Fully Connected", choices=["Fully Connected", "Random Geometric", "Powerlaw"]) with gr.Group(visible=True) as physical_network_type_fully_connected_group: gr.Markdown("""""") with gr.Group(visible=False) as physical_network_type_random_geometric_group: physical_network_type_random_geometric_radius = gr.Slider(0.0, 0.5, label="Radius") with gr.Group(visible=False) as physical_network_type_powerlaw_group: physical_network_type_random_geometric_powerlaw_exponent = gr.Slider(0.0, 5.2, label="Powerlaw Exponent") def update_sliders(option): return { physical_network_type_fully_connected_group: gr.Group(visible=option == "Fully Connected"), physical_network_type_random_geometric_group: gr.Group(visible=option == "Random Geometric"), physical_network_type_powerlaw_group: gr.Group(visible=option == "Powerlaw") } physical_network_type.change( update_sliders, inputs=physical_network_type, outputs=[ physical_network_type_fully_connected_group, physical_network_type_random_geometric_group, physical_network_type_powerlaw_group ] ) # Social media settings use_social_media_network = gr.Checkbox(value=False, label="Use social media network") with gr.Group(visible=False) as social_media_group: gr.Markdown("""**Social Media Network Settings:**""") social_media_factor = gr.Slider(0, 2, label="Social Media Factor", info='Weight of social media vs learning in the real world.', value=1.0) introduce_social_media_homophily_true_false = gr.Checkbox(value=False, label="Stipulate Homophily") with gr.Group(visible=False) as social_media_homophily_group: social_media_homophily = gr.Slider(0, 1, label="Homophily", info='How much homophily to stipulate in social media network.') def update_social_media_homophily_group_visibility(checkbox_state): return {social_media_homophily_group: gr.Group(visible=checkbox_state)} introduce_social_media_homophily_true_false.change( update_social_media_homophily_group_visibility, inputs=introduce_social_media_homophily_true_false, outputs=social_media_homophily_group ) social_media_network_type = gr.Dropdown(label="Social Media Network Type", value="Fully Connected", choices=["Fully Connected", "Random Geometric", "Powerlaw"]) with gr.Group(visible=True) as social_media_network_type_fully_connected_group: gr.Markdown("""""") with gr.Group(visible=False) as social_media_network_type_random_geometric_group: social_media_network_type_random_geometric_radius = gr.Slider(0.0, 0.5, label="Radius") with gr.Group(visible=False) as social_media_network_type_powerlaw_group: social_media_network_type_powerlaw_exponent = gr.Slider(0.0, 5.2, label="Powerlaw Exponent") def update_social_media_network_sliders(option): return { social_media_network_type_fully_connected_group: gr.Group(visible=option == "Fully Connected"), social_media_network_type_random_geometric_group: gr.Group(visible=option == "Random Geometric"), social_media_network_type_powerlaw_group: gr.Group(visible=option == "Powerlaw") } social_media_network_type.change( update_social_media_network_sliders, inputs=social_media_network_type, outputs=[ social_media_network_type_fully_connected_group, social_media_network_type_random_geometric_group, social_media_network_type_powerlaw_group ] ) def update_social_media_group_visibility(checkbox_state): return {social_media_group: gr.Group(visible=checkbox_state)} use_social_media_network.change( update_social_media_group_visibility, inputs=use_social_media_network, outputs=social_media_group ) with gr.Column(): button = gr.Button("Run Simulation") plot_output = gr.Image(label="Simulation Result") network_output = gr.Image(label="Networks") def run_simulation_and_plot(*args): return run_and_plot_simulation(*args) button.click( run_simulation_and_plot, inputs=[ separate_agent_types, n_agents_slider, share_regime_slider, threshold_slider, social_learning_slider, steps_slider, half_life_slider, physical_network_type_random_geometric_radius, physical_network_type_random_geometric_powerlaw_exponent, physical_network_type, introduce_physical_homophily_true_false, physical_homophily, introduce_social_media_homophily_true_false, social_media_homophily, social_media_network_type_random_geometric_radius, social_media_network_type_powerlaw_exponent, social_media_network_type, use_social_media_network, social_media_factor, ], outputs=[plot_output, network_output] ) if __name__ == "__main__": demo.launch(debug=True)