import os
import pickle
import numpy as np
import pandas as pd
import seaborn as sns
import streamlit as st
import onnxruntime as ort
import plotly.express as px
from scipy.stats import zscore
import matplotlib.pyplot as plt
from skl2onnx import convert_sklearn
from sklearn.feature_selection import RFE
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from skl2onnx.common.data_types import FloatTensorType
from streamlit_extras.metric_cards import style_metric_cards
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
st.title("Customer Category Prediction (Case: Turkey Customer)")
st.write("Prediction Customer in Turkey with Probability Using Ensemble Technique Based")
# Load CSS style
# The stylesheet has to be read and wrapped in a <style> tag; an empty
# markdown string injects nothing and leaves the page unstyled.
with open('src/static/styles.css', encoding='utf-8') as f:
    st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
# Load Dataset
@st.cache_data
def load_data():
    """Return the customer shopping dataset as a DataFrame.

    The CSV hosted on GitHub is tried first. If that read raises for any
    reason, the local copy under ``src/data`` is used instead.

    Raises:
        Exception: if the GitHub read failed and the local file is missing.
    """
    # Remote source (GitHub raw URL)
    remote_csv = "https://raw.githubusercontent.com/fendy07/customer-prediction/refs/heads/main/data/customer_shopping_data.csv"
    # Local fallback path
    fallback_csv = "src/data/customer_shopping_data.csv"
    try:
        frame = pd.read_csv(remote_csv)
        print("✅ Data loaded from GitHub")
        return frame
    except Exception as e:
        print(f"⚠️ Failed to load from GitHub: {e}. Loading from local path.")
        # Guard clause: nothing left to try when the local file is absent.
        if not os.path.exists(fallback_csv):
            raise Exception("❌ Data not found in GitHub or local!")
        frame = pd.read_csv(fallback_csv)
        print("✅ Data loaded from local path")
        return frame
retail = load_data()
# Take an explicit copy: the columns are label-encoded in place below, while
# the raw string columns of `retail` are needed again later to encode user
# input in the prediction form.
X = retail.loc[:, ['age', 'gender', 'price', 'payment_method', 'shopping_mall']].copy()
y = retail[['category']]
# Encode categorical variables
# A single LabelEncoder is re-fitted per column; after the final fit (on the
# target) it holds the category classes and is what the rest of the script
# uses to decode predictions.
le = LabelEncoder()
for _column in ('gender', 'payment_method', 'shopping_mall'):
    X[_column] = le.fit_transform(X[_column])
# LabelEncoder expects a 1-D target; flatten the single-column frame instead
# of passing a 2-D object.
y_encoded = le.fit_transform(y.to_numpy().ravel())
# Splitting data
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=44)
# Preprocessing: the scaler is fitted on the training part only
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Outlier detection using Z-Score
z_scores = np.abs(zscore(X_train_scaled))
threshold = 5
outliers = np.where(z_scores > threshold)
# Keep only rows whose every feature lies within `threshold` z-scores; the
# mask is computed once so features and labels stay aligned.
inlier_mask = (z_scores < threshold).all(axis=1)
X_train_clean = X_train_scaled[inlier_mask]
y_train_clean = y_train[inlier_mask]
#------------ MODEL TRAINING SECTION ---------
with st.expander("🔄 MODEL TRAINING & MANAGEMENT"):
    st.subheader("Train or Load Model")
    col1, col2 = st.columns(2)

    # Left column: hyper-parameters for a fresh training run.
    with col1:
        st.write("### Training Parameters")
        n_estimators = st.slider(
            "Number of Trees (n_estimators)",
            min_value=50, max_value=500, value=300, step=50,
        )
        test_size = st.slider(
            "Test Size",
            min_value=0.1, max_value=0.4, value=0.2, step=0.05,
        )
        random_state = st.number_input(
            "Random State",
            min_value=0, max_value=100, value=44,
        )
        n_features = st.slider(
            "Number of Features to Select (RFE)",
            min_value=1, max_value=5, value=5,
        )
        train_button = st.button("🚀 TRAIN NEW MODEL", type="primary")

    # Right column: which persisted artefact (if any) should be used.
    with col2:
        st.write("### Model Management")
        model_format = st.radio(
            "Choose Model Format:",
            ["ONNX Model (.onnx)", "Pickle Model (.pkl)"],
        )
        load_option = st.radio(
            "Choose Model Source:",
            ["Load Existing Model", "Use Newly Trained Model"],
        )

        # Both flags stay False unless an existing artefact is requested
        # and actually present on disk.
        model_loaded = False
        use_onnx = False
        if load_option == "Load Existing Model":
            wants_onnx = model_format == "ONNX Model (.onnx)"
            if wants_onnx:
                model_path = 'src/model/best_model_rf.onnx'
                metadata_path = 'src/model/model_metadata.pkl'
                # The ONNX graph is only usable together with its metadata.
                model_loaded = os.path.exists(model_path) and os.path.exists(metadata_path)
                found_msg = "✅ ONNX model found!"
                missing_msg = "❌ ONNX model not found. Please train a new model first."
            else:
                model_path = 'src/model/best_model_rf.pkl'
                model_loaded = os.path.exists(model_path)
                found_msg = "✅ Pickle model found!"
                missing_msg = "❌ Pickle model not found. Please train a new model first."
            use_onnx = wants_onnx and model_loaded
            if model_loaded:
                st.success(found_msg)
            else:
                st.error(missing_msg)
