|
|
import gradio as gr |
|
|
import numpy as np |
|
|
import folium |
|
|
import tempfile |
|
|
from shapely.geometry import LineString |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def make_primary_mission():
    """Build the hard-coded primary mission used by the demo.

    Returns:
        A dict with the mission ``id``, its list of (x, y) ``waypoints``, and
        the start and end of its time window under ``t0`` and ``t1``.
    """
    route = [
        (0.0, 0.0),
        (200.0, 50.0),
        (400.0, 150.0),
        (600.0, 120.0),
        (800.0, 200.0),
    ]
    mission = dict(id="primary", waypoints=route, t0=0.0, t1=300.0)
    return mission
|
|
|
|
|
def make_simulated_conflict():
    """Return the simulated flights for the "Conflict Demo" scenario.

    Returns:
        A list of two flight dicts. Each has an ``id``, a list of (x, y)
        ``waypoints`` and a ``times`` list with one timestamp per waypoint.
    """
    first = {
        "id": "f1",
        "waypoints": [
            (900.0, 300.0),
            (700.0, 200.0),
            (450.0, 140.0),
            (200.0, 0.0),
        ],
        "times": [0.0, 100.0, 160.0, 260.0],
    }
    second = {
        "id": "f2",
        "waypoints": [(1000.0, -200.0), (1100.0, -300.0)],
        "times": [0.0, 400.0],
    }
    return [first, second]
|
|
|
|
|
def make_simulated_no_conflict():
    """Return the simulated flights for the non-conflict demo scenario.

    Returns:
        A one-element list holding a flight dict with an ``id``, a list of
        (x, y) ``waypoints`` and a ``times`` list with one timestamp per
        waypoint.
    """
    lone_flight = {
        "id": "g1",
        "waypoints": [(1000.0, 1000.0), (1100.0, 1100.0)],
        "times": [0.0, 400.0],
    }
    return [lone_flight]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def assign_times_to_primary(waypoints, t0, t1):
    """Spread the window [t0, t1] over the waypoints in proportion to distance.

    The first waypoint is stamped ``t0``; every later waypoint is stamped
    according to the fraction of the total 2-D path length travelled to reach
    it, so the last waypoint lands on ``t1``.

    Args:
        waypoints: Sequence of points; only the first two coordinates
            (index 0 and 1) of each point are used.
        t0: Time assigned to the first waypoint.
        t1: Time assigned to the end of the path.

    Returns:
        A list with one time per waypoint. An empty route yields ``[]`` and a
        single-waypoint route yields ``[t0]``.
    """
    count = len(waypoints)
    if count == 0:
        return []
    if count == 1:
        # A lone waypoint has no path to travel; it just sits at the start time.
        return [t0]

    # Running path length at each waypoint, starting from 0 at the first one.
    travelled = [0.0]
    for prev, cur in zip(waypoints, waypoints[1:]):
        step = float(np.hypot(cur[0] - prev[0], cur[1] - prev[1]))
        travelled.append(travelled[-1] + step)

    total = travelled[-1]
    span = t1 - t0

    if total == 0.0:
        # Every waypoint coincides, so a distance fraction is undefined
        # (0 / 0). Space the times evenly by index instead so the mission
        # still covers its full window.
        return [t0 + (i / (count - 1)) * span for i in range(count)]

    return [t0] + [t0 + (dist / total) * span for dist in travelled[1:]]
