finsight / data /preprocess.py
Maggei's picture
Fix data leakage and label consistency before training
d3f108f
Raw
History Blame Contribute Delete
6.35 kB
# -*- coding: utf-8 -*-
"""
FinSight 数据预处理:清洗 + 统一Alpaca格式
输入: data/raw/ 下载的原始数据(先跑 download_datasets.py)
输出: data/processed/finsight_train.jsonl / finsight_eval.jsonl
统一字段: instruction / input / output / task / agent
清洗规则(对应数据探索发现的噪声):
1. 网页杂质: 广告语(【立即开户,领取福利】)、"原标题:"、乱码问号串、多余空白
2. 情感标签归一: FINFE输出表述不统一("中性。"/"所属情感是中性。")→ 统一为"积极/消极/中性"
sentiment agent 只保留 FINFE 三分类;FINNSP(负面主体识别,NER抽取)归入 event agent
3. BAAI数据: 只保留中文、deita质量分>=7、取首轮对话转Alpaca格式
按key确定性切出约1/25(~1.5k)做qa评估集(BAAI是qa唯一来源,否则eval无qa)
4. 通用过滤: 输出为空、总长超4000字符的样本剔除;按(instruction+input)去重
5. 跨集去泄漏: 存eval前减去train已含的(instruction+input),避免train/eval污染
"""
import hashlib
import json
import os
import re
from collections import Counter
RAW = os.path.join(os.path.dirname(__file__), "raw")
OUT = os.path.join(os.path.dirname(__file__), "processed")
os.makedirs(OUT, exist_ok=True)
# FinCUGE任务 → FinSight Agent 的映射
TASK2AGENT = {
"FINQA": "event", # 事件抽取
"FINCQA": "event", # 因果事件抽取
"FINESE": "event", # 事件主体抽取
"FINFE": "sentiment", # 论坛情绪
"FINNSP": "event", # 负面消息及主体识别(NER抽取,非情感分类)
"FINNA": "summary", # 新闻摘要
"FINNL": "topic", # 新闻分类
"FINRE": "topic", # 关系抽取
}
# 含这些关键词的整句删除(券商广告变体太多,按关键词整句清除比枚举文案可靠)
AD_KEYWORDS = ["立即开户", "你还在等什么", "领取福利", "极速响应", "点击查看",
"股市震荡,需要注意什么", "跨年行情,应该如何布局"]
# 句子 = 到上一个句末标点/换行 为止、到下一个句末标点/换行 为止
AD_SENT_RE = re.compile(
r"[^。!?!?\n]*(?:" + "|".join(map(re.escape, AD_KEYWORDS)) + r")[^。!?!?\n]*[。!?!?]?"
)
AD_RE = re.compile(r"原标题[::]\s*|\?{3,}|?{3,}")
WS_RE = re.compile(r"[ \t ]+")
SENT_LABELS = ["积极", "消极", "中性"]
def clean_text(t):
t = AD_SENT_RE.sub("", t)
t = AD_RE.sub("", t)
t = WS_RE.sub(" ", t)
return t.strip()
def normalize_sentiment(out):
"""'所属情感是中性。' / '以上文本情感极性属于消极。' → '中性' / '消极'"""
hits = [lb for lb in SENT_LABELS if lb in out]
return hits[0] if len(hits) == 1 else None # 多标签/无标签视为噪声
def iter_fincuge(path):
with open(path, encoding="utf-8") as f:
for line in f:
r = json.loads(line)
task = r["task"]
ins = clean_text(r["instruction"])
inp = clean_text(r.get("input", ""))
out = clean_text(r["output"])
if task == "FINFE":
label = normalize_sentiment(out)
if label is None:
continue
out = label
yield {"instruction": ins, "input": inp, "output": out,
"task": task, "agent": TASK2AGENT[task]}
def iter_baai(path, min_score=7.0):
with open(path, encoding="utf-8") as f:
for line in f:
r = json.loads(line)
if r.get("lang") != "zh" or r.get("deita_score", 0) < min_score:
continue
conv = r.get("conversations", [])
if len(conv) < 2 or conv[0]["from"] != "human":
continue
ins = clean_text(conv[0]["value"])
out = clean_text(conv[1]["value"])
yield {"instruction": ins, "input": "", "output": out,
"task": "BAAI_QA", "agent": "qa"}
def row_key(r):
return hashlib.md5((r["instruction"] + r["input"]).encode()).hexdigest()
def dedup_and_filter(rows):
seen, kept = set(), []
stats = Counter()
for r in rows:
if not r["output"] or not r["instruction"]:
stats["空字段剔除"] += 1
continue
if len(r["instruction"]) + len(r["input"]) + len(r["output"]) > 4000:
stats["超长剔除"] += 1
continue
key = row_key(r)
if key in seen:
stats["重复剔除"] += 1
continue
seen.add(key)
kept.append(r)
return kept, stats
def split_baai_eval(rows, mod=25):
"""确定性切分: 约1/mod进qa评估集。基于key哈希,可复现、与train无交集"""
tr, ev = [], []
for r in rows:
(ev if int(row_key(r), 16) % mod == 0 else tr).append(r)
return tr, ev
def remove_overlap(ev_rows, train_rows):
"""存eval前剔除train已含的(instruction+input),消除跨集泄漏"""
tkeys = {row_key(r) for r in train_rows}
kept = [r for r in ev_rows if row_key(r) not in tkeys]
return kept, len(ev_rows) - len(kept)
def save(rows, name):
path = os.path.join(OUT, name)
with open(path, "w", encoding="utf-8") as f:
for r in rows:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
print(f" {name}: {len(rows):,} 条")
print(f" agent分布: {dict(Counter(r['agent'] for r in rows))}")
if __name__ == "__main__":
print("[1/2] 处理训练集 ...")
train = list(iter_fincuge(os.path.join(RAW, "fincuge_train.jsonl")))
n_fincuge = len(train)
baai = list(iter_baai(os.path.join(RAW, "baai_finance_all.jsonl")))
baai_train, baai_eval = split_baai_eval(baai)
train += baai_train
print(f" 原始: FinCUGE {n_fincuge:,} + BAAI筛后 {len(baai):,}"
f"(train {len(baai_train):,} / 切出qa评估 {len(baai_eval):,})")
train, stats = dedup_and_filter(train)
print(f" 清洗统计: {dict(stats)}")
save(train, "finsight_train.jsonl")
print("[2/2] 处理评估集 ...")
ev = list(iter_fincuge(os.path.join(RAW, "fincuge_eval.jsonl"))) + baai_eval
ev, stats = dedup_and_filter(ev)
ev, n_overlap = remove_overlap(ev, train)
print(f" 清洗统计: {dict(stats)} 跨集泄漏剔除: {n_overlap}")
save(ev, "finsight_eval.jsonl")