Text Classification
Transformers
Safetensors
Chinese
chinese
ai-text-detection
ensemble
bert
roberta
qwen
lora
research
dataset
Instructions to use LUCIFerace/enhanced-replica-model-pack with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use LUCIFerace/enhanced-replica-model-pack with Transformers:
```python
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text-classification", model="LUCIFerace/enhanced-replica-model-pack")

# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("LUCIFerace/enhanced-replica-model-pack", dtype="auto")
```
- Notebooks
- Google Colab
- Kaggle
| """ | |
| Full ensemble script for 0109. | |
| Loads available voter predictions for a dataset, trains LR Stacking on dev, | |
| and evaluates on test. Supports global and per-bucket evaluation. | |
| Expected voter sources (auto-discovered): | |
| - Qwen specialists: outputs/cross_domain/{dataset}/qwen7b_*_{split}_predictions.csv | |
| - FastDetectGPT 3B: outputs/zero_shot/{dataset}/{split}_fdgpt_scores.csv | |
| - Binoculars 3B: outputs/zero_shot/{dataset}/{split}_bino_scores.csv | |
| - BERT/RoBERTa (optional): outputs/plm/{dataset}/{name}_{split}_predictions.csv | |
| Outputs to: outputs/ensemble_full/{dataset}/ | |
| """ | |
| import json | |
| import math | |
| import numpy as np | |
| import pandas as pd | |
| from pathlib import Path | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score | |
| from scipy.special import expit | |
# Repository root: two levels above the directory that contains this script.
ROOT = Path(__file__).resolve().parents[2]
# Input directories holding the per-dataset prediction / score CSVs of each voter family.
CROSS_DOMAIN_ROOT = ROOT / "outputs/cross_domain"
ZERO_SHOT_ROOT = ROOT / "outputs/zero_shot"
PLM_ROOT = ROOT / "outputs/plm"
# Output directory for the ensemble predictions and metadata.
OUT_ROOT = ROOT / "outputs/ensemble_full"
# Minimum number of dev rows a length bucket needs before it gets its own LR model.
MIN_SAMPLES_PER_BUCKET = 20
# Minimum number of dev rows needed to train the global LR on the dataset's own dev split.
MIN_SAMPLES_GLOBAL_LR = 50
# Length buckets as inclusive (low, high) bounds on the "length" column.
BUCKETS = {
    "extreme_short": (0, 75),
    "short": (76, 180),
    "general": (181, 999999),
}
# DS13 evaluation subsets as inclusive (low, high) ranges of the "id" column.
DS13_SUBSETS = {
    "normal": (1, 4000),
    "mixed_attack": (4001, 5000),
    "paraphrase_attack": (5001, 6000),
    "perturbation_attack": (6001, 7000),
    "len_64": (7001, 8000),
    "len_128": (8001, 9000),
    "len_256": (9001, 10000),
    "len_512": (10001, 11000),
}
# Name of the CSV column that carries each voter's score.
VOTER_COLUMN_MAP = {
    "qwen7b_general": "pred_prob",
    "qwen7b_short": "pred_prob",
    "qwen7b_extreme_short": "pred_prob",
    "fdgpt": "fdgpt_score",
    "binoculars": "binoculars_score",
    "bert": "pred_prob",
    "roberta": "pred_prob",
}
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def discover_voters(dataset: str):
    """Find the prediction CSVs that exist on disk for ``dataset``.

    Returns a dict ``{voter_name: {split: Path}}`` that only contains the
    files actually present; a voter may therefore have one split, both, or
    be absent altogether.
    """
    # One entry per voter family: (root directory, [(voter name, filename template)]).
    # Families and voters are listed in the order they must appear in the result.
    sources = (
        (
            CROSS_DOMAIN_ROOT,
            [
                (adapter, adapter + "_{split}_predictions.csv")
                for adapter in ("qwen7b_general", "qwen7b_short", "qwen7b_extreme_short")
            ],
        ),
        (
            ZERO_SHOT_ROOT,
            [
                ("fdgpt", "{split}_fdgpt_scores.csv"),
                ("binoculars", "{split}_bino_scores.csv"),
            ],
        ),
        (
            PLM_ROOT,
            [(plm, plm + "_{split}_predictions.csv") for plm in ("bert", "roberta")],
        ),
    )

    found = {}
    for root, candidates in sources:
        dataset_dir = root / dataset
        if not dataset_dir.exists():
            continue
        # Split is the outer loop so that voters with a dev file are
        # registered before voters that only have a test file.
        for split in ("dev", "test"):
            for voter, template in candidates:
                path = dataset_dir / template.format(split=split)
                if path.exists():
                    found.setdefault(voter, {})[split] = path
    return found