# Initialize session state for model
# Runs once per browser session: the presence of 'trained_model' marks the
# slots as already created, so later reruns leave stored values untouched.
if 'trained_model' not in st.session_state:
    for _slot in (
        'trained_model',
        'trained_rfe',
        'trained_scaler',
        'trained_le',
        'model_metrics',
        'onnx_session',
    ):
        st.session_state[_slot] = None
# Train new model
# Triggered by the "TRAIN NEW MODEL" button: re-splits the data with the UI
# parameters, trains RFE + RandomForest, stores everything in session state
# and persists the result as Pickle and (best effort) ONNX.
if train_button:
    with st.spinner("Training model... Please wait..."):
        # Re-split data with new test_size
        X_train_new, X_test_new, y_train_new, y_test_new = train_test_split(
            X, y_encoded, test_size=test_size, random_state=random_state
        )
        # Preprocessing: scaler statistics come from the training part only
        scaler_new = StandardScaler()
        X_train_scaled_new = scaler_new.fit_transform(X_train_new)
        X_test_scaled_new = scaler_new.transform(X_test_new)
        # Outlier removal: one mask, applied to features and labels alike
        z_scores_new = np.abs(zscore(X_train_scaled_new))
        inlier_mask_new = (z_scores_new < threshold).all(axis=1)
        X_train_clean_new = X_train_scaled_new[inlier_mask_new]
        y_train_clean_new = y_train_new[inlier_mask_new]
        # Model training with RFE
        classifier_new = RandomForestClassifier(n_estimators=n_estimators, random_state=random_state)
        rfe_new = RFE(classifier_new, n_features_to_select=n_features)
        X_train_rfe = rfe_new.fit_transform(X_train_clean_new, y_train_clean_new)
        X_test_rfe = rfe_new.transform(X_test_scaled_new)
        # Fit the model on the selected features (RFE works on a clone, so
        # `classifier_new` itself is still unfitted at this point)
        classifier_new.fit(X_train_rfe, y_train_clean_new)
        # Predictions
        y_pred_new = classifier_new.predict(X_test_rfe)
        # Calculate metrics
        metrics = {
            'accuracy': accuracy_score(y_test_new, y_pred_new),
            'precision': precision_score(y_test_new, y_pred_new, average='weighted'),
            'recall': recall_score(y_test_new, y_pred_new, average='weighted'),
            'f1_score': f1_score(y_test_new, y_pred_new, average='weighted')
        }
        # Save to session state
        st.session_state.trained_model = classifier_new
        st.session_state.trained_rfe = rfe_new
        st.session_state.trained_scaler = scaler_new
        st.session_state.trained_le = le
        st.session_state.model_metrics = metrics
        st.session_state.X_test = X_test_rfe
        st.session_state.y_test = y_test_new
        st.session_state.y_pred = y_pred_new
        # Make sure the output directory exists before writing any artefact
        os.makedirs('src/model', exist_ok=True)
        # Save as Pickle
        model_package = {
            'classifier': classifier_new,
            'rfe': rfe_new,
            'scaler': scaler_new,
            'label_encoder': le,
            'metrics': metrics,
            'n_features': n_features
        }
        with open('src/model/best_model_rf.pkl', 'wb') as f:
            pickle.dump(model_package, f)
        # Training and the Pickle save succeeded at this point, so report it
        # regardless of whether the optional ONNX export works.
        st.success(f"✅ Model trained and saved successfully!")
        st.success(f"📊 Accuracy: {metrics['accuracy']:.4f}")
        # Convert and Save as ONNX
        try:
            # Define initial type for ONNX conversion: the graph consumes the
            # RFE-selected columns, hence `n_features` inputs
            initial_type = [('float_input', FloatTensorType([None, n_features]))]
            # Convert model to ONNX
            onnx_model = convert_sklearn(classifier_new, initial_types=initial_type,
                                         target_opset=12)
            # Save ONNX model
            with open('src/model/best_model_rf.onnx', 'wb') as f:
                f.write(onnx_model.SerializeToString())
            # Save metadata (scaler, rfe, label_encoder) separately
            metadata = {
                'scaler': scaler_new,
                'rfe': rfe_new,
                'label_encoder': le,
                'metrics': metrics,
                'n_features': n_features,
                'feature_names': ['age', 'gender', 'price', 'payment_method', 'shopping_mall']
            }
            with open('src/model/model_metadata.pkl', 'wb') as f:
                pickle.dump(metadata, f)
        except Exception as e:
            # Best effort: the Pickle artefact is already on disk
            st.warning(f"⚠️ Model saved as Pickle only. ONNX conversion failed: {str(e)}")
        else:
            st.success(f"💾 Saved as: Pickle (.pkl) and ONNX (.onnx)")
    st.balloons()
