Spaces:
Sleeping
Sleeping
| # -*- coding: utf-8 -*- | |
| """revolutions_exploration.ipynb | |
| Automatically generated by Colaboratory. | |
| Original file is located at | |
| https://colab.research.google.com/drive/1omNn2hrbDL_s1qwCOr7ViaIjrRW61YDt | |
| """ | |
| import random | |
| import pandas as pd | |
| from mesa import Agent, Model | |
| import networkx as nx | |
| from mesa.time import RandomActivation | |
| from mesa.datacollection import DataCollector | |
| import numpy as np | |
| import seaborn as sns | |
| import matplotlib.pyplot as plt | |
| import matplotlib as mpl | |
| import cmocean | |
| import tqdm | |
| import scipy as sp | |
| from scipy.stats import beta | |
| import opinionated | |
| import PIL | |
# Apply the "opinionated_rc" matplotlib style to every figure created below
# (presumably registered by the `opinionated` import above — confirm).
plt.style.use("opinionated_rc")
| # ----------------------------- | |
| # Decayed Beta helpers | |
| # ----------------------------- | |
def apply_half_life_decay(data_list, half_life, decay_factors=None):
    """Weight a chronological list of observations by half-life decay.

    The last element of ``data_list`` keeps full weight; an element ``k``
    positions before the end is multiplied by ``0.5 ** (k / half_life)``.

    Args:
        data_list: Sequence of numeric observations, oldest first.
        half_life: Number of steps after which a weight halves.
        decay_factors: Optional precomputed weights, where index ``k`` holds
            the factor for an observation ``k`` steps old. Recomputed when it
            is ``None`` or shorter than ``data_list``.

    Returns:
        A new list of decayed values in the same order as the input.
    """
    count = len(data_list)
    if decay_factors is None or len(decay_factors) < count:
        decay_factors = [0.5 ** (age / half_life) for age in range(count)]
    # Ages run count-1 .. 0 from oldest to newest, so the first `count`
    # weights are consumed back to front.
    oldest_first_weights = reversed(decay_factors[:count])
    return [value * weight for value, weight in zip(data_list, oldest_first_weights)]
def get_beta_mean_from_experience_dict(experiences, half_life=20, decay_factors=None):
    """Return the mean of the Beta distribution implied by decayed experiences.

    Args:
        experiences: Mapping with the keys ``'dissident_experiences'`` and
            ``'supporter_experiences'``, each a chronological list of weights.
        half_life: Half-life forwarded to ``apply_half_life_decay``.
        decay_factors: Optional precomputed decay weights, forwarded as-is.

    Returns:
        ``beta.mean(a, b)`` where ``a`` and ``b`` are the decayed sums of the
        dissident and supporter experiences, each offset by ``1e-10``.
    """
    epsilon = 1e-10  # so an all-zero history still gives a non-zero parameter
    alpha_param, beta_param = (
        sum(apply_half_life_decay(experiences[key], half_life, decay_factors)) + epsilon
        for key in ('dissident_experiences', 'supporter_experiences')
    )
    return beta.mean(alpha_param, beta_param)
def get_beta_sample_from_experience_dict(experiences, half_life=20, decay_factors=None):
    """Draw one sample from the Beta distribution implied by decayed experiences.

    Args:
        experiences: Mapping with the keys ``'dissident_experiences'`` and
            ``'supporter_experiences'``, each a chronological list of weights.
        half_life: Half-life forwarded to ``apply_half_life_decay``.
        decay_factors: Optional precomputed decay weights, forwarded as-is.

    Returns:
        A single draw from ``beta.rvs(a, b)``, where ``a`` and ``b`` are the
        decayed sums of the dissident and supporter experiences plus ``1e-10``.
    """
    epsilon = 1e-10  # so an all-zero history still gives a non-zero parameter

    def decayed_total(key):
        return sum(apply_half_life_decay(experiences[key], half_life, decay_factors)) + epsilon

    dissident_weight = decayed_total('dissident_experiences')
    supporter_weight = decayed_total('supporter_experiences')
    draws = beta.rvs(dissident_weight, supporter_weight, size=1)
    return draws[0]
| # ----------------------------- | |
| # Network helpers | |
| # ----------------------------- | |
def generate_community_points(num_communities, total_nodes, powerlaw_exponent=2.0, sigma=0.05, plot=False):
    """Scatter points in 2-D around randomly placed community centres.

    Community sizes are drawn by assigning each of ``total_nodes`` points to a
    community with probability proportional to a power-law sequence. Every
    community gets a uniformly random centre in the unit square, and its
    points are normally distributed around it.

    Args:
        num_communities: Number of communities (clusters).
        total_nodes: Total number of points to generate.
        powerlaw_exponent: Exponent passed to ``nx.utils.powerlaw_sequence``.
        sigma: Standard deviation of the normal scatter around each centre.
        plot: When true, show a scatter plot with KDE contours.

    Returns:
        Array of shape ``(total_nodes, 2)`` with the points grouped by community.
    """
    weights = nx.utils.powerlaw_sequence(num_communities, powerlaw_exponent)
    probabilities = weights / np.sum(weights)
    membership = np.random.choice(num_communities, size=total_nodes, p=probabilities)
    # minlength keeps a zero entry for communities that received no points.
    community_sizes = np.bincount(membership, minlength=num_communities)

    clusters = []
    for size in community_sizes:
        center = np.random.rand(2)
        clusters.append(np.random.normal(center, sigma, (size, 2)))
    points = np.concatenate(clusters)

    if plot:
        plt.figure(figsize=(8, 8))
        xs, ys = points[:, 0], points[:, 1]
        plt.scatter(xs, ys, alpha=0.5)
        sns.kdeplot(x=xs, y=ys, levels=5, color="k", linewidths=1)
        plt.show()
    return points
