# Tremor classification pipeline: load accelerometer/gyroscope JSON recordings,
# extract time/frequency-domain features, train and evaluate a RandomForest
# classifier (PD vs Normal), and persist the trained model bundle.
import json
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import shap
from joblib import dump  # used to persist the trained model bundle
from scipy.signal import welch
from scipy.stats import skew, kurtosis
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, roc_auc_score
from sklearn.preprocessing import StandardScaler
# ======================== DATA LOADING ========================
def _extract_recording(payload):
    """Return the 'recording' dict from a parsed JSON payload, or None.

    The recording may sit at the top level (new layout) or be nested
    under a "data" key (old layout).
    """
    if "recording" in payload:
        return payload["recording"]
    if "data" in payload and "recording" in payload["data"]:
        return payload["data"]["recording"]
    return None


def load_tremor_data(base_path, folders):
    """Load tremor recordings from JSON files (old and new layouts).

    Parameters:
    - base_path: root directory containing one sub-folder per class
    - folders: mapping of sub-folder name -> class label

    Returns one concatenated DataFrame with the sensor columns named by
    each file's "recordingFormat", plus "ts", "label" and "file" columns;
    an empty DataFrame when no valid file was found.
    """
    frames = []
    for folder_name, class_label in folders.items():
        folder_path = os.path.join(base_path, folder_name)
        print(f"📂 Loading folder: {folder_path}")
        for file_name in os.listdir(folder_path):
            if not file_name.endswith(".json"):
                continue
            file_path = os.path.join(folder_path, file_name)
            try:
                with open(file_path, "r", encoding="utf-8") as fh:
                    payload = json.load(fh)
            except Exception as e:
                print(f"❌ Error reading {file_name}: {e}")
                continue

            rec = _extract_recording(payload)
            if rec is None:
                print(f"⚠️ Skip: {file_name} (no 'recording' field found)")
                continue

            rows = rec.get("recordedData", [])
            fmt = rec.get("recordingFormat", [])
            # Require at least 5 samples so downstream feature extraction
            # has something to work with.
            if not rows or not fmt or len(rows) < 5:
                print(f"⚠️ Skip empty or too short: {file_name}")
                continue

            try:
                frame = pd.DataFrame([row["data"] for row in rows], columns=fmt)
                frame["ts"] = [row.get("ts", None) for row in rows]
                frame["label"] = class_label
                frame["file"] = file_name
                frames.append(frame)
            except Exception as e:
                print(f"⚠️ Parse error {file_name}: {e}")
                continue

    if not frames:
        print("❌ No valid files found.")
        return pd.DataFrame()
    df_all = pd.concat(frames, ignore_index=True)
    print(f"✅ Loaded total rows: {len(df_all)}, files: {len(frames)}")
    return df_all
# ======================== FEATURE EXTRACTION ========================
def compute_rms(x):
    """Root-mean-square amplitude of the signal x."""
    return np.sqrt(np.mean(np.square(x)))
| def compute_sma(x, y, z): return np.mean(np.abs(x) + np.abs(y) + np.abs(z)) | |
| def compute_vector_mag(x, y, z): return np.sqrt(x**2 + y**2 + z**2) | |
def compute_entropy(signal, bins=30):
    """Shannon entropy (natural log) of the signal's amplitude histogram."""
    density, _ = np.histogram(signal, bins=bins, density=True)
    nonzero = density[density > 0]  # log(0) is undefined; drop empty bins
    return -np.sum(nonzero * np.log(nonzero))
