| """Build a fully open AffectiveText prediction cache with out-of-fold regressors.""" |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import logging |
| from pathlib import Path |
|
|
| import numpy as np |
| from sklearn.decomposition import TruncatedSVD |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.model_selection import KFold |
| from sklearn.neighbors import KNeighborsRegressor |
| from sklearn.preprocessing import Normalizer |
|
|
| import sys |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) |
|
|
| from src.data import EMOTION_NAMES, load_affective_text |
|
|
# Configure the root logger once at import time so every message emitted by
# this script carries a timestamp and a severity level.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)
|
|
|
|
def macro_pearson(a: np.ndarray, b: np.ndarray) -> float:
    """Return the mean per-column Pearson correlation between *a* and *b*.

    Columns are paired by index over ``a.shape[1]``. A column pair is left
    out of the average when either side has a standard deviation of at most
    ``1e-12``, because the correlation is undefined for a constant column.

    Returns:
        The average correlation over the retained columns, or ``nan`` when
        no column pair was retained.
    """
    eps = 1e-12
    correlations: list[float] = []
    for col in range(a.shape[1]):
        left = a[:, col]
        right = b[:, col]
        is_constant = np.std(left) <= eps or np.std(right) <= eps
        if not is_constant:
            correlations.append(float(np.corrcoef(left, right)[0, 1]))
    if not correlations:
        return float("nan")
    return float(np.mean(correlations))
|
|
|
|
def fit_predict_fold(
    train_texts: list[str],
    test_texts: list[str],
    train_targets: np.ndarray,
    n_components: int,
    n_neighbors: int,
) -> np.ndarray:
    """Fit a TF-IDF -> SVD -> kNN regressor on one fold and predict the rest.

    Args:
        train_texts: Texts used to fit the vectorizer, the SVD and the kNN.
        test_texts: Texts to predict for; they are only ever transformed.
        train_targets: Regression targets, one row per training text.
        n_components: Requested SVD rank; capped by the size of the
            training TF-IDF matrix.
        n_neighbors: Requested neighbour count; capped at the number of
            training texts.

    Returns:
        The kNN predictions for ``test_texts`` as a float array.
    """
    tfidf = TfidfVectorizer(
        lowercase=True,
        strip_accents="unicode",
        sublinear_tf=True,
        ngram_range=(1, 2),
        min_df=1,
        max_df=0.95,
        stop_words="english",
    )
    train_matrix = tfidf.fit_transform(train_texts)
    test_matrix = tfidf.transform(test_texts)

    # The SVD rank must stay strictly below both matrix dimensions.
    n_docs, n_terms = train_matrix.shape
    rank_cap = min(n_docs, n_terms) - 1
    if rank_cap < 2:
        # Too little data for a meaningful projection: use dense TF-IDF as is.
        train_features = train_matrix.toarray()
        test_features = test_matrix.toarray()
    else:
        reducer = TruncatedSVD(
            n_components=min(n_components, rank_cap),
            random_state=0,
        )
        # Rescale the projected rows in place (copy=False) with Normalizer.
        unit_scaler = Normalizer(copy=False)
        train_features = unit_scaler.fit_transform(reducer.fit_transform(train_matrix))
        test_features = unit_scaler.transform(reducer.transform(test_matrix))

    regressor = KNeighborsRegressor(
        n_neighbors=min(n_neighbors, len(train_texts)),
        weights="distance",
        metric="minkowski",
        p=2,
    )
    regressor.fit(train_features, train_targets)
    return np.asarray(regressor.predict(test_features), dtype=float)