def graph_from_coordinates(coords, radius):
    """Build a graph connecting every pair of points closer than ``radius``.

    Args:
        coords: Sequence of point coordinates; node ``i`` corresponds to
            ``coords[i]``.
        radius: Distance passed to ``cKDTree.query_pairs``.

    Returns:
        An ``nx.Graph`` with nodes ``0 .. len(coords) - 1`` and one edge per
        pair returned by the KD-tree query.
    """
    close_pairs = sp.spatial.cKDTree(coords).query_pairs(radius)
    graph = nx.Graph()
    # Add all nodes explicitly so points without a close neighbour still appear.
    graph.add_nodes_from(range(len(coords)))
    graph.add_edges_from(close_pairs)
    return graph
def ensure_neighbors(graph):
    """Give every isolated node one edge to a randomly chosen other node.

    The graph is modified in place. Nodes that already have at least one edge
    are left untouched.

    Args:
        graph: Graph exposing ``nodes()``, ``degree(node)`` and ``add_edge``.

    Returns:
        The same ``graph`` object, for call chaining.
    """
    nodes = list(graph.nodes())
    # With fewer than two nodes there is no "other" node to connect to; the
    # rejection loop below would otherwise spin forever on a lone node.
    if len(nodes) < 2:
        return graph
    for node in nodes:
        if graph.degree(node) == 0:
            other = random.choice(nodes)
            while other == node:
                other = random.choice(nodes)
            graph.add_edge(node, other)
    return graph
def compute_homophily(G, attr_name='attr'):
    """Return the fraction of edges whose endpoints share ``attr_name``.

    Args:
        G: Graph whose nodes all carry the attribute ``attr_name``.
        attr_name: Name of the node attribute to compare.

    Returns:
        ``matching_edges / total_edges``, or ``0`` for a graph without edges.
    """
    edge_total = G.number_of_edges()
    if edge_total <= 0:
        return 0
    matching = sum(
        G.nodes[left][attr_name] == G.nodes[right][attr_name]
        for left, right in G.edges()
    )
    return matching / edge_total
def assign_initial_attributes(G, ratio, attr_name='attr'):
    """Randomly label a share ``ratio`` of the nodes with 0 and the rest with 1.

    The node order is shuffled with the global ``random`` module; the first
    ``int(ratio * n)`` nodes of the shuffled order get ``0``, all others ``1``.
    The graph is modified in place.

    Args:
        G: Graph to label.
        ratio: Share of nodes that receive the value ``0``.
        attr_name: Name of the node attribute to write.

    Returns:
        The same graph ``G``.
    """
    shuffled = list(G.nodes)
    random.shuffle(shuffled)
    # Clamp at zero so a negative ratio labels every node 1 instead of being
    # read as a from-the-end slice index.
    cutoff = max(int(ratio * len(shuffled)), 0)
    for value, group in ((0, shuffled[:cutoff]), (1, shuffled[cutoff:])):
        for node in group:
            G.nodes[node][attr_name] = value
    return G
def distribute_attributes(G, target_homophily, seed=None, max_iter=10000, cooling_factor=0.9995, attr_name='attr'):
    """Swap node attributes until the edge homophily approaches a target.

    Annealing-style search: each iteration shuffles the nodes, swaps the
    attribute values of the first consecutive pair that differs, and keeps the
    swap if homophily moved closer to ``target_homophily`` or if a
    temperature-dependent random test passes. Otherwise the swap is undone.
    The temperature is multiplied by ``cooling_factor`` after every iteration.
    The graph is modified in place.

    NOTE(review): ``random.seed(seed)`` is called unconditionally, so the
    global ``random`` generator is re-seeded even when ``seed`` is ``None``.

    Args:
        G: Graph whose nodes already carry ``attr_name``.
        target_homophily: Desired value of ``compute_homophily``.
        seed: Value passed to ``random.seed``.
        max_iter: Number of swap proposals.
        cooling_factor: Multiplicative temperature decay per iteration.
        attr_name: Name of the node attribute to redistribute.

    Returns:
        The same graph ``G``.
    """
    def swap_values(first, second):
        node_data = G.nodes
        node_data[first][attr_name], node_data[second][attr_name] = (
            node_data[second][attr_name],
            node_data[first][attr_name],
        )

    random.seed(seed)
    temperature = 1.0
    current = compute_homophily(G, attr_name)
    for _ in range(max_iter):
        candidates = list(G.nodes)
        random.shuffle(candidates)
        # Propose: swap the first shuffled pair with differing attributes.
        for n1, n2 in zip(candidates[::2], candidates[1::2]):
            if G.nodes[n1][attr_name] != G.nodes[n2][attr_name]:
                swap_values(n1, n2)
                break
        proposed = compute_homophily(G, attr_name)
        delta = proposed - current
        direction = np.sign(target_homophily - current)
        moved_closer = abs(proposed - target_homophily) < abs(current - target_homophily)
        # The `< 700` guard keeps the exponent within float range before exp().
        if moved_closer or (
            delta / temperature < 700
            and random.random() < np.exp(direction * delta / temperature)
        ):
            current = proposed
        else:
            # Rejected: swap the same pair back.
            swap_values(n1, n2)
        temperature *= cooling_factor
    return G
