book-rec-with-LLMs / src /services /recommend_service.py
ymlin105's picture
chore: remove obsolete files and update project structure
6ad997d
import pickle
import pandas as pd
import lightgbm as lgb
import xgboost as xgb
import numpy as np
from pathlib import Path
from src.config import MMR_LAMBDA_DEFAULT, POPULARITY_GAMMA_DEFAULT, MAX_PER_CATEGORY_DEFAULT
from src.recsys.recall.fusion import RecallFusion
from src.recsys.ranking.features import FeatureEngineer
from src.recsys.ranking.explainer import RankingExplainer
from src.recsys.ranking.din import DINRanker
from src.core.diversity_reranker import DiversityReranker
from src.utils import setup_logger
logger = setup_logger(__name__)
class RecommendationService:
    """End-to-end recommendation pipeline: recall fusion → ranking (DIN deep
    model preferred, else LGBM / optional XGBoost stacking) → diversity
    rerank → title-level deduplication."""

    def __init__(self, data_dir='data/rec', model_dir='data/model'):
        """Wire up sub-components; heavy resources are loaded lazily via
        ``load_resources()``.

        Args:
            data_dir: Root directory of recommendation data files.
            model_dir: Root directory of trained model artifacts.
        """
        self.data_dir = Path(data_dir)
        self.model_dir = Path(model_dir)
        # Recall fusion and feature engineering share the recall model dir
        self.fusion = RecallFusion(data_dir, f'{model_dir}/recall')
        self.fe = FeatureEngineer(data_dir, f'{model_dir}/recall')
        self.ranker = None           # LightGBM Booster (loaded lazily)
        self.ranker_loaded = False   # True once the LGBM ranker is available
        self.din_ranker = DINRanker(str(data_dir), str(model_dir))
        self.din_ranker_loaded = False  # True once the DIN deep ranker is loaded
        self.xgb_ranker = None   # optional XGBoost model used for stacking
        self.meta_model = None   # stacking meta-model blending LGBM+XGB scores
        self.use_stacking = False
        self.explainer = None  # SHAP explainer (V2.7)
