Spaces:
Sleeping
Sleeping
| import serial | |
| import serial.tools.list_ports | |
| import time | |
| import streamlit as st | |
| import threading | |
| import pickle | |
| from sklearn.tree import DecisionTreeClassifier | |
| from sklearn.preprocessing import LabelEncoder | |
| import pandas as pd | |
# Shared module state: the most recent bus details and hardware statuses.
# Text fields start as 'N/A' and the two actuator fields start as 'OFF'.
bus_info = dict.fromkeys(('bus_name', 'seat_status', 'distance'), 'N/A')
bus_info.update(led_status='OFF', buzzer_status='OFF')
def collect_serial_ports():
    """Describe every serial port reported by pySerial's port listing.

    Returns:
        A list with one dict per port, holding the keys ``'port'`` (the
        device name), ``'description'`` and ``'manufacturer'``.  A port
        whose manufacturer is empty or missing is recorded as
        ``'Unknown'``.
    """
    return [
        {
            'port': entry.device,
            'description': entry.description,
            'manufacturer': entry.manufacturer or 'Unknown',
        }
        for entry in serial.tools.list_ports.comports()
    ]
def preprocess_data(port_data):
    """Turn port records into a feature table with integer-encoded text.

    Args:
        port_data: List of dicts carrying ``'port'``, ``'description'`` and
            ``'manufacturer'`` keys.

    Returns:
        A ``(df, le)`` tuple.  ``df`` has one row per record with the
        ``'description'`` and ``'manufacturer'`` columns replaced by integer
        codes.  ``le`` is the ``LabelEncoder`` that was used; because the
        same encoder is refit for each column, it only retains the classes
        of the last column fitted (``'manufacturer'``).  For an empty
        ``port_data`` the table is empty and the encoder is unfitted.
    """
    le = LabelEncoder()
    df = pd.DataFrame(port_data)
    if df.empty:
        # An empty record list yields a frame without columns; return a
        # correctly shaped empty table instead of failing with KeyError.
        return pd.DataFrame(columns=['port', 'description', 'manufacturer']), le

    for column in ('description', 'manufacturer'):
        # Missing values cannot be ordered against strings by the encoder,
        # so map them to the same placeholder used for unknown manufacturers.
        df[column] = le.fit_transform(df[column].fillna('Unknown').astype(str))
    return df, le
def train_model(labels=None, port_data=None):
    """Train a decision tree that flags the Arduino port, then persist it.

    Args:
        labels: One integer per port record, in the same order as the
            records: 1 for the Arduino, 0 for any other device.  Defaults to
            the example labelling ``[1, 0]``, which only fits a machine with
            exactly two ports and should be replaced with real labels.
        port_data: Port records to train on.  Defaults to the ports
            currently returned by ``collect_serial_ports()``.

    Raises:
        ValueError: If the number of labels differs from the number of
            port records.

    Side effects:
        Writes ``serial_port_model.pkl`` (the classifier) and
        ``label_encoder.pkl`` (the encoder returned by ``preprocess_data``)
        to the current working directory.
    """
    if port_data is None:
        port_data = collect_serial_ports()
    if labels is None:
        labels = [1, 0]  # 1 = Arduino, 0 = Other device (example data)
    labels = list(labels)

    # Fail early with an actionable message rather than deep inside pandas.
    if len(labels) != len(port_data):
        raise ValueError(
            f"Expected one label per serial port: got {len(labels)} label(s) "
            f"for {len(port_data)} port(s)."
        )

    df, le = preprocess_data(port_data)
    df['label'] = labels

    # Train the classifier on everything except the port name and the target.
    X = df.drop(columns=['port', 'label'])
    y = df['label']
    clf = DecisionTreeClassifier()
    clf.fit(X, y)

    # Save the model and label encoder
    with open('serial_port_model.pkl', 'wb') as model_file:
        pickle.dump(clf, model_file)
    with open('label_encoder.pkl', 'wb') as le_file:
        pickle.dump(le, le_file)
def predict_serial_port(port_data):
    """Return the first port the saved model classifies as an Arduino.

    Args:
        port_data: List of dicts carrying ``'port'``, ``'description'`` and
            ``'manufacturer'`` keys.

    Returns:
        The ``'port'`` value of the first record predicted as label 1, or
        ``None`` when ``port_data`` is empty or no record is predicted as 1.

    Raises:
        FileNotFoundError: If ``serial_port_model.pkl`` does not exist.
    """
    if not port_data:
        # Nothing to classify; skip loading the model altogether.
        return None

    # SECURITY: pickle.load can execute arbitrary code from the file. Only
    # load a model file that this application wrote itself.
    with open('serial_port_model.pkl', 'rb') as model_file:
        clf = pickle.load(model_file)

    # NOTE(review): preprocess_data fits a fresh encoder on the live port
    # list, so the integer codes may differ from those seen in training.
    # The pickled encoder is therefore not loaded here -- confirm whether
    # it should be reused instead.
    df, _ = preprocess_data(port_data)

    # The feature table has no 'label' column at prediction time, so a
    # strict drop would raise KeyError; errors='ignore' tolerates that.
    features = df.drop(columns=['port', 'label'], errors='ignore')
    predictions = clf.predict(features)

    # First record predicted as an Arduino (label 1), if any.
    return next(
        (record['port'] for record, label in zip(port_data, predictions) if label == 1),
        None,
    )