def reindex_graph_to_match_attributes(G1, G2, attr_name):
    """Relabel ``G2`` so its node ids line up with ``G1`` by attribute rank.

    Both node sets are sorted by ``attr_name``; the k-th node of ``G2`` in that
    order is renamed to the id of the k-th node of ``G1``.

    Args:
        G1: Reference graph providing the target node ids.
        G2: Graph to relabel.
        attr_name: Node attribute used as the sort key in both graphs.

    Returns:
        The result of ``nx.relabel_nodes(G2, mapping)``.
    """
    def ids_ranked_by_attribute(graph):
        ranked = sorted(graph.nodes(data=True), key=lambda item: item[1][attr_name])
        return [node_id for node_id, _ in ranked]

    mapping = dict(zip(ids_ranked_by_attribute(G2), ids_ranked_by_attribute(G1)))
    return nx.relabel_nodes(G2, mapping)
| # ----------------------------- | |
| # Reporters | |
| # ----------------------------- | |
def compute_mean(model):
    """Return the mean ``estimation`` over all agents in ``model.schedule``."""
    estimates = [agent.estimation for agent in model.schedule.agents]
    return np.mean(estimates)
def compute_median(model):
    """Return the median ``estimation`` over all agents in ``model.schedule``."""
    estimates = [agent.estimation for agent in model.schedule.agents]
    return np.median(estimates)
def compute_std(model):
    """Return the standard deviation of ``estimation`` over all scheduled agents."""
    estimates = [agent.estimation for agent in model.schedule.agents]
    return np.std(estimates)
