Merry99 committed on
Commit
9668c3f
·
1 Parent(s): 99fe5a3

fix: log data field

Browse files
Files changed (1) hide show
  1. app.py +57 -140
app.py CHANGED
@@ -1,12 +1,10 @@
1
  import os
2
  import json
3
  from typing import List, Optional
4
- from fastapi import FastAPI, HTTPException, Request
5
  from pydantic import BaseModel, Field, ConfigDict
6
  import oracledb
7
  from dotenv import load_dotenv
8
- import json
9
- import requests
10
  import pandas as pd
11
  from datetime import datetime
12
  from datasets import Dataset, DatasetDict, load_dataset
@@ -54,23 +52,20 @@ class StatePayload(BaseModel):
54
  user_emb: Optional[List[float]] = Field(default=None, description="length=12")
55
  model_version: Optional[str] = None
56
 
57
- # ๋ฐฐ์น˜ ๋ฐ์ดํ„ฐ์šฉ ์Šคํ‚ค๋งˆ
58
- class BatchDataItem(BaseModel):
59
  user_id: str
60
  session_id: str
61
  measure_date: str
62
  rms: float
63
  freq: float
64
  fatigue: float
 
 
 
65
  mode: str
66
  window_count: int
67
  measurement_count: int
68
 
69
- class BatchUploadPayload(BaseModel):
70
- batch_data: List[BatchDataItem]
71
- batch_size: int
72
- batch_date: str
73
-
74
 
75
  # ----- ์œ ํ‹ธ -----
76
  def clob_json(obj) -> str:
@@ -89,7 +84,7 @@ def root():
89
  "health_db": "/health/db (DB ์—ฐ๊ฒฐ ์ฒดํฌ)",
90
  "docs": "/docs",
91
  "upload_state": "/upload_state",
92
- "upload_logs": "/upload_logs (๋ฐฐ์น˜ ๋ฐ์ดํ„ฐ)",
93
  "user_dataset": "/user_dataset/{user_id}"
94
  }
95
  }
@@ -162,54 +157,9 @@ def upload_state(p: StatePayload):
162
  raise HTTPException(500, f"upload_state failed: {e}")
163
 
164
 
165
- @app.get("/user_dataset/{user_id}")
166
- async def read_user_dataset(user_id: str):
167
- """Hugging Face Hub์—์„œ ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ ์กฐํšŒ"""
168
- try:
169
- # Hugging Face ํ™˜๊ฒฝ๋ณ€์ˆ˜ ํ™•์ธ
170
- hf_repo_id = os.getenv("HF_DATA_REPO_ID")
171
- hf_token = os.getenv("HF_DATA_TOKEN")
172
-
173
- if not hf_repo_id or not hf_token:
174
- raise HTTPException(status_code=500, detail="Hugging Face ์„ค์ •์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค (HF_DATA_REPO_ID, HF_DATA_TOKEN)")
175
-
176
- # Hugging Face Hub์—์„œ ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ ๋กœ๋“œ
177
- try:
178
- dataset = load_dataset(hf_repo_id, split=user_id, token=hf_token)
179
- data = dataset.to_pandas().to_dict(orient="records")
180
-
181
- # ์ตœ๊ทผ 5๊ฐœ ๋ ˆ์ฝ”๋“œ ๋ฐ˜ํ™˜
182
- recent_data = data[-5:] if len(data) > 5 else data
183
-
184
- return {
185
- "user_id": user_id,
186
- "count": len(data),
187
- "recent_data": recent_data,
188
- "filename": f"{user_id}.parquet",
189
- "source": "huggingface_hub",
190
- "repo_id": hf_repo_id
191
- }
192
-
193
- except Exception as e:
194
- # ๋ฐ์ดํ„ฐ๊ฐ€ ์—†๋Š” ๊ฒฝ์šฐ
195
- return {
196
- "user_id": user_id,
197
- "count": 0,
198
- "recent_data": [],
199
- "source": "huggingface_hub",
200
- "repo_id": hf_repo_id,
201
- "message": "No data found"
202
- }
203
-
204
- except HTTPException:
205
- raise
206
- except Exception as e:
207
- print(f"โŒ Hugging Face Hub ์กฐํšŒ ์‹คํŒจ: {e}")
208
- raise HTTPException(status_code=500, detail=f"Hugging Face Hub ์กฐํšŒ ์‹คํŒจ: {str(e)}")
209
-
210
  @app.post("/upload_logs")
