sgbaird commited on
Commit
aa54fc0
·
1 Parent(s): d14e0f6

create simple starter app.py

Browse files
Files changed (1) hide show
  1. app.py +4 -509
app.py CHANGED
@@ -1,514 +1,9 @@
1
  import gradio as gr
2
- from queue import Queue
3
- import threading
4
- import time
5
- import paho.mqtt.client as mqtt
6
- import json
7
- import secrets
8
- from DB_utls import find_unused_wells, update_used_wells, save_result, get_student_quota, decrement_student_quota
9
- import os
10
 
11
- # NOTE: New global dict to store tasks keyed by (student_id, experiment_id)
12
- tasks_dict = {}
13
 
14
- task_queue = Queue()
15
- result_queue = Queue()
16
- current_task = None
17
- sensor_results = None
18
- queue_counter = task_queue.qsize()
19
 
20
 
21
-
22
- MQTT_BROKER = os.getenv("MQTT_BROKER")
23
- MQTT_PORT = int(os.getenv("MQTT_PORT"))
24
- MQTT_USERNAME = os.getenv("MQTT_USERNAME")
25
- MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
26
-
27
-
28
- OT2_SERIAL = "OT2CEP20240218R04"
29
- PICO_ID = "e66130100f895134"
30
-
31
- OT2_COMMAND_TOPIC = f"command/ot2/{OT2_SERIAL}/pipette"
32
- OT2_STATUS_TOPIC = f"status/ot2/{OT2_SERIAL}/complete"
33
- SENSOR_COMMAND_TOPIC = f"command/picow/{PICO_ID}/as7341/read"
34
- SENSOR_DATA_TOPIC = f"color-mixing/picow/{PICO_ID}/as7341"
35
-
36
-
37
- def check_student_quota(student_id):
38
- """Check student's remaining experiment quota"""
39
- student_quota = get_student_quota(student_id)
40
- return student_quota
41
-
42
- def validate_ryb_input(R, Y, B):
43
- """Validate RYB input volumes"""
44
- total = R + Y + B
45
- if total > 300:
46
- return {
47
- "is_valid": False,
48
- "message": f"Total volume cannot exceed 300 µL. Current total: {total} µL."
49
- }
50
- return {
51
- "is_valid": True,
52
- "message": f"Current total: {total} µL."
53
- }
54
-
55
-
56
- mqtt_client = mqtt.Client()
57
- mqtt_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
58
- mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
59
-
60
- def on_connect(client, userdata, flags, rc):
61
- print(f"Connected to MQTT Broker with result code {rc}")
62
- client.subscribe([(OT2_STATUS_TOPIC, 2), (SENSOR_DATA_TOPIC, 2)])
63
-
64
- def on_message(client, userdata, msg):
65
- global current_task, sensor_results
66
- try:
67
- payload = json.loads(msg.payload.decode("utf-8"))
68
- if msg.topic == OT2_STATUS_TOPIC:
69
- handle_sensor_status(payload)
70
- elif msg.topic == SENSOR_DATA_TOPIC:
71
- print("Sensor data received")
72
- sensor_results = payload
73
- mqtt_client.publish(
74
- OT2_COMMAND_TOPIC,
75
- json.dumps({
76
- "command": {"sensor_status": "read"},
77
- "experiment_id": payload["experiment_id"],
78
- "session_id": payload["session_id"]
79
- }),
80
- )
81
- except Exception as e:
82
- print(f"Error processing MQTT message: {e}")
83
-
84
- mqtt_client.on_connect = on_connect
85
- mqtt_client.on_message = on_message
86
- mqtt_client.connect(MQTT_BROKER, MQTT_PORT)
87
- mqtt_client.loop_start()
88
-
89
- def handle_sensor_status(payload):
90
- global current_task, sensor_results
91
- if "in_place" in json.dumps(payload):
92
- mqtt_client.publish(
93
- SENSOR_COMMAND_TOPIC,
94
- json.dumps({
95
- "command": {
96
- "R": current_task["R"],
97
- "Y": current_task["Y"],
98
- "B": current_task["B"],
99
- "well": current_task["well"]
100
- },
101
- "experiment_id": current_task["experiment_id"],
102
- "session_id": current_task["session_id"]
103
- })
104
- )
105
- elif payload["status"]["sensor_status"] == "charging":
106
-
107
- experiment_result = {
108
- "Status": "Complete",
109
- "Message": "Experiment completed successfully!",
110
- "Student ID": current_task["session_id"],
111
- "Command": {
112
- "R": current_task["R"],
113
- "Y": current_task["Y"],
114
- "B": current_task["B"],
115
- "well": current_task["well"],
116
- },
117
- "Sensor Data": sensor_results["sensor_data"],
118
- "Experiment ID": current_task["experiment_id"]
119
- }
120
- # Store full result in result queue
121
- result_queue.put(experiment_result)
122
-
123
- # Create a version of experiment_result without "Status" and "Message" for database storage
124
- db_data = {key: experiment_result[key] for key in experiment_result if key not in ["Status", "Message"]}
125
-
126
- save_result(db_data)
127
-
128
- current_task = None
129
-
130
-
131
- def task_processor():
132
- """
133
- Background thread that processes tasks one by one.
134
- """
135
- global current_task, queue_counter
136
- task_start_time = None
137
- TIMEOUT_SECONDS = 165 # 2min45s
138
-
139
- while True:
140
- if current_task:
141
- # Check for timeout
142
- if task_start_time and (time.time() - task_start_time > TIMEOUT_SECONDS):
143
- print("sending timeout message to OT-2")
144
- mqtt_client.publish(
145
- OT2_COMMAND_TOPIC,
146
- json.dumps({
147
- "command": {"sensor_status": "sensor_timeout"},
148
- "experiment_id": current_task["experiment_id"],
149
- "session_id": current_task["session_id"]
150
- }),
151
- )
152
- result_queue.put({
153
- "Status": "Error",
154
- "Message": "Experiment timed out",
155
- "Student ID": current_task["session_id"],
156
- "Command": {
157
- "R": current_task["R"],
158
- "Y": current_task["Y"],
159
- "B": current_task["B"],
160
- "well": current_task["well"],
161
- },
162
- "Experiment ID": current_task["experiment_id"]
163
- })
164
- current_task = None
165
- task_start_time = None
166
- continue
167
-
168
- if not current_task and not task_queue.empty():
169
- # Fetch a new task from the queue
170
- student_id, experiment_id = task_queue.get() # NOTE: We'll store (student_id, experiment_id) instead of task
171
- queue_counter -= 1
172
- task_start_time = time.time()
173
-
174
- # NOTE: We retrieve the actual task from tasks_dict
175
- current_task = tasks_dict[(student_id, experiment_id)]
176
-
177
- # Mark status as "processing"
178
- current_task["status"] = "processing"
179
-
180
- mqtt_client.publish(
181
- OT2_COMMAND_TOPIC,
182
- json.dumps({
183
- "command": {
184
- "R": current_task["R"],
185
- "Y": current_task["Y"],
186
- "B": current_task["B"],
187
- "well": current_task["well"]
188
- },
189
- "experiment_id": current_task["experiment_id"],
190
- "session_id": current_task["session_id"]
191
- }),
192
- )
193
-
194
- time.sleep(1)
195
-
196
-
197
- processor_thread = threading.Thread(target=task_processor, daemon=True)
198
- processor_thread.start()
199
-
200
-
201
- def verify_student_id(student_id):
202
- """Verify student ID and check quota"""
203
- global queue_counter
204
- if not student_id:
205
- return [
206
- gr.update(interactive=False, value=0),
207
- gr.update(interactive=False, value=0),
208
- gr.update(interactive=False, value=0),
209
- "Please enter a Student ID",
210
- gr.update(interactive=False)
211
- ]
212
-
213
- quota_remaining = check_student_quota(student_id)
214
-
215
-
216
- if quota_remaining <= 0:
217
- return [
218
- gr.update(interactive=False, value=0),
219
- gr.update(interactive=False, value=0),
220
- gr.update(interactive=False, value=0),
221
- "No experiments remaining. Please contact administrator.",
222
- gr.update(interactive=False)
223
- ]
224
-
225
- return [
226
- gr.update(interactive=True, value=0),
227
- gr.update(interactive=True, value=0),
228
- gr.update(interactive=True, value=0),
229
- f"Student ID verified. Available experiments: {quota_remaining}\nCurrent queue length: {queue_counter} experiment(s)",
230
- gr.update(interactive=True)
231
- ]
232
-
233
- def update_status_with_queue(R, Y, B):
234
- """Check if RYB inputs are valid and return updated queue info"""
235
- global queue_counter
236
- validation_result = validate_ryb_input(R, Y, B)
237
- total = R + Y + B
238
- return [
239
- f"{validation_result['message']}\nCurrent queue length: {queue_counter} experiment(s)",
240
- gr.update(interactive=(total <= 300))
241
- ]
242
-
243
- def update_queue_display():
244
- """Refresh queue info for the UI"""
245
- global current_task, queue_counter
246
- num_available_wells = len(find_unused_wells())
247
- try:
248
- print(f"[DEBUG] Updating queue display - Counter: {queue_counter}")
249
- if current_task:
250
- status = f"""### Current Queue Status
251
- - Active experiment: Yes
252
- - Queue length: {queue_counter+1} experiment(s)
253
- - Available wells: {num_available_wells} wells
254
- - Expected wait time to obtain results > {(queue_counter+2)*2} mins """
255
- else:
256
- status = f"""### Current Queue Status
257
- - Active experiment: No
258
- - Queue length: {queue_counter} experiment(s)
259
- - Available wells: {num_available_wells} wells
260
- - Expected wait time to obtain results: 2 mins """
261
- return status
262
- except Exception as e:
263
- return f"Error getting queue status: {str(e)}"
264
-
265
-
266
- def add_to_queue(student_id, R, Y, B):
267
- global queue_counter
268
-
269
- if student_id == "debug":
270
- yield {
271
- "Status": "Error",
272
- "Message": "Debug ID cannot submit to real experiment queue. Please use your student id to submit experiment."
273
- }
274
- return
275
-
276
-
277
- # Validate RYB inputs
278
- validation_result = validate_ryb_input(R, Y, B)
279
- if not validation_result["is_valid"]:
280
- yield {
281
- "Status": "Error",
282
- "Message": validation_result["message"]
283
- }
284
- return
285
-
286
- # Check quota
287
- quota_remaining = check_student_quota(student_id)
288
- if quota_remaining <= 0:
289
- yield {
290
- "Status": "Error",
291
- "Message": "No experiments remaining"
292
- }
293
- return
294
-
295
- # Select well
296
- experiment_id = secrets.token_hex(4)
297
- try:
298
- empty_wells = find_unused_wells()
299
- if not empty_wells:
300
- raise ValueError("No available wells")
301
- selected_well = empty_wells[0]
302
-
303
-
304
- except Exception as e:
305
- yield {
306
- "Status": "Error",
307
- "Message": str(e)
308
- }
309
- return
310
-
311
- # NOTE: Create the task and store it in tasks_dict
312
- task = {
313
- "R": R,
314
- "Y": Y,
315
- "B": B,
316
- "well": selected_well,
317
- "session_id": student_id,
318
- "experiment_id": experiment_id,
319
- "status": "queued",
320
- }
321
- tasks_dict[(student_id, experiment_id)] = task # Keep track globally
322
-
323
- # Put only (student_id, experiment_id) in the Queue
324
- task_queue.put((student_id, experiment_id))
325
- queue_counter += 1
326
- update_used_wells([selected_well])
327
- decrement_student_quota(student_id)
328
-
329
-
330
- print(f"Task added: {task}")
331
-
332
- # First yield: "Queued"
333
- yield {
334
- "Status": "Queued",
335
- "Position": queue_counter,
336
- "Student ID": student_id,
337
- "Experiment ID": experiment_id,
338
- "Well": selected_well,
339
- "Volumes": {"R": R, "Y": Y, "B": B}
340
- }
341
-
342
- # NOTE: Wait until the task's status becomes 'processing'
343
- # This ensures we only yield "Running" when the backend actually starts the job.
344
- while tasks_dict[(student_id, experiment_id)]["status"] == "queued":
345
- time.sleep(20)
346
-
347
- # Second yield: "Running" (happens only after status is 'processing')
348
- yield {
349
- "Status": "Running",
350
- "Student ID": student_id,
351
- "Experiment ID": experiment_id,
352
- "Well": selected_well,
353
- "Volumes": {"R": R, "Y": Y, "B": B}
354
- }
355
-
356
- # Finally, wait for the result
357
- result = result_queue.get()
358
- yield result
359
-
360
- def debug_experiment(student_id, R, Y, B):
361
- if student_id != "debug":
362
- return {"Status": "Error", "Message": "Invalid debug request"}
363
-
364
- experiment_id = "debug-" + secrets.token_hex(4)
365
-
366
- yield {
367
- "Status": "Queued",
368
- "Position": "debug",
369
- "Student ID": student_id,
370
- "Experiment ID": experiment_id,
371
- "Well": "DEBUG-A1",
372
- "Volumes": {"R": R, "Y": Y, "B": B}
373
- }
374
-
375
- time.sleep(1)
376
-
377
- yield {
378
- "Status": "Running",
379
- "Student ID": student_id,
380
- "Experiment ID": experiment_id,
381
- "Well": "DEBUG-A1",
382
- "Volumes": {"R": R, "Y": Y, "B": B}
383
- }
384
-
385
- time.sleep(1)
386
- result_debug = {
387
- "Status": "Complete",
388
- "Message": "Debug mode - simulated result (no actual experiment performed)",
389
- "Student ID": student_id,
390
- "Command": {
391
- "R": R,
392
- "Y": Y,
393
- "B": B,
394
- "well": "DEBUG-A1"
395
- },
396
- "Sensor Data": {
397
- "ch583": 2800,
398
- "ch670": 3000,
399
- "ch510": 1700,
400
- "ch410": 240,
401
- "ch620": 3900,
402
- "ch470": 1000,
403
- "ch550": 2400,
404
- "ch440": 900
405
- },
406
- "Experiment ID": experiment_id
407
- }
408
-
409
- yield result_debug
410
-
411
-
412
- with gr.Blocks(title="OT-2 Liquid Color Matching Experiment Queue") as demo:
413
- gr.Markdown("## OT-2 Liquid Color Matching Experiment Queue")
414
- gr.Markdown("Enter R, Y, and B volumes (in µL). Total volume must not exceed 300 µL.(a volume of exactly 300 µL is recommended)")
415
-
416
- with gr.Row():
417
- with gr.Column(scale=2):
418
- with gr.Row():
419
- student_id_input = gr.Textbox(
420
- label="Student ID",
421
- placeholder="Enter your unique ID"
422
- )
423
- verify_id_btn = gr.Button("Verify ID")
424
-
425
- r_slider = gr.Slider(0, 300, step=1, label="Red (R) Volume (µL)", interactive=False)
426
- y_slider = gr.Slider(0, 300, step=1, label="Yellow (Y) Volume (µL)", interactive=False)
427
- b_slider = gr.Slider(0, 300, step=1, label="Blue (B) Volume (µL)", interactive=False)
428
- status_output = gr.Textbox(label="Status")
429
- submit_btn = gr.Button("Submit Experiment", interactive=False)
430
- result_output = gr.JSON(label="Experiment Status")
431
-
432
- with gr.Column(scale=1):
433
- gr.Markdown("### Queue Status")
434
- queue_status = gr.Markdown("Loading queue status...")
435
- update_status_btn = gr.Button("Refresh Queue Status")
436
- gr.Markdown("### YouTube Livestream")
437
- #src="https://www.youtube.com/embed/live_stream?channel=UCHBzCfYpGwoqygH9YNh9A6g"
438
- iframe_html = '''
439
- <div style="position: relative; width: 100%; padding-top: 56.25%;">
440
- <iframe
441
- style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;"
442
- src="https://www.youtube.com/embed/aahvV0BZjIo"
443
- title="OT-2 Livestream"
444
- frameborder="0"
445
- allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
446
- referrerpolicy="strict-origin-when-cross-origin"
447
- allowfullscreen>
448
- </iframe>
449
- </div>
450
- '''
451
- gr.HTML(iframe_html)
452
-
453
- verify_id_btn.click(
454
- verify_student_id,
455
- inputs=[student_id_input],
456
- outputs=[r_slider, y_slider, b_slider, status_output, submit_btn],
457
- api_name="verify_student_id"
458
- )
459
-
460
- r_slider.change(
461
- update_status_with_queue,
462
- inputs=[r_slider, y_slider, b_slider],
463
- outputs=[status_output, submit_btn]
464
- )
465
- y_slider.change(
466
- update_status_with_queue,
467
- inputs=[r_slider, y_slider, b_slider],
468
- outputs=[status_output, submit_btn]
469
- )
470
- b_slider.change(
471
- update_status_with_queue,
472
- inputs=[r_slider, y_slider, b_slider],
473
- outputs=[status_output, submit_btn]
474
- )
475
-
476
- # NOTE: concurrency_limit=3 is preserved; no changes here
477
- submit_btn.click(
478
- add_to_queue,
479
- inputs=[student_id_input, r_slider, y_slider, b_slider],
480
- outputs=result_output,
481
- api_name="submit",
482
- concurrency_limit=8
483
- ).then(
484
- update_queue_display,
485
- None,
486
- queue_status
487
- )
488
-
489
- update_status_btn.click(
490
- update_queue_display,
491
- None,
492
- queue_status,
493
- api_name="update_queue_display"
494
- )
495
-
496
- demo.load(
497
- update_queue_display,
498
- None,
499
- queue_status
500
- )
501
-
502
- debug_btn = gr.Button("Debug Submit", visible=False)
503
- debug_btn.click(
504
- debug_experiment,
505
- inputs=[student_id_input, r_slider, y_slider, b_slider],
506
- outputs=result_output,
507
- api_name="debug"
508
- )
509
-
510
-
511
- demo.queue
512
-
513
- if __name__ == "__main__":
514
- demo.launch()
 
1
  import gradio as gr
 
 
 
 
 
 
 
 
2
 
 
 
3
 
4
+ def greet(name):
5
+ return "Hello " + name + "!!"
 
 
 
6
 
7
 
8
+ demo = gr.Interface(fn=greet, inputs="text", outputs="text")
9
+ demo.launch()