Merry99's picture
add deleteAll.py
2cd42be
raw
history blame
11 kB
import os
import json
from typing import List, Optional
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel, Field, ConfigDict
import oracledb
from dotenv import load_dotenv
import json
import requests
import pandas as pd
from datetime import datetime
from datasets import Dataset, DatasetDict, load_dataset
# Local development: load a .env file if one is present.
load_dotenv()

# ----- Environment variables -----
# Required Oracle credentials; a missing key raises KeyError at import time.
DB_USER = os.environ["DB_USER"]
DB_PASSWORD = os.environ["DB_PASSWORD"]
WALLET_DIR = os.environ["WALLET_DIR"]
WALLET_PASSWORD = os.environ["WALLET_PASSWORD"]
# Check the alias inside tnsnames.ora (usually *_high).
TNS_ALIAS = os.environ.get("DB_TNS_ALIAS", "musclecare_high")
# Hugging Face settings (optional here; the endpoints re-read them per request).
HF_DATA_REPO_ID = os.getenv("HF_DATA_REPO_ID")
HF_DATA_TOKEN = os.getenv("HF_DATA_TOKEN")
# ----- Connection pool: Thin mode + mTLS (wallet) -----
# Never call oracledb.init_oracle_client() here: it would switch the driver
# into Thick mode, which can fail in this deployment.
pool: oracledb.ConnectionPool = oracledb.create_pool(
    user=DB_USER,
    password=DB_PASSWORD,
    dsn=TNS_ALIAS,  # alias from the wallet's tnsnames.ora
    config_dir=WALLET_DIR,
    wallet_location=WALLET_DIR,
    wallet_password=WALLET_PASSWORD,
    min=1, max=4, increment=1,
    homogeneous=True,
    timeout=60,
    retry_count=6, retry_delay=2
)
app = FastAPI(title="MuscleCare Hybrid Server (mTLS)")
# ----- ๋ชจ๋ธ -----
class StatePayload(BaseModel):
    """Per-user model state uploaded via /upload_state."""
    # Allow field names starting with "model_" (pydantic v2 protected-namespace guard).
    model_config = ConfigDict(protected_namespaces=())
    user_id: str
    rms_base: Optional[float] = None
    freq_base: Optional[float] = None
    # Serialized to JSON before storage; the endpoint enforces length == 12.
    user_emb: Optional[List[float]] = Field(default=None, description="length=12")
    model_version: Optional[str] = None
# Schema for batch data uploads.
class BatchDataItem(BaseModel):
    """One measurement session inside a batch upload (/upload_batch_dataset)."""
    user_id: str
    session_id: str
    measure_date: str
    rms: float
    freq: float
    fatigue: float
    mode: str
    window_count: int
    # Raw per-window payloads; schema is client-defined — TODO confirm with caller.
    windows: List[dict] = Field(default_factory=list)
    measurement_count: int
class BatchUploadPayload(BaseModel):
    """Envelope for a batch of BatchDataItem records plus batch metadata."""
    batch_data: List[BatchDataItem]
    batch_size: int
    batch_date: str
# ----- Utilities -----
def clob_json(obj) -> str:
    """Serialize *obj* to compact JSON (no separator spaces, non-ASCII kept verbatim)."""
    compact = json.dumps(obj, separators=(",", ":"), ensure_ascii=False)
    return compact
# ----- ์—”๋“œํฌ์ธํŠธ -----
@app.get("/")
def root():
"""๋ฃจํŠธ ์—”๋“œํฌ์ธํŠธ - ์„œ๋ฒ„ ์ƒํƒœ ํ™•์ธ"""
return {
"status": "running",
"message": "MuscleCare API Server",
"version": "1.0.0",
"endpoints": {
"health": "/health (๋น ๋ฅธ ์ฒดํฌ)",
"health_db": "/health/db (DB ์—ฐ๊ฒฐ ์ฒดํฌ)",
"docs": "/docs",
"upload_state": "/upload_state",
"upload_batch_dataset": "/upload_batch_dataset (๋ฐฐ์น˜ ๋ฐ์ดํ„ฐ)",
"user_dataset": "/user_dataset/{user_id}"
}
}
@app.get("/health")
def health():
    """Lightweight liveness check — server status only, no DB access.

    Use /health/db for a check that exercises the connection pool.
    """
    # NOTE: the original wrapped this in try/except, but nothing here can
    # realistically raise, so the {"ok": False, ...} branch was dead code.
    return {
        "ok": True,
        "server": "running",
        "timestamp": datetime.now().isoformat(),
        "status": "healthy",
    }