def load_voter_df(voters, split):
    """Align all voter predictions for ``split`` into one DataFrame by row index.

    Rows are matched purely by position. This assumes every inference script
    processed the same jsonl in the same order (which avoids a Cartesian
    product when texts are duplicated); only the row counts are verified here.

    Args:
        voters: mapping ``{voter_name: {split: csv_path}}`` as returned by
            ``discover_voters``.
        split: split name to load, e.g. ``"dev"`` or ``"test"``.

    Returns:
        DataFrame with ``text``, ``label``, ``length``, an ``id`` column when
        any voter CSV provides one, and one ``feat_<voter>`` column per voter.

    Raises:
        ValueError: if no voter has a CSV for ``split``, if the CSVs differ in
            row count, or if a voter CSV lacks its expected score column.
    """
    # A voter may have been discovered for only one split; skip it here
    # instead of failing with an opaque KeyError.
    available = {name: paths[split] for name, paths in voters.items() if split in paths}
    skipped = [name for name in voters if name not in available]
    if skipped:
        print(f"Warning: skipping voters without a '{split}' split: {skipped}")
    if not available:
        raise ValueError(f"No voter predictions available for split '{split}'")

    # Read every CSV exactly once and reuse the frames below.
    frames = {name: pd.read_csv(path) for name, path in available.items()}

    # The first voter supplies the shared text / label / length columns.
    base_df = next(iter(frames.values()))
    if "length" not in base_df.columns:
        base_df["length"] = base_df["text"].astype(str).apply(len)
    result = base_df[["text", "label", "length"]].copy()

    for name, frame in frames.items():
        if len(frame) != len(result):
            raise ValueError(f"Length mismatch for {name}: {len(frame)} vs {len(result)}")

    # Bring in id from the first voter that has it (needed for DS13 subset evaluation).
    for frame in frames.values():
        if "id" in frame.columns:
            result["id"] = frame["id"].values
            break

    for name, frame in frames.items():
        col = VOTER_COLUMN_MAP.get(name, "pred_prob")
        if col not in frame.columns:
            raise ValueError(f"Expected column '{col}' not found in {available[name]}")
        result[f"feat_{name}"] = frame[col].values
    return result.reset_index(drop=True)
