evelyn-nesher19's picture
Update MQTT credentials
459f9bc verified
import threading
from collections import deque
import gradio as gr
import paho.mqtt.client as mqtt
import json
import pandas as pd
from datetime import datetime
import plotly.express as px
# Data buffer to store received weight data
data_buffer = deque(maxlen=100)
# MQTT client and configuration variables
client = mqtt.Client()
MQTT_BROKER = ""
MQTT_PORT = 8883
MQTT_TOPIC = "FX-120i/e6613008e3659f2f"
MQTT_USERNAME = ""
MQTT_PASSWORD = ""
# Global variables to track connection and data status
connection_status = "❌ Not connected"
data_status = "⌛ Waiting for data..."
# Mutex for thread-safe status updates
status_lock = threading.Lock()
def get_status():
"""
Returns the current status of the MQTT connection and data reception.
"""
with status_lock:
return f"**MQTT Connection:** {connection_status}\n**Data Status:** {data_status}"
def update_data_status(new_status):
"""
Safely updates the data reception status.
"""
global data_status
with status_lock:
data_status = new_status
def on_connect(client, userdata, flags, rc):
"""
Callback for MQTT connection events.
"""
global connection_status
if rc == 0:
client.subscribe(MQTT_TOPIC)
connection_status = "✅ Connected"
else:
connection_status = f"❌ Connection failed (code {rc})"
def on_message(client, userdata, msg):
"""
Callback for processing incoming MQTT messages.
"""
global data_status
try:
payload = json.loads(msg.payload.decode())
weight = float(payload["Current Weight"].replace('g', '').strip().replace('+', ''))
timestamp = f"{payload['Date']} {payload['Time']}"
dt = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
data_buffer.append({'timestamp': dt, 'weight': weight})
update_data_status("✅ Receiving data")
# Reset the inactivity timer
reset_inactivity_timer()
except Exception as e:
print(f"Error processing message: {e}")
def connect_mqtt(hivemq_host, username, password):
"""
Connects to the MQTT broker with the provided credentials.
"""
global MQTT_BROKER, MQTT_USERNAME, MQTT_PASSWORD, connection_status
try:
MQTT_BROKER = hivemq_host
MQTT_USERNAME = username
MQTT_PASSWORD = password
client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
if not client._ssl:
client.tls_set()
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
except Exception as e:
connection_status = f"❌ Connection error: {str(e)}"
return f"Connection error: {str(e)}"
# Inactivity timer
inactivity_timer = None
def reset_inactivity_timer():
"""
Resets the inactivity timer to monitor data reception.
"""
global inactivity_timer
if inactivity_timer is not None:
inactivity_timer.cancel()
inactivity_timer = threading.Timer(5.0, on_inactivity)
inactivity_timer.start()
def on_inactivity():
"""
Callback for when no data is received within the timeout period.
"""
update_data_status("⌛ Waiting for data...")
def update_graph():
"""
Updates the Plotly graph with the latest data from the data buffer.
"""
if not data_buffer:
fig = px.line(title="Waiting for Data...")
else:
df = pd.DataFrame(list(data_buffer))
fig = px.line(df, x='timestamp', y='weight', title="Weight Over Time")
fig.update_layout(
xaxis=dict(
type="date",
tickformat="%H:%M:%S",
title="Time"
),
yaxis=dict(title="Weight (g)")
)
return fig
# Gradio application interface
with gr.Blocks() as app:
gr.Markdown("# Weight Monitor")
gr.Markdown("## Connect to MQTT")
# Input fields for HiveMQ connection
with gr.Row():
hivemq_host = gr.Textbox(
label="HiveMQ Host:",
value="248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud"
)
mqtt_username = gr.Textbox(
label="HiveMQ Username:",
value="sgbaird"
)
mqtt_password = gr.Textbox(
label="HiveMQ Password:",
value="D.Pq5gYtejYbU#L",
type="password"
)
# Connect button
connect_button = gr.Button("Connect")
# Status indicator
status_md = gr.Markdown(
get_status,
every = 10 # Update status every 10 seconds
)
# Graph display with auto-refresh
plot = gr.Plot(
update_graph,
every = 2.5 # Updates every 2.5 seconds
)
# Connect button action
connect_button.click(
fn=connect_mqtt,
inputs=[hivemq_host, mqtt_username, mqtt_password],
outputs=gr.Markdown() # Show connection result
)
if __name__ == "__main__":
app.launch()