@app.get("/health/db")
def health_db():
    """Detailed health check that verifies DB connectivity through the pool."""
    try:
        with pool.acquire() as conn, conn.cursor() as cur:
            cur.execute("SELECT 1 FROM DUAL")
            (value,) = cur.fetchone()
        return {"ok": True, "db": value, "server": "running"}
    except Exception as exc:
        return {"ok": False, "db": "error", "error": str(exc)}
@app.post("/upload_state")
def upload_state(p: StatePayload):
    """Upsert a user's model state into MuscleCare.user_state via MERGE."""
    try:
        # Validate and serialize the embedding before touching the DB.
        emb_json = None
        if p.user_emb is not None:
            if len(p.user_emb) != 12:
                raise HTTPException(400, "user_emb must have length=12")
            emb_json = clob_json(p.user_emb)

        merge_sql = """
            MERGE INTO MuscleCare.user_state t
            USING (
                SELECT :user_id AS user_id FROM dual
            ) s
            ON (t.user_id = s.user_id)
            WHEN MATCHED THEN UPDATE SET
                rms_base = :rms_base,
                freq_base = :freq_base,
                user_emb = :user_emb,
                model_version = :model_version,
                last_sync = CURRENT_TIMESTAMP
            WHEN NOT MATCHED THEN INSERT
                (user_id, rms_base, freq_base, user_emb, model_version, last_sync)
            VALUES
                (:user_id, :rms_base, :freq_base, :user_emb, :model_version, CURRENT_TIMESTAMP)
        """
        bind_vars = {
            "user_id": p.user_id,
            "rms_base": p.rms_base,
            "freq_base": p.freq_base,
            "user_emb": emb_json,
            "model_version": p.model_version,
        }

        with pool.acquire() as conn:
            with conn.cursor() as cur:
                cur.execute(merge_sql, bind_vars)
            conn.commit()
        return {"ok": True}
    except HTTPException:
        # Re-raise validation errors untouched (don't wrap them in a 500).
        raise
    except Exception as e:
        raise HTTPException(500, f"upload_state failed: {e}")
@app.get("/user_dataset/{user_id}")
async def read_user_dataset(user_id: str):
    """Fetch a user's dataset split from the Hugging Face Hub (last 5 records)."""
    try:
        # Re-read the HF settings each request so env changes take effect live.
        hf_repo_id = os.getenv("HF_DATA_REPO_ID")
        hf_token = os.getenv("HF_DATA_TOKEN")
        if not hf_repo_id or not hf_token:
            raise HTTPException(status_code=500, detail="Hugging Face ์„ค์ •์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค (HF_DATA_REPO_ID, HF_DATA_TOKEN)")

        # Each user is stored as their own split named after user_id.
        try:
            records = load_dataset(hf_repo_id, split=user_id, token=hf_token) \
                .to_pandas().to_dict(orient="records")
        except Exception:
            # NOTE(review): any load failure (missing split, auth, network) is
            # reported as "no data" — confirm this best-effort behavior is intended.
            return {
                "user_id": user_id,
                "count": 0,
                "recent_data": [],
                "source": "huggingface_hub",
                "repo_id": hf_repo_id,
                "message": "No data found"
            }

        return {
            "user_id": user_id,
            "count": len(records),
            "recent_data": records[-5:],
            "filename": f"{user_id}.parquet",
            "source": "huggingface_hub",
            "repo_id": hf_repo_id
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"โŒ Hugging Face Hub ์กฐํšŒ ์‹คํŒจ: {e}")
        raise HTTPException(status_code=500, detail=f"Hugging Face Hub ์กฐํšŒ ์‹คํŒจ: {str(e)}")
def _batch_record(item: BatchDataItem, payload: BatchUploadPayload) -> dict:
    """Flatten one BatchDataItem plus its batch metadata into a storable row."""
    return {
        "session_id": item.session_id,
        "measure_date": item.measure_date,
        "rms": item.rms,
        "freq": item.freq,
        "fatigue": item.fatigue,
        "mode": item.mode,
        "window_count": item.window_count,
        "windows": item.windows,
        "measurement_count": item.measurement_count,
        "batch_date": payload.batch_date,
        "batch_size": payload.batch_size,
        "timestamp": datetime.now().isoformat(),
    }