|
|
|
|
|
def position_at_time(points, times, t):
    """Return the position along a timed path at time ``t``.

    Args:
        points: Sequence of points, indexed as ``p[0]`` (x) and ``p[1]`` (y).
        times: Sequence of timestamps, one per point.
        t: Query time.

    Returns:
        ``points[0]`` when ``t`` is at or before the first timestamp and
        ``points[-1]`` when it is at or after the last. Otherwise an
        ``(x, y)`` tuple linearly interpolated inside the first segment whose
        time range contains ``t``.
    """
    # Clamp to the path's endpoints outside its time range.
    if t <= times[0]:
        return points[0]
    if t >= times[-1]:
        return points[-1]

    for idx in range(len(times) - 1):
        seg_start, seg_end = times[idx], times[idx + 1]
        if seg_start <= t <= seg_end:
            start_pt, end_pt = points[idx], points[idx + 1]
            if seg_end != seg_start:
                ratio = (t - seg_start) / (seg_end - seg_start)
            else:
                # Zero-duration segment: stay on its starting point.
                ratio = 0.0
            return (
                start_pt[0] + ratio * (end_pt[0] - start_pt[0]),
                start_pt[1] + ratio * (end_pt[1] - start_pt[1]),
            )

    # No bracketing segment was found; fall back to the final point.
    return points[-1]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def closest_approach(p_pts, p_times, o_pts, o_times, n_samples=200):
    """Estimate the closest approach of two timed paths by sampling in time.

    Both paths are evaluated with ``position_at_time`` at ``n_samples``
    evenly spaced instants across the interval where their time ranges
    overlap, and the sample with the smallest separation is returned. The
    result is therefore an approximation whose resolution depends on
    ``n_samples``.

    Args:
        p_pts: Points of the first path.
        p_times: Timestamps of the first path, one per point.
        o_pts: Points of the second path.
        o_times: Timestamps of the second path, one per point.
        n_samples: Number of instants to sample; values below 1 are treated
            as 1 so an overlapping pair always yields a result.

    Returns:
        ``None`` when the two time ranges do not overlap. Otherwise a tuple
        ``(t, p, q, d)``: the sampled time, the position of each path at that
        time, and the distance between those positions.
    """
    start = max(p_times[0], o_times[0])
    end = min(p_times[-1], o_times[-1])
    if end < start:
        return None

    sample_times = np.linspace(start, end, max(n_samples, 1))

    best = None
    min_d = float("inf")
    for t in sample_times:
        p = position_at_time(p_pts, p_times, t)
        q = position_at_time(o_pts, o_times, t)
        d = np.hypot(p[0] - q[0], p[1] - q[1])
        # Accept the first sample unconditionally so that None is reserved
        # for "no time overlap", however large the separations are.
        if best is None or d < min_d:
            min_d = d
            best = (t, p, q, d)
    return best
