"""
This Gradio app is used to request data from a 3D printer using MQTT.

Author: Enrui (Edison) Lin
"""

import json
import os
import threading
import time

import gradio as gr
import paho.mqtt.client as mqtt

# Environment Variables
HOST = os.environ.get("host")
# Fall back to 8883 (the conventional MQTT-over-TLS port) so that a missing or
# empty "port" variable no longer crashes the import with int(None).
PORT = int(os.environ.get("port") or 8883)
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")

# Maximum time get_data() waits for a response, in seconds.
RESPONSE_TIMEOUT_SECONDS = 10

# Placeholder values shown until a response has been received.
_DEFAULT_DATA = {
    "bed_temperature": "N/A",
    "nozzle_temperature": "N/A",
    "status": "N/A",
    "update_time": "Waiting for data...",
}

# Global variables to store received data
latest_data = dict(_DEFAULT_DATA)

# Set by on_message() (MQTT network thread) once a response for the current
# topic has been stored; get_data() waits on it instead of polling.
_response_received = threading.Event()

# MQTT Client setup
client = None
response_topic = None  # Will be set dynamically


def create_client(host, port, username, password):
    """Create a TLS MQTT client, connect it and start its network loop.

    The module-level ``client`` is assigned only after ``connect()`` and
    ``loop_start()`` have returned without raising, so a failed connection
    attempt leaves ``client`` as ``None`` and the next request retries.

    Args:
        host: Broker host name.
        port: Broker port.
        username: Broker user name.
        password: Broker password.

    Raises:
        Whatever ``connect()`` raises when the broker cannot be reached.
    """
    global client
    new_client = mqtt.Client()
    new_client.username_pw_set(username, password)
    new_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
    new_client.on_connect = on_connect
    new_client.on_message = on_message
    new_client.connect(host, port)
    new_client.loop_start()
    client = new_client


def on_connect(client, userdata, flags, rc):
    """Log the result code reported by the broker for the connection."""
    print(f"Connected with result code {rc}")


def on_message(client, userdata, message):
    """Store the fields of a printer response in ``latest_data``.

    Messages on topics other than the current ``response_topic`` are ignored,
    because subscriptions from earlier requests may still deliver data for a
    different printer. Payloads that are not a JSON object are reported and
    dropped without touching ``latest_data``.
    """
    print("Received message")
    if response_topic is not None and message.topic != response_topic:
        return
    try:
        data = json.loads(message.payload)
        if not isinstance(data, dict):
            raise TypeError(f"expected a JSON object, got {type(data).__name__}")
        parsed = {
            "bed_temperature": data.get("bed_temperature", "N/A"),
            "nozzle_temperature": data.get("nozzle_temperature", "N/A"),
            "status": data.get("status", "N/A"),
            "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        }
    except Exception as e:
        # Deliberately broad: an exception must not escape into the MQTT
        # network loop that invokes this callback.
        print(f"Error parsing MQTT message: {e}")
        return
    # Apply all fields in one call so a reader never sees a half-updated mix.
    latest_data.update(parsed)
    _response_received.set()


def get_data(serial):
    """Request data from the MQTT broker.

    Publishes a request for the printer identified by ``serial`` and waits up
    to ``RESPONSE_TIMEOUT_SECONDS`` for a response on the matching topic.

    Note: the received data and the current topic are module-level state, so
    overlapping calls share them.

    Args:
        serial: Printer serial number as typed into the UI.

    Returns:
        A tuple ``(status, bed_temperature, nozzle_temperature, update_time)``.
        When no response arrives in time, the placeholder values are returned.

    Raises:
        gr.Error: If ``serial`` is empty or contains MQTT topic
            metacharacters (``/``, ``#`` or ``+``).
    """
    global response_topic

    # The serial becomes part of the topic names; reject wildcards and level
    # separators so user input cannot widen the subscription.
    serial = (serial or "").strip()
    if not serial or any(ch in serial for ch in "/#+"):
        raise gr.Error("Please enter a valid serial number.")

    if client is None:
        create_client(HOST, PORT, USERNAME, PASSWORD)

    request_topic = f"bambu_a1_mini/request/{serial}"
    new_response_topic = f"bambu_a1_mini/response/{serial}"

    # Drop the subscription of a previously requested printer so that
    # subscriptions do not pile up across requests.
    if response_topic is not None and response_topic != new_response_topic:
        client.unsubscribe(response_topic)
    response_topic = new_response_topic

    # Reset every field (not only the bed temperature) so a timeout cannot
    # return values left over from an earlier request.
    latest_data.update(_DEFAULT_DATA)
    _response_received.clear()

    print(f"Subscribing to {response_topic}")
    client.subscribe(response_topic)

    # Send a request to get data
    client.publish(request_topic, json.dumps("HI"))

    # Returns as soon as on_message() stores a response, or after the timeout.
    _response_received.wait(timeout=RESPONSE_TIMEOUT_SECONDS)

    snapshot = dict(latest_data)
    return (
        snapshot["status"],
        snapshot["bed_temperature"],
        snapshot["nozzle_temperature"],
        snapshot["update_time"],
    )


# Gradio UI
with gr.Blocks() as blocks:
    serial = gr.Textbox("0309CA471800852", label="Serial Number")
    send_button = gr.Button(value="Request Data")

    status_text = gr.Textbox(label="Status", interactive=False)
    bed_temp_text = gr.Textbox(label="Bed Temperature", interactive=False)
    nozzle_temp_text = gr.Textbox(label="Nozzle Temperature", interactive=False)
    update_time_text = gr.Textbox(label="Last Update", interactive=False)

    send_button.click(
        fn=get_data,
        inputs=[serial],
        outputs=[status_text, bed_temp_text, nozzle_temp_text, update_time_text],
    )

blocks.launch()