Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from queue import Queue | |
| import threading | |
| import time | |
| import paho.mqtt.client as mqtt | |
| import json | |
| import secrets | |
| from DB_utils import ( | |
| find_unused_wells, | |
| update_used_wells, | |
| save_result, | |
| get_student_quota, | |
| decrement_student_quota, | |
| ) | |
| import os | |
| # NOTE: New global dict to store tasks keyed by (student_id, experiment_id) | |
| tasks_dict = {} | |
| task_queue = Queue() | |
| result_queue = Queue() | |
| current_task = None | |
| sensor_results = None | |
| queue_counter = task_queue.qsize() | |
| MQTT_BROKER = os.getenv("MQTT_BROKER") | |
| MQTT_PORT = int(os.getenv("MQTT_PORT")) | |
| MQTT_USERNAME = os.getenv("MQTT_USERNAME") | |
| MQTT_PASSWORD = os.getenv("MQTT_PASSWORD") | |
| OT2_SERIAL = "OT2CEP20240218R04" | |
| PICO_ID = "e66130100f895134" | |
| OT2_COMMAND_TOPIC = f"command/ot2/{OT2_SERIAL}/pipette" | |
| OT2_STATUS_TOPIC = f"status/ot2/{OT2_SERIAL}/complete" | |
| SENSOR_COMMAND_TOPIC = f"command/picow/{PICO_ID}/as7341/read" | |
| SENSOR_DATA_TOPIC = f"color-mixing/picow/{PICO_ID}/as7341" | |
| def check_student_quota(student_id): | |
| """Check student's remaining experiment quota""" | |
| student_quota = get_student_quota(student_id) | |
| return student_quota | |
| def validate_ryb_input(R, Y, B): | |
| """Validate RYB input volumes""" | |
| total = R + Y + B | |
| if total > 300: | |
| return { | |
| "is_valid": False, | |
| "message": f"Total volume cannot exceed 300 µL. Current total: {total} µL.", | |
| } | |
| return {"is_valid": True, "message": f"Current total: {total} µL."} | |
| mqtt_client = mqtt.Client() | |
| mqtt_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT) | |
| mqtt_client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD) | |
| def on_connect(client, userdata, flags, rc): | |
| print(f"Connected to MQTT Broker with result code {rc}") | |
| client.subscribe([(OT2_STATUS_TOPIC, 2), (SENSOR_DATA_TOPIC, 2)]) | |
| def on_message(client, userdata, msg): | |
| global current_task, sensor_results | |
| try: | |
| payload = json.loads(msg.payload.decode("utf-8")) | |
| if msg.topic == OT2_STATUS_TOPIC: | |
| handle_sensor_status(payload) | |
| elif msg.topic == SENSOR_DATA_TOPIC: | |
| print("Sensor data received") | |
| sensor_results = payload | |
| mqtt_client.publish( | |
| OT2_COMMAND_TOPIC, | |
| json.dumps( | |
| { | |
| "command": {"sensor_status": "read"}, | |
| "experiment_id": payload["experiment_id"], | |
| "session_id": payload["session_id"], | |
| } | |
| ), | |
| ) | |
| except Exception as e: | |
| print(f"Error processing MQTT message: {e}") | |
| mqtt_client.on_connect = on_connect | |
| mqtt_client.on_message = on_message | |
| mqtt_client.connect(MQTT_BROKER, MQTT_PORT) | |
| mqtt_client.loop_start() | |
| def handle_sensor_status(payload): | |
| global current_task, sensor_results | |
| if "in_place" in json.dumps(payload): | |
| mqtt_client.publish( | |
| SENSOR_COMMAND_TOPIC, | |
| json.dumps( | |
| { | |
| "command": { | |
| "R": current_task["R"], | |
| "Y": current_task["Y"], | |
| "B": current_task["B"], | |
| "well": current_task["well"], | |
| }, | |
| "experiment_id": current_task["experiment_id"], | |
| "session_id": current_task["session_id"], | |
| } | |
| ), | |
| ) | |
| elif payload["status"]["sensor_status"] == "charging": | |
| experiment_result = { | |
| "Status": "Complete", | |
| "Message": "Experiment completed successfully!", | |
| "Student ID": current_task["session_id"], | |
| "Command": { | |
| "R": current_task["R"], | |
| "Y": current_task["Y"], | |
| "B": current_task["B"], | |
| "well": current_task["well"], | |
| }, | |
| "Sensor Data": sensor_results["sensor_data"], | |
| "Experiment ID": current_task["experiment_id"], | |
| } | |
| # Store full result in result queue | |
| result_queue.put(experiment_result) | |
| # Create a version of experiment_result without "Status" and "Message" for database storage | |
| db_data = { | |
| key: experiment_result[key] | |
| for key in experiment_result | |
| if key not in ["Status", "Message"] | |
| } | |
| save_result(db_data) | |
| current_task = None | |
| def task_processor(): | |
| """ | |
| Background thread that processes tasks one by one. | |
| """ | |
| global current_task, queue_counter | |
| task_start_time = None | |
| TIMEOUT_SECONDS = 165 # 2min45s | |
| while True: | |
| if current_task: | |
| # Check for timeout | |
| if task_start_time and (time.time() - task_start_time > TIMEOUT_SECONDS): | |
| print("sending timeout message to OT-2") | |
| mqtt_client.publish( | |
| OT2_COMMAND_TOPIC, | |
| json.dumps( | |
| { | |
| "command": {"sensor_status": "sensor_timeout"}, | |
| "experiment_id": current_task["experiment_id"], | |
| "session_id": current_task["session_id"], | |
| } | |
| ), | |
| ) | |
| result_queue.put( | |
| { | |
| "Status": "Error", | |
| "Message": "Experiment timed out", | |
| "Student ID": current_task["session_id"], | |
| "Command": { | |
| "R": current_task["R"], | |
| "Y": current_task["Y"], | |
| "B": current_task["B"], | |
| "well": current_task["well"], | |
| }, | |
| "Experiment ID": current_task["experiment_id"], | |
| } | |
| ) | |
| current_task = None | |
| task_start_time = None | |
| continue | |
| if not current_task and not task_queue.empty(): | |
| # Fetch a new task from the queue | |
| student_id, experiment_id = ( | |
| task_queue.get() | |
| ) # NOTE: We'll store (student_id, experiment_id) instead of task | |
| queue_counter -= 1 | |
| task_start_time = time.time() | |
| # NOTE: We retrieve the actual task from tasks_dict | |
| current_task = tasks_dict[(student_id, experiment_id)] | |
| # Mark status as "processing" | |
| current_task["status"] = "processing" | |
| mqtt_client.publish( | |
| OT2_COMMAND_TOPIC, | |
| json.dumps( | |
| { | |
| "command": { | |
| "R": current_task["R"], | |
| "Y": current_task["Y"], | |
| "B": current_task["B"], | |
| "well": current_task["well"], | |
| }, | |
| "experiment_id": current_task["experiment_id"], | |
| "session_id": current_task["session_id"], | |
| } | |
| ), | |
| ) | |
| time.sleep(1) | |
| processor_thread = threading.Thread(target=task_processor, daemon=True) | |
| processor_thread.start() | |
| def verify_student_id(student_id): | |
| """Verify student ID and check quota""" | |
| global queue_counter | |
| if not student_id: | |
| return [ | |
| gr.update(interactive=False, value=0), | |
| gr.update(interactive=False, value=0), | |
| gr.update(interactive=False, value=0), | |
| "Please enter a Student ID", | |
| gr.update(interactive=False), | |
| ] | |
| quota_remaining = check_student_quota(student_id) | |
| if quota_remaining <= 0: | |
| return [ | |
| gr.update(interactive=False, value=0), | |
| gr.update(interactive=False, value=0), | |
| gr.update(interactive=False, value=0), | |
| "No experiments remaining. Please contact administrator.", | |
| gr.update(interactive=False), | |
| ] | |
| return [ | |
| gr.update(interactive=True, value=0), | |
| gr.update(interactive=True, value=0), | |
| gr.update(interactive=True, value=0), | |
| f"Student ID verified. Available experiments: {quota_remaining}\nCurrent queue length: {queue_counter} experiment(s)", | |
| gr.update(interactive=True), | |
| ] | |
| def update_status_with_queue(R, Y, B): | |
| """Check if RYB inputs are valid and return updated queue info""" | |
| global queue_counter | |
| validation_result = validate_ryb_input(R, Y, B) | |
| total = R + Y + B | |
| return [ | |
| f"{validation_result['message']}\nCurrent queue length: {queue_counter} experiment(s)", | |
| gr.update(interactive=(total <= 300)), | |
| ] | |
| def update_queue_display(): | |
| """Refresh queue info for the UI""" | |
| global current_task, queue_counter | |
| num_available_wells = len(find_unused_wells()) | |
| try: | |
| print(f"[DEBUG] Updating queue display - Counter: {queue_counter}") | |
| if current_task: | |
| status = f"""### Current Queue Status | |
| - Active experiment: Yes | |
| - Queue length: {queue_counter+1} experiment(s) | |
| - Available wells: {num_available_wells} wells | |
| - Expected wait time to obtain results > {(queue_counter+2)*2} mins """ | |
| else: | |
| status = f"""### Current Queue Status | |
| - Active experiment: No | |
| - Queue length: {queue_counter} experiment(s) | |
| - Available wells: {num_available_wells} wells | |
| - Expected wait time to obtain results: 2 mins """ | |
| return status | |
| except Exception as e: | |
| return f"Error getting queue status: {str(e)}" | |
| def add_to_queue(student_id, R, Y, B): | |
| global queue_counter | |
| if student_id == "debug": | |
| yield { | |
| "Status": "Error", | |
| "Message": "Debug ID cannot submit to real experiment queue. Please use your student id to submit experiment.", | |
| } | |
| return | |
| # Validate RYB inputs | |
| validation_result = validate_ryb_input(R, Y, B) | |
| if not validation_result["is_valid"]: | |
| yield {"Status": "Error", "Message": validation_result["message"]} | |
| return | |
| # Check quota | |
| quota_remaining = check_student_quota(student_id) | |
| if quota_remaining <= 0: | |
| yield {"Status": "Error", "Message": "No experiments remaining"} | |
| return | |
| # Select well | |
| experiment_id = secrets.token_hex(4) | |
| try: | |
| empty_wells = find_unused_wells() | |
| if not empty_wells: | |
| raise ValueError("No available wells") | |
| selected_well = empty_wells[0] | |
| except Exception as e: | |
| yield {"Status": "Error", "Message": str(e)} | |
| return | |
| # NOTE: Create the task and store it in tasks_dict | |
| task = { | |
| "R": R, | |
| "Y": Y, | |
| "B": B, | |
| "well": selected_well, | |
| "session_id": student_id, | |
| "experiment_id": experiment_id, | |
| "status": "queued", | |
| } | |
| tasks_dict[(student_id, experiment_id)] = task # Keep track globally | |
| # Put only (student_id, experiment_id) in the Queue | |
| task_queue.put((student_id, experiment_id)) | |
| queue_counter += 1 | |
| update_used_wells([selected_well]) | |
| decrement_student_quota(student_id) | |
| print(f"Task added: {task}") | |
| # First yield: "Queued" | |
| yield { | |
| "Status": "Queued", | |
| "Position": queue_counter, | |
| "Student ID": student_id, | |
| "Experiment ID": experiment_id, | |
| "Well": selected_well, | |
| "Volumes": {"R": R, "Y": Y, "B": B}, | |
| } | |
| # NOTE: Wait until the task's status becomes 'processing' | |
| # This ensures we only yield "Running" when the backend actually starts the job. | |
| while tasks_dict[(student_id, experiment_id)]["status"] == "queued": | |
| time.sleep(20) | |
| # Second yield: "Running" (happens only after status is 'processing') | |
| yield { | |
| "Status": "Running", | |
| "Student ID": student_id, | |
| "Experiment ID": experiment_id, | |
| "Well": selected_well, | |
| "Volumes": {"R": R, "Y": Y, "B": B}, | |
| } | |
| # Finally, wait for the result | |
| result = result_queue.get() | |
| yield result | |
| def debug_experiment(student_id, R, Y, B): | |
| if student_id != "debug": | |
| return {"Status": "Error", "Message": "Invalid debug request"} | |
| experiment_id = "debug-" + secrets.token_hex(4) | |
| yield { | |
| "Status": "Queued", | |
| "Position": "debug", | |
| "Student ID": student_id, | |
| "Experiment ID": experiment_id, | |
| "Well": "DEBUG-A1", | |
| "Volumes": {"R": R, "Y": Y, "B": B}, | |
| } | |
| time.sleep(1) | |
| yield { | |
| "Status": "Running", | |
| "Student ID": student_id, | |
| "Experiment ID": experiment_id, | |
| "Well": "DEBUG-A1", | |
| "Volumes": {"R": R, "Y": Y, "B": B}, | |
| } | |
| time.sleep(1) | |
| result_debug = { | |
| "Status": "Complete", | |
| "Message": "Debug mode - simulated result (no actual experiment performed)", | |
| "Student ID": student_id, | |
| "Command": {"R": R, "Y": Y, "B": B, "well": "DEBUG-A1"}, | |
| "Sensor Data": { | |
| "ch583": 2800, | |
| "ch670": 3000, | |
| "ch510": 1700, | |
| "ch410": 240, | |
| "ch620": 3900, | |
| "ch470": 1000, | |
| "ch550": 2400, | |
| "ch440": 900, | |
| }, | |
| "Experiment ID": experiment_id, | |
| } | |
| yield result_debug | |
| with gr.Blocks(title="OT-2 Liquid Color Matching Experiment Queue") as demo: | |
| gr.Markdown("## OT-2 Liquid Color Matching Experiment Queue") | |
| gr.Markdown( | |
| "Enter R, Y, and B volumes (in µL). Total volume must not exceed 300 µL.(a volume of exactly 300 µL is recommended)" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| with gr.Row(): | |
| student_id_input = gr.Textbox( | |
| label="Student ID", placeholder="Enter your unique ID" | |
| ) | |
| verify_id_btn = gr.Button("Verify ID") | |
| r_slider = gr.Slider( | |
| 0, 300, step=1, label="Red (R) Volume (µL)", interactive=False | |
| ) | |
| y_slider = gr.Slider( | |
| 0, 300, step=1, label="Yellow (Y) Volume (µL)", interactive=False | |
| ) | |
| b_slider = gr.Slider( | |
| 0, 300, step=1, label="Blue (B) Volume (µL)", interactive=False | |
| ) | |
| status_output = gr.Textbox(label="Status") | |
| submit_btn = gr.Button("Submit Experiment", interactive=False) | |
| result_output = gr.JSON(label="Experiment Status") | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Queue Status") | |
| queue_status = gr.Markdown("Loading queue status...") | |
| update_status_btn = gr.Button("Refresh Queue Status") | |
| gr.Markdown("### YouTube Livestream") | |
| # src="https://www.youtube.com/embed/live_stream?channel=UCHBzCfYpGwoqygH9YNh9A6g" | |
| iframe_html = """ | |
| <div style="position: relative; width: 100%; padding-top: 56.25%;"> | |
| <iframe | |
| style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;" | |
| src="https://www.youtube.com/embed/aahvV0BZjIo" | |
| title="OT-2 Livestream" | |
| frameborder="0" | |
| allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" | |
| referrerpolicy="strict-origin-when-cross-origin" | |
| allowfullscreen> | |
| </iframe> | |
| </div> | |
| """ | |
| gr.HTML(iframe_html) | |
| verify_id_btn.click( | |
| verify_student_id, | |
| inputs=[student_id_input], | |
| outputs=[r_slider, y_slider, b_slider, status_output, submit_btn], | |
| api_name="verify_student_id", | |
| ) | |
| r_slider.change( | |
| update_status_with_queue, | |
| inputs=[r_slider, y_slider, b_slider], | |
| outputs=[status_output, submit_btn], | |
| ) | |
| y_slider.change( | |
| update_status_with_queue, | |
| inputs=[r_slider, y_slider, b_slider], | |
| outputs=[status_output, submit_btn], | |
| ) | |
| b_slider.change( | |
| update_status_with_queue, | |
| inputs=[r_slider, y_slider, b_slider], | |
| outputs=[status_output, submit_btn], | |
| ) | |
| # NOTE: concurrency_limit=3 is preserved; no changes here | |
| submit_btn.click( | |
| add_to_queue, | |
| inputs=[student_id_input, r_slider, y_slider, b_slider], | |
| outputs=result_output, | |
| api_name="submit", | |
| concurrency_limit=8, | |
| ).then(update_queue_display, None, queue_status) | |
| update_status_btn.click( | |
| update_queue_display, None, queue_status, api_name="update_queue_display" | |
| ) | |
| demo.load(update_queue_display, None, queue_status) | |
| debug_btn = gr.Button("Debug Submit", visible=False) | |
| debug_btn.click( | |
| debug_experiment, | |
| inputs=[student_id_input, r_slider, y_slider, b_slider], | |
| outputs=result_output, | |
| api_name="debug", | |
| ) | |
| demo.queue | |
| if __name__ == "__main__": | |
| demo.launch() | |