a1-mini-status / app.py
sgbaird's picture
revert
a21f3c3 verified
import paho.mqtt.client as mqtt
import os
import gradio as gr
import json
import time
import threading
"""
This Gradio app is used to request data from a 3D printer using MQTT.
Author: Enrui (Edison) Lin
"""
# Environment Variables
HOST = os.environ.get("host")
PORT = int(os.environ.get("port"))
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")
# Global variables to store received data
latest_data = {
"bed_temperature": "N/A",
"nozzle_temperature": "N/A",
"status": "N/A",
"update_time": "Waiting for data...",
}
# MQTT Client setup
client = None
response_topic = None # Will be set dynamically
def create_client(host, port, username, password):
global client
client = mqtt.Client()
client.username_pw_set(username, password)
client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
client.on_connect = on_connect
client.on_message = on_message
client.connect(host, port)
client.loop_start()
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
def on_message(client, userdata, message):
global latest_data
print("Received message")
try:
data = json.loads(message.payload)
latest_data["bed_temperature"] = data.get("bed_temperature", "N/A")
latest_data["nozzle_temperature"] = data.get("nozzle_temperature", "N/A")
latest_data["status"] = data.get("status", "N/A")
latest_data["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
except Exception as e:
print(f"Error parsing MQTT message: {e}")
def get_data(serial):
"""Request data from the MQTT broker."""
global client, response_topic
if client is None:
create_client(HOST, PORT, USERNAME, PASSWORD)
request_topic = f"bambu_a1_mini/request/{serial}"
response_topic = f"bambu_a1_mini/response/{serial}"
print(f"Subscribing to {response_topic}")
client.subscribe(response_topic)
# Send a request to get data
client.publish(request_topic, json.dumps("HI"))
global latest_data
latest_data["bed_temperature"] = "N/A"
timeout = 10
while latest_data["bed_temperature"] == "N/A" and timeout > 0:
time.sleep(1)
timeout -= 1
return latest_data["status"], latest_data["bed_temperature"], latest_data["nozzle_temperature"], latest_data["update_time"]
# Gradio UI
with gr.Blocks() as blocks:
serial = gr.Textbox("0309CA471800852", label="Serial Number")
send_button = gr.Button(value="Request Data")
status_text = gr.Textbox(label="Status", interactive=False)
bed_temp_text = gr.Textbox(label="Bed Temperature", interactive=False)
nozzle_temp_text = gr.Textbox(label="Nozzle Temperature", interactive=False)
update_time_text = gr.Textbox(label="Last Update", interactive=False)
send_button.click(fn=get_data, inputs=[serial], outputs=[status_text, bed_temp_text, nozzle_temp_text, update_time_text])
blocks.launch()