simplexuq-code / scripts /cache_affective_text_open_predictions.py
anonymous0523ly's picture
Initial anonymous code release
fc329a3 verified
raw
history blame
6.18 kB
"""Build a fully open AffectiveText prediction cache with out-of-fold regressors."""
from __future__ import annotations
import argparse
import json
import logging
from pathlib import Path
import numpy as np
from sklearn.decomposition import TruncatedSVD
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import KFold
from sklearn.neighbors import KNeighborsRegressor
from sklearn.preprocessing import Normalizer
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from src.data import EMOTION_NAMES, load_affective_text
# Configure timestamped INFO-level logging at import time and create the
# module logger used for the progress/quality messages emitted below.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)
def macro_pearson(a: np.ndarray, b: np.ndarray) -> float:
    """Return the mean of the column-wise Pearson correlations of *a* and *b*.

    Columns are paired by index over ``a.shape[1]``. A column pair is left out
    of the average when either side has a standard deviation of at most
    ``1e-12`` (its correlation would be undefined). If no column pair
    survives, ``nan`` is returned.
    """
    eps = 1e-12
    per_column = [
        float(np.corrcoef(a[:, col], b[:, col])[0, 1])
        for col in range(a.shape[1])
        # Written as a negated "either is flat" test so that NaN standard
        # deviations are not filtered out (NaN comparisons are False).
        if not (np.std(a[:, col]) <= eps or np.std(b[:, col]) <= eps)
    ]
    if not per_column:
        return float("nan")
    return float(np.mean(per_column))
def fit_predict_fold(
    train_texts: list[str],
    test_texts: list[str],
    train_targets: np.ndarray,
    n_components: int,
    n_neighbors: int,
) -> np.ndarray:
    """Fit the TF-IDF -> SVD -> kNN regressor on one fold and predict the rest.

    The vectorizer is fitted on ``train_texts`` only and then applied to
    ``test_texts``. When the training matrix allows a rank of at least 2, the
    TF-IDF features are reduced with ``TruncatedSVD`` (at most
    ``n_components`` components) and passed through ``Normalizer``; otherwise
    the raw TF-IDF matrix is densified and used directly. A distance-weighted
    ``KNeighborsRegressor`` (Minkowski metric, ``p=2``) is then fitted on
    ``train_targets``.

    Returns the predictions for ``test_texts`` as a float array.
    """
    tfidf = TfidfVectorizer(
        ngram_range=(1, 2),
        stop_words="english",
        lowercase=True,
        strip_accents="unicode",
        sublinear_tf=True,
        min_df=1,
        max_df=0.95,
    )
    train_matrix = tfidf.fit_transform(train_texts)
    test_matrix = tfidf.transform(test_texts)

    # The usable SVD rank is capped one below the smaller matrix dimension.
    n_docs, n_terms = train_matrix.shape
    rank_cap = min(n_docs, n_terms) - 1

    if rank_cap < 2:
        # Too little data for a meaningful projection: keep dense TF-IDF.
        train_features = train_matrix.toarray()
        test_features = test_matrix.toarray()
    else:
        reducer = TruncatedSVD(
            n_components=min(n_components, rank_cap),
            random_state=0,
        )
        scaler = Normalizer(copy=False)
        train_features = scaler.fit_transform(reducer.fit_transform(train_matrix))
        test_features = scaler.transform(reducer.transform(test_matrix))

    # Never ask for more neighbours than there are training samples.
    regressor = KNeighborsRegressor(
        n_neighbors=min(n_neighbors, len(train_texts)),
        weights="distance",
        metric="minkowski",
        p=2,
    )
    regressor.fit(train_features, train_targets)
    return np.asarray(regressor.predict(test_features), dtype=float)
