fan-control / app.py
sgbaird's picture
refactor: Improve code readability and performance in app.py
9f557a5
import json
import queue
import ssl
import time
from datetime import datetime, timezone

import matplotlib.pyplot as plt
import paho.mqtt.client as mqtt
import streamlit as st
# Markdown shown under the page title: what hardware the demo drives and where
# to find the firmware and related course material.
_APP_DESCRIPTION = """
This application accesses a public test demo of a small computer fan (Canakit
RPi 5 fan) controlled by a Pico W microcontroller via an EMC2101 fan controller.
Send speed commands to the motor and visualize the RPM data.
For context, see the [fan control
code](https://github.com/AccelerationConsortium/ac-training-lab/tree/main/src/ac_training_lab/picow/fan-control)
[[permalink](https://github.com/AccelerationConsortium/ac-training-lab/tree/a8b9cad0dc8c162bdff20c0d0fdff2f45c5f0012/src/ac_training_lab/picow/fan-control)]
. You may also be interested in the Acceleration Consortium's ["Hello World"
microcourse](https://ac-microcourses.readthedocs.io/en/latest/courses/hello-world/index.html)
for self-driving labs.
"""

# Page header: title first, then the description
st.title("Fan Control Panel")
st.markdown(_APP_DESCRIPTION)
# MQTT connection settings and the requested fan speed are collected in one
# form; the values are only acted on when the submit button is pressed.
# NOTE(review): the pre-filled host/username/password look like credentials for
# the public test demo mentioned in the page description -- confirm they are
# meant to be committed.
with st.form("mqtt_form"):
    # Broker address (rendered as a masked field)
    HIVEMQ_HOST = st.text_input(
        label="Enter your HiveMQ host:",
        value="248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud",
        type="password",
    )

    # Broker credentials
    HIVEMQ_USERNAME = st.text_input(
        label="Enter your HiveMQ username:",
        value="sgbaird",
    )
    HIVEMQ_PASSWORD = st.text_input(
        label="Enter your HiveMQ password:",
        value="D.Pq5gYtejYbU#L",
        type="password",
    )

    # Any valid TCP port; defaults to 8883
    PORT = st.number_input(
        label="Enter the port number:",
        min_value=1,
        max_value=65535,
        value=8883,
    )

    # Device identifier that is embedded in the MQTT topic names
    PICO_ID = st.text_input(
        label="Enter your Pico ID:",
        value="test-fan",
        type="password",
    )

    # Requested fan speed as a percentage
    SPEED = st.slider(
        label="Select the Fan Speed (0-100%):",
        min_value=0,
        max_value=100,
        value=50,
    )

    submit_button = st.form_submit_button("Send Speed Command")
# Topics: one for outgoing speed commands, one for incoming RPM readings
COMMAND_TOPIC = f"fan-control/picow/{PICO_ID}/speed"
SENSOR_DATA_TOPIC = f"fan-control/picow/{PICO_ID}/rpm"

# Queue that the MQTT message callback fills and the plotting loop drains
rpm_data_queue = queue.Queue()

