# flysafe / app.py
# gk2410 - "Update app.py" (591cc99, verified)
import gradio as gr
import numpy as np
import folium
import tempfile
from shapely.geometry import LineString
# -------------------------------
# Data: Missions and simulated flights
# -------------------------------
def make_primary_mission():
    """Build the fixed primary mission used by the demo.

    Returns:
        A dict with the mission ``id``, its ordered ``waypoints`` as
        ``(x, y)`` tuples, and the time window ``t0``..``t1`` over which
        the whole route is flown. A fresh waypoint list is created on
        every call.
    """
    route = [
        (0.0, 0.0),
        (200.0, 50.0),
        (400.0, 150.0),
        (600.0, 120.0),
        (800.0, 200.0),
    ]
    return dict(id="primary", waypoints=route, t0=0.0, t1=300.0)
def make_simulated_conflict():
    """Return the simulated traffic for the "Conflict Demo" scenario.

    Returns:
        A list of two flight dicts (``f1`` and ``f2``). Each flight has an
        ``id``, a list of ``(x, y)`` ``waypoints`` and a parallel list of
        ``times`` giving when each waypoint is reached.
    """
    flight_f1 = {
        "id": "f1",
        "waypoints": [
            (900.0, 300.0),
            (700.0, 200.0),
            (450.0, 140.0),
            (200.0, 0.0),
        ],
        "times": [0.0, 100.0, 160.0, 260.0],
    }
    flight_f2 = {
        "id": "f2",
        "waypoints": [
            (1000.0, -200.0),
            (1100.0, -300.0),
        ],
        "times": [0.0, 400.0],
    }
    return [flight_f1, flight_f2]
def make_simulated_no_conflict():
    """Return the simulated traffic for the non-conflict scenario.

    Returns:
        A one-element list holding flight ``g1``, described by its ``id``,
        its ``(x, y)`` ``waypoints`` and the matching ``times``.
    """
    flight_g1 = {
        "id": "g1",
        "waypoints": [(1000.0, 1000.0), (1100.0, 1100.0)],
        "times": [0.0, 400.0],
    }
    return [flight_g1]
# -------------------------------
# Helpers: assign times and interpolate
# -------------------------------
def assign_times_to_primary(waypoints, t0, t1):
    """Spread the time window ``[t0, t1]`` over the waypoints by distance.

    Each waypoint receives the time at which it is reached when the route
    is flown so that elapsed time is proportional to the 2D path length
    covered so far.

    Args:
        waypoints: Sequence of points; only the first two coordinates
            (x, y) of each point are used.
        t0: Time assigned to the first waypoint.
        t1: Time assigned to the last waypoint.

    Returns:
        A list of times, one per waypoint, starting at ``t0``. With fewer
        than two waypoints there is nothing to interpolate and ``[t0]`` is
        returned.
    """
    if len(waypoints) < 2:
        return [t0]

    # Plain Euclidean segment lengths; no geometry objects are needed.
    seg_lengths = [
        float(np.hypot(b[0] - a[0], b[1] - a[1]))
        for a, b in zip(waypoints, waypoints[1:])
    ]
    total = sum(seg_lengths)
    span = t1 - t0
    n_segments = len(seg_lengths)

    times = [t0]
    travelled = 0.0
    for idx, seg in enumerate(seg_lengths, start=1):
        travelled += seg
        if total > 0:
            frac = travelled / total
        else:
            # All waypoints coincide: distance cannot weight the schedule,
            # so space the waypoints evenly in time instead of dividing by 0.
            frac = idx / n_segments
        times.append(t0 + frac * span)
    return times
