"""
FinSight 数据预处理:清洗 + 统一Alpaca格式

输入: data/raw/ 下载的原始数据(先跑 download_datasets.py)
输出: data/processed/finsight_train.jsonl / finsight_eval.jsonl
统一字段: instruction / input / output / task / agent

清洗规则(对应数据探索发现的噪声):
1. 网页杂质: 广告语(【立即开户,领取福利】)、"原标题:"、乱码问号串、多余空白
2. 情感标签归一: FINFE输出表述不统一("中性。"/"所属情感是中性。")→ 统一为"积极/消极/中性"
   sentiment agent 只保留 FINFE 三分类;FINNSP(负面主体识别,NER抽取)归入 event agent
3. BAAI数据: 只保留中文、deita质量分>=7、取首轮对话转Alpaca格式
   按key确定性切出约1/25(~1.5k)做qa评估集(BAAI是qa唯一来源,否则eval无qa)
4. 通用过滤: 指令或输出为空、总长超4000字符的样本剔除;按(instruction+input)去重
5. 跨集去泄漏: 存eval前减去train已含的(instruction+input),避免train/eval污染
"""
| import hashlib |
| import json |
| import os |
| import re |
| from collections import Counter |
|
|
# Input ("raw") and output ("processed") directories sit next to this script.
RAW, OUT = (
    os.path.join(os.path.dirname(__file__), sub) for sub in ("raw", "processed")
)

# Make sure the output directory exists before anything tries to write to it.
os.makedirs(OUT, exist_ok=True)
|
|
| |
# Routing table: FinCUGE task name -> the agent whose training data it feeds.
# A task that is not listed here makes iter_fincuge fail with KeyError.
TASK2AGENT = {
    # event agent
    "FINQA": "event",
    "FINCQA": "event",
    "FINESE": "event",
    # sentiment agent: FINFE is the only task routed here
    "FINFE": "sentiment",
    # FINNSP is deliberately routed to the event agent, not to sentiment
    "FINNSP": "event",
    # summary agent
    "FINNA": "summary",
    # topic agent
    "FINNL": "topic",
    "FINRE": "topic",
}
|
|
| |
# Promotional phrases; any sentence containing one of them is removed wholesale.
AD_KEYWORDS = ["立即开户", "你还在等什么", "领取福利", "极速响应", "点击查看",
               "股市震荡,需要注意什么", "跨年行情,应该如何布局"]

# Sentence terminators in both full-width and ASCII form. The full-width ones
# are written as \uXXXX escapes (resolved by the `re` parser) so that a
# Unicode-normalising editor or pipeline cannot silently fold them into their
# ASCII twins, which previously left the patterns with duplicated ASCII
# characters and one un-compilable alternative.
_SENT_END = r"\u3002\uff01\uff1f!?"  # ideographic full stop, wide ! and ?, ASCII ! and ?
_SENT_BODY = "[^" + _SENT_END + r"\n]*"


def _keyword_pattern(keyword):
    """Return a regex fragment for *keyword* whose commas match either width."""
    return "".join(
        "[,\uff0c]" if ch in ",\uff0c" else re.escape(ch) for ch in keyword
    )


# One whole "sentence" (a run without terminators/newlines) that contains an
# ad keyword, plus the terminator that closes it when there is one.
AD_SENT_RE = re.compile(
    _SENT_BODY
    + "(?:" + "|".join(map(_keyword_pattern, AD_KEYWORDS)) + ")"
    + _SENT_BODY
    + "[" + _SENT_END + "]?"
)

# "原标题" prefix followed by an ASCII or full-width colon, and runs of three or
# more question marks (ASCII or full-width) left behind by encoding damage.
AD_RE = re.compile(r"原标题[:\uff1a]\s*|\?{3,}|\uff1f{3,}")

