"""Gradio dashboard that plots weight readings received over MQTT.

An MQTT client subscribes to ``MQTT_TOPIC``; each JSON message is parsed into a
``{'timestamp', 'weight'}`` record and appended to a bounded buffer.  The
Gradio UI periodically re-renders a Plotly line chart from that buffer and
shows the current connection / data-reception status.
"""

import json
import threading
from collections import deque
from datetime import datetime

import gradio as gr
import paho.mqtt.client as mqtt
import pandas as pd
import plotly.express as px

# Data buffer to store received weight data (only the latest 100 readings).
data_buffer = deque(maxlen=100)

# MQTT client and configuration variables.
client = mqtt.Client()
MQTT_BROKER = ""
MQTT_PORT = 8883
MQTT_TOPIC = "FX-120i/e6613008e3659f2f"
MQTT_USERNAME = ""
MQTT_PASSWORD = ""

# Seconds without a message before the data status falls back to "waiting".
INACTIVITY_TIMEOUT_S = 5.0

# Global variables to track connection and data status.
connection_status = "❌ Not connected"
data_status = "⌛ Waiting for data..."

# Mutex for thread-safe status updates: the statuses are written from the
# MQTT network thread / timer thread and read from Gradio's worker threads.
status_lock = threading.Lock()

# Inactivity timer (a ``threading.Timer`` or ``None`` before the first message).
inactivity_timer = None


def get_status():
    """Return a Markdown string with the MQTT connection and data status."""
    with status_lock:
        return f"**MQTT Connection:** {connection_status}\n**Data Status:** {data_status}"


def update_data_status(new_status):
    """Safely replace the data-reception status with *new_status*."""
    global data_status
    with status_lock:
        data_status = new_status


def update_connection_status(new_status):
    """Safely replace the MQTT connection status with *new_status*."""
    global connection_status
    with status_lock:
        connection_status = new_status


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe on success and record the outcome.

    ``rc == 0`` is treated as a successful connection; any other value is
    reported as a failure code in the connection status.
    """
    if rc == 0:
        client.subscribe(MQTT_TOPIC)
        update_connection_status("✅ Connected")
    else:
        update_connection_status(f"❌ Connection failed (code {rc})")


def on_message(client, userdata, msg):
    """MQTT message callback: parse one reading and append it to the buffer.

    The payload is decoded as JSON and must contain the keys
    ``"Current Weight"`` (a string; any ``g`` and ``+`` characters are removed
    before converting to ``float``), ``"Date"`` (``%Y-%m-%d``) and ``"Time"``
    (``%H:%M:%S``).  Malformed messages are logged and dropped.
    """
    try:
        payload = json.loads(msg.payload.decode())
        weight = float(
            payload["Current Weight"].replace("g", "").strip().replace("+", "")
        )
        timestamp = f"{payload['Date']} {payload['Time']}"
        dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        data_buffer.append({"timestamp": dt, "weight": weight})
        update_data_status("✅ Receiving data")
        # Reset the inactivity timer
        reset_inactivity_timer()
    except Exception as e:
        # Deliberately broad: this is the callback boundary, and one bad
        # message should not propagate into the MQTT client's network loop.
        print(f"Error processing message: {e}")


def connect_mqtt(hivemq_host, username, password):
    """Connect to the MQTT broker with the provided credentials.

    Stores the host and credentials in the module-level configuration,
    enables TLS once, installs the callbacks, opens the connection and starts
    the client's background network loop.

    Returns:
        A short message for the UI: an error description if anything raised,
        otherwise a confirmation that the connection attempt was started
        (the final outcome is reported asynchronously by ``on_connect``).
    """
    global MQTT_BROKER, MQTT_USERNAME, MQTT_PASSWORD
    try:
        MQTT_BROKER = hivemq_host
        MQTT_USERNAME = username
        MQTT_PASSWORD = password
        client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
        # ``_ssl`` is a private paho attribute; it is consulted so that
        # ``tls_set`` is not called again on a repeated click of "Connect".
        if not getattr(client, "_ssl", False):
            client.tls_set()
        client.on_connect = on_connect
        client.on_message = on_message
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()
    except Exception as e:
        update_connection_status(f"❌ Connection error: {str(e)}")
        return f"Connection error: {str(e)}"
    return f"Connecting to {MQTT_BROKER}:{MQTT_PORT} — see the status above."


def reset_inactivity_timer():
    """Restart the inactivity countdown used to monitor data reception."""
    global inactivity_timer
    if inactivity_timer is not None:
        inactivity_timer.cancel()
    inactivity_timer = threading.Timer(INACTIVITY_TIMEOUT_S, on_inactivity)
    # Daemon thread so a pending timer never delays interpreter shutdown.
    inactivity_timer.daemon = True
    inactivity_timer.start()


def on_inactivity():
    """Timer callback: no data arrived within the timeout period."""
    update_data_status("⌛ Waiting for data...")


def update_graph():
    """Build the Plotly figure from the current contents of the data buffer.

    Returns an empty, titled placeholder figure while the buffer is empty;
    otherwise a weight-vs-time line chart with a time-of-day x axis.
    """
    if not data_buffer:
        fig = px.line(title="Waiting for Data...")
    else:
        df = pd.DataFrame(list(data_buffer))
        fig = px.line(df, x="timestamp", y="weight", title="Weight Over Time")
        fig.update_layout(
            xaxis=dict(
                type="date",
                tickformat="%H:%M:%S",
                title="Time",
            ),
            yaxis=dict(title="Weight (g)"),
        )
    return fig


# Gradio application interface
with gr.Blocks() as app:
    gr.Markdown("# Weight Monitor")
    gr.Markdown("## Connect to MQTT")

    # Input fields for HiveMQ connection.
    # NOTE(review): real-looking broker credentials are hard-coded as default
    # field values below; consider loading them from environment variables
    # instead of committing them to source control.
    with gr.Row():
        hivemq_host = gr.Textbox(
            label="HiveMQ Host:",
            value="248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud",
        )
        mqtt_username = gr.Textbox(
            label="HiveMQ Username:",
            value="sgbaird",
        )
        mqtt_password = gr.Textbox(
            label="HiveMQ Password:",
            value="D.Pq5gYtejYbU#L",
            type="password",
        )

    # Connect button
    connect_button = gr.Button("Connect")

    # Status indicator
    status_md = gr.Markdown(
        get_status,
        every=10,  # Update status every 10 seconds
    )

    # Graph display with auto-refresh
    plot = gr.Plot(
        update_graph,
        every=2.5,  # Updates every 2.5 seconds
    )

    # Connect button action
    connect_button.click(
        fn=connect_mqtt,
        inputs=[hivemq_host, mqtt_username, mqtt_password],
        outputs=gr.Markdown(),  # Show connection result
    )

if __name__ == "__main__":
    app.launch()