from fastapi import FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import os import joblib import pandas as pd from contextlib import asynccontextmanager HF_TOKEN = os.getenv("HF_TOKEN") HF_USERNAME = os.getenv("HF_USERNAME", "vancevo") DATASET_REPO = os.getenv("HF_DATASET_REPO", "loyalty-behavior-dataset") DATASET_REF = f"{HF_USERNAME}/{DATASET_REPO}" RAW_DATASET_REF = f"{HF_USERNAME}/online-retail-ii" # In-memory DataFrames loyalty_dataset_df = None raw_df = None # Online Retail II raw data customers_df = None # Unique customers extracted from raw # In-memory Models xgb_dormancy = None rf_downgrade = None def pull_dataset(): """Pull processed loyalty dataset từ HuggingFace bằng hf_hub_download.""" global loyalty_dataset_df try: from huggingface_hub import hf_hub_download print(f"Đang pull loyalty dataset: {DATASET_REF}...") file_path = hf_hub_download( repo_id=DATASET_REF, filename="data/train-00000-of-00001.parquet", repo_type="dataset", token=HF_TOKEN if HF_TOKEN and HF_TOKEN != "hf_xxxxxxxxxxxxxxxxx" else None ) loyalty_dataset_df = pd.read_parquet(file_path, engine='fastparquet') print(f"✅ Loyalty dataset: {loyalty_dataset_df.shape}") except Exception as e: print(f"Warning: Loyalty dataset lỗi ({e})") def pull_raw_dataset(): """Pull raw Online Retail II dataset từ HuggingFace qua hf_hub_download.""" global raw_df, customers_df try: from huggingface_hub import hf_hub_download print(f"Đang pull raw dataset: {RAW_DATASET_REF}...") file_path = hf_hub_download( repo_id=RAW_DATASET_REF, filename="data/train-00000-of-00001.parquet", repo_type="dataset", token=HF_TOKEN if HF_TOKEN and HF_TOKEN != "hf_xxxxxxxxxxxxxxxxx" else None ) raw_df = pd.read_parquet(file_path, engine='fastparquet') print(f"✅ Raw dataset: {raw_df.shape}") _build_customers() except Exception as e: print(f"Warning: Raw dataset tải thất bại ({e}). Thử đọc local CSV...") local_csv = "online_retail_II.csv" if os.path.exists(local_csv): raw_df = pd.read_csv(local_csv, encoding="utf-8", encoding_errors="replace", dtype=str) raw_df.columns = [c.strip().replace(" ", "_") for c in raw_df.columns] print(f"✅ Raw dataset (local CSV): {raw_df.shape}") _build_customers() # Sample names for generation (since raw data doesn't have names) SAMPLE_NAMES = [ "Anh Tuấn", "Bảo Châu", "Cẩm Tú", "Duy Mạnh", "Elena Rodriguez", "Hoàng Nam", "Lan Anh", "Minh Đức", "Ngọc Diệp", "Phúc Lâm", "Quỳnh Chi", "Sơn Tùng", "Thanh Hằng", "Uyên Linh", "Việt Anh", "John Smith", "Maria Garcia", "David Chen", "Yuki Tanaka", "Ahmed Hassan" ] def _build_customers(): """Xây dựng bảng unique customers từ raw_df bằng vectorized operations (nhanh hơn 100x).""" global customers_df if raw_df is None: return print("Đang khởi tạo chỉ mục khách hàng...") cid_col = "Customer_ID" if "Customer_ID" in raw_df.columns else "Customer ID" # Làm sạch dữ liệu df = raw_df.dropna(subset=[cid_col]).copy() df[cid_col] = df[cid_col].astype(float).astype(str).str.replace(".0", "", regex=False) # Tính toán tổng hợp # Lấy thông tin Country và Description cuối cùng của mỗi khách hàng # (Giả định dòng cuối cùng là thông tin mới nhất) last_info = df.groupby(cid_col).tail(1).set_index(cid_col) # Tính tổng đơn hàng và tổng chi tiêu stats = df.groupby(cid_col).agg({ 'Invoice': 'nunique', 'Price': lambda x: pd.to_numeric(x, errors='coerce').sum() }) # Kết hợp lại res = stats.join(last_info[['Country', 'Description']]) res = res.reset_index() # Map sang định dạng mong muốn res['customer_name'] = res[cid_col].apply(lambda x: SAMPLE_NAMES[int(float(x)) % len(SAMPLE_NAMES)]) customers_df = pd.DataFrame({ "customer_id": res[cid_col], "customer_name": res['customer_name'], "country": res['Country'].fillna("Unknown"), "top_product": res['Description'].str.slice(0, 60).fillna(""), "total_orders": res['Invoice'], "total_spend": res['Price'].round(2) }) print(f"✅ Customers index built: {len(customers_df)} unique customers") @asynccontextmanager async def lifespan(app: FastAPI): global xgb_dormancy, rf_downgrade # 1. Tải Dataset pull_dataset() pull_raw_dataset() # 2. Tải Models USERNAME = os.getenv("HF_USERNAME", "vancevo") MODEL_REPO = os.getenv("HF_MODEL_REPO", "loyalty-models") REPO_ID = f"{USERNAME}/{MODEL_REPO}" print(f"Bắt đầu nạp models từ {REPO_ID}...") xgb_dormancy = load_model(REPO_ID, "xgboost_dormancy.pkl") rf_downgrade = load_model(REPO_ID, "rf_downgrade.pkl") yield app = FastAPI(lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class PredictionRequest(BaseModel): customer_id: str features: dict class TrainRequest(BaseModel): dataset_name: str model_name: str # Hàm tiện ích để load model (thử tải từ HF, nếu lỗi dùng local) def load_model(repo_id: str, filename: str): local_path = f"models/{filename}" hf_token = os.getenv("HF_TOKEN") try: from huggingface_hub import hf_hub_download, login import joblib if hf_token and hf_token != "hf_xxxxxxxxxxxxxxxxx": try: login(token=hf_token) except: pass print(f"Đang tải {filename} từ {repo_id}...") model_path = hf_hub_download( repo_id=repo_id, filename=filename, token=hf_token if hf_token and hf_token != "hf_xxxxxxxxxxxxxxxxx" else None ) model = joblib.load(model_path) print(f"✅ Đã nạp xong model: {filename}") return model except Exception as e: print(f"❌ Lỗi khi nạp {filename} từ HF: {e}") if os.path.exists(local_path): import joblib print(f"🔄 Đang nạp {filename} từ file cục bộ...") return joblib.load(local_path) return None # In-memory Models xgb_dormancy = None rf_downgrade = None # Danh sách features cần thiết theo đúng thứ tự lúc train FEATURE_COLS = [ 'Recency', 'Frequency', 'Monetary', 'QuantitySum', 'AvgUnitPrice', 'UniqueProducts', 'AvgBasketValue', 'Frequency_3M', 'Monetary_3M', 'QuantitySum_3M', 'UniqueProducts_3M', 'Frequency_6M', 'Monetary_6M', 'QuantitySum_6M', 'UniqueProducts_6M', 'AvgBasketValue_3M_Mean', 'Recency_3M_Mean', 'Monetary_Change_1M', 'Frequency_Change_1M', 'Recency_Change_1M', 'LoyaltyScore', 'Hidden_State' ] @app.get("/api") def root(): return {"message": "Loyalty Prediction API is running."} @app.get("/api/customers") def get_customers( search: str = Query("", description="Tìm theo Customer ID hoặc Tên"), page: int = 1, page_size: int = 20, ): """Trả danh sách unique customers từ Online Retail II raw dataset.""" if customers_df is None: return {"status": "loading", "data": [], "total": 0} df = customers_df if search: mask = ( df["customer_id"].str.contains(search, case=False, na=False) | df["customer_name"].str.contains(search, case=False, na=False) | df["country"].str.contains(search, case=False, na=False) ) df = df[mask] total = len(df) start = (page - 1) * page_size records = df.iloc[start : start + page_size].to_dict(orient="records") return {"data": records, "total": total, "page": page, "page_size": page_size} @app.get("/api/raw-data") def get_raw_data(page: int = 1, page_size: int = 20, search: str = ""): """Endpoint đặc biệt để lấy dữ liệu thô (Customer ID & Customer Name).""" if customers_df is None: raise HTTPException(status_code=503, detail="Dữ liệu thô chưa sẵn sàng") df = customers_df[["customer_id", "customer_name", "country", "total_orders", "total_spend"]] if search: mask = ( df["customer_id"].str.contains(search, case=False, na=False) | df["customer_name"].str.contains(search, case=False, na=False) ) df = df[mask] total = len(df) start = (page - 1) * page_size rows = df.iloc[start : start + page_size].to_dict(orient="records") return {"data": rows, "total": total, "page": page, "page_size": page_size} @app.get("/api/raw/rows") def raw_rows(page: int = 1, page_size: int = 20, customer_id: str = ""): """Trả dữ liệu giao dịch thô của 1 khách hàng hoặc tất cả (phân trang).""" if raw_df is None: raise HTTPException(status_code=503, detail="Raw dataset chưa sẵn sàng") cid_col = "Customer_ID" if "Customer_ID" in raw_df.columns else "Customer ID" df = raw_df if customer_id: df = df[df[cid_col].astype(str) == customer_id] total = len(df) start = (page - 1) * page_size rows = df.iloc[start : start + page_size].fillna("").to_dict(orient="records") return {"data": rows, "total": total, "page": page, "page_size": page_size} @app.get("/api/dataset/info") def dataset_info(): if loyalty_dataset_df is None: return {"status": "not_loaded", "message": "Dataset chưa được pull"} return { "status": "loaded", "source": DATASET_REF, "rows": len(loyalty_dataset_df), "columns": list(loyalty_dataset_df.columns), } @app.get("/api/dataset/rows") def dataset_rows(page: int = 1, page_size: int = 20): """Trả dữ liệu dataset dạng phân trang để hiển thị trên FE Table.""" if loyalty_dataset_df is None: raise HTTPException(status_code=503, detail="Dataset chưa sẵn sàng") total = len(loyalty_dataset_df) start = (page - 1) * page_size end = min(start + page_size, total) rows = loyalty_dataset_df.iloc[start:end].to_dict(orient="records") return {"total": total, "page": page, "page_size": page_size, "data": rows} @app.post("/api/predict") def predict(req: PredictionRequest): if xgb_dormancy is None or rf_downgrade is None: raise HTTPException(status_code=500, detail="Models are not loaded.") input_data = {} customer_info = {} # Tìm trong dataset thật nếu có customer_id if loyalty_dataset_df is not None and req.customer_id: # Tìm bản ghi cuối cùng của customer theo cột Customer ID try: cid_str = str(float(req.customer_id)) col_name = "Customer ID" if "Customer ID" in loyalty_dataset_df.columns else "Customer_ID" if col_name in loyalty_dataset_df.columns: # Ép kiểu an toàn trước khi so sánh mask = loyalty_dataset_df[col_name].astype(str).str.replace(".0", "", regex=False) == cid_str.replace(".0", "") else: cid = float(req.customer_id) mask = loyalty_dataset_df.index == cid if mask.any(): row = loyalty_dataset_df[mask].iloc[-1] input_data = row[FEATURE_COLS].to_dict() customer_info = { "loyalty_score": round(float(row.get("LoyaltyScore", 0)), 2), "hidden_state": int(row.get("Hidden_State", 0)), } except Exception as e: print(f"Lỗi khi tìm customer_id {req.customer_id}: {e}") pass # Nếu user truyền features thủ công, ưu tiên dùng if req.features: input_data.update(req.features) # Fallback: nếu vẫn chưa đủ features thì báo lỗi thay vì sinh ngẫu nhiên missing = [c for c in FEATURE_COLS if c not in input_data] if missing: # Điền 0 cho các cột còn thiếu for c in missing: input_data[c] = 0 df_input = pd.DataFrame([input_data]) dormancy_prob = float(xgb_dormancy.predict_proba(df_input)[0][1]) downgrade_prob = float(rf_downgrade.predict_proba(df_input)[0][1]) # Xác định tier loyalty_score = input_data.get("LoyaltyScore", 0) if loyalty_score >= 4: tier = "Gold" elif loyalty_score >= 2.5: tier = "Silver" else: tier = "Bronze" # Hành động can thiệp theo báo cáo if dormancy_prob > 0.80: action = "Gửi voucher kích hoạt mua lại ngay" risk_level = "HIGH" elif downgrade_prob > 0.60: action = "Ưu đãi giữ hạng và chăm sóc VIP" risk_level = "HIGH" elif downgrade_prob > 0.50: action = "Gửi nhắc nhở quyền lợi và ưu đãi nhẹ" risk_level = "MEDIUM" elif dormancy_prob > 0.40: action = "Email cảm ơn và đề xuất sản phẩm liên quan" risk_level = "MEDIUM" else: action = "Chăm sóc định kỳ, duy trì quan hệ" risk_level = "LOW" return { "customer_id": req.customer_id, "tier": tier, "loyalty_score": round(loyalty_score, 2), "hidden_state": customer_info.get("hidden_state", int(input_data.get("Hidden_State", 0))), "downgrade_probability": round(downgrade_prob, 4), "dormancy_probability": round(dormancy_prob, 4), "risk_level": risk_level, "recommended_action": action, "features_used": {k: round(float(v), 2) for k, v in input_data.items()}, } @app.post("/api/train") def train(req: TrainRequest): return { "message": f"Training initiated for model {req.model_name} with dataset {req.dataset_name}", "status": "success" }