File size: 4,718 Bytes
477c54f
 
 
 
 
911cfbb
 
477c54f
ebe5556
911cfbb
ebe5556
911cfbb
bdcfebb
911cfbb
42a2cdc
 
 
 
a717a8d
 
42a2cdc
 
 
 
441e4aa
911cfbb
283213b
477c54f
911cfbb
 
 
 
 
 
 
 
3eea612
42a2cdc
911cfbb
42a2cdc
71f7aba
911cfbb
 
42a2cdc
 
911cfbb
42a2cdc
911cfbb
 
0ab1fe2
bdcfebb
911cfbb
 
 
 
 
 
28d8ebb
911cfbb
 
 
 
 
 
 
 
ebe5556
42a2cdc
911cfbb
5df4eda
d98b776
 
42a2cdc
 
 
 
911cfbb
d98b776
911cfbb
 
 
 
 
 
 
42a2cdc
bdcfebb
911cfbb
 
 
 
 
 
 
bdcfebb
 
 
911cfbb
 
 
 
 
 
eb09393
 
911cfbb
 
 
 
 
a0f3c4e
bdcfebb
911cfbb
 
 
 
 
 
bdcfebb
911cfbb
 
477c54f
911cfbb
 
 
eb09393
911cfbb
 
42a2cdc
911cfbb
 
e0dfbb4
911cfbb
528c363
911cfbb
 
 
26dfcef
 
911cfbb
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import gradio as gr
import paho.mqtt.client as mqtt
import json
import time
import os
import requests
import traceback
from PIL import Image
import io
import threading

# Announce startup before any configuration is read.
print("Starting MQTT Print Control...")

# Settings for the Bambu MQTT connection. Each value is read from the
# environment variable of the same name, falling back to a placeholder.
BAMBU_HOST = os.getenv("BAMBU_HOST", "default_host")
BAMBU_PORT = int(os.getenv("BAMBU_PORT", 1883))
BAMBU_USERNAME = os.getenv("BAMBU_USERNAME", "default_user")
BAMBU_PASSWORD = os.getenv("BAMBU_PASSWORD", "default_pass")
# Serial used to build the request/response topic names.
DEFAULT_SERIAL = os.getenv("DEFAULT_SERIAL", "default_serial")

# Settings for the RPI MQTT connection, resolved the same way.
RPI_HOST = os.getenv("RPI_HOST", "default_host")
RPI_PORT = int(os.getenv("RPI_PORT", 1883))
RPI_USERNAME = os.getenv("RPI_USERNAME", "default_user")
RPI_PASSWORD = os.getenv("RPI_PASSWORD", "default_pass")

# Shared state: fields merged from incoming MQTT messages, read by the UI.
latest_data = {}


# Centralized MQTT client setup
def setup_mqtt_client(host, port, username, password, on_message):
    """Create an MQTT client, connect it, and start its network loop.

    Args:
        host: Broker host passed to ``connect``.
        port: Broker port passed to ``connect``.
        username: Username set on the client before connecting.
        password: Password set on the client before connecting.
        on_message: Callable installed as the client's ``on_message`` handler.

    Returns:
        The ``mqtt.Client`` after ``connect`` and ``loop_start`` were called.
    """
    mqtt_client = mqtt.Client()
    # Handler and credentials are both configured before the connection opens.
    mqtt_client.on_message = on_message
    mqtt_client.username_pw_set(username, password)
    mqtt_client.connect(host=host, port=port)
    mqtt_client.loop_start()
    return mqtt_client