def best_threshold_f1(y_true, probs, num_thresholds=200):
    """Grid-search the decision threshold that maximises F1.

    Candidate thresholds are ``num_thresholds`` evenly spaced values between
    the smallest and the largest score; a row is predicted positive when its
    score is ``>=`` the threshold. On ties the lowest threshold wins.

    Args:
        y_true: binary labels (0/1 or bool), any array-like.
        probs: scores, same length as ``y_true``.
        num_thresholds: number of grid points to evaluate.

    Returns:
        ``(threshold, f1)`` as plain floats. Empty input yields ``(0.5, 0.0)``.
    """
    # Cast to int so boolean labels survive the ``1 - y_true`` arithmetic below.
    y_true = np.asarray(y_true).astype(int)
    probs = np.asarray(probs, dtype=float)
    if len(y_true) == 0:
        return 0.5, 0.0

    # Dynamic range based on the score distribution.
    p_min, p_max = probs.min(), probs.max()
    if p_min == p_max:
        # All scores are identical, so ``>= p_min`` predicts every row
        # positive; report the F1 that this threshold really achieves.
        tp = int(y_true.sum())
        fp = len(y_true) - tp
        f1_all_positive = 2 * tp / (2 * tp + fp) if tp else 0.0
        return float(p_min), f1_all_positive

    thresholds = np.linspace(p_min, p_max, num_thresholds)
    # preds has shape (n_samples, n_thresholds): one column per candidate.
    preds = (probs[:, None] >= thresholds).astype(int)
    tp = (preds * y_true[:, None]).sum(axis=0)
    fp = (preds * (1 - y_true[:, None])).sum(axis=0)
    fn = y_true.sum() - tp
    # The small epsilon keeps the divisions defined when a count is zero.
    precision = tp / (tp + fp + 1e-9)
    recall = tp / (tp + fn + 1e-9)
    f1 = 2 * precision * recall / (precision + recall + 1e-9)
    best_idx = int(np.argmax(f1))
    return float(thresholds[best_idx]), float(f1[best_idx])
def evaluate(y_true, probs, threshold=None):
    """Binarise ``probs`` at ``threshold`` and compute classification metrics.

    Args:
        y_true: binary labels, any array-like.
        probs: scores, any array-like of the same length.
        threshold: decision threshold (positive when ``score >= threshold``).
            When ``None`` it is chosen on this very data with
            ``best_threshold_f1``.

    Returns:
        Dict with the threshold (rounded to 4 decimals) and F1, precision,
        recall and accuracy (rounded to 6 decimals).
    """
    # Accept plain lists as well as arrays; ``list >= float`` would raise.
    y_true = np.asarray(y_true)
    probs = np.asarray(probs)
    if threshold is None:
        threshold, _ = best_threshold_f1(y_true, probs)
    preds = (probs >= threshold).astype(int)
    return {
        "threshold": round(float(threshold), 4),
        "f1": round(f1_score(y_true, preds, zero_division=0), 6),
        "precision": round(precision_score(y_true, preds, zero_division=0), 6),
        "recall": round(recall_score(y_true, preds, zero_division=0), 6),
        "accuracy": round(accuracy_score(y_true, preds), 6),
    }
def bucket_name(length):
    """Return the length-bucket name for a text of ``length``.

    Lengths up to 75 are ``"extreme_short"``, up to 180 ``"short"``,
    and everything else ``"general"``.
    """
    # Inclusive upper bounds, checked from the smallest bucket upwards.
    for upper_bound, label in ((75, "extreme_short"), (180, "short")):
        if length <= upper_bound:
            return label
    return "general"
| # --------------------------------------------------------------------------- | |
| # LR models | |
| # --------------------------------------------------------------------------- | |
def fit_lr_global(df_dev, feature_cols, C=1.0):
    """Train the global stacking model on the dev frame.

    The design matrix is the voter feature columns plus ``len_norm``
    (``length / 1000``). Features are standardised, a logistic regression is
    fitted, and the decision threshold is tuned for F1 on the same dev rows.

    Returns:
        ``(scaler, clf, threshold, meta)`` where ``meta`` holds the feature
        names, coefficients, intercept and the threshold rounded to 4 decimals.
    """
    design = df_dev[feature_cols].assign(len_norm=df_dev["length"] / 1000.0)
    labels = df_dev["label"].values

    scaler = StandardScaler()
    scaled = scaler.fit_transform(design)
    clf = LogisticRegression(C=C, max_iter=1000, solver="lbfgs")
    clf.fit(scaled, labels)

    # Threshold is selected in-sample, on the rows the model was trained on.
    dev_probs = clf.predict_proba(scaled)[:, 1]
    th, _ = best_threshold_f1(labels, dev_probs)

    meta = {
        "feature_names": list(design.columns),
        "coef": clf.coef_[0].tolist(),
        "intercept": float(clf.intercept_[0]),
        "threshold": round(th, 4),
    }
    return scaler, clf, th, meta
