InventorsHub commited on
Commit
2f0ff12
·
verified ·
1 Parent(s): 0aeb59d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +321 -321
app.py CHANGED
@@ -1,321 +1,321 @@
1
- import gradio as gr
2
- from pygame import Vector2
3
- import time
4
- import threading
5
- import queue
6
- from simulator_env import StreamableSimulation, SwarmAgent, MyConfig, MyWindow
7
-
8
- import speech_processing
9
- import text_processing
10
- import safety_module
11
- import bt_generator
12
-
13
- from pathlib import Path
14
-
15
- BASE = Path(__file__).parent
16
-
17
- class GradioStreamer:
18
- _instance = None
19
-
20
- def __new__(cls):
21
- if cls._instance is None:
22
- cls._instance = super(GradioStreamer, cls).__new__(cls)
23
- cls._instance.initialized = False
24
- return cls._instance
25
-
26
- def __init__(self):
27
- if not self.initialized:
28
- self.latest_frame = None
29
- self.running = True
30
- self.sim = None
31
- self.sim_thread = None
32
- self.initialized = True
33
- self.quit = False
34
-
35
- def update_frame(self, frame):
36
- self.latest_frame = frame
37
-
38
- def run_simulation(self):
39
- # Instantiate simulation and agents here:
40
-
41
- nest_pos = Vector2(450, 400)
42
- target_pos = Vector2(300, 200)
43
-
44
- agent_images = ["white.png", "green.png", "red circle.png"]
45
- image_paths = [str(BASE / "images" / fname) for fname in agent_images]
46
-
47
-
48
- # agent_images_paths = ["./images/white.png", "./images/green.png", "./images/red circle.png"]
49
- config = MyConfig(radius=25, visualise_chunks=True, movement_speed=2.0)
50
- self.sim = StreamableSimulation(config=config)
51
- loaded_agent_images = self.sim._load_image(image_paths)
52
- # loaded_agent_images = self.sim._load_image(agent_images_paths)
53
-
54
- # Create agents (each agent builds its own BT in its __init__)
55
- for _ in range(50):
56
- agents_pos = Vector2(450, 400)
57
- agent = SwarmAgent(
58
- images=loaded_agent_images,
59
- simulation=self.sim,
60
- pos=agents_pos,
61
- nest_pos=nest_pos,
62
- target_pos=target_pos
63
- )
64
- self.sim._agents.add(agent)
65
- self.sim._all.add(agent)
66
- # (Optionally spawn obstacles and sites.)
67
- self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst.png"), 350, 50)
68
- self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst (1).png"), 100, 350)
69
-
70
- self.sim.spawn_site(str(BASE / "images" / "rect.png"), 300, 200)
71
- self.sim.spawn_site(str(BASE / "images" / "nest.png"), 450, 400)
72
-
73
-
74
- start_time = time.time() # Record the start time
75
- # while self.sim.running:
76
- # self.sim.tick()
77
- # for agent in self.sim._agents:
78
- # agent.bt.tick_once() # Continuously update BTs
79
- # if not self.sim.frame_queue.empty():
80
- # frame = self.sim.frame_queue.get()
81
- # self.update_frame(frame)
82
- # time.sleep(1/30)
83
-
84
- while self.running:
85
- self.sim.tick()
86
-
87
- if not self.sim.frame_queue.empty():
88
- frame = self.sim.frame_queue.get()
89
- self.update_frame(frame)
90
-
91
- time.sleep(1/30) # Maintain a frame rate of ~30 FPS
92
- # Stop after 1 minute
93
- if time.time() - start_time >= 120:
94
- print("Simulation stopped after 1 minute.")
95
- break
96
-
97
-
98
-
99
- def stream(self):
100
- while True:
101
- if self.sim is not None and self.latest_frame is not None:
102
- yield self.latest_frame
103
- else:
104
- # Optionally, yield a blank image or None.
105
- yield None
106
- time.sleep(1/30)
107
-
108
-
109
- def start_simulation(self):
110
- """Start the simulation, creating a new thread if necessary."""
111
- if not self.sim_thread or not self.sim_thread.is_alive():
112
- self.running = True # Reset running flag
113
- self.quit = False # Reset quit flag
114
- self.latest_frame = None # Clear out the old frame
115
- self.sim_thread = threading.Thread(target=self.run_simulation, daemon=True)
116
- self.sim_thread.start()
117
-
118
-
119
- def clear_frame_queue(self):
120
- if self.sim:
121
- try:
122
- while True:
123
- self.sim.frame_queue.get_nowait()
124
- except queue.Empty:
125
- pass
126
-
127
-
128
-
129
- def stop_simulation(self):
130
- print("Stopping Simulation...")
131
- self.running = False
132
- self.quit = True
133
- if self.sim:
134
- for agent in self.sim._agents:
135
- agent.bt_active = False
136
- self.sim.running = False
137
- self.sim.stop()
138
- self.clear_frame_queue()
139
- self.sim = None
140
- if self.sim_thread and self.sim_thread.is_alive():
141
- self.sim_thread.join(timeout=2)
142
- print("Simulation thread terminated.")
143
- self.latest_frame = None # Clear the displayed frame
144
- print("Simulation stopped successfully.")
145
-
146
-
147
-
148
-
149
-
150
-
151
- def test(temp):
152
- return "test"
153
-
154
- def test_safe(temp, checkbox):
155
- return "Safe"
156
-
157
- def test_LLM_generate_BT(temp):
158
- print(temp)
159
- return None
160
-
161
-
162
-
163
-
164
-
165
- def stop_gradio_interface():
166
- raise Exception("Simulation stopped!")
167
-
168
-
169
- def create_gradio_interface():
170
- streamer = GradioStreamer()
171
-
172
- def on_translate_or_process():
173
- streamer.start_simulation()
174
- return gr.update(visible=True)
175
-
176
- def on_stop():
177
- print("Simulation on_stop")
178
- streamer.stop_simulation()
179
- return gr.update(visible=False)
180
-
181
- behaviors = bt_generator.call_behaviors()
182
- formatted_behaviors = "\n".join(f"- **{name}**: {doc.split('Returns:')[0].strip()}" for name, doc in behaviors.items())
183
- # formatted_behaviors = "Test"
184
-
185
- # Gradio Interface
186
- with gr.Blocks() as demo:
187
- gr.Markdown(
188
- """
189
- # 🐝 **SwarmChat:** Enabling Human–Swarm Interaction and Robot Control via Natural Language
190
- Easily talk to virtual robots, and see the result live.
191
- """
192
- )
193
- gr.Markdown(
194
- """
195
- **How it works**
196
-
197
- 1. Speak or type a task in *any EU language* (e.g. “Find the target, then line up by colour”).
198
- 2. Press **Start** to launch the simulator. Use **Stop** to halt & reset.
199
- 3. SwarmChat translates your command, runs a safety check, and auto-builds a behaviour tree (BT).
200
-
201
- > The BT XML is shown on the right so you can copy / save it for real robots.
202
- """
203
- )
204
- with gr.Tabs():
205
- # Tab for microphone input
206
- with gr.Tab("Microphone Input"):
207
- gr.Markdown("## 🎙️ Voice mode")
208
- gr.Markdown("""
209
- Use your microphone to record audio instructions for the swarm. The system translates them into a robot-executable BT.
210
- """)
211
- with gr.Row():
212
- with gr.Column():
213
- microphone_input = gr.Audio(sources=["microphone"], type="filepath", label="🎙️ Record Audio")
214
- safety_checkbox = gr.Checkbox(label="Turn off Safety Model")
215
- with gr.Column():
216
- output_text_audio = gr.Textbox(label="📄 Translated Instructions to English" )
217
- safty_check_audio = gr.Textbox(label="✅ Safety Check")
218
-
219
-
220
- translate_button_audio = gr.Button("Start")
221
-
222
- simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False)
223
- stop_button = gr.Button("Stop")
224
- with gr.Row():
225
- with gr.Column():
226
- gr.Markdown(f"""**🛠 Primitive behaviours available.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""")
227
-
228
- with gr.Column():
229
- generated_BT_audio = gr.Textbox(label="Generated behavior tree")
230
-
231
- translate_button_audio.click(
232
- fn=speech_processing.translate_audio,
233
- # fn=test,
234
- inputs=microphone_input,
235
- outputs=output_text_audio
236
- ).then(
237
- fn=safety_module.check_safety,
238
- # fn=test_safe,
239
- inputs=[output_text_audio,safety_checkbox],
240
- outputs=safty_check_audio
241
- ).then(
242
- fn=lambda x: x if x == "Safe" else stop_gradio_interface(),
243
- inputs=safty_check_audio,
244
- outputs=None
245
- ).success(
246
- fn=bt_generator.generate_behavior_tree,
247
- # fn=test_LLM_generate_BT,
248
- inputs=output_text_audio,
249
- outputs=generated_BT_audio
250
- ).success(
251
- fn=on_translate_or_process,
252
- outputs=simulation_output
253
- )
254
-
255
- # stop_button.click(fn=on_stop, outputs=simulation_output)
256
- # stop_button.click(fn=on_stop, outputs=simulation_output)#.then(js="window.location.reload()")
257
- stop_button.click(fn=on_stop,outputs=simulation_output)#.then(js="window.location.reload()")
258
- demo.load(fn=streamer.stream, outputs=simulation_output)
259
-
260
- # Tab for text input
261
- with gr.Tab("📝 Text Input"):
262
- gr.Markdown("## 📝 Text mode")
263
- gr.Markdown("""
264
- Enter text-based instructions for the swarm. The system translates them into a robot-executable BT.
265
- """)
266
- with gr.Row():
267
- with gr.Column():
268
- text_input = gr.Textbox(lines=4, placeholder="Enter your instructions here...", label="📝 Input Text")
269
- safety_checkbox_text = gr.Checkbox(label="Turn off Safety Model")
270
- with gr.Column():
271
- output_text_text = gr.Textbox(label="📄 Translated Instructions to English", lines=2)
272
- safty_check_text = gr.Textbox(label="✅ Safety Check")
273
-
274
- process_button_text = gr.Button("Start")
275
-
276
- simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False)
277
- stop_button = gr.Button("Stop")
278
- with gr.Row():
279
- with gr.Column():
280
- gr.Markdown(f"""**🛠 Primitive behaviours available.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""")
281
-
282
- with gr.Column():
283
- generated_BT_text = gr.Textbox(label="Generated behavior tree")
284
-
285
- process_button_text.click(
286
- fn=text_processing.translate_text,
287
- # fn=test,
288
- inputs=text_input,
289
- outputs=output_text_text
290
- ).then(
291
- fn=safety_module.check_safety,
292
- # fn=test_safe,
293
- inputs=[output_text_text,safety_checkbox_text],
294
- outputs=safty_check_text
295
- ).then(
296
- fn=lambda x: x if x == "Safe" else stop_gradio_interface(),
297
- inputs=safty_check_text,
298
- outputs=None
299
- ).success(
300
- fn=bt_generator.generate_behavior_tree,
301
- # fn=test_LLM_generate_BT,
302
- inputs=output_text_text,
303
- outputs=generated_BT_text
304
- ).success(
305
- fn=on_translate_or_process,
306
- outputs=simulation_output
307
- )
308
- stop_button.click(fn=on_stop,outputs=simulation_output)#.then(fn=reload_page,outputs=None ,js="window.location.reload()")
309
- # stop_button.click(fn=on_stop, outputs=simulation_output, js="window.location.reload()")
310
- demo.load(fn=streamer.stream, outputs=simulation_output)
311
-
312
- return demo
313
-
314
- if __name__ == "__main__":
315
- demo = create_gradio_interface()
316
- try:
317
- demo.launch(server_port=7860, server_name="0.0.0.0")
318
- finally:
319
- streamer = GradioStreamer()
320
- streamer.stop_simulation()
321
-
 
