Merry99 commited on
Commit
3c70e7c
·
1 Parent(s): b545869

add batch update logs

Browse files
Files changed (1) hide show
  1. app.py +137 -0
app.py CHANGED
@@ -66,6 +66,23 @@ class LogUploadPayload(BaseModel):
66
  window_count: int
67
  measurement_count: int
68
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
  # ----- ์œ ํ‹ธ -----
71
  def clob_json(obj) -> str:
@@ -233,6 +250,126 @@ async def upload_logs(payload: LogUploadPayload):
233
  raise HTTPException(status_code=500, detail=f"๋กœ๊ทธ ์—…๋กœ๋“œ ์‹คํŒจ: {str(e)}")
234
 
235
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
236
def df_to_dataset(df):
    """Convert a pandas DataFrame into a Hugging Face ``Dataset``."""
    dataset = Dataset.from_pandas(df)
    return dataset
 
66
  window_count: int
67
  measurement_count: int
68
 
69
class BatchLogItem(BaseModel):
    """One measurement log record inside a batch upload.

    Carries the same measurement fields as the single-record upload
    payload, plus ``user_id`` so that records from several users can be
    grouped within one batch request.
    """
    user_id: str
    session_id: str
    measure_date: str
    rms: float
    freq: float
    fatigue: float
    # Baseline values are optional; absent for first-time measurements.
    rms_base: Optional[float] = None
    freq_base: Optional[float] = None
    # NOTE(review): description says length=12, but the length is not
    # validated here — confirm whether a validator is needed.
    user_emb: Optional[List[float]] = Field(default=None, description="length=12")
    mode: str
    window_count: int
    measurement_count: int


class BatchLogsPayload(BaseModel):
    """Request body for ``/upload_batch_logs``: a list of log records."""
    batch_data: List[BatchLogItem]
86
 
87
  # ----- ์œ ํ‹ธ -----
88
  def clob_json(obj) -> str:
 
250
  raise HTTPException(status_code=500, detail=f"๋กœ๊ทธ ์—…๋กœ๋“œ ์‹คํŒจ: {str(e)}")
251
 
252
 