def apply_lr_global(df, scaler, clf, feature_cols):
    """Score ``df`` with a fitted scaler / classifier pair.

    Builds the same design matrix as training (feature columns plus
    ``len_norm = length / 1000``) and returns the positive-class probability
    for every row. ``df`` itself is not modified.
    """
    features = df[feature_cols].assign(len_norm=df["length"] / 1000.0)
    class_probs = clf.predict_proba(scaler.transform(features))
    return class_probs[:, 1]
def fit_lr_bucket(df_dev, feature_cols, global_scaler, global_clf, global_th):
    """Train one stacking model per length bucket, with a global fallback.

    For every bucket in ``BUCKETS`` a separate scaler + logistic regression is
    fitted on the dev rows whose ``length`` lies in the bucket's inclusive
    range, and an F1-optimal threshold is tuned on those same rows. A bucket
    falls back to the global model and threshold when it has fewer than
    ``MIN_SAMPLES_PER_BUCKET`` rows or when its rows contain a single label
    class (a logistic regression cannot be fitted on one class).

    Args:
        df_dev: dev frame with ``length``, ``label`` and the feature columns.
        feature_cols: voter feature columns to use.
        global_scaler, global_clf, global_th: fitted global model used as fallback.

    Returns:
        ``(apply_fn, meta, bucket_thresholds)``. ``apply_fn(df)`` returns the
        positive-class probability per row; rows whose length matches no
        bucket keep probability 0. ``meta`` records per-bucket coefficients
        or the fallback flag, ``bucket_thresholds`` maps bucket name to threshold.
    """

    def _design(frame):
        # Same design matrix as the global model: voter features + normalised length.
        return frame[feature_cols].assign(len_norm=frame["length"] / 1000.0)

    bucket_models = {}
    bucket_thresholds = {}
    meta = {
        "global": {"threshold": global_th},
        "fallback": {},
    }

    for bname, (lo, hi) in BUCKETS.items():
        sub = df_dev[(df_dev["length"] >= lo) & (df_dev["length"] <= hi)]
        y = sub["label"].values
        enough_rows = len(sub) >= MIN_SAMPLES_PER_BUCKET
        # LogisticRegression.fit raises when only one class is present.
        has_both_classes = len(np.unique(y)) >= 2

        if enough_rows and has_both_classes:
            scaler = StandardScaler()
            Xs = scaler.fit_transform(_design(sub))
            clf = LogisticRegression(C=1.0, max_iter=1000, solver="lbfgs")
            clf.fit(Xs, y)
            probs = clf.predict_proba(Xs)[:, 1]
            th, _ = best_threshold_f1(y, probs)
            bucket_models[bname] = (scaler, clf)
            bucket_thresholds[bname] = th
            meta[bname] = {
                "coef": clf.coef_[0].tolist(),
                "intercept": float(clf.intercept_[0]),
                "threshold": th,
                "n_dev": len(sub),
            }
            meta["fallback"][bname] = False
        else:
            bucket_models[bname] = (global_scaler, global_clf)
            bucket_thresholds[bname] = global_th
            meta[bname] = {"fallback_to_global": True, "n_dev": len(sub)}
            if enough_rows:
                # Distinguish this from the "too few rows" fallback.
                meta[bname]["reason"] = "single_class"
            meta["fallback"][bname] = True

    def _apply(df):
        probs = np.zeros(len(df))
        for bname, (lo, hi) in BUCKETS.items():
            mask = ((df["length"] >= lo) & (df["length"] <= hi)).values
            if mask.any():
                scaler, clf = bucket_models[bname]
                Xs = scaler.transform(_design(df.loc[mask]))
                probs[mask] = clf.predict_proba(Xs)[:, 1]
        return probs

    return _apply, meta, bucket_thresholds
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
def _classification_metrics(y_true, y_pred):
    """Return rounded F1 / precision / recall / accuracy for binary predictions."""
    return {
        "f1": round(f1_score(y_true, y_pred, zero_division=0), 6),
        "precision": round(precision_score(y_true, y_pred, zero_division=0), 6),
        "recall": round(recall_score(y_true, y_pred, zero_division=0), 6),
        "accuracy": round(accuracy_score(y_true, y_pred), 6),
    }