# Determine which model to use
# Outcome of this section (consumed by every section below):
#   classifier, rfe, scaler, le_model, onnx_session,
#   X_test_final, y_test_final, y_pred_final,
#   accuracy, precision, recall, f1


def _weighted_scores(y_true, y_hat):
    """Return (accuracy, precision, recall, f1) using weighted averaging."""
    return (
        accuracy_score(y_true, y_hat),
        precision_score(y_true, y_hat, average='weighted'),
        recall_score(y_true, y_hat, average='weighted'),
        f1_score(y_true, y_hat, average='weighted'),
    )


def _fit_on_default_split(estimator):
    """Fit a 5-feature RFE and `estimator` on the default train split.

    Returns the fitted selector, the selected default test features and the
    estimator's predictions for them.
    """
    selector = RFE(estimator, n_features_to_select=5)
    train_selected = selector.fit_transform(X_train_clean, y_train_clean)
    test_selected = selector.transform(X_test_scaled)
    # RFE fits a clone internally, so the estimator itself is fitted here.
    estimator.fit(train_selected, y_train_clean)
    return selector, test_selected, estimator.predict(test_selected)


fallback_to_default = False

if load_option == "Use Newly Trained Model" and st.session_state.trained_model is not None:
    classifier = st.session_state.trained_model
    rfe = st.session_state.trained_rfe
    scaler = st.session_state.trained_scaler
    le_model = st.session_state.trained_le
    X_test_final = st.session_state.X_test
    y_test_final = st.session_state.y_test
    y_pred_final = st.session_state.y_pred
    accuracy = st.session_state.model_metrics['accuracy']
    precision = st.session_state.model_metrics['precision']
    recall = st.session_state.model_metrics['recall']
    f1 = st.session_state.model_metrics['f1_score']
    onnx_session = None
    st.info("🔵 Using newly trained model from this session")