def compute_freq_features(signal, fs=50):
    """Welch-PSD frequency features of a 1-D signal.

    Computes the dominant frequency, the power in the 4-6 Hz band (the
    classic parkinsonian tremor band), and the spectral entropy.

    Parameters:
    - signal: 1-D array of sensor samples
    - fs: sampling rate in Hz (default 50)

    Returns a dict with keys "dom_freq", "band_power_4_6", "spec_entropy".
    Degenerate inputs (empty PSD or an all-zero PSD, e.g. a constant
    signal) return an all-zero feature dict instead of NaN.
    """
    f, Pxx = welch(signal, fs=fs, nperseg=min(256, len(signal)))
    if len(Pxx) == 0:
        return {"dom_freq": 0, "band_power_4_6": 0, "spec_entropy": 0}
    total_power = np.sum(Pxx)
    if total_power <= 0:
        # Constant/zero signal: normalizing below would divide by zero.
        return {"dom_freq": 0, "band_power_4_6": 0, "spec_entropy": 0}
    dom_freq = f[np.argmax(Pxx)]
    # np.trapz was renamed to np.trapezoid in NumPy 2.0; support both.
    trapezoid = getattr(np, "trapezoid", None)
    if trapezoid is None:
        trapezoid = np.trapz
    band_mask = (f >= 4) & (f <= 6)
    band_power = trapezoid(Pxx[band_mask], f[band_mask])
    # Normalize the PSD into a probability distribution; the epsilon
    # guards log(0) for empty bins.
    Pxx_norm = Pxx / total_power
    spec_entropy = -np.sum(Pxx_norm * np.log(Pxx_norm + 1e-12))
    return {"dom_freq": dom_freq, "band_power_4_6": band_power, "spec_entropy": spec_entropy}
def extract_essential_features(df, fs=50):
    """Compute time- and frequency-domain features for a single recording.

    Parameters:
    - df: DataFrame with sensor columns ax/ay/az/gx/gy/gz plus "label"
      and "file" columns (one recording per call)
    - fs: sampling rate in Hz passed to the frequency features

    Returns a flat dict of per-axis statistics, combined accelerometer/
    gyroscope features, and the recording's label and file name.
    """
    feats = {}
    for axis in ("ax", "ay", "az", "gx", "gy", "gz"):
        sample = df[axis].values
        feats[f"{axis}_rms"] = compute_rms(sample)
        feats[f"{axis}_mean"] = np.mean(sample)
        feats[f"{axis}_std"] = np.std(sample)
        feats[f"{axis}_skew"] = skew(sample)
        feats[f"{axis}_kurtosis"] = kurtosis(sample)
        feats[f"{axis}_entropy"] = compute_entropy(sample)
        for name, value in compute_freq_features(sample, fs).items():
            feats[f"{axis}_{name}"] = value

    feats["acc_sma"] = compute_sma(df["ax"], df["ay"], df["az"])
    feats["gyro_sma"] = compute_sma(df["gx"], df["gy"], df["gz"])
    # Correlation between overall accelerometer and gyroscope magnitudes.
    acc_mag = compute_vector_mag(df["ax"], df["ay"], df["az"])
    gyro_mag = compute_vector_mag(df["gx"], df["gy"], df["gz"])
    feats["acc_gyro_corr"] = np.corrcoef(acc_mag, gyro_mag)[0, 1]

    feats["label"] = df["label"].iloc[0]
    feats["file"] = df["file"].iloc[0]
    return feats
def create_feature_dataset(df_all, fs=50):
    """Build a per-recording feature table by grouping raw samples by file."""
    feature_rows = []
    for _, recording in df_all.groupby("file"):
        feature_rows.append(extract_essential_features(recording, fs))
    return pd.DataFrame(feature_rows)
# ======================== VISUALIZATION FUNCTIONS ========================
def plot_pca_clustering(df_features, X_scaled, model):
    """
    Plot a 2-D PCA projection of the feature space, coloured by true label
    and styled by model prediction.

    Parameters:
    - df_features: feature DataFrame (must contain a "label" column)
    - X_scaled: scaled feature matrix aligned with df_features rows
    - model: fitted classifier used to predict on X_scaled

    Returns:
    - pca: fitted 2-component PCA object
    - df_plot: copy of df_features with "pca1", "pca2" and "pred" columns
    """
    pca = PCA(n_components=2)
    projected = pca.fit_transform(X_scaled)

    df_plot = df_features.copy()
    df_plot["pca1"] = projected[:, 0]
    df_plot["pca2"] = projected[:, 1]
    df_plot["pred"] = model.predict(X_scaled)

    plt.figure(figsize=(8, 6))
    sns.scatterplot(
        data=df_plot,
        x="pca1",
        y="pca2",
        hue="label",
        style="pred",
        palette={"normal": "#4CAF50", "pd": "#E91E63"},
        s=90,
        alpha=0.9,
    )
    plt.title("🧩 PCA Clustering Visualization (PD vs Normal)", fontsize=14)
    plt.xlabel("PCA 1")
    plt.ylabel("PCA 2")
    plt.legend(title="Label / Prediction")
    plt.show()
    return pca, df_plot
