# FinSentLLM / create_end_to_end_pipeline.py
import os
import sys
import joblib
import logging
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')
# Global helper: coerce a DataFrame into a float64 numpy array
def to_float64_array(df):
    """Coerce a DataFrame into a float64 numpy array."""
    return np.asarray(df, dtype=np.float64)
# Additional helper: keep the DataFrame structure, only enforce float64 dtype
def enforce_float64_df(df):
    """Keep the DataFrame structure, only enforce float64 dtype."""
    if isinstance(df, pd.DataFrame):
        return df.astype(np.float64)
    else:
        return pd.DataFrame(df, dtype=np.float64)
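# Neither helper is called directly below; presumably they stay at module scope
# so that joblib-pickled pipelines referencing them can be deserialized.
# A minimal usage sketch (illustrative only; `_demo_dtype_helpers` is a
# hypothetical function that is never called by this script):
def _demo_dtype_helpers():
    demo = pd.DataFrame({"a": [1, 2], "b": [3, 4]})  # int64 columns
    arr = to_float64_array(demo)    # -> numpy ndarray with dtype float64
    df = enforce_float64_df(demo)   # -> DataFrame with every column float64
    return arr.dtype, df.dtypes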
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Add the current directory to the Python path
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, current_dir)
# Import the libraries required for feature engineering
try:
    import torch
    from transformers import AutoTokenizer, AutoModelForSequenceClassification
    from scipy.stats import entropy
    import re
    logger.info("✅ Successfully imported all required libraries")
except ImportError as e:
    logger.error(f"❌ Missing required library: {e}")
    sys.exit(1)
class FinSentLLMFeatureEngineering(BaseEstimator, TransformerMixin):
"""
金融情感分析特征工程器
集成FinBERT、RoBERTa、MultiLLM和语义特征
"""
def __init__(self):
self.finbert_tokenizer = None
self.finbert_model = None
self.roberta_tokenizer = None
self.roberta_model = None
def fit(self, X, y=None):
"""拟合阶段,加载模型"""
logger.info("🔄 正在加载FinBERT和RoBERTa模型...")
try:
# 加载FinBERT
self.finbert_tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
self.finbert_model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")
# 加载RoBERTa
self.roberta_tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment-latest")
self.roberta_model = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment-latest")
logger.info("✅ 模型加载完成")
except Exception as e:
logger.error(f"❌ 模型加载失败: {e}")
raise
return self
def transform(self, X):
"""转换阶段,提取特征"""
logger.info(f"🔄 正在为{len(X)}个样本提取特征...")
if isinstance(X, pd.Series):
texts = X.tolist()
elif isinstance(X, list):
texts = X
else:
texts = X.flatten().tolist()
features = []
for i, text in enumerate(texts):
if i % 100 == 0:
logger.info(f"处理进度: {i}/{len(texts)}")
text_features = self._build_features(text)
features.append(text_features)
feature_columns = [
'fin_p_neg', 'fin_p_neu', 'fin_p_pos', 'fin_score',
'rob_p_neg', 'rob_p_neu', 'rob_p_pos', 'rob_score',
'fin_logit_neg', 'fin_logit_neu', 'fin_logit_pos',
'fin_max_prob', 'fin_margin', 'fin_entropy',
'rob_logit_neg', 'rob_logit_neu', 'rob_logit_pos',
'rob_max_prob', 'rob_margin', 'rob_entropy',
'MultiLLM_L1_distance', 'MultiLLM_L1_similarity',
'MultiLLM_KL_F_to_R', 'MultiLLM_KL_R_to_F', 'MultiLLM_agree',
'sem_compared', 'sem_loss_improve', 'sem_loss_worsen',
'sem_profit_up', 'sem_cost_down', 'sem_contract_fin',
'sem_uncertainty', 'sem_stable_guidance', 'sem_operational',
'fin_label', 'rob_label'
]
feature_df = pd.DataFrame(features, columns=feature_columns)
        # Coerce everything to numeric first
        feature_df = feature_df.apply(pd.to_numeric, errors='coerce').fillna(0.0)
        # ⚙️ Cast the categorical columns to strings so OneHotEncoder stays stable
        feature_df['fin_label'] = feature_df['fin_label'].astype(str)
        feature_df['rob_label'] = feature_df['rob_label'].astype(str)
        # Keep all remaining columns as float64
        for col in feature_df.columns:
            if col not in ['fin_label', 'rob_label']:
                feature_df[col] = feature_df[col].astype('float64')
        # Debug output to help locate potential dtype issues
        print('DEBUG: feature_df.dtypes:')
        print(feature_df.dtypes)
        is_float = feature_df.dtypes.apply(lambda dt: np.issubdtype(dt, np.floating))
        non_float_cols = [
            col for col in feature_df.columns
            if col not in ('fin_label', 'rob_label') and not is_float[col]
        ]
        if len(non_float_cols) > 0:
            print('⚠️ WARNING: Non-float columns detected:', non_float_cols)
        print('DEBUG: feature_df.head():')
        print(feature_df.head())
        print('DEBUG: feature_df.info():')
        feature_df.info()  # info() prints directly and returns None
print('DEBUG: feature_df unique types per column:')
for col in feature_df.columns:
unique_types = {type(x) for x in feature_df[col].values}
print(f'{col}: {unique_types}')
        # ✅ Return a DataFrame so the output stays compatible with sklearn / XGBoost
return feature_df
def _build_features(self, text):
"""为单个文本构建特征向量,强制全 float,异常填 0.0"""
features = []
try:
# 1. FinBERT概率特征 (3个)
finbert_probs = self._get_finbert_probabilities(text)
features.extend(finbert_probs)
# 2. FinBERT分数特征 (1个)
fin_score = max(finbert_probs)
features.append(fin_score)
# 3. RoBERTa概率特征 (3个)
roberta_probs = self._get_roberta_probabilities(text)
features.extend(roberta_probs)
# 4. RoBERTa分数特征 (1个)
rob_score = max(roberta_probs)
features.append(rob_score)
# 5. FinBERT logit特征 (3个)
fin_logits = self._get_finbert_logits(text)
features.extend(fin_logits)
# 6. FinBERT概率工程特征 (3个)
fin_max_prob = max(finbert_probs)
fin_margin = fin_max_prob - sorted(finbert_probs)[-2]
fin_entropy = entropy(finbert_probs)
features.extend([fin_max_prob, fin_margin, fin_entropy])
# 7. RoBERTa logit特征 (3个)
rob_logits = self._get_roberta_logits(text)
features.extend(rob_logits)
# 8. RoBERTa概率工程特征 (3个)
rob_max_prob = max(roberta_probs)
rob_margin = rob_max_prob - sorted(roberta_probs)[-2]
rob_entropy = entropy(roberta_probs)
features.extend([rob_max_prob, rob_margin, rob_entropy])
# 9. MultiLLM特征 (5个)
multillm_features = self._get_multillm_features(finbert_probs, roberta_probs)
features.extend(multillm_features)
# 10. 语义特征 (9个)
semantic_features = self._get_semantic_features(text)
features.extend(semantic_features)
# 11. 标签特征 (2个)
fin_label = np.argmax(finbert_probs)
rob_label = np.argmax(roberta_probs)
features.extend([fin_label, rob_label])
except Exception as e:
logger.error(f"特征构建异常: {e}, text={text}")
# 强制所有元素为 float,异常填 0.0
float_features = []
for x in features:
try:
float_features.append(float(x))
except Exception:
float_features.append(0.0)
return float_features
def _get_finbert_probabilities(self, text):
"""获取FinBERT概率"""
try:
inputs = self.finbert_tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = self.finbert_model(**inputs)
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
return probabilities[0].tolist()
except:
return [0.33, 0.33, 0.34] # 默认均匀分布
def _get_roberta_probabilities(self, text):
"""获取RoBERTa概率"""
try:
inputs = self.roberta_tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = self.roberta_model(**inputs)
probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)
return probabilities[0].tolist()
except:
return [0.33, 0.33, 0.34] # 默认均匀分布
def _get_finbert_logits(self, text):
"""获取FinBERT logits"""
try:
inputs = self.finbert_tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = self.finbert_model(**inputs)
logits = outputs.logits[0].tolist()
return logits
except:
return [0.0, 0.0, 0.0] # 默认值
def _get_roberta_logits(self, text):
"""获取RoBERTa logits"""
try:
inputs = self.roberta_tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
with torch.no_grad():
outputs = self.roberta_model(**inputs)
logits = outputs.logits[0].tolist()
return logits
except:
return [0.0, 0.0, 0.0] # 默认值
def _get_multillm_features(self, finbert_probs, roberta_probs):
"""MultiLLM特征(基于概率的计算)"""
features = []
# L1距离
l1_distance = sum(abs(fp - rp) for fp, rp in zip(finbert_probs, roberta_probs))
features.append(l1_distance)
# L1相似度
l1_similarity = 1.0 / (1.0 + l1_distance) # 修正为原始公式
features.append(l1_similarity)
# KL散度:FinBERT到RoBERTa
kl_f_to_r = entropy(finbert_probs, roberta_probs) if min(roberta_probs) > 0 else 0.0
features.append(kl_f_to_r)
# KL散度:RoBERTa到FinBERT
kl_r_to_f = entropy(roberta_probs, finbert_probs) if min(finbert_probs) > 0 else 0.0
features.append(kl_r_to_f)
# 一致性:预测是否一致
fin_pred = np.argmax(finbert_probs)
rob_pred = np.argmax(roberta_probs)
agree = 1.0 if fin_pred == rob_pred else 0.0
features.append(agree)
return features
def _get_semantic_features(self, text):
"""语义特征(9个特定特征)- 基于原始正则表达式模式"""
import re
features = []
text_lower = text.lower()
        # 1. sem_compared - comparison cues (original regex patterns)
compared_patterns = [
r"\bcompared\s+to\b", r"\bcompared\s+with\b", r"\bversus\b", r"\bvs\.?\b",
r"\bfrom\s+[-+]?\d+(?:\.\d+)?\s*(?:%|percent|percentage|[A-Za-z]+)?\s+to\s+[-+]?\d+(?:\.\d+)?\s*(?:%|percent|percentage|[A-Za-z]+)?\b"
]
sem_compared = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in compared_patterns))
features.append(sem_compared)
        # 2. sem_loss_improve - losses improving
loss_improve_patterns = [
r"\bloss(?:es)?\s+(?:narrowed|shr[aou]nk|decreased|fell|reduced)\b",
r"\bturn(?:ed)?\s+to\s+(?:profit|black)\b"
]
sem_loss_improve = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in loss_improve_patterns))
features.append(sem_loss_improve)
        # 3. sem_loss_worsen - losses worsening
loss_worsen_patterns = [
r"\bloss(?:es)?\s+(?:widened|grew|increased|rose|deepened)\b",
r"\bturn(?:ed)?\s+to\s+(?:loss|red)\b"
]
sem_loss_worsen = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in loss_worsen_patterns))
features.append(sem_loss_worsen)
        # 4. sem_profit_up - profit rising
profit_up_patterns = [
r"\b(profit|profits|net\s+income|earnings|ebit|ebitda|eps|roe|roi|return(?:s)?(?:\s+on\s+equity)?)\b.*\b(rose|grew|increased|up|higher|improved|jumped|surged|soared)\b",
r"\b(rose|grew|increased|up|higher|improved|jumped|surged|soared)\b.*\b(profit|profits|net\s+income|earnings|ebit|ebitda|eps|roe|roi|return(?:s)?(?:\s+on\s+equity)?)\b"
]
sem_profit_up = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in profit_up_patterns))
features.append(sem_profit_up)
        # 5. sem_cost_down - costs falling
cost_down_patterns = [
r"\b(cost|costs|expenses|opex|operating\s+expense(?:s)?)\b.*\b(fell|declined|decreased|lower|reduced|down)\b",
r"\b(fell|declined|decreased|lower|reduced|down)\b.*\b(cost|costs|expenses|opex|operating\s+expense(?:s)?)\b"
]
sem_cost_down = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in cost_down_patterns))
features.append(sem_cost_down)
        # 6. sem_contract_fin - contracts / financing
contract_fin_patterns = [
r"\b(agreement|deal|contract|order|purchase\s+order|framework\s+agreement)\b",
r"\b(bond|notes?|debenture|convertible|placement|issuance|issue|offering|ipo|follow-?on)\b",
r"\b(loan|credit\s+facility|credit\s+line|revolver|revolving\s+credit|financing)\b"
]
sem_contract_fin = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in contract_fin_patterns))
features.append(sem_contract_fin)
        # 7. sem_uncertainty - uncertainty
uncertainty_patterns = [
r"\b(uncertain|uncertainty|cannot\s+be\s+determined|not\s+clear|unknown|unpredictable)\b",
r"\b(impairment|write-?down|one-?off|exceptional\s+(?:item|charge)|non-?recurring)\b",
r"\b(outlook\s+(?:uncertain|cloudy|cautious))\b"
]
sem_uncertainty = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in uncertainty_patterns))
features.append(sem_uncertainty)
        # 8. sem_stable_guidance - stable guidance
stable_guidance_patterns = [
r"\b(guidance|forecast|outlook)\s+(?:maintained|confirmed|reiterated|unchanged)\b",
r"\b(reiterated|maintained)\s+(?:its\s+)?(guidance|forecast|outlook)\b"
]
sem_stable_guidance = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in stable_guidance_patterns))
features.append(sem_stable_guidance)
        # 9. sem_operational - operational events
operational_patterns = [
r"\b(restructuring|reorganization|spin-?off|divest(?:iture)?|asset\s+sale)\b",
r"\b(ban|suspension|halted|blocked|prohibited)\b",
r"\b(recall|probe|investigation|lawsuit|litigation|settlement)\b",
r"\b(layoffs?|headcount\s+reduction|cut\s+jobs|hiring\s+freeze)\b"
]
sem_operational = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in operational_patterns))
features.append(sem_operational)
return features
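# --- Illustrative sketch (hypothetical helper, never called by this script) ---
# A toy worked example of the MultiLLM divergence features computed by
# _get_multillm_features above, using made-up probability vectors rather than
# real model outputs.
def _demo_multillm_features():
    fe = FinSentLLMFeatureEngineering()  # no fit needed for this method
    p = [0.7, 0.2, 0.1]  # hypothetical FinBERT probabilities
    q = [0.5, 0.3, 0.2]  # hypothetical RoBERTa probabilities
    l1, sim, kl_f_to_r, kl_r_to_f, agree = fe._get_multillm_features(p, q)
    # l1  = |0.7-0.5| + |0.2-0.3| + |0.1-0.2| = 0.4
    # sim = 1 / (1 + 0.4) ≈ 0.714
    # agree = 1.0: both vectors put the most mass on class 0
    return l1, sim, kl_f_to_r, kl_r_to_f, agree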
def create_end_to_end_pipeline(optimized_model_path, output_path):
"""
创建端到端流水线
Args:
optimized_model_path: 优化模型的路径
output_path: 输出流水线的路径
"""
logger.info(f"🔄 正在创建端到端流水线...")
logger.info(f"输入模型: {optimized_model_path}")
logger.info(f"输出路径: {output_path}")
try:
        # Load the optimized model
        optimized_model = joblib.load(optimized_model_path)
        # Check the model format
        if isinstance(optimized_model, dict):
            # Extract the pipeline from the dict
            optimized_pipeline = optimized_model['pipeline']
            logger.info(f"✅ Loaded optimized model dict and extracted the pipeline, steps: {optimized_pipeline.steps}")
        else:
            # The object is already a pipeline
            optimized_pipeline = optimized_model
            logger.info(f"✅ Loaded optimized pipeline, steps: {optimized_pipeline.steps}")
        # Extract the preprocessor and the classifier
preprocessor = None
classifier = None
for step_name, step_obj in optimized_pipeline.steps:
if step_name == 'preprocess':
preprocessor = step_obj
elif step_name == 'clf':
classifier = step_obj
        if preprocessor is None or classifier is None:
            raise ValueError("Could not extract the preprocessor or the classifier from the optimized model")
        # Create the feature engineering transformer
        feature_engineer = FinSentLLMFeatureEngineering()
end_to_end_pipeline = Pipeline([
('feature_engineering', feature_engineer),
('preprocess', preprocessor),
('clf', classifier)
])
        # Pre-fit the feature engineer (loads the models)
        logger.info("🔄 Initializing the feature engineer...")
        feature_engineer.fit([])  # Trigger model loading
        # Build the full model dict (same structure as the optimized model)
        if isinstance(optimized_model, dict):
            end_to_end_model = optimized_model.copy()
            end_to_end_model['pipeline'] = end_to_end_pipeline
            end_to_end_model['pipeline_type'] = 'end_to_end'
        else:
            end_to_end_model = end_to_end_pipeline
        # Save the end-to-end pipeline
        joblib.dump(end_to_end_model, output_path)
        logger.info(f"✅ End-to-end pipeline saved to: {output_path}")
return end_to_end_pipeline
except Exception as e:
logger.error(f"❌ 创建端到端流水线失败: {e}")
raise
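# A minimal usage sketch (commented out because the input model must already
# exist on disk; the paths below mirror the ones main() uses):
#
#   pipeline = create_end_to_end_pipeline(
#       "outputs/Meta-Classifier_XG_boost_es_optimized/FinSent_AllAgree_meta_xgboost_model.joblib",
#       "outputs/End-To-End-Pipelines/FinSent_AllAgree_end_to_end_pipeline.joblib",
#   )
#   pipeline.predict(["Quarterly revenue rose 12% year over year."])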
def test_end_to_end_pipeline(pipeline_path, test_texts=None):
"""
测试端到端流水线
Args:
pipeline_path: 流水线路径
test_texts: 测试文本列表
"""
if test_texts is None:
test_texts = [
"The company reported strong earnings growth this quarter.",
"Stock prices fell sharply due to market concerns.",
"The outlook remains neutral with mixed signals."
]
logger.info(f"🧪 正在测试端到端流水线: {pipeline_path}")
import traceback
try:
        # Load the pipeline
        model = joblib.load(pipeline_path)
        # Check the model format
        if isinstance(model, dict):
            pipeline = model['pipeline']
            logger.info(f"✅ Loaded model dict and extracted the pipeline, steps: {[step[0] for step in pipeline.steps]}")
        else:
            pipeline = model
            logger.info(f"✅ Loaded pipeline, steps: {[step[0] for step in pipeline.steps]}")
        # Debug: call the feature-engineering transform on its own
        print("\nDEBUG: standalone FinSentLLMFeatureEngineering.transform(test_texts) output:")
feature_engineer = FinSentLLMFeatureEngineering()
feature_engineer.fit([])
features_df = feature_engineer.transform(test_texts)
print("features_df type:", type(features_df))
print("features_df dtypes:", getattr(features_df, 'dtypes', 'N/A'))
print("features_df head:\n", getattr(features_df, 'head', lambda: features_df)())
        # Layer-by-layer debugging: run X through each pipeline step in turn
        logger.info("🔬 Debugging the pipeline layer by layer...")
X = test_texts
layer_outputs = {}
for name, step in pipeline.steps:
try:
if name == "feature_engineering":
X = step.transform(X)
layer_outputs[name] = X
print(f"\n[DEBUG] feature_engineering 输出 shape: {getattr(X, 'shape', None)}, type: {type(X)}")
elif name == "to_float_array":
X = step.transform(X)
layer_outputs[name] = X
print(f"\n[DEBUG] to_float_array 输出 shape: {getattr(X, 'shape', None)}, type: {type(X)}")
elif name == "preprocess":
print("\n[DEBUG] preprocess 层逐子transformer调试:")
preproc = step
# 如果是 ColumnTransformer, 对每个子transformer单独 transform
if hasattr(preproc, 'transformers_'):
for tname, trans, cols in preproc.transformers_:
try:
# 提取本子transformer的输入
# 支持 DataFrame/ndarray
if hasattr(X, 'iloc'):
input_cols = cols
# 支持 passthrough/None
if input_cols == 'passthrough' or input_cols is None:
input_X = X
else:
input_X = X[input_cols]
else:
# ndarray,cols为int列表
if isinstance(cols, (list, tuple)) and all(isinstance(c, int) for c in cols):
input_X = X[:, cols]
else:
input_X = X
print(f" [DEBUG] 子transformer '{tname}' ({type(trans)}) 输入 shape: {getattr(input_X, 'shape', None)}")
try:
trans_out = trans.transform(input_X)
print(f" [OK] '{tname}' transform 输出 shape: {getattr(trans_out, 'shape', None)}")
except Exception as sub_e:
print(f" [ERROR] 子transformer '{tname}' transform 出错: {sub_e}")
import traceback
traceback.print_exc()
except Exception as sub_e2:
print(f" [ERROR] 子transformer '{tname}' 输入提取出错: {sub_e2}")
traceback.print_exc()
# 再整体 transform
X = preproc.transform(X)
layer_outputs[name] = X
print(f"\n[DEBUG] preprocess 输出 shape: {getattr(X, 'shape', None)}, type: {type(X)}")
elif name == "clf":
# 不做 transform
pass
except Exception as layer_e:
print(f"[ERROR] pipeline 层 '{name}' transform 出错: {layer_e}")
traceback.print_exc()
raise
        # Run the prediction test
        logger.info("🔄 Running the prediction test...")
        predictions = pipeline.predict(test_texts)
        probabilities = pipeline.predict_proba(test_texts)
        # Print the results
        print("\n📊 Test results:")
        print("=" * 80)
        for i, (text, pred, prob) in enumerate(zip(test_texts, predictions, probabilities)):
            print(f"\nText {i+1}: {text}")
            print(f"Prediction: {pred}")
            print(f"Probabilities: {prob}")
        print("=" * 80)
        logger.info("✅ End-to-end pipeline test passed!")
return True
except Exception as e:
logger.error(f"❌ 端到端流水线测试失败: {e}")
traceback.print_exc()
return False
def main():
"""主函数"""
logger.info("启动端到端流水线创建器")
# 定义路径
optimized_dir = "outputs/Meta-Classifier_XG_boost_es_optimized"
end_to_end_dir = "outputs/End-To-End-Pipelines"
# 确保输出目录存在
os.makedirs(end_to_end_dir, exist_ok=True)
# 数据集列表
datasets = ['50Agree', '66Agree', '75Agree', 'AllAgree']
created_pipelines = []
for dataset in datasets:
optimized_path = os.path.join(optimized_dir, f"FinSent_{dataset}_meta_xgboost_model.joblib")
output_path = os.path.join(end_to_end_dir, f"FinSent_{dataset}_end_to_end_pipeline.joblib")
if os.path.exists(optimized_path):
try:
logger.info(f"\n{'='*60}")
logger.info(f"🔄 处理数据集: {dataset}")
# 创建端到端流水线
pipeline = create_end_to_end_pipeline(optimized_path, output_path)
created_pipelines.append(output_path)
logger.info(f"{dataset} 端到端流水线创建成功")
except Exception as e:
logger.error(f"❌ {dataset} 端到端流水线创建失败: {e}")
else:
logger.warning(f"优化模型不存在: {optimized_path}")
    # Test the first pipeline that was created
    if created_pipelines:
        logger.info(f"\n{'='*60}")
        logger.info("🧪 Testing the first end-to-end pipeline...")
        test_end_to_end_pipeline(created_pipelines[0])
    logger.info(f"\n✅ End-to-end pipeline creation finished! Created {len(created_pipelines)} pipeline(s)")
    logger.info(f"📁 Output directory: {end_to_end_dir}")
if __name__ == "__main__":
main()
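# A minimal inference sketch (illustrative; assumes main() has already been run
# so that the AllAgree pipeline exists on disk):
#
#   import joblib
#   model = joblib.load("outputs/End-To-End-Pipelines/FinSent_AllAgree_end_to_end_pipeline.joblib")
#   pipeline = model["pipeline"] if isinstance(model, dict) else model
#   print(pipeline.predict(["The company reported strong earnings growth this quarter."]))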