|
|
"""Meta-classifier for Financial PhraseBank (FPB) datasets. |
|
|
|
|
|
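The script expects the following pre-computed artifacts in subdirectories of ``outputs/``
(``prob features/``, ``MultiLLM/`` and ``Structures Financial Semantics/``, respectively);
custom paths can be supplied with ``--prob_features_csv``, ``--multi_llm_csv`` and ``--semantics_csv``: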
|
|
|
|
|
* ``FinSent_<split>_raw_probs_prob_features.csv`` – base probabilities and |
|
|
probability-derived features for FinBERT/RoBERTa |
|
|
* ``FPB_MultiLLM_<split>.csv`` – expert-signal metrics (KL, L1, agreement) |
|
|
* ``Sentences_<split>_semantics.csv`` – structured semantics flags |
|
|
|
|
|
Example command (50Agree subset):: |
|
|
|
|
|
python "FPB Meta Classifier.py" \\ |
|
|
--dataset 50Agree \\ |
|
|
--folds 5 \\ |
|
|
--models logreg xgboost \\ |
|
|
--artifact_prefix outputs/FinSent_50Agree_meta \\ |
|
|
--save_predictions --save_models --verbose |
|
|
|
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse
import os
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional

import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
from sklearn.model_selection import StratifiedKFold, cross_val_predict, cross_validate
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
|
|
|
|
|
|
|
|
try: |
|
|
from tqdm import tqdm |
|
|
except ImportError: |
|
|
tqdm = None |
|
|
|
|
|
try: |
|
|
from xgboost import XGBClassifier |
|
|
except ImportError: |
|
|
XGBClassifier = None |
|
|
|
|
|
try: |
|
|
import torch |
|
|
from transformers import AutoTokenizer, AutoModelForSequenceClassification |
|
|
from scipy.stats import entropy |
|
|
TRANSFORMERS_AVAILABLE = True |
|
|
except ImportError: |
|
|
TRANSFORMERS_AVAILABLE = False |
|
|
print("[!] transformers or torch not available. FinSentLLM feature engineering will be disabled.") |
|
|
|
|
|
from sklearn.base import BaseEstimator, TransformerMixin |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class DatasetPaths:
    """Locations of the per-split input CSVs for one dataset tag.

    Deliberately mutable: callers may overwrite individual paths after
    construction (e.g. to apply command-line overrides).
    """

    # Dataset tag, e.g. "50Agree".
    dataset: str
    # Probability-derived FinBERT/RoBERTa features; load_feature_table treats
    # this file as optional.
    prob_features_csv: str
    # Multi-LLM expert-signal features; required and must contain ``doc_id``.
    multi_llm_csv: str
    # Structured semantics flags; required (``doc_id`` or ``id`` join key).
    semantics_csv: str
|
|
|
|
|
|
|
|
def infer_paths(dataset: str, base_dir: str = "outputs") -> DatasetPaths:
    """Build the default artifact locations for ``dataset`` under ``base_dir``.

    Each artifact lives in its own subdirectory of ``base_dir``. The dataset
    tag is stripped of surrounding whitespace before it is embedded in the
    filenames and stored on the result.
    """
    tag = dataset.strip()
    # field name -> (subdirectory, filename)
    layout = {
        "prob_features_csv": ("prob features", f"FinSent_{tag}_raw_probs_prob_features.csv"),
        "multi_llm_csv": ("MultiLLM", f"FPB_MultiLLM_{tag}.csv"),
        "semantics_csv": ("Structures Financial Semantics", f"Sentences_{tag}_semantics.csv"),
    }
    resolved = {
        field_name: os.path.join(base_dir, subdir, filename)
        for field_name, (subdir, filename) in layout.items()
    }
    return DatasetPaths(dataset=tag, **resolved)
|
|
|
|
|
|
|
|
def _merge_features(left: pd.DataFrame, right: pd.DataFrame, key: str = "doc_id") -> pd.DataFrame: |
|
|
"""Merge two DataFrames on ``doc_id`` while dropping duplicate feature columns.""" |
|
|
overlap = [c for c in right.columns if c in left.columns and c != key] |
|
|
right_clean = right.drop(columns=overlap, errors="ignore") |
|
|
merged = left.merge(right_clean, on=key, how="left", validate="one_to_one") |
|
|
return merged |
|
|
|
|
|
|
|
|
def load_feature_table(paths: DatasetPaths) -> pd.DataFrame:
    """Assemble the meta-classifier feature table for one dataset split.

    Starts from the Multi-LLM CSV (required), left-joins the probability
    feature CSV when it exists, then left-joins the structured-semantics CSV
    (required). Every join is on ``doc_id`` and validated one-to-one.

    Raises:
        FileNotFoundError: the Multi-LLM or semantics CSV does not exist.
        KeyError: a CSV lacks its join key (semantics may use ``id`` instead).
        ValueError: some rows have no ``sem_*`` values after the semantics join.
    """
    # Required base table: Multi-LLM expert-signal features.
    multi_path = paths.multi_llm_csv
    if not os.path.exists(multi_path):
        raise FileNotFoundError(f"Missing Multi-LLM feature CSV: {multi_path}")
    table = pd.read_csv(multi_path)
    if "doc_id" not in table.columns:
        raise KeyError("Expected 'doc_id' column in Multi-LLM CSV. Re-run stage 3 feature extraction.")

    # Optional: probability-derived features.
    prob_path = paths.prob_features_csv
    if not os.path.exists(prob_path):
        print(f"[!] Probability feature CSV not found ({prob_path}); proceeding without extra columns.")
    else:
        prob = pd.read_csv(prob_path)
        if "doc_id" not in prob.columns:
            raise KeyError("Probability features CSV must contain 'doc_id'.")
        table = _merge_features(table, prob, key="doc_id")

    # Required: structured semantics flags.
    sem_path = paths.semantics_csv
    if not os.path.exists(sem_path):
        raise FileNotFoundError(f"Missing semantics CSV: {sem_path}")
    sem = pd.read_csv(sem_path)
    if "doc_id" not in sem.columns:
        if "id" not in sem.columns:
            raise KeyError("Semantics CSV must contain 'doc_id' or 'id' column.")
        sem = sem.rename(columns={"id": "doc_id"})
    # Label / raw-text columns from the semantics file never enter the table.
    text_like = [col for col in ("label", "sentence", "text") if col in sem.columns]
    sem = sem.drop(columns=text_like, errors="ignore")
    merged = _merge_features(table, sem, key="doc_id")

    # Every row must have received semantics; a gap means the files disagree.
    sem_cols = [col for col in merged.columns if col.startswith("sem_")]
    if not sem_cols:
        return merged
    n_missing = int(merged[sem_cols].isna().any(axis=1).sum())
    if n_missing:
        raise ValueError(
            f"{n_missing} rows lack structured semantics after merging. Make sure the semantics file"
            " matches the dataset split."
        )
    return merged
|
|
|
|
|
|
|
|
def load_best_iterations(results_dir: str = "results") -> Dict[str, int]:
    """Read pre-computed XGBoost iteration counts, keyed by dataset tag.

    Looks for ``<results_dir>/xgb_meta_best_iterations.csv`` with columns
    ``meta`` (dataset tag) and ``best_iteration``. This is best-effort: a
    missing file, or any error while reading or parsing it, yields an empty
    dict after printing a warning.

    Returns:
        Mapping dataset tag -> iteration count (later rows win on duplicate
        tags); an empty dict when the file is unavailable or unreadable.
    """
    csv_path = os.path.join(results_dir, "xgb_meta_best_iterations.csv")
    if not os.path.exists(csv_path):
        print(f"[!] Best iterations file not found: {csv_path}")
        return {}

    try:
        records = pd.read_csv(csv_path).to_dict("records")
        lookup = {rec["meta"]: int(rec["best_iteration"]) for rec in records}

        print(f"[✓] Loaded best iterations for {len(lookup)} datasets:")
        for tag, n_iter in lookup.items():
            print(f" {tag}: {n_iter} iterations")
        return lookup
    except Exception as exc:  # deliberate best-effort: fall back to defaults
        print(f"[!] Error loading best iterations: {exc}")
        return {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FinSentLLMFeatureEngineering(BaseEstimator, TransformerMixin):
    """End-to-end feature engineering: raw text -> the 36 FinSentLLM meta-features.

    ``fit`` loads FinBERT and RoBERTa; ``transform`` runs both models over the
    input sentences and derives, from their class probabilities:

    * per-model probabilities, max-probability scores and argmax labels;
    * per-class log-odds, top-1 minus top-2 margins and entropies;
    * Multi-LLM features (L1 distance/similarity, KL both ways, agreement);
    * nine ``sem_*`` flags. NOTE: these are probability-threshold heuristics,
      not the structured-semantics flags loaded from CSV in the default
      (pre-computed) workflow.

    All probability columns follow ``self.class_names`` order (negative,
    neutral, positive). Model outputs are re-ordered via each model's
    ``config.id2label`` when its label names match those classes.

    Args:
        finbert_model_id: Hugging Face id of the FinBERT checkpoint.
        roberta_model_id: Hugging Face id of the RoBERTa checkpoint.
        batch_size: Sentences per inference batch.
        max_length: Tokenizer truncation length.
        device: Torch device string; ``None`` picks CUDA when available, else CPU.
    """

    def __init__(self,
                 finbert_model_id="ProsusAI/finbert",
                 roberta_model_id="cardiffnlp/twitter-roberta-base-sentiment",
                 batch_size=16,
                 max_length=128,
                 device=None):
        self.finbert_model_id = finbert_model_id
        self.roberta_model_id = roberta_model_id
        self.batch_size = batch_size
        self.max_length = max_length
        self.device = device
        # Canonical column order for every probability-derived feature below.
        self.class_names = ["negative", "neutral", "positive"]

        # Populated by fit() -> _load_models().
        self.finbert_tokenizer = None
        self.finbert_model = None
        self.roberta_tokenizer = None
        self.roberta_model = None
        self._device = None

    def _load_models(self):
        """Load both tokenizers/models in eval mode onto the resolved device.

        Raises:
            ImportError: if torch/transformers could not be imported.
        """
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("transformers and torch are required for FinSentLLM feature engineering")

        print("[📥] Loading FinBERT and RoBERTa models...")

        if self.device is None:
            self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            self._device = torch.device(self.device)

        self.finbert_tokenizer = AutoTokenizer.from_pretrained(self.finbert_model_id)
        self.finbert_model = AutoModelForSequenceClassification.from_pretrained(self.finbert_model_id)
        self.finbert_model.to(self._device).eval()

        self.roberta_tokenizer = AutoTokenizer.from_pretrained(self.roberta_model_id)
        self.roberta_model = AutoModelForSequenceClassification.from_pretrained(self.roberta_model_id)
        self.roberta_model.to(self._device).eval()

        print(f"[✅] Models loaded on {self._device}")

    def _get_probabilities(self, texts, tokenizer, model):
        """Return softmax probabilities of shape (len(texts), num_labels).

        Columns are in the model's native label order. Inference is batched and
        runs without gradient tracking; a tqdm progress bar is shown when tqdm
        is installed and there is more than one batch.
        """
        total = len(texts)
        if total == 0:
            # np.vstack([]) would raise; return an empty, correctly-shaped array.
            return np.empty((0, model.config.num_labels), dtype=np.float32)

        batch_starts = range(0, total, self.batch_size)
        if tqdm is not None and total > self.batch_size:
            batch_starts = tqdm(batch_starts, desc="[tqdm] Encoding & inference", unit="batch")

        all_probs = []
        # A context manager rather than the @torch.no_grad() decorator: the
        # decorator is evaluated at class-definition time and would raise
        # NameError whenever torch is not installed, breaking the whole script.
        with torch.no_grad():
            for start in batch_starts:
                batch = texts[start:start + self.batch_size]
                encoding = tokenizer(
                    batch,
                    return_tensors="pt",
                    truncation=True,
                    padding=True,
                    max_length=self.max_length,
                )
                encoding = {k: v.to(self._device) for k, v in encoding.items()}
                logits = model(**encoding).logits
                all_probs.append(torch.softmax(logits, dim=-1).cpu().numpy())
        return np.vstack(all_probs)

    def _align_to_class_names(self, probs, model):
        """Reorder ``probs`` columns from ``model``'s label order into ``self.class_names`` order.

        Uses ``model.config.id2label`` when it names exactly the expected classes
        (e.g. ProsusAI/finbert lists positive, negative, neutral). Generic names
        such as ``LABEL_0`` carry no meaning, so the output is then assumed to be
        in ``self.class_names`` order already (presumably true for
        cardiffnlp/twitter-roberta-base-sentiment — verify for other checkpoints).
        """
        id2label = getattr(getattr(model, "config", None), "id2label", None) or {}
        name_to_idx = {str(label).lower(): int(idx) for idx, label in id2label.items()}
        if len(name_to_idx) == len(self.class_names) and all(c in name_to_idx for c in self.class_names):
            return probs[:, [name_to_idx[c] for c in self.class_names]]
        return probs

    def _build_features(self, finbert_probs, roberta_probs):
        """Build the 36 meta-features from class-aligned probability matrices.

        Both inputs have one row per sentence and columns in
        ``self.class_names`` order.
        """
        eps = 1e-12
        features = {}

        # Raw per-class probabilities.
        for i, cls in enumerate(self.class_names):
            features[f"fin_p_{cls[:3]}"] = finbert_probs[:, i]
            features[f"rob_p_{cls[:3]}"] = roberta_probs[:, i]

        # Confidence of the top class.
        features["fin_score"] = finbert_probs.max(axis=1)
        features["rob_score"] = roberta_probs.max(axis=1)

        # Predicted class index (into self.class_names).
        features["fin_label"] = finbert_probs.argmax(axis=1)
        features["rob_label"] = roberta_probs.argmax(axis=1)

        # Per-class log-odds; eps keeps p=0 / p=1 finite.
        for i, cls in enumerate(self.class_names):
            features[f"fin_logit_{cls[:3]}"] = np.log((finbert_probs[:, i] + eps) / (1 - finbert_probs[:, i] + eps))
            features[f"rob_logit_{cls[:3]}"] = np.log((roberta_probs[:, i] + eps) / (1 - roberta_probs[:, i] + eps))

        # Same values as fin_score / rob_score; kept to reproduce the 36-column schema.
        features["fin_max_prob"] = finbert_probs.max(axis=1)
        features["rob_max_prob"] = roberta_probs.max(axis=1)

        # Gap between the top two classes.
        fin_sorted = np.sort(finbert_probs, axis=1)
        rob_sorted = np.sort(roberta_probs, axis=1)
        features["fin_margin"] = fin_sorted[:, -1] - fin_sorted[:, -2]
        features["rob_margin"] = rob_sorted[:, -1] - rob_sorted[:, -2]

        # scipy's entropy reduces along axis 0, hence the transposes (natural log).
        features["fin_entropy"] = entropy(finbert_probs.T)
        features["rob_entropy"] = entropy(roberta_probs.T)

        # Multi-LLM disagreement features.
        l1_dist = np.abs(finbert_probs - roberta_probs).sum(axis=1)
        features["MultiLLM_L1_distance"] = l1_dist
        features["MultiLLM_L1_similarity"] = 1 / (1 + l1_dist)

        features["MultiLLM_KL_F_to_R"] = entropy(finbert_probs.T, roberta_probs.T)
        features["MultiLLM_KL_R_to_F"] = entropy(roberta_probs.T, finbert_probs.T)

        fin_pred = finbert_probs.argmax(axis=1)
        rob_pred = roberta_probs.argmax(axis=1)
        features["MultiLLM_agree"] = (fin_pred == rob_pred).astype(int)

        # Heuristic sem_* flags from probability thresholds (column 0=negative,
        # 1=neutral, 2=positive per self.class_names).
        features["sem_compared"] = ((finbert_probs[:, 1] > 0.4) & (roberta_probs[:, 1] > 0.4)).astype(int)
        features["sem_loss_improve"] = ((finbert_probs[:, 2] > 0.6) & (roberta_probs[:, 2] > 0.5)).astype(int)
        features["sem_loss_worsen"] = ((finbert_probs[:, 0] > 0.6) & (roberta_probs[:, 0] > 0.5)).astype(int)
        features["sem_profit_up"] = ((finbert_probs[:, 2] > 0.7) & (l1_dist < 0.3)).astype(int)
        features["sem_cost_down"] = ((finbert_probs[:, 2] > 0.5) & (features["MultiLLM_agree"] == 1)).astype(int)
        features["sem_contract_fin"] = ((finbert_probs[:, 1] > 0.8)).astype(int)
        features["sem_uncertainty"] = ((features["fin_entropy"] > 1.0) | (features["rob_entropy"] > 1.0)).astype(int)
        features["sem_stable_guidance"] = ((l1_dist < 0.2) & (finbert_probs[:, 1] > 0.5)).astype(int)
        features["sem_operational"] = ((finbert_probs[:, 1] > 0.3) & (roberta_probs[:, 1] > 0.3)).astype(int)

        return pd.DataFrame(features)

    def fit(self, X, y=None):
        """Load the language models; ``X`` and ``y`` are ignored."""
        self._load_models()
        return self

    def transform(self, X):
        """Convert sentences to the 36-column feature DataFrame.

        Args:
            X: A DataFrame with a ``text`` column (or exactly one column), or a
                Series, list, tuple or 1-D array of strings.

        Raises:
            RuntimeError: if ``fit`` has not been called.
            ValueError: if ``X`` has an unsupported type or column layout.
        """
        if self.finbert_model is None:
            raise RuntimeError("Models not loaded. Call fit() first.")

        if isinstance(X, pd.DataFrame):
            if 'text' in X.columns:
                texts = X['text'].tolist()
            elif len(X.columns) == 1:
                texts = X.iloc[:, 0].tolist()
            else:
                raise ValueError("DataFrame must have 'text' column or single column")
        elif isinstance(X, (pd.Series, list, tuple, np.ndarray)):
            texts = list(X)
        else:
            raise ValueError("X must be DataFrame, Series, list, tuple, or array")

        print(f"[🔮] Processing {len(texts)} texts...")

        finbert_probs = self._align_to_class_names(
            self._get_probabilities(texts, self.finbert_tokenizer, self.finbert_model),
            self.finbert_model,
        )
        roberta_probs = self._align_to_class_names(
            self._get_probabilities(texts, self.roberta_tokenizer, self.roberta_model),
            self.roberta_model,
        )

        features_df = self._build_features(finbert_probs, roberta_probs)

        print(f"[✅] Generated {len(features_df.columns)} features")
        return features_df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_preprocessor(numeric_cols: List[str], categorical_cols: List[str]) -> ColumnTransformer:
    """Create the column-wise preprocessing step shared by all meta-models.

    Numeric columns are standardised. Categorical columns are one-hot encoded
    with dense output, ignoring categories unseen during fit. Every other
    column is dropped.

    Raises:
        ValueError: if both column lists are empty.
    """
    if not (numeric_cols or categorical_cols):
        raise ValueError("No feature columns selected – check your dataset.")

    steps = []
    if numeric_cols:
        steps.append(("num", StandardScaler(), numeric_cols))
    if categorical_cols:
        encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
        steps.append(("cat", encoder, categorical_cols))
    return ColumnTransformer(transformers=steps, remainder="drop")
|
|
|
|
|
|
|
|
# Column names emitted by FinSentLLMFeatureEngineering.transform(); the
# end-to-end pipelines preprocess exactly these.
_END2END_CATEGORICAL_FEATURES = ["fin_label", "rob_label"]
_END2END_NUMERIC_FEATURES = [
    'fin_p_neg', 'fin_p_neu', 'fin_p_pos', 'fin_score',
    'rob_p_neg', 'rob_p_neu', 'rob_p_pos', 'rob_score',
    'fin_logit_neg', 'fin_logit_neu', 'fin_logit_pos',
    'rob_logit_neg', 'rob_logit_neu', 'rob_logit_pos',
    'fin_max_prob', 'rob_max_prob', 'fin_margin', 'rob_margin',
    'fin_entropy', 'rob_entropy',
    'MultiLLM_L1_distance', 'MultiLLM_L1_similarity',
    'MultiLLM_KL_F_to_R', 'MultiLLM_KL_R_to_F', 'MultiLLM_agree',
    'sem_compared', 'sem_loss_improve', 'sem_loss_worsen',
    'sem_profit_up', 'sem_cost_down', 'sem_contract_fin',
    'sem_uncertainty', 'sem_stable_guidance', 'sem_operational',
]


def _resolve_n_estimators(dataset: str, best_iterations: Optional[Dict[str, int]]) -> int:
    """Return the pre-computed iteration count for ``dataset`` if known, else 1000."""
    if best_iterations and dataset in best_iterations:
        n_estimators = best_iterations[dataset]
        print(f"[✓] Using pre-computed best iterations for {dataset}: {n_estimators}")
    else:
        n_estimators = 1000
        print(f"[!] No pre-computed iterations found for {dataset}, using default: {n_estimators}")
    return n_estimators


def _make_xgb(num_classes: int, random_state: int, n_estimators: int):
    """Construct the XGBoost meta-classifier with the script's fixed hyper-parameters."""
    return XGBClassifier(
        objective="multi:softprob",
        num_class=num_classes,
        learning_rate=0.05,
        max_depth=6,
        subsample=0.8,
        colsample_bytree=0.8,
        n_estimators=n_estimators,
        min_child_weight=2,
        reg_lambda=1.0,
        reg_alpha=0.0,
        tree_method="hist",
        eval_metric="mlogloss",
        random_state=random_state,
        n_jobs=0,
        verbosity=0,
    )


def build_pipelines(
    numeric_cols: List[str],
    categorical_cols: List[str],
    num_classes: int,
    random_state: int,
    models_requested: Iterable[str],
    dataset: str = "",
    best_iterations: Optional[Dict[str, int]] = None,
    include_feature_engineering: bool = False,
) -> Dict[str, Pipeline]:
    """Build one unfitted sklearn pipeline per requested meta-model.

    Each pipeline is ``[feature_engineering] -> preprocess -> clf``. The
    feature-engineering step (FinBERT/RoBERTa inference from raw text) is
    included only when ``include_feature_engineering`` is true. In that case
    the preprocessor uses the fixed 36-feature schema and ``numeric_cols`` /
    ``categorical_cols`` are ignored.

    Args:
        numeric_cols: Columns to standardise (pre-computed-feature mode).
        categorical_cols: Columns to one-hot encode (pre-computed-feature mode).
        num_classes: Number of target classes (for XGBoost ``num_class``).
        random_state: Seed for XGBoost.
        models_requested: Any of ``"logreg"`` and ``"xgboost"``.
        dataset: Dataset tag used to look up ``best_iterations``.
        best_iterations: Optional dataset -> XGBoost tree count mapping.
        include_feature_engineering: Prepend the end-to-end text transformer.

    Returns:
        Mapping model name -> Pipeline. Every pipeline gets its own step
        instances, so fitting one never affects another.

    Raises:
        ImportError: if ``"xgboost"`` is requested but xgboost is not installed.
        ValueError: (from build_preprocessor) if no feature columns are given.
    """
    # Materialise once: a one-shot iterator would otherwise be consumed by the
    # first membership test. A bare string names a single model.
    if isinstance(models_requested, str):
        requested = {models_requested}
    else:
        requested = set(models_requested)

    if include_feature_engineering:
        num_cols, cat_cols = list(_END2END_NUMERIC_FEATURES), list(_END2END_CATEGORICAL_FEATURES)
    else:
        num_cols, cat_cols = numeric_cols, categorical_cols

    def _assemble(clf) -> Pipeline:
        """Wrap ``clf`` with fresh (optional) feature-engineering and preprocessing steps."""
        steps = []
        if include_feature_engineering:
            steps.append(("feature_engineering", FinSentLLMFeatureEngineering()))
        steps.append(("preprocess", build_preprocessor(num_cols, cat_cols)))
        steps.append(("clf", clf))
        return Pipeline(steps)

    pipelines: Dict[str, Pipeline] = {}

    if "logreg" in requested:
        pipelines["logreg"] = _assemble(LogisticRegression(max_iter=1000, solver="lbfgs"))

    if "xgboost" in requested:
        if XGBClassifier is None:
            raise ImportError(
                "xgboost is not installed. Install it with 'pip install xgboost' or remove 'xgboost' from --models."
            )
        n_estimators = _resolve_n_estimators(dataset, best_iterations)
        pipelines["xgboost"] = _assemble(_make_xgb(num_classes, random_state, n_estimators))
        if include_feature_engineering:
            print("[🤖] Created end-to-end XGBoost pipeline with feature engineering")

    return pipelines
|
|
|
|
|
|
|
|
def evaluate_model(
    name: str,
    pipeline: Pipeline,
    X: pd.DataFrame,
    y_train: pd.Series,
    y_eval: pd.Series,
    cv: StratifiedKFold,
    class_labels: List[str],
    label_decoder: Optional[Dict[int, str]] = None,
) -> Dict[str, object]:
    """Cross-validate ``pipeline``, then refit it on all rows.

    A single cross-validation pass produces out-of-fold class probabilities.
    Out-of-fold predictions (argmax of those probabilities) and per-fold
    accuracy / macro-F1 are derived from that pass, so saved predictions and
    probabilities always come from the same fold models.

    Args:
        name: Model key; ``"xgboost"`` additionally records tree counts.
        pipeline: Unfitted pipeline; it is fitted in place on the full data.
        X: Feature table (or the text column in end-to-end mode).
        y_train: Targets used for fitting (integer-encoded for XGBoost).
        y_eval: Human-readable labels for the report and confusion matrix.
        cv: Splitter; its folds are materialised once and reused.
        class_labels: Desired column order for probabilities and report rows.
        label_decoder: Optional int -> label map when ``y_train`` is encoded.

    Returns:
        Dict with fold-score mean/std, classification report, confusion
        matrix, out-of-fold predictions and probabilities (columns ordered as
        ``class_labels``), and the pipeline refitted on all rows.
    """
    # Materialise the folds once so every derived quantity refers to the same
    # splits; the pipeline is cross-validated once instead of three times.
    splits = list(cv.split(X, y_train))
    probas = cross_val_predict(pipeline, X, y_train, cv=splits, method="predict_proba")

    # cross_val_predict label-encodes y for predict_proba, so column j is the
    # j-th sorted unique target value.
    y_train_arr = np.asarray(y_train)
    oof_classes = np.unique(y_train_arr)
    preds = oof_classes[probas.argmax(axis=1)]

    fold_accuracy = []
    fold_macro_f1 = []
    for _, test_idx in splits:
        fold_true = y_train_arr[test_idx]
        fold_pred = preds[test_idx]
        fold_accuracy.append(accuracy_score(fold_true, fold_pred))
        fold_macro_f1.append(f1_score(fold_true, fold_pred, average="macro"))

    # Refit on all rows; this is the model that gets persisted.
    fitted = pipeline.fit(X, y_train)

    if label_decoder:
        preds_decoded = np.array([label_decoder[int(p)] for p in preds])
        proba_labels = [label_decoder[int(c)] for c in oof_classes]
    else:
        preds_decoded = preds
        proba_labels = [str(c) for c in oof_classes]

    # Re-order probability columns into the caller's canonical class order.
    if proba_labels != class_labels:
        reorder_idx = [proba_labels.index(lbl) for lbl in class_labels]
        probas = probas[:, reorder_idx]

    y_eval_array = y_eval.to_numpy()
    report = classification_report(y_eval_array, preds_decoded, labels=class_labels, digits=4)
    cm = confusion_matrix(y_eval_array, preds_decoded, labels=class_labels)

    metrics = {
        "name": name,
        "accuracy_mean": float(np.mean(fold_accuracy)),
        "accuracy_std": float(np.std(fold_accuracy)),
        "macro_f1_mean": float(np.mean(fold_macro_f1)),
        "macro_f1_std": float(np.std(fold_macro_f1)),
        "classification_report": report,
        "confusion_matrix": cm,
        "classes": class_labels,
        "preds": preds_decoded,
        "probas": probas,
        "final_model": fitted,
    }

    if name == "xgboost":
        clf = fitted.named_steps["clf"]
        if hasattr(clf, "best_iteration"):
            # Only defined with early stopping; it is a 0-based tree index.
            metrics["best_iteration"] = clf.best_iteration
            metrics["best_ntree_limit"] = clf.best_iteration + 1
        elif hasattr(clf, "n_estimators"):
            # No early stopping: every one of the n_estimators trees is used,
            # so the tree limit is n_estimators (not n_estimators + 1).
            metrics["best_iteration"] = clf.n_estimators
            metrics["best_ntree_limit"] = clf.n_estimators
        else:
            metrics["best_ntree_limit"] = 1

    return metrics
|
|
|
|
|
|
|
|
def print_metrics(metrics: Dict[str, object], verbose: bool = False) -> None: |
|
|
name = metrics["name"] |
|
|
print(f"\n=== {name.upper()} meta-classifier ===") |
|
|
print( |
|
|
f"Accuracy: {metrics['accuracy_mean']*100:.2f}% ± {metrics['accuracy_std']*100:.2f}%\n" |
|
|
f"Macro-F1: {metrics['macro_f1_mean']*100:.2f}% ± {metrics['macro_f1_std']*100:.2f}%" |
|
|
) |
|
|
if verbose: |
|
|
print("\nClassification report:\n", metrics["classification_report"], sep="") |
|
|
print("Confusion matrix (rows=true, cols=pred):") |
|
|
classes = metrics["classes"] |
|
|
header = " " + " ".join(f"{c[:7]:>7}" for c in classes) |
|
|
print(header) |
|
|
for c, row in zip(classes, metrics["confusion_matrix"]): |
|
|
row_fmt = " ".join(f"{int(v):>7}" for v in row) |
|
|
print(f"{c[:7]:>4} {row_fmt}") |
|
|
|
|
|
|
|
|
def save_predictions(base: pd.DataFrame, metrics: Dict[str, object], path: str) -> None:
    """Write one model's out-of-fold predictions to ``path`` as CSV.

    Output columns, in order: ``doc_id``, ``true_label`` (from
    ``base["label"]``), ``meta_pred``, then one ``meta_proba_<class>`` column
    per entry of ``metrics["classes"]``. Rows follow ``base``'s row order; the
    index is not written.
    """
    probas = metrics["probas"]
    proba_columns = {
        f"meta_proba_{cls}": probas[:, col]
        for col, cls in enumerate(metrics["classes"])
    }
    out = (
        base.loc[:, ["doc_id", "label"]]
        .rename(columns={"label": "true_label"})
        .assign(meta_pred=metrics["preds"], **proba_columns)
    )
    out.to_csv(path, index=False)
    print(f"Saved predictions: {path}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define and parse the command-line interface from ``sys.argv``.

    Option groups: dataset tag and input-path overrides, cross-validation
    settings, which meta-models to run, artifact locations, output toggles,
    and the slow end-to-end (re-inference) mode.
    """
    parser = argparse.ArgumentParser(description="Train FinSentLLM meta-classifiers (LogReg/XGBoost).")
    add = parser.add_argument

    # Dataset selection and optional overrides of the inferred input paths.
    add("--dataset", required=True, help="Dataset tag, e.g. 50Agree | 66Agree | 75Agree | AllAgree")
    for flag, target in (
        ("--prob_features_csv", "probability feature CSV"),
        ("--multi_llm_csv", "Multi-LLM feature CSV"),
        ("--semantics_csv", "structured semantics CSV"),
    ):
        add(flag, help=f"Override path to {target}")

    # Cross-validation settings.
    add("--folds", type=int, default=5, help="Number of stratified CV folds (default: 5)")
    add("--seed", type=int, default=7, help="Random seed for CV shuffling (default: 7)")

    add(
        "--models",
        nargs="+",
        default=["logreg", "xgboost"],
        choices=["logreg", "xgboost"],
        help="Which meta-models to evaluate (default: both)",
    )

    # Where artifacts are written.
    add("--artifact_prefix", help="If set, saves artifacts using this filepath prefix")
    add("--out_dir", default="outputs", help="Base output directory")
    add("--meta_xgb_dir", default="Meta-Classifier_XG_boost_es_optimized", help="Subdir for xgboost artifacts")
    add("--meta_logreg_dir", default="Meta-Classifier-log_regression", help="Subdir for logreg artifacts")

    # Boolean output toggles.
    for flag, text in (
        ("--save_predictions", "Write out-of-fold predictions per model"),
        ("--save_models", "Persist fitted pipelines per model"),
        ("--verbose", "Print full reports and confusion matrices"),
    ):
        add(flag, action="store_true", help=text)

    # Slow path: regenerate features from raw text with the large models.
    add("--end_to_end", action="store_true", default=False, help="[慢] 用大模型重新生成特征 (不建议,除非你需要全流程推理)")

    return parser.parse_args()
|
|
|
|
|
|
|
|
def _resolve_paths(args: argparse.Namespace) -> DatasetPaths:
    """Return the default input paths for ``args.dataset`` with any CLI overrides applied."""
    paths = infer_paths(args.dataset)
    if args.prob_features_csv:
        paths.prob_features_csv = args.prob_features_csv
    if args.multi_llm_csv:
        paths.multi_llm_csv = args.multi_llm_csv
    if args.semantics_csv:
        paths.semantics_csv = args.semantics_csv
    return paths


def _encode_labels(y: pd.Series):
    """Order the observed classes and integer-encode ``y``.

    Classes appear as negative, neutral, positive (those that are observed),
    followed by any other observed labels in first-seen order.

    Returns:
        ``(class_labels, label_to_int, int_to_label, y_encoded)``.
    """
    default_order = ["negative", "neutral", "positive"]
    observed = list(pd.unique(y))
    class_labels = [lbl for lbl in default_order if lbl in observed]
    class_labels += [lbl for lbl in observed if lbl not in class_labels]

    label_to_int = {lbl: idx for idx, lbl in enumerate(class_labels)}
    int_to_label = {idx: lbl for lbl, idx in label_to_int.items()}
    y_encoded = y.map(label_to_int).astype(int)
    return class_labels, label_to_int, int_to_label, y_encoded


def _artifact_path(args: argparse.Namespace, name: str, suffix: str) -> str:
    """Return where model ``name``'s artifact (``predictions.csv`` / ``model.joblib``) is written.

    With ``--artifact_prefix`` the path is ``<prefix>_<name>_<suffix>`` (the
    directory must already exist). Otherwise the path is inside the per-model
    subdirectory of ``--out_dir``, which is created if needed.
    """
    if args.artifact_prefix:
        return f"{args.artifact_prefix}_{name}_{suffix}"
    subdir = args.meta_xgb_dir if name == "xgboost" else args.meta_logreg_dir
    save_dir = os.path.join(args.out_dir, subdir)
    os.makedirs(save_dir, exist_ok=True)
    return os.path.join(save_dir, f"FinSent_{args.dataset}_meta_{name}_{suffix}")


def _save_model(
    metrics: Dict[str, object],
    name: str,
    path: str,
    feature_columns: List[str],
    label_to_int: Dict[str, int],
    class_labels: List[str],
) -> None:
    """Persist the refitted pipeline with joblib.

    XGBoost is wrapped in a dict that also records feature columns, the label
    encoding and tree counts. Other models are dumped as the bare pipeline.
    """
    if name == "xgboost":
        payload = {
            "pipeline": metrics["final_model"],
            "feature_columns": feature_columns,
            "label_map": label_to_int,
            "labels": class_labels,
            "best_iteration": metrics.get("best_iteration", 0),
            "best_ntree_limit": metrics.get("best_ntree_limit", 1),
        }
    else:
        payload = metrics["final_model"]
    joblib.dump(payload, path)
    print(f"Saved model: {path}")


def main() -> None:
    """CLI entry point: load features, cross-validate each meta-model, save artifacts."""
    args = parse_args()
    best_iterations = load_best_iterations()

    if args.end_to_end and (args.prob_features_csv or args.multi_llm_csv or args.semantics_csv):
        print("[警告] --end_to_end 模式下会忽略所有预处理特征文件,全部重新推理,速度极慢!")

    if args.end_to_end:
        print("[🤖] Creating end-to-end pipelines with feature engineering... (速度极慢,仅用于全流程推理)")
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("transformers and torch are required for end-to-end feature engineering. Install with: pip install transformers torch")

        # Path overrides are ignored in this mode (see the warning above). The
        # default files are read only to obtain raw text and labels.
        data = load_feature_table(infer_paths(args.dataset))
        text_col = next((c for c in ("text", "sentence", "content") if c in data.columns), None)
        if text_col is None:
            raise ValueError("End-to-end mode requires text data, but no text column found in dataset")
        X = data[[text_col]]
        numeric_cols, categorical_cols = [], []
    else:
        data = load_feature_table(_resolve_paths(args))
        categorical_cols = [c for c in ["fin_label", "rob_label"] if c in data.columns]
        # Keep categorical columns out of the numeric set. Otherwise a numeric
        # fin_label/rob_label would be selected twice, duplicating columns in X
        # (ColumnTransformer rejects non-unique selections) and treating a
        # class index as a continuous value.
        numeric_cols = [
            c for c in data.select_dtypes(include=[np.number]).columns
            if c != "doc_id" and c not in categorical_cols
        ]
        X = data[numeric_cols + categorical_cols]

    target_col = "label"
    if target_col not in data.columns:
        raise KeyError("Target column 'label' not found after merging.")
    y = data[target_col].astype(str)
    class_labels, label_to_int, int_to_label, y_encoded = _encode_labels(y)

    pipelines = build_pipelines(
        numeric_cols=numeric_cols,
        categorical_cols=categorical_cols,
        num_classes=len(class_labels),
        random_state=args.seed,
        models_requested=args.models,
        dataset=args.dataset,
        best_iterations=best_iterations,
        include_feature_engineering=args.end_to_end,
    )

    cv = StratifiedKFold(n_splits=args.folds, shuffle=True, random_state=args.seed)

    for name, pipeline in pipelines.items():
        # XGBoost trains on integer-encoded targets; LogReg uses the strings.
        if name == "xgboost":
            metrics = evaluate_model(
                name, pipeline, X, y_encoded, y,
                cv=cv, class_labels=class_labels, label_decoder=int_to_label,
            )
        else:
            metrics = evaluate_model(name, pipeline, X, y, y, cv=cv, class_labels=class_labels)
        print_metrics(metrics, verbose=args.verbose)

        if args.save_predictions:
            save_predictions(data, metrics, _artifact_path(args, name, "predictions.csv"))
        if args.save_models:
            _save_model(
                metrics, name, _artifact_path(args, name, "model.joblib"),
                list(X.columns), label_to_int, class_labels,
            )


if __name__ == "__main__":
    main()
|
|
|