Merry99 commited on
Commit
13f75fb
·
1 Parent(s): 26524df

change log to dataset & field

Browse files
Files changed (1) hide show
  1. app.py +103 -129
app.py CHANGED
@@ -17,36 +17,59 @@ HF_DATA_TOKEN = os.getenv("HF_DATA_TOKEN")
17
  app = FastAPI(title="MuscleCare FastAPI Server")
18
 
19
  # ----- 모델 -----
20
- class LogUploadPayload(BaseModel):
21
- user_id: str
22
- session_id: str
23
- measure_date: str
24
- rms: float
25
- freq: float
26
- fatigue: float
27
- rms_base: Optional[float] = None
28
- freq_base: Optional[float] = None
29
- user_emb: Optional[List[float]] = Field(default=None, description="length=12")
30
- mode: str
31
- window_count: int
32
- measurement_count: int
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
 
34
- class BatchLogItem(BaseModel):
35
- user_id: str
36
- session_id: str
37
- measure_date: str
38
- rms: float
39
- freq: float
40
- fatigue: float
41
  rms_base: Optional[float] = None
42
  freq_base: Optional[float] = None
43
- user_emb: Optional[List[float]] = Field(default=None, description="length=12")
44
- mode: str
45
- window_count: int
46
- measurement_count: int
 
 
 
 
 
47
 
48
- class BatchLogsPayload(BaseModel):
49
- batch_data: List[BatchLogItem]
 
50
 
51
 
52
  # ----- 엔드포인트 -----
@@ -60,7 +83,7 @@ def root():
60
  "endpoints": {
61
  "health": "/health (빠른 체크)",
62
  "docs": "/docs",
63
- "upload_logs": "/upload_logs (개별 로그 데이터)",
64
  "user_dataset": "/user_dataset/{user_id}"
65
  }
66
  }
@@ -78,85 +101,9 @@ def health():
78
  except Exception as e:
79
  return {"ok": False, "error": str(e)}
80
 
81
- @app.post("/upload_logs")
82
- async def upload_logs(payload: LogUploadPayload):
83
- """개별 로그 데이터 Hugging Face Hub 푸시"""
84
- try:
85
- # Hugging Face 환경변수 확인
86
- hf_repo_id = os.getenv("HF_DATA_REPO_ID")
87
- hf_token = os.getenv("HF_DATA_TOKEN")
88
-
89
- if not hf_repo_id or not hf_token:
90
- raise HTTPException(status_code=500, detail="Hugging Face 설정이 필요합니다 (HF_DATA_REPO_ID, HF_DATA_TOKEN)")
91
-
92
- # 단일 레코드 생성
93
- record = {
94
- "session_id": payload.session_id,
95
- "measure_date": payload.measure_date,
96
- "rms": payload.rms,
97
- "freq": payload.freq,
98
- "fatigue": payload.fatigue,
99
- "rms_base": payload.rms_base,
100
- "freq_base": payload.freq_base,
101
- "user_emb": payload.user_emb,
102
- "mode": payload.mode,
103
- "window_count": payload.window_count,
104
- "measurement_count": payload.measurement_count,
105
- "timestamp": datetime.now().isoformat()
106
- }
107
-
108
- # 현재 repo에 있는 데이터 불러오기
109
- try:
110
- existing = load_dataset(hf_repo_id, token=hf_token)
111
- print(f"📂 기존 데이터 로드 완료")
112
- except Exception:
113
- existing = DatasetDict()
114
- print("📂 기존 repo 없음 → 새로 생성")
115
-
116
- # 사용자 데이터 처리
117
- user_id = payload.user_id
118
- try:
119
- # 새 데이터 처리
120
- new_df = pd.DataFrame([record])
121
- new_dataset = df_to_dataset(new_df)
122
-
123
- if user_id in existing:
124
- # 기존 데이터와 병합
125
- old_df = existing[user_id].to_pandas()
126
- merged_df = pd.concat([old_df, new_df], ignore_index=True)
127
- existing[user_id] = df_to_dataset(merged_df)
128
- print(f"📊 {user_id}: 기존 데이터와 병합 ({len(old_df)} + 1 = {len(merged_df)}개 레코드)")
129
- else:
130
- existing[user_id] = new_dataset
131
- print(f"📊 {user_id}: 신규 데이터 추가 (1개 레코드)")
132
-
133
- # 데이터 푸시
134
- existing.push_to_hub(hf_repo_id, token=hf_token, private=True)
135
- print(f"✅ {user_id} 데이터 푸시 완료")
136
-
137
- return {
138
- "user_id": user_id,
139
- "status": "success",
140
- "new_rows": 1,
141
- "filename": f"{user_id}.parquet",
142
- "repo_id": hf_repo_id,
143
- "message": f"Log uploaded successfully for user {user_id}"
144
- }
145
-
146
- except Exception as e:
147
- print(f"❌ {user_id} 처리 실패: {e}")
148
- raise HTTPException(status_code=500, detail=f"데이터 처리 실패: {str(e)}")
149
-
150
- except HTTPException:
151
- raise
152
- except Exception as e:
153
- print(f"❌ 로그 업로드 실패: {e}")
154
- raise HTTPException(status_code=500, detail=f"로그 업로드 실패: {str(e)}")
155
-
156
-
157
- @app.post("/upload_batch_logs")
158
- async def upload_batch_logs(payload: BatchLogsPayload):
159
- """배치 로그 데이터를 Hugging Face Hub에 병렬 아닌 일괄 반영 (스키마 정규화 포함)"""
160
  try:
