|
|
import json |
|
|
import queue |
|
|
import threading |
|
|
|
|
|
import paho.mqtt.client as mqtt |
|
|
import streamlit as st |
|
|
|
|
|
|
|
|
st.title("RGB Command and Sensor Data Panel") |
|
|
|
|
|
|
|
|
HIVEMQ_HOST = st.text_input( |
|
|
"Enter your HiveMQ host:", |
|
|
"248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud", |
|
|
type="password", |
|
|
) |
|
|
HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "sgbaird") |
|
|
HIVEMQ_PASSWORD = st.text_input( |
|
|
"Enter your HiveMQ password:", "D.Pq5gYtejYbU#L", type="password" |
|
|
) |
|
|
PORT = st.number_input( |
|
|
"Enter the port number:", min_value=1, max_value=65535, value=8883 |
|
|
) |
|
|
|
|
|
|
|
|
pico_id = st.text_input("Enter your Pico ID:", "test", type="password") |
|
|
|
|
|
max_power = 0.3 |
|
|
max_value = round(max_power * 255) |
|
|
|
|
|
|
|
|
st.info( |
|
|
f"The maximum power has been reduced. The upper limit for RGB values is {max_value} instead of 255." |
|
|
) |
|
|
|
|
|
|
|
|
R = st.slider("Select the Red value:", min_value=0, max_value=max_value, value=0) |
|
|
G = st.slider("Select the Green value:", min_value=0, max_value=max_value, value=0) |
|
|
B = st.slider("Select the Blue value:", min_value=0, max_value=max_value, value=0) |
|
|
|
|
|
|
|
|
if "messages" not in st.session_state: |
|
|
st.session_state.messages = [] |
|
|
if "locked" not in st.session_state: |
|
|
st.session_state.locked = False |
|
|
|
|
|
|
|
|
sensor_data_queue = queue.Queue() |
|
|
|
|
|
|
|
|
|
|
|
@st.cache_resource |
|
|
def get_paho_client( |
|
|
sensor_data_topic, hostname, username, password=None, port=8883, tls=True |
|
|
): |
|
|
|
|
|
client = mqtt.Client(protocol=mqtt.MQTTv5) |
|
|
|
|
|
def on_message(client, userdata, msg): |
|
|
sensor_data_queue.put(json.loads(msg.payload)) |
|
|
|
|
|
def on_connect(client, userdata, flags, rc, properties=None): |
|
|
if rc != 0: |
|
|
print("Connected with result code " + str(rc)) |
|
|
client.subscribe(sensor_data_topic, qos=1) |
|
|
|
|
|
client.on_connect = on_connect |
|
|
client.on_message = on_message |
|
|
|
|
|
if tls: |
|
|
client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT) |
|
|
client.username_pw_set(username, password) |
|
|
client.connect(hostname, port) |
|
|
client.loop_start() |
|
|
|
|
|
return client |
|
|
|
|
|
|
|
|
def send_and_receive(client, command_topic, msg, queue_timeout=60): |
|
|
client.publish(command_topic, json.dumps(msg), qos=2) |
|
|
|
|
|
while True: |
|
|
sensor_data = sensor_data_queue.get(True, queue_timeout) |
|
|
return sensor_data |
|
|
|
|
|
|
|
|
|
|
|
if st.button("Send RGB Command") and not st.session_state.locked: |
|
|
if not pico_id or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD: |
|
|
st.error("Please enter all required fields.") |
|
|
else: |
|
|
st.session_state.locked = True |
|
|
command_topic = f"{pico_id}/neopixel" |
|
|
sensor_data_topic = f"{pico_id}/as7341" |
|
|
|
|
|
client = get_paho_client( |
|
|
sensor_data_topic, |
|
|
HIVEMQ_HOST, |
|
|
HIVEMQ_USERNAME, |
|
|
password=HIVEMQ_PASSWORD, |
|
|
port=int(PORT), |
|
|
tls=True, |
|
|
) |
|
|
|
|
|
command_msg = {"R": R, "G": G, "B": B} |
|
|
sensor_data = send_and_receive( |
|
|
client, command_topic, command_msg, queue_timeout=30 |
|
|
) |
|
|
|
|
|
st.session_state.locked = False |
|
|
st.success("Command sent successfully!") |
|
|
st.write("Sensor Data Received:", sensor_data) |
|
|
|
|
|
|
|
|
for message in st.session_state.messages: |
|
|
st.write(f"Received message: {message}") |
|
|
|