# import pandas as pd # import numpy as np # import gradio as gr # from tensorflow.keras.models import load_model # from preprocess import create_features, cylindrical_encoding # import pickle # import os # class CrowdPredictor: # def __init__(self): # # Load model, encoder, and scaler # self.model = load_model('lstm_model.h5') # self.encoder = pickle.load(open('binary_encoder.pkl', 'rb')) # self.scaler = pickle.load(open('min_max_scaler.pkl', 'rb')) # # Load initial data # self.data = pd.read_csv('synthetic_crowd_data.csv')[-60:] # self.prev_crowd_count = self.data['crowd_count'].iloc[-1] # self.data['crowd_count'] = self.data['crowd_count'].shift(1) # self.data['datetime'] = pd.to_datetime(self.data['datetime']) # self.data.dropna(inplace=True) # # Initialize current datetime # self.curr_datetime = self.data['datetime'].iloc[-1] + pd.Timedelta(minutes=1) # def update_data(self, input_df): # # Update the internal data attribute # self.data = pd.concat([self.data, input_df], ignore_index=True) # self.data = self.data[-60:] # Keep only the last 60 records # self.curr_datetime = self.data['datetime'].iloc[-1] + pd.Timedelta(minutes=1) # def predict_single(self, camera_location): # os.system('cls') # Clear console # # Prepare the input row with updated datetime and previous crowd count # datetime = self.curr_datetime # input_df = pd.DataFrame({ # 'datetime': [datetime], # 'camera_location': [camera_location], # 'crowd_count': [self.prev_crowd_count] # }) # # Update data with new input # self.update_data(input_df) # # Feature engineering and scaling # features = create_features(self.data) # df = cylindrical_encoding(features) # df = self.encoder.transform(df) # df[['dayofyear', 'dayofmonth', 'weekofyear']] = self.scaler.transform(df[['dayofyear', 'dayofmonth', 'weekofyear']]) # # Model prediction # X = np.expand_dims(df, axis=0).astype('float32') # prediction = self.model.predict(X) # self.prev_crowd_count = int(prediction[0][0]) # return self.prev_crowd_count # def predict_batch(self, batch_data): # os.system('cls') # df = pd.read_csv(batch_data.name) # predictions = [] # for index, row in df.iterrows(): # camera_location_input = row['camera_location'] # prediction = self.predict_single(camera_location_input) # predictions.append(prediction) # return predictions # # Instantiate the predictor # predictor = CrowdPredictor() # # Gradio interface setup # with gr.Blocks() as prediction_block: # gr.Label("Crowd Count Prediction") # with gr.Tab("Single Prediction"): # # camera_location_input = gr.Text(label="Camera Location (Category)") # camera_location_input = gr.Dropdown(choices=[f"Camera_{i}" for i in range(1, 101)], label="Camera Location (Category)"), # single_predict_btn = gr.Button("Predict") # single_result = gr.Number(label="Predicted Crowd Count") # single_predict_btn.click( # predictor.predict_single, # inputs=camera_location_input, # outputs=single_result # ) # with gr.Tab("Batch Prediction"): # batch_input = gr.File(label="Upload CSV") # batch_predict_btn = gr.Button("Predict") # output = gr.Textbox(label="Predicted Crowd Counts") # batch_predict_btn.click( # predictor.predict_batch, # inputs=batch_input, # outputs=output # ) # prediction_block.launch(share=True, debug=True) import pandas as pd import numpy as np import gradio as gr from tensorflow.keras.models import load_model from preprocess import create_features, cylindrical_encoding import pickle import os class CrowdPredictor: def __init__(self): # Load model, encoder, and scaler self.model = load_model('lstm_model.h5') self.encoder = pickle.load(open('binary_encoder.pkl', 'rb')) self.scaler = pickle.load(open('min_max_scaler.pkl', 'rb')) # Load initial data self.data = pd.read_csv('synthetic_crowd_data.csv')[-60:] self.prev_crowd_count = self.data['crowd_count'].iloc[-1] self.data['crowd_count'] = self.data['crowd_count'].shift(1) self.data['datetime'] = pd.to_datetime(self.data['datetime']) self.data.dropna(inplace=True) # Initialize current datetime self.curr_datetime = self.data['datetime'].iloc[-1] + pd.Timedelta(minutes=1) def update_data(self, input_df): # Update the internal data attribute self.data = pd.concat([self.data, input_df], ignore_index=True) self.data = self.data[-60:] # Keep only the last 60 records self.curr_datetime = self.data['datetime'].iloc[-1] + pd.Timedelta(minutes=1) def predict_single(self, camera_location): os.system('cls') # Clear console # Prepare the input row with updated datetime and previous crowd count datetime = self.curr_datetime input_df = pd.DataFrame({ 'datetime': [datetime], 'camera_location': [camera_location], 'crowd_count': [self.prev_crowd_count] }) # Update data with new input self.update_data(input_df) # Feature engineering and scaling features = create_features(self.data) df = cylindrical_encoding(features) df = self.encoder.transform(df) df[['dayofyear', 'dayofmonth', 'weekofyear']] = self.scaler.transform(df[['dayofyear', 'dayofmonth', 'weekofyear']]) # Model prediction X = np.expand_dims(df, axis=0).astype('float32') prediction = self.model.predict(X) self.prev_crowd_count = int(prediction[0][0]) return self.prev_crowd_count def predict_batch(self, batch_data): os.system('cls') df = pd.read_csv(batch_data.name) predictions = [] for index, row in df.iterrows(): camera_location_input = row['camera_location'] prediction = self.predict_single(camera_location_input) predictions.append(prediction) # Prepare DataFrame for LinePlot output plot_data = self.data[['datetime', 'crowd_count']] return ', '.join(map(str, predictions)), plot_data def predict_nxt_m_minutes(self, camera_location, m): os.system('cls') predictions = [] for _ in range(int(m)): # Ensure m is an integer prediction = self.predict_single(camera_location) predictions.append(prediction) # Prepare DataFrame for LinePlot output plot_data = self.data[['datetime', 'crowd_count']] return ', '.join(map(str, predictions)), plot_data # Instantiate the predictor predictor = CrowdPredictor() # Gradio interface setup with gr.Blocks() as prediction_block: gr.Label("Crowd Count Prediction") # Single Prediction Tab with gr.Tab("Single Prediction"): single_camera_location = gr.Dropdown(choices=[f"Camera_{i}" for i in range(1, 101)], label="Camera Location (Category)") single_predict_btn = gr.Button("Predict") single_result = gr.Number(label="Predicted Crowd Count") single_predict_btn.click( predictor.predict_single, inputs=single_camera_location, outputs=single_result ) # Batch Prediction Tab with gr.Tab("Batch Prediction"): batch_input = gr.File(label="Upload CSV") batch_predict_btn = gr.Button("Predict") batch_output = gr.Textbox(label="Predicted Crowd Counts") batch_plot = gr.LinePlot( predictor.data, x="datetime", y="crowd_count", title="Crowd Count History", x_title="Datetime", y_title="Crowd Count", label="Crowd Count History" ) batch_predict_btn.click( predictor.predict_batch, inputs=batch_input, outputs=[batch_output, batch_plot] ) # Predict Next M Minutes Tab with gr.Tab("Predict next M minutes"): m = gr.Number(label="Minutes to predict", minimum=1) next_camera_location = gr.Dropdown(choices=[f"Camera_{i}" for i in range(1, 101)], label="Camera Location (Category)") predict_next_btn = gr.Button("Predict") next_output = gr.Textbox(label="Predicted Crowd Counts") next_plot = gr.LinePlot( predictor.data, x="datetime", y="crowd_count", title="Crowd Count History", x_title="Datetime", y_title="Crowd Count", label="Crowd Count History" ) predict_next_btn.click( predictor.predict_nxt_m_minutes, inputs=[next_camera_location, m], outputs=[next_output, next_plot] ) # Launch the Gradio app prediction_block.launch(share=True, debug=True)