def plot_pca_biplot(df_features, X_scaled, X, pca=None):
    """
    Draw a PCA biplot: samples in PC space plus feature-loading vectors.

    Parameters:
    - df_features: feature DataFrame (must contain a "label" column)
    - X_scaled: scaled feature matrix aligned with df_features rows
    - X: original feature DataFrame (column names label the loading arrows)
    - pca: optional pre-fitted 2-component PCA; fitted here when None

    Returns:
    - loadings: DataFrame of per-feature loading vectors (PCA1, PCA2)
    - df_plot: copy of df_features with "pca1" and "pca2" columns
    """
    if pca is None:
        pca = PCA(n_components=2)
        projected = pca.fit_transform(X_scaled)
    else:
        projected = pca.transform(X_scaled)

    df_plot = df_features.copy()
    df_plot["pca1"] = projected[:, 0]
    df_plot["pca2"] = projected[:, 1]

    loadings = pd.DataFrame(
        pca.components_.T,
        columns=['PCA1', 'PCA2'],
        index=X.columns,
    )

    # Report the strongest contributors to each component.
    print("\n📊 Top 10 features influencing PCA1:")
    print(loadings['PCA1'].sort_values(ascending=False).head(10))
    print("\n📊 Top 10 features influencing PCA2:")
    print(loadings['PCA2'].sort_values(ascending=False).head(10))

    plt.figure(figsize=(10, 8))
    sns.scatterplot(
        data=df_plot,
        x="pca1",
        y="pca2",
        hue="label",
        palette={"normal": "#4CAF50", "pd": "#E91E63"},
        s=80,
        alpha=0.9,
    )
    # Overlay loading vectors, scaled up for visibility.
    for feature_name, row in loadings.iterrows():
        plt.arrow(0, 0, row['PCA1'] * 10, row['PCA2'] * 10,
                  color='gray', alpha=0.5, head_width=0.3)
        plt.text(row['PCA1'] * 11, row['PCA2'] * 11,
                 feature_name, fontsize=8, color='black')
    plt.title("📈 PCA Biplot: Feature Loading Direction", fontsize=13)
    plt.xlabel("PCA 1")
    plt.ylabel("PCA 2")
    plt.grid(True, alpha=0.3)
    plt.show()
    return loadings, df_plot
def plot_roc_curve(y_true, y_proba, model_name="Random Forest"):
    """
    Plot an ROC curve with its AUC for a binary classifier.

    Parameters:
    - y_true: ground-truth binary labels
    - y_proba: predicted probabilities for the positive class
    - model_name: name shown in the plot title

    Returns:
    - roc_auc: area under the ROC curve
    - fpr: false-positive rates
    - tpr: true-positive rates
    """
    fpr, tpr, _ = roc_curve(y_true, y_proba)
    roc_auc = roc_auc_score(y_true, y_proba)

    plt.figure(figsize=(6, 6))
    plt.plot(fpr, tpr, color="#E91E63", lw=2, label=f"ROC curve (AUC = {roc_auc:.2f})")
    plt.plot([0, 1], [0, 1], color="gray", linestyle="--")  # chance diagonal
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(f"🧩 ROC Curve – {model_name} (PD vs Normal)")
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    plt.show()
    return roc_auc, fpr, tpr
