import gradio as gr import paho.mqtt.client as mqtt import json import time import os import requests import traceback from PIL import Image import io import threading print("Starting MQTT Print Control...") # Environment variables BAMBU_HOST = os.environ.get("BAMBU_HOST", "default_host") BAMBU_PORT = int(os.environ.get("BAMBU_PORT", 1883)) BAMBU_USERNAME = os.environ.get("BAMBU_USERNAME", "default_user") BAMBU_PASSWORD = os.environ.get("BAMBU_PASSWORD", "default_pass") DEFAULT_SERIAL = os.environ.get("DEFAULT_SERIAL", "default_serial") RPI_HOST = os.environ.get("RPI_HOST", "default_host") RPI_PORT = int(os.environ.get("RPI_PORT", 1883)) RPI_USERNAME = os.environ.get("RPI_USERNAME", "default_user") RPI_PASSWORD = os.environ.get("RPI_PASSWORD", "default_pass") latest_data = {} # Centralized MQTT client setup def setup_mqtt_client(host, port, username, password, on_message): client = mqtt.Client() client.username_pw_set(username, password) client.on_message = on_message client.connect(host, port) client.loop_start() return client # Single callback responsibility def bambu_on_message(client, userdata, message): global latest_data latest_data.update(json.loads(message.payload)) latest_data["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) def rpi_on_message(client, userdata, message): global latest_data latest_data.update(json.loads(message.payload)) latest_data["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # Centralized error handling def main_logic(): global bambu_client, rpi_client try: bambu_client = setup_mqtt_client( BAMBU_HOST, BAMBU_PORT, BAMBU_USERNAME, BAMBU_PASSWORD, bambu_on_message ) rpi_client = setup_mqtt_client( RPI_HOST, RPI_PORT, RPI_USERNAME, RPI_PASSWORD, rpi_on_message ) bambu_client.subscribe(f"bambu_a1_mini/response/{DEFAULT_SERIAL}") rpi_client.subscribe(f"bambu_a1_mini/response/{DEFAULT_SERIAL}") except Exception: print("MQTT setup failed:") traceback.print_exc() # Sending print parameters functionality def send_print_parameters(nozzle_temp, bed_temp, print_speed, fan_speed): try: params = { "nozzle_temp": nozzle_temp, "bed_temp": bed_temp, "print_speed": print_speed, "fan_speed": fan_speed, "command": "generate_gcode", } topic = f"bambu_a1_mini/request/{DEFAULT_SERIAL}" rpi_client.publish(topic, json.dumps(params)) return "Parameters sent successfully." except Exception: print("Failed to send parameters:") traceback.print_exc() return "Failed to send parameters." # Simple Gradio interface def get_status(): return ( latest_data.get("status", "N/A"), latest_data.get("bed_temperature", "N/A"), latest_data.get("nozzle_temperature", "N/A"), latest_data.get("update_time", "Waiting for data..."), ) # Image capture functionality def capture_image(): try: img_url = latest_data.get("image_url", None) if img_url and img_url != "N/A": response = requests.get(img_url, timeout=10) if response.status_code == 200: return Image.open(io.BytesIO(response.content)) return None except Exception: print("Image capture failed:") traceback.print_exc() return None with gr.Blocks(title="Simplified Printer Control") as demo: status = gr.Textbox(label="Status") bed_temp = gr.Textbox(label="Bed Temperature") nozzle_temp = gr.Textbox(label="Nozzle Temperature") update_time = gr.Textbox(label="Last Update") image_display = gr.Image(label="Captured Image", type="pil") nozzle_input = gr.Slider( label="Nozzle Temperature", minimum=180, maximum=250, value=200 ) bed_input = gr.Slider(label="Bed Temperature", minimum=40, maximum=100, value=60) speed_input = gr.Slider(label="Print Speed", minimum=20, maximum=150, value=60) fan_input = gr.Slider(label="Fan Speed", minimum=0, maximum=100, value=100) refresh = gr.Button("Refresh") refresh.click(fn=get_status, outputs=[status, bed_temp, nozzle_temp, update_time]) capture_btn = gr.Button("Capture Image") capture_btn.click(fn=capture_image, outputs=image_display) send_params_btn = gr.Button("Send Parameters") send_params_btn.click( fn=send_print_parameters, inputs=[nozzle_input, bed_input, speed_input, fan_input], outputs=status, ) print("App initializing...") threading.Thread(target=main_logic, daemon=True).start() demo.launch()