# Seed the session state on first use; values already present are left alone.
# Factories are used so that each list default is a fresh object.
for _state_key, _make_default in (
    ("rpm_data", list),
    ("elapsed_times", list),
    ("initial_timestamp", lambda: None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
@st.cache_resource
def create_paho_client(tls=True):
    """Create the MQTT v5 client; ``st.cache_resource`` caches the result.

    Args:
        tls: When true, configure TLS on the client before returning it.

    Returns:
        A ``paho.mqtt.client.Client`` that has not been connected yet.
    """
    # paho-mqtt 2.x adds a callback API version argument to the constructor
    # (mandatory in 2.0); 1.x does not have the enum at all, so detect it.
    callback_api = getattr(mqtt, "CallbackAPIVersion", None)
    if callback_api is None:
        client = mqtt.Client(protocol=mqtt.MQTTv5)
    else:
        client = mqtt.Client(callback_api.VERSION2, protocol=mqtt.MQTTv5)

    if tls:
        # Use the standard-library constant directly rather than reaching for
        # the ``ssl`` module through paho's namespace.
        client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
    return client
# Define setup separately since sensor_data is dynamic
def setup_paho_client(
    client,
    sensor_data_topic,
    hostname,
    username,
    password=None,
    port=8883,
    data_queue=None,
):
    """Attach callbacks to ``client``, connect it and start its network loop.

    Args:
        client: MQTT client to configure (callbacks are overwritten).
        sensor_data_topic: Topic subscribed to once the connection succeeds.
        hostname: Broker host passed to ``client.connect``.
        username: Username passed to ``client.username_pw_set``.
        password: Password passed to ``client.username_pw_set``.
        port: Broker port passed to ``client.connect``.
        data_queue: Queue that receives each decoded message payload. When
            omitted, the module-level ``rpm_data_queue`` is used.

    Returns:
        The same ``client`` object, after ``loop_start()`` has been called.
    """
    if data_queue is None:
        data_queue = rpm_data_queue

    def on_message(client, userdata, msg):
        # A payload that is not valid JSON must not raise inside the network
        # loop thread; report it and drop the message instead.
        try:
            decoded = json.loads(msg.payload)
        except ValueError as exc:
            print(f"Ignoring malformed sensor payload: {exc}")
            return
        data_queue.put(decoded)

    def on_connect(client, userdata, flags, rc, properties=None):
        if rc == 0:
            print("Connected successfully")
            # Subscribe from the connect callback so it happens on every
            # successful (re)connection.
            client.subscribe(sensor_data_topic, qos=1)
        else:
            print(f"Connection failed with code {rc}")

    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(username, password)
    client.connect(hostname, port)
    client.loop_start()  # Start non-blocking loop
    return client
def send_command(client, command_topic, msg):
    """Publish ``msg`` as JSON to ``command_topic`` and report the outcome.

    Args:
        client: MQTT client exposing ``publish``.
        command_topic: Topic the command is published to.
        msg: JSON-serialisable command payload.

    The message is published with QoS 2 and the call blocks until the publish
    completes. Success or failure is printed; nothing is returned.
    """
    print("Sending command...")
    result = client.publish(command_topic, json.dumps(msg), qos=2)
    try:
        result.wait_for_publish()  # Ensure the message is sent
    except (ValueError, RuntimeError) as exc:
        # paho-mqtt >= 1.6 raises here when the publish was rejected, so the
        # return-code check below would never be reached; report it the same
        # way instead of crashing the app.
        print(f"Failed to send command: {exc}")
        return

    if result.rc == mqtt.MQTT_ERR_SUCCESS:
        print(f"Command sent: {msg} to topic {command_topic}")
    else:
        print(f"Failed to send command: {result.rc}")
# Function to plot RPM data
# The figure and axes are created once per script run and redrawn in place.
fig, ax = plt.subplots()


def plot_rpm_data(elapsed_times, rpms, placeholder, window_seconds=90):
    """Redraw the RPM-versus-time chart for the most recent samples.

    Args:
        elapsed_times: Elapsed time in seconds for each sample; the last entry
            is taken as the current time.
        rpms: RPM readings paired index-by-index with ``elapsed_times``.
        placeholder: Object whose ``pyplot`` method receives the figure.
        window_seconds: Only samples at most this many seconds older than the
            last one are plotted.
    """
    ax.clear()  # Clear the previous plot

    # Filter (time, rpm) pairs together so the two series stay aligned even if
    # the timestamps are not strictly increasing; empty input plots nothing.
    latest = elapsed_times[-1] if elapsed_times else 0
    recent = [
        (elapsed, rpm)
        for elapsed, rpm in zip(elapsed_times, rpms)
        if latest - elapsed <= window_seconds
    ]
    recent_times = [elapsed for elapsed, _ in recent]
    recent_rpms = [rpm for _, rpm in recent]

    ax.plot(recent_times, recent_rpms, marker="o")
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("RPM")
    ax.set_title("Fan RPM Over Time")
    ax.xaxis.set_major_locator(plt.MaxNLocator(10))  # Show max 10 ticks on x-axis
    # Rotate x-axis labels on this axes object, not on pyplot's "current" axes
    ax.tick_params(axis="x", labelrotation=45)
    placeholder.pyplot(fig)
# Publish button: validate the form, send the speed command, then stream RPM
# readings into the plot until no data arrives for 30 seconds.
if submit_button:
    if not all((PICO_ID, HIVEMQ_HOST, HIVEMQ_USERNAME, HIVEMQ_PASSWORD)):
        st.error("Please enter all required fields.")
    else:
        command_msg = {"speed": SPEED}
        st.info(f"Sending speed command {command_msg} to Pico ID {PICO_ID}...")
        client = create_paho_client(tls=True)
        client = setup_paho_client(
            client,
            SENSOR_DATA_TOPIC,
            HIVEMQ_HOST,
            HIVEMQ_USERNAME,
            password=HIVEMQ_PASSWORD,
            port=int(PORT),
        )
        send_command(client, COMMAND_TOPIC, command_msg)

        # Create a placeholder for the plot
        plot_placeholder = st.empty()

        # Continuously fetch RPM data and update the plot
        while True:
            # Only the queue read can time out, so keep the try body minimal.
            try:
                data = rpm_data_queue.get(timeout=30)
            except queue.Empty:
                st.error("No sensor data received within the timeout period.")
                break

            # Parse both fields before touching session state so a malformed
            # message is skipped without leaving the two lists out of step.
            try:
                rpm = data["rpm"]
                current_timestamp = datetime.fromtimestamp(
                    data["utc_timestamp"], tz=timezone.utc
                )
            except (KeyError, TypeError, ValueError, OverflowError, OSError):
                st.warning(f"Skipping malformed sensor message: {data!r}")
                continue

            # The first sample seen in this session defines time zero.
            if st.session_state.initial_timestamp is None:
                st.session_state.initial_timestamp = current_timestamp
            elapsed_time = (
                current_timestamp - st.session_state.initial_timestamp
            ).total_seconds()

            st.session_state.rpm_data.append(rpm)
            st.session_state.elapsed_times.append(elapsed_time)
            plot_rpm_data(
                st.session_state.elapsed_times,
                st.session_state.rpm_data,
                plot_placeholder,
            )

        # Stop the MQTT loop
        client.loop_stop()