# (Hugging Face Space status banner — extraction residue, not part of the source.)
import json
import os
from datetime import datetime
from typing import List, Optional

import oracledb
import pandas as pd
import requests
from datasets import Dataset, DatasetDict, load_dataset
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel, ConfigDict, Field
# Local development: load variables from a .env file if one is present.
load_dotenv()

# ----- Environment variables -----
# Required DB credentials — a missing variable raises KeyError at import time
# so a misconfigured deployment fails fast instead of at first request.
DB_USER = os.environ["DB_USER"]
DB_PASSWORD = os.environ["DB_PASSWORD"]
WALLET_DIR = os.environ["WALLET_DIR"]
WALLET_PASSWORD = os.environ["WALLET_PASSWORD"]

# TNS alias from the wallet's tnsnames.ora (usually the *_high service).
TNS_ALIAS = os.environ.get("DB_TNS_ALIAS", "musclecare_high")

# Hugging Face settings (optional at import; the HF endpoints re-read and
# re-validate these per request).
HF_DATA_REPO_ID = os.getenv("HF_DATA_REPO_ID")
HF_DATA_TOKEN = os.getenv("HF_DATA_TOKEN")
# ----- Connection pool: Thin driver + mTLS (wallet) -----
# Do NOT call oracledb.init_oracle_client() here: that switches the driver to
# Thick mode, which needs a native Oracle client install and can fail in this
# deployment.
pool: oracledb.ConnectionPool = oracledb.create_pool(
    user=DB_USER,
    password=DB_PASSWORD,
    dsn=TNS_ALIAS,               # alias resolved via the wallet's tnsnames.ora
    config_dir=WALLET_DIR,       # directory holding tnsnames.ora / sqlnet.ora
    wallet_location=WALLET_DIR,  # mTLS wallet files live in the same directory
    wallet_password=WALLET_PASSWORD,
    min=1, max=4, increment=1,   # small pool, sized for a single app instance
    homogeneous=True,            # every session uses the same credentials
    timeout=60,                  # reclaim sessions idle for 60 s
    retry_count=6, retry_delay=2  # retry connects (e.g. while ADB wakes up)
)

app = FastAPI(title="MuscleCare Hybrid Server (mTLS)")
# ----- Models -----
class StatePayload(BaseModel):
    """Request body for /upload_state: a user's latest baseline/model state."""

    # Allow field names starting with "model_" (pydantic v2 reserves that
    # prefix by default, which would warn on `model_version`).
    model_config = ConfigDict(protected_namespaces=())

    user_id: str
    rms_base: Optional[float] = None
    freq_base: Optional[float] = None
    # Fixed-length embedding; length is enforced in the endpoint, not here.
    user_emb: Optional[List[float]] = Field(default=None, description="length=12")
    model_version: Optional[str] = None
# Schema for one record in a batch upload.
class BatchDataItem(BaseModel):
    """A single measurement row uploaded via /upload_logs."""

    user_id: str
    session_id: str
    measure_date: str  # date string; exact format is client-defined — TODO confirm
    rms: float
    freq: float
    fatigue: float
    mode: str
    window_count: int
    measurement_count: int
class BatchUploadPayload(BaseModel):
    """Request body for /upload_logs: a batch of measurement records."""

    batch_data: List[BatchDataItem]
    batch_size: int   # client-reported size; not cross-checked against len(batch_data)
    batch_date: str
# ----- Utilities -----
def clob_json(obj) -> str:
    """Serialize *obj* as compact JSON (no separator spaces, Unicode kept raw).

    Used to store values such as the user embedding in a CLOB column.
    """
    compact = (",", ":")
    return json.dumps(obj, separators=compact, ensure_ascii=False)
# ----- Endpoints -----
@app.get("/")  # was never registered on the app; the route list below documents the API
def root():
    """Root endpoint — server status and a directory of available endpoints."""
    return {
        "status": "running",
        "message": "MuscleCare API Server",
        "version": "1.0.0",
        "endpoints": {
            "health": "/health (๋น ๋ฅธ ์ฒดํฌ)",
            "health_db": "/health/db (DB ์ฐ๊ฒฐ ์ฒดํฌ)",
            "docs": "/docs",
            "upload_state": "/upload_state",
            "upload_logs": "/upload_logs (๋ฐฐ์น ๋ฐ์ดํฐ)",
            "user_dataset": "/user_dataset/{user_id}"
        }
    }
@app.get("/health")  # route registration was missing
def health():
    """Quick health check — reports liveness without touching the DB."""
    try:
        return {
            "ok": True,
            "server": "running",
            "timestamp": datetime.now().isoformat(),
            "status": "healthy"
        }
    except Exception as e:
        # Defensive: never let the health endpoint itself raise.
        return {"ok": False, "error": str(e)}