| # ----------------------------- | |
| # Agent and Model | |
| # ----------------------------- | |
class PoliticalAgent(Agent):
    """Agent that estimates the share of dissidents from decayed encounters.

    Each agent keeps two chronological lists of weighted experiences
    (dissident / supporter). Its current ``estimation`` is the mean of the Beta
    distribution implied by those lists, and every step it draws one sample
    from that distribution (the "experiment") which is compared against the
    model threshold.
    """

    def __init__(self, unique_id, model, dissident):
        """Create an agent.

        Args:
            unique_id: Id of the agent; also the node id used in the networks.
            model: Owning model; must provide ``networks``, ``id2agent``,
                ``half_life``, ``decay_factors``, ``threshold`` and the
                learning factors read in ``update_estimation``.
            dissident: Truthy if the agent is a dissident, falsy if supporter.
        """
        try:
            # Older Mesa releases: Agent(unique_id, model).
            super().__init__(unique_id, model)
        except TypeError:
            # Newer Mesa releases: Agent(model) with an auto-assigned id. The
            # base initialiser still needs the model; calling it without
            # arguments would only raise a second TypeError.
            super().__init__(model)
            # Keep the caller's id so agents stay aligned with graph nodes.
            self.unique_id = unique_id
            self.model = model
            # Mesa exposes `random` as a read-only property; only set it when
            # the base class does not already provide one.
            if not hasattr(self, "random") and hasattr(model, "random"):
                self.random = model.random
        # Both lists start with one unit experience.
        self.experiences = {
            'dissident_experiences': [1],
            'supporter_experiences': [1],
        }
        self.estimations = []   # history of Beta means, one entry per update
        self.estimation = 0.5   # latest Beta mean
        self.experiments = []   # history of Beta samples, one entry per update
        self.dissident = dissident

    @staticmethod
    def _record(experiences, dissident_weight, supporter_weight):
        """Append one weighted observation to both experience lists."""
        experiences['dissident_experiences'].append(dissident_weight)
        experiences['supporter_experiences'].append(supporter_weight)

    def update_estimation(self, network_id):
        """Refresh the estimate and interact with one random neighbour.

        First the Beta mean and one Beta sample are computed from the current
        experiences and stored. If the agent has neighbours in the given
        network, one is picked at random and experiences are exchanged:

        * ``'physical'``: if the sample is at least ``model.threshold`` the
          agent records the partner's type with weight 1, and the partner
          records a dissident experience weighted by
          ``model.social_learning_factor``. Otherwise only the partner records
          a supporter experience with that weight.
        * ``'social_media'``: the agent records the partner's type weighted by
          ``model.social_media_factor``; the partner records nothing.

        Args:
            network_id: Key into ``model.networks``.
        """
        network = self.model.networks[network_id]
        partners = [
            self.model.id2agent[node]
            for node in network['network'].neighbors(self.unique_id)
        ]

        current_estimate = get_beta_mean_from_experience_dict(
            self.experiences, half_life=self.model.half_life, decay_factors=self.model.decay_factors)
        self.estimations.append(current_estimate)
        self.estimation = current_estimate

        current_experiment = get_beta_sample_from_experience_dict(
            self.experiences, half_life=self.model.half_life, decay_factors=self.model.decay_factors)
        self.experiments.append(current_experiment)

        if not partners:
            return
        partner = random.choice(partners)
        ntype = network['type']

        if ntype == 'physical':
            weight = self.model.social_learning_factor
            if current_experiment >= self.model.threshold:
                if partner.dissident:
                    self._record(self.experiences, 1, 0)
                else:
                    self._record(self.experiences, 0, 1)
                self._record(partner.experiences, weight, 0)
            else:
                self._record(partner.experiences, 0, weight)
        elif ntype == 'social_media':
            weight = self.model.social_media_factor
            if partner.dissident:
                self._record(self.experiences, weight, 0)
            else:
                self._record(self.experiences, 0, weight)

    def combine_estimations(self):
        """Average collected estimates within the bounded-confidence range.

        Placeholder: nothing in this class appends to
        ``current_estimations``, so the method currently leaves
        ``estimation`` unchanged.
        """
        if not hasattr(self, "current_estimations"):
            return
        values = [list(entry.values())[0] for entry in self.current_estimations]
        if not values:
            return
        limit = self.model.bounded_confidence_range
        within = [value for value in values if abs(self.estimation - value) <= limit]
        if within:
            self.estimation = np.mean(within)

    def step(self):
        """Update the estimate once per network, then combine estimates."""
        if not hasattr(self, 'current_estimations'):
            self.current_estimations = []
        for net_id in self.model.networks:
            self.update_estimation(net_id)
        self.combine_estimations()
        # Per-step scratch list; removed so the next step starts fresh.
        del self.current_estimations
