File size: 2,995 Bytes
c20c614 3bea791 c20c614 934a13c 2ba6349 c20c614 3bea791 c20c614 3bea791 c20c614 2ba6349 c20c614 0097dfb c20c614 2ba6349 c20c614 2ba6349 cf7533f 2ba6349 c422163 3bea791 2ba6349 c20c614 2ba6349 c20c614 2ba6349 c20c614 2ba6349 3bea791 2ba6349 3bea791 2ba6349 3bea791 b0f1fb9 3bea791 b0f1fb9 3bea791 31206fe 2ba6349 c20c614 4f18e25 2ba6349 c20c614 2ba6349 3bea791 2ba6349 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import paho.mqtt.client as mqtt
import os
import gradio as gr
import json
import time
import threading
"""
This Gradio app is used to request data from a 3D printer using MQTT.
Author: Enrui (Edison) Lin
"""
# Environment Variables
HOST = os.environ.get("host")
PORT = int(os.environ.get("port"))
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")
# Global variables to store received data
latest_data = {
"bed_temperature": "N/A",
"nozzle_temperature": "N/A",
"status": "N/A",
"update_time": "Waiting for data...",
}
# MQTT Client setup
client = None
response_topic = None # Will be set dynamically
def create_client(host, port, username, password):
global client
client = mqtt.Client()
client.username_pw_set(username, password)
client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
client.on_connect = on_connect
client.on_message = on_message
client.connect(host, port)
client.loop_start()
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
def on_message(client, userdata, message):
global latest_data
print("Received message")
try:
data = json.loads(message.payload)
latest_data["bed_temperature"] = data.get("bed_temperature", "N/A")
latest_data["nozzle_temperature"] = data.get("nozzle_temperature", "N/A")
latest_data["status"] = data.get("status", "N/A")
latest_data["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
except Exception as e:
print(f"Error parsing MQTT message: {e}")
def get_data(serial):
"""Request data from the MQTT broker."""
global client, response_topic
if client is None:
create_client(HOST, PORT, USERNAME, PASSWORD)
request_topic = f"bambu_a1_mini/request/{serial}"
response_topic = f"bambu_a1_mini/response/{serial}"
print(f"Subscribing to {response_topic}")
client.subscribe(response_topic)
# Send a request to get data
client.publish(request_topic, json.dumps("HI"))
global latest_data
latest_data["bed_temperature"] = "N/A"
timeout = 10
while latest_data["bed_temperature"] == "N/A" and timeout > 0:
time.sleep(1)
timeout -= 1
return latest_data["status"], latest_data["bed_temperature"], latest_data["nozzle_temperature"], latest_data["update_time"]
# Gradio UI
with gr.Blocks() as blocks:
serial = gr.Textbox("0309CA471800852", label="Serial Number")
send_button = gr.Button(value="Request Data")
status_text = gr.Textbox(label="Status", interactive=False)
bed_temp_text = gr.Textbox(label="Bed Temperature", interactive=False)
nozzle_temp_text = gr.Textbox(label="Nozzle Temperature", interactive=False)
update_time_text = gr.Textbox(label="Last Update", interactive=False)
send_button.click(fn=get_data, inputs=[serial], outputs=[status_text, bed_temp_text, nozzle_temp_text, update_time_text])
blocks.launch()
|