""" Training pipeline for the "신문과방송" article performance prediction project. This script prepares the datasets, engineers features using a parallelized Okt-powered TF-IDF and categorical encodings, tunes and trains XGBoost models for view-count (with log transformation) and primary audience prediction, and persists all artifacts. It also includes a function to demonstrate finding similar articles based on content. Improvements from the original version: - Centralized configuration management (CONFIG). - Standardized logging instead of print(). - Parallelized Okt tokenizer for significant speed-up. - Log-transformed target variable (views) for improved regression performance. - Hyperparameter tuning using Optuna for both models. - Early stopping during model training to prevent overfitting. - Demonstration of a similar article search function. """ from __future__ import annotations import logging import sys from pathlib import Path from typing import Any, Dict, List, Tuple, cast import joblib import numpy as np import pandas as pd from sklearn.metrics.pairwise import cosine_similarity # Optuna for hyperparameter tuning try: import optuna except ImportError: print("Optuna is not installed. Please run: pip install optuna") sys.exit(1) from konlpy.tag import Okt from scipy.sparse import csr_matrix, hstack from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics import accuracy_score, mean_absolute_error, mean_squared_error from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder, OneHotEncoder from xgboost import XGBClassifier, XGBRegressor # --- 1. 설정 중앙화 (Centralized Configuration) --- # 주석: 모든 주요 설정값을 이곳에서 관리하여 코드 수정 없이 실험 조건을 쉽게 변경할 수 있습니다. CONFIG = { "data_dir": Path("./data_csv"), "paths": { "contents": "contents.csv", "metrics": "article_metrics_monthly.csv", "demographics": "demographics_merged.csv", }, "artifacts": { "vectorizer": "tfidf_vectorizer.pkl", "onehot_encoder": "onehot_encoder.pkl", "label_encoder": "label_encoder.pkl", "view_model": "view_prediction_model.pkl", "age_model": "age_prediction_model.pkl", "text_features": "text_features_matrix.pkl", "article_mapping": "article_mapping.pkl", }, "feature_engineering": { "tfidf_max_features": 5000, "test_size": 0.2, "random_state": 42, }, "optuna": { "n_trials_reg": 50, # 조회수 예측 모델 튜닝 횟수 "n_trials_clf": 50, # 연령대 예측 모델 튜닝 횟수 }, } # --- 2. 로깅 설정 (Logging Setup) --- # 주석: print() 대신 logging을 사용하여 로그를 체계적으로 관리합니다. logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] - %(message)s", stream=sys.stdout, ) # --- 3. 성능 개선: 병렬 토크나이저 (Performance Improvement: Parallel Tokenizer) --- class ParallelOktTokenizer: """A parallelized Okt tokenizer using joblib.""" def __init__(self, n_jobs: int = -1): self.okt = Okt() self.n_jobs = n_jobs def __call__(self, text_series: pd.Series) -> List[List[str]]: # 주석: joblib.Parallel을 사용해 여러 CPU 코어에서 동시에 형태소 분석을 수행합니다. # 데이터가 클 경우, 이 부분이 가장 큰 성능 향상을 가져옵니다. return joblib.Parallel(n_jobs=self.n_jobs)(joblib.delayed(self._tokenize)(text) for text in text_series) def _tokenize(self, text: str) -> List[str]: """Extracts nouns and verbs from a single text.""" if not isinstance(text, str) or not text.strip(): return [] return [ word for word, tag in self.okt.pos(text, stem=True) if tag in ["Noun", "Verb"] ] # 전역 토크나이저 인스턴스 # 주석: TfidfVectorizer는 callable 객체를 tokenizer로 받지 않으므로, 실제 사용할 함수를 정의합니다. # 이 예제에서는 TfidfVectorizer의 내부 로직상 시리즈를 직접 받지 않으므로, # 아래 engineer_features에서 직접 텍스트를 처리하는 방식으로 변경합니다. 
# A single shared Okt instance for the per-document tokenizer; constructing a
# new tagger for every document would dominate the runtime.
_OKT = Okt()


def okt_tokenizer(text: str) -> List[str]:
    """Simple per-document wrapper for Okt POS tagging (nouns and verbs)."""
    if not isinstance(text, str) or not text.strip():
        return []
    return [word for word, tag in _OKT.pos(text, stem=True) if tag in ("Noun", "Verb")]


def ensure_files_exist(data_dir: Path, paths: Dict[str, str]) -> List[Path]:
    """Raise a helpful error if any expected data file is missing."""
    full_paths = [data_dir / p for p in paths.values()]
    missing = [str(path) for path in full_paths if not path.exists()]
    if missing:
        raise FileNotFoundError(f"Missing required data files: {', '.join(missing)}")
    return full_paths


def load_datasets(
    data_dir: Path, paths: Dict[str, str]
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    logging.info("Loading datasets...")
    ensure_files_exist(data_dir, paths)
    contents = pd.read_csv(data_dir / paths["contents"])
    metrics = pd.read_csv(data_dir / paths["metrics"])
    demographics = pd.read_csv(data_dir / paths["demographics"])
    return contents, metrics, demographics


def preprocess_data(
    contents: pd.DataFrame, metrics: pd.DataFrame, demographics: pd.DataFrame
) -> pd.DataFrame:
    logging.info("Preprocessing and merging datasets...")

    # Aggregate monthly metrics to one row per article
    metrics_agg = (
        metrics.groupby("article_id")[["views_total", "comments", "likes"]]
        .sum()
        .reset_index()
        .rename(columns={"comments": "comments_total", "likes": "likes_total"})
    )

    # Identify the primary audience: the age group with the most views,
    # excluding the aggregate "전체" (all ages) rows
    filtered_demo = demographics[demographics["age_group"] != "전체"].copy()
    if filtered_demo.empty:
        raise ValueError("No demographic records found after excluding '전체'.")

    idx = filtered_demo.groupby("article_id")["views"].idxmax()
    primary_audience = (
        filtered_demo.loc[idx, ["article_id", "age_group"]]
        .rename(columns={"age_group": "primary_age_group"})
        .reset_index(drop=True)
    )

    # Build the master dataframe
    df_master = contents.merge(metrics_agg, on="article_id", how="left")
    df_master = df_master.merge(primary_audience, on="article_id", how="left")
    df_master[["views_total", "comments_total", "likes_total"]] = df_master[
        ["views_total", "comments_total", "likes_total"]
    ].fillna(0)
    return df_master


def engineer_features(
    df_master: pd.DataFrame,
) -> tuple[csr_matrix, csr_matrix, TfidfVectorizer, OneHotEncoder]:
    logging.info("Engineering features (text + category)...")
    text_series = (
        df_master["title"].fillna("") + " " + df_master["content"].fillna("")
    ).str.strip()

    # KoNLPy tokenization is relatively slow, so a single-process tokenizer is
    # used here. For very large corpora, pre-tokenize in parallel first and
    # pass identity functions, e.g. TfidfVectorizer(tokenizer=lambda x: x,
    # preprocessor=lambda x: x), as in the sketch near the top of this file.
    vectorizer = TfidfVectorizer(
        tokenizer=okt_tokenizer,
        max_features=CONFIG["feature_engineering"]["tfidf_max_features"],
        lowercase=False,
        token_pattern=None,  # silence the unused-token_pattern warning
    )
    X_text = vectorizer.fit_transform(text_series)

    category_series = df_master["category"].fillna("미분류")
    onehot_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=True)
    X_cat = onehot_encoder.fit_transform(category_series.to_frame())

    # TF-IDF columns first, one-hot category columns appended after them (the
    # combined layout is illustrated in the sketch below).
    X_combined = cast(csr_matrix, hstack([X_text, X_cat]).tocsr())
    return X_combined, X_text, vectorizer, onehot_encoder
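# A minimal sketch of the combined feature layout engineer_features produces:
# text features on the left, category indicators appended on the right. The
# toy shapes below are illustrative only.
def _combined_feature_layout_sketch() -> None:
    text_part = csr_matrix(np.eye(3, 4))  # 3 docs x 4 TF-IDF terms
    cat_part = csr_matrix(np.eye(3, 2))   # 3 docs x 2 category columns
    combined = cast(csr_matrix, hstack([text_part, cat_part]).tocsr())
    assert combined.shape == (3, 6)  # columns: 4 text features + 2 categories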
def prepare_targets(
    df_master: pd.DataFrame, X_combined: csr_matrix, X_text: csr_matrix
) -> tuple[csr_matrix, csr_matrix, np.ndarray, np.ndarray, LabelEncoder, pd.DataFrame]:
    logging.info("Preparing targets and filtering valid samples...")

    # --- 4. Model Accuracy: Log Transformation ---
    # View counts are heavily right-skewed, so np.log1p is applied. The model
    # predicts the transformed value, which is mapped back to the original
    # scale later with np.expm1. log1p (log(1 + x)) is used because taking a
    # plain log of zero views would yield -inf.
    y_views = np.log1p(df_master["views_total"].astype(np.float32))
    y_age = df_master["primary_age_group"]

    valid_mask = y_age.notna().to_numpy()
    if not valid_mask.any():
        raise ValueError("No samples with a primary audience label found.")

    X_combined_valid = X_combined[valid_mask]
    X_text_valid = X_text[valid_mask]
    y_views_valid = y_views.to_numpy()[valid_mask]
    y_age_valid = y_age[valid_mask].astype(str)

    label_encoder = LabelEncoder()
    y_age_encoded = label_encoder.fit_transform(y_age_valid)

    article_mapping = df_master.loc[valid_mask, ["article_id", "title"]].reset_index(drop=True)

    return (
        X_combined_valid,
        X_text_valid,
        y_views_valid,
        y_age_encoded,
        label_encoder,
        article_mapping,
    )


# --- 5. Model Accuracy: Hyperparameter Tuning ---
def tune_xgbregressor(X_train, y_train, X_valid, y_valid) -> Dict[str, Any]:
    """Find the best hyperparameters for XGBRegressor using Optuna."""

    def objective(trial: optuna.Trial) -> float:
        params = {
            "objective": "reg:squarederror",
            "tree_method": "hist",
            "n_estimators": trial.suggest_int("n_estimators", 200, 1000, step=100),
            "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
            "max_depth": trial.suggest_int("max_depth", 4, 10),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "random_state": CONFIG["feature_engineering"]["random_state"],
            "n_jobs": -1,
        }
        # With xgboost >= 1.6 the sklearn wrapper takes eval_metric, callbacks
        # and early_stopping_rounds in the constructor rather than in fit().
        # On recent Optuna versions the pruning callback may require the
        # separate optuna-integration package.
        model = XGBRegressor(
            **params,
            eval_metric="rmse",
            early_stopping_rounds=50,
            callbacks=[optuna.integration.XGBoostPruningCallback(trial, "validation_0-rmse")],
        )
        model.fit(X_train, y_train, eval_set=[(X_valid, y_valid)], verbose=False)
        preds = model.predict(X_valid)
        return float(np.sqrt(mean_squared_error(y_valid, preds)))

    study = optuna.create_study(direction="minimize", pruner=optuna.pruners.MedianPruner())
    study.optimize(objective, n_trials=CONFIG["optuna"]["n_trials_reg"], timeout=600)
    logging.info(f"Best trial for XGBRegressor: {study.best_trial.params} (RMSE: {study.best_value:.4f})")
    return study.best_trial.params


def tune_xgbclassifier(X_train, y_train, X_valid, y_valid, num_classes: int) -> Dict[str, Any]:
    """Find the best hyperparameters for XGBClassifier using Optuna."""

    def objective(trial: optuna.Trial) -> float:
        params = {
            "objective": "multi:softprob",
            "num_class": num_classes,
            "tree_method": "hist",
            "n_estimators": trial.suggest_int("n_estimators", 300, 1500, step=100),
            "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
            "max_depth": trial.suggest_int("max_depth", 4, 10),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
            "random_state": CONFIG["feature_engineering"]["random_state"],
            "n_jobs": -1,
        }
        # use_label_encoder was removed in xgboost 2.x and is no longer passed.
        model = XGBClassifier(
            **params,
            eval_metric="mlogloss",
            early_stopping_rounds=50,
            callbacks=[optuna.integration.XGBoostPruningCallback(trial, "validation_0-mlogloss")],
        )
        model.fit(X_train, y_train, eval_set=[(X_valid, y_valid)], verbose=False)
        # Report the best (not last) validation loss, since training may run
        # past the optimum before early stopping triggers.
        return min(model.evals_result()["validation_0"]["mlogloss"])

    study = optuna.create_study(direction="minimize", pruner=optuna.pruners.MedianPruner())
    study.optimize(objective, n_trials=CONFIG["optuna"]["n_trials_clf"], timeout=600)
    logging.info(f"Best trial for XGBClassifier: {study.best_trial.params} (LogLoss: {study.best_value:.4f})")
    return study.best_trial.params
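# A minimal, self-contained sketch of the Optuna pattern used above, on a toy
# 1-D quadratic instead of an XGBoost model: the objective reports a score for
# each sampled candidate and the study tracks the best one. All names here are
# illustrative.
def _optuna_pattern_sketch() -> None:
    def toy_objective(trial: optuna.Trial) -> float:
        x = trial.suggest_float("x", -10.0, 10.0)
        return (x - 2.0) ** 2  # minimized at x = 2

    study = optuna.create_study(direction="minimize")
    study.optimize(toy_objective, n_trials=20)
    logging.debug("Toy best params: %s", study.best_trial.params)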
def train_models(
    X_features: csr_matrix,
    y_views: np.ndarray,
    y_age_encoded: np.ndarray,
    num_classes: int,
) -> tuple[XGBRegressor, XGBClassifier]:
    logging.info("Splitting data and training final models...")
    stratify_target = y_age_encoded if len(np.unique(y_age_encoded)) > 1 else None
    X_train, X_valid, y_views_train, y_views_valid, y_age_train, y_age_valid = train_test_split(
        X_features,
        y_views,
        y_age_encoded,
        test_size=CONFIG["feature_engineering"]["test_size"],
        random_state=CONFIG["feature_engineering"]["random_state"],
        stratify=stratify_target,
    )

    # Hyperparameter tuning
    logging.info("--- Starting Hyperparameter Tuning ---")
    best_reg_params = tune_xgbregressor(X_train, y_views_train, X_valid, y_views_valid)
    best_clf_params = tune_xgbclassifier(X_train, y_age_train, X_valid, y_age_valid, num_classes)
    logging.info("--- Hyperparameter Tuning Finished ---")

    # Fit on the training split first, so the hold-out evaluation below is not
    # contaminated by the final full-data fit.
    view_model = XGBRegressor(objective="reg:squarederror", **best_reg_params)
    view_model.fit(X_train, y_views_train)

    age_model = XGBClassifier(
        objective="multi:softprob",
        num_class=num_classes,
        eval_metric="mlogloss",
        **best_clf_params,
    )
    age_model.fit(X_train, y_age_train)

    # Evaluate on the hold-out set, mapping log-scale predictions back to the
    # original view-count scale with np.expm1.
    view_pred_original = np.expm1(view_model.predict(X_valid))
    y_views_valid_original = np.expm1(y_views_valid)
    mae = mean_absolute_error(y_views_valid_original, view_pred_original)

    age_pred = age_model.predict(X_valid)
    acc = accuracy_score(y_age_valid, age_pred)

    logging.info(f"Final Validation MAE (views): {mae:,.2f}")
    logging.info(f"Final Validation Accuracy (audience): {acc:.4f}")

    # Refit on the full dataset with the best parameters so the persisted
    # models see all available data.
    logging.info("Refitting final models on the full dataset with best parameters...")
    view_model.fit(X_features, y_views)
    age_model.fit(X_features, y_age_encoded)

    return view_model, age_model


def save_artifacts(artifacts: Dict[str, Any], artifact_paths: Dict[str, str]) -> None:
    logging.info("Saving artifacts...")
    for name, obj in artifacts.items():
        path = artifact_paths[name]
        joblib.dump(obj, path)
        logging.info(f"- Saved {path}")
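# A minimal inference sketch (assumed usage, not part of the training run):
# reload the persisted artifacts and score a single new article. The article
# text and category below are hypothetical placeholders; the "category" column
# name matches what engineer_features fits the encoder on.
def _inference_sketch() -> None:
    vectorizer: TfidfVectorizer = joblib.load(CONFIG["artifacts"]["vectorizer"])
    onehot_encoder: OneHotEncoder = joblib.load(CONFIG["artifacts"]["onehot_encoder"])
    label_encoder: LabelEncoder = joblib.load(CONFIG["artifacts"]["label_encoder"])
    view_model: XGBRegressor = joblib.load(CONFIG["artifacts"]["view_model"])
    age_model: XGBClassifier = joblib.load(CONFIG["artifacts"]["age_model"])

    text = pd.Series(["새 기사 제목 새 기사 본문"])
    category = pd.DataFrame({"category": ["미분류"]})
    features = hstack(
        [vectorizer.transform(text), onehot_encoder.transform(category)]
    ).tocsr()

    views = np.expm1(view_model.predict(features))[0]  # back to original scale
    age_group = label_encoder.inverse_transform(age_model.predict(features))[0]
    logging.info(f"Predicted views: {views:,.0f}, primary audience: {age_group}")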
# --- 6. New Feature: Similar Article Search ---
def find_similar_articles(
    article_id: str,
    text_features: csr_matrix,
    mapping_df: pd.DataFrame,
    top_n: int = 5,
) -> pd.DataFrame:
    """Finds the top_n most similar articles for a given article_id."""
    if article_id not in mapping_df["article_id"].values:
        raise ValueError(f"Article ID {article_id} not found in the mapping.")

    # Get the index of the source article
    source_idx = mapping_df[mapping_df["article_id"] == article_id].index[0]
    source_vector = text_features[source_idx]

    # Compute cosine similarity against every article
    similarities = cosine_similarity(source_vector, text_features)[0]

    # Exclude the source article explicitly (safer than assuming it always
    # ranks first, e.g. when duplicate texts produce ties), then take top_n
    similarities[source_idx] = -1.0
    similar_indices = np.argsort(similarities)[::-1][:top_n]
    similar_scores = similarities[similar_indices]

    result_df = mapping_df.iloc[similar_indices].copy()
    result_df["similarity"] = similar_scores

    logging.info(
        f"\n--- Top {top_n} similar articles to '{mapping_df.iloc[source_idx]['title']}' ---"
    )
    logging.info("\n%s", result_df)
    return result_df


def main() -> None:
    """Main execution pipeline."""
    np.random.seed(CONFIG["feature_engineering"]["random_state"])

    # Load and process data
    contents, metrics, demographics = load_datasets(CONFIG["data_dir"], CONFIG["paths"])
    df_master = preprocess_data(contents, metrics, demographics)

    # Feature engineering
    X_combined, X_text, vectorizer, onehot_encoder = engineer_features(df_master)

    # Prepare targets and filter valid samples
    (
        X_features,
        X_text_filtered,
        y_views_log,
        y_age_encoded,
        label_encoder,
        article_mapping,
    ) = prepare_targets(df_master, X_combined, X_text)

    # Train models
    view_model, age_model = train_models(
        X_features, y_views_log, y_age_encoded, num_classes=len(label_encoder.classes_)
    )

    # Save all artifacts
    artifacts_to_save = {
        "vectorizer": vectorizer,
        "onehot_encoder": onehot_encoder,
        "label_encoder": label_encoder,
        "view_model": view_model,
        "age_model": age_model,
        "text_features": X_text_filtered,
        "article_mapping": article_mapping,
    }
    save_artifacts(artifacts_to_save, CONFIG["artifacts"])
    logging.info("All artifacts saved successfully.")

    # Demonstrate the similar article search
    if not article_mapping.empty:
        sample_article_id = article_mapping.iloc[0]["article_id"]
        find_similar_articles(sample_article_id, X_text_filtered, article_mapping)


if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        logging.error(f"An error occurred: {exc}", exc_info=True)
        raise