|
|
| import os |
| import sys |
| import joblib |
| import logging |
| import pandas as pd |
| import numpy as np |
| from sklearn.pipeline import Pipeline |
| from sklearn.base import BaseEstimator, TransformerMixin |
| from sklearn.preprocessing import StandardScaler |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
| |
def to_float64_array(df):
    """Coerce a DataFrame (or any array-like) into a float64 NumPy array."""
    return np.asarray(df, dtype=np.float64)
|
|
| |
def enforce_float64_df(df):
    """Return *df* as a DataFrame with every column forced to float64.

    A DataFrame keeps its structure (only dtypes change); any other
    array-like is wrapped into a new float64 DataFrame.
    """
    if not isinstance(df, pd.DataFrame):
        return pd.DataFrame(df, dtype=np.float64)
    return df.astype(np.float64)
|
|
| |
# Configure root logging once for the whole script; all functions below share
# this module-level logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Make sibling modules in this script's directory importable when the script
# is executed directly (rather than as part of a package).
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, current_dir)


# Heavy optional dependencies (torch/transformers/scipy) are imported lazily
# here so a missing library produces one clear error instead of a traceback;
# the script cannot do anything useful without them, hence the hard exit.
try:
    import torch
    from transformers import AutoTokenizer, AutoModelForSequenceClassification
    from scipy.stats import entropy
    import re
    logger.info("✅ 成功导入所有必要的库")
except ImportError as e:
    logger.error(f"❌ 缺少必要的库: {e}")
    sys.exit(1)
