import os
import json
from datetime import datetime
from typing import List, Optional

import oracledb
import pandas as pd
import requests
from datasets import Dataset, DatasetDict, load_dataset
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel, ConfigDict, Field

# Local development: load a .env file if one is present.
load_dotenv()

# ----- Environment variables -----
# Required DB credentials: fail fast at import time if any is missing.
DB_USER = os.environ["DB_USER"]
DB_PASSWORD = os.environ["DB_PASSWORD"]
WALLET_DIR = os.environ["WALLET_DIR"]
WALLET_PASSWORD = os.environ["WALLET_PASSWORD"]
# Check the alias inside the wallet's tnsnames.ora (usually *_high).
TNS_ALIAS = os.environ.get("DB_TNS_ALIAS", "musclecare_high")

# Hugging Face settings (optional at import time; the dataset endpoints
# re-read and validate these on each request).
HF_DATA_REPO_ID = os.getenv("HF_DATA_REPO_ID")
HF_DATA_TOKEN = os.getenv("HF_DATA_TOKEN")

# ----- Connection pool: Thin mode + mTLS (wallet) -----
# Never call oracledb.init_oracle_client() here: that would switch the
# driver into Thick mode, which can fail in this deployment.
pool: oracledb.ConnectionPool = oracledb.create_pool(
    user=DB_USER,
    password=DB_PASSWORD,
    dsn=TNS_ALIAS,              # alias from the wallet's tnsnames.ora
    config_dir=WALLET_DIR,
    wallet_location=WALLET_DIR,
    wallet_password=WALLET_PASSWORD,
    min=1,
    max=4,
    increment=1,
    homogeneous=True,
    timeout=60,
    retry_count=6,
    retry_delay=2,
)

app = FastAPI(title="MuscleCare Hybrid Server (mTLS)")


# ----- Models -----
class StatePayload(BaseModel):
    """Per-user model-state snapshot synced from the client."""

    # Allow the "model_version" field name without pydantic's
    # protected-namespace warning on "model_" prefixes.
    model_config = ConfigDict(protected_namespaces=())

    user_id: str
    rms_base: Optional[float] = None
    freq_base: Optional[float] = None
    user_emb: Optional[List[float]] = Field(default=None, description="length=12")
    model_version: Optional[str] = None


# Schema for one batched measurement record.
class BatchDataItem(BaseModel):
    user_id: str
    session_id: str
    measure_date: str
    rms: float
    freq: float
    fatigue: float
    mode: str
    window_count: int
    windows: List[dict] = Field(default_factory=list)
    measurement_count: int


class BatchUploadPayload(BaseModel):
    """A batch of measurement records plus batch-level metadata."""

    batch_data: List[BatchDataItem]
    batch_size: int
    batch_date: str


# ----- Utilities -----
def clob_json(obj) -> str:
    """Serialize *obj* to compact JSON suitable for an Oracle CLOB column."""
    return json.dumps(obj, separators=(",", ":"), ensure_ascii=False)


# ----- Endpoints -----
@app.get("/")
def root():
    """Root endpoint: confirm the server is up and list available routes."""
    return {
        "status": "running",
        "message": "MuscleCare API Server",
        "version": "1.0.0",
        "endpoints": {
            "health": "/health (빠른 체크)",
            "health_db": "/health/db (DB 연결 체크)",
            "docs": "/docs",
            "upload_state": "/upload_state",
            "upload_batch_dataset": "/upload_batch_dataset (배치 데이터)",
            "user_dataset": "/user_dataset/{user_id}",
        },
    }


@app.get("/health")
def health():
    """Fast liveness check: no DB round-trip, just process status.

    The original wrapped this body in try/except, but nothing here can
    raise, so the dead error branch has been removed.
    """
    return {
        "ok": True,
        "server": "running",
        "timestamp": datetime.now().isoformat(),
        "status": "healthy",
    }


@app.get("/health/db")
def health_db():
    """Detailed health check that includes a DB round-trip."""
    try:
        with pool.acquire() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM DUAL")
                v = cur.fetchone()[0]
        return {"ok": True, "db": v, "server": "running"}
    except Exception as e:
        # Report the DB failure in the response body instead of a 500,
        # so monitoring can see *why* the check failed.
        return {"ok": False, "db": "error", "error": str(e)}