class PoliticalModel(Model):
    """Mesa model of agents estimating dissident prevalence over networks.

    On construction the model labels the nodes of every network as supporter
    (``0``) or dissident (``1``), optionally rearranges the labels towards a
    requested homophily, aligns node ids across networks, creates one
    ``PoliticalAgent`` per node id and sets up a ``DataCollector``.
    """

    def __init__(
        self,
        n_agents,
        networks,
        share_regime_supporters,
        threshold,
        social_learning_factor=1,
        social_media_factor=1,
        half_life=20,
        print_agents=False,
        print_frequency=30,
        early_stopping_steps=20,
        early_stopping_range=0.01,
        agent_reporters=True,
        intervention_list=None,
        rng_seed=None,
    ):
        """Build the model.

        Args:
            n_agents: Number of agents; agent ``i`` maps to node ``i``.
            networks: Dict of network id -> dict with at least ``"network"``
                (graph) and ``"type"``; optional ``'homophily'`` target and
                ``'network_data_to_keep'``. The dict is modified in place.
            share_regime_supporters: Share of nodes initially labelled ``0``.
            threshold: Value the agents' Beta sample is compared against.
            social_learning_factor: Weight of physical-network observations
                recorded by the partner.
            social_media_factor: Weight of social-media observations.
            half_life: Half-life of the experience decay.
            print_agents: Stored as ``print_agents_state``; not used here.
            print_frequency: Stored; not used here.
            early_stopping_steps: Window length for the convergence check.
            early_stopping_range: Maximum spread of the mean estimate within
                the window for the model to stop running.
            agent_reporters: Whether per-agent variables are collected.
            intervention_list: Optional list of dicts with ``'time'``,
                ``'type'`` and ``'strength'``.
            rng_seed: Seed handed to the Mesa base class.
        """
        # The base class' seed keyword differs between Mesa releases; try one
        # spelling and fall back to the other.
        try:
            super().__init__(rng_seed=rng_seed)
        except TypeError:
            super().__init__(seed=rng_seed)

        self.num_agents = n_agents
        self.networks = networks
        self.share_regime_supporters = share_regime_supporters
        self.threshold = threshold
        self.social_learning_factor = social_learning_factor
        self.social_media_factor = social_media_factor
        self.half_life = half_life
        self.print_agents_state = print_agents
        self.print_frequency = print_frequency
        self.early_stopping_steps = early_stopping_steps
        self.early_stopping_range = early_stopping_range
        self.intervention_list = [] if intervention_list is None else intervention_list
        self.bounded_confidence_range = 1.0  # harmless default
        self.mean_estimations = []
        # Precomputed decay weights for up to 500 experiences.
        self.decay_factors = [0.5 ** (age / self.half_life) for age in range(500)]
        self.running = True
        self.schedule = RandomActivation(self)

        # Label nodes, optionally enforce homophily, and align ids with the
        # first network so one agent id means the same type everywhere.
        for position, name in enumerate(self.networks):
            entry = self.networks[name]
            entry["network"] = assign_initial_attributes(
                entry["network"], self.share_regime_supporters, attr_name='dissident'
            )
            if 'homophily' in entry:
                entry["network"] = distribute_attributes(
                    entry["network"],
                    entry['homophily'],
                    max_iter=5000,
                    cooling_factor=0.995,
                    attr_name='dissident',
                )
            entry.setdefault('network_data_to_keep', {})
            entry['network_data_to_keep']['actual_homophily'] = compute_homophily(
                entry["network"], attr_name='dissident'
            )
            if position > 0:
                reference = self.networks[next(iter(self.networks))]["network"]
                entry["network"] = reindex_graph_to_match_attributes(
                    reference, entry["network"], 'dissident'
                )

        # One agent per node id; the type comes from the first network.
        self.id2agent = {}
        first_key = next(iter(self.networks))
        node_data = self.networks[first_key]["network"].nodes
        for agent_id in range(self.num_agents):
            agent = PoliticalAgent(agent_id, self, node_data[agent_id]['dissident'])
            self.schedule.add(agent)
            self.id2agent[agent_id] = agent

        model_reporters = {
            "Mean": compute_mean,
            "Median": compute_median,
            "STD": compute_std,
        }
        # Expose each kept network statistic as a model attribute plus reporter.
        for name in self.networks:
            if 'network_data_to_keep' not in self.networks[name]:
                continue
            for key, value in self.networks[name]['network_data_to_keep'].items():
                attr_name = name + '_' + key
                setattr(self, attr_name, value)

                # Default argument binds the current attr_name to the closure.
                def read_attribute(model, attr_name=attr_name):
                    return getattr(model, attr_name)

                model_reporters[attr_name] = read_attribute

        collector_kwargs = {"model_reporters": model_reporters}
        if agent_reporters:
            collector_kwargs["agent_reporters"] = {"Estimation": "estimation", "Dissident": "dissident"}
        self.datacollector = DataCollector(**collector_kwargs)

    def step(self):
        """Collect data, apply due interventions, advance agents, check stop.

        An intervention is due when its ``'time'`` equals the number of steps
        completed so far. After the agents have stepped, the mean estimate is
        recorded and ``running`` is cleared once the last
        ``early_stopping_steps`` means span less than ``early_stopping_range``.
        """
        self.datacollector.collect(self)

        completed_steps = len(self.mean_estimations)
        for this_intervention in self.intervention_list:
            if this_intervention['time'] != completed_steps:
                continue
            kind = this_intervention['type']
            strength = this_intervention['strength']
            if kind == 'threshold_adjustment':
                self.threshold = max(0, min(1, self.threshold + strength))
            elif kind == 'share_adjustment':
                target_supporter_share = max(0, min(1, self.share_regime_supporters + strength))
                agents = list(self.schedule.agents)
                current_supporters = sum(not a.dissident for a in agents)
                total_agents = len(agents)
                required_supporters = int(target_supporter_share * total_agents)
                to_change = abs(required_supporters - current_supporters)
                if current_supporters / total_agents < target_supporter_share:
                    # Too few supporters: convert dissidents.
                    pool = [a for a in agents if a.dissident]
                    new_flag = False
                else:
                    # Too many supporters: convert supporters.
                    pool = [a for a in agents if not a.dissident]
                    new_flag = True
                for chosen in random.sample(pool, min(to_change, len(pool))):
                    chosen.dissident = new_flag
            elif kind == 'social_media_adjustment':
                self.social_media_factor = max(0, min(1, self.social_media_factor + strength))

        self.schedule.step()
        self.mean_estimations.append(compute_mean(self))

        window = self.early_stopping_steps
        if len(self.mean_estimations) >= window:
            recent = self.mean_estimations[-window:]
            if max(recent) - min(recent) < self.early_stopping_range:
                self.running = False
| # ----------------------------- | |
| # Runner and plotting | |
| # ----------------------------- | |
def _build_network(role, network_type, n_agents, radius, powerlaw_exponent):
    """Build one interaction network dict (graph, role and plot positions)."""
    if network_type == 'Fully Connected':
        graph = nx.complete_graph(n_agents)
        positions = nx.circular_layout(graph)
    elif network_type == "Powerlaw":
        degrees = nx.utils.powerlaw_sequence(n_agents, powerlaw_exponent)
        graph = nx.expected_degree_graph(degrees, selfloops=False)
        graph = nx.convert_node_labels_to_integers(ensure_neighbors(graph))
        positions = nx.kamada_kawai_layout(graph)
    elif network_type == "Random Geometric":
        positions = np.random.rand(n_agents, 2)
        graph = graph_from_coordinates(positions, radius)
        graph = nx.convert_node_labels_to_integers(ensure_neighbors(graph))
    else:
        raise ValueError(f"Unknown {role} network type: {network_type!r}")
    return {"network": graph, "type": role, "positions": positions}


