# Spaces: Sleeping (page status text captured together with the source)
import json
import os
import threading
import time

import paho.mqtt.client as mqtt
import streamlit as st
# Set the browser tab title and use the full-width page layout.
st.set_page_config(
    page_title="ESP32 GPIO Monitor",
    layout="wide",
)
# Global GPIO state dictionary: filled from the JSON payloads received over
# MQTT and read by the UI loop when rendering the pin list.
gpio_states = {}

# MQTT settings. Each value can be overridden with an environment variable of
# the same name so that credentials do not have to live in the source; the
# literals below are only fallbacks kept for backward compatibility.
# NOTE(review): the fallback password is committed in plain text -- rotate it
# and supply MQTT_PASS through the environment / deployment secrets instead.
MQTT_BROKER = os.environ.get(
    "MQTT_BROKER", "f9f4cc2f89f343b6860eac5a854b67ee.s1.eu.hivemq.cloud"
)
MQTT_PORT = int(os.environ.get("MQTT_PORT", "8883"))
MQTT_TOPIC = os.environ.get("MQTT_TOPIC", "esp32/io_state")
MQTT_USER = os.environ.get("MQTT_USER", "user1")
MQTT_PASS = os.environ.get("MQTT_PASS", "Test1234")
# MQTT callback functions
def on_connect(client, userdata, flags, rc):
    """Subscribe to the GPIO state topic once the broker accepts the connection.

    A return code of 0 is treated as success. Any other code is reported on
    stdout and no subscription is made.
    """
    if rc != 0:
        print(f"Failed to connect, return code {rc}")
        return

    print("Connected to MQTT Broker!")
    client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
    """Merge a JSON state report received from the broker into ``gpio_states``.

    The payload must decode to a JSON object; its entries overwrite the
    matching entries of the shared dictionary. Any other payload is reported
    on stdout and dropped, leaving the current state untouched.
    """
    try:
        data = json.loads(msg.payload.decode())
    except ValueError as e:
        # ValueError covers both undecodable bytes (UnicodeDecodeError) and
        # malformed JSON (json.JSONDecodeError).
        print(f"Error processing message: {e}")
        return

    if not isinstance(data, dict):
        # dict.update() raises on scalars and would silently accept a list of
        # pairs, so anything that is not a JSON object is rejected up front.
        print(
            "Error processing message: expected a JSON object, "
            f"got {type(data).__name__}"
        )
        return

    gpio_states.update(data)
def mqtt_thread():
    """Connect to the broker over TLS and service the MQTT network loop.

    Meant to be the target of a background thread: ``loop_forever`` blocks
    until the client disconnects.
    """
    # paho-mqtt 2.x expects an explicit callback API version. The callbacks in
    # this file use the 1.x signatures, so request VERSION1 when the enum is
    # available and fall back to the plain 1.x constructor otherwise.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt.Client()
    else:
        client = mqtt.Client(api_versions.VERSION1)

    client.username_pw_set(MQTT_USER, MQTT_PASS)
    client.tls_set()  # Use default system CA certs
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(MQTT_BROKER, MQTT_PORT)
    client.loop_forever()
def _pin_sort_key(item):
    """Return a sort key for a ``(pin, state)`` pair.

    Pins are ordered by the integer that follows the first four characters of
    the pin name (presumably names look like ``GPIO12`` -- confirm against the
    ESP32 firmware). Names without such a number sort after all numbered pins
    instead of raising and stopping the UI loop.
    """
    pin = item[0]
    try:
        return (0, int(pin[4:]))
    except ValueError:
        return (1, 0)


# Start MQTT in background thread
# NOTE(review): this statement runs on every execution of the script, so each
# run opens its own broker connection -- confirm that is acceptable.
threading.Thread(target=mqtt_thread, daemon=True).start()

# Streamlit UI
st.title("ESP32 GPIO State Monitor")
placeholder = st.empty()

while True:
    # Redraw the whole list inside the same placeholder on every pass.
    with placeholder.container():
        st.subheader("Current GPIO States")
        if gpio_states:
            for pin, state in sorted(gpio_states.items(), key=_pin_sort_key):
                st.write(f"🔌 {pin} → {'🟢 HIGH' if state else '🔴 LOW'}")
        else:
            st.info("Waiting for MQTT messages from ESP32...")
    # Streamlit has no sleep() function; pause with the standard library
    # between redraws.
    time.sleep(1)