File size: 4,718 Bytes
477c54f 911cfbb 477c54f ebe5556 911cfbb ebe5556 911cfbb bdcfebb 911cfbb 42a2cdc a717a8d 42a2cdc 441e4aa 911cfbb 283213b 477c54f 911cfbb 3eea612 42a2cdc 911cfbb 42a2cdc 71f7aba 911cfbb 42a2cdc 911cfbb 42a2cdc 911cfbb 0ab1fe2 bdcfebb 911cfbb 28d8ebb 911cfbb ebe5556 42a2cdc 911cfbb 5df4eda d98b776 42a2cdc 911cfbb d98b776 911cfbb 42a2cdc bdcfebb 911cfbb bdcfebb 911cfbb eb09393 911cfbb a0f3c4e bdcfebb 911cfbb bdcfebb 911cfbb 477c54f 911cfbb eb09393 911cfbb 42a2cdc 911cfbb e0dfbb4 911cfbb 528c363 911cfbb 26dfcef 911cfbb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
import gradio as gr
import paho.mqtt.client as mqtt
import json
import time
import os
import requests
import traceback
from PIL import Image
import io
import threading
print("Starting MQTT Print Control...")

# Connection settings come from the environment; every value falls back to a
# placeholder default when its variable is unset.

# BAMBU_* settings.
BAMBU_HOST = os.getenv("BAMBU_HOST", "default_host")
BAMBU_PORT = int(os.getenv("BAMBU_PORT", 1883))
BAMBU_USERNAME = os.getenv("BAMBU_USERNAME", "default_user")
BAMBU_PASSWORD = os.getenv("BAMBU_PASSWORD", "default_pass")

# Serial number interpolated into the MQTT topic names.
DEFAULT_SERIAL = os.getenv("DEFAULT_SERIAL", "default_serial")

# RPI_* settings.
RPI_HOST = os.getenv("RPI_HOST", "default_host")
RPI_PORT = int(os.getenv("RPI_PORT", 1883))
RPI_USERNAME = os.getenv("RPI_USERNAME", "default_user")
RPI_PASSWORD = os.getenv("RPI_PASSWORD", "default_pass")

# Shared state: the MQTT callbacks merge incoming fields into this dict and
# the UI handlers read from it.
latest_data = {}
# Centralized MQTT client setup
def setup_mqtt_client(host, port, username, password, on_message):
    """Create an MQTT client, connect it and start its network loop.

    Args:
        host: Broker host name or address passed to ``connect``.
        port: Broker port passed to ``connect``.
        username: User name passed to ``username_pw_set``.
        password: Password passed to ``username_pw_set``.
        on_message: Callable ``(client, userdata, message)`` installed as the
            client's ``on_message`` handler.

    Returns:
        The client, after ``connect`` and ``loop_start`` have been called.

    Raises:
        Any exception raised by ``connect`` propagates to the caller.
    """
    # paho-mqtt 2.x added a ``callback_api_version`` constructor argument
    # (mandatory in 2.0). Ask for the version-1 callback API when the enum
    # exists; paho-mqtt 1.x has no such enum and takes no such argument.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt.Client()
    else:
        client = mqtt.Client(api_versions.VERSION1)

    client.username_pw_set(username, password)
    client.on_message = on_message
    client.connect(host, port)
    client.loop_start()
    return client
# Single callback responsibility
def _merge_payload(message):
    """Merge one MQTT message's JSON object into ``latest_data``.

    Payloads that are not valid JSON, or whose top-level value is not an
    object, are reported and ignored so that a single bad message cannot
    raise out of the MQTT callback. ``update_time`` is refreshed only when
    a payload was actually merged.
    """
    try:
        payload = json.loads(message.payload)
    except (ValueError, TypeError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError;
        # TypeError covers payloads that are not str/bytes.
        print("Ignoring MQTT message with undecodable payload:")
        traceback.print_exc()
        return

    if not isinstance(payload, dict):
        print("Ignoring MQTT message whose payload is not a JSON object.")
        return

    latest_data.update(payload)
    latest_data["update_time"] = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.localtime()
    )


def bambu_on_message(client, userdata, message):
    """``on_message`` callback for the Bambu client; records the payload."""
    _merge_payload(message)


def rpi_on_message(client, userdata, message):
    """``on_message`` callback for the RPi client; records the payload."""
    _merge_payload(message)
# Centralized error handling
def main_logic():
    """Connect both MQTT clients and subscribe them to the response topic.

    The clients are stored in the module globals ``bambu_client`` and
    ``rpi_client``. Each client is set up in its own ``try`` block so that
    a failure with one broker does not prevent the other client from being
    created. Failures are printed with a traceback and never raised.
    """
    global bambu_client, rpi_client

    response_topic = f"bambu_a1_mini/response/{DEFAULT_SERIAL}"

    try:
        bambu_client = setup_mqtt_client(
            BAMBU_HOST, BAMBU_PORT, BAMBU_USERNAME, BAMBU_PASSWORD, bambu_on_message
        )
        bambu_client.subscribe(response_topic)
    except Exception:
        print("MQTT setup failed:")
        traceback.print_exc()

    try:
        rpi_client = setup_mqtt_client(
            RPI_HOST, RPI_PORT, RPI_USERNAME, RPI_PASSWORD, rpi_on_message
        )
        rpi_client.subscribe(response_topic)
    except Exception:
        print("MQTT setup failed:")
        traceback.print_exc()