def _predict_with_bucket_thresholds(df, probs, bucket_thresholds):
    """Binarise ``probs`` with the threshold of the length bucket each row falls in."""
    preds = np.zeros(len(df), dtype=int)
    for bname, (lo, hi) in BUCKETS.items():
        mask = ((df["length"] >= lo) & (df["length"] <= hi)).values
        if mask.any():
            preds[mask] = (probs[mask] >= bucket_thresholds[bname]).astype(int)
    return preds


def main():
    """Command-line entry point: train the LR stacking ensemble and evaluate it.

    Steps: discover voter CSVs for ``--dataset``, fit the global LR on dev
    (or on ``--fallback-dataset`` dev when the dataset's own dev split is too
    small), fit per-bucket LRs, evaluate both on dev and test, optionally
    report DS13 per-subset metrics, and write predictions plus metadata to
    ``OUT_ROOT / dataset``.
    """
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset", required=True, help="e.g. DS06_External_core_balanced_v1 or DS13_NLPCC_full_test_v1")
    parser.add_argument("--fallback-dataset", default="DS06_External_core_balanced_v1",
                        help="If dev samples are insufficient, use this dataset's dev to train LR")
    args = parser.parse_args()

    discovered = discover_voters(args.dataset)
    print(f"Discovered voters for {args.dataset}: {list(discovered.keys())}")
    # A voter is only usable when it has predictions for both splits;
    # otherwise loading the missing split would fail.
    voters = {
        name: paths for name, paths in discovered.items()
        if "dev" in paths and "test" in paths
    }
    dropped = [name for name in discovered if name not in voters]
    if dropped:
        print(f"Warning: ignoring voters without both dev and test predictions: {dropped}")
    if not voters:
        print("No voter predictions found. Please run inference first. Exiting.")
        return

    df_dev = load_voter_df(voters, "dev")
    df_test = load_voter_df(voters, "test")
    print(f"Loaded dev={len(df_dev)} test={len(df_test)}")
    feature_cols = [c for c in df_dev.columns if c.startswith("feat_")]
    print(f"Feature columns ({len(feature_cols)}): {feature_cols}")
    splits = (("dev", df_dev), ("test", df_test))

    # -----------------------------------------------------------------------
    # Global LR (with fallback if dev too small)
    # -----------------------------------------------------------------------
    print("\n[Global LR Stacking]")
    if len(df_dev) >= MIN_SAMPLES_GLOBAL_LR:
        scaler, clf, th, meta = fit_lr_global(df_dev, feature_cols)
        print(f" Trained on {args.dataset} dev (n={len(df_dev)})")
    else:
        # Only the dev split of the fallback dataset is needed.
        fb_voters = {
            name: paths
            for name, paths in discover_voters(args.fallback_dataset).items()
            if "dev" in paths
        }
        if not fb_voters:
            print(f" Fallback dataset {args.fallback_dataset} has no voters; using current dev anyway")
            scaler, clf, th, meta = fit_lr_global(df_dev, feature_cols)
        else:
            fb_dev = load_voter_df(fb_voters, "dev")
            # Intersect features while keeping the order of feature_cols, so
            # the saved coefficients are reproducible across runs (a plain
            # set intersection has hash-dependent order).
            fb_features = {c for c in fb_dev.columns if c.startswith("feat_")}
            common_features = [c for c in feature_cols if c in fb_features]
            if not common_features:
                print(" No common features between datasets; using current dev anyway")
                scaler, clf, th, meta = fit_lr_global(df_dev, feature_cols)
            else:
                print(f" Dev too small (n={len(df_dev)}). Fallback LR trained on {args.fallback_dataset} dev (n={len(fb_dev)})")
                scaler, clf, th, meta = fit_lr_global(fb_dev, common_features)
                # common_features is a subset of the columns already present
                # in df_dev and df_test (both built from the same voters).
                feature_cols = common_features
    print(f" coef={meta['coef']}")
    print(f" intercept={meta['intercept']}")
    print(f" threshold={th}")

    summary = {}
    global_probs = {}
    for split, df in splits:
        global_probs[split] = apply_lr_global(df, scaler, clf, feature_cols)
        metrics = evaluate(df["label"].values, global_probs[split], threshold=th)
        summary.setdefault("lr_global", {})[split] = metrics
        print(f" {split}: f1={metrics['f1']:.4f} acc={metrics['accuracy']:.4f} prec={metrics['precision']:.4f} rec={metrics['recall']:.4f}")

    # -----------------------------------------------------------------------
    # LR Bucket
    # -----------------------------------------------------------------------
    print("\n[LR Bucket]")
    lb_fn, lb_meta, lb_ths = fit_lr_bucket(df_dev, feature_cols, scaler, clf, th)
    bucket_probs = {}
    bucket_preds = {}
    for split, df in splits:
        bucket_probs[split] = lb_fn(df)
        bucket_preds[split] = _predict_with_bucket_thresholds(df, bucket_probs[split], lb_ths)
        metrics = {
            "threshold": "per-bucket",
            **_classification_metrics(df["label"].values, bucket_preds[split]),
        }
        summary.setdefault("lr_bucket", {})[split] = metrics
        print(f" {split}: f1={metrics['f1']:.4f} acc={metrics['accuracy']:.4f} prec={metrics['precision']:.4f} rec={metrics['recall']:.4f}")

    # -----------------------------------------------------------------------
    # Per-subset evaluation for DS13
    # -----------------------------------------------------------------------
    if args.dataset == "DS13_NLPCC_full_test_v1" and "id" in df_test.columns:
        print("\n[DS13 Per-Subset LR Global]")
        df_test_eval = df_test.copy()
        df_test_eval["ensemble_prob"] = global_probs["test"]
        df_test_eval["ensemble_pred"] = (global_probs["test"] >= th).astype(int)
        subset_results = {}
        for sname, (lo, hi) in DS13_SUBSETS.items():
            sub = df_test_eval[(df_test_eval["id"] >= lo) & (df_test_eval["id"] <= hi)]
            if len(sub) == 0:
                continue
            subset_results[sname] = {
                "n": len(sub),
                **_classification_metrics(sub["label"].values, sub["ensemble_pred"].values),
            }
            print(f" {sname}: f1={subset_results[sname]['f1']:.4f} acc={subset_results[sname]['accuracy']:.4f}")
        summary["lr_global"]["subset"] = subset_results

    # -----------------------------------------------------------------------
    # Save outputs
    # -----------------------------------------------------------------------
    out_dir = OUT_ROOT / args.dataset
    out_dir.mkdir(parents=True, exist_ok=True)
    for split, df in splits:
        # Reuse the probabilities computed during evaluation above.
        df_out = df.copy()
        df_out["ensemble_prob"] = global_probs[split]
        df_out["ensemble_pred"] = (global_probs[split] >= th).astype(int)
        df_out.to_csv(out_dir / f"ensemble_lr_global_{split}_predictions.csv", index=False, encoding="utf-8")

        df_out_b = df.copy()
        df_out_b["ensemble_prob"] = bucket_probs[split]
        df_out_b["ensemble_pred"] = bucket_preds[split]
        df_out_b.to_csv(out_dir / f"ensemble_lr_bucket_{split}_predictions.csv", index=False, encoding="utf-8")

    with open(out_dir / "ensemble_meta.json", "w", encoding="utf-8") as f:
        json.dump({"lr_global": meta, "lr_bucket": lb_meta, "voters": list(voters.keys()), "summary": summary}, f, ensure_ascii=False, indent=2)
    print(f"\nSaved all outputs to {out_dir}")


if __name__ == "__main__":
    main()