elif model_loaded and use_onnx:
    # Load ONNX Model
    # Everything is loaded into temporaries and only committed on success,
    # so a failure part-way leaves the default `scaler` intact for the
    # fallback model.
    try:
        loaded_session = ort.InferenceSession('src/model/best_model_rf.onnx')
        # Load metadata
        # NOTE(security): pickle.load executes arbitrary code from the file;
        # only load artefacts produced by this application.
        with open('src/model/model_metadata.pkl', 'rb') as f:
            metadata = pickle.load(f)
        loaded_scaler = metadata['scaler']
        loaded_rfe = metadata['rfe']
        loaded_le = metadata['label_encoder']
        metrics = metadata.get('metrics', {})
        # Apply the persisted transformations as-is. Refitting the selector
        # here could pick different columns than the ONNX graph was
        # exported with.
        loaded_features = loaded_rfe.transform(loaded_scaler.transform(X_test))
        # Predict using ONNX
        input_name = loaded_session.get_inputs()[0].name
        label_name = loaded_session.get_outputs()[0].name
        loaded_pred = loaded_session.run(
            [label_name], {input_name: loaded_features.astype(np.float32)}
        )[0]
        computed_scores = _weighted_scores(y_test, loaded_pred)
    except Exception as e:
        st.error(f"Failed to load ONNX model: {str(e)}")
        st.warning("Falling back to default model...")
        model_loaded = False
        use_onnx = False
        # Actually perform the announced fallback (see below); otherwise the
        # names consumed by the following sections would be undefined.
        fallback_to_default = True
    else:
        onnx_session = loaded_session
        scaler = loaded_scaler
        rfe = loaded_rfe
        le_model = loaded_le
        X_test_final = loaded_features
        y_pred_final = loaded_pred
        # NOTE: evaluated on the default split; if the model was trained
        # with another split, some of these rows may have been seen in training.
        y_test_final = y_test
        # Calculate metrics: stored training-time values win when present
        accuracy = metrics.get('accuracy', computed_scores[0])
        precision = metrics.get('precision', computed_scores[1])
        recall = metrics.get('recall', computed_scores[2])
        f1 = metrics.get('f1_score', computed_scores[3])
        classifier = None  # ONNX doesn't need sklearn classifier
        st.info("🟢 Using ONNX model from file")
elif model_loaded and not use_onnx:
    # Load Pickle Model
    # NOTE(security): pickle.load executes arbitrary code from the file;
    # only load artefacts produced by this application.
    with open('src/model/best_model_rf.pkl', 'rb') as f:
        model_data = pickle.load(f)
    if isinstance(model_data, dict):
        classifier = model_data['classifier']
        rfe = model_data.get('rfe', None)
        scaler = model_data.get('scaler', scaler)
        le_model = model_data.get('label_encoder', le)
        if rfe is not None and hasattr(rfe, 'support_'):
            # Fitted selector shipped with the model: use the persisted
            # scaler / selector / classifier unchanged instead of retraining.
            X_test_final = rfe.transform(scaler.transform(X_test))
            y_pred_final = classifier.predict(X_test_final)
        else:
            # Package without a fitted selector: build one on the default split.
            rfe, X_test_final, y_pred_final = _fit_on_default_split(classifier)
        y_test_final = y_test
        # Calculate metrics
        accuracy, precision, recall, f1 = _weighted_scores(y_test_final, y_pred_final)
    else:
        classifier = model_data
        le_model = le
        if hasattr(classifier, 'named_steps') or hasattr(classifier, 'steps'):
            # A Pipeline gets the raw (encoded, unscaled) features
            y_pred_final = classifier.predict(X_test)
            y_test_final = y_test
            X_test_final = X_test_scaled
            rfe = None
        else:
            rfe, X_test_final, y_pred_final = _fit_on_default_split(classifier)
            y_test_final = y_test
        accuracy, precision, recall, f1 = _weighted_scores(y_test_final, y_pred_final)
    onnx_session = None
    st.info("🟢 Using Pickle model from file")
