sgbaird commited on
Commit
d14e0f6
·
1 Parent(s): 42d8ce6

rename to app_ot2

Browse files
Files changed (1) hide show
  1. app_ot2.py +514 -0
app_ot2.py ADDED
@@ -0,0 +1,514 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ from queue import Queue
3
+ import threading
4
+ import time
5
+ import paho.mqtt.client as mqtt
6
+ import json
7
+ import secrets
8
+ from DB_utls import find_unused_wells, update_used_wells, save_result, get_student_quota, decrement_student_quota
9
+ import os
10
+
11
+ # NOTE: New global dict to store tasks keyed by (student_id, experiment_id)
12
+ tasks_dict = {}
13
+
14
+ task_queue = Queue()
15
+ result_queue = Queue()
16
+ current_task = None
17
+ sensor_results = None
18
+ queue_counter = task_queue.qsize()
19
+
20
+
21
+
22
+ MQTT_BROKER = os.getenv("MQTT_BROKER")
23
+ MQTT_PORT = int(os.getenv("MQTT_PORT"))
24
+ MQTT_USERNAME = os.getenv("MQTT_USERNAME")
25
+ MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
26
+
27
+
28
+ OT2_SERIAL = "OT2CEP20240218R04"
29
+ PICO_ID = "e66130100f895134"
30
+
31
+ OT2_COMMAND_TOPIC = f"command/ot2/{OT2_SERIAL}/pipette"
32
+ OT2_STATUS_TOPIC = f"status/ot2/{OT2_SERIAL}/complete"
33
+ SENSOR_COMMAND_TOPIC = f"command/picow/{PICO_ID}/as7341/read"
34
+ SENSOR_DATA_TOPIC = f"color-mixing/picow/{PICO_ID}/as7341"
35
+
36
+
37
+ def check_student_quota(student_id):
38
+ """Check student's remaining experiment quota"""
39
+ student_quota = get_student_quota(student_id)
40
+ return student_quota
41
+
42
+ def validate_ryb_input(R, Y, B):
43
+ """Validate RYB input volumes"""
44
+ total = R + Y + B
45
+ if total > 300:
46
+ return {
47
+ "is_valid": False,
48
+ "message": f"Total volume cannot exceed 300 µL. Current total: {total} µL."
49
+ }
50
+ return {
51
+ "is_valid": True,
52
+ "message": f"Current total: {total} µL."
53
+ }
54
+
55
+
56
+ mqtt_client = mqtt.Client()
57
+ mqtt_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
58
+ mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
59
+
60
+ def on_connect(client, userdata, flags, rc):
61
+ print(f"Connected to MQTT Broker with result code {rc}")
62
+ client.subscribe([(OT2_STATUS_TOPIC, 2), (SENSOR_DATA_TOPIC, 2)])
63
+
64
+ def on_message(client, userdata, msg):
65
+ global current_task, sensor_results
66
+ try:
67
+ payload = json.loads(msg.payload.decode("utf-8"))
68
+ if msg.topic == OT2_STATUS_TOPIC:
69
+ handle_sensor_status(payload)
70
+ elif msg.topic == SENSOR_DATA_TOPIC:
71
+ print("Sensor data received")
72
+ sensor_results = payload
73
+ mqtt_client.publish(
74
+ OT2_COMMAND_TOPIC,
75
+ json.dumps({
76
+ "command": {"sensor_status": "read"},
77
+ "experiment_id": payload["experiment_id"],
78
+ "session_id": payload["session_id"]
79
+ }),
80
+ )
81
+ except Exception as e:
82
+ print(f"Error processing MQTT message: {e}")
83
+
84
+ mqtt_client.on_connect = on_connect
85
+ mqtt_client.on_message = on_message
86
+ mqtt_client.connect(MQTT_BROKER, MQTT_PORT)
87
+ mqtt_client.loop_start()
88
+
89
+ def handle_sensor_status(payload):
90
+ global current_task, sensor_results
91
+ if "in_place" in json.dumps(payload):
92
+ mqtt_client.publish(
93
+ SENSOR_COMMAND_TOPIC,
94
+ json.dumps({
95
+ "command": {
96
+ "R": current_task["R"],
97
+ "Y": current_task["Y"],
98
+ "B": current_task["B"],
99
+ "well": current_task["well"]
100
+ },
101
+ "experiment_id": current_task["experiment_id"],
102
+ "session_id": current_task["session_id"]
103
+ })
104
+ )
105
+ elif payload["status"]["sensor_status"] == "charging":
106
+
107
+ experiment_result = {
108
+ "Status": "Complete",
109
+ "Message": "Experiment completed successfully!",
110
+ "Student ID": current_task["session_id"],
111
+ "Command": {
112
+ "R": current_task["R"],
113
+ "Y": current_task["Y"],
114
+ "B": current_task["B"],
115
+ "well": current_task["well"],
116
+ },
117
+ "Sensor Data": sensor_results["sensor_data"],
118
+ "Experiment ID": current_task["experiment_id"]
119
+ }
120
+ # Store full result in result queue
121
+ result_queue.put(experiment_result)
122
+
123
+ # Create a version of experiment_result without "Status" and "Message" for database storage
124
+ db_data = {key: experiment_result[key] for key in experiment_result if key not in ["Status", "Message"]}
125
+
126
+ save_result(db_data)
127
+
128
+ current_task = None
129
+
130
+
131
+ def task_processor():
132
+ """
133
+ Background thread that processes tasks one by one.
134
+ """
135
+ global current_task, queue_counter
136
+ task_start_time = None
137
+ TIMEOUT_SECONDS = 165 # 2min45s
138
+
139
+ while True:
140
+ if current_task:
141
+ # Check for timeout
142
+ if task_start_time and (time.time() - task_start_time > TIMEOUT_SECONDS):
143
+ print("sending timeout message to OT-2")
144
+ mqtt_client.publish(
145
+ OT2_COMMAND_TOPIC,
146
+ json.dumps({
147
+ "command": {"sensor_status": "sensor_timeout"},
148
+ "experiment_id": current_task["experiment_id"],
149
+ "session_id": current_task["session_id"]
150
+ }),
151
+ )
152
+ result_queue.put({
153
+ "Status": "Error",
154
+ "Message": "Experiment timed out",
155
+ "Student ID": current_task["session_id"],
156
+ "Command": {
157
+ "R": current_task["R"],
158
+ "Y": current_task["Y"],
159
+ "B": current_task["B"],
160
+ "well": current_task["well"],
161
+ },
162
+ "Experiment ID": current_task["experiment_id"]
163
+ })
164
+ current_task = None
165
+ task_start_time = None
166
+ continue
167
+
168
+ if not current_task and not task_queue.empty():
169
+ # Fetch a new task from the queue
170
+ student_id, experiment_id = task_queue.get() # NOTE: We'll store (student_id, experiment_id) instead of task
171
+ queue_counter -= 1
172
+ task_start_time = time.time()
173
+
174
+ # NOTE: We retrieve the actual task from tasks_dict
175
+ current_task = tasks_dict[(student_id, experiment_id)]
176
+
177
+ # Mark status as "processing"
178
+ current_task["status"] = "processing"
179
+
180
+ mqtt_client.publish(
181
+ OT2_COMMAND_TOPIC,
182
+ json.dumps({
183
+ "command": {
184
+ "R": current_task["R"],
185
+ "Y": current_task["Y"],
186
+ "B": current_task["B"],
187
+ "well": current_task["well"]
188
+ },
189
+ "experiment_id": current_task["experiment_id"],
190
+ "session_id": current_task["session_id"]
191
+ }),
192
+ )
193
+
194
+ time.sleep(1)
195
+
196
+
197
+ processor_thread = threading.Thread(target=task_processor, daemon=True)
198
+ processor_thread.start()
199
+
200
+
201
+ def verify_student_id(student_id):
202
+ """Verify student ID and check quota"""
203
+ global queue_counter
204
+ if not student_id:
205
+ return [
206
+ gr.update(interactive=False, value=0),
207
+ gr.update(interactive=False, value=0),
208
+ gr.update(interactive=False, value=0),
209
+ "Please enter a Student ID",
210
+ gr.update(interactive=False)
211
+ ]
212
+
213
+ quota_remaining = check_student_quota(student_id)
214
+
215
+
216
+ if quota_remaining <= 0:
217
+ return [
218
+ gr.update(interactive=False, value=0),
219
+ gr.update(interactive=False, value=0),
220
+ gr.update(interactive=False, value=0),
221
+ "No experiments remaining. Please contact administrator.",
222
+ gr.update(interactive=False)
223
+ ]
224
+
225
+ return [
226
+ gr.update(interactive=True, value=0),
227
+ gr.update(interactive=True, value=0),
228
+ gr.update(interactive=True, value=0),
229
+ f"Student ID verified. Available experiments: {quota_remaining}\nCurrent queue length: {queue_counter} experiment(s)",
230
+ gr.update(interactive=True)
231
+ ]
232
+
233
+ def update_status_with_queue(R, Y, B):
234
+ """Check if RYB inputs are valid and return updated queue info"""
235
+ global queue_counter
236
+ validation_result = validate_ryb_input(R, Y, B)
237
+ total = R + Y + B
238
+ return [
239
+ f"{validation_result['message']}\nCurrent queue length: {queue_counter} experiment(s)",
240
+ gr.update(interactive=(total <= 300))
241
+ ]
242
+
243
+ def update_queue_display():
244
+ """Refresh queue info for the UI"""
245
+ global current_task, queue_counter
246
+ num_available_wells = len(find_unused_wells())
247
+ try:
248
+ print(f"[DEBUG] Updating queue display - Counter: {queue_counter}")
249
+ if current_task:
250
+ status = f"""### Current Queue Status
251
+ - Active experiment: Yes
252
+ - Queue length: {queue_counter+1} experiment(s)
253
+ - Available wells: {num_available_wells} wells
254
+ - Expected wait time to obtain results > {(queue_counter+2)*2} mins """
255
+ else:
256
+ status = f"""### Current Queue Status
257
+ - Active experiment: No
258
+ - Queue length: {queue_counter} experiment(s)
259
+ - Available wells: {num_available_wells} wells
260
+ - Expected wait time to obtain results: 2 mins """
261
+ return status
262
+ except Exception as e:
263
+ return f"Error getting queue status: {str(e)}"
264
+
265
+
266
+ def add_to_queue(student_id, R, Y, B):
267
+ global queue_counter
268
+
269
+ if student_id == "debug":
270
+ yield {
271
+ "Status": "Error",
272
+ "Message": "Debug ID cannot submit to real experiment queue. Please use your student id to submit experiment."
273
+ }
274
+ return
275
+
276
+
277
+ # Validate RYB inputs
278
+ validation_result = validate_ryb_input(R, Y, B)
279
+ if not validation_result["is_valid"]:
280
+ yield {
281
+ "Status": "Error",
282
+ "Message": validation_result["message"]
283
+ }
284
+ return
285
+
286
+ # Check quota
287
+ quota_remaining = check_student_quota(student_id)
288
+ if quota_remaining <= 0:
289
+ yield {
290
+ "Status": "Error",
291
+ "Message": "No experiments remaining"
292
+ }
293
+ return
294
+
295
+ # Select well
296
+ experiment_id = secrets.token_hex(4)
297
+ try:
298
+ empty_wells = find_unused_wells()
299
+ if not empty_wells:
300
+ raise ValueError("No available wells")
301
+ selected_well = empty_wells[0]
302
+
303
+
304
+ except Exception as e:
305
+ yield {
306
+ "Status": "Error",
307
+ "Message": str(e)
308
+ }
309
+ return
310
+
311
+ # NOTE: Create the task and store it in tasks_dict
312
+ task = {
313
+ "R": R,
314
+ "Y": Y,
315
+ "B": B,
316
+ "well": selected_well,
317
+ "session_id": student_id,
318
+ "experiment_id": experiment_id,
319
+ "status": "queued",
320
+ }
321
+ tasks_dict[(student_id, experiment_id)] = task # Keep track globally
322
+
323
+ # Put only (student_id, experiment_id) in the Queue
324
+ task_queue.put((student_id, experiment_id))
325
+ queue_counter += 1
326
+ update_used_wells([selected_well])
327
+ decrement_student_quota(student_id)
328
+
329
+
330
+ print(f"Task added: {task}")
331
+
332
+ # First yield: "Queued"
333
+ yield {
334
+ "Status": "Queued",
335
+ "Position": queue_counter,
336
+ "Student ID": student_id,
337
+ "Experiment ID": experiment_id,
338
+ "Well": selected_well,
339
+ "Volumes": {"R": R, "Y": Y, "B": B}
340
+ }
341
+
342
+ # NOTE: Wait until the task's status becomes 'processing'
343
+ # This ensures we only yield "Running" when the backend actually starts the job.
344
+ while tasks_dict[(student_id, experiment_id)]["status"] == "queued":
345
+ time.sleep(20)
346
+
347
+ # Second yield: "Running" (happens only after status is 'processing')
348
+ yield {
349
+ "Status": "Running",
350
+ "Student ID": student_id,
351
+ "Experiment ID": experiment_id,
352
+ "Well": selected_well,
353
+ "Volumes": {"R": R, "Y": Y, "B": B}
354
+ }
355
+
356
+ # Finally, wait for the result
357
+ result = result_queue.get()
358
+ yield result
359
+
360
+ def debug_experiment(student_id, R, Y, B):
361
+ if student_id != "debug":
362
+ return {"Status": "Error", "Message": "Invalid debug request"}
363
+
364
+ experiment_id = "debug-" + secrets.token_hex(4)
365
+
366
+ yield {
367
+ "Status": "Queued",
368
+ "Position": "debug",
369
+ "Student ID": student_id,
370
+ "Experiment ID": experiment_id,
371
+ "Well": "DEBUG-A1",
372
+ "Volumes": {"R": R, "Y": Y, "B": B}
373
+ }
374
+
375
+ time.sleep(1)
376
+
377
+ yield {
378
+ "Status": "Running",
379
+ "Student ID": student_id,
380
+ "Experiment ID": experiment_id,
381
+ "Well": "DEBUG-A1",
382
+ "Volumes": {"R": R, "Y": Y, "B": B}
383
+ }
384
+
385
+ time.sleep(1)
386
+ result_debug = {
387
+ "Status": "Complete",
388
+ "Message": "Debug mode - simulated result (no actual experiment performed)",
389
+ "Student ID": student_id,
390
+ "Command": {
391
+ "R": R,
392
+ "Y": Y,
393
+ "B": B,
394
+ "well": "DEBUG-A1"
395
+ },
396
+ "Sensor Data": {
397
+ "ch583": 2800,
398
+ "ch670": 3000,
399
+ "ch510": 1700,
400
+ "ch410": 240,
401
+ "ch620": 3900,
402
+ "ch470": 1000,
403
+ "ch550": 2400,
404
+ "ch440": 900
405
+ },
406
+ "Experiment ID": experiment_id
407
+ }
408
+
409
+ yield result_debug
410
+
411
+
412
+ with gr.Blocks(title="OT-2 Liquid Color Matching Experiment Queue") as demo:
413
+ gr.Markdown("## OT-2 Liquid Color Matching Experiment Queue")
414
+ gr.Markdown("Enter R, Y, and B volumes (in µL). Total volume must not exceed 300 µL.(a volume of exactly 300 µL is recommended)")
415
+
416
+ with gr.Row():
417
+ with gr.Column(scale=2):
418
+ with gr.Row():
419
+ student_id_input = gr.Textbox(
420
+ label="Student ID",
421
+ placeholder="Enter your unique ID"
422
+ )
423
+ verify_id_btn = gr.Button("Verify ID")
424
+
425
+ r_slider = gr.Slider(0, 300, step=1, label="Red (R) Volume (µL)", interactive=False)
426
+ y_slider = gr.Slider(0, 300, step=1, label="Yellow (Y) Volume (µL)", interactive=False)
427
+ b_slider = gr.Slider(0, 300, step=1, label="Blue (B) Volume (µL)", interactive=False)
428
+ status_output = gr.Textbox(label="Status")
429
+ submit_btn = gr.Button("Submit Experiment", interactive=False)
430
+ result_output = gr.JSON(label="Experiment Status")
431
+
432
+ with gr.Column(scale=1):
433
+ gr.Markdown("### Queue Status")
434
+ queue_status = gr.Markdown("Loading queue status...")
435
+ update_status_btn = gr.Button("Refresh Queue Status")
436
+ gr.Markdown("### YouTube Livestream")
437
+ #src="https://www.youtube.com/embed/live_stream?channel=UCHBzCfYpGwoqygH9YNh9A6g"
438
+ iframe_html = '''
439
+ <div style="position: relative; width: 100%; padding-top: 56.25%;">
440
+ <iframe
441
+ style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;"
442
+ src="https://www.youtube.com/embed/aahvV0BZjIo"
443
+ title="OT-2 Livestream"
444
+ frameborder="0"
445
+ allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
446
+ referrerpolicy="strict-origin-when-cross-origin"
447
+ allowfullscreen>
448
+ </iframe>
449
+ </div>
450
+ '''
451
+ gr.HTML(iframe_html)
452
+
453
+ verify_id_btn.click(
454
+ verify_student_id,
455
+ inputs=[student_id_input],
456
+ outputs=[r_slider, y_slider, b_slider, status_output, submit_btn],
457
+ api_name="verify_student_id"
458
+ )
459
+
460
+ r_slider.change(
461
+ update_status_with_queue,
462
+ inputs=[r_slider, y_slider, b_slider],
463
+ outputs=[status_output, submit_btn]
464
+ )
465
+ y_slider.change(
466
+ update_status_with_queue,
467
+ inputs=[r_slider, y_slider, b_slider],
468
+ outputs=[status_output, submit_btn]
469
+ )
470
+ b_slider.change(
471
+ update_status_with_queue,
472
+ inputs=[r_slider, y_slider, b_slider],
473
+ outputs=[status_output, submit_btn]
474
+ )
475
+
476
+ # NOTE: concurrency_limit=3 is preserved; no changes here
477
+ submit_btn.click(
478
+ add_to_queue,
479
+ inputs=[student_id_input, r_slider, y_slider, b_slider],
480
+ outputs=result_output,
481
+ api_name="submit",
482
+ concurrency_limit=8
483
+ ).then(
484
+ update_queue_display,
485
+ None,
486
+ queue_status
487
+ )
488
+
489
+ update_status_btn.click(
490
+ update_queue_display,
491
+ None,
492
+ queue_status,
493
+ api_name="update_queue_display"
494
+ )
495
+
496
+ demo.load(
497
+ update_queue_display,
498
+ None,
499
+ queue_status
500
+ )
501
+
502
+ debug_btn = gr.Button("Debug Submit", visible=False)
503
+ debug_btn.click(
504
+ debug_experiment,
505
+ inputs=[student_id_input, r_slider, y_slider, b_slider],
506
+ outputs=result_output,
507
+ api_name="debug"
508
+ )
509
+
510
+
511
+ demo.queue
512
+
513
+ if __name__ == "__main__":
514
+ demo.launch()