""" End-to-End 모델 학습 스크립트 (TensorFlow) 단일 윈도우(센서 특징 + user_emb)를 입력으로 받아 피로도를 예측하는 MLP 기반 회귀 모델을 학습하고 SavedModel/TFLite 형식으로 저장합니다. """ import os import json from typing import Dict, Iterable, Optional, Tuple, Union import numpy as np import pandas as pd import tensorflow as tf from sklearn.preprocessing import StandardScaler from tensorflow.keras.layers import BatchNormalization, Dense, Dropout, Input from tensorflow.keras.models import Model from load_dataset import load_musclecare_dataset FEATURE_COLUMNS = [ 'rms_acc', 'rms_gyro', 'mean_freq_acc', 'mean_freq_gyro', 'entropy_acc', 'entropy_gyro', 'jerk_mean', 'jerk_std', 'stability_index', 'fatigue_prev', ] DEFAULT_EPOCHS = 30 DEFAULT_EMBED_DIM = 12 DEFAULT_BATCH_SIZE = 64 def parse_user_emb(emb: Union[str, Iterable[float], np.ndarray]) -> np.ndarray: """사용자 임베딩을 numpy 배열로 변환""" arr: Optional[np.ndarray] = None if isinstance(emb, np.ndarray): arr = emb.astype(np.float32) elif isinstance(emb, str): try: arr = np.array(json.loads(emb), dtype=np.float32) except (json.JSONDecodeError, TypeError): arr = None elif isinstance(emb, Iterable): arr = np.array(list(emb), dtype=np.float32) if arr is None or arr.ndim == 0: arr = np.zeros(DEFAULT_EMBED_DIM, dtype=np.float32) return arr def pad_embedding(embedding: np.ndarray, target_dim: int) -> np.ndarray: """임베딩 길이를 target_dim에 맞춰 패딩""" padded = np.zeros(target_dim, dtype=np.float32) length = min(target_dim, embedding.size) padded[:length] = embedding[:length] return padded def dataset_split_to_dataframe(dataset_split) -> pd.DataFrame: """HuggingFace Dataset split을 pandas DataFrame으로 변환""" if hasattr(dataset_split, "to_pandas"): return dataset_split.to_pandas() return pd.DataFrame(dataset_split) def build_dataframe_from_source( dataset_source, exclude_sessions: Optional[Iterable[str]] = None ) -> pd.DataFrame: """데이터 소스를 단일 DataFrame으로 통합""" frames = [] exclude_sessions = set(exclude_sessions or []) if hasattr(dataset_source, "items"): iterator = dataset_source.items() else: iterator = [("all", dataset_source)] for split_name, split_dataset in iterator: df_split = dataset_split_to_dataframe(split_dataset) if df_split.empty: continue if exclude_sessions: if 'session_id' not in df_split.columns: raise KeyError("데이터셋에 'session_id' 컬럼이 없습니다.") df_split = df_split[~df_split['session_id'].isin(exclude_sessions)] if not df_split.empty: frames.append(df_split) print(f" - {split_name}: {len(df_split)}개 샘플 (필터링 후)") if not frames: return pd.DataFrame() return pd.concat(frames, ignore_index=True) def prepare_training_arrays( df: pd.DataFrame, feature_cols: Iterable[str] ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """단일 윈도우 입력을 위한 학습 데이터를 생성""" required_columns = set(feature_cols) | {'fatigue', 'user_emb'} missing_columns = required_columns - set(df.columns) if missing_columns: raise KeyError(f"데이터셋에 누락된 컬럼이 있습니다: {sorted(missing_columns)}") feature_values = ( df[list(feature_cols)] .astype(np.float32) .replace([np.inf, -np.inf], np.nan) .fillna(0.0) ) scaler = StandardScaler() features_scaled = scaler.fit_transform(feature_values).astype(np.float32) user_embeddings = np.stack([ emb.astype(np.float32) if isinstance(emb, np.ndarray) else np.zeros(DEFAULT_EMBED_DIM, dtype=np.float32) for emb in df['user_emb'] ]) X = np.concatenate([features_scaled, user_embeddings], axis=1).astype(np.float32) y = df['fatigue'].astype(np.float32).to_numpy() return X, y, scaler.mean_.astype(np.float32), scaler.scale_.astype(np.float32) def build_dense_regression_model( input_dim: int, learning_rate: float = 0.001 ) -> Model: """단일 윈도우 입력용 MLP 회귀 모델""" inputs = Input(shape=(input_dim,), name="features") x = Dense(128, activation='relu')(inputs) x = BatchNormalization()(x) x = Dropout(0.3)(x) x = Dense(64, activation='relu')(x) x = BatchNormalization()(x) x = Dropout(0.2)(x) outputs = Dense(1, activation='linear', name='fatigue')(x) model = Model(inputs=inputs, outputs=outputs) model.compile( optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), loss='mse', metrics=['mae'] ) return model def ensure_embeddings(df: pd.DataFrame) -> Tuple[pd.DataFrame, int]: """user_emb 컬럼을 numpy 배열로 정규화하고 통일된 차원으로 패딩""" if 'user_emb' not in df.columns: raise KeyError("데이터셋에 'user_emb' 컬럼이 없습니다.") df = df.copy() df['user_emb'] = df['user_emb'].apply(parse_user_emb) dims = [ emb.size for emb in df['user_emb'] if isinstance(emb, np.ndarray) and emb.size > 0 ] target_dim = max(dims) if dims else DEFAULT_EMBED_DIM df['user_emb'] = df['user_emb'].apply(lambda emb: pad_embedding(emb, target_dim)) return df, target_dim def main( data_list: Optional[Iterable[Dict]] = None, exclude_sessions: Optional[Iterable[str]] = None, epochs: int = DEFAULT_EPOCHS ) -> Optional[Dict[str, str]]: """ 메인 학습 함수 Args: data_list: 사용할 데이터 리스트 (None이면 전체 데이터 사용) exclude_sessions: 제외할 session_id 집합 (중복 방지용) epochs: 학습 에포크 수 """ print("=" * 80) print("MuscleCare Train AI - TensorFlow Single-Window Training") print("=" * 80) tf.keras.utils.set_random_seed(42) # 1️⃣ 데이터 로드 print("1️⃣ 데이터셋 로딩 중...") if data_list is None: dataset_source = load_musclecare_dataset() df = build_dataframe_from_source(dataset_source, exclude_sessions) else: df = pd.DataFrame(data_list) if exclude_sessions: df = df[~df['session_id'].isin(set(exclude_sessions))] if df.empty: print("⚠️ 학습 가능한 데이터가 없습니다. 학습을 종료합니다.") print("=" * 80) return None print(f"✅ 데이터 로드 완료: {len(df)}개 행") # 2️⃣ 사용자 임베딩 정규화 print("2️⃣ 사용자 임베딩 정규화 중...") df, emb_dim = ensure_embeddings(df) print(f"✅ 임베딩 차원: {emb_dim}") # 3️⃣ 학습 데이터 생성 print("3️⃣ 학습 데이터 생성 중...") X, y, scaler_mean, scaler_scale = prepare_training_arrays(df, FEATURE_COLUMNS) if X.size == 0: print("⚠️ 학습할 입력 데이터가 없습니다. 학습을 종료합니다.") print("=" * 80) return None num_samples, input_dim = X.shape print(f"✅ 학습 데이터 생성 완료: {num_samples}개 샘플, 입력 차원 {input_dim}") # 4️⃣ 모델 생성 print("4️⃣ 모델 생성 중...") model = build_dense_regression_model(input_dim) model.summary(print_fn=lambda x: print(" " + x)) print("✅ 모델 생성 완료") # 5️⃣ 모델 학습 print("5️⃣ 모델 학습 시작...") callbacks = [ tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True), tf.keras.callbacks.ReduceLROnPlateau(patience=3, factor=0.5), ] validation_split = 0.1 if num_samples >= 20 else 0.0 history = model.fit( X, y, epochs=epochs, batch_size=min(DEFAULT_BATCH_SIZE, num_samples), shuffle=True, validation_split=validation_split, callbacks=callbacks, verbose=1, ) print("✅ 모델 학습 완료") # 6️⃣ 모델 및 메타데이터 저장 print("6️⃣ 모델 저장 중...") model_dir = './model' os.makedirs(model_dir, exist_ok=True) keras_model_path = os.path.join(model_dir, 'cnn_gru_fatigue.keras') model.save(keras_model_path) metadata = { "feature_columns": list(FEATURE_COLUMNS), "embedding_dim": emb_dim, "input_dim": input_dim, "epochs": epochs, "num_samples": int(num_samples), "scaler": { "mean": scaler_mean.tolist(), "scale": scaler_scale.tolist(), }, "history": { "loss": history.history.get('loss', []), "mae": history.history.get('mae', []), "val_loss": history.history.get('val_loss', []), "val_mae": history.history.get('val_mae', []), }, } metadata_path = os.path.join(model_dir, 'cnn_gru_fatigue_metadata.json') with open(metadata_path, 'w', encoding='utf-8') as f: json.dump(metadata, f, ensure_ascii=False, indent=2) print(f"✅ 모델 저장 완료: {keras_model_path}") print(f" 메타데이터 저장: {metadata_path}") # 7️⃣ TFLite 변환 print("7️⃣ TFLite 변환 중...") converter = tf.lite.TFLiteConverter.from_keras_model(model) converter.target_spec.supported_ops = [ tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS, ] converter._experimental_lower_tensor_list_ops = False converter.optimizations = [tf.lite.Optimize.DEFAULT] tflite_model = converter.convert() tflite_model_path = os.path.join(model_dir, 'cnn_gru_fatigue.tflite') with open(tflite_model_path, 'wb') as f: f.write(tflite_model) print(f"✅ TFLite 모델 저장 완료: {tflite_model_path}") print("=" * 80) return { "keras": os.path.abspath(keras_model_path), "tflite": os.path.abspath(tflite_model_path), "metadata": os.path.abspath(metadata_path), } if __name__ == "__main__": main()