else:
    fallback_to_default = True

if fallback_to_default:
    # Default: train on the fly
    classifier = RandomForestClassifier(n_estimators=300, random_state=44)
    rfe, X_test_final, y_pred_final = _fit_on_default_split(classifier)
    y_test_final = y_test
    le_model = le
    accuracy, precision, recall, f1 = _weighted_scores(y_test_final, y_pred_final)
    onnx_session = None
    st.warning("⚠️ Using default model (trained on-the-fly)")
# Evaluation Metrics
# One metric card per score, laid out left to right in four columns.
with st.expander("📊 EVALUATION METRICS"):
    col1, col2, col3, col4 = st.columns(4)
    metric_cards = (
        (col1, "ACCURACY", accuracy, 'Accuracy Score'),
        (col2, "PRECISION", precision, 'Precision Score With Weighted Average'),
        (col3, "RECALL", recall, 'Recall Score With Weighted Average'),
        (col4, "F1 SCORE", f1, 'F1 Score with Weighted Average'),
    )
    for card_column, card_title, card_score, card_caption in metric_cards:
        card_column.metric(card_title, value=f'{card_score:.4f}', delta=card_caption)
    style_metric_cards(
        background_color='#FFFFFF',
        border_left_color='#9900AD',
        border_color='#1F66BD',
        box_shadow='#F71938',
    )
    st.write(
        "NOTES: Hasil evaluasi metriks yang diterapkan sangat baik dan sudah sesuai dengan hasil pelatihan model algoritma Random Forest.",
        unsafe_allow_html=True,
    )
# Prediction Table
# Shows the (scaled, RFE-selected) test features next to actual and
# predicted labels, and offers the table as a CSV download.
with st.expander("📋 PREDICTION TABLE"):
    all_feature_names = ['age', 'gender', 'price', 'payment_method', 'shopping_mall']
    test_features = np.asarray(X_test_final)
    # RFE may keep fewer than five columns (the UI allows 1-5), so column
    # names are derived from the selector's mask instead of fixed positions.
    support_mask = getattr(rfe, 'support_', None)
    if support_mask is not None and len(support_mask) == len(all_feature_names):
        table_columns = [name for name, kept in zip(all_feature_names, support_mask) if kept]
    else:
        table_columns = list(all_feature_names)
    if len(table_columns) != test_features.shape[1]:
        # Names cannot be matched to the matrix; use neutral labels.
        table_columns = [f'feature_{idx}' for idx in range(test_features.shape[1])]
    n_rows = len(y_test_final)
    table_data = {
        name: test_features[:, idx].ravel() for idx, name in enumerate(table_columns)
    }
    table_data.update({
        'Category | Actual Y': np.asarray(y_test_final).ravel(),
        'Y_Predicted': np.asarray(y_pred_final).ravel(),
        # Model-level scores repeated on every row, as in the exported CSV
        'Accuracy': [accuracy] * n_rows,
        'Precision': [precision] * n_rows,
        'Recall': [recall] * n_rows,
        'F1 Score': [f1] * n_rows,
    })
    prediction_table = pd.DataFrame(table_data)
    st.dataframe(prediction_table, use_container_width=True)
    st.write(f'NOTES: Pada bagian tabel prediksi ini menggunakan data yang telah diolah sebelumnya sehingga sangat berbeda dengan data asli.', unsafe_allow_html=True)
    # Download Predicted Table in CSV
    df_predict = prediction_table.to_csv(index=False).encode('utf-8')
    st.download_button(label="📥 DOWNLOAD PREDICTED DATA",
                       data=df_predict,
                       key="download_predict.csv",
                       file_name='data_predict.csv')
