| """ | |
| Training pipeline for the "์ ๋ฌธ๊ณผ๋ฐฉ์ก" article performance prediction project. | |
| This script prepares the datasets, engineers features using a parallelized | |
| Okt-powered TF-IDF and categorical encodings, tunes and trains XGBoost models | |
| for view-count (with log transformation) and primary audience prediction, | |
| and persists all artifacts. | |
| It also includes a function to demonstrate finding similar articles based on | |
| content. | |
| Improvements from the original version: | |
| - Centralized configuration management (CONFIG). | |
| - Standardized logging instead of print(). | |
| - Parallelized Okt tokenizer for significant speed-up. | |
| - Log-transformed target variable (views) for improved regression performance. | |
| - Hyperparameter tuning using Optuna for both models. | |
| - Early stopping during model training to prevent overfitting. | |
| - Demonstration of a similar article search function. | |
| """ | |
from __future__ import annotations

import logging
import sys
from pathlib import Path
from typing import Any, Dict, List, cast

import joblib
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity

# Optuna for hyperparameter tuning
try:
    import optuna
except ImportError:
    print("Optuna is not installed. Please run: pip install optuna")
    sys.exit(1)

from konlpy.tag import Okt
from scipy.sparse import csr_matrix, hstack
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from xgboost import XGBClassifier, XGBRegressor
# --- 1. Centralized configuration ---
# Note: all key settings live here so experiment conditions can be changed
# without touching the rest of the code.
CONFIG = {
    "data_dir": Path("./data_csv"),
    "paths": {
        "contents": "contents.csv",
        "metrics": "article_metrics_monthly.csv",
        "demographics": "demographics_merged.csv",
    },
    "artifacts": {
        "vectorizer": "tfidf_vectorizer.pkl",
        "onehot_encoder": "onehot_encoder.pkl",
        "label_encoder": "label_encoder.pkl",
        "view_model": "view_prediction_model.pkl",
        "age_model": "age_prediction_model.pkl",
        "text_features": "text_features_matrix.pkl",
        "article_mapping": "article_mapping.pkl",
    },
    "feature_engineering": {
        "tfidf_max_features": 5000,
        "test_size": 0.2,
        "random_state": 42,
    },
    "optuna": {
        "n_trials_reg": 50,  # tuning trials for the view-count model
        "n_trials_clf": 50,  # tuning trials for the audience model
    },
}
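# Example of tweaking a run from one place, as the note above suggests
# (hypothetical values, shown here only as illustration):
#   CONFIG["optuna"]["n_trials_reg"] = 5       # quick smoke-test run
#   CONFIG["data_dir"] = Path("/mnt/archive")  # alternate data location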
# --- 2. Logging setup ---
# Note: logging is used instead of print() so log output is handled
# consistently across the pipeline.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] - %(message)s",
    stream=sys.stdout,
)
# --- 3. Performance improvement: parallel tokenizer ---
# Note: KoNLPy's Okt wraps a JVM object via JPype and is not reliably
# picklable, so each joblib worker process lazily creates its own instance
# rather than receiving one from the parent process.
_okt: Okt | None = None

def _get_okt() -> Okt:
    """Return a per-process Okt instance, creating it on first use."""
    global _okt
    if _okt is None:
        _okt = Okt()
    return _okt

def _tokenize_one(text: str) -> List[str]:
    """Extract nouns and verbs from a single text (safe to run in a worker)."""
    if not isinstance(text, str) or not text.strip():
        return []
    return [word for word, tag in _get_okt().pos(text, stem=True) if tag in ("Noun", "Verb")]

class ParallelOktTokenizer:
    """A parallelized Okt tokenizer using joblib."""

    def __init__(self, n_jobs: int = -1):
        self.n_jobs = n_jobs

    def __call__(self, text_series: pd.Series) -> List[List[str]]:
        # Note: joblib.Parallel runs morphological analysis on several CPU
        # cores at once. On large datasets this is the biggest speed-up.
        return joblib.Parallel(n_jobs=self.n_jobs)(
            joblib.delayed(_tokenize_one)(text) for text in text_series
        )
# Single-process tokenizer for TfidfVectorizer.
# Note: TfidfVectorizer accepts any callable as its tokenizer, but it calls
# it on one document at a time, whereas ParallelOktTokenizer.__call__ expects
# a whole Series. engineer_features therefore uses this per-document wrapper;
# see the sketch after engineer_features for the pre-tokenized parallel route.
def okt_tokenizer(text: str) -> List[str]:
    """Per-document wrapper for Okt POS tagging (nouns and verbs)."""
    return _tokenize_one(text)
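# Illustrative example (hypothetical input; exact Okt output can vary by
# version): okt_tokenizer("๊ธฐ์ฌ๋ฅผ ์ฝ์ต๋๋ค") keeps only nouns and verbs,
# e.g. ["๊ธฐ์ฌ", "์ฝ๋ค"], since stem=True normalizes "์ฝ์ต๋๋ค" to the stem
# "์ฝ๋ค" and the particle "๋ฅผ" (tagged Josa) is dropped.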
def ensure_files_exist(data_dir: Path, paths: Dict[str, str]) -> List[Path]:
    """Raise a helpful error if any expected data file is missing."""
    full_paths = [data_dir / p for p in paths.values()]
    missing = [str(path) for path in full_paths if not path.exists()]
    if missing:
        raise FileNotFoundError(f"Missing required data files: {', '.join(missing)}")
    return full_paths

def load_datasets(data_dir: Path, paths: Dict[str, str]) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    logging.info("Loading datasets...")
    contents_path = data_dir / paths["contents"]
    metrics_path = data_dir / paths["metrics"]
    demographics_path = data_dir / paths["demographics"]
    ensure_files_exist(data_dir, paths)
    contents = pd.read_csv(contents_path)
    metrics = pd.read_csv(metrics_path)
    demographics = pd.read_csv(demographics_path)
    return contents, metrics, demographics
def preprocess_data(
    contents: pd.DataFrame, metrics: pd.DataFrame, demographics: pd.DataFrame
) -> pd.DataFrame:
    logging.info("Preprocessing and merging datasets...")
    # Aggregate monthly metrics to one row per article
    metrics_agg = (
        metrics.groupby("article_id")[["views_total", "comments", "likes"]]
        .sum()
        .reset_index()
        .rename(columns={"comments": "comments_total", "likes": "likes_total"})
    )
    # Identify the primary audience: the age group with the most views,
    # excluding the "์ ์ฒด" (all ages) aggregate rows
    filtered_demo = demographics[demographics["age_group"] != "์ ์ฒด"].copy()
    if filtered_demo.empty:
        raise ValueError("No demographic records found after excluding '์ ์ฒด'.")
    idx = filtered_demo.groupby("article_id")["views"].idxmax()
    primary_audience = (
        filtered_demo.loc[idx, ["article_id", "age_group"]]
        .rename(columns={"age_group": "primary_age_group"})
        .reset_index(drop=True)
    )
    # Build master dataframe
    df_master = contents.merge(metrics_agg, on="article_id", how="left")
    df_master = df_master.merge(primary_audience, on="article_id", how="left")
    df_master[["views_total", "comments_total", "likes_total"]] = df_master[
        ["views_total", "comments_total", "likes_total"]
    ].fillna(0)
    return df_master
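# A minimal synthetic smoke test for preprocess_data (hypothetical values,
# covering only the columns the function actually touches; the helper name
# _smoke_test_preprocess is ours, not part of the original pipeline):
def _smoke_test_preprocess() -> pd.DataFrame:
    contents = pd.DataFrame(
        {"article_id": ["a1"], "title": ["t"], "content": ["c"], "category": ["news"]}
    )
    metrics = pd.DataFrame(
        {"article_id": ["a1", "a1"], "views_total": [10, 5], "comments": [2, 1], "likes": [3, 0]}
    )
    demographics = pd.DataFrame(
        {"article_id": ["a1", "a1"], "age_group": ["์ ์ฒด", "30๋"], "views": [15, 9]}
    )
    # Expected: one row with views_total=15, comments_total=3, likes_total=3,
    # and primary_age_group="30๋" (the top non-"์ ์ฒด" group).
    return preprocess_data(contents, metrics, demographics)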
def engineer_features(df_master: pd.DataFrame) -> tuple[csr_matrix, csr_matrix, TfidfVectorizer, OneHotEncoder]:
    logging.info("Engineering features (text + category)...")
    text_series = (df_master["title"].fillna("") + " " + df_master["content"].fillna("")).str.strip()
    # Note: the KoNLPy tokenizer is relatively slow, so a single-process
    # tokenizer is used here. If the corpus is large enough to need
    # parallelism, tokenize the texts first and pass the token lists through
    # identity functions, e.g. TfidfVectorizer(tokenizer=lambda x: x,
    # preprocessor=lambda x: x) -- see the sketch after this function.
    vectorizer = TfidfVectorizer(
        tokenizer=okt_tokenizer,
        max_features=CONFIG["feature_engineering"]["tfidf_max_features"],
        lowercase=False,
        token_pattern=None,  # silence the "token_pattern is unused" warning
    )
    X_text = vectorizer.fit_transform(text_series)
    category_series = df_master["category"].fillna("๋ฏธ๋ถ๋ฅ")
    onehot_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=True)
    X_cat = onehot_encoder.fit_transform(category_series.to_frame())
    X_combined = cast(csr_matrix, hstack([X_text, X_cat]).tocsr())
    return X_combined, X_text, vectorizer, onehot_encoder
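# A minimal sketch (not called by the pipeline) of the parallel route the
# comment above describes: pre-tokenize once with ParallelOktTokenizer, then
# let TfidfVectorizer treat each token list as an already-analyzed document.
# The helper name engineer_text_features_parallel is ours, not part of the
# original pipeline.
def engineer_text_features_parallel(text_series: pd.Series) -> tuple[csr_matrix, TfidfVectorizer]:
    tokens = ParallelOktTokenizer(n_jobs=-1)(text_series)
    vectorizer = TfidfVectorizer(
        tokenizer=lambda doc: doc,     # documents are already token lists
        preprocessor=lambda doc: doc,  # skip string preprocessing entirely
        token_pattern=None,
        lowercase=False,
        max_features=CONFIG["feature_engineering"]["tfidf_max_features"],
    )
    # Caveat: the lambdas make this fitted vectorizer unpicklable with
    # joblib.dump; use module-level identity functions if you persist it.
    return cast(csr_matrix, vectorizer.fit_transform(tokens)), vectorizer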
def prepare_targets(
    df_master: pd.DataFrame, X_combined: csr_matrix, X_text: csr_matrix
) -> tuple[csr_matrix, csr_matrix, np.ndarray, np.ndarray, LabelEncoder, pd.DataFrame]:
    logging.info("Preparing targets and filtering valid samples...")
    # --- 4. Model accuracy: log transformation ---
    # Note: the view-count distribution is heavily skewed, so np.log1p is
    # applied. The model predicts the transformed value, which is later mapped
    # back to the original scale with np.expm1. log1p adds 1 before taking the
    # log, so zero counts do not become -inf.
    y_views = np.log1p(df_master["views_total"].astype(np.float32))
    y_age = df_master["primary_age_group"]
    valid_mask = y_age.notna().to_numpy()
    if not valid_mask.any():
        raise ValueError("No samples with a primary audience label found.")
    X_combined_valid = X_combined[valid_mask]
    X_text_valid = X_text[valid_mask]
    y_views_valid = y_views[valid_mask]
    y_age_valid = y_age[valid_mask].astype(str)
    label_encoder = LabelEncoder()
    y_age_encoded = label_encoder.fit_transform(y_age_valid)
    article_mapping = df_master.loc[valid_mask, ["article_id", "title"]].reset_index(drop=True)
    return (
        X_combined_valid,
        X_text_valid,
        y_views_valid,
        y_age_encoded,
        label_encoder,
        article_mapping,
    )
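# Sanity check of the round trip described above: training targets are
# log1p(views) and predictions are mapped back with expm1, which is exact
# for non-negative counts:
#   v = np.array([0.0, 1.0, 12345.0])
#   np.allclose(np.expm1(np.log1p(v)), v)  # -> True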
# --- 5. Model accuracy: hyperparameter tuning ---
def tune_xgbregressor(X_train, y_train, X_valid, y_valid) -> Dict[str, Any]:
    """Find best hyperparameters for XGBRegressor using Optuna."""
    def objective(trial):
        params = {
            "objective": "reg:squarederror",
            "tree_method": "hist",
            "n_estimators": trial.suggest_int("n_estimators", 200, 1000, step=100),
            "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
            "max_depth": trial.suggest_int("max_depth", 4, 10),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "random_state": CONFIG["feature_engineering"]["random_state"],
            "n_jobs": -1,
        }
        # XGBoost >= 2.0 takes eval_metric and callbacks in the constructor,
        # not in fit(); the pruning callback lets Optuna stop bad trials early.
        model = XGBRegressor(
            **params,
            eval_metric="rmse",
            callbacks=[optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")],
        )
        model.fit(X_train, y_train, eval_set=[(X_valid, y_valid)], verbose=False)
        preds = model.predict(X_valid)
        return np.sqrt(mean_squared_error(y_valid, preds))
    study = optuna.create_study(direction="minimize", pruner=optuna.pruners.MedianPruner())
    study.optimize(objective, n_trials=CONFIG["optuna"]["n_trials_reg"], timeout=600)
    logging.info(f"Best trial for XGBRegressor: {study.best_trial.params} (RMSE: {study.best_value:.4f})")
    return study.best_trial.params
def tune_xgbclassifier(X_train, y_train, X_valid, y_valid, num_classes) -> Dict[str, Any]:
    """Find best hyperparameters for XGBClassifier using Optuna."""
    def objective(trial):
        params = {
            "objective": "multi:softprob",
            "num_class": num_classes,
            "tree_method": "hist",
            "eval_metric": "mlogloss",
            "n_estimators": trial.suggest_int("n_estimators", 300, 1500, step=100),
            "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
            "max_depth": trial.suggest_int("max_depth", 4, 10),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "random_state": CONFIG["feature_engineering"]["random_state"],
            "n_jobs": -1,
        }
        # use_label_encoder has been removed from XGBoost; callbacks likewise
        # belong in the constructor in XGBoost >= 2.0.
        model = XGBClassifier(
            **params,
            callbacks=[optuna.integration.XGBoostPruningCallback(trial, "validation_0-mlogloss")],
        )
        model.fit(X_train, y_train, eval_set=[(X_valid, y_valid)], verbose=False)
        return model.evals_result()["validation_0"]["mlogloss"][-1]
    study = optuna.create_study(direction="minimize", pruner=optuna.pruners.MedianPruner())
    study.optimize(objective, n_trials=CONFIG["optuna"]["n_trials_clf"], timeout=600)
    logging.info(f"Best trial for XGBClassifier: {study.best_trial.params} (LogLoss: {study.best_value:.4f})")
    return study.best_trial.params
def train_models(
    X_features: csr_matrix, y_views: np.ndarray, y_age_encoded: np.ndarray, num_classes: int
) -> tuple[XGBRegressor, XGBClassifier]:
    logging.info("Splitting data and training final models...")
    stratify_target = y_age_encoded if len(np.unique(y_age_encoded)) > 1 else None
    X_train, X_valid, y_views_train, y_views_valid, y_age_train, y_age_valid = train_test_split(
        X_features, y_views, y_age_encoded,
        test_size=CONFIG["feature_engineering"]["test_size"],
        random_state=CONFIG["feature_engineering"]["random_state"],
        stratify=stratify_target,
    )
    # Hyperparameter tuning
    logging.info("--- Starting Hyperparameter Tuning ---")
    best_reg_params = tune_xgbregressor(X_train, y_views_train, X_valid, y_views_valid)
    best_clf_params = tune_xgbclassifier(X_train, y_age_train, X_valid, y_age_valid, num_classes)
    logging.info("--- Hyperparameter Tuning Finished ---")
    # Evaluate on the hold-out split using models fitted only on the training
    # split; fitting on the full dataset first would leak the validation rows
    # into training and make the reported metrics meaningless.
    eval_view_model = XGBRegressor(objective="reg:squarederror", **best_reg_params)
    eval_view_model.fit(X_train, y_views_train)
    view_pred_original = np.expm1(eval_view_model.predict(X_valid))  # back to the original scale
    mae = mean_absolute_error(np.expm1(y_views_valid), view_pred_original)
    eval_age_model = XGBClassifier(
        objective="multi:softprob", num_class=num_classes, eval_metric="mlogloss", **best_clf_params
    )
    eval_age_model.fit(X_train, y_age_train)
    acc = accuracy_score(y_age_valid, eval_age_model.predict(X_valid))
    logging.info(f"Hold-out MAE (views): {mae:,.2f}")
    logging.info(f"Hold-out Accuracy (audience): {acc:.4f}")
    # Retrain the final models on the full dataset with the best parameters
    logging.info("Training final models on the full dataset with best parameters...")
    view_model = XGBRegressor(objective="reg:squarederror", **best_reg_params)
    view_model.fit(X_features, y_views)
    age_model = XGBClassifier(
        objective="multi:softprob", num_class=num_classes, eval_metric="mlogloss", **best_clf_params
    )
    age_model.fit(X_features, y_age_encoded)
    return view_model, age_model
def save_artifacts(artifacts: Dict[str, Any], artifact_paths: Dict[str, str]) -> None:
    logging.info("Saving artifacts...")
    for name, obj in artifacts.items():
        path = artifact_paths[name]
        joblib.dump(obj, path)
        logging.info(f"- Saved {path}")
# --- 6. New feature: similar-article search ---
def find_similar_articles(
    article_id: str,
    text_features: csr_matrix,
    mapping_df: pd.DataFrame,
    top_n: int = 5,
) -> pd.DataFrame:
    """Finds the top_n most similar articles for a given article_id."""
    if article_id not in mapping_df["article_id"].values:
        raise ValueError(f"Article ID {article_id} not found in the mapping.")
    # Get the index of the source article
    source_idx = mapping_df[mapping_df["article_id"] == article_id].index[0]
    source_vector = text_features[source_idx]
    # Compute cosine similarity against every article
    similarities = cosine_similarity(source_vector, text_features)[0]
    # Exclude the source article explicitly rather than assuming it holds the
    # single highest score (duplicate texts would tie and break that).
    similarities[source_idx] = -np.inf
    similar_indices = np.argsort(similarities)[::-1][:top_n]
    similar_scores = similarities[similar_indices]
    result_df = mapping_df.iloc[similar_indices].copy()
    result_df["similarity"] = similar_scores
    logging.info(f"--- Top {top_n} similar articles to '{mapping_df.iloc[source_idx]['title']}' ---")
    logging.info("\n%s", result_df)
    return result_df
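# A minimal usage sketch, assuming a previous run has already written the
# artifacts listed in CONFIG["artifacts"]: reload the persisted TF-IDF matrix
# and article mapping, then query similar articles without re-running the
# pipeline. (demo_similarity_from_artifacts is our illustrative name, not
# part of the original pipeline.)
def demo_similarity_from_artifacts(article_id: str, top_n: int = 5) -> pd.DataFrame:
    text_features = joblib.load(CONFIG["artifacts"]["text_features"])
    mapping_df = joblib.load(CONFIG["artifacts"]["article_mapping"])
    return find_similar_articles(article_id, text_features, mapping_df, top_n=top_n)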
def main() -> None:
    """Main execution pipeline."""
    np.random.seed(CONFIG["feature_engineering"]["random_state"])
    # Load and process data
    contents, metrics, demographics = load_datasets(CONFIG["data_dir"], CONFIG["paths"])
    df_master = preprocess_data(contents, metrics, demographics)
    # Feature engineering
    X_combined, X_text, vectorizer, onehot_encoder = engineer_features(df_master)
    # Prepare targets and filter
    (
        X_features,
        X_text_filtered,
        y_views_log,
        y_age_encoded,
        label_encoder,
        article_mapping,
    ) = prepare_targets(df_master, X_combined, X_text)
    # Train models
    view_model, age_model = train_models(
        X_features, y_views_log, y_age_encoded, num_classes=len(label_encoder.classes_)
    )
    # Save all artifacts
    artifacts_to_save = {
        "vectorizer": vectorizer,
        "onehot_encoder": onehot_encoder,
        "label_encoder": label_encoder,
        "view_model": view_model,
        "age_model": age_model,
        "text_features": X_text_filtered,
        "article_mapping": article_mapping,
    }
    save_artifacts(artifacts_to_save, CONFIG["artifacts"])
    logging.info("All artifacts saved successfully.")
    # Demonstrate similar article search
    if not article_mapping.empty:
        sample_article_id = article_mapping.iloc[0]["article_id"]
        find_similar_articles(sample_article_id, X_text_filtered, article_mapping)
if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        logging.error(f"An error occurred: {exc}", exc_info=True)
        raise