211
- async def upload_logs(payload: BatchUploadPayload):
212
- """๋ฐฐ์น˜ ๋‹จ์œ„๋กœ ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ๋ฅผ Hugging Face Hub๋กœ ํ‘ธ์‹œ"""
213
  try:
214
  # Hugging Face ํ™˜๊ฒฝ๋ณ€์ˆ˜ ํ™•์ธ
215
  hf_repo_id = os.getenv("HF_DATA_REPO_ID")
@@ -218,102 +168,69 @@ async def upload_logs(payload: BatchUploadPayload):
218
  if not hf_repo_id or not hf_token:
219
  raise HTTPException(status_code=500, detail="Hugging Face ์„ค์ •์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค (HF_DATA_REPO_ID, HF_DATA_TOKEN)")
220
 
221
- # ์‚ฌ์šฉ์ž๋ณ„๋กœ ๋ฐ์ดํ„ฐ ๊ทธ๋ฃนํ™”
222
- user_data_groups = {}
223
- for item in payload.batch_data:
224
- user_id = item.user_id
225
- if user_id not in user_data_groups:
226
- user_data_groups[user_id] = []
227
-
228
- # ๋ฐ์ดํ„ฐ ๋ณ€ํ™˜
229
- record = {
230
- "session_id": item.session_id,
231
- "measure_date": item.measure_date,
232
- "rms": item.rms,
233
- "freq": item.freq,
234
- "fatigue": item.fatigue,
235
- "mode": item.mode,
236
- "window_count": item.window_count,
237
- "measurement_count": item.measurement_count,
238
- "batch_date": payload.batch_date,
239
- "batch_size": payload.batch_size,
240
- "timestamp": datetime.now().isoformat()
241
- }
242
- user_data_groups[user_id].append(record)
243
 
244
- results = {}
245
-
246
- # ํ˜„์žฌ repo์— ์žˆ๋Š” ๋ชจ๋“  split ๋ถˆ๋Ÿฌ์˜ค๊ธฐ
247
  try:
248
  existing = load_dataset(hf_repo_id, token=hf_token)
249
- all_splits = list(existing.keys())
250
- print(f"๐Ÿ“‚ ๊ธฐ์กด splits: {all_splits}")
251
-
252
- # ๊ธฐ์กด ๋ฐ์ดํ„ฐ๋ฅผ ์™„์ „ํžˆ ์ƒˆ๋กœ ์ƒ์„ฑ (์Šคํ‚ค๋งˆ ํ†ต์ผ)
253
- new_existing = DatasetDict()
254
- for user_id in existing.keys():
255
- df = existing[user_id].to_pandas()
256
- # ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฅผ ์ƒˆ๋กœ ์ƒ์„ฑํ•˜์—ฌ ์Šคํ‚ค๋งˆ ํ†ต์ผ
257
- new_existing[user_id] = df_to_dataset(df)
258
- print(f"๐Ÿ”ง {user_id}: ๊ธฐ์กด ๋ฐ์ดํ„ฐ ์žฌ์ƒ์„ฑ ์™„๋ฃŒ")
259
- existing = new_existing
260
-
261
  except Exception:
262
  existing = DatasetDict()
263
  print("๐Ÿ“‚ ๊ธฐ์กด repo ์—†์Œ โ†’ ์ƒˆ๋กœ ์ƒ์„ฑ")
264
 
265
- # ํ˜„์žฌ ์‚ฌ์šฉ์ž๋งŒ ์—…๋ฐ์ดํŠธ
266
- for user_id, records in user_data_groups.items():
267
- try:
268
- # ์ƒˆ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ
269
- new_df = pd.DataFrame(records)
270
- new_dataset = df_to_dataset(new_df)
271
-
272
- if user_id in existing:
273
- # ๊ธฐ์กด ๋ฐ์ดํ„ฐ์™€ ๋ณ‘ํ•ฉ
274
- old_df = existing[user_id].to_pandas()
275
- merged_df = pd.concat([old_df, new_df], ignore_index=True)
276
- existing[user_id] = df_to_dataset(merged_df)
277
- print(f"๐Ÿ“Š {user_id}: ๊ธฐ์กด ๋ฐ์ดํ„ฐ์™€ ๋ณ‘ํ•ฉ ({len(old_df)} + {len(new_df)} = {len(merged_df)}๊ฐœ ๋ ˆ์ฝ”๋“œ)")
278
- else:
279
- existing[user_id] = new_dataset
280
- print(f"๐Ÿ“Š {user_id}: ์‹ ๊ทœ ๋ฐ์ดํ„ฐ ์ถ”๊ฐ€ ({len(new_df)}๊ฐœ ๋ ˆ์ฝ”๋“œ)")
281
-
282
- results[user_id] = {
283
- "status": "success",
284
- "new_rows": len(records),
285
- "filename": f"{user_id}.parquet"
286
- }
287
-
288
- except Exception as e:
289
- print(f"โŒ {user_id} ์ฒ˜๋ฆฌ ์‹คํŒจ: {e}")
290
- results[user_id] = {
291
- "status": "failed",
292
- "error": str(e)
293
- }
294
-
295
- # ๋ชจ๋“  split ํ†ต์งธ๋กœ ๋‹ค์‹œ push
296
  try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