def load_resources(self):
if self.ranker_loaded:
return
logger.info("Loading Recommendation Service resources...")
self.fusion.load_models()
self.fe.load_base_data()
# Prefer DIN ranker when available (deep model)
din_path = self.model_dir / 'ranking/din_ranker.pt'
if din_path.exists():
if self.din_ranker.load():
self.din_ranker_loaded = True
logger.info("DIN ranker loaded — using deep model for ranking")
# Load LGBM ranker (fallback when DIN not available)
ranker_path = self.model_dir / 'ranking/lgbm_ranker.txt'
if ranker_path.exists():
self.ranker = lgb.Booster(model_file=str(ranker_path))
logger.info(f"Ranker loaded from {ranker_path}")
self.ranker_loaded = True
# Initialize SHAP explainer (V2.7)
try:
self.explainer = RankingExplainer(self.ranker)
except Exception as e:
logger.warning(f"Failed to initialize SHAP explainer: {e}")
self.explainer = None
# Load XGBoost ranker (for stacking)
xgb_path = self.model_dir / 'ranking/xgb_ranker.json'
if xgb_path.exists():
try:
self.xgb_ranker = xgb.XGBClassifier()
# For older models/new xgboost versions, loading might raise TypeError if type isn't set
self.xgb_ranker.load_model(str(xgb_path))
logger.info(f"XGBoost ranker loaded from {xgb_path}")
except Exception as e:
logger.warning(f"Failed to load XGBoost ranker (stacking might be suboptimal): {e}")
# Fallback to booster if it's a raw booster dump
try:
self.xgb_ranker = xgb.Booster()
self.xgb_ranker.load_model(str(xgb_path))
logger.info("XGBoost ranker loaded as raw Booster.")
except Exception as e:
logger.debug("XGBoost Booster fallback failed: %s", e)
self.xgb_ranker = None
# Load stacking meta-model
meta_path = self.model_dir / 'ranking/stacking_meta.pkl'
if meta_path.exists():
with open(meta_path, 'rb') as f:
meta_data = pickle.load(f)
self.meta_model = meta_data['meta_model']
self.use_stacking = True
logger.info(f"Stacking meta-model loaded — stacking ENABLED")
else:
logger.warning(f"Ranker model not found at {ranker_path}, prediction will be skipped")
# Deduplication now uses MetadataStore for Title lookups (Zero-RAM mode)
from src.data.stores.metadata_store import metadata_store
self.metadata_store = metadata_store
logger.info("RecommendationService: Zero-RAM mode enabled for metadata lookups.")
# P0: Diversity Reranker (MMR + Popularity penalty + Category constraint)
self.diversity_reranker = DiversityReranker(
metadata_store=metadata_store,
data_dir=str(self.data_dir),
mmr_lambda=MMR_LAMBDA_DEFAULT,
popularity_gamma=POPULARITY_GAMMA_DEFAULT,
max_per_category=MAX_PER_CATEGORY_DEFAULT,
)
def get_recommendations(
self,
user_id,
top_k=10,
filter_favorites=True,
enable_diversity_rerank: bool = True,
real_time_sequence=None,
):
"""
Get personalized recommendations for a user.
Args:
enable_diversity_rerank: If True, apply MMR + popularity penalty + category
diversity (P0 optimization). Can disable for A/B testing.
real_time_sequence: P1 - List of ISBNs from current session (e.g. just-clicked).
Injected into SASRec recall and DIN/LGBM ranking.
Returns:
List of (isbn, score, explanations) tuples where explanations
is a list of dicts with feature contributions from SHAP.
"""
from src.data.stores.profile_store import list_favorites
self.load_resources()
# P1: Build effective sequence (offline + real-time) for SASRec/DIN
effective_seq = None
override_user_emb = None
if real_time_sequence:
sasrec = self.fusion.sasrec
base = getattr(sasrec, "user_sequences", {}).get(user_id, [])
id2item = getattr(sasrec, "id_to_item", {})
base_isbns = [id2item[i] for i in base if i in id2item]
effective_seq = (base_isbns + list(real_time_sequence))[-50:]
try:
override_user_emb = sasrec._compute_emb_from_seq(effective_seq)
except Exception:
override_user_emb = None
# 0. Get User Context (Favorites) for filtering
fav_isbns = set()
if filter_favorites:
try:
from src.data.stores.profile_store import list_favorites as _list_favorites
user_favs = _list_favorites(user_id)
fav_isbns = set(user_favs)
except Exception as e:
logger.warning(f"Could not fetch favorites for filtering: {e}")
# 1. Recall (P1: inject real_time_seq into SASRec)
candidates = self.fusion.get_recall_items(
user_id, k=200, real_time_seq=real_time_sequence
)
# P1: Cold-start fallback — when recall returns empty, use popularity
if not candidates:
pop_recs = self.fusion.popularity.recommend(user_id, top_k=200)
candidates = list(pop_recs)
if not candidates:
return []
# Deduplicate candidates (keep highest score)
unique_candidates = {}
for item, score in candidates:
if item not in unique_candidates:
unique_candidates[item] = score
candidates = list(unique_candidates.items())
candidate_items = [item for item, score in candidates]
# 2. Ranking
valid_candidates = [item for item in candidate_items if item not in fav_isbns]
if not valid_candidates:
return []
if self.din_ranker_loaded:
# DIN: deep model; P1: override_hist for real-time
aux_arr = None
if self.din_ranker.aux_feature_names:
X_df = self.fe.generate_features_batch(
user_id,
valid_candidates,
override_user_emb=override_user_emb,
override_user_seq=effective_seq,
)
for col in self.din_ranker.aux_feature_names:
if col not in X_df.columns:
X_df[col] = 0
aux_arr = X_df[self.din_ranker.aux_feature_names].values.astype(np.float32)
scores = self.din_ranker.predict(
user_id,
valid_candidates,
aux_arr,
override_hist=effective_seq,
)
explanations_list = [[] for _ in valid_candidates]
final_scores = list(zip(valid_candidates, scores, explanations_list))
final_scores.sort(key=lambda x: x[1], reverse=True)
elif self.ranker_loaded:
# LGBM / stacking path. P1: override for real-time
X_df = self.fe.generate_features_batch(
user_id,
valid_candidates,
override_user_emb=override_user_emb,
override_user_seq=effective_seq,
)
model_features = self.ranker.feature_name()
for col in model_features:
if col not in X_df.columns:
X_df[col] = 0
X_df = X_df[model_features]
if self.use_stacking and self.xgb_ranker is not None and self.meta_model is not None:
lgb_scores = self.ranker.predict(X_df)
if isinstance(self.xgb_ranker, xgb.Booster):
dtest = xgb.DMatrix(X_df)
xgb_scores = self.xgb_ranker.predict(dtest)
else:
xgb_scores = self.xgb_ranker.predict_proba(X_df)[:, 1]
meta_features = np.column_stack([lgb_scores, xgb_scores])
scores = self.meta_model.predict_proba(meta_features)[:, 1]
else:
scores = self.ranker.predict(X_df)
explanations_list = []
if self.explainer is not None:
try:
explanations_list = self.explainer.explain(X_df, top_k=3)
except Exception as e:
explanations_list = [[] for _ in valid_candidates]
else:
explanations_list = [[] for _ in valid_candidates]
final_scores = list(zip(valid_candidates, scores, explanations_list))
final_scores.sort(key=lambda x: x[1], reverse=True)
else:
# Fallback to recall scores, but filter
final_scores = []
for item, score in candidates:
if item not in fav_isbns:
final_scores.append((item, score, []))
# 2.5 P0: Diversity Rerank (MMR + popularity penalty + category constraint)
if enable_diversity_rerank and final_scores:
final_scores = self.diversity_reranker.rerank(
final_scores,
top_k=top_k * 2, # Oversample for title dedup
)
# 3. Deduplication by Title
unique_results = []
seen_titles = set()
for isbn, score, explanation in final_scores:
meta = self.metadata_store.get_book_metadata(str(isbn))
title = meta.get("title", "").lower().strip() if meta else ""
# If title is found and seen, skip
if title and title in seen_titles:
continue
if title:
seen_titles.add(title)
unique_results.append((isbn, score, explanation))
if len(unique_results) >= top_k:
break
return unique_results
def get_popular_books(self, limit: int = 24) -> list:
"""
P2: Return popular books for onboarding selection.
Used when new user has no history — lets them pick 3–5 to seed preferences.
"""
self.load_resources()
recs = self.fusion.popularity.recommend(user_id=None, top_k=limit)
results = []
seen_titles = set()
for isbn, _ in recs:
meta = self.metadata_store.get_book_metadata(str(isbn))
title = (meta.get("title") or "").lower().strip()
if title and title in seen_titles:
continue
if title:
seen_titles.add(title)
results.append((isbn, meta or {}))
if len(results) >= limit:
break
return results
if __name__ == "__main__":
    import logging

    logger.setLevel(logging.INFO)
    service = RecommendationService()

    # Smoke test: recommend for the first user found in the training data.
    train_df = pd.read_csv('data/rec/train.csv')
    sample_user = train_df['user_id'].iloc[0]
    logger.info(f"Getting recommendations for {sample_user}...")
    recommendations = service.get_recommendations(sample_user)

    print("\nTop Recommendations:")
    for item, score, explanation in recommendations:
        print(f"ISBN: {item}, Score: {score:.4f}")
        for exp in explanation:
            print(f" → {exp['feature']}: {exp['contribution']:+.4f} ({exp['direction']})")