import gradio as gr import numpy as np import matplotlib.pyplot as plt from PIL import Image from scipy.stats import gaussian_kde import os # 1. Hamiltonian Optimal Control Function def hamilton_optimal_control(x, alpha_k, z, local_weight, pos_idx, d_sq): if len(pos_idx) == 0: return None, 0 w_sub = local_weight[pos_idx] d_wnE = d_sq / (w_sub**2 + 1e-12) idx_sorted = np.argsort(d_wnE) sorted_pos_idx = pos_idx[idx_sorted] sorted_w = local_weight[sorted_pos_idx] cumsum_w = np.cumsum(sorted_w) cutoff = np.searchsorted(cumsum_w, alpha_k) if cutoff >= len(sorted_pos_idx): cutoff = len(sorted_pos_idx) - 1 final_idx = sorted_pos_idx[:cutoff+1] final_w = sorted_w[:cutoff+1].copy() if cutoff > 0: final_w[-1] = alpha_k - cumsum_w[cutoff-1] else: final_w[-1] = alpha_k y_loc_pos = z[final_idx] y_loc_wgt = final_w.reshape(-1, 1) gamma = np.sum(y_loc_wgt) xg = np.sum(y_loc_wgt * y_loc_pos, axis=0) / (gamma + 1e-12) return xg, gamma # 2. Local Weight Update Function def update_weight_local(x, z, local_weight, alpha_k): pos_idx = np.where(local_weight > 1e-15)[0] if len(pos_idx) == 0: return local_weight, np.zeros_like(local_weight), pos_idx, np.array([]) dist_sq = np.sum((z[pos_idx] - x)**2, axis=1) idx_sort = np.argsort(dist_sq) sorted_idx = pos_idx[idx_sort] sorted_w = local_weight[sorted_idx] cumsum_w = np.cumsum(sorted_w) cutoff = np.searchsorted(cumsum_w, alpha_k) weight_dist = np.zeros_like(local_weight) if cutoff < len(sorted_idx): take_idx = sorted_idx[:cutoff] weight_dist[take_idx] = local_weight[take_idx]; local_weight[take_idx] = 0 remainder = alpha_k - (cumsum_w[cutoff-1] if cutoff > 0 else 0) weight_dist[sorted_idx[cutoff]] = remainder; local_weight[sorted_idx[cutoff]] -= remainder else: weight_dist[sorted_idx] = local_weight[sorted_idx]; local_weight[sorted_idx] = 0 local_weight[local_weight < 1e-12] = 0 new_pos_idx = np.where(local_weight > 1e-15)[0] new_dist_sq = np.sum((z[new_pos_idx] - x)**2, axis=1) if len(new_pos_idx) > 0 else np.array([]) return local_weight, weight_dist, new_pos_idx, new_dist_sq # 3. Main Simulation Logic def run_simulation(num_agents, battery_life): no_of_agents = int(num_agents) bat_life = int(battery_life) T, g, comm_range = 0.1, 9.81, 15.0 alpha_k = 1.0 / bat_life frames = [] Ad_sub = np.array([[1, T, 0, 0], [0, 0.7, -T*g, 0], [0, 0, 1, T], [0, 0, 0, 0.7]]) Ad = np.zeros((8, 8)); Ad[:4, :4] = Ad_sub; Ad[4:, 4:] = Ad_sub Bd = np.zeros((8, 2)); Bd[3, 0] = T/0.1; Bd[7, 1] = T/0.1 CTC = np.zeros((8, 8)); CTC[0,0] = 1.0; CTC[4,4] = 1.0 Q_base, R_block = np.eye(8) * 0.1, np.eye(2) * 10.0 hor_leng = 5 E12 = np.zeros((40, 40)) for i in range(hor_leng): E12[i*8:(i+1)*8, i*8:(i+1)*8] = -np.eye(8) if i < hor_leng - 1: E12[i*8:(i+1)*8, (i+1)*8:(i+2)*8] = Ad.T E12_inv = np.linalg.inv(E12) E23 = np.kron(np.eye(hor_leng), Bd) E33 = np.kron(np.eye(hor_leng), R_block) E12_inv_E23 = E12_inv @ E23 E23T_E12inv = E23.T @ E12_inv y_ref = np.vstack([np.random.multivariate_normal(np.random.uniform(20, 80, 2), np.eye(2)*15, 200) for _ in range(8)]) xi, yi = np.mgrid[0:100:50j, 0:100:50j] positions = np.vstack([xi.ravel(), yi.ravel()]) kde = gaussian_kde(y_ref.T) zi = np.reshape(kde(positions).T, xi.shape) agent_betas = [np.ones(len(y_ref)) / len(y_ref) for _ in range(no_of_agents)] agent_wgt_history = np.zeros((no_of_agents, no_of_agents, len(y_ref))) comm_time, agents = np.zeros((no_of_agents, no_of_agents)), np.zeros((no_of_agents, 8)) trajectories = [[] for _ in range(no_of_agents)] for n in range(no_of_agents): agents[n, [0, 4]] = np.random.uniform(10, 90, 2) fig, ax = plt.subplots(figsize=(6, 6)) max_vel = 0.8 max_steps = 20000 for t in range(1, max_steps+1): for n in range(no_of_agents): trajectories[n].append(agents[n, [0, 4]].copy()) agent_betas[n], delta_w, pos_idx, d_sq = update_weight_local(agents[n, [0, 4]], y_ref, agent_betas[n], alpha_k) agent_wgt_history[n, n] += delta_w comm_time[n, n] = t xg, gamma = hamilton_optimal_control(agents[n, [0, 4]], alpha_k, y_ref, agent_betas[n], pos_idx, d_sq) if xg is not None and gamma > 1e-12: Q_track = (gamma * 250.0) * CTC + Q_base xg_full = np.zeros(8); xg_full[0], xg_full[4] = xg[0], xg[1] F1 = np.tile(Q_track @ xg_full, hor_leng); F2 = np.zeros(40); F2[:8] = -Ad @ agents[n] M = E12_inv_E23 E11_M = np.zeros_like(M) for i in range(hor_leng): E11_M[i*8:(i+1)*8, :] = Q_track @ M[i*8:(i+1)*8, :] lhs = E33 + M.T @ E11_M temp_F2_trans = E12_inv.T @ F2 E11_temp_F2 = np.zeros(40) for i in range(hor_leng): E11_temp_F2[i*8:(i+1)*8] = Q_track @ temp_F2_trans[i*8:(i+1)*8] rhs = -(E23T_E12inv @ F1) + E23.T @ (E12_inv @ E11_temp_F2) u_star = np.linalg.solve(lhs, rhs)[:2] else: u_star = -agents[n, [1, 5]] / T agents[n] = Ad @ agents[n] + Bd @ np.clip(u_star, -1.0, 1.0) agents[n, [0, 4]] = np.clip(agents[n, [0, 4]], 0.1, 99.9) vel = agents[n, [1, 5]]; spd = np.linalg.norm(vel) if spd > max_vel: agents[n, [1, 5]] = (vel / spd) * max_vel if t % 100 == 0: ax.clear() ax.contour(xi, yi, zi, levels=10, colors='black', linewidths=0.3, alpha=0.3) consensus_rem = np.min(agent_betas, axis=0) consumed = consensus_rem <= 1e-8 ax.scatter(y_ref[~consumed][:,0], y_ref[~consumed][:,1], c='#007BFF', s=2, alpha=0.2) for n in range(no_of_agents): path_data = np.array(trajectories[n]) ax.plot(path_data[:, 0], path_data[:, 1], lw=1, alpha=0.6) ax.plot(agents[n, 0], agents[n, 4], 'o', ms=6) ax.set_xlim(0, 100); ax.set_ylim(0, 100) ax.set_title(f"Coverage Progress: {(1-np.sum(consensus_rem))*100:.1f}%") fig.canvas.draw() image = Image.fromarray(np.array(fig.canvas.renderer.buffer_rgba())).convert("RGB") frames.append(image) if (1-np.sum(consensus_rem)) > 0.95: break gif_path = "output.gif" frames[0].save(gif_path, save_all=True, append_images=frames[1:], duration=150, loop=0) plt.close(fig) return gif_path # 4. Gradio UI Configuration with gr.Blocks() as demo: gr.Markdown("# 🛸 D2OC: Decentralized Density-Driven Optimal Control for Multi-Robot Systems") gr.Markdown(f""" ### Official Interactive Demo: Decentralized Density-Driven Optimal Control This tool visualizes the **D2OC framework**, enabling decentralized multi-agent coordination for **nonuniform area coverage** using **Optimal Transport** and Wasserstein distance. **GitHub Repository:** Source Code & Star here ⭐ **Official Publication:** IEEE Transactions on Systems, Man, and Cybernetics: Systems (DOI: 10.1109/TSMC.2025.3622075) **Preprint:** arXiv:2511.12756 """) with gr.Row(): num_agents = gr.Slider(minimum=2, maximum=20, value=5, step=1, label="Number of Agents") bat_life = gr.Slider(minimum=5000, maximum=20000, value=7000, step=1000, label="Task Capacity (Battery Life)") btn = gr.Button("Run Simulation") output_gif = gr.Image(label="Simulation Result (GIF)") btn.click(fn=run_simulation, inputs=[num_agents, bat_life], outputs=output_gif) gr.Markdown(""" ### Cite this work ```bibtex @ARTICLE{seo2025density, author={Seo, Sungjun and Lee, Kooktae}, journal={IEEE Transactions on Systems, Man, and Cybernetics: Systems}, title={Density-Driven Optimal Control for Efficient and Collaborative Multiagent Nonuniform Coverage}, year={2025}, volume={55}, number={12}, pages={9340-9354}, doi={10.1109/TSMC.2025.3622075}} ``` """) demo.launch()