# Single callback responsibility
def _merge_payload(message):
    """Merge a JSON-object MQTT payload into ``latest_data`` and stamp the time.

    Payloads that are not valid JSON, or that decode to something other than
    a JSON object, are reported and skipped so that one malformed message
    cannot raise out of the MQTT callback.
    NOTE(review): depending on the paho version, an exception escaping an
    ``on_message`` callback can stop the client's network loop - confirm.

    Args:
        message: Object exposing the raw payload as ``message.payload``.
    """
    try:
        payload = json.loads(message.payload)
    except (ValueError, TypeError):
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; TypeError covers payloads json.loads cannot accept.
        print("Ignoring MQTT message with invalid JSON payload:")
        traceback.print_exc()
        return

    if not isinstance(payload, dict):
        print(f"Ignoring MQTT message with non-object JSON payload: {payload!r}")
        return

    latest_data.update(payload)
    latest_data["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


def bambu_on_message(client, userdata, message):
    """Handle a message from the Bambu client by merging it into ``latest_data``.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data supplied by the MQTT client (unused).
        message: Received message; its ``payload`` is expected to be JSON.
    """
    _merge_payload(message)


def rpi_on_message(client, userdata, message):
    """Handle a message from the RPI client by merging it into ``latest_data``.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data supplied by the MQTT client (unused).
        message: Received message; its ``payload`` is expected to be JSON.
    """
    _merge_payload(message)


# MQTT startup: connect both clients and subscribe to printer responses
def main_logic():
    """Connect the Bambu and RPI MQTT clients and subscribe them to responses.

    Each client is set up independently, so a failure to reach one broker no
    longer prevents the other client from being created. Failures are printed
    with a traceback and never raised.

    Side effects:
        Sets the module globals ``bambu_client`` and ``rpi_client``. A client
        whose setup failed is set to ``None``.
    """
    global bambu_client, rpi_client
    # Both clients listen on the same response topic for the configured serial.
    response_topic = f"bambu_a1_mini/response/{DEFAULT_SERIAL}"
    bambu_client = _start_client(
        "Bambu",
        BAMBU_HOST,
        BAMBU_PORT,
        BAMBU_USERNAME,
        BAMBU_PASSWORD,
        bambu_on_message,
        response_topic,
    )
    rpi_client = _start_client(
        "RPI",
        RPI_HOST,
        RPI_PORT,
        RPI_USERNAME,
        RPI_PASSWORD,
        rpi_on_message,
        response_topic,
    )


def _start_client(label, host, port, username, password, on_message, topic):
    """Connect one MQTT client and subscribe it to *topic*.

    Args:
        label: Short name used only in the failure message.
        host, port, username, password, on_message: Forwarded unchanged to
            ``setup_mqtt_client``.
        topic: Topic the new client subscribes to.

    Returns:
        The connected client, or ``None`` if setup or subscribing raised.
    """
    try:
        client = setup_mqtt_client(host, port, username, password, on_message)
        client.subscribe(topic)
        return client
    except Exception:
        # Label the failure: two independent setups can now fail separately.
        print(f"MQTT setup failed ({label}):")
        traceback.print_exc()
        return None


# Sending print parameters functionality
def send_print_parameters(nozzle_temp, bed_temp, print_speed, fan_speed):
    """Publish print parameters as a ``generate_gcode`` request via the RPI client.

    Args:
        nozzle_temp: Value sent under the ``nozzle_temp`` key.
        bed_temp: Value sent under the ``bed_temp`` key.
        print_speed: Value sent under the ``print_speed`` key.
        fan_speed: Value sent under the ``fan_speed`` key.

    Returns:
        ``"Parameters sent successfully."`` when the client accepted the
        message, otherwise ``"Failed to send parameters."``. Never raises.
    """
    try:
        # rpi_client is created by main_logic(); it does not exist before
        # that has run, so look it up defensively.
        client = globals().get("rpi_client")
        if client is None:
            print("Failed to send parameters: RPI MQTT client is not available.")
            return "Failed to send parameters."

        params = {
            "nozzle_temp": nozzle_temp,
            "bed_temp": bed_temp,
            "print_speed": print_speed,
            "fan_speed": fan_speed,
            "command": "generate_gcode",
        }
        topic = f"bambu_a1_mini/request/{DEFAULT_SERIAL}"
        result = client.publish(topic, json.dumps(params))
        # publish() reports problems such as a missing connection through
        # the return code, not an exception, so it must be checked.
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            print(f"Failed to send parameters: publish returned rc={result.rc}")
            return "Failed to send parameters."
        return "Parameters sent successfully."
    except Exception:
        print("Failed to send parameters:")
        traceback.print_exc()
        return "Failed to send parameters."


# Status snapshot read by the Gradio interface
def get_status():
    """Return the current status snapshot from ``latest_data``.

    Returns:
        A 4-tuple ``(status, bed_temperature, nozzle_temperature,
        update_time)``. Missing fields fall back to ``"N/A"``, except
        ``update_time``, which falls back to ``"Waiting for data..."``.
    """
    fallbacks = (
        ("status", "N/A"),
        ("bed_temperature", "N/A"),
        ("nozzle_temperature", "N/A"),
        ("update_time", "Waiting for data..."),
    )
    return tuple(latest_data.get(key, default) for key, default in fallbacks)


# Image capture functionality
def capture_image():
    """Download the image referenced by ``latest_data["image_url"]``.

    Returns:
        A PIL image when the URL is set, is not ``"N/A"``, and the HTTP
        response status is 200. Otherwise ``None``. Any exception is printed
        with a traceback and also yields ``None``.
    """
    try:
        url = latest_data.get("image_url", None)
        # Nothing to fetch when the URL is missing, empty, or the placeholder.
        if not url or url == "N/A":
            return None

        reply = requests.get(url, timeout=10)
        if reply.status_code != 200:
            return None

        raw_bytes = io.BytesIO(reply.content)
        return Image.open(raw_bytes)
    except Exception:
        print("Image capture failed:")
        traceback.print_exc()
        return None


# Gradio UI: status text fields, an image panel, parameter sliders, and three
# buttons wired to get_status, capture_image, and send_print_parameters.
with gr.Blocks(title="Simplified Printer Control") as demo:
    # Text fields filled by the "Refresh" button from get_status().
    status = gr.Textbox(label="Status")
    bed_temp = gr.Textbox(label="Bed Temperature")
    nozzle_temp = gr.Textbox(label="Nozzle Temperature")
    update_time = gr.Textbox(label="Last Update")
    # Displays the PIL image returned by capture_image().
    image_display = gr.Image(label="Captured Image", type="pil")

    # Slider values are passed, in this order, to send_print_parameters().
    nozzle_input = gr.Slider(
        label="Nozzle Temperature", minimum=180, maximum=250, value=200
    )
    bed_input = gr.Slider(label="Bed Temperature", minimum=40, maximum=100, value=60)
    speed_input = gr.Slider(label="Print Speed", minimum=20, maximum=150, value=60)
    fan_input = gr.Slider(label="Fan Speed", minimum=0, maximum=100, value=100)

    # The four outputs match the order of the tuple returned by get_status().
    refresh = gr.Button("Refresh")
    refresh.click(fn=get_status, outputs=[status, bed_temp, nozzle_temp, update_time])

    capture_btn = gr.Button("Capture Image")
    capture_btn.click(fn=capture_image, outputs=image_display)

    # The send result message is written into the "Status" textbox, replacing
    # whatever status text was shown there before.
    send_params_btn = gr.Button("Send Parameters")
    send_params_btn.click(
        fn=send_print_parameters,
        inputs=[nozzle_input, bed_input, speed_input, fan_input],
        outputs=status,
    )

print("App initializing...")
# MQTT setup runs in a daemon thread while the UI launches on the main thread,
# presumably so a slow or unreachable broker does not delay the UI (confirm).
threading.Thread(target=main_logic, daemon=True).start()
demo.launch()