import os import random import json from datetime import datetime, timezone, timedelta from typing import Dict, List, Optional import pandas as pd import numpy as np from datasets import Dataset, DatasetDict, load_dataset from huggingface_hub import HfApi from dotenv import load_dotenv TARGET_USERS = 20 RECORDS_PER_USER = 500 def require_env(var_name: str) -> str: value = os.getenv(var_name) if not value: raise RuntimeError(f"환경변수 {var_name}가 필요합니다.") return value def jitter(value: float, scale: float = 0.02) -> float: """값에 ±scale 비율의 노이즈를 추가""" if value is None: return None return value * (1 + random.uniform(-scale, scale)) def jitter_abs(value: float, amount: float) -> float: """절대값 기준 노이즈 추가""" if value is None: return None return value + random.uniform(-amount, amount) def augment_sensor_vector(x: float, y: float, z: float, noise: float = 0.02) -> tuple: """ 3축 센서 데이터를 물리적으로 자연스럽게 증폭 → 3축은 동일한 비율로 scaling + 개별 작은 노이즈 """ if x is None or y is None or z is None: return (x, y, z) scale = 1 + random.uniform(-noise, noise) return ( round(x * scale + random.uniform(-0.01, 0.01), 4), round(y * scale + random.uniform(-0.01, 0.01), 4), round(z * scale + random.uniform(-0.01, 0.01), 4), ) def compute_rms(x: float, y: float, z: float, base_noise: float = 0.02) -> float: """3축 mean 기반으로 RMS 재계산""" if x is None or y is None or z is None: return None base = np.sqrt(x**2 + y**2 + z**2) return round(base * (1 + random.uniform(-base_noise, base_noise)), 4) def augment_record_strict(row: dict) -> dict: """물리적 제약을 지키면서 센서 데이터를 자연스럽게 증폭""" new = row.copy() # timestamp jitter if "timestamp_utc" in row and isinstance(row["timestamp_utc"], str): try: t = datetime.fromisoformat(row["timestamp_utc"].replace("Z", "+00:00")) t = t + timedelta(milliseconds=random.randint(-150, 150)) new["timestamp_utc"] = t.isoformat() except: pass # window jitter if "window_id" in row and row["window_id"] is not None: new["window_id"] = int(row["window_id"] + random.randint(-1, 1)) if "window_start_ms" in row and row["window_start_ms"] is not None: new["window_start_ms"] = row["window_start_ms"] + random.randint(-50, 50) if "window_end_ms" in row and row["window_end_ms"] is not None: new["window_end_ms"] = new["window_start_ms"] + 2000 # window_size_ms와 일치 # --- Accelerometer mean --- if all(f in row and row[f] is not None for f in ["acc_x_mean", "acc_y_mean", "acc_z_mean"]): new["acc_x_mean"], new["acc_y_mean"], new["acc_z_mean"] = augment_sensor_vector( row["acc_x_mean"], row["acc_y_mean"], row["acc_z_mean"], noise=0.03 ) # --- Gyro mean --- if all(f in row and row[f] is not None for f in ["gyro_x_mean", "gyro_y_mean", "gyro_z_mean"]): new["gyro_x_mean"], new["gyro_y_mean"], new["gyro_z_mean"] = augment_sensor_vector( row["gyro_x_mean"], row["gyro_y_mean"], row["gyro_z_mean"], noise=0.03 ) # --- Linear accel mean --- if all(f in row and row[f] is not None for f in ["linacc_x_mean", "linacc_y_mean", "linacc_z_mean"]): new["linacc_x_mean"], new["linacc_y_mean"], new["linacc_z_mean"] = augment_sensor_vector( row["linacc_x_mean"], row["linacc_y_mean"], row["linacc_z_mean"], noise=0.03 ) # --- Gravity vector (물리적 제약: 크기가 약 9.8) --- if all(f in row and row[f] is not None for f in ["gravity_x_mean", "gravity_y_mean", "gravity_z_mean"]): gx, gy, gz = augment_sensor_vector( row["gravity_x_mean"], row["gravity_y_mean"], row["gravity_z_mean"], noise=0.01 ) g_mag = np.sqrt(gx**2 + gy**2 + gz**2) if g_mag > 0: scale = 9.8 / g_mag new["gravity_x_mean"] = round(gx * scale, 4) new["gravity_y_mean"] = round(gy * scale, 4) new["gravity_z_mean"] = round(gz * scale, 4) # --- Recompute RMS from sensor means --- if all(f in new and new[f] is not None for f in ["acc_x_mean", "acc_y_mean", "acc_z_mean"]): new["rms_acc"] = compute_rms( new["acc_x_mean"], new["acc_y_mean"], new["acc_z_mean"], base_noise=0.03 ) elif "rms_acc" in row and row["rms_acc"] is not None: new["rms_acc"] = jitter(row["rms_acc"], 0.03) if all(f in new and new[f] is not None for f in ["gyro_x_mean", "gyro_y_mean", "gyro_z_mean"]): new["rms_gyro"] = compute_rms( new["gyro_x_mean"], new["gyro_y_mean"], new["gyro_z_mean"], base_noise=0.03 ) elif "rms_gyro" in row and row["rms_gyro"] is not None: new["rms_gyro"] = jitter(row["rms_gyro"], 0.03) # --- std values scale with RMS --- if "rms_acc" in new and new["rms_acc"] is not None and "rms_acc" in row and row["rms_acc"] is not None and row["rms_acc"] > 0: rms_ratio = new["rms_acc"] / row["rms_acc"] for col in ["acc_x_std", "acc_y_std", "acc_z_std"]: if col in row and row[col] is not None: new[col] = max(0.01, row[col] * rms_ratio * jitter(1, 0.1)) if "rms_gyro" in new and new["rms_gyro"] is not None and "rms_gyro" in row and row["rms_gyro"] is not None and row["rms_gyro"] > 0: rms_ratio = new["rms_gyro"] / row["rms_gyro"] for col in ["gyro_x_std", "gyro_y_std", "gyro_z_std"]: if col in row and row[col] is not None: new[col] = max(0.001, row[col] * rms_ratio * jitter(1, 0.1)) # --- frequency (weak positive correlation with RMS) --- if "mean_freq_acc" in row and row["mean_freq_acc"] is not None and "rms_acc" in new and new["rms_acc"] is not None: new["mean_freq_acc"] = round(jitter_abs(row["mean_freq_acc"], new["rms_acc"] * 0.3), 2) elif "mean_freq_acc" in row and row["mean_freq_acc"] is not None: new["mean_freq_acc"] = round(jitter(row["mean_freq_acc"], 0.02), 2) if "mean_freq_gyro" in row and row["mean_freq_gyro"] is not None and "rms_gyro" in new and new["rms_gyro"] is not None: new["mean_freq_gyro"] = round(jitter_abs(row["mean_freq_gyro"], new["rms_gyro"] * 0.3), 2) elif "mean_freq_gyro" in row and row["mean_freq_gyro"] is not None: new["mean_freq_gyro"] = round(jitter(row["mean_freq_gyro"], 0.02), 2) # --- entropy: increases when RMS increases --- if "entropy_acc" in row and row["entropy_acc"] is not None and "rms_acc" in new and new["rms_acc"] is not None and "rms_acc" in row and row["rms_acc"] is not None and row["rms_acc"] > 0: new["entropy_acc"] = min(1.0, max(0.05, row["entropy_acc"] * (new["rms_acc"] / row["rms_acc"]) * jitter(1, 0.1))) elif "entropy_acc" in row and row["entropy_acc"] is not None: new["entropy_acc"] = min(1.0, max(0.05, jitter(row["entropy_acc"], 0.02))) if "entropy_gyro" in row and row["entropy_gyro"] is not None and "rms_gyro" in new and new["rms_gyro"] is not None and "rms_gyro" in row and row["rms_gyro"] is not None and row["rms_gyro"] > 0: new["entropy_gyro"] = min(1.0, max(0.05, row["entropy_gyro"] * (new["rms_gyro"] / row["rms_gyro"]) * jitter(1, 0.1))) elif "entropy_gyro" in row and row["entropy_gyro"] is not None: new["entropy_gyro"] = min(1.0, max(0.05, jitter(row["entropy_gyro"], 0.02))) # --- jerk: depends on std and RMS --- if "jerk_mean" in row and row["jerk_mean"] is not None: if "acc_x_std" in row and row["acc_x_std"] is not None: new["jerk_mean"] = round(jitter_abs(row["jerk_mean"], row["acc_x_std"] * 0.3), 4) else: new["jerk_mean"] = round(jitter(row["jerk_mean"], 0.02), 4) if "jerk_std" in row and row["jerk_std"] is not None: if "acc_x_std" in row and row["acc_x_std"] is not None: new["jerk_std"] = max(0.001, round(jitter_abs(row["jerk_std"], row["acc_x_std"] * 0.1), 4)) else: new["jerk_std"] = max(0.001, round(jitter(row["jerk_std"], 0.01), 4)) # --- stability index (inverse to entropy) --- entropy_avg = 0.5 if "entropy_acc" in new and new["entropy_acc"] is not None and "entropy_gyro" in new and new["entropy_gyro"] is not None: entropy_avg = (new["entropy_acc"] + new["entropy_gyro"]) / 2 elif "entropy_acc" in new and new["entropy_acc"] is not None: entropy_avg = new["entropy_acc"] elif "entropy_gyro" in new and new["entropy_gyro"] is not None: entropy_avg = new["entropy_gyro"] new["stability_index"] = round(max(0.4, min(0.99, 1 - entropy_avg * 0.3)), 4) # --- fatigue model (RMS, 주파수 기반) --- # fatigue는 augment_user_data에서 시간적 연속성을 고려하여 계산 # 여기서는 기본값만 설정 (나중에 덮어씌워짐) if "fatigue" in row and row["fatigue"] is not None: # 기본적으로 RMS와 주파수 기반으로 약간 조정 if "rms_acc" in new and new["rms_acc"] is not None and "rms_acc" in row and row["rms_acc"] is not None and row["rms_acc"] > 0.1: rms_factor = new["rms_acc"] / row["rms_acc"] else: rms_factor = 1.0 if "mean_freq_acc" in new and new["mean_freq_acc"] is not None and "mean_freq_acc" in row and row["mean_freq_acc"] is not None and row["mean_freq_acc"] > 1: freq_factor = row["mean_freq_acc"] / new["mean_freq_acc"] else: freq_factor = 1.0 fatigue_delta = rms_factor * 0.05 - freq_factor * 0.03 new["fatigue"] = min(0.95, max(0.05, row["fatigue"] + fatigue_delta + random.uniform(-0.02, 0.02))) new["fatigue_level"] = 0 if new["fatigue"] < 0.3 else 1 if new["fatigue"] < 0.6 else 2 else: # fatigue가 없으면 기본값 설정 new["fatigue"] = 0.1 new["fatigue_level"] = 0 # fatigue_prev는 augment_user_data에서 설정됨 if "fatigue_prev" in row and row["fatigue_prev"] is not None: new["fatigue_prev"] = row["fatigue_prev"] else: new["fatigue_prev"] = 0.05 # --- baseline values (preserve) --- if "rms_base" in row: new["rms_base"] = row["rms_base"] if "freq_base" in row: new["freq_base"] = row["freq_base"] # --- user_emb: NEVER change --- if "user_emb" in row: new["user_emb"] = row["user_emb"] # --- other fields --- if "overlap_rate" in row and row["overlap_rate"] is not None: new["overlap_rate"] = max(0.3, min(0.7, jitter(row["overlap_rate"], 0.02))) if "window_size_ms" in row: new["window_size_ms"] = row.get("window_size_ms", 2000) if "quality_flag" in row: if random.random() < 0.05: # 5% 확률로 변경 new["quality_flag"] = 0 if row["quality_flag"] == 1 else 1 else: new["quality_flag"] = row["quality_flag"] # session_id 약간 변형 if "session_id" in row and row["session_id"]: parts = str(row["session_id"]).split("_") if len(parts) > 1: try: session_num = int(parts[-1]) new["session_id"] = "_".join(parts[:-1]) + "_" + str(session_num + random.randint(-5, 5)) except: new["session_id"] = row["session_id"] else: new["session_id"] = row["session_id"] return new def augment_user_data(df: pd.DataFrame, target_count: int, new_user_id: str = None) -> pd.DataFrame: """ 사용자별 데이터를 증폭하여 목표 개수만큼 생성 새로운 사용자인 경우 시간적 연속성을 유지 """ if len(df) >= target_count: return df.head(target_count) need = target_count - len(df) # 새로운 사용자인 경우 (기존 데이터가 없거나 새 사용자 ID가 제공된 경우) is_new_user = new_user_id is not None or len(df) == 0 if is_new_user and len(df) > 0: # 새로운 사용자는 항상 target_count만큼 생성 (참조 데이터 길이와 무관) base_row = df.iloc[0].to_dict() new_rows = [] # 시간 기반 초기값 설정 if "timestamp_utc" in base_row and base_row["timestamp_utc"]: try: base_time = datetime.fromisoformat(str(base_row["timestamp_utc"]).replace("Z", "+00:00")) except: base_time = datetime.now(timezone.utc) else: base_time = datetime.now(timezone.utc) base_window_id = 1 # 새 사용자는 window_id를 1부터 시작 base_window_start = 0 # 새 사용자는 window_start_ms를 0부터 시작 prev_fatigue = base_row.get("fatigue", 0.1) if base_row.get("fatigue") is not None else 0.1 # 새로운 사용자는 항상 target_count만큼 생성 for i in range(target_count): # 샘플 레코드 선택 sample_idx = random.randint(0, len(df) - 1) sample = df.iloc[sample_idx].to_dict() # 새로운 레코드 생성 new_row = augment_record_strict(sample) # 새로운 사용자 ID 설정 if new_user_id: new_row["user_id"] = new_user_id # 시간적 연속성 유지 window_interval = 2000 # window_size_ms new_row["window_id"] = base_window_id + i new_row["window_start_ms"] = base_window_start + i * window_interval new_row["window_end_ms"] = new_row["window_start_ms"] + window_interval # timestamp 연속성 유지 new_row["timestamp_utc"] = (base_time + timedelta(milliseconds=i * window_interval)).isoformat() # 피로도 연속성 유지 (이전 피로도는 직전 레코드의 피로도) if i > 0: new_row["fatigue_prev"] = prev_fatigue else: # 첫 레코드는 참조 데이터의 피로도에서 약간 낮게 시작 new_row["fatigue_prev"] = max(0.05, prev_fatigue - random.uniform(0, 0.05)) # 현재 피로도는 이전 피로도 기반으로 약간 증가하는 경향 (실제 측정과 유사) if "fatigue" in new_row and new_row["fatigue"] is not None: # 피로도는 시간에 따라 점진적으로 증가하는 경향 fatigue_base = new_row["fatigue_prev"] if "fatigue_prev" in new_row else prev_fatigue # 약간의 증가 + 노이즈 fatigue_increase = random.uniform(0, 0.02) # 시간에 따른 점진적 증가 new_row["fatigue"] = min(0.95, max(0.05, fatigue_base + fatigue_increase + random.uniform(-0.01, 0.01))) new_row["fatigue_level"] = 0 if new_row["fatigue"] < 0.3 else 1 if new_row["fatigue"] < 0.6 else 2 prev_fatigue = new_row["fatigue"] # 세션 ID 생성 (새 사용자이므로 새로운 세션) if "session_id" in new_row: new_row["session_id"] = f"session_{i // 10 + 1:03d}" # 10개 레코드당 세션 # measure_date는 기존 데이터에 있는 경우에만 설정 if "measure_date" in sample: try: measure_time = datetime.fromisoformat(new_row["timestamp_utc"].replace("Z", "+00:00")) new_row["measure_date"] = measure_time.strftime("%Y-%m-%d") except: new_row["measure_date"] = base_time.strftime("%Y-%m-%d") new_rows.append(new_row) return pd.DataFrame(new_rows) else: # 기존 사용자 데이터 증폭 (시간적 연속성 유지) new_rows = [] last_row = df.iloc[-1].to_dict() # 마지막 레코드의 시간 정보 가져오기 if "timestamp_utc" in last_row and last_row["timestamp_utc"]: try: last_time = datetime.fromisoformat(str(last_row["timestamp_utc"]).replace("Z", "+00:00")) except: last_time = datetime.now(timezone.utc) else: last_time = datetime.now(timezone.utc) last_window_id = last_row.get("window_id", 0) if last_row.get("window_id") is not None else 0 last_window_start = last_row.get("window_end_ms", 0) if last_row.get("window_end_ms") is not None else 0 prev_fatigue = last_row.get("fatigue", 0.1) if last_row.get("fatigue") is not None else 0.1 for i in range(need): # 샘플 레코드 선택 sample_idx = random.randint(0, len(df) - 1) sample = df.iloc[sample_idx].to_dict() # 새로운 레코드 생성 new_row = augment_record_strict(sample) # 시간적 연속성 유지 window_interval = 2000 new_row["window_id"] = last_window_id + i + 1 new_row["window_start_ms"] = last_window_start + i * window_interval new_row["window_end_ms"] = new_row["window_start_ms"] + window_interval # timestamp 연속성 유지 new_row["timestamp_utc"] = (last_time + timedelta(milliseconds=(i + 1) * window_interval)).isoformat() # 피로도 연속성 유지 new_row["fatigue_prev"] = prev_fatigue if "fatigue" in new_row and new_row["fatigue"] is not None: # 피로도는 시간에 따라 점진적으로 증가하는 경향 fatigue_increase = random.uniform(0, 0.02) # 시간에 따른 점진적 증가 new_row["fatigue"] = min(0.95, max(0.05, prev_fatigue + fatigue_increase + random.uniform(-0.01, 0.01))) new_row["fatigue_level"] = 0 if new_row["fatigue"] < 0.3 else 1 if new_row["fatigue"] < 0.6 else 2 prev_fatigue = new_row["fatigue"] # measure_date는 기존 데이터에 있는 경우에만 설정 if "measure_date" in sample: try: measure_time = datetime.fromisoformat(new_row["timestamp_utc"].replace("Z", "+00:00")) new_row["measure_date"] = measure_time.strftime("%Y-%m-%d") except: new_row["measure_date"] = last_time.strftime("%Y-%m-%d") new_rows.append(new_row) return pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True) def main(): load_dotenv() repo_id = require_env("HF_DATA_REPO_ID") token = require_env("HF_DATA_TOKEN") print(f"📂 기존 데이터셋 로드 중: {repo_id}") # 개별 parquet 파일을 모두 로드 (user로 시작하지 않는 파일도 포함) api = HfApi() try: files = api.list_repo_files(repo_id=repo_id, repo_type="dataset", token=token) # 모든 parquet 파일 필터링 (user로 시작하지 않는 것도 포함) parquet_files = [f for f in files if f.endswith(".parquet")] print(f"📊 Parquet 파일 수: {len(parquet_files)}") existing = DatasetDict() for file_path in parquet_files: try: # 파일명에서 사용자 ID 추출 # 형식: data/user_xxx.parquet 또는 data/user_xxx-00000-of-00001.parquet filename = file_path.split("/")[-1] if "/" in file_path else file_path # .parquet 확장자 제거 filename_no_ext = filename.replace(".parquet", "") # -00000-of-00001 부분이 있으면 제거, 없으면 그대로 사용 if "-" in filename_no_ext: user_id = filename_no_ext.split("-")[0] else: user_id = filename_no_ext # local_user로 시작하는 파일은 제외 if user_id.startswith("local_user"): print(f"⏭️ {user_id}: local_user로 시작하는 파일은 제외") continue # 개별 파일을 pandas로 직접 로드 from huggingface_hub import hf_hub_download import tempfile # 파일 다운로드 local_path = hf_hub_download( repo_id=repo_id, filename=file_path, repo_type="dataset", token=token ) # pandas로 직접 읽기 df = pd.read_parquet(local_path) if len(df) > 0: existing[user_id] = Dataset.from_pandas(df, preserve_index=False) print(f"✅ {user_id}: {len(df)} 레코드 로드") else: print(f"⚠️ {user_id}: 빈 데이터셋, 건너뜀") except Exception as e2: print(f"⚠️ {file_path}: 로드 실패 ({str(e2)[:100]}), 건너뜀") continue except Exception as e3: print(f"❌ 데이터셋 로드 완전 실패: {e3}") return # 유효한 사용자만 필터링 (데이터가 있는 사용자만, local_user 제외) valid_users = {} for user_id in existing.keys(): # local_user로 시작하는 사용자는 제외 if user_id.startswith("local_user"): print(f"⏭️ {user_id}: local_user로 시작하는 사용자는 제외") continue try: user_data = existing[user_id] if len(user_data) > 0: valid_users[user_id] = user_data else: print(f"⚠️ {user_id}: 빈 데이터셋, 건너뜀") except Exception as e: print(f"⚠️ {user_id}: 데이터 접근 실패 ({e}), 건너뜀") continue if len(valid_users) == 0: print("❌ 유효한 사용자 데이터가 없습니다.") return print(f"✅ 유효한 사용자 수: {len(valid_users)}명") # 현재 총 레코드 수 계산 current_total = sum(len(valid_users[user_id]) for user_id in valid_users) print(f"📊 현재 총 레코드 수: {current_total}") # 기존 사용자 목록 가져오기 (샘플링용) all_users = list(valid_users.keys()) if len(all_users) == 0: print("❌ 증폭할 참조 데이터가 없습니다.") return # 새로운 사용자 20명 생성 (기존 사용자 데이터를 참조하여 증폭) print(f"🎯 새로운 사용자 {TARGET_USERS}명 생성 중...") print(f"📋 참조 사용자: {len(all_users)}명") print(f"🎯 사용자당 목표 레코드 수: {RECORDS_PER_USER}") # 새로운 사용자 데이터셋 생성 new_user_datasets = {} for i in range(1, TARGET_USERS + 1): # 새로운 사용자 ID 생성 new_user_id = f"augmented_user_{i:03d}" # 기존 사용자 중 랜덤 선택 (참조용) reference_user_id = random.choice(all_users) reference_df = valid_users[reference_user_id].to_pandas() if len(reference_df) == 0: print(f"⚠️ 참조 사용자 {reference_user_id}의 데이터가 비어있어 건너뜀") continue try: # 참조 데이터를 증폭하여 새로운 사용자 데이터 생성 (새 사용자 ID 전달) new_user_df = augment_user_data(reference_df, RECORDS_PER_USER, new_user_id=new_user_id) # user_id 컬럼이 없으면 추가 if "user_id" not in new_user_df.columns: new_user_df["user_id"] = new_user_id else: new_user_df["user_id"] = new_user_id new_user_datasets[new_user_id] = Dataset.from_pandas(new_user_df, preserve_index=False) actual_count = len(new_user_df) print(f"📈 {new_user_id}: {actual_count} 레코드 생성 (참조: {reference_user_id}, 목표: {RECORDS_PER_USER})") if actual_count != RECORDS_PER_USER: print(f" ⚠️ 경고: 생성된 레코드 수({actual_count})가 목표({RECORDS_PER_USER})와 다릅니다!") except Exception as e: print(f"❌ {new_user_id}: 생성 실패 ({e}), 건너뜀") continue if len(new_user_datasets) == 0: print("❌ 새로운 사용자 데이터가 생성되지 않았습니다.") return # 기존 데이터의 스키마 확인 (첫 번째 사용자 데이터 기준) print("🔧 기존 데이터 스키마 확인 중...") reference_user_id = list(valid_users.keys())[0] reference_df = valid_users[reference_user_id].to_pandas() existing_columns = set(reference_df.columns) print(f" 📋 기존 데이터 컬럼 수: {len(existing_columns)}") print(f" 📋 기존 데이터 컬럼: {sorted(existing_columns)}") # 새로운 사용자 데이터를 기존 스키마에 맞춤 print("🔧 새로운 사용자 데이터를 기존 스키마에 맞추는 중...") for user_id in new_user_datasets.keys(): df = new_user_datasets[user_id].to_pandas() # 기존에 없는 컬럼 제거 columns_to_remove = set(df.columns) - existing_columns if columns_to_remove: df = df.drop(columns=list(columns_to_remove)) print(f" ⚠️ {user_id}: 불필요한 컬럼 제거: {columns_to_remove}") # 기존에 있는데 없는 컬럼 추가 (None으로) columns_to_add = existing_columns - set(df.columns) if columns_to_add: for col in columns_to_add: df[col] = None print(f" ➕ {user_id}: 누락된 컬럼 추가: {columns_to_add}") # 컬럼 순서를 기존 데이터와 동일하게 맞춤 df = df[list(reference_df.columns)] new_user_datasets[user_id] = Dataset.from_pandas(df, preserve_index=False) print(f" ✅ {user_id}: 스키마 정규화 완료") # 기존 데이터셋에 새로운 사용자 데이터 추가 final_datasets = {} # 기존 사용자 데이터 유지 for user_id in valid_users.keys(): final_datasets[user_id] = valid_users[user_id] # 새로운 사용자 데이터 추가 for user_id in new_user_datasets.keys(): final_datasets[user_id] = new_user_datasets[user_id] final_dict = DatasetDict(final_datasets) new_users_total = sum(len(new_user_datasets[user_id]) for user_id in new_user_datasets) total_records = sum(len(final_dict[user_id]) for user_id in final_dict) print(f"📊 새로운 사용자들의 총 레코드 수: {new_users_total}") print(f"📊 전체 데이터셋 총 레코드 수: {total_records}") print(f"📊 새로운 parquet 파일 수: {len(new_user_datasets)}개") # local_user로 시작하는 파일 삭제 print("🗑️ local_user로 시작하는 파일 삭제 중...") try: files_to_delete = [] for file_path in parquet_files: filename = file_path.split("/")[-1] if "/" in file_path else file_path filename_no_ext = filename.replace(".parquet", "") # -00000-of-00001 부분이 있으면 제거 if "-" in filename_no_ext: user_id = filename_no_ext.split("-")[0] else: user_id = filename_no_ext if user_id.startswith("local_user"): files_to_delete.append(file_path) for file_path in files_to_delete: try: api.delete_file(path_in_repo=file_path, repo_id=repo_id, repo_type="dataset", token=token) print(f" ✅ 삭제: {file_path}") except Exception as e: print(f" ⚠️ 삭제 실패 ({file_path}): {str(e)[:100]}") if files_to_delete: print(f"🗑️ {len(files_to_delete)}개 파일 삭제 완료") else: print("ℹ️ 삭제할 local_user 파일이 없습니다") except Exception as e: print(f"⚠️ 파일 삭제 중 오류 발생: {str(e)[:100]}") print(f"📤 Hugging Face Hub에 업로드 중: {repo_id}") final_dict.push_to_hub(repo_id, token=token, private=True) print("✅ 업로드 완료") if __name__ == "__main__": main()