|
|
class FinSentLLMFeatureEngineering(BaseEstimator, TransformerMixin):
    """Feature-engineering transformer for financial sentiment analysis.

    Combines FinBERT and RoBERTa outputs (class probabilities, raw logits,
    confidence statistics), cross-model "MultiLLM" agreement features, and
    regex-based semantic flags into a fixed 36-column DataFrame.

    Fix notes vs. the previous version:
    - the four model-inference helpers used bare ``except:`` clauses, which
      also swallowed ``KeyboardInterrupt``/``SystemExit``; they now catch
      ``Exception`` only;
    - ``_build_features`` could return a short row when an exception fired
      mid-build, breaking ``pd.DataFrame(rows, columns=...)`` with a length
      mismatch; rows are now padded/truncated to the fixed schema length.
    """

    # Fixed output schema. _build_features always returns exactly this many
    # values, in this order.
    _FEATURE_COLUMNS = [
        'fin_p_neg', 'fin_p_neu', 'fin_p_pos', 'fin_score',
        'rob_p_neg', 'rob_p_neu', 'rob_p_pos', 'rob_score',
        'fin_logit_neg', 'fin_logit_neu', 'fin_logit_pos',
        'fin_max_prob', 'fin_margin', 'fin_entropy',
        'rob_logit_neg', 'rob_logit_neu', 'rob_logit_pos',
        'rob_max_prob', 'rob_margin', 'rob_entropy',
        'MultiLLM_L1_distance', 'MultiLLM_L1_similarity',
        'MultiLLM_KL_F_to_R', 'MultiLLM_KL_R_to_F', 'MultiLLM_agree',
        'sem_compared', 'sem_loss_improve', 'sem_loss_worsen',
        'sem_profit_up', 'sem_cost_down', 'sem_contract_fin',
        'sem_uncertainty', 'sem_stable_guidance', 'sem_operational',
        'fin_label', 'rob_label'
    ]

    def __init__(self):
        # Models are loaded in fit(), not here, so construction (and
        # unpickling inside a saved Pipeline) stays cheap.
        self.finbert_tokenizer = None
        self.finbert_model = None
        self.roberta_tokenizer = None
        self.roberta_model = None

    def fit(self, X, y=None):
        """Load the FinBERT and RoBERTa models; X/y are ignored.

        Raises:
            Exception: re-raised after logging if either model fails to load
                (e.g. no network access to the Hugging Face hub).
        """
        logger.info("🔄 正在加载FinBERT和RoBERTa模型...")

        try:
            self.finbert_tokenizer = AutoTokenizer.from_pretrained("ProsusAI/finbert")
            self.finbert_model = AutoModelForSequenceClassification.from_pretrained("ProsusAI/finbert")

            self.roberta_tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment-latest")
            self.roberta_model = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment-latest")

            logger.info("✅ 模型加载完成")
        except Exception as e:
            logger.error(f"❌ 模型加载失败: {e}")
            raise

        return self

    def transform(self, X):
        """Extract the 36-column feature DataFrame for a batch of texts.

        Accepts a pandas Series, a list of strings, or any array with a
        ``flatten`` method. Returns a DataFrame whose numeric columns are
        float64 and whose label columns are strings.
        """
        logger.info(f"🔄 正在为{len(X)}个样本提取特征...")

        # Normalize the input container to a plain list of texts.
        if isinstance(X, pd.Series):
            texts = X.tolist()
        elif isinstance(X, list):
            texts = X
        else:
            texts = X.flatten().tolist()

        features = []
        for i, text in enumerate(texts):
            if i % 100 == 0:
                logger.info(f"处理进度: {i}/{len(texts)}")
            features.append(self._build_features(text))

        feature_df = pd.DataFrame(features, columns=self._FEATURE_COLUMNS)

        # Coerce everything numeric first, then cast the two label columns
        # to strings. NOTE(review): after pd.to_numeric the labels are
        # floats, so astype(str) yields values like "0.0"; downstream
        # preprocessors were presumably fitted on that representation, so
        # the order of these operations is kept intentionally.
        feature_df = feature_df.apply(pd.to_numeric, errors='coerce').fillna(0.0)

        feature_df['fin_label'] = feature_df['fin_label'].astype(str)
        feature_df['rob_label'] = feature_df['rob_label'].astype(str)

        for col in feature_df.columns:
            if col not in ['fin_label', 'rob_label']:
                feature_df[col] = feature_df[col].astype('float64')

        # Diagnostic dump of the resulting frame (kept for debugging parity).
        print('DEBUG: feature_df.dtypes:')
        print(feature_df.dtypes)
        non_float_cols = feature_df.columns[~feature_df.dtypes.apply(lambda dt: np.issubdtype(dt, np.floating)) & (feature_df.columns != 'fin_label') & (feature_df.columns != 'rob_label')]
        if len(non_float_cols) > 0:
            print('⚠️ WARNING: Non-float columns detected:', list(non_float_cols))
        print('DEBUG: feature_df.head():')
        print(feature_df.head())
        print('DEBUG: feature_df.info():')
        print(feature_df.info())
        print('DEBUG: feature_df unique types per column:')
        for col in feature_df.columns:
            unique_types = {type(x) for x in feature_df[col].values}
            print(f'{col}: {unique_types}')

        return feature_df

    def _build_features(self, text):
        """Build the feature vector for one text; all floats, failures -> 0.0.

        Always returns exactly ``len(self._FEATURE_COLUMNS)`` floats: a
        partial build (exception part-way through) is padded with zeros so
        DataFrame construction in transform() cannot hit a ragged row.
        """
        features = []
        try:
            finbert_probs = self._get_finbert_probabilities(text)
            features.extend(finbert_probs)

            # fin_score / rob_score: confidence of the winning class.
            fin_score = max(finbert_probs)
            features.append(fin_score)

            roberta_probs = self._get_roberta_probabilities(text)
            features.extend(roberta_probs)

            rob_score = max(roberta_probs)
            features.append(rob_score)

            fin_logits = self._get_finbert_logits(text)
            features.extend(fin_logits)

            # Confidence statistics: top prob, top-1/top-2 margin, entropy.
            fin_max_prob = max(finbert_probs)
            fin_margin = fin_max_prob - sorted(finbert_probs)[-2]
            fin_entropy = entropy(finbert_probs)
            features.extend([fin_max_prob, fin_margin, fin_entropy])

            rob_logits = self._get_roberta_logits(text)
            features.extend(rob_logits)

            rob_max_prob = max(roberta_probs)
            rob_margin = rob_max_prob - sorted(roberta_probs)[-2]
            rob_entropy = entropy(roberta_probs)
            features.extend([rob_max_prob, rob_margin, rob_entropy])

            multillm_features = self._get_multillm_features(finbert_probs, roberta_probs)
            features.extend(multillm_features)

            semantic_features = self._get_semantic_features(text)
            features.extend(semantic_features)

            # Predicted class indices (later stringified in transform()).
            fin_label = np.argmax(finbert_probs)
            rob_label = np.argmax(roberta_probs)
            features.extend([fin_label, rob_label])
        except Exception as e:
            logger.error(f"特征构建异常: {e}, text={text}")

        # Force every entry to float; anything unconvertible becomes 0.0.
        float_features = []
        for x in features:
            try:
                float_features.append(float(x))
            except (TypeError, ValueError):
                float_features.append(0.0)

        # Pad (or defensively truncate) to the fixed schema length so a
        # mid-build exception cannot produce a row of the wrong width.
        expected = len(self._FEATURE_COLUMNS)
        if len(float_features) < expected:
            float_features.extend([0.0] * (expected - len(float_features)))
        return float_features[:expected]

    def _get_finbert_probabilities(self, text):
        """Return FinBERT softmax probabilities [neg, neu, pos]; uniform-ish fallback on failure."""
        try:
            inputs = self.finbert_tokenizer(text, return_tensors="pt", truncation=True, max_length=512)

            with torch.no_grad():
                outputs = self.finbert_model(**inputs)
                probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)

            return probabilities[0].tolist()
        except Exception:
            # Fallback keeps the pipeline alive on tokenizer/model errors.
            return [0.33, 0.33, 0.34]

    def _get_roberta_probabilities(self, text):
        """Return RoBERTa softmax probabilities; uniform-ish fallback on failure."""
        try:
            inputs = self.roberta_tokenizer(text, return_tensors="pt", truncation=True, max_length=512)

            with torch.no_grad():
                outputs = self.roberta_model(**inputs)
                probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)

            return probabilities[0].tolist()
        except Exception:
            return [0.33, 0.33, 0.34]

    def _get_finbert_logits(self, text):
        """Return raw FinBERT logits; zeros on failure."""
        try:
            inputs = self.finbert_tokenizer(text, return_tensors="pt", truncation=True, max_length=512)

            with torch.no_grad():
                outputs = self.finbert_model(**inputs)
                logits = outputs.logits[0].tolist()

            return logits
        except Exception:
            return [0.0, 0.0, 0.0]

    def _get_roberta_logits(self, text):
        """Return raw RoBERTa logits; zeros on failure."""
        try:
            inputs = self.roberta_tokenizer(text, return_tensors="pt", truncation=True, max_length=512)

            with torch.no_grad():
                outputs = self.roberta_model(**inputs)
                logits = outputs.logits[0].tolist()

            return logits
        except Exception:
            return [0.0, 0.0, 0.0]

    def _get_multillm_features(self, finbert_probs, roberta_probs):
        """Cross-model agreement features computed from the two probability vectors.

        Returns [L1 distance, L1 similarity, KL(F||R), KL(R||F), agree-flag].
        """
        features = []

        # L1 distance between the two distributions, and a bounded similarity.
        l1_distance = sum(abs(fp - rp) for fp, rp in zip(finbert_probs, roberta_probs))
        features.append(l1_distance)

        l1_similarity = 1.0 / (1.0 + l1_distance)
        features.append(l1_similarity)

        # KL divergences; guarded to 0.0 when the reference has a zero entry
        # (scipy's entropy would return inf there).
        kl_f_to_r = entropy(finbert_probs, roberta_probs) if min(roberta_probs) > 0 else 0.0
        features.append(kl_f_to_r)

        kl_r_to_f = entropy(roberta_probs, finbert_probs) if min(finbert_probs) > 0 else 0.0
        features.append(kl_r_to_f)

        # 1.0 when both models pick the same argmax class.
        fin_pred = np.argmax(finbert_probs)
        rob_pred = np.argmax(roberta_probs)
        agree = 1.0 if fin_pred == rob_pred else 0.0
        features.append(agree)

        return features

    def _get_semantic_features(self, text):
        """Nine binary semantic flags from regex patterns over the lowercased text.

        Order: compared, loss_improve, loss_worsen, profit_up, cost_down,
        contract_fin, uncertainty, stable_guidance, operational.
        """
        features = []
        text_lower = text.lower()

        # Explicit comparisons ("compared to", "versus", "from X to Y").
        compared_patterns = [
            r"\bcompared\s+to\b", r"\bcompared\s+with\b", r"\bversus\b", r"\bvs\.?\b",
            r"\bfrom\s+[-+]?\d+(?:\.\d+)?\s*(?:%|percent|percentage|[A-Za-z]+)?\s+to\s+[-+]?\d+(?:\.\d+)?\s*(?:%|percent|percentage|[A-Za-z]+)?\b"
        ]
        sem_compared = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in compared_patterns))
        features.append(sem_compared)

        # Losses shrinking / swing to profit.
        loss_improve_patterns = [
            r"\bloss(?:es)?\s+(?:narrowed|shr[aou]nk|decreased|fell|reduced)\b",
            r"\bturn(?:ed)?\s+to\s+(?:profit|black)\b"
        ]
        sem_loss_improve = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in loss_improve_patterns))
        features.append(sem_loss_improve)

        # Losses growing / swing to loss.
        loss_worsen_patterns = [
            r"\bloss(?:es)?\s+(?:widened|grew|increased|rose|deepened)\b",
            r"\bturn(?:ed)?\s+to\s+(?:loss|red)\b"
        ]
        sem_loss_worsen = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in loss_worsen_patterns))
        features.append(sem_loss_worsen)

        # Profit-type metric moving up (either word order).
        profit_up_patterns = [
            r"\b(profit|profits|net\s+income|earnings|ebit|ebitda|eps|roe|roi|return(?:s)?(?:\s+on\s+equity)?)\b.*\b(rose|grew|increased|up|higher|improved|jumped|surged|soared)\b",
            r"\b(rose|grew|increased|up|higher|improved|jumped|surged|soared)\b.*\b(profit|profits|net\s+income|earnings|ebit|ebitda|eps|roe|roi|return(?:s)?(?:\s+on\s+equity)?)\b"
        ]
        sem_profit_up = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in profit_up_patterns))
        features.append(sem_profit_up)

        # Cost-type metric moving down (either word order).
        cost_down_patterns = [
            r"\b(cost|costs|expenses|opex|operating\s+expense(?:s)?)\b.*\b(fell|declined|decreased|lower|reduced|down)\b",
            r"\b(fell|declined|decreased|lower|reduced|down)\b.*\b(cost|costs|expenses|opex|operating\s+expense(?:s)?)\b"
        ]
        sem_cost_down = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in cost_down_patterns))
        features.append(sem_cost_down)

        # Contract / financing events.
        contract_fin_patterns = [
            r"\b(agreement|deal|contract|order|purchase\s+order|framework\s+agreement)\b",
            r"\b(bond|notes?|debenture|convertible|placement|issuance|issue|offering|ipo|follow-?on)\b",
            r"\b(loan|credit\s+facility|credit\s+line|revolver|revolving\s+credit|financing)\b"
        ]
        sem_contract_fin = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in contract_fin_patterns))
        features.append(sem_contract_fin)

        # Uncertainty / one-off charges / cautious outlook.
        uncertainty_patterns = [
            r"\b(uncertain|uncertainty|cannot\s+be\s+determined|not\s+clear|unknown|unpredictable)\b",
            r"\b(impairment|write-?down|one-?off|exceptional\s+(?:item|charge)|non-?recurring)\b",
            r"\b(outlook\s+(?:uncertain|cloudy|cautious))\b"
        ]
        sem_uncertainty = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in uncertainty_patterns))
        features.append(sem_uncertainty)

        # Guidance maintained / reiterated.
        stable_guidance_patterns = [
            r"\b(guidance|forecast|outlook)\s+(?:maintained|confirmed|reiterated|unchanged)\b",
            r"\b(reiterated|maintained)\s+(?:its\s+)?(guidance|forecast|outlook)\b"
        ]
        sem_stable_guidance = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in stable_guidance_patterns))
        features.append(sem_stable_guidance)

        # Operational / legal / restructuring events.
        operational_patterns = [
            r"\b(restructuring|reorganization|spin-?off|divest(?:iture)?|asset\s+sale)\b",
            r"\b(ban|suspension|halted|blocked|prohibited)\b",
            r"\b(recall|probe|investigation|lawsuit|litigation|settlement)\b",
            r"\b(layoffs?|headcount\s+reduction|cut\s+jobs|hiring\s+freeze)\b"
        ]
        sem_operational = int(any(re.search(pattern, text_lower, re.IGNORECASE) for pattern in operational_patterns))
        features.append(sem_operational)

        return features
