# Broadcast_paper/train_and_save_models.py
"""
Training pipeline for the "์‹ ๋ฌธ๊ณผ๋ฐฉ์†ก" article performance prediction project.
This script prepares the datasets, engineers features using a parallelized
Okt-powered TF-IDF and categorical encodings, tunes and trains XGBoost models
for view-count (with log transformation) and primary audience prediction,
and persists all artifacts.
It also includes a function to demonstrate finding similar articles based on
content.
Improvements from the original version:
- Centralized configuration management (CONFIG).
- Standardized logging instead of print().
- Parallelized Okt tokenizer for significant speed-up.
- Log-transformed target variable (views) for improved regression performance.
- Hyperparameter tuning using Optuna for both models.
- Early stopping during model training to prevent overfitting.
- Demonstration of a similar article search function.
"""
from __future__ import annotations
import logging
import sys
from pathlib import Path
from typing import Any, Dict, List, cast
import joblib
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
# Optuna for hyperparameter tuning. Note: recent Optuna releases ship the
# XGBoost pruning callback in the separate "optuna-integration" package.
try:
    import optuna
except ImportError:
    print("Optuna is not installed. Please run: pip install optuna optuna-integration")
    sys.exit(1)
from konlpy.tag import Okt
from scipy.sparse import csr_matrix, hstack
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from xgboost import XGBClassifier, XGBRegressor
# --- 1. Centralized Configuration ---
# All key settings live here so experiment conditions can be changed without editing the code.
CONFIG = {
"data_dir": Path("./data_csv"),
"paths": {
"contents": "contents.csv",
"metrics": "article_metrics_monthly.csv",
"demographics": "demographics_merged.csv",
},
"artifacts": {
"vectorizer": "tfidf_vectorizer.pkl",
"onehot_encoder": "onehot_encoder.pkl",
"label_encoder": "label_encoder.pkl",
"view_model": "view_prediction_model.pkl",
"age_model": "age_prediction_model.pkl",
"text_features": "text_features_matrix.pkl",
"article_mapping": "article_mapping.pkl",
},
"feature_engineering": {
"tfidf_max_features": 5000,
"test_size": 0.2,
"random_state": 42,
},
"optuna": {
"n_trials_reg": 50, # ์กฐํšŒ์ˆ˜ ์˜ˆ์ธก ๋ชจ๋ธ ํŠœ๋‹ ํšŸ์ˆ˜
"n_trials_clf": 50, # ์—ฐ๋ น๋Œ€ ์˜ˆ์ธก ๋ชจ๋ธ ํŠœ๋‹ ํšŸ์ˆ˜
},
}
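# For reference, the minimal columns this pipeline reads from each CSV
# (inferred from the code below; extra columns are ignored):
#   contents.csv:                article_id, title, content, category
#   article_metrics_monthly.csv: article_id, views_total, comments, likes
#   demographics_merged.csv:     article_id, age_group, views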
# --- 2. Logging Setup ---
# Use logging instead of print() so output is handled consistently.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] - %(message)s",
stream=sys.stdout,
)
# --- 3. Performance Improvement: Parallel Tokenizer ---
class ParallelOktTokenizer:
    """A parallelized Okt tokenizer using joblib.

    Each worker builds its own Okt instance, because the Java-backed Okt
    object does not pickle reliably across processes; the input series is
    split into chunks so the per-worker start-up cost is amortized.
    """

    def __init__(self, n_jobs: int = -1):
        self.n_jobs = n_jobs

    def __call__(self, text_series: pd.Series) -> List[List[str]]:
        # joblib.Parallel runs the POS tagging on several CPU cores at once.
        # For large datasets this is where the biggest speed-up comes from.
        n_chunks = joblib.effective_n_jobs(self.n_jobs)
        chunks = np.array_split(text_series.to_numpy(), n_chunks)
        results = joblib.Parallel(n_jobs=self.n_jobs)(
            joblib.delayed(self._tokenize_chunk)(chunk) for chunk in chunks
        )
        return [tokens for chunk_result in results for tokens in chunk_result]

    @staticmethod
    def _tokenize_chunk(texts: np.ndarray) -> List[List[str]]:
        """Extracts nouns and verbs from a chunk of texts with a fresh Okt."""
        okt = Okt()
        tokenized: List[List[str]] = []
        for text in texts:
            if not isinstance(text, str) or not text.strip():
                tokenized.append([])
            else:
                tokenized.append(
                    [word for word, tag in okt.pos(text, stem=True) if tag in ("Noun", "Verb")]
                )
        return tokenized
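# Usage sketch for the parallel path (not executed in this pipeline): tokenize
# once up front, then hand the token lists to TfidfVectorizer through identity
# hooks so it skips its own per-document tokenization. Names are illustrative.
#
#   pre_tokenized = ParallelOktTokenizer(n_jobs=-1)(text_series)
#   vec = TfidfVectorizer(
#       tokenizer=lambda tokens: tokens,      # inputs are already token lists
#       preprocessor=lambda tokens: tokens,
#       token_pattern=None,
#       lowercase=False,
#   )
#   X_text = vec.fit_transform(pre_tokenized)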
# ์ „์—ญ ํ† ํฌ๋‚˜์ด์ € ์ธ์Šคํ„ด์Šค
# ์ฃผ์„: TfidfVectorizer๋Š” callable ๊ฐ์ฒด๋ฅผ tokenizer๋กœ ๋ฐ›์ง€ ์•Š์œผ๋ฏ€๋กœ, ์‹ค์ œ ์‚ฌ์šฉํ•  ํ•จ์ˆ˜๋ฅผ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค.
# ์ด ์˜ˆ์ œ์—์„œ๋Š” TfidfVectorizer์˜ ๋‚ด๋ถ€ ๋กœ์ง์ƒ ์‹œ๋ฆฌ์ฆˆ๋ฅผ ์ง์ ‘ ๋ฐ›์ง€ ์•Š์œผ๋ฏ€๋กœ,
# ์•„๋ž˜ engineer_features์—์„œ ์ง์ ‘ ํ…์ŠคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ๋ณ€๊ฒฝํ•ฉ๋‹ˆ๋‹ค.
def okt_tokenizer(text):
"""Simple wrapper for Okt POS tagging (nouns and verbs)."""
okt = Okt()
if not text.strip():
return []
return [word for word, tag in okt.pos(text, stem=True) if tag in ['Noun', 'Verb']]
def ensure_files_exist(data_dir: Path, paths: Dict[str, str]) -> List[Path]:
"""Raise a helpful error if any expected data file is missing."""
full_paths = [data_dir / p for p in paths.values()]
missing = [str(path) for path in full_paths if not path.exists()]
if missing:
raise FileNotFoundError(f"Missing required data files: {', '.join(missing)}")
return full_paths
def load_datasets(data_dir: Path, paths: Dict[str, str]) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
logging.info("Loading datasets...")
contents_path = data_dir / paths["contents"]
metrics_path = data_dir / paths["metrics"]
demographics_path = data_dir / paths["demographics"]
ensure_files_exist(data_dir, paths)
contents = pd.read_csv(contents_path)
metrics = pd.read_csv(metrics_path)
demographics = pd.read_csv(demographics_path)
return contents, metrics, demographics
def preprocess_data(
contents: pd.DataFrame, metrics: pd.DataFrame, demographics: pd.DataFrame
) -> pd.DataFrame:
logging.info("Preprocessing and merging datasets...")
    # Aggregate monthly metrics into per-article totals
    metrics_agg = (
        metrics.groupby("article_id")[["views_total", "comments", "likes"]]
        .sum()
        .reset_index()
        .rename(columns={"comments": "comments_total", "likes": "likes_total"})
    )
    # Identify each article's primary audience: the age group with the most views
    filtered_demo = demographics[demographics["age_group"] != "์ „์ฒด"].copy()  # "์ „์ฒด" = "all ages"
if filtered_demo.empty:
raise ValueError("No demographic records found after excluding '์ „์ฒด'.")
idx = filtered_demo.groupby("article_id")["views"].idxmax()
primary_audience = (
filtered_demo.loc[idx, ["article_id", "age_group"]]
.rename(columns={"age_group": "primary_age_group"})
.reset_index(drop=True)
)
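    # Worked micro-example of the idxmax selection above (illustrative values):
    #   article_id  age_group  views
    #   A001        20๋Œ€        120
    #   A001        30๋Œ€        340   -> A001's primary_age_group = "30๋Œ€"
    #   A002        40๋Œ€         50   -> A002's primary_age_group = "40๋Œ€"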
# Build master dataframe
df_master = contents.merge(metrics_agg, on="article_id", how="left")
df_master = df_master.merge(primary_audience, on="article_id", how="left")
df_master[["views_total", "comments_total", "likes_total"]] = df_master[
["views_total", "comments_total", "likes_total"]
].fillna(0)
return df_master
def engineer_features(df_master: pd.DataFrame) -> tuple[csr_matrix, csr_matrix, TfidfVectorizer, OneHotEncoder]:
logging.info("Engineering features (text + category)...")
text_series = (df_master["title"].fillna("") + " " + df_master["content"].fillna("")).str.strip()
# ์ฃผ์„: konlpy ํ† ํฌ๋‚˜์ด์ €๋Š” ์ƒ๋Œ€์ ์œผ๋กœ ๋А๋ฆฌ๋ฏ€๋กœ, ๋‹จ์ผ ํ”„๋กœ์„ธ์Šค tokenizer๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.
# ๋งŒ์•ฝ ๋ฐ์ดํ„ฐ๊ฐ€ ๋งค์šฐ ์ปค์„œ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌ๊ฐ€ ํ•„์š”ํ•˜๋‹ค๋ฉด, ํ…์ŠคํŠธ๋ฅผ ๋จผ์ € ํ† ํฌ๋‚˜์ด์ง•ํ•œ ํ›„
# TfidfVectorizer(tokenizer=lambda x: x, preprocessor=lambda x: x) ์™€ ๊ฐ™์ด ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.
vectorizer = TfidfVectorizer(
tokenizer=okt_tokenizer,
max_features=CONFIG["feature_engineering"]["tfidf_max_features"],
lowercase=False,
)
X_text = vectorizer.fit_transform(text_series)
    category_series = df_master["category"].fillna("๋ฏธ๋ถ„๋ฅ˜")  # "๋ฏธ๋ถ„๋ฅ˜" = "uncategorized"
onehot_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=True)
X_cat = onehot_encoder.fit_transform(category_series.to_frame())
X_combined = cast(csr_matrix, hstack([X_text, X_cat]).tocsr())
return X_combined, X_text, vectorizer, onehot_encoder
def prepare_targets(
df_master: pd.DataFrame, X_combined: csr_matrix, X_text: csr_matrix
) -> tuple[csr_matrix, csr_matrix, np.ndarray, np.ndarray, LabelEncoder, pd.DataFrame]:
logging.info("Preparing targets and filtering valid samples...")
    # --- 4. Model Accuracy: Log Transformation ---
    # View counts are heavily right-skewed, so np.log1p is applied: the model
    # predicts the transformed value, which is later restored to the original
    # scale with np.expm1. log1p adds 1 before taking the log, so zero views
    # map to 0.0 instead of -inf.
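    # For example: np.log1p(0.0) == 0.0, and np.expm1(np.log1p(12345.0))
    # recovers 12345.0 up to floating-point error.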
    y_views = np.log1p(df_master["views_total"].to_numpy(dtype=np.float32))
y_age = df_master["primary_age_group"]
valid_mask = y_age.notna().to_numpy()
if not valid_mask.any():
raise ValueError("No samples with a primary audience label found.")
X_combined_valid = X_combined[valid_mask]
X_text_valid = X_text[valid_mask]
y_views_valid = y_views[valid_mask]
y_age_valid = y_age[valid_mask].astype(str)
label_encoder = LabelEncoder()
y_age_encoded = label_encoder.fit_transform(y_age_valid)
article_mapping = df_master.loc[valid_mask, ["article_id", "title"]].reset_index(drop=True)
return (
X_combined_valid,
X_text_valid,
y_views_valid,
y_age_encoded,
label_encoder,
article_mapping,
)
# --- 5. Model Accuracy: Hyperparameter Tuning ---
def tune_xgbregressor(X_train, y_train, X_valid, y_valid) -> Dict[str, Any]:
"""Find best hyperparameters for XGBRegressor using Optuna."""
def objective(trial):
params = {
"objective": "reg:squarederror",
"tree_method": "hist",
"n_estimators": trial.suggest_int("n_estimators", 200, 1000, step=100),
"learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
"max_depth": trial.suggest_int("max_depth", 4, 10),
"subsample": trial.suggest_float("subsample", 0.6, 1.0),
"colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
"random_state": CONFIG["feature_engineering"]["random_state"],
"n_jobs": -1,
}
        # XGBoost >= 2.0 expects eval_metric, callbacks and early_stopping_rounds
        # in the constructor rather than as fit() arguments.
        model = XGBRegressor(
            **params,
            eval_metric="rmse",
            early_stopping_rounds=50,
            callbacks=[optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")],
        )
        model.fit(X_train, y_train, eval_set=[(X_valid, y_valid)], verbose=False)
        # predict() automatically uses the best iteration found by early stopping.
        preds = model.predict(X_valid)
        return float(np.sqrt(mean_squared_error(y_valid, preds)))
study = optuna.create_study(direction="minimize", pruner=optuna.pruners.MedianPruner())
study.optimize(objective, n_trials=CONFIG["optuna"]["n_trials_reg"], timeout=600)
logging.info(f"Best trial for XGBRegressor: {study.best_trial.params} (RMSE: {study.best_value:.4f})")
return study.best_trial.params
def tune_xgbclassifier(X_train, y_train, X_valid, y_valid, num_classes) -> Dict[str, Any]:
"""Find best hyperparameters for XGBClassifier using Optuna."""
def objective(trial):
        params = {
            "objective": "multi:softprob",
            "num_class": num_classes,
            "tree_method": "hist",
            "n_estimators": trial.suggest_int("n_estimators", 300, 1500, step=100),
            "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
            "max_depth": trial.suggest_int("max_depth", 4, 10),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "random_state": CONFIG["feature_engineering"]["random_state"],
            "n_jobs": -1,
        }
        # use_label_encoder was removed in XGBoost >= 2.0; eval_metric, callbacks
        # and early_stopping_rounds belong in the constructor there as well.
        model = XGBClassifier(
            **params,
            eval_metric="mlogloss",
            early_stopping_rounds=50,
            callbacks=[optuna.integration.XGBoostPruningCallback(trial, "validation_0-mlogloss")],
        )
        model.fit(X_train, y_train, eval_set=[(X_valid, y_valid)], verbose=False)
        # With early stopping, the last logged value is not necessarily the best,
        # so take the minimum validation log-loss instead of the final entry.
        return min(model.evals_result()["validation_0"]["mlogloss"])
study = optuna.create_study(direction="minimize", pruner=optuna.pruners.MedianPruner())
study.optimize(objective, n_trials=CONFIG["optuna"]["n_trials_clf"], timeout=600)
logging.info(f"Best trial for XGBClassifier: {study.best_trial.params} (LogLoss: {study.best_value:.4f})")
return study.best_trial.params
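# Optional persistence sketch (a minimal sketch, assuming a local SQLite file;
# the study_name and file name are illustrative): Optuna studies can be stored
# and resumed across runs instead of starting fresh each time.
#
#   study = optuna.create_study(
#       study_name="view_reg_tuning",
#       storage="sqlite:///optuna_studies.db",
#       direction="minimize",
#       load_if_exists=True,
#   )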
def train_models(
X_features: csr_matrix, y_views: np.ndarray, y_age_encoded: np.ndarray, num_classes: int
) -> tuple[XGBRegressor, XGBClassifier]:
logging.info("Splitting data and training final models...")
stratify_target = y_age_encoded if len(np.unique(y_age_encoded)) > 1 else None
X_train, X_valid, y_views_train, y_views_valid, y_age_train, y_age_valid = train_test_split(
X_features, y_views, y_age_encoded,
test_size=CONFIG["feature_engineering"]["test_size"],
random_state=CONFIG["feature_engineering"]["random_state"],
stratify=stratify_target,
)
# Hyperparameter tuning
logging.info("--- Starting Hyperparameter Tuning ---")
best_reg_params = tune_xgbregressor(X_train, y_views_train, X_valid, y_views_valid)
best_clf_params = tune_xgbclassifier(X_train, y_age_train, X_valid, y_age_valid, num_classes)
logging.info("--- Hyperparameter Tuning Finished ---")
    # Evaluate with the best parameters on the hold-out split *before* the
    # final refit: scoring X_valid after training on the full dataset would
    # leak the validation rows into training and inflate the metrics.
    view_model = XGBRegressor(objective="reg:squarederror", **best_reg_params)
    view_model.fit(X_train, y_views_train)
    view_pred_original = np.expm1(view_model.predict(X_valid))  # back to the original scale
    y_views_valid_original = np.expm1(y_views_valid)
    mae = mean_absolute_error(y_views_valid_original, view_pred_original)

    age_model = XGBClassifier(
        objective="multi:softprob",
        num_class=num_classes,
        eval_metric="mlogloss",
        **best_clf_params,
    )
    age_model.fit(X_train, y_age_train)
    acc = accuracy_score(y_age_valid, age_model.predict(X_valid))
    logging.info(f"Hold-out MAE (views): {mae:,.2f}")
    logging.info(f"Hold-out Accuracy (audience): {acc:.4f}")

    # Refit on the full dataset with the best parameters for the saved artifacts.
    logging.info("Refitting final models on the full dataset...")
    view_model.fit(X_features, y_views)
    age_model.fit(X_features, y_age_encoded)
return view_model, age_model
def save_artifacts(artifacts: Dict[str, Any], artifact_paths: Dict[str, str]) -> None:
logging.info("Saving artifacts...")
for name, obj in artifacts.items():
path = artifact_paths[name]
joblib.dump(obj, path)
logging.info(f"- Saved {path}")
# --- 6. New Feature: Similar Article Search ---
def find_similar_articles(
article_id: str,
text_features: csr_matrix,
mapping_df: pd.DataFrame,
top_n: int = 5,
) -> pd.DataFrame:
"""Finds top_n similar articles for a given article_id."""
if article_id not in mapping_df["article_id"].values:
raise ValueError(f"Article ID {article_id} not found in the mapping.")
# Get the index of the source article
source_idx = mapping_df[mapping_df["article_id"] == article_id].index[0]
source_vector = text_features[source_idx]
    # Compute cosine similarity against every article
    similarities = cosine_similarity(source_vector, text_features)[0]
    # Take the top_n most similar articles, assuming the source itself holds
    # the top slot (similarity 1.0), which the slice below drops.
    similar_indices = similarities.argsort()[-(top_n + 1):-1][::-1]
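    # Worked example of the slice above (top_n = 2, illustrative values):
    #   similarities      = [0.1, 0.9, 1.0, 0.4]   (index 2 is the source itself)
    #   argsort()         -> [0, 3, 1, 2]          (indices, ascending by value)
    #   [-(top_n + 1):-1] -> [3, 1]                (top 2, source slot dropped)
    #   [::-1]            -> [1, 3]                (most similar first)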
similar_scores = similarities[similar_indices]
result_df = mapping_df.iloc[similar_indices].copy()
result_df["similarity"] = similar_scores
logging.info(f"\n--- Top {top_n} similar articles to '{mapping_df.iloc[source_idx]['title']}' ---")
    logging.info("\n%s", result_df)
return result_df
def main() -> None:
"""Main execution pipeline."""
np.random.seed(CONFIG["feature_engineering"]["random_state"])
# Load and process data
contents, metrics, demographics = load_datasets(CONFIG["data_dir"], CONFIG["paths"])
df_master = preprocess_data(contents, metrics, demographics)
# Feature engineering
X_combined, X_text, vectorizer, onehot_encoder = engineer_features(df_master)
# Prepare targets and filter
(
X_features,
X_text_filtered,
y_views_log,
y_age_encoded,
label_encoder,
article_mapping,
) = prepare_targets(df_master, X_combined, X_text)
# Train models
view_model, age_model = train_models(
X_features, y_views_log, y_age_encoded, num_classes=len(label_encoder.classes_)
)
# Save all artifacts
artifacts_to_save = {
"vectorizer": vectorizer,
"onehot_encoder": onehot_encoder,
"label_encoder": label_encoder,
"view_model": view_model,
"age_model": age_model,
"text_features": X_text_filtered,
"article_mapping": article_mapping,
}
save_artifacts(artifacts_to_save, CONFIG["artifacts"])
logging.info("All artifacts saved successfully.")
# Demonstrate similar article search
if not article_mapping.empty:
sample_article_id = article_mapping.iloc[0]["article_id"]
find_similar_articles(sample_article_id, X_text_filtered, article_mapping)
if __name__ == "__main__":
try:
main()
except Exception as exc:
logging.error(f"An error occurred: {exc}", exc_info=True)
raise