# Bus_track / app.py
# Author: Muthuraja18
# Commit: "Update app.py (#3)" (9d5db1c, verified)
# Size: 5.64 kB
import serial
import serial.tools.list_ports
import time
import streamlit as st
import threading
import pickle
from sklearn.tree import DecisionTreeClassifier
from sklearn.preprocessing import LabelEncoder
import pandas as pd
# Global variables to store bus information and hardware statuses
# Latest values parsed from the serial feed; display_bus_info() overwrites
# these entries and the Streamlit loop at the bottom of the file reads them.
bus_info = dict(
    bus_name='N/A',
    seat_status='N/A',
    distance='N/A',
    led_status='OFF',
    buzzer_status='OFF',
)
# Function to collect available serial ports data
def collect_serial_ports():
    """Describe every serial port reported by ``serial.tools.list_ports``.

    Returns:
        A list with one dict per port, holding the keys ``'port'`` (the
        port's ``device`` attribute), ``'description'`` and
        ``'manufacturer'``. A missing/empty manufacturer is reported as
        ``'Unknown'``.
    """
    return [
        {
            'port': info.device,
            'description': info.description,
            'manufacturer': info.manufacturer or 'Unknown',
        }
        for info in serial.tools.list_ports.comports()
    ]
# Function to preprocess and label the data for model training
def preprocess_data(port_data):
    """Turn raw port records into a numeric feature table.

    Args:
        port_data: List of dicts with ``'port'``, ``'description'`` and
            ``'manufacturer'`` keys, as built by ``collect_serial_ports()``.

    Returns:
        A ``(df, le)`` tuple. ``df`` has one row per port with the
        ``'description'`` and ``'manufacturer'`` columns replaced by integer
        codes. ``le`` is the ``LabelEncoder`` that produced those codes; it is
        left unfitted when ``port_data`` is empty.

    Note:
        The encoder is fitted from scratch on every call, so codes are only
        comparable between rows of the same call.
    """
    df = pd.DataFrame(port_data)
    le = LabelEncoder()

    if df.empty:
        # No ports: indexing the text columns below would raise KeyError, so
        # hand back an empty table that still has the expected columns.
        return pd.DataFrame(columns=['port', 'description', 'manufacturer']), le

    text_columns = ['description', 'manufacturer']
    # Fit a single encoder on the values of BOTH columns. Re-fitting the same
    # encoder once per column would leave the returned encoder knowing only
    # the last column's vocabulary.
    le.fit(pd.concat([df[col] for col in text_columns], ignore_index=True))
    for col in text_columns:
        df[col] = le.transform(df[col])
    return df, le
# Train a simple decision tree classifier (this is just an example)
def train_model(labels=None):
    """Train and persist a decision tree that flags which port is the Arduino.

    The currently attached serial ports are used as the training set.

    Args:
        labels: Optional sequence with one entry per detected port, in the
            order returned by ``collect_serial_ports()``; ``1`` marks the
            Arduino and ``0`` any other device. Defaults to the placeholder
            ``[1, 0]`` (first port is the Arduino, second is not), which is
            only valid when exactly two ports are attached.

    Raises:
        ValueError: If no ports are detected, or the number of labels does
            not match the number of detected ports.

    Side effects:
        Writes ``serial_port_model.pkl`` and ``label_encoder.pkl`` to the
        current working directory.
    """
    port_data = collect_serial_ports()
    if not port_data:
        raise ValueError("No serial ports detected; nothing to train on.")

    # Placeholder labelling kept for backward compatibility; pass real labels
    # for any setup that does not have exactly two ports.
    labels = [1, 0] if labels is None else list(labels)
    if len(labels) != len(port_data):
        raise ValueError(
            f"Expected one label per detected serial port "
            f"({len(port_data)} found), got {len(labels)}."
        )

    df, le = preprocess_data(port_data)
    df['label'] = labels

    # The device path is an identifier, not a feature, so it is excluded.
    X = df.drop(columns=['port', 'label'])
    y = df['label']
    clf = DecisionTreeClassifier()
    clf.fit(X, y)

    # Save the model and label encoder
    with open('serial_port_model.pkl', 'wb') as model_file:
        pickle.dump(clf, model_file)
    with open('label_encoder.pkl', 'wb') as le_file:
        pickle.dump(le, le_file)
# Function to predict the serial port based on the AI model
def predict_serial_port(port_data):
    """Pick the serial port that the saved classifier labels as the Arduino.

    Args:
        port_data: List of port dicts as built by ``collect_serial_ports()``.

    Returns:
        The ``'port'`` value of the first entry predicted as label ``1``, or
        ``None`` when ``port_data`` is empty or no entry is predicted as ``1``.

    Raises:
        OSError: If ``serial_port_model.pkl`` cannot be opened (for example
            ``FileNotFoundError`` when no model has been trained yet).
    """
    # Nothing to classify; also avoids touching the model file needlessly.
    if not port_data:
        return None

    # SECURITY: pickle.load can execute arbitrary code. Only load a model
    # file that this application wrote itself.
    with open('serial_port_model.pkl', 'rb') as model_file:
        clf = pickle.load(model_file)

    # NOTE(review): preprocess_data() fits its encoder on the data it is
    # given, so the integer codes here are not guaranteed to match the ones
    # seen during training. The pickled label encoder was never applied, so
    # it is no longer loaded; reusing training-time encoders is the proper
    # long-term fix.
    df, _ = preprocess_data(port_data)

    # 'label' exists only in training data; errors='ignore' keeps this from
    # raising KeyError at prediction time.
    features = df.drop(columns=['port', 'label'], errors='ignore')
    predictions = clf.predict(features)

    # Return the first port predicted to be an Arduino (label = 1)
    for entry, pred in zip(port_data, predictions):
        if pred == 1:
            return entry['port']
    return None  # No Arduino found