253
@app.post("/upload_batch_logs")
async def upload_batch_logs(payload: BatchLogsPayload):
    """Merge a batch of log records into the Hugging Face Hub dataset.

    Groups the incoming records by ``user_id`` (one dataset split per
    user), normalizes every existing split to a fixed target schema,
    appends the new rows, and pushes the whole DatasetDict in a single
    commit.

    Returns a summary dict with per-user results.
    Raises HTTP 500 when HF credentials are missing or the push fails.
    """
    try:
        hf_repo_id = os.getenv("HF_DATA_REPO_ID")
        hf_token = os.getenv("HF_DATA_TOKEN")
        if not hf_repo_id or not hf_token:
            raise HTTPException(status_code=500, detail="Hugging Face ์„ค์ •์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค (HF_DATA_REPO_ID, HF_DATA_TOKEN)")

        # Normalized column schema shared by all user splits.
        target_cols = [
            "session_id", "measure_date", "rms", "freq", "fatigue",
            "rms_base", "freq_base", "user_emb", "mode", "window_count",
            "measurement_count", "timestamp"
        ]

        # One timestamp for the whole batch: keeps records consistent and
        # avoids calling datetime.now() once per row.
        batch_ts = datetime.now().isoformat()

        # Load existing data; start a fresh DatasetDict when the repo
        # does not exist yet.
        try:
            existing = load_dataset(hf_repo_id, token=hf_token)
            print("๐Ÿ“‚ ๊ธฐ์กด DatasetDict ๋กœ๋“œ ์™„๋ฃŒ")
        except Exception:
            existing = DatasetDict()
            print("๐Ÿ“‚ ๊ธฐ์กด repo ์—†์Œ โ†’ ์ƒˆ๋กœ ์ƒ์„ฑ")

        # Normalize an existing split: parse user_emb, backfill timestamp,
        # add missing target columns and drop extras.
        def normalize_existing_df(df: pd.DataFrame) -> pd.DataFrame:
            import json as _json  # hoisted: was re-imported per cell inside apply()

            if "user_emb" in df.columns:
                # user_emb may have been stored as a JSON string; coerce to list.
                def _parse_emb(x):
                    if isinstance(x, list):
                        return x
                    if isinstance(x, str):
                        try:
                            v = _json.loads(x)
                            return v if isinstance(v, list) else []
                        except Exception:
                            return []
                    return []
                df["user_emb"] = df["user_emb"].apply(_parse_emb)

            # Backfill a timestamp column for legacy rows.
            if "timestamp" not in df.columns:
                df["timestamp"] = batch_ts

            # Conform to the target column set.
            for c in target_cols:
                if c not in df.columns:
                    df[c] = None
            df = df[target_cols]
            return df

        # Group payload records by user.
        user_groups: dict[str, list[dict]] = {}
        for item in payload.batch_data:
            rec = {
                "session_id": item.session_id,
                "measure_date": item.measure_date,
                "rms": item.rms,
                "freq": item.freq,
                "fatigue": item.fatigue,
                "rms_base": item.rms_base,
                "freq_base": item.freq_base,
                "user_emb": item.user_emb,
                "mode": item.mode,
                "window_count": item.window_count,
                "measurement_count": item.measurement_count,
                "timestamp": batch_ts,
            }
            user_groups.setdefault(item.user_id, []).append(rec)

        results = {}

        # Merge per user; one user's failure does not abort the others.
        for user_id, records in user_groups.items():
            try:
                new_df = pd.DataFrame(records)
                # Conform the new rows to the target schema as well.
                for c in target_cols:
                    if c not in new_df.columns:
                        new_df[c] = None
                new_df = new_df[target_cols]

                if user_id in existing:
                    old_df = normalize_existing_df(existing[user_id].to_pandas())
                    merged = pd.concat([old_df, new_df], ignore_index=True)
                    existing[user_id] = df_to_dataset(merged)
                    print(f"๐Ÿ“Š {user_id}: ๋ณ‘ํ•ฉ ({len(old_df)} + {len(new_df)} = {len(merged)})")
                else:
                    existing[user_id] = df_to_dataset(new_df)
                    print(f"๐Ÿ“Š {user_id}: ์‹ ๊ทœ ์ƒ์„ฑ ({len(new_df)})")

                results[user_id] = {"status": "success", "new_rows": len(records)}
            except Exception as e:
                print(f"โŒ {user_id} ์ฒ˜๋ฆฌ ์‹คํŒจ: {e}")
                results[user_id] = {"status": "failed", "error": str(e)}

        # Push the whole DatasetDict in one commit.
        try:
            existing.push_to_hub(hf_repo_id, token=hf_token, private=True)
            print(f"โœ… DatasetDict ํ‘ธ์‹œ ์™„๋ฃŒ: {len(existing)} users")
        except Exception as e:
            print(f"โŒ ์ „์ฒด ํ‘ธ์‹œ ์‹คํŒจ: {e}")
            raise HTTPException(status_code=500, detail=f"์ „์ฒด ํ‘ธ์‹œ ์‹คํŒจ: {str(e)}")

        return {
            "processed_users": len(user_groups),
            "total_rows": sum(len(v) for v in user_groups.values()),
            "results": results,
        }

    except HTTPException:
        # Re-raise API errors untouched (e.g. missing credentials, push failure).
        raise
    except Exception as e:
        print(f"โŒ ๋ฐฐ์น˜ ๋กœ๊ทธ ์—…๋กœ๋“œ ์‹คํŒจ: {e}")
        raise HTTPException(status_code=500, detail=f"๋ฐฐ์น˜ ๋กœ๊ทธ ์—…๋กœ๋“œ ์‹คํŒจ: {str(e)}")
372
+
373
def df_to_dataset(df):
    """Convert a pandas DataFrame into a Hugging Face ``Dataset``."""
    dataset = Dataset.from_pandas(df)
    return dataset