# Horizontal whitespace runs: space, tab, no-break space, ideographic space.
WS_RE = re.compile(r"[ \t\u00a0\u3000]+")
|
|
# Canonical sentiment labels; normalize_sentiment maps free-form FINFE outputs
# onto exactly one of these.
SENT_LABELS = [
    "积极",
    "消极",
    "中性",
]
|
|
|
|
def clean_text(t):
    """Strip web noise from *t* and return the tidied string.

    Applied in order: sentences matching AD_SENT_RE are removed, matches of
    AD_RE are removed, each run matched by WS_RE becomes a single space, and
    finally leading/trailing whitespace is stripped.
    """
    without_ads = AD_SENT_RE.sub("", t)
    without_noise = AD_RE.sub("", without_ads)
    return WS_RE.sub(" ", without_noise).strip()
|
|
|
|
def normalize_sentiment(out):
    """Reduce a free-form sentiment answer to one label from SENT_LABELS.

    Returns the label when exactly one of SENT_LABELS occurs as a substring of
    *out*; returns None when none or more than one occurs.
    """
    matched = None
    for label in SENT_LABELS:
        if label not in out:
            continue
        if matched is not None:
            # A second distinct label makes the answer ambiguous.
            return None
        matched = label
    return matched
|
|
|
|
def iter_fincuge(path):
    """Yield cleaned, Alpaca-style rows from a FinCUGE JSONL file.

    Each non-blank line of *path* must be a JSON object with "task",
    "instruction" and "output" keys and an optional "input" key. All three
    text fields go through clean_text. For FINFE rows the output is replaced
    by its normalized sentiment label, and rows without a single unambiguous
    label are skipped.

    Yields:
        dict with keys instruction / input / output / task / agent.

    Raises:
        KeyError: a required key is missing, or the task has no entry in
            TASK2AGENT.
        json.JSONDecodeError: a non-blank line is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines (e.g. a trailing newline left by an
            # editor) instead of crashing inside json.loads.
            if not line.strip():
                continue
            r = json.loads(line)
            task = r["task"]
            ins = clean_text(r["instruction"])
            # "input" may be absent or an explicit JSON null; both mean "none".
            inp = clean_text(r.get("input") or "")
            out = clean_text(r["output"])
            if task == "FINFE":
                label = normalize_sentiment(out)
                if label is None:
                    continue
                out = label
            yield {"instruction": ins, "input": inp, "output": out,
                   "task": task, "agent": TASK2AGENT[task]}
|
|
|
|
def iter_baai(path, min_score=7.0):
    """Yield Alpaca-style QA rows from the BAAI finance JSONL file.

    A record is kept only when its "lang" is "zh", its "deita_score" is at
    least *min_score*, and its "conversations" list has at least two turns
    with the first one coming from "human". The first turn becomes the
    instruction and the second turn the output; later turns are ignored.

    Args:
        path: JSONL file to read (UTF-8).
        min_score: minimum "deita_score" for a record to be kept.

    Yields:
        dict with keys instruction / input / output / task / agent, where
        input is "", task is "BAAI_QA" and agent is "qa".

    Raises:
        json.JSONDecodeError: a non-blank line is not valid JSON.
    """
    with open(path, encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines instead of crashing inside json.loads.
            if not line.strip():
                continue
            r = json.loads(line)
            # A missing or null score counts as 0 so the record is filtered
            # out rather than raising TypeError on the comparison.
            score = r.get("deita_score") or 0
            if r.get("lang") != "zh" or score < min_score:
                continue
            # A missing or null conversation list is treated as empty.
            conv = r.get("conversations") or []
            if len(conv) < 2 or conv[0]["from"] != "human":
                continue
            ins = clean_text(conv[0]["value"])
            out = clean_text(conv[1]["value"])
            yield {"instruction": ins, "input": "", "output": out,
                   "task": "BAAI_QA", "agent": "qa"}
|
|
|
|
def row_key(r):
    """Return the MD5 hex digest of the row's instruction + input text.

    The two fields are concatenated directly and UTF-8 encoded; the digest is
    used as the row's identity for de-duplication and the eval split.
    """
    text = r["instruction"] + r["input"]
    digest = hashlib.md5(text.encode())
    return digest.hexdigest()
|
|
|
|
def dedup_and_filter(rows):
    """Drop unusable and duplicate rows, reporting how many of each were cut.

    A row is dropped when its output or instruction is empty, when the
    combined length of instruction + input + output exceeds 4000 characters,
    or when an earlier row already had the same row_key. The first occurrence
    of each key is kept and input order is preserved.

    Returns:
        (kept_rows, stats) where stats is a Counter of drop reasons.
    """
    stats = Counter()
    kept = []
    seen = set()
    for row in rows:
        if not (row["output"] and row["instruction"]):
            stats["空字段剔除"] += 1
            continue

        total_len = sum(len(row[field]) for field in ("instruction", "input", "output"))
        if total_len > 4000:
            stats["超长剔除"] += 1
            continue

        fingerprint = row_key(row)
        if fingerprint in seen:
            stats["重复剔除"] += 1
            continue

        seen.add(fingerprint)
        kept.append(row)
    return kept, stats
|
|
|
|
def split_baai_eval(rows, mod=25):
    """Deterministically split *rows* into (train, eval) by key hash.

    A row goes to the eval list when its row_key, read as a hexadecimal
    integer, is divisible by *mod*; every other row goes to the train list.
    The assignment depends only on the row's key, so it is reproducible.
    """
    tr, ev = [], []
    for row in rows:
        bucket = int(row_key(row), 16) % mod
        if bucket == 0:
            ev.append(row)
        else:
            tr.append(row)
    return tr, ev
|
|
|
|
def remove_overlap(ev_rows, train_rows):
    """Remove from *ev_rows* every row whose key also appears in *train_rows*.

    Returns:
        (kept_eval_rows, number_removed)
    """
    train_keys = set(map(row_key, train_rows))
    kept = []
    for row in ev_rows:
        if row_key(row) in train_keys:
            continue
        kept.append(row)
    removed = len(ev_rows) - len(kept)
    return kept, removed
|
|
|
|
def save(rows, name):
    """Write *rows* to OUT/<name> as JSON Lines and print a short summary.

    Each row is serialized on its own line with non-ASCII characters kept
    as-is. After writing, the row count and the per-agent distribution are
    printed.
    """
    target = os.path.join(OUT, name)
    with open(target, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(row, ensure_ascii=False) + "\n" for row in rows)

    agent_counts = Counter(row["agent"] for row in rows)
    print(f" {name}: {len(rows):,} 条")
    print(f" agent分布: {dict(agent_counts)}")
|
|
|
|
def main():
    """Build finsight_train.jsonl and finsight_eval.jsonl from the raw files.

    Training set: FinCUGE train rows plus the train share of the BAAI rows,
    filtered and de-duplicated. Evaluation set: FinCUGE eval rows plus the
    BAAI eval share, filtered, de-duplicated, and stripped of any row whose
    key already appears in the final training set.
    """
    print("[1/2] 处理训练集 ...")
    fincuge_train = list(iter_fincuge(os.path.join(RAW, "fincuge_train.jsonl")))
    baai_rows = list(iter_baai(os.path.join(RAW, "baai_finance_all.jsonl")))
    baai_train, baai_eval = split_baai_eval(baai_rows)
    print(f" 原始: FinCUGE {len(fincuge_train):,} + BAAI筛后 {len(baai_rows):,}"
          f"(train {len(baai_train):,} / 切出qa评估 {len(baai_eval):,})")
    train_rows, train_stats = dedup_and_filter(fincuge_train + baai_train)
    print(f" 清洗统计: {dict(train_stats)}")
    save(train_rows, "finsight_train.jsonl")

    print("[2/2] 处理评估集 ...")
    fincuge_eval = list(iter_fincuge(os.path.join(RAW, "fincuge_eval.jsonl")))
    eval_rows, eval_stats = dedup_and_filter(fincuge_eval + baai_eval)
    # Drop anything the model would already have seen during training.
    eval_rows, n_overlap = remove_overlap(eval_rows, train_rows)
    print(f" 清洗统计: {dict(eval_stats)} 跨集泄漏剔除: {n_overlap}")
    save(eval_rows, "finsight_eval.jsonl")


if __name__ == "__main__":
    main()
|
|