#!/usr/bin/env python3 # -*- coding: utf-8 -*- import csv import hashlib import json import os import math import re import shutil import sys from collections import Counter, defaultdict from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple ROOT = Path(os.environ.get("ENHANCED_REPLICA_DATA_ROOT", r"F:\codex开发\研究")) ROUTE_ROOT = ROOT / "研究成果" / "实验路线" / "20260408_SOTA路线重构" DATASET_ROOT = ROUTE_ROOT / "data" / "dataset" SRC_HUMAN = ROOT / "data" / "human" SRC_QUARANTINE = ROOT / "data" / "human_quarantine" SRC_RAW_EXTERNAL = ROOT / "data" / "raw_external" NLPCC_DIR = SRC_RAW_EXTERNAL / "NLPCC-2025-Task1-main" / "data" HC3_PLUS_DIR = SRC_RAW_EXTERNAL / "human_longform_20260406" / "HC3_Plus" / "raw" / "files" / "zh" HC3_RAW_FILE = SRC_RAW_EXTERNAL / "human_longform_20260406" / "HC3" / "raw" / "modelscope_zh" / "all.jsonl" QUARANTINE_HC_DIRS = [ SRC_QUARANTINE / "failed_platforms_20260406" / "weibo_quality_v2" / "high_confidence", SRC_QUARANTINE / "failed_platforms_20260406" / "weibo_quality_v3" / "exports" / "high_confidence", ] P2_CLTS_DIR = SRC_RAW_EXTERNAL / "CLTS数据集" P2_ZHIHU_DIR = SRC_RAW_EXTERNAL / "huggingface_zhihu" REQUIRED_FIELDS = ["record_id", "text", "label", "source", "split", "length_char", "topic", "model_slug"] SPLITS = ["train", "dev", "test"] LABEL_MAP = {"human": 0, "ai": 1} MIN_CHAR_LEN = 30 LENGTH_BUCKETS = [ (0, 29, "000_029"), (30, 99, "030_099"), (100, 299, "100_299"), (300, 599, "300_599"), (600, 999, "600_999"), (1000, 1499, "1000_1499"), (1500, 2199, "1500_2199"), (2200, 10**9, "2200_plus"), ] EXCLUSION_LOG_ROWS: List[Dict[str, str]] = [] OVERLAP_AUDIT: Dict[str, Dict[str, int]] = {} def now_str() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def iso_now() -> str: return datetime.now().isoformat(timespec="seconds") def log(msg: str) -> None: print(f"[{now_str()}] {msg}") def ensure_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) def write_json(path: Path, obj: 
dict) -> None: ensure_dir(path.parent) with path.open("w", encoding="utf-8") as f: json.dump(obj, f, ensure_ascii=False, indent=2) def write_jsonl(path: Path, records: List[dict]) -> None: ensure_dir(path.parent) with path.open("w", encoding="utf-8", newline="\n") as f: for rec in records: out = {k: rec[k] for k in REQUIRED_FIELDS} f.write(json.dumps(out, ensure_ascii=False) + "\n") def write_csv(path: Path, fieldnames: List[str], rows: List[dict]) -> None: ensure_dir(path.parent) with path.open("w", encoding="utf-8-sig", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for r in rows: writer.writerow(r) def read_text_with_fallback(path: Path) -> str: raw = path.read_bytes() for enc in ("utf-8-sig", "utf-8", "gb18030"): try: return raw.decode(enc) except UnicodeDecodeError: continue return raw.decode("utf-8", errors="replace") def load_json_with_fallback(path: Path): return json.loads(read_text_with_fallback(path)) def iter_jsonl(path: Path): text = read_text_with_fallback(path) for lineno, line in enumerate(text.splitlines(), 1): line = line.strip() if not line: continue try: yield lineno, json.loads(line) except json.JSONDecodeError: EXCLUSION_LOG_ROWS.append( { "dataset_id": "GLOBAL", "stage": "read_jsonl", "source_file": str(path), "record_ref": f"line_{lineno}", "label": "", "drop_reason": "json_decode_error", "raw_length": "", "clean_length": "", "text_hash": "", } ) def normalize_label(label) -> Optional[int]: if label is None: return None if isinstance(label, bool): return int(label) if isinstance(label, int): return 1 if label == 1 else 0 if label == 0 else None s = str(label).strip().lower() if s in {"0", "human", "h"}: return 0 if s in {"1", "ai", "machine", "gpt"}: return 1 return None def text_hash(text: str) -> str: return hashlib.sha256(text.encode("utf-8")).hexdigest() def non_ws_len(text: str) -> int: return len(re.sub(r"\s+", "", text)) def length_bucket(length_char: int) -> str: for lo, hi, name in 
LENGTH_BUCKETS: if lo <= length_char <= hi: return name return "unknown" MD_CODE_BLOCK_RE = re.compile(r"```[\s\S]*?```", flags=re.MULTILINE) MD_INLINE_CODE_RE = re.compile(r"`([^`]+)`") MD_IMG_RE = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)") MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)") MD_HEADING_RE = re.compile(r"^\s{0,3}#{1,6}\s*") MD_QUOTE_RE = re.compile(r"^\s{0,3}>\s?") MD_LIST_RE = re.compile(r"^\s{0,3}(?:[-*+]|\d+\.)\s+") def clean_text(text: str) -> str: if text is None: return "" t = str(text) t = t.replace("\r\n", "\n").replace("\r", "\n") t = t.replace("\\n", "\n").replace("\\r", "\n") t = re.sub(r"(?<=\S)\s*/n\s*(?=\S)", "\n", t) t = re.sub(r"^\s*/n\s*$", "", t, flags=re.MULTILINE) t = t.replace("/n/n", "\n\n") t = re.sub( r"(?<=[\u4e00-\u9fffA-Za-z0-9,。!?;:,.!?;:])\s*/n\s*(?=[\u4e00-\u9fffA-Za-z0-9])", "\n", t, ) t = MD_CODE_BLOCK_RE.sub(" ", t) t = MD_INLINE_CODE_RE.sub(r"\1", t) t = MD_IMG_RE.sub(r"\1", t) t = MD_LINK_RE.sub(r"\1", t) lines = [] for line in t.split("\n"): line = MD_HEADING_RE.sub("", line) line = MD_QUOTE_RE.sub("", line) line = MD_LIST_RE.sub("", line) line = line.replace("**", "").replace("__", "") line = line.replace("`", "") lines.append(line.strip()) t = "\n".join(lines) t = t.replace("\u3000", " ") t = re.sub(r"[ \t\f\v]+", " ", t) t = re.sub(r"\n{3,}", "\n\n", t) t = re.sub(r"[ ]+\n", "\n", t) t = re.sub(r"\n[ ]+", "\n", t) return t.strip() def severe_artifact_reason(cleaned_text: str, length_char: int) -> Optional[str]: if not cleaned_text: return "empty_after_clean" if re.fullmatch(r"[\W_]+", cleaned_text): return "non_content_symbols_only" content_chars = re.findall(r"[\u4e00-\u9fffA-Za-z0-9]", cleaned_text) ratio = len(content_chars) / max(len(cleaned_text), 1) if length_char >= MIN_CHAR_LEN and ratio < 0.10: return "severe_artifact_low_content_ratio" compact = re.sub(r"\s+", "", cleaned_text) if compact: most_freq = Counter(compact).most_common(1)[0][1] if len(compact) >= 120 and most_freq / len(compact) > 0.60: 
return "severe_artifact_repetitive_chars" return None def add_exclusion( dataset_id: str, stage: str, source_file: str, record_ref: str, label, drop_reason: str, raw_length, clean_length, text_hash_value: str = "", ) -> None: EXCLUSION_LOG_ROWS.append( { "dataset_id": dataset_id, "stage": stage, "source_file": source_file, "record_ref": record_ref, "label": "" if label is None else str(label), "drop_reason": drop_reason, "raw_length": "" if raw_length is None else str(raw_length), "clean_length": "" if clean_length is None else str(clean_length), "text_hash": text_hash_value, } ) def build_record( ds_id: str, stage: str, source_file: Path, record_ref: str, raw_text, label: int, source: str, split: str, topic: str = "", model_slug: str = "", seen_hashes: Optional[set] = None, ) -> Optional[dict]: raw_text_str = "" if raw_text is None else str(raw_text) raw_len = non_ws_len(raw_text_str) cleaned = clean_text(raw_text_str) clean_len = non_ws_len(cleaned) if not cleaned: add_exclusion(ds_id, stage, str(source_file), record_ref, label, "empty_text", raw_len, clean_len) return None if clean_len < MIN_CHAR_LEN: add_exclusion(ds_id, stage, str(source_file), record_ref, label, f"too_short_lt_{MIN_CHAR_LEN}", raw_len, clean_len) return None reason = severe_artifact_reason(cleaned, clean_len) if reason: add_exclusion(ds_id, stage, str(source_file), record_ref, label, reason, raw_len, clean_len) return None h = text_hash(cleaned) if seen_hashes is not None: if h in seen_hashes: add_exclusion(ds_id, stage, str(source_file), record_ref, label, "duplicate_text", raw_len, clean_len, h) return None seen_hashes.add(h) return { "record_id": "", "text": cleaned, "label": int(label), "source": source, "split": split, "length_char": clean_len, "topic": "" if topic is None else str(topic), "model_slug": "" if model_slug is None else str(model_slug), "_hash": h, } def split_counts(records: List[dict]) -> Dict[str, int]: c = Counter(r["split"] for r in records) return {k: int(c.get(k, 0)) 
for k in SPLITS} def label_counts(records: List[dict]) -> Dict[str, int]: c = Counter(r["label"] for r in records) return {"0": int(c.get(0, 0)), "1": int(c.get(1, 0))} def write_distribution_files(ds_dir: Path, records: List[dict]) -> None: label_rows = [] lc = Counter((r["split"], r["label"]) for r in records) for sp in SPLITS: for lb in (0, 1): label_rows.append({"split": sp, "label": lb, "count": lc.get((sp, lb), 0)}) label_rows.append({"split": "all", "label": 0, "count": sum(r["label"] == 0 for r in records)}) label_rows.append({"split": "all", "label": 1, "count": sum(r["label"] == 1 for r in records)}) write_csv(ds_dir / "label_dist.csv", ["split", "label", "count"], label_rows) source_rows = [] sc = Counter((r["split"], r["source"], r["label"]) for r in records) for (sp, src, lb), cnt in sorted(sc.items()): source_rows.append({"split": sp, "source": src, "label": lb, "count": cnt}) write_csv(ds_dir / "source_dist.csv", ["split", "source", "label", "count"], source_rows) length_rows = [] lbc = Counter((r["split"], length_bucket(r["length_char"])) for r in records) for sp in SPLITS: for _, _, b in LENGTH_BUCKETS: length_rows.append({"split": sp, "length_bucket": b, "count": lbc.get((sp, b), 0)}) write_csv(ds_dir / "length_dist.csv", ["split", "length_bucket", "count"], length_rows) def finalize_dataset( ds_id: str, ds_dir: Path, records_by_split: Dict[str, List[dict]], build_note: str, source_inputs: List[str], ) -> dict: ensure_dir(ds_dir) all_records: List[dict] = [] for sp in SPLITS: recs = records_by_split.get(sp, []) for idx, rec in enumerate(recs, 1): rec["split"] = sp rec["record_id"] = f"{ds_id.lower()}_{sp}_{idx:07d}" recs.sort(key=lambda x: x["record_id"]) write_jsonl(ds_dir / f"{sp}.jsonl", recs) all_records.extend(recs) # Distribution stats are now covered comprehensively in 审阅报告.md; # skip generating intermediate CSVs to keep dataset directories clean. 
# write_distribution_files(ds_dir, all_records) manifest = { "dataset_id": ds_id, "created_at": iso_now(), "schema_fields": REQUIRED_FIELDS, "label_mapping": LABEL_MAP, "build_note": build_note, "source_inputs": source_inputs, "record_count_total": len(all_records), "record_count_by_split": split_counts(all_records), "record_count_by_label": label_counts(all_records), "source_set": sorted({r["source"] for r in all_records}), } write_json(ds_dir / "manifest.json", manifest) return manifest def stratified_split(records: List[dict], ratios: Tuple[float, float, float], strata_fn) -> Dict[str, List[dict]]: if not records: return {"train": [], "dev": [], "test": []} train_ratio, dev_ratio, test_ratio = ratios if not math.isclose(train_ratio + dev_ratio + test_ratio, 1.0, rel_tol=1e-9): raise ValueError("ratios must sum to 1.0") grouped = defaultdict(list) for rec in records: grouped[strata_fn(rec)].append(rec) out = {"train": [], "dev": [], "test": []} for _, group in grouped.items(): group = sorted(group, key=lambda x: x["_hash"]) n = len(group) n_train = int(n * train_ratio) n_dev = int(n * dev_ratio) n_test = n - n_train - n_dev out["train"].extend(group[:n_train]) out["dev"].extend(group[n_train : n_train + n_dev]) out["test"].extend(group[n_train + n_dev : n_train + n_dev + n_test]) for sp in SPLITS: out[sp].sort(key=lambda x: x["_hash"]) return out def collect_source_inventory(source_roots: Dict[str, Path]) -> dict: inventory = {"generated_at": iso_now(), "roots": {}, "global_digest": ""} global_hasher = hashlib.sha256() for name, base in source_roots.items(): if not base.exists(): inventory["roots"][name] = { "path": str(base), "exists": False, "file_count": 0, "total_size": 0, "mtime_min_ns": None, "mtime_max_ns": None, "digest": "", } continue file_count = 0 total_size = 0 mtime_min = None mtime_max = None local_hasher = hashlib.sha256() for p in sorted(base.rglob("*")): if not p.is_file(): continue st = p.stat() rel = str(p.relative_to(base)).replace("\\", "/") 
signature = f"{name}|{rel}|{st.st_size}|{st.st_mtime_ns}" local_hasher.update(signature.encode("utf-8")) global_hasher.update(signature.encode("utf-8")) file_count += 1 total_size += int(st.st_size) mtime_min = st.st_mtime_ns if mtime_min is None else min(mtime_min, st.st_mtime_ns) mtime_max = st.st_mtime_ns if mtime_max is None else max(mtime_max, st.st_mtime_ns) inventory["roots"][name] = { "path": str(base), "exists": True, "file_count": file_count, "total_size": total_size, "mtime_min_ns": mtime_min, "mtime_max_ns": mtime_max, "digest": local_hasher.hexdigest(), } inventory["global_digest"] = global_hasher.hexdigest() return inventory def extract_text_from_obj(obj: dict) -> str: if not isinstance(obj, dict): return "" for key in ("text", "content", "body", "answer", "article", "message"): v = obj.get(key) if isinstance(v, str) and v.strip(): return v return "" def iter_dataset_records(ds_dir: Path): for sp in SPLITS: fp = ds_dir / f"{sp}.jsonl" if not fp.exists(): continue for _, rec in iter_jsonl(fp): rec["split"] = sp yield rec def coerce_answer_text(ans) -> str: if isinstance(ans, str): return ans if isinstance(ans, dict): for k in ("text", "answer", "content", "body"): v = ans.get(k) if isinstance(v, str) and v.strip(): return v if isinstance(ans, list): return "\n".join(str(x) for x in ans if str(x).strip()) return str(ans) if ans is not None else "" def build_ds01_nlpcc() -> Tuple[Path, dict, set]: ds_id = "DS01_NLPCC_core_v1" ds_dir = DATASET_ROOT / "10_P0_主线必用" / ds_id log(f"Building {ds_id}") files = { "train": NLPCC_DIR / "train.json", "dev": NLPCC_DIR / "dev.json", "test": NLPCC_DIR / "test_with_label.json", } seen_hashes = set() records_by_split = {"train": [], "dev": [], "test": []} for sp, fp in files.items(): rows = load_json_with_fallback(fp) for idx, row in enumerate(rows): lb = normalize_label(row.get("label")) if lb is None: add_exclusion(ds_id, "parse_nlpcc", str(fp), f"{sp}_{idx}", None, "invalid_label", "", "", "") continue rec = 
build_record( ds_id=ds_id, stage="clean_nlpcc", source_file=fp, record_ref=f"{sp}_{idx}", raw_text=row.get("text", ""), label=lb, source="nlpcc", split=sp, topic=row.get("source", ""), model_slug=row.get("model", ""), seen_hashes=seen_hashes, ) if rec is not None: records_by_split[sp].append(rec) manifest = finalize_dataset( ds_id, ds_dir, records_by_split, build_note="NLPCC official train/dev/test_with_label with unified cleaning + dedupe.", source_inputs=[str(files["train"]), str(files["dev"]), str(files["test"])], ) return ds_dir, manifest, seen_hashes def build_ds02_hc3plus() -> Tuple[Path, dict]: ds_id = "DS02_HC3Plus_clean_v1" ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id log(f"Building {ds_id}") file_map = { "train": [HC3_PLUS_DIR / "train.jsonl"], "dev": [HC3_PLUS_DIR / "val_hc3_QA.jsonl", HC3_PLUS_DIR / "val_hc3_si.jsonl"], "test": [HC3_PLUS_DIR / "test_hc3_QA.jsonl", HC3_PLUS_DIR / "test_hc3_si.jsonl"], } seen_hashes = set() records_by_split = {"train": [], "dev": [], "test": []} for sp, fplist in file_map.items(): for fp in fplist: for lineno, row in iter_jsonl(fp): lb = normalize_label(row.get("label")) if lb is None: add_exclusion(ds_id, "parse_hc3_plus", str(fp), f"{lineno}", None, "invalid_label", "", "", "") continue rec = build_record( ds_id=ds_id, stage="clean_hc3_plus", source_file=fp, record_ref=f"line_{lineno}", raw_text=row.get("text", ""), label=lb, source="hc3_plus", split=sp, topic=fp.stem, model_slug="", seen_hashes=seen_hashes, ) if rec is not None: records_by_split[sp].append(rec) manifest = finalize_dataset( ds_id, ds_dir, records_by_split, build_note="HC3_Plus train + merged val->dev + merged test with unified cleaning + dedupe.", source_inputs=[str(x) for arr in file_map.values() for x in arr], ) return ds_dir, manifest def build_ds03_hc3raw_expanded() -> Tuple[Path, dict]: ds_id = "DS03_HC3raw_expanded_v1" ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id log(f"Building {ds_id}") seen_hashes = set() staged_records: List[dict] = [] for 
lineno, row in iter_jsonl(HC3_RAW_FILE): topic = row.get("source", "") human_answers = row.get("human_answers") or [] ai_answers = row.get("chatgpt_answers") or [] if not isinstance(human_answers, list): human_answers = [human_answers] if not isinstance(ai_answers, list): ai_answers = [ai_answers] for idx, ans in enumerate(human_answers): rec = build_record( ds_id=ds_id, stage="expand_hc3_raw_human", source_file=HC3_RAW_FILE, record_ref=f"line_{lineno}_h_{idx}", raw_text=coerce_answer_text(ans), label=0, source="hc3_raw", split="train", topic=topic, model_slug="", seen_hashes=seen_hashes, ) if rec is not None: rec["_stratum"] = f"{topic}|0" staged_records.append(rec) for idx, ans in enumerate(ai_answers): rec = build_record( ds_id=ds_id, stage="expand_hc3_raw_ai", source_file=HC3_RAW_FILE, record_ref=f"line_{lineno}_a_{idx}", raw_text=coerce_answer_text(ans), label=1, source="hc3_raw", split="train", topic=topic, model_slug="chatgpt", seen_hashes=seen_hashes, ) if rec is not None: rec["_stratum"] = f"{topic}|1" staged_records.append(rec) split_map = stratified_split(staged_records, (0.8, 0.1, 0.1), strata_fn=lambda r: r["_stratum"]) for sp in SPLITS: for rec in split_map[sp]: rec["split"] = sp rec.pop("_stratum", None) manifest = finalize_dataset( ds_id, ds_dir, split_map, build_note="Expanded HC3 raw answers (human_answers/chatgpt_answers), then stratified hash split 80/10/10.", source_inputs=[str(HC3_RAW_FILE)], ) return ds_dir, manifest def build_ds04_human_pools_merged() -> Tuple[Path, dict]: ds_id = "DS04_Human_pools_merged_v1" ds_dir = DATASET_ROOT / "30_P2_单标签原料池" / ds_id log(f"Building {ds_id}") seen_hashes = set() staged_records: List[dict] = [] for fp in sorted(SRC_HUMAN.rglob("*.json")): try: row = load_json_with_fallback(fp) except Exception: add_exclusion(ds_id, "read_human", str(fp), "", 0, "json_decode_error", "", "", "") continue text = extract_text_from_obj(row) rel = fp.relative_to(SRC_HUMAN) top = rel.parts[0] if rel.parts else "unknown" rec = 
build_record( ds_id=ds_id, stage="clean_human", source_file=fp, record_ref=str(rel).replace("\\", "/"), raw_text=text, label=0, source="human", split="train", topic=row.get("topic", top) if isinstance(row, dict) else top, model_slug="", seen_hashes=seen_hashes, ) if rec is not None: rec["_stratum"] = f"{top}|0" staged_records.append(rec) for base in QUARANTINE_HC_DIRS: version = "v2" if "v2" in str(base).lower() else "v3" for fp in sorted(base.rglob("*.json")): try: row = load_json_with_fallback(fp) except Exception: add_exclusion(ds_id, "read_quarantine", str(fp), "", 0, "json_decode_error", "", "", "") continue text = extract_text_from_obj(row) rel = fp.relative_to(base) rec = build_record( ds_id=ds_id, stage="clean_quarantine_hc", source_file=fp, record_ref=f"{version}/{str(rel).replace('\\', '/')}", raw_text=text, label=0, source="quarantine_hc", split="train", topic=row.get("topic", version) if isinstance(row, dict) else version, model_slug="", seen_hashes=seen_hashes, ) if rec is not None: rec["_stratum"] = f"{version}|0" staged_records.append(rec) split_map: Dict[str, List[dict]] = {"train": [], "dev": [], "test": []} for rec in staged_records: rec["split"] = "train" rec.pop("_stratum", None) split_map["train"].append(rec) manifest = finalize_dataset( ds_id, ds_dir, split_map, build_note="Merged human pool from data/human (core) + quarantine v2+v3 high_confidence. 
All records go to train (pure single-label pool).", source_inputs=[str(SRC_HUMAN)] + [str(x) for x in QUARANTINE_HC_DIRS], ) return ds_dir, manifest def balanced_round_robin_select(records: List[dict], n: int, key: str = "source") -> List[dict]: if n <= 0: return [] grouped = defaultdict(list) for r in records: grouped[r.get(key, "unknown")].append(r) keys = sorted(grouped.keys()) for k in keys: grouped[k].sort(key=lambda x: x["_hash"]) selected = [] i = 0 while len(selected) < n: progressed = False for k in keys: if i < len(grouped[k]): selected.append(grouped[k][i]) progressed = True if len(selected) >= n: break if not progressed: break i += 1 return selected[:n] def collect_pool_from_datasets(ds_dirs: List[Path], desired_label: int) -> List[dict]: pool = [] for ds_dir in ds_dirs: for rec in iter_dataset_records(ds_dir): if rec.get("label") == desired_label: rec["_hash"] = text_hash(rec["text"]) pool.append(rec) return pool def build_ds06_external_core_balanced( ds01_hashes: set, ds04_dir: Path, ds11_dir: Path, ) -> Tuple[Path, dict]: ds_id = "DS06_External_core_balanced_v1" max_len = 1500 ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id log(f"Building {ds_id}") human_pool_raw = collect_pool_from_datasets([ds04_dir], desired_label=0) ai_pool_raw = collect_pool_from_datasets([ds11_dir], desired_label=1) def filter_pool(pool: List[dict], label: int) -> List[dict]: filtered = [] local_seen = set() overlap_drop = 0 dedupe_drop = 0 range_drop = 0 for rec in pool: h = rec["_hash"] lch = rec["length_char"] if lch < 100 or lch > max_len: range_drop += 1 add_exclusion(ds_id, "filter_len", rec.get("source", ""), rec.get("record_id", ""), label, f"length_out_of_range_100_{max_len}", lch, lch, h) continue if h in ds01_hashes: overlap_drop += 1 add_exclusion(ds_id, "overlap_filter", rec.get("source", ""), rec.get("record_id", ""), label, "overlap_with_ds01", lch, lch, h) continue if h in local_seen: dedupe_drop += 1 add_exclusion(ds_id, "dedupe_pool", rec.get("source", ""), 
rec.get("record_id", ""), label, "duplicate_text_pool", lch, lch, h) continue local_seen.add(h) filtered.append(rec) OVERLAP_AUDIT.setdefault(ds_id, {}) OVERLAP_AUDIT[ds_id][f"label_{label}_raw"] = len(pool) OVERLAP_AUDIT[ds_id][f"label_{label}_after_filter"] = len(filtered) OVERLAP_AUDIT[ds_id][f"label_{label}_drop_overlap_ds01"] = overlap_drop OVERLAP_AUDIT[ds_id][f"label_{label}_drop_duplicate_pool"] = dedupe_drop OVERLAP_AUDIT[ds_id][f"label_{label}_drop_length"] = range_drop return filtered human_pool = filter_pool(human_pool_raw, 0) ai_pool = filter_pool(ai_pool_raw, 1) n = min(len(human_pool), len(ai_pool)) if n <= 0: raise RuntimeError(f"{ds_id}: no balanced data available after filtering") combined = balanced_round_robin_select(human_pool, n, "source") + balanced_round_robin_select(ai_pool, n, "source") final = [] seen_final = {} conflict_drop = 0 for rec in sorted(combined, key=lambda x: x["_hash"]): h = rec["_hash"] lb = rec["label"] if h in seen_final and seen_final[h] != lb: conflict_drop += 1 add_exclusion(ds_id, "final_dedupe", rec.get("source", ""), rec.get("record_id", ""), lb, "cross_label_conflict_hash", rec["length_char"], rec["length_char"], h) continue if h in seen_final: add_exclusion(ds_id, "final_dedupe", rec.get("source", ""), rec.get("record_id", ""), lb, "duplicate_text_final", rec["length_char"], rec["length_char"], h) continue seen_final[h] = lb final.append(rec) OVERLAP_AUDIT[ds_id]["balanced_n_before_conflict"] = 2 * n OVERLAP_AUDIT[ds_id]["drop_cross_label_conflict"] = conflict_drop OVERLAP_AUDIT[ds_id]["final_n"] = len(final) split_map = stratified_split(final, (0.6, 0.2, 0.2), strata_fn=lambda r: f"{r['source']}|{r['label']}") for sp in SPLITS: for rec in split_map[sp]: rec["split"] = sp manifest = finalize_dataset( ds_id, ds_dir, split_map, build_note=f"External balanced dataset from DS04 (human) and DS11 (generated ai standard), length 100-{max_len}, leakage-filtered vs DS01, stratified split 60/20/20.", 
source_inputs=[str(ds04_dir), str(ds11_dir)], ) return ds_dir, manifest def build_ds07_external_long( ds01_hashes: set, ds04_dir: Path, ds12_dir: Path, ) -> Tuple[Path, dict]: ds_id = "DS07_External_long_v1" max_len = 2200 ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id log(f"Building {ds_id}") human_pool_raw = collect_pool_from_datasets([ds04_dir], desired_label=0) ai_pool_raw = collect_pool_from_datasets([ds12_dir], desired_label=1) def filter_pool(pool: List[dict], label: int) -> List[dict]: filtered = [] local_seen = set() overlap_drop = 0 dedupe_drop = 0 range_drop = 0 for rec in pool: h = rec["_hash"] lch = rec["length_char"] if lch < 100 or lch > max_len: range_drop += 1 add_exclusion(ds_id, "filter_len", rec.get("source", ""), rec.get("record_id", ""), label, f"length_out_of_range_100_{max_len}", lch, lch, h) continue if h in ds01_hashes: overlap_drop += 1 add_exclusion(ds_id, "overlap_filter", rec.get("source", ""), rec.get("record_id", ""), label, "overlap_with_ds01", lch, lch, h) continue if h in local_seen: dedupe_drop += 1 add_exclusion(ds_id, "dedupe_pool", rec.get("source", ""), rec.get("record_id", ""), label, "duplicate_text_pool", lch, lch, h) continue local_seen.add(h) filtered.append(rec) OVERLAP_AUDIT.setdefault(ds_id, {}) OVERLAP_AUDIT[ds_id][f"label_{label}_raw"] = len(pool) OVERLAP_AUDIT[ds_id][f"label_{label}_after_filter"] = len(filtered) OVERLAP_AUDIT[ds_id][f"label_{label}_drop_overlap_ds01"] = overlap_drop OVERLAP_AUDIT[ds_id][f"label_{label}_drop_duplicate_pool"] = dedupe_drop OVERLAP_AUDIT[ds_id][f"label_{label}_drop_length"] = range_drop return filtered human_pool = filter_pool(human_pool_raw, 0) ai_pool = filter_pool(ai_pool_raw, 1) n = min(len(human_pool), len(ai_pool)) if n <= 0: raise RuntimeError(f"{ds_id}: no balanced data available after filtering") combined = balanced_round_robin_select(human_pool, n, "source") + balanced_round_robin_select(ai_pool, n, "source") final = [] seen_final = {} conflict_drop = 0 for rec in 
sorted(combined, key=lambda x: x["_hash"]): h = rec["_hash"] lb = rec["label"] if h in seen_final and seen_final[h] != lb: conflict_drop += 1 add_exclusion(ds_id, "final_dedupe", rec.get("source", ""), rec.get("record_id", ""), lb, "cross_label_conflict_hash", rec["length_char"], rec["length_char"], h) continue if h in seen_final: add_exclusion(ds_id, "final_dedupe", rec.get("source", ""), rec.get("record_id", ""), lb, "duplicate_text_final", rec["length_char"], rec["length_char"], h) continue seen_final[h] = lb final.append(rec) OVERLAP_AUDIT[ds_id]["balanced_n_before_conflict"] = 2 * n OVERLAP_AUDIT[ds_id]["drop_cross_label_conflict"] = conflict_drop OVERLAP_AUDIT[ds_id]["final_n"] = len(final) split_map = stratified_split(final, (0.6, 0.2, 0.2), strata_fn=lambda r: f"{r['source']}|{r['label']}") for sp in SPLITS: for rec in split_map[sp]: rec["split"] = sp manifest = finalize_dataset( ds_id, ds_dir, split_map, build_note=f"External balanced dataset from DS04 (human) and DS12 (generated ai natural), length 100-{max_len}, leakage-filtered vs DS01, stratified split 60/20/20.", source_inputs=[str(ds04_dir), str(ds12_dir)], ) return ds_dir, manifest def build_p2_candidate_audit() -> dict: p2_dir = DATASET_ROOT / "30_P2_候选待验证" ensure_dir(p2_dir) def scan_dir(path: Path) -> dict: by_ext = Counter() total_size = 0 files = [] if path.exists(): for fp in sorted(path.rglob("*")): if fp.is_file(): ext = fp.suffix.lower() if fp.suffix else "" by_ext[ext] += 1 st = fp.stat() total_size += int(st.st_size) files.append(str(fp)) return { "path": str(path), "exists": path.exists(), "file_count": len(files), "total_size": total_size, "ext_dist": dict(sorted(by_ext.items())), "sample_files": files[:200], } clts_info = scan_dir(P2_CLTS_DIR) zhihu_info = scan_dir(P2_ZHIHU_DIR) audit = { "generated_at": iso_now(), "policy": "P2 candidate only; not included in DS01-DS07 trainable main schema this round.", "candidates": { "CLTS数据集": {**clts_info, "why_not_direct_train": "parallel 
src/tgt corpus; no direct human-vs-ai detection labels"}, "huggingface_zhihu": {**zhihu_info, "why_not_direct_train": "metadata/inspection-focused files; supervised label semantics need redesign"}, }, } write_json(p2_dir / "candidate_audit.json", audit) write_csv( p2_dir / "candidate_summary.csv", ["candidate", "file_count", "total_size", "why_not_direct_train"], [ { "candidate": "CLTS数据集", "file_count": clts_info["file_count"], "total_size": clts_info["total_size"], "why_not_direct_train": "parallel corpus; no direct detection labels", }, { "candidate": "huggingface_zhihu", "file_count": zhihu_info["file_count"], "total_size": zhihu_info["total_size"], "why_not_direct_train": "metadata/inspection-first; label semantics pending", }, ], ) return audit def write_contract_files() -> None: cdir = DATASET_ROOT / "00_contract" ensure_dir(cdir) schema = { "title": "Unified Main Schema for 20260408 SOTA Route", "type": "object", "required": REQUIRED_FIELDS, "properties": { "record_id": {"type": "string"}, "text": {"type": "string"}, "label": {"type": "integer", "enum": [0, 1]}, "source": {"type": "string"}, "split": {"type": "string", "enum": ["train", "dev", "test"]}, "length_char": {"type": "integer", "minimum": 0}, "topic": {"type": "string"}, "model_slug": {"type": "string"}, }, } write_json(cdir / "main_schema.json", schema) write_json(cdir / "label_mapping.json", LABEL_MAP) md = """# 主 Schema 合同(统一训练格式) ## 字段 - `record_id`:字符串唯一ID - `text`:清洗后的正文 - `label`:整数标签(human=0, ai=1) - `source`:数据族来源(如 `nlpcc/hc3_plus/hc3_raw/human/quarantine_hc`) - `split`:`train/dev/test` - `length_char`:去空白字符后的长度 - `topic`:主题字段(可空字符串) - `model_slug`:AI来源模型(可空字符串) ## 全局规则 - 编码回退:`utf-8-sig -> utf-8 -> gb18030` - 清洗:换行统一、空白压缩、`\\n`与明显`/n`噪声修正、Markdown去痕 - 质量过滤:空文本、`length_char < 30`、严重伪装文本、重复文本剔除 - 标签映射固定:`human=0, ai=1` """ (cdir / "schema_contract.md").write_text(md, encoding="utf-8") def write_experiment_views(ds_paths: Dict[str, Path]) -> None: edir = DATASET_ROOT / 
"40_experiment_views" ensure_dir(edir) mapping = { "E00": {"use": ["DS01"], "detail": "Schema check + distribution"}, "E01": {"use": ["DS01"], "detail": "BERT baseline train/dev"}, "E02": {"use": ["DS01"], "detail": "RoBERTa baseline train/dev"}, "E03": {"use": ["DS01"], "detail": "Rule module on dev/test"}, "E04": {"use": ["DS01"], "detail": "Static weighted vote"}, "E05": {"use": ["DS01"], "detail": "Determinative override"}, "E06": {"use": ["DS01"], "detail": "Dynamic vote by buckets"}, "E07": {"use": ["DS01"], "detail": "Joint features MLP"}, "E08": {"use": ["DS01", "DS02", "DS03"], "detail": "Full chain rerun / optional aug"}, "E09": {"use": ["DS01", "DS06"], "detail": "Train on DS01, transfer eval on DS06"}, "E10": {"use": ["DS01", "DS06"], "detail": "Calibrate with DS01 dev + DS06 dev/test"}, "E11": {"use": ["DS01", "DS06"], "detail": "Bucket threshold on DS06"}, "E12": {"use": ["DS01", "DS06", "DS07"], "detail": "Zero-shot branch"}, "E13": {"use": ["DS01", "DS06", "DS07"], "detail": "Zero-shot branch"}, "E14": {"use": ["DS01", "DS06", "DS07"], "detail": "LLM LoRA branch"}, "E15": {"use": ["DS01", "DS06", "DS07"], "detail": "Combo search"}, } resolved = {} rows = [] for exp, item in mapping.items(): resolved[exp] = {"detail": item["detail"], "datasets": {}} for ds in item["use"]: p = ds_paths[ds] resolved[exp]["datasets"][ds] = { "dataset_dir": str(p), "train": str(p / "train.jsonl"), "dev": str(p / "dev.jsonl"), "test": str(p / "test.jsonl"), } rows.append( { "experiment_id": exp, "dataset_id": ds, "dataset_dir": str(p), "train_jsonl": str(p / "train.jsonl"), "dev_jsonl": str(p / "dev.jsonl"), "test_jsonl": str(p / "test.jsonl"), } ) write_json(edir / "experiment_views.json", {"generated_at": iso_now(), "views": resolved}) write_csv(edir / "E00_E15_dataset_mapping.csv", ["experiment_id", "dataset_id", "dataset_dir", "train_jsonl", "dev_jsonl", "test_jsonl"], rows) def validate_schema_sample(ds_paths: Dict[str, Path]) -> dict: report = {"generated_at": 
iso_now(), "datasets": {}} for ds, p in ds_paths.items(): checked = 0 invalid = 0 examples = [] for sp in SPLITS: fp = p / f"{sp}.jsonl" if not fp.exists(): continue for _, rec in iter_jsonl(fp): checked += 1 ok = True for k in REQUIRED_FIELDS: if k not in rec: ok = False if len(examples) < 10: examples.append(f"{sp}:missing:{k}") if rec.get("label") not in (0, 1): ok = False if len(examples) < 10: examples.append(f"{sp}:bad_label:{rec.get('label')}") if rec.get("split") not in ("train", "dev", "test"): ok = False if len(examples) < 10: examples.append(f"{sp}:bad_split:{rec.get('split')}") if not isinstance(rec.get("length_char"), int): ok = False if len(examples) < 10: examples.append(f"{sp}:bad_length_type:{type(rec.get('length_char')).__name__}") if not ok: invalid += 1 if checked >= 1000: break if checked >= 1000: break report["datasets"][ds] = { "checked_records": checked, "invalid_records": invalid, "valid": invalid == 0, "example_errors": examples, } return report def write_exclusion_log() -> None: mdir = DATASET_ROOT / "90_manifests" ensure_dir(mdir) fieldnames = ["dataset_id", "stage", "source_file", "record_ref", "label", "drop_reason", "raw_length", "clean_length", "text_hash"] write_csv(mdir / "exclusion_log.csv", fieldnames, EXCLUSION_LOG_ROWS) reason_counter = Counter(r["drop_reason"] for r in EXCLUSION_LOG_ROWS) rows = [{"drop_reason": k, "count": v} for k, v in sorted(reason_counter.items(), key=lambda x: (-x[1], x[0]))] write_csv(mdir / "exclusion_reason_stats.csv", ["drop_reason", "count"], rows) def main() -> int: source_roots = { "human": SRC_HUMAN, "human_quarantine": SRC_QUARANTINE, "raw_external": SRC_RAW_EXTERNAL, } if DATASET_ROOT.exists(): log(f"Removing old dataset directory: {DATASET_ROOT}") shutil.rmtree(DATASET_ROOT) ensure_dir(DATASET_ROOT) ensure_dir(DATASET_ROOT / "90_manifests") log("Collecting source inventory (before)...") inv_before = collect_source_inventory(source_roots) write_json(DATASET_ROOT / "90_manifests" / 
def main() -> int:
    """Rebuild the dataset tree from scratch and write all manifests.

    Steps, in order: delete any existing ``DATASET_ROOT``, snapshot the source
    inventory, write the contract files, build DS01/DS02/DS03/DS04/DS06/DS07,
    build the P2 candidate audit, write experiment views, manifests, the
    schema validation report and the exclusion log, then snapshot the source
    inventory again and compare the two snapshots.

    Returns:
        0 when the build finishes, 2 when the ``global_digest`` of the source
        inventory differs between the two snapshots (the build summary is not
        written in that case).
    """
    manifest_dir = DATASET_ROOT / "90_manifests"
    p2_pool_dir = DATASET_ROOT / "30_P2_单标签原料池"

    source_roots = {
        "human": SRC_HUMAN,
        "human_quarantine": SRC_QUARANTINE,
        "raw_external": SRC_RAW_EXTERNAL,
    }

    # Start from a clean output tree.
    if DATASET_ROOT.exists():
        log(f"Removing old dataset directory: {DATASET_ROOT}")
        shutil.rmtree(DATASET_ROOT)
    ensure_dir(DATASET_ROOT)
    ensure_dir(manifest_dir)

    log("Collecting source inventory (before)...")
    inv_before = collect_source_inventory(source_roots)
    write_json(manifest_dir / "source_inventory_before.json", inv_before)

    log("Writing dataset contract files...")
    write_contract_files()

    ds_paths: Dict[str, Path] = {}
    dataset_manifests = {}

    def register(ds_id, ds_dir, manifest):
        # Insertion order here fixes the dataset order in later reports.
        ds_paths[ds_id] = ds_dir
        dataset_manifests[ds_id] = manifest

    ds01_dir, ds01_manifest, ds01_hashes = build_ds01_nlpcc()
    register("DS01", ds01_dir, ds01_manifest)

    ds02_dir, ds02_manifest = build_ds02_hc3plus()
    register("DS02", ds02_dir, ds02_manifest)

    ds03_dir, ds03_manifest = build_ds03_hc3raw_expanded()
    register("DS03", ds03_dir, ds03_manifest)

    ds04_dir, ds04_manifest = build_ds04_human_pools_merged()
    register("DS04", ds04_dir, ds04_manifest)

    ds11_dir = p2_pool_dir / "DS11_Generated_AI_v1"
    ds12_dir = p2_pool_dir / "DS12_Generated_AI_natural_v1"

    ds06_dir, ds06_manifest = build_ds06_external_core_balanced(
        ds01_hashes=ds01_hashes,
        ds04_dir=ds04_dir,
        ds11_dir=ds11_dir,
    )
    register("DS06", ds06_dir, ds06_manifest)

    ds07_dir, ds07_manifest = build_ds07_external_long(
        ds01_hashes=ds01_hashes,
        ds04_dir=ds04_dir,
        ds12_dir=ds12_dir,
    )
    register("DS07", ds07_dir, ds07_manifest)

    log("Building P2 candidate audit...")
    p2_audit = build_p2_candidate_audit()

    log("Writing experiment views...")
    write_experiment_views(ds_paths)

    log("Writing manifests + validation reports...")
    overlap_audit_path = manifest_dir / "overlap_audit.json"
    schema_report_path = manifest_dir / "schema_validation_report.json"
    write_json(manifest_dir / "dataset_manifests.json", dataset_manifests)
    write_json(overlap_audit_path, OVERLAP_AUDIT)
    write_json(manifest_dir / "p2_candidate_audit_copy.json", p2_audit)
    # Validation runs before the exclusion log is written, as in the
    # established step order.
    schema_report = validate_schema_sample(ds_paths)
    write_json(schema_report_path, schema_report)
    write_exclusion_log()

    log("Collecting source inventory (after)...")
    inv_after = collect_source_inventory(source_roots)
    write_json(manifest_dir / "source_inventory_after.json", inv_after)

    compare = {
        "generated_at": iso_now(),
        "before_digest": inv_before.get("global_digest"),
        "after_digest": inv_after.get("global_digest"),
        "same_digest": inv_before.get("global_digest") == inv_after.get("global_digest"),
        "before": inv_before,
        "after": inv_after,
    }
    write_json(manifest_dir / "source_inventory_compare.json", compare)
    if not compare["same_digest"]:
        log("ERROR: source inventory changed during build. Stopping.")
        return 2

    dataset_entries = {}
    for ds_id, ds_dir in ds_paths.items():
        dataset_entries[ds_id] = {
            "path": str(ds_dir),
            "manifest": dataset_manifests.get(ds_id, {}),
        }

    summary = {
        "generated_at": iso_now(),
        "dataset_root": str(DATASET_ROOT),
        "datasets": dataset_entries,
        "overlap_audit_file": str(overlap_audit_path),
        "exclusion_log_file": str(manifest_dir / "exclusion_log.csv"),
        "schema_validation_report_file": str(schema_report_path),
    }
    write_json(manifest_dir / "build_summary.json", summary)

    log("Dataset build completed successfully.")
    return 0


if __name__ == "__main__":
    sys.exit(main())