# espDisplay / app.py
# BilalAurangzeb's picture
# Update app.py
# 7cdbfee verified
import json
import os
import threading
import time

import paho.mqtt.client as mqtt
import streamlit as st
# Set the page title and request the wide layout.
# NOTE(review): Streamlit's docs ask for set_page_config to be the first
# st.* call in the script -- confirm for the installed version before moving it.
st.set_page_config(page_title="ESP32 GPIO Monitor", layout="wide")
# Global GPIO state dictionary: written by on_message() and read by the UI loop.
gpio_states = {}

# MQTT settings
# Every value may be overridden with an environment variable of the same name;
# the literals below are only fallbacks so existing deployments keep working.
MQTT_BROKER = os.environ.get(
    "MQTT_BROKER", "f9f4cc2f89f343b6860eac5a854b67ee.s1.eu.hivemq.cloud"
)
MQTT_PORT = int(os.environ.get("MQTT_PORT", "8883"))
MQTT_TOPIC = os.environ.get("MQTT_TOPIC", "esp32/io_state")
MQTT_USER = os.environ.get("MQTT_USER", "user1")
# SECURITY: a broker password committed to source control should be treated as
# leaked. Supply MQTT_PASS through the environment and rotate this default.
MQTT_PASS = os.environ.get("MQTT_PASS", "Test1234")
# MQTT callback functions
def on_connect(client, userdata, flags, rc):
    """Connection callback: subscribe to MQTT_TOPIC when the broker accepts us.

    Args:
        client: The MQTT client the callback fired for; used to subscribe.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    # Handle the failure case first so the success path reads straight through.
    if rc != 0:
        print(f"Failed to connect, return code {rc}")
        return

    print("Connected to MQTT Broker!")
    client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
    """Message callback: merge a JSON object payload into ``gpio_states``.

    The payload is decoded as text and parsed as JSON. Only a JSON object
    (a ``dict`` after parsing) is merged; anything else is reported and
    dropped so that a malformed message cannot inject junk keys into the
    shared state or raise out of the callback.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; only ``msg.payload`` (bytes) is read.
    """
    try:
        data = json.loads(msg.payload.decode())
    except (ValueError, RecursionError) as e:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors;
        # RecursionError covers pathologically nested JSON.
        print(f"Error processing message: {e}")
        return

    # dict.update() also accepts iterables of pairs, so e.g. '["ab"]' would
    # silently create the key "a"; insist on a real JSON object instead.
    if not isinstance(data, dict):
        print(
            "Error processing message: expected a JSON object, "
            f"got {type(data).__name__}"
        )
        return

    gpio_states.update(data)
def mqtt_thread():
    """Connect to the MQTT broker over TLS and run the network loop forever.

    Intended as the target of a background thread: the call blocks in
    ``loop_forever()``. ``on_connect`` and ``on_message`` are installed as
    the client's callbacks.
    """
    # paho-mqtt 2.x takes a callback API version as the first argument;
    # VERSION1 keeps the (client, userdata, flags, rc) on_connect signature
    # used in this file. paho-mqtt 1.x has no such enum and takes no argument.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt.Client()
    else:
        client = mqtt.Client(api_versions.VERSION1)

    client.username_pw_set(MQTT_USER, MQTT_PASS)
    client.tls_set()  # Use default system CA certs
    client.on_connect = on_connect
    client.on_message = on_message

    # connect_async() defers the actual connection to the network loop, and
    # retry_first_connection=True makes that loop keep retrying when the
    # broker is unreachable at startup instead of raising and ending the thread.
    client.connect_async(MQTT_BROKER, MQTT_PORT)
    client.loop_forever(retry_first_connection=True)
def _pin_sort_key(item):
    """Sort key for ``(pin, state)`` pairs: the integer after the 4-char prefix.

    Pin names whose suffix is not an integer are placed after all numbered
    pins (keeping their relative order) instead of raising and ending the
    UI loop.
    """
    pin = item[0]
    try:
        return (0, int(pin[4:]))
    except (TypeError, ValueError):
        return (1, 0)


# Start MQTT in background thread
# NOTE(review): Streamlit presumably re-executes this script for each session,
# in which case every run starts another listener thread -- confirm, and
# consider caching the listener if that is the case.
threading.Thread(target=mqtt_thread, daemon=True).start()

# Streamlit UI
st.title("ESP32 GPIO State Monitor")
placeholder = st.empty()

# Redraw the same placeholder once per second with the latest known states.
while True:
    with placeholder.container():
        st.subheader("Current GPIO States")
        if gpio_states:
            for pin, state in sorted(gpio_states.items(), key=_pin_sort_key):
                # ": " separates the pin name from its state in the rendered line.
                st.write(f"🔌 {pin}: {'🟢 HIGH' if state else '🔴 LOW'}")
        else:
            st.info("Waiting for MQTT messages from ESP32...")
    # Streamlit has no st.sleep(); pause with the standard library instead.
    time.sleep(1)