|
|
|
|
def create_end_to_end_pipeline(optimized_model_path, output_path):
    """Assemble and persist an end-to-end pipeline around an optimized model.

    Loads the optimized artifact (either a bare sklearn Pipeline or a dict
    wrapping one under 'pipeline'), prepends the feature-engineering stage,
    fits the feature engineer (which downloads/loads the LLMs), and writes
    the combined artifact to *output_path*.

    Args:
        optimized_model_path: path of the optimized model artifact.
        output_path: destination path for the end-to-end pipeline.

    Returns:
        The assembled sklearn Pipeline.

    Raises:
        Exception: re-raised after logging on any load/assembly/save failure.
    """
    logger.info(f"🔄 正在创建端到端流水线...")
    logger.info(f"输入模型: {optimized_model_path}")
    logger.info(f"输出路径: {output_path}")

    try:
        loaded = joblib.load(optimized_model_path)

        # The artifact may be a dict wrapping the pipeline, or the pipeline itself.
        if isinstance(loaded, dict):
            base_pipeline = loaded['pipeline']
            logger.info(f"✅ 成功加载优化模型字典,提取流水线,步骤: {base_pipeline.steps}")
        else:
            base_pipeline = loaded
            logger.info(f"✅ 成功加载优化流水线,步骤: {base_pipeline.steps}")

        # Pull out the two stages we need to re-wrap (last occurrence wins,
        # matching dict semantics over the step list).
        steps_by_name = dict(base_pipeline.steps)
        preprocessor = steps_by_name.get('preprocess')
        classifier = steps_by_name.get('clf')

        if preprocessor is None or classifier is None:
            raise ValueError("无法从优化模型中提取预处理器或分类器")

        feature_engineer = FinSentLLMFeatureEngineering()

        end_to_end_pipeline = Pipeline([
            ('feature_engineering', feature_engineer),
            ('preprocess', preprocessor),
            ('clf', classifier)
        ])

        # fit() here only loads the LLM weights; X is unused by the engineer.
        logger.info("🔄 正在初始化特征工程器...")
        feature_engineer.fit([])

        # Preserve the original artifact shape (dict wrapper vs bare pipeline).
        if isinstance(loaded, dict):
            artifact = loaded.copy()
            artifact['pipeline'] = end_to_end_pipeline
            artifact['pipeline_type'] = 'end_to_end'
        else:
            artifact = end_to_end_pipeline

        joblib.dump(artifact, output_path)
        logger.info(f"✅ 端到端流水线已保存至: {output_path}")

        return end_to_end_pipeline

    except Exception as e:
        logger.error(f"❌ 创建端到端流水线失败: {e}")
        raise
