sgbaird commited on
Commit
42d8ce6
·
1 Parent(s): 1245696

copy from ot2 hf

Browse files
Files changed (4) hide show
  1. DB_utls.py +154 -0
  2. app.py +514 -0
  3. requirements.txt +5 -0
  4. well_status_utils.py +95 -0
DB_utls.py ADDED
@@ -0,0 +1,154 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #from lambda_mongo_utils import send_data_to_mongodb
2
+ from pymongo import MongoClient
3
+ from datetime import datetime
4
+ from prefect import task
5
+ import pandas as pd
6
+ import os
7
+
8
+ MONGODB_PASSWORD = os.getenv("MONGODB_PASSWORD")
9
+
10
+ blinded_connection_string = os.getenv("blinded_connection_string")
11
+
12
+ connection_string = blinded_connection_string.replace("<db_password>", MONGODB_PASSWORD)
13
+
14
+
15
+ @task
16
+ def generate_empty_well():
17
+ dbclient = MongoClient(connection_string)
18
+ db = dbclient["LCM-OT-2-SLD"]
19
+ collection = db["wells"]
20
+ rows = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
21
+ columns = [str(i) for i in range(1, 13)]
22
+ for row in rows:
23
+ for col in columns:
24
+ well = f"{row}{col}"
25
+ metadata = {
26
+ "well": well,
27
+ "status": "empty",
28
+ "project": "OT2"
29
+ }
30
+
31
+ #send_data_to_mongodb(collection="wells", data=metadata)
32
+ query = {"well": well}
33
+ update_data = {"$set": metadata}
34
+ result = collection.update_one(query, update_data, upsert=True)
35
+
36
+ # close connection
37
+ dbclient.close()
38
+
39
+ @task
40
+ def update_used_wells(used_wells):
41
+ dbclient = MongoClient(connection_string)
42
+ db = dbclient["LCM-OT-2-SLD"]
43
+ collection = db["wells"]
44
+
45
+ for well in used_wells:
46
+ metadata = {
47
+ "well": well,
48
+ "status": "used",
49
+ "project": "OT2"
50
+ }
51
+ #send_data_to_mongodb(collection="wells", data=metadata)
52
+ query = {"well": well}
53
+ update_data = {"$set": metadata}
54
+ result = collection.update_one(query, update_data, upsert=True)
55
+
56
+ # close connection
57
+ dbclient.close()
58
+
59
+ @task
60
+ def find_unused_wells():
61
+ dbclient = MongoClient(connection_string)
62
+ db = dbclient["LCM-OT-2-SLD"]
63
+ collection = db["wells"]
64
+ query = {"status": "empty"}
65
+ response = list(collection.find(query))
66
+ df = pd.DataFrame(response)
67
+
68
+ # Extract the "well" column as a list
69
+ if "well" not in df.columns:
70
+ raise ValueError("No available wells.")
71
+ # Sort obtained list
72
+ def well_sort_key(well):
73
+ row = well[0] # A, B, C, ...
74
+ col = int(well[1:]) # 1, 2, ..., 12
75
+ return (row, col)
76
+
77
+ empty_wells = sorted(df["well"].tolist(), key=well_sort_key)
78
+ #print(empty_wells)
79
+
80
+ # close connection
81
+ dbclient.close()
82
+
83
+ # Check if there are any empty wells
84
+ if len(empty_wells) == 0:
85
+ raise ValueError("No empty wells found")
86
+ #print(empty_wells)
87
+ return empty_wells
88
+
89
+ @task
90
+ def save_result(result_data):
91
+ dbclient = MongoClient(connection_string)
92
+ db = dbclient["LCM-OT-2-SLD"]
93
+ collection = db["MSE403_result"]
94
+ #collection = db["test_result"]
95
+ result_data["timestamp"] = datetime.utcnow() # UTC time
96
+ insert_result = collection.insert_one(result_data)
97
+ inserted_id = insert_result.inserted_id
98
+ # close connection
99
+ dbclient.close()
100
+ return inserted_id
101
+
102
+ def get_student_quota(student_id):
103
+ with MongoClient(connection_string) as client:
104
+ db = client["LCM-OT-2-SLD"]
105
+ collection = db["student"]
106
+ student = collection.find_one({"student_id": student_id})
107
+ if student is None:
108
+ raise ValueError(f"Student ID '{student_id}' not found in the database.")
109
+ return student.get("quota", 0)
110
+
111
+ def decrement_student_quota(student_id):
112
+ dbclient = MongoClient(connection_string)
113
+ db = dbclient["LCM-OT-2-SLD"]
114
+ collection = db["student"]
115
+
116
+ student = collection.find_one({"student_id": student_id})
117
+ if not student:
118
+ return f"Student ID {student_id} not found."
119
+ if student.get("quota", 0) <= 0:
120
+ return f"Student ID {student_id} has no remaining quota."
121
+
122
+
123
+ result = collection.update_one(
124
+ {"student_id": student_id, "quota": {"$gt": 0}},
125
+ {"$inc": {"quota": -1}}
126
+ )
127
+
128
+ if result.modified_count > 0:
129
+ return f"Student ID {student_id}'s quota update successfully."
130
+ else:
131
+ return f"Quota update failed for Student ID {student_id}."
132
+
133
+ def add_student_quota(student_id, quota):
134
+ """
135
+ Adds a new student with a given quota.
136
+ :param student_id: The ID of the student.
137
+ :param quota: The initial quota for the student.
138
+ """
139
+ dbclient = MongoClient(connection_string)
140
+ db = dbclient["LCM-OT-2-SLD"]
141
+ collection = db["student"]
142
+ student_data = {"student_id": student_id, "quota": quota}
143
+ collection.update_one({"student_id": student_id}, {"$set": student_data}, upsert=True)
144
+ dbclient.close()
145
+
146
+
147
+ if __name__ == "__main__":
148
+ generate_empty_well()
149
+ #find_unused_wells()
150
+ #test_id = "test"
151
+ #quota = 999
152
+ #add_student_quota(test_id, quota)
153
+
154
+
app.py ADDED
@@ -0,0 +1,514 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ from queue import Queue
3
+ import threading
4
+ import time
5
+ import paho.mqtt.client as mqtt
6
+ import json
7
+ import secrets
8
+ from DB_utls import find_unused_wells, update_used_wells, save_result, get_student_quota, decrement_student_quota
9
+ import os
10
+
11
+ # NOTE: New global dict to store tasks keyed by (student_id, experiment_id)
12
+ tasks_dict = {}
13
+
14
+ task_queue = Queue()
15
+ result_queue = Queue()
16
+ current_task = None
17
+ sensor_results = None
18
+ queue_counter = task_queue.qsize()
19
+
20
+
21
+
22
+ MQTT_BROKER = os.getenv("MQTT_BROKER")
23
+ MQTT_PORT = int(os.getenv("MQTT_PORT"))
24
+ MQTT_USERNAME = os.getenv("MQTT_USERNAME")
25
+ MQTT_PASSWORD = os.getenv("MQTT_PASSWORD")
26
+
27
+
28
+ OT2_SERIAL = "OT2CEP20240218R04"
29
+ PICO_ID = "e66130100f895134"
30
+
31
+ OT2_COMMAND_TOPIC = f"command/ot2/{OT2_SERIAL}/pipette"
32
+ OT2_STATUS_TOPIC = f"status/ot2/{OT2_SERIAL}/complete"
33
+ SENSOR_COMMAND_TOPIC = f"command/picow/{PICO_ID}/as7341/read"
34
+ SENSOR_DATA_TOPIC = f"color-mixing/picow/{PICO_ID}/as7341"
35
+
36
+
37
+ def check_student_quota(student_id):
38
+ """Check student's remaining experiment quota"""
39
+ student_quota = get_student_quota(student_id)
40
+ return student_quota
41
+
42
+ def validate_ryb_input(R, Y, B):
43
+ """Validate RYB input volumes"""
44
+ total = R + Y + B
45
+ if total > 300:
46
+ return {
47
+ "is_valid": False,
48
+ "message": f"Total volume cannot exceed 300 µL. Current total: {total} µL."
49
+ }
50
+ return {
51
+ "is_valid": True,
52
+ "message": f"Current total: {total} µL."
53
+ }
54
+
55
+
56
+ mqtt_client = mqtt.Client()
57
+ mqtt_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
58
+ mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
59
+
60
+ def on_connect(client, userdata, flags, rc):
61
+ print(f"Connected to MQTT Broker with result code {rc}")
62
+ client.subscribe([(OT2_STATUS_TOPIC, 2), (SENSOR_DATA_TOPIC, 2)])
63
+
64
+ def on_message(client, userdata, msg):
65
+ global current_task, sensor_results
66
+ try:
67
+ payload = json.loads(msg.payload.decode("utf-8"))
68
+ if msg.topic == OT2_STATUS_TOPIC:
69
+ handle_sensor_status(payload)
70
+ elif msg.topic == SENSOR_DATA_TOPIC:
71
+ print("Sensor data received")
72
+ sensor_results = payload
73
+ mqtt_client.publish(
74
+ OT2_COMMAND_TOPIC,
75
+ json.dumps({
76
+ "command": {"sensor_status": "read"},
77
+ "experiment_id": payload["experiment_id"],
78
+ "session_id": payload["session_id"]
79
+ }),
80
+ )
81
+ except Exception as e:
82
+ print(f"Error processing MQTT message: {e}")
83
+
84
+ mqtt_client.on_connect = on_connect
85
+ mqtt_client.on_message = on_message
86
+ mqtt_client.connect(MQTT_BROKER, MQTT_PORT)
87
+ mqtt_client.loop_start()
88
+
89
+ def handle_sensor_status(payload):
90
+ global current_task, sensor_results
91
+ if "in_place" in json.dumps(payload):
92
+ mqtt_client.publish(
93
+ SENSOR_COMMAND_TOPIC,
94
+ json.dumps({
95
+ "command": {
96
+ "R": current_task["R"],
97
+ "Y": current_task["Y"],
98
+ "B": current_task["B"],
99
+ "well": current_task["well"]
100
+ },
101
+ "experiment_id": current_task["experiment_id"],
102
+ "session_id": current_task["session_id"]
103
+ })
104
+ )
105
+ elif payload["status"]["sensor_status"] == "charging":
106
+
107
+ experiment_result = {
108
+ "Status": "Complete",
109
+ "Message": "Experiment completed successfully!",
110
+ "Student ID": current_task["session_id"],
111
+ "Command": {
112
+ "R": current_task["R"],
113
+ "Y": current_task["Y"],
114
+ "B": current_task["B"],
115
+ "well": current_task["well"],
116
+ },
117
+ "Sensor Data": sensor_results["sensor_data"],
118
+ "Experiment ID": current_task["experiment_id"]
119
+ }
120
+ # Store full result in result queue
121
+ result_queue.put(experiment_result)
122
+
123
+ # Create a version of experiment_result without "Status" and "Message" for database storage
124
+ db_data = {key: experiment_result[key] for key in experiment_result if key not in ["Status", "Message"]}
125
+
126
+ save_result(db_data)
127
+
128
+ current_task = None
129
+
130
+
131
+ def task_processor():
132
+ """
133
+ Background thread that processes tasks one by one.
134
+ """
135
+ global current_task, queue_counter
136
+ task_start_time = None
137
+ TIMEOUT_SECONDS = 165 # 2min45s
138
+
139
+ while True:
140
+ if current_task:
141
+ # Check for timeout
142
+ if task_start_time and (time.time() - task_start_time > TIMEOUT_SECONDS):
143
+ print("sending timeout message to OT-2")
144
+ mqtt_client.publish(
145
+ OT2_COMMAND_TOPIC,
146
+ json.dumps({
147
+ "command": {"sensor_status": "sensor_timeout"},
148
+ "experiment_id": current_task["experiment_id"],
149
+ "session_id": current_task["session_id"]
150
+ }),
151
+ )
152
+ result_queue.put({
153
+ "Status": "Error",
154
+ "Message": "Experiment timed out",
155
+ "Student ID": current_task["session_id"],
156
+ "Command": {
157
+ "R": current_task["R"],
158
+ "Y": current_task["Y"],
159
+ "B": current_task["B"],
160
+ "well": current_task["well"],
161
+ },
162
+ "Experiment ID": current_task["experiment_id"]
163
+ })
164
+ current_task = None
165
+ task_start_time = None
166
+ continue
167
+
168
+ if not current_task and not task_queue.empty():
169
+ # Fetch a new task from the queue
170
+ student_id, experiment_id = task_queue.get() # NOTE: We'll store (student_id, experiment_id) instead of task
171
+ queue_counter -= 1
172
+ task_start_time = time.time()
173
+
174
+ # NOTE: We retrieve the actual task from tasks_dict
175
+ current_task = tasks_dict[(student_id, experiment_id)]
176
+
177
+ # Mark status as "processing"
178
+ current_task["status"] = "processing"
179
+
180
+ mqtt_client.publish(
181
+ OT2_COMMAND_TOPIC,
182
+ json.dumps({
183
+ "command": {
184
+ "R": current_task["R"],
185
+ "Y": current_task["Y"],
186
+ "B": current_task["B"],
187
+ "well": current_task["well"]
188
+ },
189
+ "experiment_id": current_task["experiment_id"],
190
+ "session_id": current_task["session_id"]
191
+ }),
192
+ )
193
+
194
+ time.sleep(1)
195
+
196
+
197
+ processor_thread = threading.Thread(target=task_processor, daemon=True)
198
+ processor_thread.start()
199
+
200
+
201
+ def verify_student_id(student_id):
202
+ """Verify student ID and check quota"""
203
+ global queue_counter
204
+ if not student_id:
205
+ return [
206
+ gr.update(interactive=False, value=0),
207
+ gr.update(interactive=False, value=0),
208
+ gr.update(interactive=False, value=0),
209
+ "Please enter a Student ID",
210
+ gr.update(interactive=False)
211
+ ]
212
+
213
+ quota_remaining = check_student_quota(student_id)
214
+
215
+
216
+ if quota_remaining <= 0:
217
+ return [
218
+ gr.update(interactive=False, value=0),
219
+ gr.update(interactive=False, value=0),
220
+ gr.update(interactive=False, value=0),
221
+ "No experiments remaining. Please contact administrator.",
222
+ gr.update(interactive=False)
223
+ ]
224
+
225
+ return [
226
+ gr.update(interactive=True, value=0),
227
+ gr.update(interactive=True, value=0),
228
+ gr.update(interactive=True, value=0),
229
+ f"Student ID verified. Available experiments: {quota_remaining}\nCurrent queue length: {queue_counter} experiment(s)",
230
+ gr.update(interactive=True)
231
+ ]
232
+
233
+ def update_status_with_queue(R, Y, B):
234
+ """Check if RYB inputs are valid and return updated queue info"""
235
+ global queue_counter
236
+ validation_result = validate_ryb_input(R, Y, B)
237
+ total = R + Y + B
238
+ return [
239
+ f"{validation_result['message']}\nCurrent queue length: {queue_counter} experiment(s)",
240
+ gr.update(interactive=(total <= 300))
241
+ ]
242
+
243
+ def update_queue_display():
244
+ """Refresh queue info for the UI"""
245
+ global current_task, queue_counter
246
+ num_available_wells = len(find_unused_wells())
247
+ try:
248
+ print(f"[DEBUG] Updating queue display - Counter: {queue_counter}")
249
+ if current_task:
250
+ status = f"""### Current Queue Status
251
+ - Active experiment: Yes
252
+ - Queue length: {queue_counter+1} experiment(s)
253
+ - Available wells: {num_available_wells} wells
254
+ - Expected wait time to obtain results > {(queue_counter+2)*2} mins """
255
+ else:
256
+ status = f"""### Current Queue Status
257
+ - Active experiment: No
258
+ - Queue length: {queue_counter} experiment(s)
259
+ - Available wells: {num_available_wells} wells
260
+ - Expected wait time to obtain results: 2 mins """
261
+ return status
262
+ except Exception as e:
263
+ return f"Error getting queue status: {str(e)}"
264
+
265
+
266
+ def add_to_queue(student_id, R, Y, B):
267
+ global queue_counter
268
+
269
+ if student_id == "debug":
270
+ yield {
271
+ "Status": "Error",
272
+ "Message": "Debug ID cannot submit to real experiment queue. Please use your student id to submit experiment."
273
+ }
274
+ return
275
+
276
+
277
+ # Validate RYB inputs
278
+ validation_result = validate_ryb_input(R, Y, B)
279
+ if not validation_result["is_valid"]:
280
+ yield {
281
+ "Status": "Error",
282
+ "Message": validation_result["message"]
283
+ }
284
+ return
285
+
286
+ # Check quota
287
+ quota_remaining = check_student_quota(student_id)
288
+ if quota_remaining <= 0:
289
+ yield {
290
+ "Status": "Error",
291
+ "Message": "No experiments remaining"
292
+ }
293
+ return
294
+
295
+ # Select well
296
+ experiment_id = secrets.token_hex(4)
297
+ try:
298
+ empty_wells = find_unused_wells()
299
+ if not empty_wells:
300
+ raise ValueError("No available wells")
301
+ selected_well = empty_wells[0]
302
+
303
+
304
+ except Exception as e:
305
+ yield {
306
+ "Status": "Error",
307
+ "Message": str(e)
308
+ }
309
+ return
310
+
311
+ # NOTE: Create the task and store it in tasks_dict
312
+ task = {
313
+ "R": R,
314
+ "Y": Y,
315
+ "B": B,
316
+ "well": selected_well,
317
+ "session_id": student_id,
318
+ "experiment_id": experiment_id,
319
+ "status": "queued",
320
+ }
321
+ tasks_dict[(student_id, experiment_id)] = task # Keep track globally
322
+
323
+ # Put only (student_id, experiment_id) in the Queue
324
+ task_queue.put((student_id, experiment_id))
325
+ queue_counter += 1
326
+ update_used_wells([selected_well])
327
+ decrement_student_quota(student_id)
328
+
329
+
330
+ print(f"Task added: {task}")
331
+
332
+ # First yield: "Queued"
333
+ yield {
334
+ "Status": "Queued",
335
+ "Position": queue_counter,
336
+ "Student ID": student_id,
337
+ "Experiment ID": experiment_id,
338
+ "Well": selected_well,
339
+ "Volumes": {"R": R, "Y": Y, "B": B}
340
+ }
341
+
342
+ # NOTE: Wait until the task's status becomes 'processing'
343
+ # This ensures we only yield "Running" when the backend actually starts the job.
344
+ while tasks_dict[(student_id, experiment_id)]["status"] == "queued":
345
+ time.sleep(20)
346
+
347
+ # Second yield: "Running" (happens only after status is 'processing')
348
+ yield {
349
+ "Status": "Running",
350
+ "Student ID": student_id,
351
+ "Experiment ID": experiment_id,
352
+ "Well": selected_well,
353
+ "Volumes": {"R": R, "Y": Y, "B": B}
354
+ }
355
+
356
+ # Finally, wait for the result
357
+ result = result_queue.get()
358
+ yield result
359
+
360
+ def debug_experiment(student_id, R, Y, B):
361
+ if student_id != "debug":
362
+ return {"Status": "Error", "Message": "Invalid debug request"}
363
+
364
+ experiment_id = "debug-" + secrets.token_hex(4)
365
+
366
+ yield {
367
+ "Status": "Queued",
368
+ "Position": "debug",
369
+ "Student ID": student_id,
370
+ "Experiment ID": experiment_id,
371
+ "Well": "DEBUG-A1",
372
+ "Volumes": {"R": R, "Y": Y, "B": B}
373
+ }
374
+
375
+ time.sleep(1)
376
+
377
+ yield {
378
+ "Status": "Running",
379
+ "Student ID": student_id,
380
+ "Experiment ID": experiment_id,
381
+ "Well": "DEBUG-A1",
382
+ "Volumes": {"R": R, "Y": Y, "B": B}
383
+ }
384
+
385
+ time.sleep(1)
386
+ result_debug = {
387
+ "Status": "Complete",
388
+ "Message": "Debug mode - simulated result (no actual experiment performed)",
389
+ "Student ID": student_id,
390
+ "Command": {
391
+ "R": R,
392
+ "Y": Y,
393
+ "B": B,
394
+ "well": "DEBUG-A1"
395
+ },
396
+ "Sensor Data": {
397
+ "ch583": 2800,
398
+ "ch670": 3000,
399
+ "ch510": 1700,
400
+ "ch410": 240,
401
+ "ch620": 3900,
402
+ "ch470": 1000,
403
+ "ch550": 2400,
404
+ "ch440": 900
405
+ },
406
+ "Experiment ID": experiment_id
407
+ }
408
+
409
+ yield result_debug
410
+
411
+
412
+ with gr.Blocks(title="OT-2 Liquid Color Matching Experiment Queue") as demo:
413
+ gr.Markdown("## OT-2 Liquid Color Matching Experiment Queue")
414
+ gr.Markdown("Enter R, Y, and B volumes (in µL). Total volume must not exceed 300 µL.(a volume of exactly 300 µL is recommended)")
415
+
416
+ with gr.Row():
417
+ with gr.Column(scale=2):
418
+ with gr.Row():
419
+ student_id_input = gr.Textbox(
420
+ label="Student ID",
421
+ placeholder="Enter your unique ID"
422
+ )
423
+ verify_id_btn = gr.Button("Verify ID")
424
+
425
+ r_slider = gr.Slider(0, 300, step=1, label="Red (R) Volume (µL)", interactive=False)
426
+ y_slider = gr.Slider(0, 300, step=1, label="Yellow (Y) Volume (µL)", interactive=False)
427
+ b_slider = gr.Slider(0, 300, step=1, label="Blue (B) Volume (µL)", interactive=False)
428
+ status_output = gr.Textbox(label="Status")
429
+ submit_btn = gr.Button("Submit Experiment", interactive=False)
430
+ result_output = gr.JSON(label="Experiment Status")
431
+
432
+ with gr.Column(scale=1):
433
+ gr.Markdown("### Queue Status")
434
+ queue_status = gr.Markdown("Loading queue status...")
435
+ update_status_btn = gr.Button("Refresh Queue Status")
436
+ gr.Markdown("### YouTube Livestream")
437
+ #src="https://www.youtube.com/embed/live_stream?channel=UCHBzCfYpGwoqygH9YNh9A6g"
438
+ iframe_html = '''
439
+ <div style="position: relative; width: 100%; padding-top: 56.25%;">
440
+ <iframe
441
+ style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;"
442
+ src="https://www.youtube.com/embed/aahvV0BZjIo"
443
+ title="OT-2 Livestream"
444
+ frameborder="0"
445
+ allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
446
+ referrerpolicy="strict-origin-when-cross-origin"
447
+ allowfullscreen>
448
+ </iframe>
449
+ </div>
450
+ '''
451
+ gr.HTML(iframe_html)
452
+
453
+ verify_id_btn.click(
454
+ verify_student_id,
455
+ inputs=[student_id_input],
456
+ outputs=[r_slider, y_slider, b_slider, status_output, submit_btn],
457
+ api_name="verify_student_id"
458
+ )
459
+
460
+ r_slider.change(
461
+ update_status_with_queue,
462
+ inputs=[r_slider, y_slider, b_slider],
463
+ outputs=[status_output, submit_btn]
464
+ )
465
+ y_slider.change(
466
+ update_status_with_queue,
467
+ inputs=[r_slider, y_slider, b_slider],
468
+ outputs=[status_output, submit_btn]
469
+ )
470
+ b_slider.change(
471
+ update_status_with_queue,
472
+ inputs=[r_slider, y_slider, b_slider],
473
+ outputs=[status_output, submit_btn]
474
+ )
475
+
476
+ # NOTE: concurrency_limit=3 is preserved; no changes here
477
+ submit_btn.click(
478
+ add_to_queue,
479
+ inputs=[student_id_input, r_slider, y_slider, b_slider],
480
+ outputs=result_output,
481
+ api_name="submit",
482
+ concurrency_limit=8
483
+ ).then(
484
+ update_queue_display,
485
+ None,
486
+ queue_status
487
+ )
488
+
489
+ update_status_btn.click(
490
+ update_queue_display,
491
+ None,
492
+ queue_status,
493
+ api_name="update_queue_display"
494
+ )
495
+
496
+ demo.load(
497
+ update_queue_display,
498
+ None,
499
+ queue_status
500
+ )
501
+
502
+ debug_btn = gr.Button("Debug Submit", visible=False)
503
+ debug_btn.click(
504
+ debug_experiment,
505
+ inputs=[student_id_input, r_slider, y_slider, b_slider],
506
+ outputs=result_output,
507
+ api_name="debug"
508
+ )
509
+
510
+
511
+ demo.queue
512
+
513
+ if __name__ == "__main__":
514
+ demo.launch()
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ gradio
2
+ paho-mqtt
3
+ pandas
4
+ pymongo
5
+ prefect
well_status_utils.py ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pymongo import MongoClient
2
+ from prefect import task
3
+ import pandas as pd
4
+
5
+ import os
6
+
7
+ MONGODB_PASSWORD = os.getenv("MONGODB_PASSWORD")
8
+
9
+ blinded_connection_string = os.getenv("blinded_connection_string")
10
+
11
+ connection_string = blinded_connection_string.replace("<db_password>", MONGODB_PASSWORD)
12
+
13
+
14
+ @task
15
+ def generate_empty_well():
16
+ dbclient = MongoClient(connection_string)
17
+ db = dbclient["LCM-OT-2-SLD"]
18
+ collection = db["wells"]
19
+ rows = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H']
20
+ columns = [str(i) for i in range(1, 13)]
21
+ for row in rows:
22
+ for col in columns:
23
+ well = f"{row}{col}"
24
+ metadata = {
25
+ "well": well,
26
+ "status": "empty",
27
+ "project": "OT2"
28
+ }
29
+
30
+ #send_data_to_mongodb(collection="wells", data=metadata)
31
+ query = {"well": well}
32
+ update_data = {"$set": metadata}
33
+ result = collection.update_one(query, update_data, upsert=True)
34
+
35
+ # close connection
36
+ dbclient.close()
37
+
38
+ @task
39
+ def update_used_wells(used_wells):
40
+ dbclient = MongoClient(connection_string)
41
+ db = dbclient["LCM-OT-2-SLD"]
42
+ collection = db["wells"]
43
+
44
+ for well in used_wells:
45
+ metadata = {
46
+ "well": well,
47
+ "status": "used",
48
+ "project": "OT2"
49
+ }
50
+ #send_data_to_mongodb(collection="wells", data=metadata)
51
+ query = {"well": well}
52
+ update_data = {"$set": metadata}
53
+ result = collection.update_one(query, update_data, upsert=True)
54
+
55
+ # close connection
56
+ dbclient.close()
57
+
58
+ @task
59
+ def find_unused_wells():
60
+ dbclient = MongoClient(connection_string)
61
+ db = dbclient["LCM-OT-2-SLD"]
62
+ collection = db["wells"]
63
+ query = {"status": "empty"}
64
+ response = list(collection.find(query))
65
+ df = pd.DataFrame(response)
66
+
67
+ # Extract the "well" column as a list
68
+ if "well" not in df.columns:
69
+ raise ValueError("The returned data does not contain the 'well' field.")
70
+ # Sort obtained list
71
+
72
+ def well_sort_key(well):
73
+ row = well[0] # A, B, C, ...
74
+ col = int(well[1:]) # 1, 2, ..., 12
75
+ return (row, col)
76
+
77
+ empty_wells = sorted(df["well"].tolist(), key=well_sort_key)
78
+ #print(empty_wells)
79
+
80
+ # close connection
81
+ dbclient.close()
82
+
83
+ # Check if there are any empty wells
84
+ if len(empty_wells) == 0:
85
+ raise ValueError("No empty wells found")
86
+ #print(empty_wells)
87
+ return empty_wells
88
+
89
+
90
+
91
+ if __name__ == "__main__":
92
+ generate_empty_well()
93
+ #find_unused_wells()
94
+
95
+