1
+ import gradio as gr
2
+ from pygame import Vector2
3
+ import time
4
+ import threading
5
+ import queue
6
+ from simulator_env import StreamableSimulation, SwarmAgent, MyConfig, MyWindow
7
+
8
+ import speech_processing
9
+ import text_processing
10
+ import safety_module
11
+ import bt_generator
12
+
13
+ from pathlib import Path
14
+
15
+ BASE = Path(__file__).parent
16
+
17
+ class GradioStreamer:
18
+ _instance = None
19
+
20
+ def __new__(cls):
21
+ if cls._instance is None:
22
+ cls._instance = super(GradioStreamer, cls).__new__(cls)
23
+ cls._instance.initialized = False
24
+ return cls._instance
25
+
26
+ def __init__(self):
27
+ if not self.initialized:
28
+ self.latest_frame = None
29
+ self.running = True
30
+ self.sim = None
31
+ self.sim_thread = None
32
+ self.initialized = True
33
+ self.quit = False
34
+
35
+ def update_frame(self, frame):
36
+ self.latest_frame = frame
37
+
38
+ def run_simulation(self):
39
+ # Instantiate simulation and agents here:
40
+
41
+ nest_pos = Vector2(450, 400)
42
+ target_pos = Vector2(300, 200)
43
+
44
+ agent_images = ["white.png", "green.png", "red circle.png"]
45
+ image_paths = [str(BASE / "images" / fname) for fname in agent_images]
46
+
47
+
48
+ # agent_images_paths = ["./images/white.png", "./images/green.png", "./images/red circle.png"]
49
+ config = MyConfig(radius=25, visualise_chunks=True, movement_speed=2.0)
50
+ self.sim = StreamableSimulation(config=config)
51
+ loaded_agent_images = self.sim._load_image(image_paths)
52
+ # loaded_agent_images = self.sim._load_image(agent_images_paths)
53
+
54
+ # Create agents (each agent builds its own BT in its __init__)
55
+ for _ in range(50):
56
+ agents_pos = Vector2(450, 400)
57
+ agent = SwarmAgent(
58
+ images=loaded_agent_images,
59
+ simulation=self.sim,
60
+ pos=agents_pos,
61
+ nest_pos=nest_pos,
62
+ target_pos=target_pos
63
+ )
64
+ self.sim._agents.add(agent)
65
+ self.sim._all.add(agent)
66
+ # (Optionally spawn obstacles and sites.)
67
+ self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst.png"), 350, 50)
68
+ self.sim.spawn_obstacle(str(BASE / "images" / "rect_obst (1).png"), 100, 350)
69
+
70
+ self.sim.spawn_site(str(BASE / "images" / "rect.png"), 300, 200)
71
+ self.sim.spawn_site(str(BASE / "images" / "nest.png"), 450, 400)
72
+
73
+
74
+ start_time = time.time() # Record the start time
75
+ # while self.sim.running:
76
+ # self.sim.tick()
77
+ # for agent in self.sim._agents:
78
+ # agent.bt.tick_once() # Continuously update BTs
79
+ # if not self.sim.frame_queue.empty():
80
+ # frame = self.sim.frame_queue.get()
81
+ # self.update_frame(frame)
82
+ # time.sleep(1/30)
83
+
84
+ while self.running:
85
+ self.sim.tick()
86
+
87
+ if not self.sim.frame_queue.empty():
88
+ frame = self.sim.frame_queue.get()
89
+ self.update_frame(frame)
90
+
91
+ time.sleep(1/30) # Maintain a frame rate of ~30 FPS
92
+ # Stop after 1 minute
93
+ if time.time() - start_time >= 120:
94
+ print("Simulation stopped after 1 minute.")
95
+ break
96
+
97
+
98
+
99
+ def stream(self):
100
+ while True:
101
+ if self.sim is not None and self.latest_frame is not None:
102
+ yield self.latest_frame
103
+ else:
104
+ # Optionally, yield a blank image or None.
105
+ yield None
106
+ time.sleep(1/30)
107
+
108
+
109
+ def start_simulation(self):
110
+ """Start the simulation, creating a new thread if necessary."""
111
+ if not self.sim_thread or not self.sim_thread.is_alive():
112
+ self.running = True # Reset running flag
113
+ self.quit = False # Reset quit flag
114
+ self.latest_frame = None # Clear out the old frame
115
+ self.sim_thread = threading.Thread(target=self.run_simulation, daemon=True)
116
+ self.sim_thread.start()
117
+
118
+
119
+ def clear_frame_queue(self):
120
+ if self.sim:
121
+ try:
122
+ while True:
123
+ self.sim.frame_queue.get_nowait()
124
+ except queue.Empty:
125
+ pass
126
+
127
+
128
+
129
+ def stop_simulation(self):
130
+ print("Stopping Simulation...")
131
+ self.running = False
132
+ self.quit = True
133
+ if self.sim:
134
+ for agent in self.sim._agents:
135
+ agent.bt_active = False
136
+ self.sim.running = False
137
+ self.sim.stop()
138
+ self.clear_frame_queue()
139
+ self.sim = None
140
+ if self.sim_thread and self.sim_thread.is_alive():
141
+ self.sim_thread.join(timeout=2)
142
+ print("Simulation thread terminated.")
143
+ self.latest_frame = None # Clear the displayed frame
144
+ print("Simulation stopped successfully.")
145
+
146
+
147
+
148
+
149
+
150
+
151
+ def test(temp):
152
+ return "test"
153
+
154
+ def test_safe(temp, checkbox):
155
+ return "Safe"
156
+
157
+ def test_LLM_generate_BT(temp):
158
+ print(temp)
159
+ return None
160
+
161
+
162
+
163
+
164
+
165
+ def stop_gradio_interface():
166
+ raise Exception("Simulation stopped!")
167
+
168
+
169
+ def create_gradio_interface():
170
+ streamer = GradioStreamer()
171
+
172
+ def on_translate_or_process():
173
+ streamer.start_simulation()
174
+ return gr.update(visible=True)
175
+
176
+ def on_stop():
177
+ print("Simulation on_stop")
178
+ streamer.stop_simulation()
179
+ return gr.update(visible=False)
180
+
181
+ behaviors = bt_generator.call_behaviors()
182
+ formatted_behaviors = "\n".join(f"- **{name}**: {doc.split('Returns:')[0].strip()}" for name, doc in behaviors.items())
183
+ # formatted_behaviors = "Test"
184
+
185
+ # Gradio Interface
186
+ with gr.Blocks() as demo:
187
+ gr.Markdown(
188
+ """
189
+ # 🐝 **SwarmChat:** Enabling Human–Swarm Interaction and Robot Control via Natural Language
190
+ Easily talk to virtual robots, and see the result live.
191
+ """
192
+ )
193
+ gr.Markdown(
194
+ """
195
+ **How it works**
196
+
197
+ 1. Speak or type a task in *any EU language* (e.g. “Find the target, then line up by colour”).
198
+ 2. Press **Start** to launch the simulator. Use **Stop** to halt & reset.
199
+ 3. SwarmChat translates your command, runs a safety check, and auto-builds a behaviour tree (BT).
200
+
201
+ > The BT XML is shown on the right so you can copy / save it for real robots.
202
+ """
203
+ )
204
+ with gr.Tabs():
205
+ # Tab for microphone input
206
+ with gr.Tab("Microphone Input"):
207
+ gr.Markdown("## 🎙️ Voice mode")
208
+ gr.Markdown("""
209
+ Use your microphone to record audio instructions for the swarm. The system translates them into a robot-executable BT.
210
+ """)
211
+ with gr.Row():
212
+ with gr.Column():
213
+ microphone_input = gr.Audio(sources=["microphone"], type="filepath", label="🎙️ Record Audio")
214
+ safety_checkbox = gr.Checkbox(label="Turn off Safety Model")
215
+ with gr.Column():
216
+ output_text_audio = gr.Textbox(label="📄 Translated Instructions to English" )
217
+ safty_check_audio = gr.Textbox(label="✅ Safety Check")
218
+
219
+
220
+ translate_button_audio = gr.Button("Start")
221
+
222
+ simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False)
223
+ stop_button = gr.Button("Stop")
224
+ with gr.Row():
225
+ with gr.Column():
226
+ gr.Markdown(f"""**🛠 Primitive behaviours available.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""")
227
+
228
+ with gr.Column():
229
+ generated_BT_audio = gr.Textbox(label="Generated behavior tree")
230
+
231
+ translate_button_audio.click(
232
+ fn=speech_processing.translate_audio,
233
+ # fn=test,
234
+ inputs=microphone_input,
235
+ outputs=output_text_audio
236
+ ).then(
237
+ fn=safety_module.check_safety,
238
+ # fn=test_safe,
239
+ inputs=[output_text_audio,safety_checkbox],
240
+ outputs=safty_check_audio
241
+ ).then(
242
+ fn=lambda x: x if x == "Safe" else stop_gradio_interface(),
243
+ inputs=safty_check_audio,
244
+ outputs=None
245
+ ).success(
246
+ fn=bt_generator.generate_behavior_tree,
247
+ # fn=test_LLM_generate_BT,
248
+ inputs=output_text_audio,
249
+ outputs=generated_BT_audio
250
+ ).success(
251
+ fn=on_translate_or_process,
252
+ outputs=simulation_output
253
+ )
254
+
255
+ # stop_button.click(fn=on_stop, outputs=simulation_output)
256
+ # stop_button.click(fn=on_stop, outputs=simulation_output)#.then(js="window.location.reload()")
257
+ stop_button.click(fn=on_stop,outputs=simulation_output)#.then(js="window.location.reload()")
258
+ demo.load(fn=streamer.stream, outputs=simulation_output)
259
+
260
+ # Tab for text input
261
+ with gr.Tab("📝 Text Input"):
262
+ gr.Markdown("## 📝 Text mode")
263
+ gr.Markdown("""
264
+ Enter text-based instructions for the swarm. The system translates them into a robot-executable BT.
265
+ """)
266
+ with gr.Row():
267
+ with gr.Column():
268
+ text_input = gr.Textbox(lines=4, placeholder="Enter your instructions here...", label="📝 Input Text")
269
+ safety_checkbox_text = gr.Checkbox(label="Turn off Safety Model")
270
+ with gr.Column():
271
+ output_text_text = gr.Textbox(label="📄 Translated Instructions to English", lines=2)
272
+ safty_check_text = gr.Textbox(label="✅ Safety Check")
273
+
274
+ process_button_text = gr.Button("Start")
275
+
276
+ simulation_output = gr.Image(label="Live Stream", streaming=True, visible=False)
277
+ stop_button = gr.Button("Stop")
278
+ with gr.Row():
279
+ with gr.Column():
280
+ gr.Markdown(f"""**🛠 Primitive behaviours available.**\n{formatted_behaviors}\n\nThese are the only low-level actions/conditions the model is allowed to use yet.""")
281
+
282
+ with gr.Column():
283
+ generated_BT_text = gr.Textbox(label="Generated behavior tree")
284
+
285
+ process_button_text.click(
286
+ fn=text_processing.translate_text,
287
+ # fn=test,
288
+ inputs=text_input,
289
+ outputs=output_text_text
290
+ ).then(
291
+ fn=safety_module.check_safety,
292
+ # fn=test_safe,
293
+ inputs=[output_text_text,safety_checkbox_text],
294
+ outputs=safty_check_text
295
+ ).then(
296
+ fn=lambda x: x if x == "Safe" else stop_gradio_interface(),
297
+ inputs=safty_check_text,
298
+ outputs=None
299
+ ).success(
300
+ fn=bt_generator.generate_behavior_tree,
301
+ # fn=test_LLM_generate_BT,
302
+ inputs=output_text_text,
303
+ outputs=generated_BT_text
304
+ ).success(
305
+ fn=on_translate_or_process,
306
+ outputs=simulation_output
307
+ )
308
+ stop_button.click(fn=on_stop,outputs=simulation_output)#.then(fn=reload_page,outputs=None ,js="window.location.reload()")
309
+ # stop_button.click(fn=on_stop, outputs=simulation_output, js="window.location.reload()")
310
+ demo.load(fn=streamer.stream, outputs=simulation_output)
311
+
312
+ return demo
313
+
314
+ if __name__ == "__main__":
315
+ demo = create_gradio_interface()
316
+ try:
317
+ demo.launch(server_port=7860, server_name="0.0.0.0", share=True)
318
+ finally:
319
+ streamer = GradioStreamer()
320
+ streamer.stop_simulation()
321
+