|
|
|
|
def build_open_predictions(
    headlines: list[str],
    raw_scores: np.ndarray,
    n_splits: int,
    n_components: int,
    n_neighbors: int,
    seed: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Produce out-of-fold score predictions for every headline.

    The headlines are split with a shuffled ``KFold``; for each fold a fresh
    model is fitted on the remaining folds via ``fit_predict_fold`` and used
    to predict the held-out rows. Predictions are clipped to be non-negative.
    A row whose clipped prediction sums to (almost) zero is replaced by the
    mean target of that fold's *training* rows, so no held-out target ever
    influences an out-of-fold prediction.

    Args:
        headlines: Input texts, one per row of ``raw_scores``.
        raw_scores: Target matrix with one row per headline.
        n_splits: Number of cross-validation folds.
        n_components: SVD rank forwarded to ``fit_predict_fold``.
        n_neighbors: Neighbour count forwarded to ``fit_predict_fold``.
        seed: Random state for the fold shuffling.

    Returns:
        ``(preds, folds)``: ``preds`` is a float array shaped like
        ``raw_scores``; ``folds`` holds, for each headline, the index of the
        fold in which it was held out.

    Raises:
        ValueError: If ``headlines`` and ``raw_scores`` differ in length.
    """
    n = len(headlines)
    if len(raw_scores) != n:
        raise ValueError(
            f"headlines ({n}) and raw_scores ({len(raw_scores)}) must have the same length"
        )

    preds = np.zeros_like(raw_scores, dtype=float)
    folds = np.full(n, -1, dtype=int)
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=seed)

    for fold_id, (train_idx, test_idx) in enumerate(splitter.split(headlines)):
        train_texts = [headlines[i] for i in train_idx]
        test_texts = [headlines[i] for i in test_idx]
        train_targets = raw_scores[train_idx]
        fold_preds = fit_predict_fold(
            train_texts=train_texts,
            test_texts=test_texts,
            train_targets=train_targets,
            n_components=n_components,
            n_neighbors=n_neighbors,
        )
        fold_preds = np.clip(fold_preds, 0.0, None)
        zero_rows = fold_preds.sum(axis=1) <= 1e-12
        if np.any(zero_rows):
            # Fall back to the training-fold mean only: a mean over all rows
            # would leak the held-out targets into these predictions.
            fold_preds[zero_rows] = train_targets.mean(axis=0)
        preds[test_idx] = fold_preds
        folds[test_idx] = fold_id
        log.info("Finished fold %d/%d", fold_id + 1, n_splits)

    return preds, folds
|
|
|
|
def main() -> None:
    """Build the out-of-fold prediction cache and write it as JSON Lines.

    Reads the command-line options, loads the dataset, computes out-of-fold
    predictions, logs two Pearson quality figures, and writes one JSON object
    per headline to the output path.

    Raises:
        FileExistsError: If the output file exists and ``--overwrite`` was
            not given.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--data-dir", default="data/raw/AffectiveText.Semeval.2007")
    cli.add_argument("--output", default="data/processed/affective_text_open_oof_predictions.jsonl")
    cli.add_argument("--n-splits", type=int, default=5)
    cli.add_argument("--n-components", type=int, default=128)
    cli.add_argument("--n-neighbors", type=int, default=25)
    cli.add_argument("--seed", type=int, default=2026)
    cli.add_argument("--limit", type=int, default=None)
    cli.add_argument("--overwrite", action="store_true")
    args = cli.parse_args()

    # Fail before any expensive work rather than clobber an existing cache.
    output_path = Path(args.output)
    if output_path.exists() and not args.overwrite:
        raise FileExistsError(f"Output already exists: {output_path}")

    dataset = load_affective_text(args.data_dir)
    ids = dataset["ids"]
    headlines = dataset["headlines"]
    raw_scores = np.asarray(dataset["raw_scores"], dtype=float)
    limit = args.limit
    if limit is not None:
        # Truncate all three parallel sequences to the same prefix.
        ids, headlines, raw_scores = ids[:limit], headlines[:limit], raw_scores[:limit]

    pred_scores, folds = build_open_predictions(
        headlines=headlines,
        raw_scores=raw_scores,
        n_splits=args.n_splits,
        n_components=args.n_components,
        n_neighbors=args.n_neighbors,
        seed=args.seed,
    )

    macro_r = macro_pearson(raw_scores, pred_scores)
    flat_r = float(np.corrcoef(raw_scores.ravel(), pred_scores.ravel())[0, 1])
    log.info(
        "Open fallback predictor quality: macro Pearson=%.3f, flattened Pearson=%.3f",
        macro_r,
        flat_r,
    )

    # The builder settings are identical for every record, so build them once.
    builder_config = {
        "n_splits": int(args.n_splits),
        "n_components": int(args.n_components),
        "n_neighbors": int(args.n_neighbors),
        "seed": int(args.seed),
    }

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as sink:
        for record_id, headline, scores, fold_id in zip(ids, headlines, pred_scores, folds):
            record = {
                "id": record_id,
                "headline": headline,
                "emotions": EMOTION_NAMES,
                "scores": [float(value) for value in scores],
                "provider": "open_fallback",
                "model": "tfidf_svd_knn_oof",
                "fold": int(fold_id),
                "builder": builder_config,
                "notes": "Deterministic out-of-fold TF-IDF+SVD+kNN regression fallback.",
            }
            sink.write(json.dumps(record, ensure_ascii=True) + "\n")

    log.info("Finished. Predictions cached at %s", output_path)


# Run the builder only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|