def plot_shap_analysis(model, X_scaled, X, plot_type="both"):
    """
    Run SHAP TreeExplainer on the model and plot feature-importance summaries.

    Parameters:
    - model: fitted tree-based model (e.g. RandomForestClassifier)
    - X_scaled: scaled feature matrix the SHAP values are computed on
    - X: original (unscaled) feature DataFrame used for display names/values
    - plot_type: which plots to draw ("bar", "beeswarm", "both")

    Returns:
    - explainer: the SHAP TreeExplainer
    - shap_values: raw SHAP values exactly as returned by the explainer
    """
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_scaled)

    # Older shap releases return a list with one array per class; newer
    # releases return a single (n_samples, n_features, n_classes) array.
    # Select the positive-class slice explicitly in either case —
    # shap_values[1] on the new format would pick the second *sample*,
    # not the second class.
    if isinstance(shap_values, list):
        positive_class_sv = shap_values[1]
    elif getattr(shap_values, "ndim", 2) == 3:
        positive_class_sv = shap_values[:, :, 1]
    else:
        positive_class_sv = shap_values

    if plot_type in ["bar", "both"]:
        shap.summary_plot(positive_class_sv, X, plot_type="bar", show=False)
        plt.title("SHAP Feature Importance (Bar Plot)")
        plt.tight_layout()
        plt.show()
    if plot_type in ["beeswarm", "both"]:
        shap.summary_plot(positive_class_sv, X, show=False)
        plt.title("SHAP Feature Importance (Beeswarm Plot)")
        plt.tight_layout()
        plt.show()
    return explainer, shap_values
# ======================== MODEL TRAINING ========================
def train_random_forest(X, y, n_estimators=300, max_depth=6, random_state=42):
    """Fit a RandomForest on scaled features, dropping NaN labels/features.

    Parameters:
    - X: feature matrix (array-like or DataFrame)
    - y: target labels, may contain NaN (those rows are dropped)
    - n_estimators / max_depth / random_state: forwarded to the classifier

    Returns (model, scaler, X_scaled) where X_scaled is the cleaned,
    standardized matrix the model was actually fitted on.
    """
    combined = pd.DataFrame(X).copy()
    combined["label"] = y
    # Drop rows missing a label, then any row with a missing feature value.
    combined = combined.dropna(subset=["label"])
    combined = combined.dropna(axis=0, how="any")

    y_clean = combined["label"].values
    X_clean = combined.drop(columns=["label"]).values

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_clean)

    model = RandomForestClassifier(
        n_estimators=n_estimators,
        max_depth=max_depth,
        random_state=random_state,
    )
    model.fit(X_scaled, y_clean)
    print(f"✅ Training complete ({len(y_clean)} samples used)")
    return model, scaler, X_scaled
def evaluate_model(model, X_scaled, y_true):
    """Print confusion matrix and classification report; return predictions.

    NOTE(review): target_names assumes class index 0 == "Normal" and
    1 == "PD", and predict_proba[:, 1] assumes "PD" is the positive
    class — confirm against the label encoding used upstream.
    """
    predictions = model.predict(X_scaled)
    positive_proba = model.predict_proba(X_scaled)[:, 1]

    print("\nConfusion Matrix:")
    print(confusion_matrix(y_true, predictions))
    print("\nClassification Report:")
    print(classification_report(y_true, predictions, target_names=["Normal", "PD"]))
    return predictions, positive_proba
# ======================== SAVE MODEL ========================
def save_rf_model(model, scaler, feature_names, base_path):
    """Persist the model, scaler and feature list as one joblib bundle.

    Parameters:
    - model: fitted classifier
    - scaler: fitted StandardScaler matching the model's inputs
    - feature_names: ordered list of feature column names
    - base_path: directory the bundle file is written into

    Returns the full path of the written file.
    """
    bundle = {
        "model": model,
        "scaler": scaler,
        "features": feature_names,
    }
    save_path = os.path.join(base_path, "tremor_rf_model.joblib")
    dump(bundle, save_path)
    print(f"💾 Model saved to {save_path}")
    return save_path