def position_at_time(points, times, t):
    """Return the position along a timed polyline at time ``t``.

    Args:
        points: Sequence of ``(x, y)`` points.
        times: Time at which each point in ``points`` is reached.
        t: Query time.

    Returns:
        The first point when ``t`` is at or before the first time, the last
        point when ``t`` is at or after the last time, and otherwise an
        ``(x, y)`` tuple linearly interpolated within the first segment
        whose time interval contains ``t``.
    """
    # Clamp queries outside the covered time range to the end points.
    if t <= times[0]:
        return points[0]
    if t >= times[-1]:
        return points[-1]

    for idx in range(len(times) - 1):
        seg_start, seg_end = times[idx], times[idx + 1]
        if not (seg_start <= t <= seg_end):
            continue
        a, b = points[idx], points[idx + 1]
        # A zero-duration segment pins the position to its start point.
        ratio = 0.0 if seg_end == seg_start else (t - seg_start) / (seg_end - seg_start)
        return (
            a[0] + ratio * (b[0] - a[0]),
            a[1] + ratio * (b[1] - a[1]),
        )

    # No segment matched (e.g. unordered times): fall back to the last point.
    return points[-1]
# -------------------------------
# Core conflict check
# -------------------------------
def closest_approach(p_pts, p_times, o_pts, o_times, n_samples=200):
    """Find the sampled moment at which two timed routes are closest.

    Both routes are evaluated at ``n_samples`` evenly spaced times across
    the interval in which both are active, and the sample with the
    smallest separation is reported. Because the search is sampled, an
    approach that happens between two samples can be underestimated.

    Args:
        p_pts: Waypoints of the first route.
        p_times: Times at which the waypoints in ``p_pts`` are reached.
        o_pts: Waypoints of the second route.
        o_times: Times at which the waypoints in ``o_pts`` are reached.
        n_samples: Number of sample times across the shared interval.

    Returns:
        ``(t, p, q, d)`` with the sample time, both positions and their
        distance, or ``None`` when the two time ranges do not overlap (or
        no sample was evaluated). ``t`` and ``d`` are plain Python floats.
    """
    t_start = max(p_times[0], o_times[0])
    t_end = min(p_times[-1], o_times[-1])
    if t_end < t_start:
        return None

    best = None
    # Start from infinity so that arbitrarily distant routes still yield a
    # result; a finite sentinel would make them look like "no overlap".
    min_d = float("inf")
    # .tolist() yields Python floats, keeping numpy scalars out of the
    # interpolated positions that callers later format for display.
    for t in np.linspace(t_start, t_end, n_samples).tolist():
        p = position_at_time(p_pts, p_times, t)
        q = position_at_time(o_pts, o_times, t)
        d = float(np.hypot(p[0] - q[0], p[1] - q[1]))
        # Strict comparison keeps the earliest sample among equal minima.
        if d < min_d:
            min_d = d
            best = (t, p, q, d)
    return best
def query_mission(primary, others, safety_buffer=30.0):
    """Check the primary mission against other flights for conflicts.

    Args:
        primary: Mission dict with ``waypoints``, ``t0`` and ``t1``.
        others: Iterable of flight dicts with ``id``, ``waypoints`` and
            ``times``.
        safety_buffer: A flight whose closest sampled distance to the
            primary route is at most this value is reported as a conflict.

    Returns:
        A dict with ``status`` (``"conflict"`` or ``"clear"``) and
        ``conflicts``, a list with one entry per conflicting flight
        holding ``other_id``, ``time``, ``distance``, ``primary_pos`` and
        ``other_pos``. All numeric values are plain Python floats.
    """
    p_pts = primary["waypoints"]
    p_times = assign_times_to_primary(p_pts, primary["t0"], primary["t1"])

    conflicts = []
    for other in others:
        ca = closest_approach(p_pts, p_times, other["waypoints"], other["times"])
        if ca is None:
            # No shared time window with this flight.
            continue
        t, ppos, opos, dmin = ca
        if dmin <= safety_buffer:
            conflicts.append({
                "other_id": other["id"],
                "time": float(t),
                "distance": float(dmin),
                # Coerce coordinates like time/distance above so numpy
                # scalars never leak into the formatted report.
                "primary_pos": tuple(float(v) for v in ppos),
                "other_pos": tuple(float(v) for v in opos),
            })

    status = "conflict" if conflicts else "clear"
    return {"status": status, "conflicts": conflicts}