|
|
|
|
|
def query_mission(primary, others, safety_buffer=30.0):
    """Check the primary mission against other flights for conflicts.

    The primary mission's waypoints are timed with
    ``assign_times_to_primary``. Each other flight is then compared with it
    via ``closest_approach``. A flight is reported when its closest-approach
    distance is at most ``safety_buffer``.

    Args:
        primary: Dict with ``waypoints``, ``t0`` and ``t1``.
        others: Iterable of dicts with ``id``, ``waypoints`` and ``times``.
        safety_buffer: Largest separation that still counts as a conflict.

    Returns:
        A dict with ``status`` (``"conflict"`` or ``"clear"``) and
        ``conflicts``, a list of dicts with ``other_id``, ``time``,
        ``distance``, ``primary_pos`` and ``other_pos``.
    """
    route = primary["waypoints"]
    schedule = assign_times_to_primary(route, primary["t0"], primary["t1"])

    found = []
    for flight in others:
        their_route = flight["waypoints"]
        their_schedule = flight["times"]
        approach = closest_approach(route, schedule, their_route, their_schedule)
        if approach is None:
            # The flights are never airborne at the same time.
            continue

        when, own_pos, their_pos, gap = approach
        if gap <= safety_buffer:
            record = {
                "other_id": flight["id"],
                "time": float(when),
                "distance": float(gap),
                "primary_pos": own_pos,
                "other_pos": their_pos,
            }
            found.append(record)

    if found:
        status = "conflict"
    else:
        status = "clear"
    return {"status": status, "conflicts": found}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_map(primary, others, conflicts):
    """Render the missions and conflict points as a Folium map.

    The primary route is drawn as a black dashed line, every other flight as
    a gray dashed line, and each conflict as a filled red circle at the
    primary mission's position with a tooltip naming the other flight and
    the conflict time.

    Args:
        primary: Dict with a ``waypoints`` list of (x, y) points.
        others: Iterable of dicts with ``id`` and ``waypoints``.
        conflicts: Iterable of dicts with ``primary_pos``, ``other_id`` and
            ``time`` (as produced by ``query_mission``).

    Returns:
        The complete HTML document for the map, as a string.
    """
    # Copy so that extending the list does not mutate the caller's waypoints.
    all_pts = list(primary["waypoints"])
    for o in others:
        all_pts.extend(o["waypoints"])

    # Center the view on the mean of every waypoint.
    avg_x = np.mean([p[0] for p in all_pts])
    avg_y = np.mean([p[1] for p in all_pts])

    # NOTE(review): x/y are handed to Folium directly as lon/lat. The demo
    # coordinates look like local planar offsets rather than degrees, so
    # confirm whether a projection step is intended here.
    m = folium.Map(location=[avg_y, avg_x], zoom_start=13, tiles="cartodbpositron")

    # Folium expects (lat, lon), so each (x, y) pair is swapped.
    folium.PolyLine(
        [(y, x) for x, y in primary["waypoints"]],
        color="black", weight=3, dash_array="5,5", tooltip="Primary Mission"
    ).add_to(m)

    for o in others:
        folium.PolyLine(
            [(y, x) for x, y in o["waypoints"]],
            color="gray", weight=2, dash_array="5,5", tooltip=o["id"]
        ).add_to(m)

    for c in conflicts:
        lat, lon = c["primary_pos"][1], c["primary_pos"][0]
        folium.CircleMarker(
            location=[lat, lon],
            radius=6,
            color="red",
            fill=True,
            fill_opacity=1,
            tooltip=f"Conflict with {c['other_id']} @ {c['time']:.1f}s"
        ).add_to(m)

    # Render straight to a string instead of round-tripping through a temp
    # file. The previous approach left one undeleted file per call, read
    # UTF-8 output back using the locale encoding, and reopened the file by
    # name while it was still open (which fails on Windows).
    return m.get_root().render()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_scenario(choice):
    """Run one demo scenario and produce its text report and map.

    Args:
        choice: Scenario name. ``"Conflict Demo"`` selects the conflicting
            flights; any other value selects the non-conflicting ones.

    Returns:
        A ``(text, html_map)`` tuple: the human-readable result and the HTML
        returned by ``build_map``.
    """
    primary = make_primary_mission()
    if choice == "Conflict Demo":
        others = make_simulated_conflict()
    else:
        others = make_simulated_no_conflict()

    res = query_mission(primary, others, safety_buffer=30.0)
    found = res["conflicts"]

    if res["status"] == "clear":
        text = "✅ Mission is CLEAR, no conflicts."
    else:
        # One header line followed by one bullet line per conflict.
        pieces = ["⚠️ Conflict detected:\n"]
        for c in found:
            pieces.append(
                f"- With {c['other_id']} at t={c['time']:.1f}s, "
                f"distance={c['distance']:.1f}m, "
                f"primary={c['primary_pos']}, other={c['other_pos']}\n"
            )
        text = "".join(pieces)

    html_map = build_map(primary, others, found)
    return text, html_map
|
|
|
|
|
# Gradio UI: a scenario picker and a button that runs the check, with the
# text report and the rendered map shown below.
with gr.Blocks() as demo:
    gr.Markdown("## 🚁 Drone Mission Deconfliction Demo")
    scenario = gr.Dropdown(
        label="Select Scenario",
        choices=["Conflict Demo", "No Conflict Demo"],
        value="Conflict Demo",
    )
    run_btn = gr.Button(value="Run Check")
    out_text = gr.Textbox(lines=10, label="Result")
    out_map = gr.HTML(label="Interactive Map")

    # Clicking the button feeds the selected scenario to run_scenario and
    # routes its two return values to the text box and the map.
    run_btn.click(fn=run_scenario, inputs=scenario, outputs=[out_text, out_map])


# Start the web app only when executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|
|