297
  existing.push_to_hub(hf_repo_id, token=hf_token, private=True)
298
- print(f"โœ… ์ „์ฒด DatasetDict ํ‘ธ์‹œ ์™„๋ฃŒ: {len(existing)}๊ฐœ ์‚ฌ์šฉ์ž")
299
- except Exception as e:
300
- print(f"โŒ ์ „์ฒด ํ‘ธ์‹œ ์‹คํŒจ: {e}")
301
- raise HTTPException(status_code=500, detail=f"์ „์ฒด ํ‘ธ์‹œ ์‹คํŒจ: {str(e)}")
302
 
303
- return {
304
- "batch_date": payload.batch_date,
305
- "batch_size": payload.batch_size,
306
- "processed_users": len(user_data_groups),
307
- "results": results,
308
- "repo_id": hf_repo_id,
309
- "message": f"Batch upload completed for {len(user_data_groups)} users"
310
- }
 
 
 
 
311
 
312
  except HTTPException:
313
  raise
314
  except Exception as e:
315
- print(f"โŒ ๋ฐฐ์น˜ ํ‘ธ์‹œ ์‹คํŒจ: {e}")
316
- raise HTTPException(status_code=500, detail=f"๋ฐฐ์น˜ ํ‘ธ์‹œ ์‹คํŒจ: {str(e)}")
317
 
318
 
319
  def df_to_dataset(df):
 
1
  import os
2
  import json
3
  from typing import List, Optional
4
+ from fastapi import FastAPI, HTTPException
5
  from pydantic import BaseModel, Field, ConfigDict
6
  import oracledb
7
  from dotenv import load_dotenv
 
 
8
  import pandas as pd
9
  from datetime import datetime
10
  from datasets import Dataset, DatasetDict, load_dataset
 
52
  user_emb: Optional[List[float]] = Field(default=None, description="length=12")
53
  model_version: Optional[str] = None
54
 
55
+ class LogUploadPayload(BaseModel):
 
56
  user_id: str
57
  session_id: str
58
  measure_date: str
59
  rms: float
60
  freq: float
61
  fatigue: float
62
+ rms_base: Optional[float] = None
63
+ freq_base: Optional[float] = None
64
+ user_emb: Optional[List[float]] = Field(default=None, description="length=12")
65
  mode: str
66
  window_count: int
67
  measurement_count: int
68
 
 
 
 
 
 
69
 
70
  # ----- ์œ ํ‹ธ -----
71
  def clob_json(obj) -> str:
 
84
  "health_db": "/health/db (DB ์—ฐ๊ฒฐ ์ฒดํฌ)",
85
  "docs": "/docs",
86
  "upload_state": "/upload_state",
87
+ "upload_logs": "/upload_logs (๊ฐœ๋ณ„ ๋กœ๊ทธ ๋ฐ์ดํ„ฐ)",
88
  "user_dataset": "/user_dataset/{user_id}"
89
  }
90
  }
 
157
  raise HTTPException(500, f"upload_state failed: {e}")
158
 
159
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
160
  @app.post("/upload_logs")
161
+ async def upload_logs(payload: LogUploadPayload):
162
+ """๊ฐœ๋ณ„ ๋กœ๊ทธ ๋ฐ์ดํ„ฐ๋ฅผ Hugging Face Hub๋กœ ํ‘ธ์‹œ"""
163
  try:
164
  # Hugging Face ํ™˜๊ฒฝ๋ณ€์ˆ˜ ํ™•์ธ