def run_and_plot_simulation(
    separate_agent_types=False,
    n_agents=300,
    share_regime_supporters=0.4,
    threshold=0.5,
    social_learning_factor=1,
    simulation_steps=40,
    half_life=20,
    phys_network_radius=.06,
    powerlaw_exponent=3,
    physical_network_type='Fully Connected',
    introduce_physical_homophily_true_false=False,
    physical_homophily=.5,
    introduce_social_media_homophily_true_false=False,
    social_media_homophily=.5,
    social_media_network_type_random_geometric_radius=.07,
    social_media_network_type_powerlaw_exponent=3,
    social_media_network_type='Powerlaw',
    use_social_media_network=False,
    social_media_factor=1.0,
    rng_seed=None
):
    """Run one simulation and render a time-series plot and a network plot.

    Side effects: prints the physical network type, and writes
    ``run_plot.png`` and ``network_plot.png`` to the working directory. Both
    matplotlib figures are closed before returning.

    Args:
        separate_agent_types: Colour lines and means by agent type.
        n_agents: Number of agents / nodes per network.
        share_regime_supporters: Initial share of supporters.
        threshold: Model threshold.
        social_learning_factor: Weight of physical-network learning.
        simulation_steps: Number of model steps to run.
        half_life: Half-life of the experience decay.
        phys_network_radius: Radius for a random geometric physical network.
        powerlaw_exponent: Exponent for a power-law physical network.
        physical_network_type: ``'Fully Connected'``, ``'Powerlaw'`` or
            ``'Random Geometric'``.
        introduce_physical_homophily_true_false: Enforce ``physical_homophily``.
        physical_homophily: Target homophily of the physical network.
        introduce_social_media_homophily_true_false: Enforce
            ``social_media_homophily``.
        social_media_homophily: Target homophily of the social media network.
        social_media_network_type_random_geometric_radius: Radius for a random
            geometric social media network.
        social_media_network_type_powerlaw_exponent: Exponent for a power-law
            social media network.
        social_media_network_type: Same choices as ``physical_network_type``.
        use_social_media_network: Whether to add the social media network.
        social_media_factor: Weight of social-media learning.
        rng_seed: Seed forwarded to ``PoliticalModel``.

    Returns:
        Tuple ``(run_plot, network_plot)`` of RGBA ``PIL`` images.

    Raises:
        ValueError: If a network type is not one of the three known names.
    """
    print(physical_network_type)
    networks = {}

    # Physical network
    networks['physical'] = _build_network(
        'physical', physical_network_type, n_agents, phys_network_radius, powerlaw_exponent
    )
    if introduce_physical_homophily_true_false:
        networks['physical']['homophily'] = physical_homophily
    networks['physical'].setdefault('network_data_to_keep', {})

    # Social media network
    if use_social_media_network:
        networks['social_media'] = _build_network(
            'social_media',
            social_media_network_type,
            n_agents,
            social_media_network_type_random_geometric_radius,
            social_media_network_type_powerlaw_exponent,
        )
        if introduce_social_media_homophily_true_false:
            networks['social_media']['homophily'] = social_media_homophily
        networks['social_media'].setdefault('network_data_to_keep', {})

    model = PoliticalModel(
        n_agents,
        networks,
        share_regime_supporters,
        threshold,
        social_learning_factor=social_learning_factor,
        social_media_factor=social_media_factor,
        half_life=half_life,
        print_agents=False,
        print_frequency=50,
        agent_reporters=True,
        intervention_list=[],
        rng_seed=rng_seed
    )
    for _ in tqdm.tqdm(range(simulation_steps)):
        model.step()

    agent_df = model.datacollector.get_agent_vars_dataframe().reset_index()
    agent_df_pivot = agent_df.pivot(index='Step', columns='AgentID', values='Estimation')

    # Time series plot
    fig1, ax = plt.subplots(figsize=(12, 8))
    try:
        if not separate_agent_types:
            for col in agent_df_pivot.columns:
                ax.plot(agent_df_pivot.index, agent_df_pivot[col], color='gray', alpha=0.1)
            mean_est = agent_df_pivot.mean(axis=1)
            ax.plot(mean_est.index, mean_est, color='black', linewidth=2)
        else:
            colors = {1: '#d6a44b', 0: '#1b4968'}
            # Type of each agent as of its first recorded row.
            first_type = agent_df.drop_duplicates('AgentID').set_index('AgentID')['Dissident']
            for aid in agent_df_pivot.columns:
                ax.plot(agent_df_pivot.index, agent_df_pivot[aid],
                        color=colors[first_type.loc[aid]], alpha=0.1)
            for typ, color in colors.items():
                # unique(): each id occurs once per step in agent_df; selecting
                # every occurrence would duplicate columns without changing
                # the mean.
                ids = agent_df.loc[agent_df['Dissident'] == typ, 'AgentID'].unique()
                mean_est = agent_df_pivot.loc[:, ids].mean(axis=1)
                ax.plot(mean_est.index, mean_est, color=color, linewidth=2,
                        label='Dissident' if typ == 1 else 'Supporter')
            ax.legend(loc='lower right')
        ax.set_title('Agent Estimation Over Time', loc='right')
        ax.set_xlabel('Time step')
        ax.set_ylabel('Estimation')
        fig1.savefig('run_plot.png', bbox_inches='tight', dpi=400, transparent=True)
    finally:
        # Figures created through pyplot stay registered until closed.
        plt.close(fig1)
    run_plot = PIL.Image.open('run_plot.png').convert('RGBA')

    # Network plot
    n_networks = len(networks)
    fig2, axs = plt.subplots(1, n_networks, figsize=(9.5 * n_networks, 8))
    try:
        if n_networks == 1:
            axs = [axs]
        estimations = {a.unique_id: a.estimation for a in model.schedule.agents}
        cmap = cmocean.tools.crop_by_percent(cmocean.cm.curl, 20, which='both', N=None)
        for idx, (net_id, net_dict) in enumerate(networks.items()):
            net = net_dict['network']
            nx.set_node_attributes(net, estimations, 'estimation')
            # Only compute a layout when none was stored with the network.
            if 'positions' in net_dict:
                pos = net_dict['positions']
            else:
                pos = nx.kamada_kawai_layout(net)
            node_colors = [estimations[node] for node in net.nodes]
            axs[idx].set_title(f'Network: {net_id}', loc='right')
            nx.draw_networkx_nodes(
                net, pos, node_size=50, node_color=node_colors,
                cmap=cmap, vmin=0, vmax=1, ax=axs[idx]
            )
            nx.draw_networkx_edges(net, pos, alpha=0.3, ax=axs[idx])
            sm = mpl.cm.ScalarMappable(cmap=cmap, norm=plt.Normalize(vmin=0, vmax=1))
            sm.set_array([])
            fig2.colorbar(sm, ax=axs[idx])
        fig2.savefig('network_plot.png', bbox_inches='tight', dpi=400, transparent=True)
    finally:
        plt.close(fig2)
    network_plot = PIL.Image.open('network_plot.png').convert('RGBA')
    return run_plot, network_plot