# Sending print parameters functionality
def send_print_parameters(nozzle_temp, bed_temp, print_speed, fan_speed):
    """Publish a ``generate_gcode`` request with the given print settings.

    The settings are serialized as a JSON object and published through
    ``rpi_client`` on ``bambu_a1_mini/request/<DEFAULT_SERIAL>``.

    Args:
        nozzle_temp: Value sent under the ``nozzle_temp`` key.
        bed_temp: Value sent under the ``bed_temp`` key.
        print_speed: Value sent under the ``print_speed`` key.
        fan_speed: Value sent under the ``fan_speed`` key.

    Returns:
        ``"Parameters sent successfully."`` when the client accepted the
        message, otherwise ``"Failed to send parameters."``. This function
        never raises; errors are printed instead.
    """
    try:
        params = {
            "nozzle_temp": nozzle_temp,
            "bed_temp": bed_temp,
            "print_speed": print_speed,
            "fan_speed": fan_speed,
            "command": "generate_gcode",
        }
        topic = f"bambu_a1_mini/request/{DEFAULT_SERIAL}"
        result = rpi_client.publish(topic, json.dumps(params))
        # publish() signals problems such as a missing connection through
        # its return code rather than by raising, so check it before
        # reporting success to the UI.
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            print(f"Failed to send parameters: publish returned rc={result.rc}")
            return "Failed to send parameters."
        return "Parameters sent successfully."
    except Exception:
        print("Failed to send parameters:")
        traceback.print_exc()
        return "Failed to send parameters."
# Simple Gradio interface
def get_status():
    """Return the current status fields as a 4-tuple for the UI.

    Returns:
        ``(status, bed_temperature, nozzle_temperature, update_time)`` read
        from ``latest_data``. Missing values are replaced by ``"N/A"``,
        except ``update_time`` which falls back to ``"Waiting for data..."``.
    """
    # (key in latest_data, value shown when the key is absent)
    fields = (
        ("status", "N/A"),
        ("bed_temperature", "N/A"),
        ("nozzle_temperature", "N/A"),
        ("update_time", "Waiting for data..."),
    )
    return tuple(latest_data.get(key, fallback) for key, fallback in fields)
# Image capture functionality
def capture_image():
    """Download the image referenced by ``latest_data["image_url"]``.

    Returns:
        A PIL image opened from the response body when a URL is available
        and the request returns HTTP 200; ``None`` when there is no usable
        URL, the status is not 200, or any exception occurs (the exception
        is printed with its traceback).
    """
    try:
        url = latest_data.get("image_url", None)
        # No URL yet, or the "N/A" placeholder: nothing to fetch.
        if not url or url == "N/A":
            return None

        reply = requests.get(url, timeout=10)
        if reply.status_code != 200:
            return None

        return Image.open(io.BytesIO(reply.content))
    except Exception:
        print("Image capture failed:")
        traceback.print_exc()
        return None
# Build the UI. Components and their event handlers are declared inside the
# Blocks context so that they are attached to ``demo``.
with gr.Blocks(title="Simplified Printer Control") as demo:
    # Read-only fields filled in by get_status().
    status = gr.Textbox(label="Status")
    bed_temp = gr.Textbox(label="Bed Temperature")
    nozzle_temp = gr.Textbox(label="Nozzle Temperature")
    update_time = gr.Textbox(label="Last Update")
    image_display = gr.Image(label="Captured Image", type="pil")

    # Inputs passed to send_print_parameters().
    nozzle_input = gr.Slider(
        label="Nozzle Temperature", minimum=180, maximum=250, value=200
    )
    bed_input = gr.Slider(label="Bed Temperature", minimum=40, maximum=100, value=60)
    speed_input = gr.Slider(label="Print Speed", minimum=20, maximum=150, value=60)
    fan_input = gr.Slider(label="Fan Speed", minimum=0, maximum=100, value=100)

    refresh = gr.Button("Refresh")
    refresh.click(fn=get_status, outputs=[status, bed_temp, nozzle_temp, update_time])

    capture_btn = gr.Button("Capture Image")
    capture_btn.click(fn=capture_image, outputs=image_display)

    send_params_btn = gr.Button("Send Parameters")
    # The result message of send_print_parameters() is shown in the Status box.
    send_params_btn.click(
        fn=send_print_parameters,
        inputs=[nozzle_input, bed_input, speed_input, fan_input],
        outputs=status,
    )

print("App initializing...")
# Set up the MQTT clients on a daemon thread so the UI is launched without
# waiting for the broker connections.
threading.Thread(target=main_logic, daemon=True).start()
demo.launch()