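"""Train classical sentiment baselines (TF-IDF and averaged Word2Vec features)
with stratified k-fold cross-validation, refit the best configuration on the
full dataset, and write metrics, out-of-fold predictions, and figures to the
artifact directories."""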
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
from sklearn.base import clone
from sklearn.ensemble import ExtraTreesClassifier, GradientBoostingClassifier, RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC

sys.path.append(str(Path(__file__).resolve().parents[1] / "src"))

from matcha_sentiment.classical import DenseTransformer, W2VBundle, tfidf_tokenizer, vectorize_with_w2v
from matcha_sentiment.config import ARTIFACT_DIR, DATA_PATH, ID2LABEL, MODEL_DIR, STOPWORDS
from matcha_sentiment.data import load_binary_dataset
from matcha_sentiment.metrics import binary_metrics, report_dict
from matcha_sentiment.plots import plot_confusion, plot_roc_curve, plot_top_words, write_wordcloud
from matcha_sentiment.text import tokenize, tokenized_documents

ROOT = Path(__file__).resolve().parents[1]
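
# Indonesian review vocabulary (taste, service, price, waiting time) whose
# per-label document frequencies feed keyword_counts.csv.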
KEYWORD_SEEDS = [
    "enak",
    "lezat",
    "nikmat",
    "nyaman",
    "ramah",
    "bagus",
    "terbaik",
    "mantap",
    "autentik",
    "direkomendasikan",
    "murah",
    "mahal",
    "harga",
    "harganya",
    "buruk",
    "kecewa",
    "lama",
    "antrean",
    "menunggu",
    "kurang",
    "tidak",
    "biasa",
    "pahit",
    "manis",
]
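

# Word uni-/bigram TF-IDF with sublinear term-frequency scaling;
# token_pattern=None and lowercase=False delegate all normalisation to
# tfidf_tokenizer.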
def make_tfidf(max_features: int, min_df: int) -> TfidfVectorizer:
    return TfidfVectorizer(
        tokenizer=tfidf_tokenizer,
        token_pattern=None,
        lowercase=False,
        ngram_range=(1, 2),
        min_df=min_df,
        max_features=max_features,
        sublinear_tf=True,
    )
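

# Candidate estimators for the sweep. Class weights counter label imbalance;
# the tree ensembles fit in parallel with n_jobs=-1.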
def classifiers(random_state: int) -> dict[str, object]:
    return {
        "logistic_regression": LogisticRegression(
            max_iter=2500,
            class_weight="balanced",
            solver="liblinear",
            random_state=random_state,
        ),
        "linear_svm": LinearSVC(class_weight="balanced", random_state=random_state),
        # MultinomialNB requires non-negative features, so main() pairs it
        # with TF-IDF only and skips it for averaged Word2Vec vectors.
        "multinomial_nb": MultinomialNB(),
        "random_forest": RandomForestClassifier(
            n_estimators=450,
            min_samples_leaf=2,
            class_weight="balanced_subsample",
            n_jobs=-1,
            random_state=random_state,
        ),
        "extra_trees": ExtraTreesClassifier(
            n_estimators=500,
            min_samples_leaf=2,
            class_weight="balanced",
            n_jobs=-1,
            random_state=random_state,
        ),
        "gradient_boosting": GradientBoostingClassifier(random_state=random_state),
    }
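

# Continuous score for the positive class: P(y = 1) when the model exposes
# predict_proba, the decision_function margin otherwise, else None.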
def positive_score(model, features):
    if hasattr(model, "predict_proba"):
        proba = model.predict_proba(features)
        if proba.shape[1] == 2:
            return proba[:, 1]
    if hasattr(model, "decision_function"):
        return model.decision_function(features)
    return None


def import_word2vec():
    try:
        from gensim.models import Word2Vec
    except Exception as exc:  # pragma: no cover - dependency/runtime guard
        raise RuntimeError(
            "Word2Vec needs gensim and a compatible scipy. Install the dependencies from requirements.txt or use the Docker image."
        ) from exc
    return Word2Vec
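

# Skip-gram Word2Vec (sg=1) trained from scratch on the given texts;
# workers=1 with a fixed seed helps keep embedding training repeatable.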
def train_word2vec(texts: list[str], *, vector_size: int, random_state: int):
    Word2Vec = import_word2vec()
    docs = tokenized_documents(texts, remove_stopwords=False)
    return Word2Vec(
        sentences=docs,
        vector_size=vector_size,
        window=5,
        min_count=1,
        workers=1,
        sg=1,
        seed=random_state,
        epochs=60,
    )
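

# TF-IDF -> classifier pipeline; the sparse TF-IDF matrix is densified first
# for gradient boosting, the only estimator here fit on dense input.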
def make_tfidf_pipeline(estimator, model_name: str, max_features: int, min_df: int) -> Pipeline:
    steps = [("tfidf", make_tfidf(max_features=max_features, min_df=min_df))]
    if model_name == "gradient_boosting":
        steps.append(("dense", DenseTransformer()))
    steps.append(("classifier", estimator))
    return Pipeline(steps)
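

# Stratified k-fold evaluation: out-of-fold predictions and scores are stitched
# together across folds, persisted as CSV/JSON, and summarised per fold and overall.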
def evaluate_feature_model(
    *,
    feature_name: str,
    model_name: str,
    estimator,
    texts: np.ndarray,
    y: np.ndarray,
    folds: int,
    random_state: int,
    max_features: int,
    min_df: int,
    w2v_size: int,
    out_dir: Path,
) -> tuple[dict, pd.DataFrame]:
    """Run stratified k-fold CV; return (aggregate metrics, per-fold metrics)."""
    splitter = StratifiedKFold(n_splits=folds, shuffle=True, random_state=random_state)
    oof_pred = np.zeros_like(y)
    oof_score = np.full(len(y), np.nan, dtype=float)
    fold_rows: list[dict] = []
    for fold, (train_idx, valid_idx) in enumerate(splitter.split(texts, y), start=1):
        train_texts = texts[train_idx].tolist()
        valid_texts = texts[valid_idx].tolist()
        y_train, y_valid = y[train_idx], y[valid_idx]
        clf = clone(estimator)
        if feature_name == "tfidf":
            model = make_tfidf_pipeline(clf, model_name, max_features, min_df)
            model.fit(train_texts, y_train)
            pred = model.predict(valid_texts)
            score = positive_score(model, valid_texts)
        elif feature_name == "word2vec":
            # Refit Word2Vec on the training fold only, so validation texts
            # never leak into the embedding vocabulary.
            w2v = train_word2vec(train_texts, vector_size=w2v_size, random_state=random_state + fold)
            x_train = vectorize_with_w2v(w2v, train_texts, w2v_size)
            x_valid = vectorize_with_w2v(w2v, valid_texts, w2v_size)
            clf.fit(x_train, y_train)
            pred = clf.predict(x_valid)
            score = positive_score(clf, x_valid)
        else:
            raise ValueError(f"Unsupported feature: {feature_name}")
        oof_pred[valid_idx] = pred
        if score is not None:
            oof_score[valid_idx] = score
        fold_metrics = binary_metrics(
            y_valid,
            pred,
            None if score is None else score,
        )
        fold_metrics.update(
            {
                "feature": feature_name,
                "model": model_name,
                "fold": fold,
                "n_valid": int(len(valid_idx)),
            }
        )
        fold_rows.append(fold_metrics)
    # Score-based metrics (e.g. ROC AUC) are only aggregated when every fold
    # produced a score.
    usable_score = None if np.isnan(oof_score).any() else oof_score
    aggregate = binary_metrics(y, oof_pred, usable_score)
    aggregate.update(
        {
            "feature": feature_name,
            "model": model_name,
            "folds": folds,
            "n": int(len(y)),
        }
    )
    pred_frame = pd.DataFrame(
        {
            "text": texts,
            "label": y,
            "label_name": [ID2LABEL[int(v)] for v in y],
            "prediction": oof_pred,
            "prediction_name": [ID2LABEL[int(v)] for v in oof_pred],
            "score": oof_score,
        }
    )
    pred_frame.to_csv(out_dir / f"oof_{feature_name}_{model_name}.csv", index=False)
    (out_dir / f"report_{feature_name}_{model_name}.json").write_text(
        json.dumps(report_dict(y, oof_pred), indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    return aggregate, pd.DataFrame(fold_rows)
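

# Refit the winning configuration on the full dataset: TF-IDF models are saved
# as a single Pipeline, Word2Vec models as a W2VBundle of embeddings + classifier.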
def fit_final_model(
    *,
    feature_name: str,
    model_name: str,
    estimator,
    texts: list[str],
    y: np.ndarray,
    random_state: int,
    max_features: int,
    min_df: int,
    w2v_size: int,
):
    if feature_name == "tfidf":
        model = make_tfidf_pipeline(clone(estimator), model_name, max_features, min_df)
        model.fit(texts, y)
        return model
    w2v = train_word2vec(texts, vector_size=w2v_size, random_state=random_state)
    features = vectorize_with_w2v(w2v, texts, w2v_size)
    clf = clone(estimator)
    clf.fit(features, y)
    return W2VBundle(word2vec=w2v, classifier=clf, vector_size=w2v_size)
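

# Interpretability pass: a logistic regression over TF-IDF features yields
# signed term weights; after dropping stopword-only n-grams, the 60 highest-
# and 60 lowest-weight terms are exported per label.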
def extract_tfidf_top_words(texts: list[str], y: np.ndarray, max_features: int, min_df: int, out_dir: Path) -> pd.DataFrame:
    vectorizer = make_tfidf(max_features=max_features, min_df=min_df)
    x = vectorizer.fit_transform(texts)
    model = LogisticRegression(
        max_iter=2500,
        class_weight="balanced",
        solver="liblinear",
        random_state=42,
    )
    model.fit(x, y)
    terms = np.array(vectorizer.get_feature_names_out())
    weights = model.coef_[0]

    def meaningful(term: str) -> bool:
        # Keep n-grams containing at least one non-stopword of length >= 3.
        parts = term.split()
        return any(part not in STOPWORDS and len(part) >= 3 for part in parts)

    candidates = pd.DataFrame({"term": terms, "weight": weights})
    candidates = candidates[candidates["term"].map(meaningful)].copy()
    pos = candidates.sort_values("weight", ascending=False).head(60)
    pos["label_name"] = "Positif"
    neg = candidates.sort_values("weight", ascending=True).head(60)
    neg["label_name"] = "Negatif"
    top_words = pd.concat([pos, neg], ignore_index=True)
    top_words.to_csv(out_dir / "top_words_tfidf.csv", index=False)
    return top_words
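

# Per-label document frequencies for the seed keywords; "lift" is the smoothed
# ratio of positive to negative document rates (the epsilon avoids zero division).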
def write_keyword_counts(df: pd.DataFrame, out_dir: Path) -> pd.DataFrame:
    token_sets = df["text"].map(lambda text: set(tokenize(text, remove_stopwords=False, min_len=2)))
    rows: list[dict] = []
    pos_total = int(df["label"].eq(1).sum())
    neg_total = int(df["label"].eq(0).sum())
    for term in KEYWORD_SEEDS:
        pos_count = int(((df["label"].eq(1)) & token_sets.map(lambda tokens: term in tokens)).sum())
        neg_count = int(((df["label"].eq(0)) & token_sets.map(lambda tokens: term in tokens)).sum())
        if pos_count == 0 and neg_count == 0:
            continue
        pos_rate = pos_count / pos_total if pos_total else 0.0
        neg_rate = neg_count / neg_total if neg_total else 0.0
        dominant = "Positif" if pos_rate >= neg_rate else "Negatif"
        rows.append(
            {
                "term": term,
                "positif_docs": pos_count,
                "negatif_docs": neg_count,
                "positif_rate": pos_rate,
                "negatif_rate": neg_rate,
                "dominant_label": dominant,
                "lift": (pos_rate + 1e-9) / (neg_rate + 1e-9),
            }
        )
    result = pd.DataFrame(rows).sort_values(
        ["dominant_label", "lift", "positif_docs", "negatif_docs"],
        ascending=[False, False, False, False],
    )
    result.to_csv(out_dir / "keyword_counts.csv", index=False)
    return result
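

# Command-line interface. A typical invocation (the script path below is
# illustrative, not fixed by this file):
#   python scripts/train_classical.py --folds 10 --features tfidf word2vec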
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Train TF-IDF and Word2Vec classical baselines with 10-fold CV.")
    parser.add_argument("--data", default=str(DATA_PATH), help="Prepared binary CSV.")
    parser.add_argument("--out-dir", default=str(ARTIFACT_DIR / "classical"), help="Output artifact directory.")
    parser.add_argument("--fig-dir", default=str(ARTIFACT_DIR / "figures"), help="Shared figure directory.")
    parser.add_argument("--folds", type=int, default=10, help="Number of stratified CV folds.")
    parser.add_argument("--random-state", type=int, default=42)
    parser.add_argument("--max-features", type=int, default=12000)
    parser.add_argument("--min-df", type=int, default=2)
    parser.add_argument("--w2v-size", type=int, default=150)
    parser.add_argument(
        "--features",
        nargs="+",
        default=["tfidf", "word2vec"],
        choices=["tfidf", "word2vec"],
    )
    return parser.parse_args()
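

# Orchestration: sweep every feature/model pairing, persist the best refit
# model with its metadata, and render the shared report figures.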
def main() -> None:
    args = parse_args()
    out_dir = Path(args.out_dir)
    fig_dir = Path(args.fig_dir)
    model_dir = MODEL_DIR / "classical"
    out_dir.mkdir(parents=True, exist_ok=True)
    fig_dir.mkdir(parents=True, exist_ok=True)
    model_dir.mkdir(parents=True, exist_ok=True)

    df = load_binary_dataset(args.data)
    texts = df["text"].to_numpy()
    text_list = df["text"].tolist()
    y = df["label"].to_numpy(dtype=int)

    estimators = classifiers(args.random_state)
    results: list[dict] = []
    fold_frames: list[pd.DataFrame] = []
    for feature in args.features:
        for model_name, estimator in estimators.items():
            # MultinomialNB needs non-negative inputs; averaged Word2Vec
            # vectors can be negative, so skip that pairing.
            if feature == "word2vec" and model_name == "multinomial_nb":
                continue
            print(f"Training {feature} + {model_name} with {args.folds}-fold CV")
            aggregate, folds = evaluate_feature_model(
                feature_name=feature,
                model_name=model_name,
                estimator=estimator,
                texts=texts,
                y=y,
                folds=args.folds,
                random_state=args.random_state,
                max_features=args.max_features,
                min_df=args.min_df,
                w2v_size=args.w2v_size,
                out_dir=out_dir,
            )
            results.append(aggregate)
            fold_frames.append(folds)

    # Rank configurations by F1, then ROC AUC, then accuracy.
    results_df = pd.DataFrame(results).sort_values(["f1", "roc_auc", "accuracy"], ascending=False)
    results_df.to_csv(out_dir / "results.csv", index=False)
    pd.concat(fold_frames, ignore_index=True).to_csv(out_dir / "fold_metrics.csv", index=False)

    best = results_df.iloc[0].to_dict()
    best_feature = best["feature"]
    best_model_name = best["model"]
    best_estimator = estimators[best_model_name]
    final_model = fit_final_model(
        feature_name=best_feature,
        model_name=best_model_name,
        estimator=best_estimator,
        texts=text_list,
        y=y,
        random_state=args.random_state,
        max_features=args.max_features,
        min_df=args.min_df,
        w2v_size=args.w2v_size,
    )
    joblib.dump(final_model, model_dir / "best_model.joblib")
    (model_dir / "metadata.json").write_text(
        json.dumps(best, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )

    best_oof = pd.read_csv(out_dir / f"oof_{best_feature}_{best_model_name}.csv")
    score = None if best_oof["score"].isna().any() else best_oof["score"].to_numpy()
    plot_confusion(
        best_oof["label"].to_numpy(),
        best_oof["prediction"].to_numpy(),
        fig_dir / "classical_best_confusion_matrix.png",
        title=f"Classical best: {best_feature} + {best_model_name}",
    )
    plot_roc_curve(
        best_oof["label"].to_numpy(),
        score,
        fig_dir / "classical_best_roc_auc.png",
        title=f"Classical ROC AUC: {best_feature} + {best_model_name}",
    )

    top_words = extract_tfidf_top_words(text_list, y, args.max_features, args.min_df, out_dir)
    write_keyword_counts(df, out_dir)
    plot_top_words(top_words, fig_dir / "top_words_tfidf.png", title="Top words TF-IDF")
    write_wordcloud(
        df.loc[df["label"].eq(1), "text"].tolist(),
        fig_dir / "wordcloud_positif.png",
        title="Word cloud Positif",
    )
    write_wordcloud(
        df.loc[df["label"].eq(0), "text"].tolist(),
        fig_dir / "wordcloud_negatif.png",
        title="Word cloud Negatif",
    )
| print("Best classical model:") | |
| print(json.dumps(best, indent=2, ensure_ascii=False)) | |
| if __name__ == "__main__": | |
| main() | |