165
  hf_repo_id = os.getenv("HF_DATA_REPO_ID")
 
168
  if not hf_repo_id or not hf_token:
169
  raise HTTPException(status_code=500, detail="Hugging Face ์„ค์ •์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค (HF_DATA_REPO_ID, HF_DATA_TOKEN)")
170
 
171
+ # ๋‹จ์ผ ๋ ˆ์ฝ”๋“œ ์ƒ์„ฑ
172
+ record = {
173
+ "session_id": payload.session_id,
174
+ "measure_date": payload.measure_date,
175
+ "rms": payload.rms,
176
+ "freq": payload.freq,
177
+ "fatigue": payload.fatigue,
178
+ "rms_base": payload.rms_base,
179
+ "freq_base": payload.freq_base,
180
+ "user_emb": payload.user_emb,
181
+ "mode": payload.mode,
182
+ "window_count": payload.window_count,
183
+ "measurement_count": payload.measurement_count,
184
+ "timestamp": datetime.now().isoformat()
185
+ }
 
 
 
 
 
 
 
186
 
187
+ # ํ˜„์žฌ repo์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ ๋ถˆ๋Ÿฌ์˜ค๊ธฐ
 
 
188
  try:
189
  existing = load_dataset(hf_repo_id, token=hf_token)
190
+ print(f"๐Ÿ“‚ ๊ธฐ์กด ๋ฐ์ดํ„ฐ ๋กœ๋“œ ์™„๋ฃŒ")
 
 
 
 
 
 
 
 
 
 
 
191
  except Exception:
192
  existing = DatasetDict()
193
  print("๐Ÿ“‚ ๊ธฐ์กด repo ์—†์Œ โ†’ ์ƒˆ๋กœ ์ƒ์„ฑ")
194
 
195
+ # ์‚ฌ์šฉ์ž ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ
196
+ user_id = payload.user_id
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
  try:
198
+ # ์ƒˆ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ
199
+ new_df = pd.DataFrame([record])
200
+ new_dataset = df_to_dataset(new_df)
201
+
202
+ if user_id in existing:
203
+ # ๊ธฐ์กด ๋ฐ์ดํ„ฐ์™€ ๋ณ‘ํ•ฉ
204
+ old_df = existing[user_id].to_pandas()
205
+ merged_df = pd.concat([old_df, new_df], ignore_index=True)
206
+ existing[user_id] = df_to_dataset(merged_df)
207
+ print(f"๐Ÿ“Š {user_id}: ๊ธฐ์กด ๋ฐ์ดํ„ฐ์™€ ๋ณ‘ํ•ฉ ({len(old_df)} + 1 = {len(merged_df)}๊ฐœ ๋ ˆ์ฝ”๋“œ)")
208
+ else:
209
+ existing[user_id] = new_dataset
210
+ print(f"๐Ÿ“Š {user_id}: ์‹ ๊ทœ ๋ฐ์ดํ„ฐ ์ถ”๊ฐ€ (1๊ฐœ ๋ ˆ์ฝ”๋“œ)")
211
+
212
+ # ๋ฐ์ดํ„ฐ ํ‘ธ์‹œ
213
  existing.push_to_hub(hf_repo_id, token=hf_token, private=True)
214
+ print(f"โœ… {user_id} ๋ฐ์ดํ„ฐ ํ‘ธ์‹œ ์™„๋ฃŒ")
 
 
 
215
 
216
+ return {
217
+ "user_id": user_id,
218
+ "status": "success",
219
+ "new_rows": 1,
220
+ "filename": f"{user_id}.parquet",
221
+ "repo_id": hf_repo_id,
222
+ "message": f"Log uploaded successfully for user {user_id}"
223
+ }
224
+
225
+ except Exception as e:
226
+ print(f"โŒ {user_id} ์ฒ˜๋ฆฌ ์‹คํŒจ: {e}")
227
+ raise HTTPException(status_code=500, detail=f"๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์‹คํŒจ: {str(e)}")
228
 
229
  except HTTPException:
230
  raise
231
  except Exception as e:
232
+ print(f"โŒ ๋กœ๊ทธ ์—…๋กœ๋“œ ์‹คํŒจ: {e}")
233
+ raise HTTPException(status_code=500, detail=f"๋กœ๊ทธ ์—…๋กœ๋“œ ์‹คํŒจ: {str(e)}")
234
 
235
 
236
  def df_to_dataset(df):