|
|
import paho.mqtt.client as mqtt |
|
|
import os |
|
|
import gradio as gr |
|
|
import json |
|
|
import time |
|
|
import threading |
|
|
|
|
|
""" |
|
|
This Gradio app is used to request data from a 3D printer using MQTT. |
|
|
|
|
|
Author: Enrui (Edison) Lin |
|
|
""" |
|
|
|
|
|
|
|
|
HOST = os.environ.get("host") |
|
|
PORT = int(os.environ.get("port")) |
|
|
USERNAME = os.environ.get("username") |
|
|
PASSWORD = os.environ.get("password") |
|
|
|
|
|
|
|
|
latest_data = { |
|
|
"bed_temperature": "N/A", |
|
|
"nozzle_temperature": "N/A", |
|
|
"status": "N/A", |
|
|
"update_time": "Waiting for data...", |
|
|
} |
|
|
|
|
|
|
|
|
client = None |
|
|
response_topic = None |
|
|
|
|
|
def create_client(host, port, username, password): |
|
|
global client |
|
|
client = mqtt.Client() |
|
|
client.username_pw_set(username, password) |
|
|
client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS) |
|
|
client.on_connect = on_connect |
|
|
client.on_message = on_message |
|
|
client.connect(host, port) |
|
|
client.loop_start() |
|
|
|
|
|
def on_connect(client, userdata, flags, rc): |
|
|
print(f"Connected with result code {rc}") |
|
|
|
|
|
def on_message(client, userdata, message): |
|
|
global latest_data |
|
|
print("Received message") |
|
|
try: |
|
|
data = json.loads(message.payload) |
|
|
latest_data["bed_temperature"] = data.get("bed_temperature", "N/A") |
|
|
latest_data["nozzle_temperature"] = data.get("nozzle_temperature", "N/A") |
|
|
latest_data["status"] = data.get("status", "N/A") |
|
|
latest_data["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error parsing MQTT message: {e}") |
|
|
|
|
|
def get_data(serial): |
|
|
"""Request data from the MQTT broker.""" |
|
|
global client, response_topic |
|
|
|
|
|
if client is None: |
|
|
create_client(HOST, PORT, USERNAME, PASSWORD) |
|
|
|
|
|
request_topic = f"bambu_a1_mini/request/{serial}" |
|
|
response_topic = f"bambu_a1_mini/response/{serial}" |
|
|
|
|
|
print(f"Subscribing to {response_topic}") |
|
|
client.subscribe(response_topic) |
|
|
|
|
|
|
|
|
client.publish(request_topic, json.dumps("HI")) |
|
|
|
|
|
global latest_data |
|
|
latest_data["bed_temperature"] = "N/A" |
|
|
timeout = 10 |
|
|
while latest_data["bed_temperature"] == "N/A" and timeout > 0: |
|
|
time.sleep(1) |
|
|
timeout -= 1 |
|
|
|
|
|
return latest_data["status"], latest_data["bed_temperature"], latest_data["nozzle_temperature"], latest_data["update_time"] |
|
|
|
|
|
|
|
|
with gr.Blocks() as blocks: |
|
|
serial = gr.Textbox("0309CA471800852", label="Serial Number") |
|
|
send_button = gr.Button(value="Request Data") |
|
|
|
|
|
status_text = gr.Textbox(label="Status", interactive=False) |
|
|
bed_temp_text = gr.Textbox(label="Bed Temperature", interactive=False) |
|
|
nozzle_temp_text = gr.Textbox(label="Nozzle Temperature", interactive=False) |
|
|
update_time_text = gr.Textbox(label="Last Update", interactive=False) |
|
|
|
|
|
send_button.click(fn=get_data, inputs=[serial], outputs=[status_text, bed_temp_text, nozzle_temp_text, update_time_text]) |
|
|
|
|
|
blocks.launch() |
|
|
|