# Function to initialize the serial connection with the correct port
def initialize_serial_connection():
    """Open a 9600-baud serial connection to the port predicted to be the Arduino.

    Failures are reported in the Streamlit page via ``st.error`` rather than
    raised.

    Returns:
        The open ``serial.Serial`` object, or ``None`` when the model could
        not be loaded, no Arduino port was predicted, or the port could not
        be opened.
    """
    port_data = collect_serial_ports()  # Get available ports data

    try:
        com_port = predict_serial_port(port_data)  # Ask the model for the port
    except (OSError, pickle.UnpicklingError, EOFError) as e:
        # A missing or corrupt model file should not take the whole page down.
        st.error(f"Could not load the port-detection model: {e}")
        return None

    if not com_port:
        st.error("No Arduino device found.")
        return None

    try:
        ser = serial.Serial(com_port, 9600)  # Open serial port
    except serial.SerialException as e:
        st.error(f"Error opening serial port: {e}")
        return None

    time.sleep(2)  # Wait for the connection to initialize
    print(f"Connected to {com_port}")
    return ser
# Initialize the serial connection when the script runs. `ser` is None when no
# Arduino port was predicted or the port could not be opened, so any code that
# uses it has to cope with that.
ser = initialize_serial_connection()
# Function to update bus information based on received serial data
def display_bus_info():
    """Poll the serial port and mirror its status lines into ``bus_info``.

    Each received line is echoed to stdout and matched against a fixed list
    of markers (``"Bus Name:"``, ``"Seat Status:"``, ``"Distance:"``,
    ``"LED:"``, ``"Buzzer:"``). The text after the first matching marker is
    stored under the corresponding ``bus_info`` key.

    Runs until interrupted or until the serial link fails. Returns
    immediately when the module-level ``ser`` is ``None``.
    """
    # initialize_serial_connection() returns None on failure; without this
    # guard the first `ser.in_waiting` access raises AttributeError.
    if ser is None:
        print("Serial port unavailable; bus info will not be updated.")
        return

    # (marker, bus_info key) pairs, checked in order; the first match wins.
    fields = (
        ("Bus Name:", 'bus_name'),
        ("Seat Status:", 'seat_status'),
        ("Distance:", 'distance'),
        ("LED:", 'led_status'),
        ("Buzzer:", 'buzzer_status'),
    )

    try:
        while True:
            # Drain everything that is buffered before sleeping, so a burst
            # of status lines does not lag one line per second.
            while ser.in_waiting > 0:
                # Read data from serial port
                data = ser.readline().decode('utf-8', errors='ignore').strip()
                # Debug print to see the raw serial data
                print(f"Raw Data: {data}")
                for marker, key in fields:
                    if marker in data:
                        bus_info[key] = data.split(marker)[1].strip()
                        break
            time.sleep(1)  # Poll interval; avoids a busy loop when idle
    except KeyboardInterrupt:
        print("Exiting...")
    except (serial.SerialException, OSError) as e:
        # E.g. the device was unplugged; stop polling instead of leaving an
        # unhandled traceback in the background thread.
        print(f"Serial connection lost: {e}")
# Start the bus information display in a separate thread
# Daemon thread: it must not keep the process alive once the main script exits.
bus_info_thread = threading.Thread(target=display_bus_info, daemon=True)
bus_info_thread.start()
# Streamlit Web Interface
st.title('Bus System Status')

# Top row: bus name on the left, seat status on the right. Each st.empty()
# placeholder is filled in later by the refresh loop.
col1, col2 = st.columns(2)
col1.subheader('Bus Name')
bus_name = col1.empty()
col2.subheader('Seat Status')
seat_status = col2.empty()

# Full-width sections below the two columns.
st.subheader('Distance to Bus')
distance_display = st.empty()

# Display LED and Buzzer status
st.subheader('LED Status')
led_status_display = st.empty()
st.subheader('Buzzer Status')
buzzer_status_display = st.empty()
# Update the information on the web interface every second
# Repaint every placeholder from the shared bus_info dict once per second.
while True:
    for placeholder, text in (
        (bus_name, bus_info['bus_name']),
        (seat_status, bus_info['seat_status']),
        (distance_display, f"Distance: {bus_info['distance']} cm"),
        (led_status_display, f"LED: {bus_info['led_status']}"),
        (buzzer_status_display, f"Buzzer: {bus_info['buzzer_status']}"),
    ):
        placeholder.text(text)
    time.sleep(1)