@app.get("/health/db")  # route registration was missing
def health_db():
    """Detailed health check that verifies DB connectivity via the pool."""
    try:
        with pool.acquire() as conn:
            with conn.cursor() as cur:
                # Cheapest possible round-trip to prove the connection works.
                cur.execute("SELECT 1 FROM DUAL")
                v = cur.fetchone()[0]
        return {"ok": True, "db": v, "server": "running"}
    except Exception as e:
        # Report the failure instead of surfacing a 500 — callers inspect "ok".
        return {"ok": False, "db": "error", "error": str(e)}
@app.post("/upload_state")  # route registration was missing
def upload_state(p: StatePayload):
    """Upsert a user's baseline state into MuscleCare.user_state (MERGE).

    Raises 400 if user_emb is present but not length 12; wraps any other
    failure in a 500.
    """
    try:
        emb_json = None
        if p.user_emb is not None:
            # Validate before touching the DB: the embedding is fixed-length.
            if len(p.user_emb) != 12:
                raise HTTPException(400, "user_emb must have length=12")
            emb_json = clob_json(p.user_emb)  # stored as compact JSON text
        with pool.acquire() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    MERGE INTO MuscleCare.user_state t
                    USING (
                        SELECT :user_id AS user_id FROM dual
                    ) s
                    ON (t.user_id = s.user_id)
                    WHEN MATCHED THEN UPDATE SET
                        rms_base = :rms_base,
                        freq_base = :freq_base,
                        user_emb = :user_emb,
                        model_version = :model_version,
                        last_sync = CURRENT_TIMESTAMP
                    WHEN NOT MATCHED THEN INSERT
                        (user_id, rms_base, freq_base, user_emb, model_version, last_sync)
                    VALUES
                        (:user_id, :rms_base, :freq_base, :user_emb, :model_version, CURRENT_TIMESTAMP)
                """, dict(
                    user_id=p.user_id,
                    rms_base=p.rms_base,
                    freq_base=p.freq_base,
                    user_emb=emb_json,
                    model_version=p.model_version
                ))
            conn.commit()
        return {"ok": True}
    except HTTPException:
        # Let deliberate HTTP errors (the 400 above) pass through unchanged.
        raise
    except Exception as e:
        raise HTTPException(500, f"upload_state failed: {e}")
@app.get("/user_dataset/{user_id}")  # route registration was missing
async def read_user_dataset(user_id: str):
    """Fetch a user's measurement history from the Hugging Face Hub.

    Each user's data lives in a dataset split named after the user_id.
    Returns the total record count and the five most recent records; a
    missing split is reported as count 0, not as an error.
    """
    try:
        # Config is re-read per request so an env change needs no code reload.
        hf_repo_id = os.getenv("HF_DATA_REPO_ID")
        hf_token = os.getenv("HF_DATA_TOKEN")
        if not hf_repo_id or not hf_token:
            raise HTTPException(status_code=500, detail="Hugging Face ์ค์ ์ด ํ์ํฉ๋๋ค (HF_DATA_REPO_ID, HF_DATA_TOKEN)")
        try:
            # The split name doubles as the user id.
            dataset = load_dataset(hf_repo_id, split=user_id, token=hf_token)
            data = dataset.to_pandas().to_dict(orient="records")
            recent_data = data[-5:] if len(data) > 5 else data
            return {
                "user_id": user_id,
                "count": len(data),
                "recent_data": recent_data,
                "filename": f"{user_id}.parquet",
                "source": "huggingface_hub",
                "repo_id": hf_repo_id
            }
        except Exception as e:
            # Best-effort: any load failure (typically a missing split) is
            # reported as "no data" — but log the cause instead of dropping it.
            print(f"user_dataset: load failed for {user_id}: {e}")
            return {
                "user_id": user_id,
                "count": 0,
                "recent_data": [],
                "source": "huggingface_hub",
                "repo_id": hf_repo_id,
                "message": "No data found"
            }
    except HTTPException:
        raise
    except Exception as e:
        print(f"โ Hugging Face Hub ์กฐํ ์คํจ: {e}")
        raise HTTPException(status_code=500, detail=f"Hugging Face Hub ์กฐํ ์คํจ: {str(e)}")
@app.post("/upload_logs")  # route registration was missing
async def upload_logs(payload: BatchUploadPayload):
    """Push a batch of measurements to the Hugging Face Hub, one split per user.

    Strategy: reload the whole repo, rebuild every existing split (schema
    normalization), append the batch's records to each affected user's split,
    then push the full DatasetDict back in a single commit. Per-user merge
    failures are recorded in the response but do not abort the batch.
    """
    try:
        # Hugging Face configuration is required for this endpoint.
        hf_repo_id = os.getenv("HF_DATA_REPO_ID")
        hf_token = os.getenv("HF_DATA_TOKEN")
        if not hf_repo_id or not hf_token:
            raise HTTPException(status_code=500, detail="Hugging Face ์ค์ ์ด ํ์ํฉ๋๋ค (HF_DATA_REPO_ID, HF_DATA_TOKEN)")

        # Group incoming records by user; each record also carries the batch
        # metadata and a server-side ingestion timestamp.
        user_data_groups = {}
        for item in payload.batch_data:
            record = {
                "session_id": item.session_id,
                "measure_date": item.measure_date,
                "rms": item.rms,
                "freq": item.freq,
                "fatigue": item.fatigue,
                "mode": item.mode,
                "window_count": item.window_count,
                "measurement_count": item.measurement_count,
                "batch_date": payload.batch_date,
                "batch_size": payload.batch_size,
                "timestamp": datetime.now().isoformat()
            }
            user_data_groups.setdefault(item.user_id, []).append(record)

        results = {}

        # Load every split currently in the repo (if the repo exists).
        try:
            existing = load_dataset(hf_repo_id, token=hf_token)
            all_splits = list(existing.keys())
            print(f"๐ ๊ธฐ์กด splits: {all_splits}")
            # Rebuild each split from scratch so all splits share one schema.
            new_existing = DatasetDict()
            for user_id in existing.keys():
                df = existing[user_id].to_pandas()
                new_existing[user_id] = df_to_dataset(df)
                print(f"๐ง {user_id}: ๊ธฐ์กด ๋ฐ์ดํฐ ์ฌ์์ฑ ์๋ฃ")
            existing = new_existing
        except Exception:
            # Repo missing or empty — start fresh.
            existing = DatasetDict()
            print("๐ ๊ธฐ์กด repo ์์ โ ์๋ก ์์ฑ")

        # Update only the users present in this batch.
        for user_id, records in user_data_groups.items():
            try:
                new_df = pd.DataFrame(records)
                new_dataset = df_to_dataset(new_df)
                if user_id in existing:
                    # Append the new rows to the user's existing split.
                    old_df = existing[user_id].to_pandas()
                    merged_df = pd.concat([old_df, new_df], ignore_index=True)
                    existing[user_id] = df_to_dataset(merged_df)
                    print(f"๐ {user_id}: ๊ธฐ์กด ๋ฐ์ดํฐ์ ๋ณํฉ ({len(old_df)} + {len(new_df)} = {len(merged_df)}๊ฐ ๋ ์ฝ๋)")
                else:
                    existing[user_id] = new_dataset
                    print(f"๐ {user_id}: ์ ๊ท ๋ฐ์ดํฐ ์ถ๊ฐ ({len(new_df)}๊ฐ ๋ ์ฝ๋)")
                results[user_id] = {
                    "status": "success",
                    "new_rows": len(records),
                    "filename": f"{user_id}.parquet"
                }
            except Exception as e:
                # Record the per-user failure and keep processing the batch.
                print(f"โ {user_id} ์ฒ๋ฆฌ ์คํจ: {e}")
                results[user_id] = {
                    "status": "failed",
                    "error": str(e)
                }

        # Push every split back to the Hub in one commit.
        try:
            existing.push_to_hub(hf_repo_id, token=hf_token, private=True)
            print(f"โ ์ ์ฒด DatasetDict ํธ์ ์๋ฃ: {len(existing)}๊ฐ ์ฌ์ฉ์")
        except Exception as e:
            print(f"โ ์ ์ฒด ํธ์ ์คํจ: {e}")
            raise HTTPException(status_code=500, detail=f"์ ์ฒด ํธ์ ์คํจ: {str(e)}")

        return {
            "batch_date": payload.batch_date,
            "batch_size": payload.batch_size,
            "processed_users": len(user_data_groups),
            "results": results,
            "repo_id": hf_repo_id,
            "message": f"Batch upload completed for {len(user_data_groups)} users"
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"โ ๋ฐฐ์น ํธ์ ์คํจ: {e}")
        raise HTTPException(status_code=500, detail=f"๋ฐฐ์น ํธ์ ์คํจ: {str(e)}")
def df_to_dataset(df):
    """Convert a pandas DataFrame into a Hugging Face `datasets.Dataset`.

    Used when rebuilding splits so they all share a uniform schema.
    """
    dataset = Dataset.from_pandas(df)
    return dataset