# Confusion Matrix and Feature Importance
with st.expander("🔍 CONFUSION MATRIX & FEATURE IMPORTANCE"):
    col1, col2 = st.columns(2)
    with col1:
        target_names = ['Books', 'Clothing', 'Cosmetics', 'Food & Beverage',
                        'Shoes', 'Souvenir', 'Technology', 'Toys']
        cm = confusion_matrix(y_test_final, y_pred_final)
        # Draw on an explicit Figure and close it afterwards; relying on the
        # global pyplot state leaves one open figure behind per rerun.
        fig_cm, ax_cm = plt.subplots(figsize=(15, 8))
        sns.heatmap(cm, annot=True, cmap='Blues', fmt='d',
                    xticklabels=target_names, yticklabels=target_names, ax=ax_cm)
        ax_cm.set_title('Confusion Matrix Customer Category Prediction')
        ax_cm.set_xlabel('Predicted labels')
        ax_cm.set_ylabel('True labels')
        st.pyplot(fig_cm, use_container_width=True)
        plt.close(fig_cm)
    with col2:
        # Feature Importance only available for sklearn models, not ONNX
        if classifier is not None:
            try:
                # Check if classifier is a Pipeline
                if hasattr(classifier, 'named_steps'):
                    pipeline_steps = classifier.named_steps
                    # Try common pipeline step names, in order of preference
                    for step_name in ('randomforestclassifier', 'classifier', 'model'):
                        if step_name in pipeline_steps:
                            actual_classifier = pipeline_steps[step_name]
                            break
                    else:
                        # Get the last step (usually the classifier)
                        actual_classifier = list(pipeline_steps.values())[-1]
                    feature_importance = actual_classifier.feature_importances_
                # Check if classifier has 'steps' attribute (another Pipeline format)
                elif hasattr(classifier, 'steps'):
                    # Get the last step which is typically the classifier
                    actual_classifier = classifier.steps[-1][1]
                    feature_importance = actual_classifier.feature_importances_
                # Direct classifier (not a pipeline)
                elif hasattr(classifier, 'feature_importances_'):
                    feature_importance = classifier.feature_importances_
                else:
                    raise AttributeError("No feature_importances_ found")
                # Create feature importance plot
                feature_names = ['age', 'gender', 'price', 'payment_method', 'shopping_mall']
                # The model only saw the columns RFE kept, so label the
                # importances with exactly those names.
                support_mask = getattr(rfe, 'support_', None)
                if support_mask is not None and len(support_mask) == len(feature_names):
                    feature_names = [name for name, kept in zip(feature_names, support_mask) if kept]
                if len(feature_names) != len(feature_importance):
                    raise ValueError(
                        f"{len(feature_importance)} importances cannot be matched "
                        f"to {len(feature_names)} feature names"
                    )
                importance_df = pd.DataFrame({
                    "Feature": feature_names,
                    "Importance": feature_importance
                })
                importance_df = importance_df.sort_values("Importance", ascending=True)
                bar = px.bar(importance_df, x='Importance', y='Feature')
                bar.update_layout(
                    title={
                        'text': 'Feature Importance Model Random Forest',
                        'xanchor': 'center',
                        'yanchor': 'top',
                        'x': 0.5,
                        'y': 0.95
                    }
                )
                st.plotly_chart(bar, use_container_width=True)
            except (AttributeError, KeyError, IndexError, ValueError) as e:
                st.warning(f"⚠️ Feature importance is not available for this model type.\n\nDetails: {str(e)}")
                st.info("💡 This usually happens when:\n- The model is a Pipeline without a RandomForest classifier\n- The model is loaded from ONNX format\n- The classifier doesn't support feature importance")
        else:
            st.info("📊 Feature importance is not available for ONNX models.\nPlease use Pickle model to view feature importance.")
    st.write(f'NOTES: Hasil feature importance menunjukkan data fitur Price lebih dominan dibandingkan fitur lainnya dan evaluasi dengan Confusion Matrix terlihat sudah sangat cukup baik dalam hal identifikasi tiap kategori.', unsafe_allow_html=True)