# -------------------------------
# Visualization: Folium interactive map
# -------------------------------
def build_map(primary, others, conflicts):
    """Render the mission, the other flights and any conflicts as a map.

    Args:
        primary: Mission dict; its ``waypoints`` are drawn as a black
            dashed line.
        others: Flight dicts; each one's ``waypoints`` are drawn as a gray
            dashed line with the flight ``id`` as tooltip.
        conflicts: Conflict dicts (``primary_pos``, ``other_id``,
            ``time``); each is drawn as a red marker at ``primary_pos``.

    Returns:
        The complete HTML document of the folium map as a string.
    """
    # Copy so that extending the list does not mutate the mission itself.
    all_pts = list(primary["waypoints"])
    for o in others:
        all_pts.extend(o["waypoints"])
    avg_x = np.mean([p[0] for p in all_pts])
    avg_y = np.mean([p[1] for p in all_pts])

    # NOTE(review): x/y are passed straight through as lon/lat. The demo
    # coordinates are in the hundreds, outside the valid latitude range --
    # confirm whether a projection to real coordinates is intended.
    m = folium.Map(location=[avg_y, avg_x], zoom_start=13, tiles="cartodbpositron")

    # Primary drone route (black dotted line)
    folium.PolyLine(
        [(y, x) for x, y in primary["waypoints"]],
        color="black", weight=3, dash_array="5,5", tooltip="Primary Mission"
    ).add_to(m)

    # Other drones (gray dotted lines)
    for o in others:
        folium.PolyLine(
            [(y, x) for x, y in o["waypoints"]],
            color="gray", weight=2, dash_array="5,5", tooltip=o["id"]
        ).add_to(m)

    # Conflict points
    for c in conflicts:
        lat, lon = c["primary_pos"][1], c["primary_pos"][0]
        folium.CircleMarker(
            location=[lat, lon],
            radius=6,
            color="red",
            fill=True,
            fill_opacity=1,
            tooltip=f"Conflict with {c['other_id']} @ {c['time']:.1f}s"
        ).add_to(m)

    # Render in memory rather than via a temp file: nothing is left behind
    # on disk, and the UTF-8 output is never re-read with the locale
    # encoding.
    return m.get_root().render()
# -------------------------------
# Gradio Interface
# -------------------------------
def run_scenario(choice):
    """Run the selected demo scenario and produce the UI outputs.

    Args:
        choice: Scenario name; ``"Conflict Demo"`` selects the conflicting
            traffic, any other value selects the non-conflicting traffic.

    Returns:
        A ``(text, html_map)`` pair: the textual verdict listing every
        conflict, and the HTML of the rendered map.
    """
    primary = make_primary_mission()
    if choice == "Conflict Demo":
        others = make_simulated_conflict()
    else:
        others = make_simulated_no_conflict()

    res = query_mission(primary, others, safety_buffer=30.0)
    found = res["conflicts"]

    if res["status"] == "clear":
        text = "✅ Mission is CLEAR, no conflicts."
    else:
        # One report line per conflicting flight, each newline-terminated.
        report_lines = [
            (f"- With {c['other_id']} at t={c['time']:.1f}s, "
             f"distance={c['distance']:.1f}m, "
             f"primary={c['primary_pos']}, other={c['other_pos']}\n")
            for c in found
        ]
        text = "⚠️ Conflict detected:\n" + "".join(report_lines)

    html_map = build_map(primary, others, found)
    return text, html_map
# Assemble the UI: a scenario picker, a trigger button, and two output
# panes (text verdict and rendered map) fed by run_scenario.
with gr.Blocks() as demo:
    gr.Markdown("## 🚁 Drone Mission Deconfliction Demo")

    scenario = gr.Dropdown(
        label="Select Scenario",
        choices=["Conflict Demo", "No Conflict Demo"],
        value="Conflict Demo",
    )
    run_btn = gr.Button("Run Check")

    out_text = gr.Textbox(label="Result", lines=10)
    out_map = gr.HTML(label="Interactive Map")

    # The button sends the chosen scenario in and fills both outputs.
    run_btn.click(fn=run_scenario, inputs=scenario, outputs=[out_text, out_map])


# Start the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()