@app.post("/upload_state")
def upload_state(p: StatePayload):
    """Upsert a user's state row via MERGE INTO MuscleCare.user_state.

    Raises:
        HTTPException(400): if ``user_emb`` is present but not length 12.
        HTTPException(500): on any DB failure.
    """
    try:
        emb_json = None
        if p.user_emb is not None:
            # The embedding column stores a fixed-width vector: exactly 12 floats.
            if len(p.user_emb) != 12:
                raise HTTPException(400, "user_emb must have length=12")
            emb_json = clob_json(p.user_emb)

        with pool.acquire() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    MERGE INTO MuscleCare.user_state t
                    USING ( SELECT :user_id AS user_id FROM dual ) s
                    ON (t.user_id = s.user_id)
                    WHEN MATCHED THEN UPDATE SET
                        rms_base = :rms_base,
                        freq_base = :freq_base,
                        user_emb = :user_emb,
                        model_version = :model_version,
                        last_sync = CURRENT_TIMESTAMP
                    WHEN NOT MATCHED THEN INSERT
                        (user_id, rms_base, freq_base, user_emb, model_version, last_sync)
                        VALUES (:user_id, :rms_base, :freq_base, :user_emb, :model_version,
                                CURRENT_TIMESTAMP)
                """, dict(
                    user_id=p.user_id,
                    rms_base=p.rms_base,
                    freq_base=p.freq_base,
                    user_emb=emb_json,
                    model_version=p.model_version,
                ))
            conn.commit()
        return {"ok": True}
    except HTTPException:
        # Re-raise validation errors untouched (keep the 400 status).
        raise
    except Exception as e:
        raise HTTPException(500, f"upload_state failed: {e}")
@app.get("/user_dataset/{user_id}")
async def read_user_dataset(user_id: str):
    """Fetch a user's measurement records from the Hugging Face Hub.

    Each user is stored as a dataset *split* named after the user_id.
    Returns the total row count plus up to the 5 most recent records.
    """
    try:
        # Re-read the Hugging Face configuration on every request.
        hf_repo_id = os.getenv("HF_DATA_REPO_ID")
        hf_token = os.getenv("HF_DATA_TOKEN")
        if not hf_repo_id or not hf_token:
            raise HTTPException(
                status_code=500,
                detail="Hugging Face 설정이 필요합니다 (HF_DATA_REPO_ID, HF_DATA_TOKEN)",
            )

        try:
            dataset = load_dataset(hf_repo_id, split=user_id, token=hf_token)
            data = dataset.to_pandas().to_dict(orient="records")
            return {
                "user_id": user_id,
                "count": len(data),
                # Slicing already copes with lists shorter than 5;
                # no length check needed.
                "recent_data": data[-5:],
                "filename": f"{user_id}.parquet",
                "source": "huggingface_hub",
                "repo_id": hf_repo_id,
            }
        except Exception:
            # NOTE(review): ANY load failure (missing split, bad token,
            # network error) is reported as "no data" here — consider
            # distinguishing auth/network errors from a truly empty user.
            return {
                "user_id": user_id,
                "count": 0,
                "recent_data": [],
                "source": "huggingface_hub",
                "repo_id": hf_repo_id,
                "message": "No data found",
            }
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ Hugging Face Hub 조회 실패: {e}")
        raise HTTPException(status_code=500, detail=f"Hugging Face Hub 조회 실패: {str(e)}")


def _item_to_record(item: BatchDataItem, payload: BatchUploadPayload) -> dict:
    """Flatten one BatchDataItem (plus batch metadata) into a plain-dict row."""
    return {
        "session_id": item.session_id,
        "measure_date": item.measure_date,
        "rms": item.rms,
        "freq": item.freq,
        "fatigue": item.fatigue,
        "mode": item.mode,
        "window_count": item.window_count,
        "windows": item.windows,
        "measurement_count": item.measurement_count,
        "batch_date": payload.batch_date,
        "batch_size": payload.batch_size,
        "timestamp": datetime.now().isoformat(),
    }


@app.post("/upload_batch_dataset")
async def upload_batch_dataset(payload: BatchUploadPayload):
    """Push a batch of user measurements to the Hugging Face Hub.

    Groups incoming items per user, merges each group with that user's
    existing split (if any), then re-pushes the whole DatasetDict
    (one split per user) in a single push.
    """
    try:
        hf_repo_id = os.getenv("HF_DATA_REPO_ID")
        hf_token = os.getenv("HF_DATA_TOKEN")
        if not hf_repo_id or not hf_token:
            raise HTTPException(
                status_code=500,
                detail="Hugging Face 설정이 필요합니다 (HF_DATA_REPO_ID, HF_DATA_TOKEN)",
            )

        # Group incoming records per user (one Hub split per user).
        user_data_groups: dict = {}
        for item in payload.batch_data:
            user_data_groups.setdefault(item.user_id, []).append(
                _item_to_record(item, payload)
            )

        results = {}

        # Load every existing split from the repo, if the repo exists.
        try:
            existing = load_dataset(hf_repo_id, token=hf_token)
            all_splits = list(existing.keys())
            print(f"📂 기존 splits: {all_splits}")
        except Exception:
            existing = DatasetDict()
            print("📂 기존 repo 없음 → 새로 생성")

        # Update only the users present in this batch; other splits
        # remain untouched and are re-pushed as-is.
        for user_id, records in user_data_groups.items():
            try:
                df = pd.DataFrame(records)
                if user_id in existing:
                    # Append the new rows to the user's existing split.
                    old_df = existing[user_id].to_pandas()
                    merged = pd.concat([old_df, df], ignore_index=True)
                    existing[user_id] = Dataset.from_pandas(merged)
                    print(f"📊 {user_id}: 기존 데이터와 병합 ({len(old_df)} + {len(df)} = {len(merged)}개 레코드)")
                else:
                    # New user: only build the Dataset when actually needed.
                    existing[user_id] = Dataset.from_pandas(df)
                    print(f"📊 {user_id}: 신규 데이터 추가 ({len(df)}개 레코드)")
                results[user_id] = {
                    "status": "success",
                    "new_rows": len(records),
                    "filename": f"{user_id}.parquet",
                }
            except Exception as e:
                # A single user's failure must not abort the whole batch.
                print(f"❌ {user_id} 처리 실패: {e}")
                results[user_id] = {"status": "failed", "error": str(e)}

        # Re-push all splits in one shot.
        try:
            existing.push_to_hub(hf_repo_id, token=hf_token, private=True)
            print(f"✅ 전체 DatasetDict 푸시 완료: {len(existing)}개 사용자")
        except Exception as e:
            print(f"❌ 전체 푸시 실패: {e}")
            raise HTTPException(status_code=500, detail=f"전체 푸시 실패: {str(e)}")

        return {
            "batch_date": payload.batch_date,
            "batch_size": payload.batch_size,
            "processed_users": len(user_data_groups),
            "results": results,
            "repo_id": hf_repo_id,
            "message": f"Batch upload completed for {len(user_data_groups)} users",
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"❌ 배치 푸시 실패: {e}")
        raise HTTPException(status_code=500, detail=f"배치 푸시 실패: {str(e)}")