|
|
|
|
def test_end_to_end_pipeline(pipeline_path, test_texts=None):
    """
    Smoke-test a saved end-to-end pipeline with verbose, layer-by-layer debugging.

    Args:
        pipeline_path: path to the serialized pipeline artifact (joblib).
        test_texts: optional list of sample texts; defaults to three
            positive/negative/neutral financial sentences.

    Returns:
        True when prediction succeeds, False on any failure (with traceback
        printed).
    """
    if test_texts is None:
        test_texts = [
            "The company reported strong earnings growth this quarter.",
            "Stock prices fell sharply due to market concerns.",
            "The outlook remains neutral with mixed signals."
        ]

    logger.info(f"🧪 正在测试端到端流水线: {pipeline_path}")

    import traceback
    try:
        model = joblib.load(pipeline_path)

        # The artifact may be either a bare Pipeline or a dict wrapping one.
        if isinstance(model, dict):
            pipeline = model['pipeline']
            logger.info(f"✅ 成功加载模型字典,提取流水线,步骤: {[step[0] for step in pipeline.steps]}")
        else:
            pipeline = model
            logger.info(f"✅ 成功加载流水线,步骤: {[step[0] for step in pipeline.steps]}")

        # First, exercise the feature-engineering stage in isolation (a fresh
        # instance, models re-loaded via fit) and dump its output frame.
        print("\nDEBUG: 单独调用 FinSentLLMFeatureEngineering.transform(test_texts) 输出:")
        feature_engineer = FinSentLLMFeatureEngineering()
        feature_engineer.fit([])
        features_df = feature_engineer.transform(test_texts)
        print("features_df type:", type(features_df))
        print("features_df dtypes:", getattr(features_df, 'dtypes', 'N/A'))
        print("features_df head:\n", getattr(features_df, 'head', lambda: features_df)())

        # Then walk the pipeline step by step, transforming X through each
        # layer manually so a failure can be localized to one stage.
        logger.info("🔬 逐层调试 pipeline...")
        X = test_texts
        layer_outputs = {}
        for name, step in pipeline.steps:
            try:
                if name == "feature_engineering":
                    X = step.transform(X)
                    layer_outputs[name] = X
                    print(f"\n[DEBUG] feature_engineering 输出 shape: {getattr(X, 'shape', None)}, type: {type(X)}")
                elif name == "to_float_array":
                    X = step.transform(X)
                    layer_outputs[name] = X
                    print(f"\n[DEBUG] to_float_array 输出 shape: {getattr(X, 'shape', None)}, type: {type(X)}")
                elif name == "preprocess":
                    # For a ColumnTransformer-like preprocessor, probe each
                    # fitted sub-transformer individually with its own column
                    # slice before running the whole step.
                    print("\n[DEBUG] preprocess 层逐子transformer调试:")
                    preproc = step
                    if hasattr(preproc, 'transformers_'):
                        for tname, trans, cols in preproc.transformers_:
                            try:
                                # Slice the input per sub-transformer: by
                                # column label for DataFrames, by integer
                                # index for arrays; pass everything through
                                # otherwise.
                                if hasattr(X, 'iloc'):
                                    input_cols = cols
                                    if input_cols == 'passthrough' or input_cols is None:
                                        input_X = X
                                    else:
                                        input_X = X[input_cols]
                                else:
                                    if isinstance(cols, (list, tuple)) and all(isinstance(c, int) for c in cols):
                                        input_X = X[:, cols]
                                    else:
                                        input_X = X
                                print(f"  [DEBUG] 子transformer '{tname}' ({type(trans)}) 输入 shape: {getattr(input_X, 'shape', None)}")
                                try:
                                    trans_out = trans.transform(input_X)
                                    print(f"  [OK] '{tname}' transform 输出 shape: {getattr(trans_out, 'shape', None)}")
                                except Exception as sub_e:
                                    print(f"  [ERROR] 子transformer '{tname}' transform 出错: {sub_e}")
                                    import traceback
                                    traceback.print_exc()
                            except Exception as sub_e2:
                                print(f"  [ERROR] 子transformer '{tname}' 输入提取出错: {sub_e2}")
                                traceback.print_exc()
                    # Run the full preprocess step after the per-transformer probes.
                    X = preproc.transform(X)
                    layer_outputs[name] = X
                    print(f"\n[DEBUG] preprocess 输出 shape: {getattr(X, 'shape', None)}, type: {type(X)}")
                elif name == "clf":
                    # Classifier is exercised below via predict/predict_proba.
                    pass
            except Exception as layer_e:
                print(f"[ERROR] pipeline 层 '{name}' transform 出错: {layer_e}")
                traceback.print_exc()
                raise

        # Finally, run the whole pipeline end to end.
        logger.info("🔄 正在进行预测测试...")
        predictions = pipeline.predict(test_texts)
        probabilities = pipeline.predict_proba(test_texts)

        print("\n📊 测试结果:")
        print("=" * 80)
        for i, (text, pred, prob) in enumerate(zip(test_texts, predictions, probabilities)):
            print(f"\n文本 {i+1}: {text}")
            print(f"预测: {pred}")
            print(f"概率: {prob}")
            print("=" * 80)

        logger.info("✅ 端到端流水线测试成功!")
        return True

    except Exception as e:
        logger.error(f"❌ 端到端流水线测试失败: {e}")
        traceback.print_exc()
        return False