def _load_existing_splits(hf_repo_id: str, hf_token: str) -> DatasetDict:
    """Load every split currently in the repo; empty DatasetDict when none exists."""
    try:
        existing = load_dataset(hf_repo_id, token=hf_token)
        print(f"๐Ÿ“‚ ๊ธฐ์กด splits: {list(existing.keys())}")
        return existing
    except Exception:
        print("๐Ÿ“‚ ๊ธฐ์กด repo ์—†์Œ โ†’ ์ƒˆ๋กœ ์ƒ์„ฑ")
        return DatasetDict()


@app.post("/upload_batch_dataset")
async def upload_batch_dataset(payload: BatchUploadPayload):
    """Push a batch of user data to the Hugging Face Hub, one split per user.

    Groups incoming items by user_id, appends them to any existing split,
    then pushes the whole DatasetDict in a single upload. Per-user failures
    are reported in the response; a failed push raises a 500.
    """
    try:
        # Hugging Face settings are checked per request so env changes apply live.
        hf_repo_id = os.getenv("HF_DATA_REPO_ID")
        hf_token = os.getenv("HF_DATA_TOKEN")
        if not hf_repo_id or not hf_token:
            raise HTTPException(status_code=500, detail="Hugging Face ์„ค์ •์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค (HF_DATA_REPO_ID, HF_DATA_TOKEN)")

        # Group incoming items by user; each user maps to one dataset split.
        user_data_groups: dict = {}
        for item in payload.batch_data:
            user_data_groups.setdefault(item.user_id, []).append(
                _batch_record(item, payload)
            )

        existing = _load_existing_splits(hf_repo_id, hf_token)

        # Merge each user's new rows into their split; failures are per-user.
        results = {}
        for user_id, records in user_data_groups.items():
            try:
                df = pd.DataFrame(records)
                if user_id in existing:
                    old_df = existing[user_id].to_pandas()
                    merged = pd.concat([old_df, df], ignore_index=True)
                    existing[user_id] = Dataset.from_pandas(merged)
                    print(f"๐Ÿ“Š {user_id}: ๊ธฐ์กด ๋ฐ์ดํ„ฐ์™€ ๋ณ‘ํ•ฉ ({len(old_df)} + {len(df)} = {len(merged)}๊ฐœ ๋ ˆ์ฝ”๋“œ)")
                else:
                    existing[user_id] = Dataset.from_pandas(df)
                    print(f"๐Ÿ“Š {user_id}: ์‹ ๊ทœ ๋ฐ์ดํ„ฐ ์ถ”๊ฐ€ ({len(df)}๊ฐœ ๋ ˆ์ฝ”๋“œ)")
                results[user_id] = {
                    "status": "success",
                    "new_rows": len(records),
                    "filename": f"{user_id}.parquet",
                }
            except Exception as e:
                print(f"โŒ {user_id} ์ฒ˜๋ฆฌ ์‹คํŒจ: {e}")
                results[user_id] = {
                    "status": "failed",
                    "error": str(e),
                }

        # Push the whole DatasetDict in one shot so all splits stay in sync.
        try:
            existing.push_to_hub(hf_repo_id, token=hf_token, private=True)
            print(f"โœ… ์ „์ฒด DatasetDict ํ‘ธ์‹œ ์™„๋ฃŒ: {len(existing)}๊ฐœ ์‚ฌ์šฉ์ž")
        except Exception as e:
            print(f"โŒ ์ „์ฒด ํ‘ธ์‹œ ์‹คํŒจ: {e}")
            raise HTTPException(status_code=500, detail=f"์ „์ฒด ํ‘ธ์‹œ ์‹คํŒจ: {str(e)}")

        return {
            "batch_date": payload.batch_date,
            "batch_size": payload.batch_size,
            "processed_users": len(user_data_groups),
            "results": results,
            "repo_id": hf_repo_id,
            "message": f"Batch upload completed for {len(user_data_groups)} users"
        }
    except HTTPException:
        raise
    except Exception as e:
        print(f"โŒ ๋ฐฐ์น˜ ํ‘ธ์‹œ ์‹คํŒจ: {e}")
        raise HTTPException(status_code=500, detail=f"๋ฐฐ์น˜ ํ‘ธ์‹œ ์‹คํŒจ: {str(e)}")