Spaces:
Build error
Build error
File size: 5,639 Bytes
81a456f 9d5db1c 81a456f 9d5db1c 81a456f 9d5db1c 98639b7 9d5db1c 98639b7 9d5db1c 98639b7 81a456f 98639b7 81a456f 9d5db1c 81a456f 9d5db1c 81a456f 9d5db1c 81a456f 9d5db1c 81a456f 9d5db1c 81a456f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 | import serial
import serial.tools.list_ports
import time
import streamlit as st
import threading
import pickle
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import LabelEncoder
import pandas as pd
# Global variables to store bus information and hardware statuses
bus_info = {
'bus_name': 'N/A',
'seat_status': 'N/A',
'distance': 'N/A',
'led_status': 'OFF',
'buzzer_status': 'OFF',
}
# Function to collect available serial ports data
def collect_serial_ports():
ports = serial.tools.list_ports.comports()
port_data = []
for port in ports:
data = {
'port': port.device,
'description': port.description,
'manufacturer': port.manufacturer if port.manufacturer else 'Unknown'
}
port_data.append(data)
return port_data
# Function to preprocess and label the data for model training
def preprocess_data(port_data):
df = pd.DataFrame(port_data)
# Encode the 'description' and 'manufacturer' fields to numeric
le = LabelEncoder()
df['description'] = le.fit_transform(df['description'])
df['manufacturer'] = le.fit_transform(df['manufacturer'])
return df, le
# Train a simple decision tree classifier (this is just an example)
def train_model():
# Example: manually labeled data where '1' represents Arduino and '0' is other devices
port_data = collect_serial_ports()
df, le = preprocess_data(port_data)
df['label'] = [1, 0] # 1 = Arduino, 0 = Other device (Replace with actual labeled data)
# Train the classifier
X = df.drop(columns=['port', 'label'])
y = df['label']
clf = DecisionTreeClassifier()
clf.fit(X, y)
# Save the model and label encoder
with open('serial_port_model.pkl', 'wb') as model_file:
pickle.dump(clf, model_file)
with open('label_encoder.pkl', 'wb') as le_file:
pickle.dump(le, le_file)
# Function to predict the serial port based on the AI model
def predict_serial_port(port_data):
# Load the trained model and label encoder
with open('serial_port_model.pkl', 'rb') as model_file:
clf = pickle.load(model_file)
with open('label_encoder.pkl', 'rb') as le_file:
le = pickle.load(le_file)
# Preprocess the data
df, _ = preprocess_data(port_data)
# Predict the label for the serial ports (Arduino or not)
prediction = clf.predict(df.drop(columns=['port', 'label']))
# Return the port if it's predicted to be an Arduino (label = 1)
for i, pred in enumerate(prediction):
if pred == 1: # If prediction is 1, it's the Arduino
return port_data[i]['port']
return None # No Arduino found
# Function to initialize the serial connection with the correct port
def initialize_serial_connection():
port_data = collect_serial_ports() # Get available ports data
com_port = predict_serial_port(port_data) # Get the correct port from the AI model
if com_port:
try:
ser = serial.Serial(com_port, 9600) # Open serial port
time.sleep(2) # Wait for the connection to initialize
print(f"Connected to {com_port}")
return ser
except serial.SerialException as e:
st.error(f"Error opening serial port: {e}")
return None
else:
st.error("No Arduino device found.")
return None
# Initialize serial connection
ser = initialize_serial_connection()
# Function to update bus information based on received serial data
def display_bus_info():
global bus_info
try:
while True:
if ser.in_waiting > 0:
# Read data from serial port
data = ser.readline().decode('utf-8', errors='ignore').strip()
# Debug print to see the raw serial data
print(f"Raw Data: {data}")
if "Bus Name:" in data:
bus_info['bus_name'] = data.split("Bus Name:")[1].strip()
elif "Seat Status:" in data:
bus_info['seat_status'] = data.split("Seat Status:")[1].strip()
elif "Distance:" in data:
bus_info['distance'] = data.split("Distance:")[1].strip()
elif "LED:" in data:
bus_info['led_status'] = data.split("LED:")[1].strip()
elif "Buzzer:" in data:
bus_info['buzzer_status'] = data.split("Buzzer:")[1].strip()
time.sleep(1) # Small delay to prevent flooding the terminal
except KeyboardInterrupt:
print("Exiting...")
# Start the bus information display in a separate thread
bus_info_thread = threading.Thread(target=display_bus_info)
bus_info_thread.daemon = True
bus_info_thread.start()
# Streamlit Web Interface
st.title('Bus System Status')
# Display the bus information
col1, col2 = st.columns(2)
with col1:
st.subheader('Bus Name')
bus_name = st.empty()
with col2:
st.subheader('Seat Status')
seat_status = st.empty()
st.subheader('Distance to Bus')
distance_display = st.empty()
# Display LED and Buzzer status
st.subheader('LED Status')
led_status_display = st.empty()
st.subheader('Buzzer Status')
buzzer_status_display = st.empty()
# Update the information on the web interface every second
while True:
bus_name.text(bus_info['bus_name'])
seat_status.text(bus_info['seat_status'])
distance_display.text(f"Distance: {bus_info['distance']} cm")
led_status_display.text(f"LED: {bus_info['led_status']}")
buzzer_status_display.text(f"Buzzer: {bus_info['buzzer_status']}")
time.sleep(1)
|