| # ----------------------------- | |
| # Gradio UI | |
| # ----------------------------- | |
import gradio as gr

# Gradio front end for run_and_plot_simulation.
# NOTE(review): the container nesting below was reconstructed from a copy of
# this file without indentation — confirm the layout against the running app.
# Sliders that feed network parameters carry explicit defaults matching
# run_and_plot_simulation; without `value` they would start at their minimum
# (0.0), i.e. a zero radius or a zero power-law exponent.
with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
    with gr.Column():
        gr.Markdown("""# Simulate the emergence of social movements
Vary the parameters below, and click 'Run Simulation' to run.
""")
    with gr.Row():
        with gr.Column():
            with gr.Group():
                separate_agent_types = gr.Checkbox(value=False, label="Separate agent types in plot")
                n_agents_slider = gr.Slider(100, 500, step=10, label="Number of Agents", value=150)
                share_regime_slider = gr.Slider(0.0, 1.0, step=0.01, label="Share of Regime Supporters", value=0.4)
                threshold_slider = gr.Slider(0.0, 1.0, step=0.01, label="Threshold", value=0.5)
                social_learning_slider = gr.Slider(0.0, 2.0, step=0.1, label="Social Learning Factor", value=1.0)
                steps_slider = gr.Slider(10, 100, step=5, label="Simulation Steps", value=40)
                half_life_slider = gr.Slider(5, 50, step=5, label="Half-Life", value=20)

            # Physical network settings
            with gr.Group():
                gr.Markdown("""**Physical Network Settings:**""")
                introduce_physical_homophily_true_false = gr.Checkbox(value=False, label="Stipulate Homophily")
                with gr.Group(visible=False) as homophily_group:
                    physical_homophily = gr.Slider(0, 1, label="Homophily", value=0.5,
                                                   info='How much homophily to stipulate.')

                def update_homophily_group_visibility(checkbox_state):
                    """Show the homophily slider only while the checkbox is ticked."""
                    return {homophily_group: gr.Group(visible=checkbox_state)}

                introduce_physical_homophily_true_false.change(
                    update_homophily_group_visibility,
                    inputs=introduce_physical_homophily_true_false,
                    outputs=homophily_group
                )

                physical_network_type = gr.Dropdown(label="Physical Network Type", value="Fully Connected",
                                                    choices=["Fully Connected", "Random Geometric", "Powerlaw"])
                with gr.Group(visible=True) as physical_network_type_fully_connected_group:
                    gr.Markdown("""""")
                with gr.Group(visible=False) as physical_network_type_random_geometric_group:
                    physical_network_type_random_geometric_radius = gr.Slider(0.0, 0.5, label="Radius", value=0.06)
                with gr.Group(visible=False) as physical_network_type_powerlaw_group:
                    physical_network_type_random_geometric_powerlaw_exponent = gr.Slider(
                        0.0, 5.2, label="Powerlaw Exponent", value=3)

                def update_sliders(option):
                    """Show only the parameter group of the selected physical network type."""
                    return {
                        physical_network_type_fully_connected_group: gr.Group(visible=option == "Fully Connected"),
                        physical_network_type_random_geometric_group: gr.Group(visible=option == "Random Geometric"),
                        physical_network_type_powerlaw_group: gr.Group(visible=option == "Powerlaw")
                    }

                physical_network_type.change(
                    update_sliders,
                    inputs=physical_network_type,
                    outputs=[
                        physical_network_type_fully_connected_group,
                        physical_network_type_random_geometric_group,
                        physical_network_type_powerlaw_group
                    ]
                )

            # Social media settings
            use_social_media_network = gr.Checkbox(value=False, label="Use social media network")
            with gr.Group(visible=False) as social_media_group:
                gr.Markdown("""**Social Media Network Settings:**""")
                social_media_factor = gr.Slider(0, 2, label="Social Media Factor",
                                                info='Weight of social media vs learning in the real world.',
                                                value=1.0)
                introduce_social_media_homophily_true_false = gr.Checkbox(value=False, label="Stipulate Homophily")
                with gr.Group(visible=False) as social_media_homophily_group:
                    social_media_homophily = gr.Slider(
                        0, 1, label="Homophily", value=0.5,
                        info='How much homophily to stipulate in social media network.')

                def update_social_media_homophily_group_visibility(checkbox_state):
                    """Show the social media homophily slider only while the checkbox is ticked."""
                    return {social_media_homophily_group: gr.Group(visible=checkbox_state)}

                introduce_social_media_homophily_true_false.change(
                    update_social_media_homophily_group_visibility,
                    inputs=introduce_social_media_homophily_true_false,
                    outputs=social_media_homophily_group
                )

                social_media_network_type = gr.Dropdown(label="Social Media Network Type", value="Fully Connected",
                                                        choices=["Fully Connected", "Random Geometric", "Powerlaw"])
                with gr.Group(visible=True) as social_media_network_type_fully_connected_group:
                    gr.Markdown("""""")
                with gr.Group(visible=False) as social_media_network_type_random_geometric_group:
                    social_media_network_type_random_geometric_radius = gr.Slider(0.0, 0.5, label="Radius", value=0.07)
                with gr.Group(visible=False) as social_media_network_type_powerlaw_group:
                    social_media_network_type_powerlaw_exponent = gr.Slider(0.0, 5.2, label="Powerlaw Exponent", value=3)

                def update_social_media_network_sliders(option):
                    """Show only the parameter group of the selected social media network type."""
                    return {
                        social_media_network_type_fully_connected_group: gr.Group(visible=option == "Fully Connected"),
                        social_media_network_type_random_geometric_group: gr.Group(visible=option == "Random Geometric"),
                        social_media_network_type_powerlaw_group: gr.Group(visible=option == "Powerlaw")
                    }

                social_media_network_type.change(
                    update_social_media_network_sliders,
                    inputs=social_media_network_type,
                    outputs=[
                        social_media_network_type_fully_connected_group,
                        social_media_network_type_random_geometric_group,
                        social_media_network_type_powerlaw_group
                    ]
                )

            def update_social_media_group_visibility(checkbox_state):
                """Show the social media settings only while the network is enabled."""
                return {social_media_group: gr.Group(visible=checkbox_state)}

            use_social_media_network.change(
                update_social_media_group_visibility,
                inputs=use_social_media_network,
                outputs=social_media_group
            )

        with gr.Column():
            button = gr.Button("Run Simulation")
            plot_output = gr.Image(label="Simulation Result")
            network_output = gr.Image(label="Networks")

    def run_simulation_and_plot(*args):
        """Forward the UI values positionally to run_and_plot_simulation."""
        return run_and_plot_simulation(*args)

    # The order of `inputs` must match the positional parameter order of
    # run_and_plot_simulation.
    button.click(
        run_simulation_and_plot,
        inputs=[
            separate_agent_types,
            n_agents_slider,
            share_regime_slider,
            threshold_slider,
            social_learning_slider,
            steps_slider,
            half_life_slider,
            physical_network_type_random_geometric_radius,
            physical_network_type_random_geometric_powerlaw_exponent,
            physical_network_type,
            introduce_physical_homophily_true_false,
            physical_homophily,
            introduce_social_media_homophily_true_false,
            social_media_homophily,
            social_media_network_type_random_geometric_radius,
            social_media_network_type_powerlaw_exponent,
            social_media_network_type,
            use_social_media_network,
            social_media_factor,
        ],
        outputs=[plot_output, network_output]
    )

# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(debug=True)