def build_open_predictions(
    headlines: list[str],
    raw_scores: np.ndarray,
    n_splits: int,
    n_components: int,
    n_neighbors: int,
    seed: int,
) -> tuple[np.ndarray, np.ndarray]:
    """Produce out-of-fold predictions for every headline.

    The data is split with a shuffled ``KFold``; for each fold a fresh model
    is fitted on the remaining folds via ``fit_predict_fold`` and used to
    predict the held-out rows.

    Args:
        headlines: Input texts, one per row of ``raw_scores``.
        raw_scores: Target matrix with one row per headline.
        n_splits: Number of cross-validation folds.
        n_components: Upper bound on SVD components, forwarded to
            ``fit_predict_fold``.
        n_neighbors: Upper bound on kNN neighbours, forwarded to
            ``fit_predict_fold``.
        seed: Random state for the fold shuffle.

    Returns:
        ``(preds, folds)`` where ``preds`` has the shape of ``raw_scores`` and
        ``folds[i]`` is the index of the fold in which row ``i`` was held out.

    Raises:
        ValueError: If ``headlines`` and ``raw_scores`` disagree on the number
            of rows (or, from ``KFold``, if ``n_splits`` is not valid for the
            data size).
    """
    n = len(headlines)
    if raw_scores.shape[0] != n:
        raise ValueError(
            f"headlines ({n}) and raw_scores ({raw_scores.shape[0]}) "
            "must have the same number of rows"
        )

    preds = np.zeros_like(raw_scores, dtype=float)
    folds = np.full(n, -1, dtype=int)
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=seed)

    for fold_id, (train_idx, test_idx) in enumerate(splitter.split(headlines)):
        train_targets = raw_scores[train_idx]
        fold_preds = fit_predict_fold(
            train_texts=[headlines[i] for i in train_idx],
            test_texts=[headlines[i] for i in test_idx],
            train_targets=train_targets,
            n_components=n_components,
            n_neighbors=n_neighbors,
        )
        # Scores are treated as non-negative; clip any negative predictions.
        fold_preds = np.clip(fold_preds, 0.0, None)

        zero_rows = fold_preds.sum(axis=1) <= 1e-12
        if np.any(zero_rows):
            # Fall back to the mean of the *training* fold only. Using the
            # mean over all rows would leak the held-out targets into what
            # are supposed to be out-of-fold predictions.
            fold_preds[zero_rows] = train_targets.mean(axis=0)

        preds[test_idx] = fold_preds
        folds[test_idx] = fold_id
        log.info("Finished fold %d/%d", fold_id + 1, n_splits)

    return preds, folds
def main() -> None:
    """Command-line entry point: build and cache the out-of-fold predictions.

    Parses the CLI options, refuses to clobber an existing output file unless
    ``--overwrite`` is given, loads the dataset, optionally truncates it to
    ``--limit`` rows, computes out-of-fold predictions, logs their Pearson
    correlation with the targets, and writes one JSON object per headline to
    the output path.

    Raises:
        FileExistsError: If the output file exists and ``--overwrite`` was not
            passed.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--data-dir", default="data/raw/AffectiveText.Semeval.2007")
    cli.add_argument("--output", default="data/processed/affective_text_open_oof_predictions.jsonl")
    # Integer-valued options, registered in the same order as they appear in --help.
    for flag, default in (
        ("--n-splits", 5),
        ("--n-components", 128),
        ("--n-neighbors", 25),
        ("--seed", 2026),
        ("--limit", None),
    ):
        cli.add_argument(flag, type=int, default=default)
    cli.add_argument("--overwrite", action="store_true")
    args = cli.parse_args()

    # Bail out before doing any expensive work if the cache is already there.
    destination = Path(args.output)
    if not args.overwrite and destination.exists():
        raise FileExistsError(f"Output already exists: {destination}")

    dataset = load_affective_text(args.data_dir)
    ids, headlines = dataset["ids"], dataset["headlines"]
    raw_scores = np.asarray(dataset["raw_scores"], dtype=float)

    if args.limit is not None:
        head = slice(None, args.limit)
        ids, headlines, raw_scores = ids[head], headlines[head], raw_scores[head]

    pred_scores, folds = build_open_predictions(
        headlines=headlines,
        raw_scores=raw_scores,
        n_splits=args.n_splits,
        n_components=args.n_components,
        n_neighbors=args.n_neighbors,
        seed=args.seed,
    )

    macro_r = macro_pearson(raw_scores, pred_scores)
    flat_r = float(np.corrcoef(raw_scores.ravel(), pred_scores.ravel())[0, 1])
    log.info(
        "Open fallback predictor quality: macro Pearson=%.3f, flattened Pearson=%.3f",
        macro_r,
        flat_r,
    )

    # The builder settings are identical for every row, so assemble them once.
    builder_config = {
        "n_splits": int(args.n_splits),
        "n_components": int(args.n_components),
        "n_neighbors": int(args.n_neighbors),
        "seed": int(args.seed),
    }

    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w", encoding="utf-8") as sink:
        for item_id, headline, scores, fold_id in zip(ids, headlines, pred_scores, folds):
            record = {
                "id": item_id,
                "headline": headline,
                "emotions": EMOTION_NAMES,
                "scores": list(map(float, scores)),
                "provider": "open_fallback",
                "model": "tfidf_svd_knn_oof",
                "fold": int(fold_id),
                "builder": builder_config,
                "notes": "Deterministic out-of-fold TF-IDF+SVD+kNN regression fallback.",
            }
            sink.write(json.dumps(record, ensure_ascii=True) + "\n")

    log.info("Finished. Predictions cached at %s", destination)
# Run the cache builder only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()