def initialize_serial_connection(baudrate=9600, settle_seconds=2):
    """Open a serial connection to the port predicted to be the Arduino.

    Args:
        baudrate: Baud rate used to open the port.  Defaults to 9600.
        settle_seconds: Seconds to wait after opening the port so the
            connection can initialise.  Defaults to 2.

    Returns:
        The open ``serial.Serial`` object, or ``None`` when no port was
        predicted to be an Arduino or the port could not be opened.  Both
        failure cases are reported through ``st.error``.
    """
    port_data = collect_serial_ports()  # Get available ports data
    com_port = predict_serial_port(port_data)  # Port chosen by the model

    if not com_port:
        st.error("No Arduino device found.")
        return None

    # Keep the try body minimal: only opening the port raises SerialException.
    try:
        connection = serial.Serial(com_port, baudrate)
    except serial.SerialException as e:
        st.error(f"Error opening serial port: {e}")
        return None

    time.sleep(settle_seconds)  # Wait for the connection to initialize
    print(f"Connected to {com_port}")
    return connection
# Open the serial connection as a top-level statement when the script runs.
# ``ser`` is None when no Arduino port was identified or the port could not
# be opened, so code that uses it must be prepared for that.
ser = initialize_serial_connection()
def display_bus_info():
    """Poll the serial port and copy recognised fields into ``bus_info``.

    Each received line is decoded as UTF-8 (undecodable bytes are dropped),
    echoed to stdout for debugging, and matched against a fixed list of
    markers.  The text following the first marker found is stored under the
    corresponding ``bus_info`` key.  The loop runs until interrupted or
    until the serial connection fails.

    Returns immediately when the module-level ``ser`` is ``None``, i.e. when
    no serial connection could be opened.
    """
    if ser is None:
        # Without a connection there is nothing to poll; exit cleanly rather
        # than dying with AttributeError on the first access.
        print("Serial connection unavailable; bus info will not be updated.")
        return

    # (marker, bus_info key) pairs, checked in this order; first match wins.
    field_markers = (
        ("Bus Name:", 'bus_name'),
        ("Seat Status:", 'seat_status'),
        ("Distance:", 'distance'),
        ("LED:", 'led_status'),
        ("Buzzer:", 'buzzer_status'),
    )

    try:
        while True:
            # Drain every buffered line before sleeping, so a burst of
            # several fields does not fall one second behind per line.
            while ser.in_waiting > 0:
                # Read data from serial port
                data = ser.readline().decode('utf-8', errors='ignore').strip()
                # Debug print to see the raw serial data
                print(f"Raw Data: {data}")
                for marker, key in field_markers:
                    if marker in data:
                        bus_info[key] = data.split(marker)[1].strip()
                        break
            time.sleep(1)  # Poll interval; also keeps the terminal readable
    except KeyboardInterrupt:
        print("Exiting...")
    except (serial.SerialException, OSError) as exc:
        # The device was unplugged or the port failed; stop polling.
        print(f"Serial connection lost: {exc}")
# Run the serial reader on a background thread. It is marked as a daemon so
# it does not keep the process alive on its own.
bus_info_thread = threading.Thread(target=display_bus_info, daemon=True)
bus_info_thread.start()
# Streamlit page layout: a title, then one heading plus one empty
# placeholder per displayed field. The placeholders are filled in later.
st.title('Bus System Status')

# Bus name and seat status sit side by side in a two-column row.
col1, col2 = st.columns(2)
col1.subheader('Bus Name')
bus_name = col1.empty()
col2.subheader('Seat Status')
seat_status = col2.empty()

# Distance readout.
st.subheader('Distance to Bus')
distance_display = st.empty()

# LED and buzzer readouts.
st.subheader('LED Status')
led_status_display = st.empty()
st.subheader('Buzzer Status')
buzzer_status_display = st.empty()
# Rewrite every placeholder with the current bus_info values, once a second.
while True:
    for placeholder, text in (
        (bus_name, bus_info['bus_name']),
        (seat_status, bus_info['seat_status']),
        (distance_display, f"Distance: {bus_info['distance']} cm"),
        (led_status_display, f"LED: {bus_info['led_status']}"),
        (buzzer_status_display, f"Buzzer: {bus_info['buzzer_status']}"),
    ):
        placeholder.text(text)
    time.sleep(1)