#------------ PREDICT NEW DATA ---------
# Collects one customer record through a form, applies the same encoding /
# scaling / feature selection as the active model, and shows the predicted
# category with per-class probabilities.
with st.expander("🎯 PREDICT NEW DATA"):
    with st.form("input_form", clear_on_submit=True):
        x1 = st.number_input("Age", min_value=0, max_value=100)
        x2 = st.selectbox("Gender", ["Male", "Female"])
        x3 = st.number_input("Price", min_value=0.0, max_value=10000.0, step=0.1)
        x4 = st.selectbox("Payment Method", ["Cash", "Credit Card", "Debit Card"])
        x5 = st.selectbox("Shopping Mall", ["Mall of Istanbul", "Kanyon",
                                            "Metrocity", "Metropol AVM",
                                            "Istinye Park", "Zorlu Center",
                                            "Cevahir AVM", "Forum Istanbul",
                                            "Viaport Outlet", "Emaar Square Mall"])
        submitted = st.form_submit_button(label="🔮 PREDICT")
    if submitted:
        new_data = pd.DataFrame({'age': [x1], 'gender': [x2], 'price': [x3],
                                 'payment_method': [x4], 'shopping_mall': [x5]})
        le_gender = LabelEncoder()
        le_payment_method = LabelEncoder()
        le_shopping_mall = LabelEncoder()
        # Fit with original data to ensure consistent encoding
        le_gender.fit(retail['gender'])
        le_payment_method.fit(retail['payment_method'])
        le_shopping_mall.fit(retail['shopping_mall'])
        new_data['gender'] = le_gender.transform(new_data['gender'])
        new_data['payment_method'] = le_payment_method.transform(new_data['payment_method'])
        new_data['shopping_mall'] = le_shopping_mall.transform(new_data['shopping_mall'])
        # Apply transformations (the scaler already returns a 1 x n matrix)
        new_data_scaled = scaler.transform(new_data)
        if rfe is not None:
            new_data_rfe = rfe.transform(new_data_scaled)
        else:
            new_data_rfe = new_data_scaled
        # Make prediction based on model type
        if onnx_session is not None:
            # ONNX Prediction
            input_name = onnx_session.get_inputs()[0].name
            label_name = onnx_session.get_outputs()[0].name
            prob_name = onnx_session.get_outputs()[1].name
            pred_result = onnx_session.run([label_name, prob_name],
                                           {input_name: new_data_rfe.astype(np.float32)})
            predict_category = pred_result[0]
            raw_proba = pred_result[1]
            if len(raw_proba) > 0 and isinstance(raw_proba[0], dict):
                # Converted classifiers may emit one {class: probability}
                # mapping per row; flatten it into a class-ordered array so
                # it can be tabulated like sklearn's predict_proba output.
                predict_proba = np.array(
                    [[row[label] for label in sorted(row)] for row in raw_proba]
                )
            else:
                predict_proba = np.asarray(raw_proba)
        else:
            # Sklearn Prediction
            if hasattr(classifier, 'named_steps') or hasattr(classifier, 'steps'):
                # A Pipeline does its own preprocessing: give it the encoded row
                predict_category = classifier.predict(new_data)
                predict_proba = classifier.predict_proba(new_data)
            else:
                predict_category = classifier.predict(new_data_rfe)
                predict_proba = classifier.predict_proba(new_data_rfe)
        prediction = le_model.inverse_transform(predict_category)
        st.write(f"Predicted Category: {prediction[0]}", unsafe_allow_html=True)
        # Show probability
        st.write("### Prediction Probability:")
        target_names = ['Books', 'Clothing', 'Cosmetics', 'Food & Beverage',
                        'Shoes', 'Souvenir', 'Technology', 'Toys']
        prob_df = pd.DataFrame({'Category': target_names, 'Probability': predict_proba[0]})
        prob_df = prob_df.sort_values('Probability', ascending=False)
        fig = px.bar(prob_df, x='Probability', y='Category', orientation='h',
                     title='Prediction Probability for Each Category')
        st.plotly_chart(fig, use_container_width=True)