| import gradio as gr |
| import numpy as np |
| import matplotlib.pyplot as plt |
| from PIL import Image |
| from scipy.stats import gaussian_kde |
| import os |
|
|
| |
| def hamilton_optimal_control(x, alpha_k, z, local_weight, pos_idx, d_sq): |
| if len(pos_idx) == 0: return None, 0 |
| w_sub = local_weight[pos_idx] |
| d_wnE = d_sq / (w_sub**2 + 1e-12) |
| idx_sorted = np.argsort(d_wnE) |
| sorted_pos_idx = pos_idx[idx_sorted] |
| sorted_w = local_weight[sorted_pos_idx] |
| cumsum_w = np.cumsum(sorted_w) |
| cutoff = np.searchsorted(cumsum_w, alpha_k) |
| if cutoff >= len(sorted_pos_idx): cutoff = len(sorted_pos_idx) - 1 |
| final_idx = sorted_pos_idx[:cutoff+1] |
| final_w = sorted_w[:cutoff+1].copy() |
| if cutoff > 0: final_w[-1] = alpha_k - cumsum_w[cutoff-1] |
| else: final_w[-1] = alpha_k |
| y_loc_pos = z[final_idx] |
| y_loc_wgt = final_w.reshape(-1, 1) |
| gamma = np.sum(y_loc_wgt) |
| xg = np.sum(y_loc_wgt * y_loc_pos, axis=0) / (gamma + 1e-12) |
| return xg, gamma |
|
|
| |
| def update_weight_local(x, z, local_weight, alpha_k): |
| pos_idx = np.where(local_weight > 1e-15)[0] |
| if len(pos_idx) == 0: return local_weight, np.zeros_like(local_weight), pos_idx, np.array([]) |
| dist_sq = np.sum((z[pos_idx] - x)**2, axis=1) |
| idx_sort = np.argsort(dist_sq) |
| sorted_idx = pos_idx[idx_sort] |
| sorted_w = local_weight[sorted_idx] |
| cumsum_w = np.cumsum(sorted_w) |
| cutoff = np.searchsorted(cumsum_w, alpha_k) |
| weight_dist = np.zeros_like(local_weight) |
| if cutoff < len(sorted_idx): |
| take_idx = sorted_idx[:cutoff] |
| weight_dist[take_idx] = local_weight[take_idx]; local_weight[take_idx] = 0 |
| remainder = alpha_k - (cumsum_w[cutoff-1] if cutoff > 0 else 0) |
| weight_dist[sorted_idx[cutoff]] = remainder; local_weight[sorted_idx[cutoff]] -= remainder |
| else: |
| weight_dist[sorted_idx] = local_weight[sorted_idx]; local_weight[sorted_idx] = 0 |
| local_weight[local_weight < 1e-12] = 0 |
| new_pos_idx = np.where(local_weight > 1e-15)[0] |
| new_dist_sq = np.sum((z[new_pos_idx] - x)**2, axis=1) if len(new_pos_idx) > 0 else np.array([]) |
| return local_weight, weight_dist, new_pos_idx, new_dist_sq |
|
|
| |
| def run_simulation(num_agents, battery_life): |
| no_of_agents = int(num_agents) |
| bat_life = int(battery_life) |
| T, g, comm_range = 0.1, 9.81, 15.0 |
| alpha_k = 1.0 / bat_life |
| frames = [] |
|
|
| Ad_sub = np.array([[1, T, 0, 0], [0, 0.7, -T*g, 0], [0, 0, 1, T], [0, 0, 0, 0.7]]) |
| Ad = np.zeros((8, 8)); Ad[:4, :4] = Ad_sub; Ad[4:, 4:] = Ad_sub |
| Bd = np.zeros((8, 2)); Bd[3, 0] = T/0.1; Bd[7, 1] = T/0.1 |
| CTC = np.zeros((8, 8)); CTC[0,0] = 1.0; CTC[4,4] = 1.0 |
| Q_base, R_block = np.eye(8) * 0.1, np.eye(2) * 10.0 |
| hor_leng = 5 |
| E12 = np.zeros((40, 40)) |
| for i in range(hor_leng): |
| E12[i*8:(i+1)*8, i*8:(i+1)*8] = -np.eye(8) |
| if i < hor_leng - 1: E12[i*8:(i+1)*8, (i+1)*8:(i+2)*8] = Ad.T |
| E12_inv = np.linalg.inv(E12) |
| E23 = np.kron(np.eye(hor_leng), Bd) |
| E33 = np.kron(np.eye(hor_leng), R_block) |
| E12_inv_E23 = E12_inv @ E23 |
| E23T_E12inv = E23.T @ E12_inv |
|
|
| y_ref = np.vstack([np.random.multivariate_normal(np.random.uniform(20, 80, 2), np.eye(2)*15, 200) for _ in range(8)]) |
| xi, yi = np.mgrid[0:100:50j, 0:100:50j] |
| positions = np.vstack([xi.ravel(), yi.ravel()]) |
| kde = gaussian_kde(y_ref.T) |
| zi = np.reshape(kde(positions).T, xi.shape) |
|
|
| agent_betas = [np.ones(len(y_ref)) / len(y_ref) for _ in range(no_of_agents)] |
| agent_wgt_history = np.zeros((no_of_agents, no_of_agents, len(y_ref))) |
| comm_time, agents = np.zeros((no_of_agents, no_of_agents)), np.zeros((no_of_agents, 8)) |
| trajectories = [[] for _ in range(no_of_agents)] |
|
|
| for n in range(no_of_agents): |
| agents[n, [0, 4]] = np.random.uniform(10, 90, 2) |
|
|
| fig, ax = plt.subplots(figsize=(6, 6)) |
| max_vel = 0.8 |
|
|
| max_steps = 20000 |
| for t in range(1, max_steps+1): |
| for n in range(no_of_agents): |
| trajectories[n].append(agents[n, [0, 4]].copy()) |
| agent_betas[n], delta_w, pos_idx, d_sq = update_weight_local(agents[n, [0, 4]], y_ref, agent_betas[n], alpha_k) |
| agent_wgt_history[n, n] += delta_w |
| comm_time[n, n] = t |
| xg, gamma = hamilton_optimal_control(agents[n, [0, 4]], alpha_k, y_ref, agent_betas[n], pos_idx, d_sq) |
| if xg is not None and gamma > 1e-12: |
| Q_track = (gamma * 250.0) * CTC + Q_base |
| xg_full = np.zeros(8); xg_full[0], xg_full[4] = xg[0], xg[1] |
| F1 = np.tile(Q_track @ xg_full, hor_leng); F2 = np.zeros(40); F2[:8] = -Ad @ agents[n] |
| M = E12_inv_E23 |
| E11_M = np.zeros_like(M) |
| for i in range(hor_leng): E11_M[i*8:(i+1)*8, :] = Q_track @ M[i*8:(i+1)*8, :] |
| lhs = E33 + M.T @ E11_M |
| temp_F2_trans = E12_inv.T @ F2 |
| E11_temp_F2 = np.zeros(40) |
| for i in range(hor_leng): E11_temp_F2[i*8:(i+1)*8] = Q_track @ temp_F2_trans[i*8:(i+1)*8] |
| rhs = -(E23T_E12inv @ F1) + E23.T @ (E12_inv @ E11_temp_F2) |
| u_star = np.linalg.solve(lhs, rhs)[:2] |
| else: |
| u_star = -agents[n, [1, 5]] / T |
| agents[n] = Ad @ agents[n] + Bd @ np.clip(u_star, -1.0, 1.0) |
| agents[n, [0, 4]] = np.clip(agents[n, [0, 4]], 0.1, 99.9) |
| vel = agents[n, [1, 5]]; spd = np.linalg.norm(vel) |
| if spd > max_vel: agents[n, [1, 5]] = (vel / spd) * max_vel |
|
|
| if t % 100 == 0: |
| ax.clear() |
| ax.contour(xi, yi, zi, levels=10, colors='black', linewidths=0.3, alpha=0.3) |
| consensus_rem = np.min(agent_betas, axis=0) |
| consumed = consensus_rem <= 1e-8 |
| ax.scatter(y_ref[~consumed][:,0], y_ref[~consumed][:,1], c='#007BFF', s=2, alpha=0.2) |
| for n in range(no_of_agents): |
| path_data = np.array(trajectories[n]) |
| ax.plot(path_data[:, 0], path_data[:, 1], lw=1, alpha=0.6) |
| ax.plot(agents[n, 0], agents[n, 4], 'o', ms=6) |
| ax.set_xlim(0, 100); ax.set_ylim(0, 100) |
| ax.set_title(f"Coverage Progress: {(1-np.sum(consensus_rem))*100:.1f}%") |
| fig.canvas.draw() |
| image = Image.fromarray(np.array(fig.canvas.renderer.buffer_rgba())).convert("RGB") |
| frames.append(image) |
| if (1-np.sum(consensus_rem)) > 0.95: break |
|
|
| gif_path = "output.gif" |
| frames[0].save(gif_path, save_all=True, append_images=frames[1:], duration=150, loop=0) |
| plt.close(fig) |
| return gif_path |
|
|
| |
| with gr.Blocks() as demo: |
| gr.Markdown("# 🛸 D2OC: Density-Driven Optimal Control") |
| gr.Markdown(""" |
| ### Interactive Demo for Decentralized Multi-Agent Coverage |
| |
| **Official Publication:** [IEEE Transactions on Systems, Man, and Cybernetics: Systems (DOI: 10.1109/TSMC.2025.3622075)](https://doi.org/10.1109/TSMC.2025.3622075) |
| |
| **Preprint:** [arXiv:2511.12756](https://arxiv.org/abs/2511.12756) |
| """) |
| with gr.Row(): |
| num_agents = gr.Slider(minimum=2, maximum=20, value=8, step=1, label="Number of Agents") |
| bat_life = gr.Slider(minimum=5000, maximum=20000, value=8000, step=1000, label="Task Capacity (Battery Life)") |
| btn = gr.Button("Run Simulation") |
| output_gif = gr.Image(label="Simulation Result (GIF)") |
| btn.click(fn=run_simulation, inputs=[num_agents, bat_life], outputs=output_gif) |
|
|
| demo.launch() |