161
  hf_repo_id = os.getenv("HF_DATA_REPO_ID")
162
  hf_token = os.getenv("HF_DATA_TOKEN")
@@ -165,9 +112,48 @@ async def upload_batch_logs(payload: BatchLogsPayload):
165
 
166
  # 새 스키마 정의
167
  target_cols = [
168
- "session_id", "measure_date", "rms", "freq", "fatigue",
169
- "rms_base", "freq_base", "user_emb", "mode", "window_count",
170
- "measurement_count", "timestamp"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
171
  ]
172
 
173
  # 기존 데이터 로드
@@ -196,8 +182,8 @@ async def upload_batch_logs(payload: BatchLogsPayload):
196
  df["user_emb"] = df["user_emb"].apply(_parse_emb)
197
 
198
  # 타임스탬프 없으면 추가
199
- if "timestamp" not in df.columns:
200
- df["timestamp"] = datetime.now().isoformat()
201
 
202
  # 타겟 컬럼 세트로 맞추기
203
  for c in target_cols:
@@ -210,22 +196,10 @@ async def upload_batch_logs(payload: BatchLogsPayload):
210
  # payload를 사용자별로 그룹화
211
  user_groups: dict[str, list[dict]] = {}
212
  for item in payload.batch_data:
213
- # 레코드 생성
214
- rec = {
215
- "session_id": item.session_id,
216
- "measure_date": item.measure_date,
217
- "rms": item.rms,
218
- "freq": item.freq,
219
- "fatigue": item.fatigue,
220
- "rms_base": item.rms_base,
221
- "freq_base": item.freq_base,
222
- "user_emb": item.user_emb,
223
- "mode": item.mode,
224
- "window_count": item.window_count,
225
- "measurement_count": item.measurement_count,
226
- "timestamp": datetime.now().isoformat()
227
- }
228
- user_groups.setdefault(item.user_id, []).append(rec)
229
 
230
  results = {}
231
 
@@ -271,8 +245,8 @@ async def upload_batch_logs(payload: BatchLogsPayload):
271
  except HTTPException:
272
  raise
273
  except Exception as e:
274
- print(f"❌ 배치 로그 업로드 실패: {e}")
275
- raise HTTPException(status_code=500, detail=f"배치 로그 업로드 실패: {str(e)}")
276
 
277
  def df_to_dataset(df):
278
  """DataFrame을 Dataset으로 변환"""
 
17
  app = FastAPI(title="MuscleCare FastAPI Server")
18
 
19
  # ----- 모델 -----
20
+ class DatasetItem(BaseModel):
21
+ user_id: int
22
+ session_id: Optional[str] = None
23
+ window_id: int
24
+ window_start_ms: int
25
+ window_end_ms: int
26
+ timestamp_utc: Optional[str] = None
27
+
28
+ acc_x_mean: Optional[float] = None
29
+ acc_y_mean: Optional[float] = None
30
+ acc_z_mean: Optional[float] = None
31
+ gyro_x_mean: Optional[float] = None
32
+ gyro_y_mean: Optional[float] = None
33
+ gyro_z_mean: Optional[float] = None
34
+ linacc_x_mean: Optional[float] = None
35
+ linacc_y_mean: Optional[float] = None
36
+ linacc_z_mean: Optional[float] = None
37
+ gravity_x_mean: Optional[float] = None
38
+ gravity_y_mean: Optional[float] = None
39
+ gravity_z_mean: Optional[float] = None
40
+
41
+ acc_x_std: Optional[float] = None
42
+ acc_y_std: Optional[float] = None
43
+ acc_z_std: Optional[float] = None
44
+ gyro_x_std: Optional[float] = None
45
+ gyro_y_std: Optional[float] = None
46
+ gyro_z_std: Optional[float] = None
47
+
48
+ rms_acc: Optional[float] = None
49
+ rms_gyro: Optional[float] = None
50
+ mean_freq_acc: Optional[float] = None
51
+ mean_freq_gyro: Optional[float] = None
52
+ entropy_acc: Optional[float] = None
53
+ entropy_gyro: Optional[float] = None
54
+ jerk_mean: Optional[float] = None
55
+ jerk_std: Optional[float] = None
56
+ stability_index: Optional[float] = None
57
 
 
 
 
 
 
 
 
58
  rms_base: Optional[float] = None