|
|
|
|
def main():
    """Entry point: build an end-to-end pipeline per dataset, then smoke-test one.

    Iterates over the four agreement-level datasets, wrapping each existing
    optimized model into an end-to-end pipeline; missing models are skipped
    with a warning, and the first successfully created pipeline is tested.
    """
    logger.info("启动端到端流水线创建器")

    optimized_dir = "outputs/Meta-Classifier_XG_boost_es_optimized"
    end_to_end_dir = "outputs/End-To-End-Pipelines"

    os.makedirs(end_to_end_dir, exist_ok=True)

    created_pipelines = []

    for dataset in ['50Agree', '66Agree', '75Agree', 'AllAgree']:
        optimized_path = os.path.join(optimized_dir, f"FinSent_{dataset}_meta_xgboost_model.joblib")
        output_path = os.path.join(end_to_end_dir, f"FinSent_{dataset}_end_to_end_pipeline.joblib")

        # Guard clause: skip datasets whose optimized model is absent.
        if not os.path.exists(optimized_path):
            logger.warning(f"优化模型不存在: {optimized_path}")
            continue

        try:
            logger.info(f"\n{'='*60}")
            logger.info(f"🔄 处理数据集: {dataset}")

            create_end_to_end_pipeline(optimized_path, output_path)
            created_pipelines.append(output_path)

            logger.info(f"{dataset} 端到端流水线创建成功")
        except Exception as e:
            logger.error(f"❌ {dataset} 端到端流水线创建失败: {e}")

    # Smoke-test only the first created pipeline (they share the same code path).
    if created_pipelines:
        logger.info(f"\n{'='*60}")
        logger.info("🧪 测试第一个端到端流水线...")
        test_end_to_end_pipeline(created_pipelines[0])

    logger.info(f"\n✅ 端到端流水线创建完成! 共创建 {len(created_pipelines)} 个流水线")
    logger.info(f"📁 输出目录: {end_to_end_dir}")
|
|
|
|
| if __name__ == "__main__": |
| main() |