D2OC's picture
Update app.py
e6d66bf verified
import gradio as gr
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from scipy.stats import gaussian_kde
import os
# 1. Hamiltonian Optimal Control Function
def hamilton_optimal_control(x, alpha_k, z, local_weight, pos_idx, d_sq):
if len(pos_idx) == 0: return None, 0
w_sub = local_weight[pos_idx]
d_wnE = d_sq / (w_sub**2 + 1e-12)
idx_sorted = np.argsort(d_wnE)
sorted_pos_idx = pos_idx[idx_sorted]
sorted_w = local_weight[sorted_pos_idx]
cumsum_w = np.cumsum(sorted_w)
cutoff = np.searchsorted(cumsum_w, alpha_k)
if cutoff >= len(sorted_pos_idx): cutoff = len(sorted_pos_idx) - 1
final_idx = sorted_pos_idx[:cutoff+1]
final_w = sorted_w[:cutoff+1].copy()
if cutoff > 0: final_w[-1] = alpha_k - cumsum_w[cutoff-1]
else: final_w[-1] = alpha_k
y_loc_pos = z[final_idx]
y_loc_wgt = final_w.reshape(-1, 1)
gamma = np.sum(y_loc_wgt)
xg = np.sum(y_loc_wgt * y_loc_pos, axis=0) / (gamma + 1e-12)
return xg, gamma
# 2. Local Weight Update Function
def update_weight_local(x, z, local_weight, alpha_k):
pos_idx = np.where(local_weight > 1e-15)[0]
if len(pos_idx) == 0: return local_weight, np.zeros_like(local_weight), pos_idx, np.array([])
dist_sq = np.sum((z[pos_idx] - x)**2, axis=1)
idx_sort = np.argsort(dist_sq)
sorted_idx = pos_idx[idx_sort]
sorted_w = local_weight[sorted_idx]
cumsum_w = np.cumsum(sorted_w)
cutoff = np.searchsorted(cumsum_w, alpha_k)
weight_dist = np.zeros_like(local_weight)
if cutoff < len(sorted_idx):
take_idx = sorted_idx[:cutoff]
weight_dist[take_idx] = local_weight[take_idx]; local_weight[take_idx] = 0
remainder = alpha_k - (cumsum_w[cutoff-1] if cutoff > 0 else 0)
weight_dist[sorted_idx[cutoff]] = remainder; local_weight[sorted_idx[cutoff]] -= remainder
else:
weight_dist[sorted_idx] = local_weight[sorted_idx]; local_weight[sorted_idx] = 0
local_weight[local_weight < 1e-12] = 0
new_pos_idx = np.where(local_weight > 1e-15)[0]
new_dist_sq = np.sum((z[new_pos_idx] - x)**2, axis=1) if len(new_pos_idx) > 0 else np.array([])
return local_weight, weight_dist, new_pos_idx, new_dist_sq
# 3. Main Simulation Logic
def run_simulation(num_agents, battery_life):
no_of_agents = int(num_agents)
bat_life = int(battery_life)
T, g, comm_range = 0.1, 9.81, 15.0
alpha_k = 1.0 / bat_life
frames = []
Ad_sub = np.array([[1, T, 0, 0], [0, 0.7, -T*g, 0], [0, 0, 1, T], [0, 0, 0, 0.7]])
Ad = np.zeros((8, 8)); Ad[:4, :4] = Ad_sub; Ad[4:, 4:] = Ad_sub
Bd = np.zeros((8, 2)); Bd[3, 0] = T/0.1; Bd[7, 1] = T/0.1
CTC = np.zeros((8, 8)); CTC[0,0] = 1.0; CTC[4,4] = 1.0
Q_base, R_block = np.eye(8) * 0.1, np.eye(2) * 10.0
hor_leng = 5
E12 = np.zeros((40, 40))
for i in range(hor_leng):
E12[i*8:(i+1)*8, i*8:(i+1)*8] = -np.eye(8)
if i < hor_leng - 1: E12[i*8:(i+1)*8, (i+1)*8:(i+2)*8] = Ad.T
E12_inv = np.linalg.inv(E12)
E23 = np.kron(np.eye(hor_leng), Bd)
E33 = np.kron(np.eye(hor_leng), R_block)
E12_inv_E23 = E12_inv @ E23
E23T_E12inv = E23.T @ E12_inv
y_ref = np.vstack([np.random.multivariate_normal(np.random.uniform(20, 80, 2), np.eye(2)*15, 200) for _ in range(8)])
xi, yi = np.mgrid[0:100:50j, 0:100:50j]
positions = np.vstack([xi.ravel(), yi.ravel()])
kde = gaussian_kde(y_ref.T)
zi = np.reshape(kde(positions).T, xi.shape)
agent_betas = [np.ones(len(y_ref)) / len(y_ref) for _ in range(no_of_agents)]
agent_wgt_history = np.zeros((no_of_agents, no_of_agents, len(y_ref)))
comm_time, agents = np.zeros((no_of_agents, no_of_agents)), np.zeros((no_of_agents, 8))
trajectories = [[] for _ in range(no_of_agents)]
for n in range(no_of_agents):
agents[n, [0, 4]] = np.random.uniform(10, 90, 2)
fig, ax = plt.subplots(figsize=(6, 6))
max_vel = 0.8
max_steps = 20000
for t in range(1, max_steps+1):
for n in range(no_of_agents):
trajectories[n].append(agents[n, [0, 4]].copy())
agent_betas[n], delta_w, pos_idx, d_sq = update_weight_local(agents[n, [0, 4]], y_ref, agent_betas[n], alpha_k)
agent_wgt_history[n, n] += delta_w
comm_time[n, n] = t
xg, gamma = hamilton_optimal_control(agents[n, [0, 4]], alpha_k, y_ref, agent_betas[n], pos_idx, d_sq)
if xg is not None and gamma > 1e-12:
Q_track = (gamma * 250.0) * CTC + Q_base
xg_full = np.zeros(8); xg_full[0], xg_full[4] = xg[0], xg[1]
F1 = np.tile(Q_track @ xg_full, hor_leng); F2 = np.zeros(40); F2[:8] = -Ad @ agents[n]
M = E12_inv_E23
E11_M = np.zeros_like(M)
for i in range(hor_leng): E11_M[i*8:(i+1)*8, :] = Q_track @ M[i*8:(i+1)*8, :]
lhs = E33 + M.T @ E11_M
temp_F2_trans = E12_inv.T @ F2
E11_temp_F2 = np.zeros(40)
for i in range(hor_leng): E11_temp_F2[i*8:(i+1)*8] = Q_track @ temp_F2_trans[i*8:(i+1)*8]
rhs = -(E23T_E12inv @ F1) + E23.T @ (E12_inv @ E11_temp_F2)
u_star = np.linalg.solve(lhs, rhs)[:2]
else:
u_star = -agents[n, [1, 5]] / T
agents[n] = Ad @ agents[n] + Bd @ np.clip(u_star, -1.0, 1.0)
agents[n, [0, 4]] = np.clip(agents[n, [0, 4]], 0.1, 99.9)
vel = agents[n, [1, 5]]; spd = np.linalg.norm(vel)
if spd > max_vel: agents[n, [1, 5]] = (vel / spd) * max_vel
if t % 100 == 0:
ax.clear()
ax.contour(xi, yi, zi, levels=10, colors='black', linewidths=0.3, alpha=0.3)
consensus_rem = np.min(agent_betas, axis=0)
consumed = consensus_rem <= 1e-8
ax.scatter(y_ref[~consumed][:,0], y_ref[~consumed][:,1], c='#007BFF', s=2, alpha=0.2)
for n in range(no_of_agents):
path_data = np.array(trajectories[n])
ax.plot(path_data[:, 0], path_data[:, 1], lw=1, alpha=0.6)
ax.plot(agents[n, 0], agents[n, 4], 'o', ms=6)
ax.set_xlim(0, 100); ax.set_ylim(0, 100)
ax.set_title(f"Coverage Progress: {(1-np.sum(consensus_rem))*100:.1f}%")
fig.canvas.draw()
image = Image.fromarray(np.array(fig.canvas.renderer.buffer_rgba())).convert("RGB")
frames.append(image)
if (1-np.sum(consensus_rem)) > 0.95: break
gif_path = "output.gif"
frames[0].save(gif_path, save_all=True, append_images=frames[1:], duration=150, loop=0)
plt.close(fig)
return gif_path
# 4. Gradio UI Configuration
with gr.Blocks() as demo:
gr.Markdown("# 🛸 D2OC: Decentralized Density-Driven Optimal Control for Multi-Robot Systems")
gr.Markdown(f"""
### Official Interactive Demo: Decentralized Density-Driven Optimal Control
This tool visualizes the **D2OC framework**, enabling decentralized multi-agent coordination for **nonuniform area coverage** using **Optimal Transport** and Wasserstein distance.
**GitHub Repository:** <a href="https://github.com/kooktaelee/D2OC" target="_blank">Source Code & Star here ⭐</a>
**Official Publication:** <a href="https://doi.org/10.1109/TSMC.2025.3622075" target="_blank">IEEE Transactions on Systems, Man, and Cybernetics: Systems (DOI: 10.1109/TSMC.2025.3622075)</a>
**Preprint:** <a href="https://arxiv.org/abs/2511.12756" target="_blank">arXiv:2511.12756</a>
""")
with gr.Row():
num_agents = gr.Slider(minimum=2, maximum=20, value=5, step=1, label="Number of Agents")
bat_life = gr.Slider(minimum=5000, maximum=20000, value=7000, step=1000, label="Task Capacity (Battery Life)")
btn = gr.Button("Run Simulation")
output_gif = gr.Image(label="Simulation Result (GIF)")
btn.click(fn=run_simulation, inputs=[num_agents, bat_life], outputs=output_gif)
gr.Markdown("""
### Cite this work
```bibtex
@ARTICLE{seo2025density,
author={Seo, Sungjun and Lee, Kooktae},
journal={IEEE Transactions on Systems, Man, and Cybernetics: Systems},
title={Density-Driven Optimal Control for Efficient and Collaborative Multiagent Nonuniform Coverage},
year={2025},
volume={55},
number={12},
pages={9340-9354},
doi={10.1109/TSMC.2025.3622075}}
```
""")
demo.launch()