59
  freq_base: Optional[float] = None
60
+ user_emb: Optional[List[float]] = Field(default=None, description="length=12 vector")
61
+
62
+ fatigue_prev: Optional[float] = None
63
+ fatigue: Optional[float] = None
64
+ fatigue_level: Optional[int] = None
65
+
66
+ quality_flag: Optional[int] = 1
67
+ window_size_ms: Optional[int] = 2000
68
+ overlap_rate: Optional[float] = 0.5
69
 
70
+
71
+ class DatasetBatchPayload(BaseModel):
72
+ batch_data: List[DatasetItem]
73
 
74
 
75
  # ----- 엔드포인트 -----
 
83
  "endpoints": {
84
  "health": "/health (빠른 체크)",
85
  "docs": "/docs",
86
+ "upload_dataset": "/upload_dataset (배치 데이터 업로드)",
87
  "user_dataset": "/user_dataset/{user_id}"
88
  }
89
  }
 
101
  except Exception as e:
102
  return {"ok": False, "error": str(e)}
103
 
104
+ @app.post("/upload_dataset")
105
+ async def upload_dataset(payload: DatasetBatchPayload):
106
+ """배치 데이터셋을 Hugging Face Hub 일괄 반영 (스키마 정규화 포함)"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
  try:
108
  hf_repo_id = os.getenv("HF_DATA_REPO_ID")
109
  hf_token = os.getenv("HF_DATA_TOKEN")
 
112
 
113
  # 새 스키마 정의
114
  target_cols = [
115
+ "user_id",
116
+ "session_id",
117
+ "window_id",
118
+ "window_start_ms",
119
+ "window_end_ms",
120
+ "timestamp_utc",
121
+ "acc_x_mean",
122
+ "acc_y_mean",
123
+ "acc_z_mean",
124
+ "gyro_x_mean",
125
+ "gyro_y_mean",
126
+ "gyro_z_mean",
127
+ "linacc_x_mean",
128
+ "linacc_y_mean",
129
+ "linacc_z_mean",
130
+ "gravity_x_mean",
131
+ "gravity_y_mean",
132
+ "gravity_z_mean",
133
+ "acc_x_std",
134
+ "acc_y_std",
135
+ "acc_z_std",
136
+ "gyro_x_std",
137
+ "gyro_y_std",
138
+ "gyro_z_std",
139
+ "rms_acc",
140
+ "rms_gyro",
141
+ "mean_freq_acc",
142
+ "mean_freq_gyro",
143
+ "entropy_acc",
144
+ "entropy_gyro",
145
+ "jerk_mean",
146
+ "jerk_std",
147
+ "stability_index",
148
+ "rms_base",
149
+ "freq_base",
150
+ "user_emb",
151
+ "fatigue_prev",
152
+ "fatigue",
153
+ "fatigue_level",
154
+ "quality_flag",
155
+ "window_size_ms",
156
+ "overlap_rate",
157
  ]
158
 
159
  # 기존 데이터 로드
 
182
  df["user_emb"] = df["user_emb"].apply(_parse_emb)
183
 
184
  # 타임스탬프 없으면 추가
185
+ if "timestamp_utc" not in df.columns or df["timestamp_utc"].isnull().all():
186
+ df["timestamp_utc"] = datetime.now().isoformat()
187
 
188
  # 타겟 컬럼 세트로 맞추기
189
  for c in target_cols:
 
196
  # payload를 사용자별로 그룹화
197
  user_groups: dict[str, list[dict]] = {}
198
  for item in payload.batch_data:
199
+ rec = item.model_dump()
200
+ if not rec.get("timestamp_utc"):
201
+ rec["timestamp_utc"] = datetime.now().isoformat()
202
+ user_groups.setdefault(str(item.user_id), []).append(rec)
 
 
 
 
 
 
 
 
 
 
 
 
203
 
204
  results = {}
205
 
 
245
  except HTTPException:
246
  raise
247
  except Exception as e:
248
+ print(f"❌ 배치 데이터셋 업로드 실패: {e}")
249
+ raise HTTPException(status_code=500, detail=f"배치 데이터셋 업로드 실패: {str(e)}")
250
 
251
  def df_to_dataset(df):
252
  """DataFrame을 Dataset으로 변환"""