Text Classification
Transformers
Safetensors
Chinese
chinese
ai-text-detection
ensemble
bert
roberta
qwen
lora
research
dataset
Instructions to use LUCIFerace/enhanced-replica-model-pack with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use LUCIFerace/enhanced-replica-model-pack with Transformers:
```python
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text-classification", model="LUCIFerace/enhanced-replica-model-pack")
```

```python
# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("LUCIFerace/enhanced-replica-model-pack", dtype="auto")
```
- Notebooks
- Google Colab
- Kaggle
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| import csv | |
| import hashlib | |
| import json | |
| import os | |
| import math | |
| import re | |
| import shutil | |
| import sys | |
| from collections import Counter, defaultdict | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
# --- Filesystem layout -------------------------------------------------------
# Every path below hangs off ROOT, which can be redirected with the
# ENHANCED_REPLICA_DATA_ROOT environment variable.
ROOT = Path(os.environ.get("ENHANCED_REPLICA_DATA_ROOT", r"F:\codex开发\研究"))
ROUTE_ROOT = ROOT.joinpath("研究成果", "实验路线", "20260408_SOTA路线重构")
DATASET_ROOT = ROUTE_ROOT.joinpath("data", "dataset")

# Raw inputs.
SRC_HUMAN = ROOT.joinpath("data", "human")
SRC_QUARANTINE = ROOT.joinpath("data", "human_quarantine")
SRC_RAW_EXTERNAL = ROOT.joinpath("data", "raw_external")
NLPCC_DIR = SRC_RAW_EXTERNAL.joinpath("NLPCC-2025-Task1-main", "data")
HC3_PLUS_DIR = SRC_RAW_EXTERNAL.joinpath("human_longform_20260406", "HC3_Plus", "raw", "files", "zh")
HC3_RAW_FILE = SRC_RAW_EXTERNAL.joinpath("human_longform_20260406", "HC3", "raw", "modelscope_zh", "all.jsonl")
QUARANTINE_HC_DIRS = [
    SRC_QUARANTINE.joinpath("failed_platforms_20260406", "weibo_quality_v2", "high_confidence"),
    SRC_QUARANTINE.joinpath("failed_platforms_20260406", "weibo_quality_v3", "exports", "high_confidence"),
]
P2_CLTS_DIR = SRC_RAW_EXTERNAL.joinpath("CLTS数据集")
P2_ZHIHU_DIR = SRC_RAW_EXTERNAL.joinpath("huggingface_zhihu")

# --- Record schema -----------------------------------------------------------
# Fields written to every JSONL record, in output order.
REQUIRED_FIELDS = ["record_id", "text", "label", "source", "split", "length_char", "topic", "model_slug"]
SPLITS = ["train", "dev", "test"]
LABEL_MAP = {"human": 0, "ai": 1}

# Records whose cleaned, whitespace-free length is below this are dropped.
MIN_CHAR_LEN = 30

# Inclusive (low, high, bucket_name) ranges over the whitespace-free length.
LENGTH_BUCKETS = [
    (0, 29, "000_029"),
    (30, 99, "030_099"),
    (100, 299, "100_299"),
    (300, 599, "300_599"),
    (600, 999, "600_999"),
    (1000, 1499, "1000_1499"),
    (1500, 2199, "1500_2199"),
    (2200, 10**9, "2200_plus"),
]

# --- Run-wide accumulators ---------------------------------------------------
# One row per dropped record, appended by the builders as they run.
EXCLUSION_LOG_ROWS: List[Dict[str, str]] = []
# Per-dataset counters describing filtering and overlap removal.
OVERLAP_AUDIT: Dict[str, Dict[str, int]] = {}
def now_str() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
def iso_now() -> str:
    """Return the current local time in ISO-8601 form, truncated to whole seconds."""
    current = datetime.now().replace(microsecond=0)
    return current.isoformat()
def log(msg: str) -> None:
    """Print *msg* to stdout, prefixed with the current timestamp in brackets."""
    stamp = now_str()
    print("[{}] {}".format(stamp, msg))
def ensure_dir(path: Path) -> None:
    """Create *path* and any missing parents; do nothing if it already exists."""
    path.mkdir(exist_ok=True, parents=True)
def write_json(path: Path, obj: dict) -> None:
    """Write *obj* to *path* as indented UTF-8 JSON.

    The parent directory is created first. Non-ASCII characters are written
    as-is rather than as ``\\uXXXX`` escapes.
    """
    ensure_dir(path.parent)
    with path.open("w", encoding="utf-8") as handle:
        json.dump(obj, handle, indent=2, ensure_ascii=False)
def write_jsonl(path: Path, records: List[dict]) -> None:
    """Write *records* to *path*, one JSON object per ``\\n``-terminated line.

    Only the keys listed in ``REQUIRED_FIELDS`` are written, in that order;
    helper keys such as ``_hash`` are left out. A record missing a required
    field raises ``KeyError``.
    """
    ensure_dir(path.parent)
    with path.open("w", encoding="utf-8", newline="\n") as sink:
        for record in records:
            projected = {field: record[field] for field in REQUIRED_FIELDS}
            line = json.dumps(projected, ensure_ascii=False)
            sink.write(line + "\n")
def write_csv(path: Path, fieldnames: List[str], rows: List[dict]) -> None:
    """Write *rows* to *path* as CSV with a header row.

    The file is encoded as UTF-8 with a BOM (``utf-8-sig``). The parent
    directory is created first.
    """
    ensure_dir(path.parent)
    with path.open("w", encoding="utf-8-sig", newline="") as sink:
        table = csv.DictWriter(sink, fieldnames=fieldnames)
        table.writeheader()
        table.writerows(rows)
def read_text_with_fallback(path: Path) -> str:
    """Read *path* and decode it with the first codec that succeeds.

    Codecs are tried in the order utf-8-sig, utf-8, gb18030. If all of them
    fail, the bytes are decoded as UTF-8 with undecodable bytes replaced.
    """
    payload = path.read_bytes()
    decoded = None
    for codec in ("utf-8-sig", "utf-8", "gb18030"):
        try:
            decoded = payload.decode(codec)
        except UnicodeDecodeError:
            continue
        else:
            break
    if decoded is None:
        decoded = payload.decode("utf-8", errors="replace")
    return decoded
def load_json_with_fallback(path: Path):
    """Parse *path* as JSON after decoding it with ``read_text_with_fallback``."""
    decoded = read_text_with_fallback(path)
    return json.loads(decoded)
def iter_jsonl(path: Path):
    """Yield ``(lineno, parsed_object)`` for each non-blank line of a JSONL file.

    Line numbers are 1-based and count blank lines. A line that is not valid
    JSON is skipped and recorded in ``EXCLUSION_LOG_ROWS`` under the
    ``GLOBAL`` dataset id with drop reason ``json_decode_error``.
    """
    text = read_text_with_fallback(path)
    # Split on real line terminators only. str.splitlines() would also break
    # on U+0085, U+2028 and U+2029, which json.dumps(ensure_ascii=False)
    # writes unescaped inside string values. A record containing one of them
    # would be torn in two and silently dropped as a decode error.
    for lineno, raw_line in enumerate(re.split(r"\r\n|\r|\n", text), 1):
        line = raw_line.strip()
        if not line:
            continue
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            EXCLUSION_LOG_ROWS.append(
                {
                    "dataset_id": "GLOBAL",
                    "stage": "read_jsonl",
                    "source_file": str(path),
                    "record_ref": f"line_{lineno}",
                    "label": "",
                    "drop_reason": "json_decode_error",
                    "raw_length": "",
                    "clean_length": "",
                    "text_hash": "",
                }
            )
            continue
        yield lineno, parsed
def normalize_label(label) -> Optional[int]:
    """Map a raw label to ``0`` (human), ``1`` (AI) or ``None`` (unrecognized).

    Accepted inputs:
      * ``bool``: ``False`` -> 0, ``True`` -> 1
      * ``int`` or ``float`` equal to exactly 0 or 1 (so ``1.0`` is accepted,
        as produced by tools that serialize integer columns as floats)
      * strings, case-insensitive and stripped: ``"0"``, ``"human"``, ``"h"``
        -> 0; ``"1"``, ``"ai"``, ``"machine"``, ``"gpt"`` -> 1

    Anything else, including ``None``, NaN and other numbers, yields ``None``.
    """
    if label is None:
        return None
    if isinstance(label, bool):
        return int(label)
    if isinstance(label, (int, float)):
        # NaN compares unequal to both values and therefore falls to None.
        if label == 1:
            return 1
        if label == 0:
            return 0
        return None
    s = str(label).strip().lower()
    if s in {"0", "human", "h"}:
        return 0
    if s in {"1", "ai", "machine", "gpt"}:
        return 1
    return None
def text_hash(text: str) -> str:
    """Return the SHA-256 hex digest of *text* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def non_ws_len(text: str) -> int:
    """Count the characters of *text* that are not whitespace."""
    return sum(1 for _ in re.finditer(r"\S", text))
def length_bucket(length_char: int) -> str:
    """Return the name of the ``LENGTH_BUCKETS`` range containing *length_char*.

    Both ends of each range are inclusive. ``"unknown"`` is returned when no
    range matches.
    """
    matching = (name for low, high, name in LENGTH_BUCKETS if low <= length_char <= high)
    return next(matching, "unknown")
# Markdown constructs that clean_text() removes or unwraps.
MD_CODE_BLOCK_RE = re.compile(r"```[\s\S]*?```", flags=re.MULTILINE)  # fenced block, replaced by a space
MD_INLINE_CODE_RE = re.compile(r"`([^`]+)`")  # keeps the code text
MD_IMG_RE = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)")  # keeps the alt text
MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")  # keeps the link text
MD_HEADING_RE = re.compile(r"^\s{0,3}#{1,6}\s*")  # leading '#' markers
MD_QUOTE_RE = re.compile(r"^\s{0,3}>\s?")  # leading '>' marker
MD_LIST_RE = re.compile(r"^\s{0,3}(?:[-*+]|\d+\.)\s+")  # bullet or '1.' marker


def _strip_line_markup(line: str) -> str:
    """Remove line-leading markdown markers and emphasis tokens from one line."""
    for pattern in (MD_HEADING_RE, MD_QUOTE_RE, MD_LIST_RE):
        line = pattern.sub("", line)
    for token in ("**", "__", "`"):
        line = line.replace(token, "")
    return line.strip()


def clean_text(text: str) -> str:
    """Normalize raw text: fix line breaks, strip markdown, collapse whitespace.

    Steps, in order:
      1. CR/LF variants and the literal two-character sequences ``\\n`` and
         ``\\r`` become real newlines.
      2. The stray ``/n`` artifact becomes a newline (or is removed when it is
         alone on a line).
      3. Fenced code blocks are replaced by a space; inline code, images and
         links are reduced to their visible text.
      4. Per line: heading, quote and list markers plus ``**``, ``__`` and
         backticks are removed and the line is stripped.
      5. Ideographic spaces become ASCII spaces, runs of horizontal whitespace
         collapse to one space, and three or more newlines collapse to two.

    ``None`` yields an empty string; any other value is passed through
    ``str()`` first.
    """
    if text is None:
        return ""
    body = str(text)

    # 1. Line-break normalization (order matters: "\r\n" before "\r").
    for old, new in (("\r\n", "\n"), ("\r", "\n"), ("\\n", "\n"), ("\\r", "\n")):
        body = body.replace(old, new)

    # 2. "/n" artifacts.
    body = re.sub(r"(?<=\S)\s*/n\s*(?=\S)", "\n", body)
    body = re.sub(r"^\s*/n\s*$", "", body, flags=re.MULTILINE)
    body = body.replace("/n/n", "\n\n")
    body = re.sub(
        r"(?<=[\u4e00-\u9fffA-Za-z0-9,。!?;:,.!?;:])\s*/n\s*(?=[\u4e00-\u9fffA-Za-z0-9])",
        "\n",
        body,
    )

    # 3. Span-level markdown.
    body = MD_CODE_BLOCK_RE.sub(" ", body)
    for pattern in (MD_INLINE_CODE_RE, MD_IMG_RE, MD_LINK_RE):
        body = pattern.sub(r"\1", body)

    # 4. Line-level markdown.
    body = "\n".join(_strip_line_markup(line) for line in body.split("\n"))

    # 5. Whitespace normalization.
    body = body.replace("\u3000", " ")
    body = re.sub(r"[ \t\f\v]+", " ", body)
    body = re.sub(r"\n{3,}", "\n\n", body)
    body = re.sub(r"[ ]+\n", "\n", body)
    body = re.sub(r"\n[ ]+", "\n", body)
    return body.strip()
def severe_artifact_reason(cleaned_text: str, length_char: int) -> Optional[str]:
    """Return a drop reason if *cleaned_text* looks like junk, else ``None``.

    Checks, in order:
      * empty text -> ``"empty_after_clean"``
      * only non-word characters or underscores -> ``"non_content_symbols_only"``
      * *length_char* is at least ``MIN_CHAR_LEN`` and fewer than 10% of the
        characters are CJK ideographs, ASCII letters or digits
        -> ``"severe_artifact_low_content_ratio"``
      * at least 120 non-whitespace characters of which more than 60% are one
        repeated character -> ``"severe_artifact_repetitive_chars"``
    """
    if not cleaned_text:
        return "empty_after_clean"
    if re.fullmatch(r"[\W_]+", cleaned_text) is not None:
        return "non_content_symbols_only"

    content_count = len(re.findall(r"[\u4e00-\u9fffA-Za-z0-9]", cleaned_text))
    content_ratio = content_count / len(cleaned_text)
    if length_char >= MIN_CHAR_LEN and content_ratio < 0.10:
        return "severe_artifact_low_content_ratio"

    squeezed = re.sub(r"\s+", "", cleaned_text)
    if len(squeezed) >= 120:
        dominant_count = max(Counter(squeezed).values())
        if dominant_count / len(squeezed) > 0.60:
            return "severe_artifact_repetitive_chars"
    return None
def add_exclusion(
    dataset_id: str,
    stage: str,
    source_file: str,
    record_ref: str,
    label,
    drop_reason: str,
    raw_length,
    clean_length,
    text_hash_value: str = "",
) -> None:
    """Append one dropped-record row to ``EXCLUSION_LOG_ROWS``.

    ``label``, ``raw_length`` and ``clean_length`` are stored as strings, with
    ``None`` becoming an empty string. The other arguments are stored as given.
    """

    def as_cell(value) -> str:
        return "" if value is None else str(value)

    row = {
        "dataset_id": dataset_id,
        "stage": stage,
        "source_file": source_file,
        "record_ref": record_ref,
        "label": as_cell(label),
        "drop_reason": drop_reason,
        "raw_length": as_cell(raw_length),
        "clean_length": as_cell(clean_length),
        "text_hash": text_hash_value,
    }
    EXCLUSION_LOG_ROWS.append(row)
def build_record(
    ds_id: str,
    stage: str,
    source_file: Path,
    record_ref: str,
    raw_text,
    label: int,
    source: str,
    split: str,
    topic: str = "",
    model_slug: str = "",
    seen_hashes: Optional[set] = None,
) -> Optional[dict]:
    """Clean *raw_text* and build a schema record, or log why it was dropped.

    A record is rejected (``None`` is returned and an exclusion row is logged)
    when the cleaned text is empty, shorter than ``MIN_CHAR_LEN``
    non-whitespace characters, flagged by ``severe_artifact_reason``, or, when
    *seen_hashes* is given, a duplicate of text already in that set. Accepted
    hashes are added to *seen_hashes*.

    The returned dict has an empty ``record_id`` (assigned later by
    ``finalize_dataset``) and an extra ``_hash`` key holding the text hash.
    """
    original = "" if raw_text is None else str(raw_text)
    raw_len = non_ws_len(original)
    cleaned = clean_text(original)
    clean_len = non_ws_len(cleaned)

    def reject(reason: str, digest: str = "") -> None:
        add_exclusion(ds_id, stage, str(source_file), record_ref, label, reason, raw_len, clean_len, digest)

    if not cleaned:
        reject("empty_text")
        return None
    if clean_len < MIN_CHAR_LEN:
        reject(f"too_short_lt_{MIN_CHAR_LEN}")
        return None
    artifact = severe_artifact_reason(cleaned, clean_len)
    if artifact:
        reject(artifact)
        return None

    digest = text_hash(cleaned)
    if seen_hashes is not None:
        if digest in seen_hashes:
            reject("duplicate_text", digest)
            return None
        seen_hashes.add(digest)

    return {
        "record_id": "",
        "text": cleaned,
        "label": int(label),
        "source": source,
        "split": split,
        "length_char": clean_len,
        "topic": "" if topic is None else str(topic),
        "model_slug": "" if model_slug is None else str(model_slug),
        "_hash": digest,
    }
def split_counts(records: List[dict]) -> Dict[str, int]:
    """Count records per split; every name in ``SPLITS`` is present, possibly as 0."""
    tally = Counter()
    for record in records:
        tally[record["split"]] += 1
    return {name: int(tally.get(name, 0)) for name in SPLITS}
def label_counts(records: List[dict]) -> Dict[str, int]:
    """Count records per label, returned under the string keys ``"0"`` and ``"1"``."""
    tally = Counter()
    for record in records:
        tally[record["label"]] += 1
    return {str(value): int(tally.get(value, 0)) for value in (0, 1)}
def write_distribution_files(ds_dir: Path, records: List[dict]) -> None:
    """Write ``label_dist.csv``, ``source_dist.csv`` and ``length_dist.csv`` to *ds_dir*.

    * label_dist: one row per (split, label) for labels 0 and 1, followed by
      two ``split="all"`` totals.
    * source_dist: one row per observed (split, source, label), sorted.
    * length_dist: one row per (split, length bucket), zero counts included.
    """
    # Label distribution.
    by_split_label = Counter((rec["split"], rec["label"]) for rec in records)
    label_rows = [
        {"split": split_name, "label": label, "count": by_split_label.get((split_name, label), 0)}
        for split_name in SPLITS
        for label in (0, 1)
    ]
    for label in (0, 1):
        total = sum(rec["label"] == label for rec in records)
        label_rows.append({"split": "all", "label": label, "count": total})
    write_csv(ds_dir / "label_dist.csv", ["split", "label", "count"], label_rows)

    # Source distribution.
    by_source = Counter((rec["split"], rec["source"], rec["label"]) for rec in records)
    source_rows = [
        {"split": split_name, "source": source, "label": label, "count": count}
        for (split_name, source, label), count in sorted(by_source.items())
    ]
    write_csv(ds_dir / "source_dist.csv", ["split", "source", "label", "count"], source_rows)

    # Length distribution.
    by_bucket = Counter((rec["split"], length_bucket(rec["length_char"])) for rec in records)
    length_rows = [
        {"split": split_name, "length_bucket": bucket, "count": by_bucket.get((split_name, bucket), 0)}
        for split_name in SPLITS
        for _low, _high, bucket in LENGTH_BUCKETS
    ]
    write_csv(ds_dir / "length_dist.csv", ["split", "length_bucket", "count"], length_rows)
def finalize_dataset(
    ds_id: str,
    ds_dir: Path,
    records_by_split: Dict[str, List[dict]],
    build_note: str,
    source_inputs: List[str],
) -> dict:
    """Assign record ids, write one JSONL file per split and a manifest.

    For every split in ``SPLITS`` the records are stamped in place with their
    split name and a ``<ds_id lowercased>_<split>_<7-digit index>`` id, sorted
    by that id, and written to ``<split>.jsonl``. ``manifest.json`` is then
    written to *ds_dir* and also returned.

    Note that the lists inside *records_by_split* and their records are
    mutated.
    """
    ensure_dir(ds_dir)
    id_prefix = ds_id.lower()
    merged: List[dict] = []
    for split_name in SPLITS:
        bucket = records_by_split.get(split_name, [])
        for position, record in enumerate(bucket, start=1):
            record["split"] = split_name
            record["record_id"] = f"{id_prefix}_{split_name}_{position:07d}"
        bucket.sort(key=lambda item: item["record_id"])
        write_jsonl(ds_dir / f"{split_name}.jsonl", bucket)
        merged += bucket

    # Distribution statistics are reported in 审阅报告.md instead, so
    # write_distribution_files() is deliberately not called here; this keeps
    # the dataset directories free of intermediate CSVs.
    manifest = {
        "dataset_id": ds_id,
        "created_at": iso_now(),
        "schema_fields": REQUIRED_FIELDS,
        "label_mapping": LABEL_MAP,
        "build_note": build_note,
        "source_inputs": source_inputs,
        "record_count_total": len(merged),
        "record_count_by_split": split_counts(merged),
        "record_count_by_label": label_counts(merged),
        "source_set": sorted({record["source"] for record in merged}),
    }
    write_json(ds_dir / "manifest.json", manifest)
    return manifest
def stratified_split(records: List[dict], ratios: Tuple[float, float, float], strata_fn) -> Dict[str, List[dict]]:
    """Split *records* into train/dev/test deterministically, stratum by stratum.

    Records are grouped by ``strata_fn(record)``. Each group is ordered by its
    ``_hash`` value; the first ``int(n * train_ratio)`` records go to train,
    the next ``int(n * dev_ratio)`` to dev, and the remainder to test. Each
    output list is finally sorted by ``_hash``.

    Raises:
        ValueError: if *records* is non-empty and *ratios* do not sum to 1.0.
    """
    if not records:
        return {"train": [], "dev": [], "test": []}

    train_ratio, dev_ratio, test_ratio = ratios
    if not math.isclose(train_ratio + dev_ratio + test_ratio, 1.0, rel_tol=1e-9):
        raise ValueError("ratios must sum to 1.0")

    def hash_of(item: dict):
        return item["_hash"]

    strata = defaultdict(list)
    for record in records:
        strata[strata_fn(record)].append(record)

    result: Dict[str, List[dict]] = {"train": [], "dev": [], "test": []}
    for members in strata.values():
        ordered = sorted(members, key=hash_of)
        size = len(ordered)
        dev_start = int(size * train_ratio)
        test_start = dev_start + int(size * dev_ratio)
        result["train"] += ordered[:dev_start]
        result["dev"] += ordered[dev_start:test_start]
        # Rounding leftovers always end up in test.
        result["test"] += ordered[test_start:]

    for bucket in result.values():
        bucket.sort(key=hash_of)
    return result
def collect_source_inventory(source_roots: Dict[str, Path]) -> dict:
    """Fingerprint every file under the given named roots.

    For each root the result records whether it exists, how many files it
    holds, their total size, the oldest and newest modification time (ns) and
    a SHA-256 digest over ``name|relative/path|size|mtime_ns`` of each file in
    sorted path order. ``global_digest`` covers the same signatures across all
    roots. File contents are not read.
    """
    started_at = iso_now()
    overall = hashlib.sha256()
    roots_report = {}

    for name, base in source_roots.items():
        if not base.exists():
            roots_report[name] = {
                "path": str(base),
                "exists": False,
                "file_count": 0,
                "total_size": 0,
                "mtime_min_ns": None,
                "mtime_max_ns": None,
                "digest": "",
            }
            continue

        per_root = hashlib.sha256()
        sizes: List[int] = []
        mtimes: List[int] = []
        for entry in sorted(base.rglob("*")):
            if not entry.is_file():
                continue
            info = entry.stat()
            relative = str(entry.relative_to(base)).replace("\\", "/")
            token = f"{name}|{relative}|{info.st_size}|{info.st_mtime_ns}".encode("utf-8")
            per_root.update(token)
            overall.update(token)
            sizes.append(int(info.st_size))
            mtimes.append(info.st_mtime_ns)

        roots_report[name] = {
            "path": str(base),
            "exists": True,
            "file_count": len(sizes),
            "total_size": sum(sizes),
            "mtime_min_ns": min(mtimes) if mtimes else None,
            "mtime_max_ns": max(mtimes) if mtimes else None,
            "digest": per_root.hexdigest(),
        }

    return {
        "generated_at": started_at,
        "roots": roots_report,
        "global_digest": overall.hexdigest(),
    }
def extract_text_from_obj(obj: dict) -> str:
    """Return the first non-blank string among the known text fields of *obj*.

    Fields are probed in the order text, content, body, answer, article,
    message. An empty string is returned when *obj* is not a dict or none of
    the fields holds a non-blank string.
    """
    if not isinstance(obj, dict):
        return ""
    candidates = (obj.get(field) for field in ("text", "content", "body", "answer", "article", "message"))
    return next((value for value in candidates if isinstance(value, str) and value.strip()), "")
def iter_dataset_records(ds_dir: Path):
    """Yield every record of a built dataset directory, split by split.

    ``<split>.jsonl`` is read for each name in ``SPLITS``; missing files are
    skipped. Each record's ``split`` field is overwritten with the name of the
    file it came from.
    """
    for split_name in SPLITS:
        split_file = ds_dir / f"{split_name}.jsonl"
        if split_file.exists():
            for _lineno, record in iter_jsonl(split_file):
                record["split"] = split_name
                yield record
def coerce_answer_text(ans) -> str:
    """Turn one answer value of arbitrary shape into plain text.

    * ``None`` -> empty string
    * ``str`` -> returned unchanged
    * ``dict`` -> the first non-blank string under text, answer, content or
      body; if there is none, ``str(ans)``
    * ``list`` -> the non-blank ``str()`` of each item, joined by newlines
    * anything else -> ``str(ans)``
    """
    if ans is None:
        return ""
    if isinstance(ans, str):
        return ans
    if isinstance(ans, dict):
        for field in ("text", "answer", "content", "body"):
            candidate = ans.get(field)
            if isinstance(candidate, str) and candidate.strip():
                return candidate
        return str(ans)
    if isinstance(ans, list):
        rendered = [str(item) for item in ans]
        return "\n".join(part for part in rendered if part.strip())
    return str(ans)
def build_ds01_nlpcc() -> Tuple[Path, dict, set]:
    """Build DS01 from the NLPCC train/dev/test JSON files.

    Rows keep their official split. Rows with an unrecognized label are logged
    and skipped. Text is cleaned and de-duplicated across all three splits in
    the order train, dev, test, so a text seen earlier wins.

    Returns:
        The dataset directory, its manifest, and the set of accepted text
        hashes.
    """
    ds_id = "DS01_NLPCC_core_v1"
    ds_dir = DATASET_ROOT / "10_P0_主线必用" / ds_id
    log(f"Building {ds_id}")

    split_files = {
        "train": NLPCC_DIR / "train.json",
        "dev": NLPCC_DIR / "dev.json",
        "test": NLPCC_DIR / "test_with_label.json",
    }
    seen_hashes: set = set()
    records_by_split: Dict[str, List[dict]] = {name: [] for name in split_files}

    for split_name, json_path in split_files.items():
        for row_idx, row in enumerate(load_json_with_fallback(json_path)):
            ref = f"{split_name}_{row_idx}"
            label = normalize_label(row.get("label"))
            if label is None:
                add_exclusion(ds_id, "parse_nlpcc", str(json_path), ref, None, "invalid_label", "", "", "")
                continue
            record = build_record(
                ds_id=ds_id,
                stage="clean_nlpcc",
                source_file=json_path,
                record_ref=ref,
                raw_text=row.get("text", ""),
                label=label,
                source="nlpcc",
                split=split_name,
                topic=row.get("source", ""),
                model_slug=row.get("model", ""),
                seen_hashes=seen_hashes,
            )
            if record is None:
                continue
            records_by_split[split_name].append(record)

    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        records_by_split,
        build_note="NLPCC official train/dev/test_with_label with unified cleaning + dedupe.",
        source_inputs=[str(json_path) for json_path in split_files.values()],
    )
    return ds_dir, manifest, seen_hashes
def build_ds02_hc3plus() -> Tuple[Path, dict]:
    """Build DS02 from the HC3_Plus Chinese JSONL files.

    ``train.jsonl`` feeds train; the two ``val_*`` files are merged into dev
    and the two ``test_*`` files into test. Rows with an unrecognized label
    are logged and skipped. Text is cleaned and de-duplicated across all
    files in processing order. The source file stem is stored as the topic.

    Returns:
        The dataset directory and its manifest.
    """
    ds_id = "DS02_HC3Plus_clean_v1"
    ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id
    log(f"Building {ds_id}")
    file_map = {
        "train": [HC3_PLUS_DIR / "train.jsonl"],
        "dev": [HC3_PLUS_DIR / "val_hc3_QA.jsonl", HC3_PLUS_DIR / "val_hc3_si.jsonl"],
        "test": [HC3_PLUS_DIR / "test_hc3_QA.jsonl", HC3_PLUS_DIR / "test_hc3_si.jsonl"],
    }
    seen_hashes = set()
    records_by_split = {"train": [], "dev": [], "test": []}
    for sp, fplist in file_map.items():
        for fp in fplist:
            for lineno, row in iter_jsonl(fp):
                # One reference format for every exclusion row of this line,
                # so log entries from different stages can be joined.
                record_ref = f"line_{lineno}"
                lb = normalize_label(row.get("label"))
                if lb is None:
                    add_exclusion(ds_id, "parse_hc3_plus", str(fp), record_ref, None, "invalid_label", "", "", "")
                    continue
                rec = build_record(
                    ds_id=ds_id,
                    stage="clean_hc3_plus",
                    source_file=fp,
                    record_ref=record_ref,
                    raw_text=row.get("text", ""),
                    label=lb,
                    source="hc3_plus",
                    split=sp,
                    topic=fp.stem,
                    model_slug="",
                    seen_hashes=seen_hashes,
                )
                if rec is not None:
                    records_by_split[sp].append(rec)
    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        records_by_split,
        build_note="HC3_Plus train + merged val->dev + merged test with unified cleaning + dedupe.",
        source_inputs=[str(x) for arr in file_map.values() for x in arr],
    )
    return ds_dir, manifest
def build_ds03_hc3raw_expanded() -> Tuple[Path, dict]:
    """Build DS03 by expanding every answer of the raw HC3 file into a record.

    For each input row, the ``human_answers`` entries become label-0 records
    and the ``chatgpt_answers`` entries label-1 records (model slug
    ``chatgpt``). A field holding a single value instead of a list is treated
    as a one-item list. Text is cleaned and de-duplicated across the whole
    file, then split 80/10/10 with one stratum per (row ``source``, label).

    Returns:
        The dataset directory and its manifest.
    """
    ds_id = "DS03_HC3raw_expanded_v1"
    ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id
    log(f"Building {ds_id}")

    # (input field, record_ref tag, label, exclusion stage, model slug)
    answer_groups = (
        ("human_answers", "h", 0, "expand_hc3_raw_human", ""),
        ("chatgpt_answers", "a", 1, "expand_hc3_raw_ai", "chatgpt"),
    )

    seen_hashes = set()
    staged: List[dict] = []
    for lineno, row in iter_jsonl(HC3_RAW_FILE):
        topic = row.get("source", "")
        for field, tag, label, stage, slug in answer_groups:
            answers = row.get(field) or []
            if not isinstance(answers, list):
                answers = [answers]
            for position, answer in enumerate(answers):
                record = build_record(
                    ds_id=ds_id,
                    stage=stage,
                    source_file=HC3_RAW_FILE,
                    record_ref=f"line_{lineno}_{tag}_{position}",
                    raw_text=coerce_answer_text(answer),
                    label=label,
                    source="hc3_raw",
                    split="train",
                    topic=topic,
                    model_slug=slug,
                    seen_hashes=seen_hashes,
                )
                if record is None:
                    continue
                record["_stratum"] = f"{topic}|{label}"
                staged.append(record)

    split_map = stratified_split(staged, (0.8, 0.1, 0.1), strata_fn=lambda r: r["_stratum"])
    for split_name in SPLITS:
        for record in split_map[split_name]:
            record["split"] = split_name
            # The stratum key was only needed for splitting.
            record.pop("_stratum", None)

    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        split_map,
        build_note="Expanded HC3 raw answers (human_answers/chatgpt_answers), then stratified hash split 80/10/10.",
        source_inputs=[str(HC3_RAW_FILE)],
    )
    return ds_dir, manifest
def build_ds04_human_pools_merged() -> Tuple[Path, dict]:
    """Build DS04, a human-only (label 0) pool that goes entirely to train.

    Two inputs are merged, sharing one de-duplication set:
      * every ``*.json`` under ``SRC_HUMAN`` (source ``human``; the topic
        defaults to the first directory component below ``SRC_HUMAN``);
      * every ``*.json`` under each ``QUARANTINE_HC_DIRS`` entry (source
        ``quarantine_hc``; the topic defaults to the pool version).

    A file that cannot be read or parsed is logged as ``json_decode_error``
    and skipped. dev and test are written empty.

    Returns:
        The dataset directory and its manifest.
    """
    ds_id = "DS04_Human_pools_merged_v1"
    ds_dir = DATASET_ROOT / "30_P2_单标签原料池" / ds_id
    log(f"Building {ds_id}")
    seen_hashes = set()
    train_records: List[dict] = []

    for fp in sorted(SRC_HUMAN.rglob("*.json")):
        try:
            row = load_json_with_fallback(fp)
        except Exception:
            # Deliberate best effort: one unreadable file must not abort the
            # build. The failure is recorded in the exclusion log.
            add_exclusion(ds_id, "read_human", str(fp), "", 0, "json_decode_error", "", "", "")
            continue
        rel = fp.relative_to(SRC_HUMAN)
        top = rel.parts[0] if rel.parts else "unknown"
        rec = build_record(
            ds_id=ds_id,
            stage="clean_human",
            source_file=fp,
            record_ref=str(rel).replace("\\", "/"),
            raw_text=extract_text_from_obj(row),
            label=0,
            source="human",
            split="train",
            topic=row.get("topic", top) if isinstance(row, dict) else top,
            model_slug="",
            seen_hashes=seen_hashes,
        )
        if rec is not None:
            train_records.append(rec)

    for base in QUARANTINE_HC_DIRS:
        # Look for the version marker only in the path below SRC_QUARANTINE.
        # Testing the whole absolute path would label every pool "v2" as soon
        # as the configured data root itself contains "v2".
        try:
            marker_parts = base.relative_to(SRC_QUARANTINE).parts
        except ValueError:
            marker_parts = base.parts
        version = "v2" if any("v2" in part.lower() for part in marker_parts) else "v3"

        for fp in sorted(base.rglob("*.json")):
            try:
                row = load_json_with_fallback(fp)
            except Exception:
                # Same best-effort policy as for the core human pool.
                add_exclusion(ds_id, "read_quarantine", str(fp), "", 0, "json_decode_error", "", "", "")
                continue
            # Built outside the f-string: a backslash inside an f-string
            # expression is a SyntaxError before Python 3.12.
            rel_ref = str(fp.relative_to(base)).replace("\\", "/")
            rec = build_record(
                ds_id=ds_id,
                stage="clean_quarantine_hc",
                source_file=fp,
                record_ref=f"{version}/{rel_ref}",
                raw_text=extract_text_from_obj(row),
                label=0,
                source="quarantine_hc",
                split="train",
                topic=row.get("topic", version) if isinstance(row, dict) else version,
                model_slug="",
                seen_hashes=seen_hashes,
            )
            if rec is not None:
                train_records.append(rec)

    split_map: Dict[str, List[dict]] = {"train": train_records, "dev": [], "test": []}
    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        split_map,
        build_note="Merged human pool from data/human (core) + quarantine v2+v3 high_confidence. All records go to train (pure single-label pool).",
        source_inputs=[str(SRC_HUMAN)] + [str(x) for x in QUARANTINE_HC_DIRS],
    )
    return ds_dir, manifest
def balanced_round_robin_select(records: List[dict], n: int, key: str = "source") -> List[dict]:
    """Pick up to *n* records, taking turns across the groups defined by *key*.

    Records are grouped by ``record.get(key, "unknown")``. Groups are visited
    in sorted key order and each group is ordered by ``_hash``. Round ``i``
    takes the ``i``-th record of every group that still has one, until *n*
    records are collected or all groups are exhausted.
    """
    if n <= 0:
        return []

    buckets = defaultdict(list)
    for record in records:
        buckets[record.get(key, "unknown")].append(record)
    ordered_groups = [sorted(buckets[name], key=lambda item: item["_hash"]) for name in sorted(buckets)]

    picked: List[dict] = []
    deepest = max((len(group) for group in ordered_groups), default=0)
    for rank in range(deepest):
        for group in ordered_groups:
            if rank >= len(group):
                continue
            picked.append(group[rank])
            if len(picked) >= n:
                return picked[:n]
    return picked[:n]
def collect_pool_from_datasets(ds_dirs: List[Path], desired_label: int) -> List[dict]:
    """Load all records carrying *desired_label* from the given dataset directories.

    Every returned record gets a ``_hash`` key, recomputed from its ``text``
    field, because the hash is not stored in the JSONL files.
    """
    matches: List[dict] = []
    for directory in ds_dirs:
        for record in iter_dataset_records(directory):
            if record.get("label") != desired_label:
                continue
            record["_hash"] = text_hash(record["text"])
            matches.append(record)
    return matches
def _build_external_balanced(
    ds_id: str,
    max_len: int,
    ai_source_desc: str,
    ds01_hashes: set,
    human_dir: Path,
    ai_dir: Path,
) -> Tuple[Path, dict]:
    """Shared builder for the balanced external datasets (DS06 and DS07).

    Pipeline:
      1. Load label-0 records from *human_dir* and label-1 records from
         *ai_dir*.
      2. Per pool, drop records whose length is outside ``100..max_len``, whose
         hash is in *ds01_hashes* (leakage against DS01), or that repeat an
         earlier record of the same pool. Every drop is logged and counted in
         ``OVERLAP_AUDIT[ds_id]``.
      3. Balance: take ``n = min(len(human), len(ai))`` records from each pool,
         round-robin over ``source``.
      4. Drop any record whose hash was already kept (logged as a cross-label
         conflict when the labels differ, otherwise as a duplicate).
      5. Split 60/20/20, stratified by ``source|label``, and finalize.

    Args:
        ds_id: Dataset id; also the output directory name.
        max_len: Largest accepted ``length_char`` (inclusive).
        ai_source_desc: Description of the AI input used in the build note.
        ds01_hashes: Text hashes that must not reappear in this dataset.
        human_dir: Built dataset directory supplying label-0 records.
        ai_dir: Built dataset directory supplying label-1 records.

    Raises:
        RuntimeError: if either pool is empty after filtering.
    """
    ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id
    log(f"Building {ds_id}")
    human_pool_raw = collect_pool_from_datasets([human_dir], desired_label=0)
    ai_pool_raw = collect_pool_from_datasets([ai_dir], desired_label=1)

    def filter_pool(pool: List[dict], label: int) -> List[dict]:
        filtered = []
        local_seen = set()
        overlap_drop = 0
        dedupe_drop = 0
        range_drop = 0
        for rec in pool:
            h = rec["_hash"]
            lch = rec["length_char"]
            if lch < 100 or lch > max_len:
                range_drop += 1
                add_exclusion(ds_id, "filter_len", rec.get("source", ""), rec.get("record_id", ""), label, f"length_out_of_range_100_{max_len}", lch, lch, h)
                continue
            if h in ds01_hashes:
                overlap_drop += 1
                add_exclusion(ds_id, "overlap_filter", rec.get("source", ""), rec.get("record_id", ""), label, "overlap_with_ds01", lch, lch, h)
                continue
            if h in local_seen:
                dedupe_drop += 1
                add_exclusion(ds_id, "dedupe_pool", rec.get("source", ""), rec.get("record_id", ""), label, "duplicate_text_pool", lch, lch, h)
                continue
            local_seen.add(h)
            filtered.append(rec)
        audit = OVERLAP_AUDIT.setdefault(ds_id, {})
        audit[f"label_{label}_raw"] = len(pool)
        audit[f"label_{label}_after_filter"] = len(filtered)
        audit[f"label_{label}_drop_overlap_ds01"] = overlap_drop
        audit[f"label_{label}_drop_duplicate_pool"] = dedupe_drop
        audit[f"label_{label}_drop_length"] = range_drop
        return filtered

    human_pool = filter_pool(human_pool_raw, 0)
    ai_pool = filter_pool(ai_pool_raw, 1)
    n = min(len(human_pool), len(ai_pool))
    if n <= 0:
        raise RuntimeError(f"{ds_id}: no balanced data available after filtering")
    combined = balanced_round_robin_select(human_pool, n, "source") + balanced_round_robin_select(ai_pool, n, "source")

    final = []
    seen_final = {}
    conflict_drop = 0
    for rec in sorted(combined, key=lambda x: x["_hash"]):
        h = rec["_hash"]
        lb = rec["label"]
        if h in seen_final:
            # The same text was already kept. A different label means the two
            # pools contradict each other, which is counted separately.
            if seen_final[h] != lb:
                conflict_drop += 1
                reason = "cross_label_conflict_hash"
            else:
                reason = "duplicate_text_final"
            add_exclusion(ds_id, "final_dedupe", rec.get("source", ""), rec.get("record_id", ""), lb, reason, rec["length_char"], rec["length_char"], h)
            continue
        seen_final[h] = lb
        final.append(rec)
    OVERLAP_AUDIT[ds_id]["balanced_n_before_conflict"] = 2 * n
    OVERLAP_AUDIT[ds_id]["drop_cross_label_conflict"] = conflict_drop
    OVERLAP_AUDIT[ds_id]["final_n"] = len(final)

    split_map = stratified_split(final, (0.6, 0.2, 0.2), strata_fn=lambda r: f"{r['source']}|{r['label']}")
    for sp in SPLITS:
        for rec in split_map[sp]:
            rec["split"] = sp
    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        split_map,
        build_note=f"External balanced dataset from DS04 (human) and {ai_source_desc}, length 100-{max_len}, leakage-filtered vs DS01, stratified split 60/20/20.",
        source_inputs=[str(human_dir), str(ai_dir)],
    )
    return ds_dir, manifest


def build_ds06_external_core_balanced(
    ds01_hashes: set,
    ds04_dir: Path,
    ds11_dir: Path,
) -> Tuple[Path, dict]:
    """Build DS06: DS04 humans vs DS11 AI text, lengths 100-1500, balanced.

    See ``_build_external_balanced`` for the pipeline.
    """
    return _build_external_balanced(
        ds_id="DS06_External_core_balanced_v1",
        max_len=1500,
        ai_source_desc="DS11 (generated ai standard)",
        ds01_hashes=ds01_hashes,
        human_dir=ds04_dir,
        ai_dir=ds11_dir,
    )


def build_ds07_external_long(
    ds01_hashes: set,
    ds04_dir: Path,
    ds12_dir: Path,
) -> Tuple[Path, dict]:
    """Build DS07: DS04 humans vs DS12 AI text, lengths 100-2200, balanced.

    See ``_build_external_balanced`` for the pipeline.
    """
    return _build_external_balanced(
        ds_id="DS07_External_long_v1",
        max_len=2200,
        ai_source_desc="DS12 (generated ai natural)",
        ds01_hashes=ds01_hashes,
        human_dir=ds04_dir,
        ai_dir=ds12_dir,
    )
def build_p2_candidate_audit() -> dict:
    """Inventory the two P2 candidate corpora without building datasets from them.

    Scans ``P2_CLTS_DIR`` and ``P2_ZHIHU_DIR`` (file count, total size,
    extension histogram, first 200 file paths) and writes
    ``candidate_audit.json`` and ``candidate_summary.csv`` under the
    ``30_P2_候选待验证`` directory. The audit dict is also returned.
    """
    p2_dir = DATASET_ROOT / "30_P2_候选待验证"
    ensure_dir(p2_dir)

    def scan_dir(path: Path) -> dict:
        ext_tally = Counter()
        byte_total = 0
        found = []
        if path.exists():
            for fp in sorted(path.rglob("*")):
                if not fp.is_file():
                    continue
                ext_tally[fp.suffix.lower() or "<no_ext>"] += 1
                byte_total += int(fp.stat().st_size)
                found.append(str(fp))
        return {
            "path": str(path),
            "exists": path.exists(),
            "file_count": len(found),
            "total_size": byte_total,
            "ext_dist": dict(sorted(ext_tally.items())),
            "sample_files": found[:200],
        }

    clts_info = scan_dir(P2_CLTS_DIR)
    zhihu_info = scan_dir(P2_ZHIHU_DIR)

    audit = {
        "generated_at": iso_now(),
        "policy": "P2 candidate only; not included in DS01-DS07 trainable main schema this round.",
        "candidates": {
            "CLTS数据集": {**clts_info, "why_not_direct_train": "parallel src/tgt corpus; no direct human-vs-ai detection labels"},
            "huggingface_zhihu": {**zhihu_info, "why_not_direct_train": "metadata/inspection-focused files; supervised label semantics need redesign"},
        },
    }
    write_json(p2_dir / "candidate_audit.json", audit)

    # The CSV carries a shorter wording of each reason than the JSON audit.
    summary_specs = (
        ("CLTS数据集", clts_info, "parallel corpus; no direct detection labels"),
        ("huggingface_zhihu", zhihu_info, "metadata/inspection-first; label semantics pending"),
    )
    summary_rows = [
        {
            "candidate": candidate,
            "file_count": info["file_count"],
            "total_size": info["total_size"],
            "why_not_direct_train": reason,
        }
        for candidate, info, reason in summary_specs
    ]
    write_csv(
        p2_dir / "candidate_summary.csv",
        ["candidate", "file_count", "total_size", "why_not_direct_train"],
        summary_rows,
    )
    return audit
def write_contract_files() -> None:
    """Write the schema contract artefacts into ``00_contract``.

    Produces three files under ``DATASET_ROOT / "00_contract"``:
    ``main_schema.json`` (field schema), ``label_mapping.json`` (the
    ``LABEL_MAP`` constant) and ``schema_contract.md`` (human-readable rules).
    """
    contract_dir = DATASET_ROOT / "00_contract"
    ensure_dir(contract_dir)

    # Start with every field typed as a plain string, then specialise the
    # three constrained fields; key order is kept by the dict.
    field_names = (
        "record_id",
        "text",
        "label",
        "source",
        "split",
        "length_char",
        "topic",
        "model_slug",
    )
    properties = {name: {"type": "string"} for name in field_names}
    properties["label"] = {"type": "integer", "enum": [0, 1]}
    properties["split"]["enum"] = ["train", "dev", "test"]
    properties["length_char"] = {"type": "integer", "minimum": 0}

    schema = {
        "title": "Unified Main Schema for 20260408 SOTA Route",
        "type": "object",
        "required": REQUIRED_FIELDS,
        "properties": properties,
    }
    write_json(contract_dir / "main_schema.json", schema)
    write_json(contract_dir / "label_mapping.json", LABEL_MAP)

    md = """# 主 Schema 合同(统一训练格式)
## 字段
- `record_id`:字符串唯一ID
- `text`:清洗后的正文
- `label`:整数标签(human=0, ai=1)
- `source`:数据族来源(如 `nlpcc/hc3_plus/hc3_raw/human/quarantine_hc`)
- `split`:`train/dev/test`
- `length_char`:去空白字符后的长度
- `topic`:主题字段(可空字符串)
- `model_slug`:AI来源模型(可空字符串)
## 全局规则
- 编码回退:`utf-8-sig -> utf-8 -> gb18030`
- 清洗:换行统一、空白压缩、`\\n`与明显`/n`噪声修正、Markdown去痕
- 质量过滤:空文本、`length_char < 30`、严重伪装文本、重复文本剔除
- 标签映射固定:`human=0, ai=1`
"""
    contract_md = contract_dir / "schema_contract.md"
    contract_md.write_text(md, encoding="utf-8")
def write_experiment_views(ds_paths: Dict[str, Path]) -> None:
    """Write the experiment-to-dataset mapping for experiments E00-E15.

    Args:
        ds_paths: Dataset id (e.g. ``"DS01"``) mapped to its directory. Every
            dataset id referenced by the plan below must be present, otherwise
            a ``KeyError`` is raised.

    Writes ``experiment_views.json`` and ``E00_E15_dataset_mapping.csv`` into
    ``DATASET_ROOT / "40_experiment_views"``.
    """
    view_dir = DATASET_ROOT / "40_experiment_views"
    ensure_dir(view_dir)

    # (experiment id, datasets it uses, short description)
    plan = [
        ("E00", ["DS01"], "Schema check + distribution"),
        ("E01", ["DS01"], "BERT baseline train/dev"),
        ("E02", ["DS01"], "RoBERTa baseline train/dev"),
        ("E03", ["DS01"], "Rule module on dev/test"),
        ("E04", ["DS01"], "Static weighted vote"),
        ("E05", ["DS01"], "Determinative override"),
        ("E06", ["DS01"], "Dynamic vote by buckets"),
        ("E07", ["DS01"], "Joint features MLP"),
        ("E08", ["DS01", "DS02", "DS03"], "Full chain rerun / optional aug"),
        ("E09", ["DS01", "DS06"], "Train on DS01, transfer eval on DS06"),
        ("E10", ["DS01", "DS06"], "Calibrate with DS01 dev + DS06 dev/test"),
        ("E11", ["DS01", "DS06"], "Bucket threshold on DS06"),
        ("E12", ["DS01", "DS06", "DS07"], "Zero-shot branch"),
        ("E13", ["DS01", "DS06", "DS07"], "Zero-shot branch"),
        ("E14", ["DS01", "DS06", "DS07"], "LLM LoRA branch"),
        ("E15", ["DS01", "DS06", "DS07"], "Combo search"),
    ]
    split_names = ("train", "dev", "test")

    views = {}
    csv_rows = []
    for exp_id, dataset_ids, detail in plan:
        datasets = {}
        for ds_id in dataset_ids:
            base = ds_paths[ds_id]
            split_files = {name: str(base / f"{name}.jsonl") for name in split_names}
            # JSON view: keys are the bare split names.
            datasets[ds_id] = {"dataset_dir": str(base), **split_files}
            # CSV row: the same paths under "<split>_jsonl" column names.
            row = {
                "experiment_id": exp_id,
                "dataset_id": ds_id,
                "dataset_dir": str(base),
            }
            for name, file_path in split_files.items():
                row[f"{name}_jsonl"] = file_path
            csv_rows.append(row)
        views[exp_id] = {"detail": detail, "datasets": datasets}

    write_json(
        view_dir / "experiment_views.json",
        {"generated_at": iso_now(), "views": views},
    )
    write_csv(
        view_dir / "E00_E15_dataset_mapping.csv",
        ["experiment_id", "dataset_id", "dataset_dir", "train_jsonl", "dev_jsonl", "test_jsonl"],
        csv_rows,
    )
def validate_schema_sample(ds_paths: Dict[str, Path]) -> dict:
    """Spot-check the leading records of each dataset against the main schema.

    For every dataset the split files named after ``SPLITS`` are read in order
    (missing files are skipped) until 1000 records have been checked.

    A record counts as invalid when any of the following holds:

    * a field listed in ``REQUIRED_FIELDS`` is absent,
    * ``label`` is not the integer 0 or 1,
    * ``split`` is not one of ``train`` / ``dev`` / ``test``,
    * ``length_char`` is not an integer.

    Booleans are rejected for ``label`` and ``length_char``: ``bool`` is a
    subclass of ``int`` in Python, so a JSON ``true``/``false`` would otherwise
    pass both checks even though the schema declares these fields as integers.

    Args:
        ds_paths: Dataset id mapped to the directory holding its split files.

    Returns:
        A report with ``generated_at`` and, per dataset, ``checked_records``,
        ``invalid_records``, ``valid`` and up to 10 ``example_errors``.
    """
    sample_limit = 1000
    example_limit = 10

    def is_strict_int(value) -> bool:
        # Exclude bool explicitly; isinstance(True, int) is True.
        return isinstance(value, int) and not isinstance(value, bool)

    def find_problems(split_name: str, rec: dict) -> List[str]:
        """Return the error tags for one record, in a fixed check order."""
        problems = [
            f"{split_name}:missing:{key}" for key in REQUIRED_FIELDS if key not in rec
        ]
        label = rec.get("label")
        if isinstance(label, bool) or label not in (0, 1):
            problems.append(f"{split_name}:bad_label:{label}")
        if rec.get("split") not in ("train", "dev", "test"):
            problems.append(f"{split_name}:bad_split:{rec.get('split')}")
        length = rec.get("length_char")
        if not is_strict_int(length):
            problems.append(f"{split_name}:bad_length_type:{type(length).__name__}")
        return problems

    report = {"generated_at": iso_now(), "datasets": {}}
    for ds, p in ds_paths.items():
        checked = 0
        invalid = 0
        examples: List[str] = []
        for sp in SPLITS:
            # The sample budget is shared across all splits of one dataset.
            if checked >= sample_limit:
                break
            fp = p / f"{sp}.jsonl"
            if not fp.exists():
                continue
            for _, rec in iter_jsonl(fp):
                checked += 1
                problems = find_problems(sp, rec)
                if problems:
                    invalid += 1
                    # Keep only as many examples as still fit under the cap.
                    room = example_limit - len(examples)
                    examples.extend(problems[:room])
                if checked >= sample_limit:
                    break
        report["datasets"][ds] = {
            "checked_records": checked,
            "invalid_records": invalid,
            "valid": invalid == 0,
            "example_errors": examples,
        }
    return report
def write_exclusion_log() -> None:
    """Dump the accumulated exclusion rows and a per-reason tally.

    Writes ``exclusion_log.csv`` (every row of ``EXCLUSION_LOG_ROWS``) and
    ``exclusion_reason_stats.csv`` (one row per drop reason, most frequent
    first, ties ordered by reason name) into ``DATASET_ROOT / "90_manifests"``.
    """
    manifest_dir = DATASET_ROOT / "90_manifests"
    ensure_dir(manifest_dir)

    log_columns = [
        "dataset_id",
        "stage",
        "source_file",
        "record_ref",
        "label",
        "drop_reason",
        "raw_length",
        "clean_length",
        "text_hash",
    ]
    write_csv(manifest_dir / "exclusion_log.csv", log_columns, EXCLUSION_LOG_ROWS)

    tally = Counter()
    for entry in EXCLUSION_LOG_ROWS:
        tally[entry["drop_reason"]] += 1

    # Sort by reason first, then stably by descending count, so equal counts
    # stay in alphabetical order.
    ranked = sorted(tally.items())
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    stat_rows = [{"drop_reason": reason, "count": count} for reason, count in ranked]
    write_csv(manifest_dir / "exclusion_reason_stats.csv", ["drop_reason", "count"], stat_rows)
def main() -> int:
    """Rebuild the dataset tree and all manifests from the raw sources.

    The source directories are inventoried before and after the build; the
    two digests are compared to detect changes to the sources during the run.

    Returns:
        ``0`` when the build completes and both inventory digests match,
        ``2`` when the digest taken after the build differs from the first.
    """
    manifest_dir = DATASET_ROOT / "90_manifests"
    source_roots = {
        "human": SRC_HUMAN,
        "human_quarantine": SRC_QUARANTINE,
        "raw_external": SRC_RAW_EXTERNAL,
    }

    # Every run starts from an empty output tree.
    # NOTE(review): ds11_dir / ds12_dir below are located under DATASET_ROOT,
    # so this wipe removes them as well — confirm the DS06/DS07 builders do
    # not rely on files that already exist there.
    if DATASET_ROOT.exists():
        log(f"Removing old dataset directory: {DATASET_ROOT}")
        shutil.rmtree(DATASET_ROOT)
    for folder in (DATASET_ROOT, manifest_dir):
        ensure_dir(folder)

    log("Collecting source inventory (before)...")
    inv_before = collect_source_inventory(source_roots)
    write_json(manifest_dir / "source_inventory_before.json", inv_before)

    log("Writing dataset contract files...")
    write_contract_files()

    ds_paths: Dict[str, Path] = {}
    dataset_manifests = {}

    def register(ds_id: str, out_dir: Path, manifest) -> None:
        """Record the output directory and manifest of a finished dataset."""
        ds_paths[ds_id] = out_dir
        dataset_manifests[ds_id] = manifest

    # DS01 additionally returns its text hashes, used for DS06/DS07 below.
    ds01_dir, ds01_manifest, ds01_hashes = build_ds01_nlpcc()
    register("DS01", ds01_dir, ds01_manifest)

    for ds_id, builder in (
        ("DS02", build_ds02_hc3plus),
        ("DS03", build_ds03_hc3raw_expanded),
    ):
        built_dir, built_manifest = builder()
        register(ds_id, built_dir, built_manifest)

    ds04_dir, ds04_manifest = build_ds04_human_pools_merged()
    register("DS04", ds04_dir, ds04_manifest)

    raw_pool_root = DATASET_ROOT / "30_P2_单标签原料池"
    ds11_dir = raw_pool_root / "DS11_Generated_AI_v1"
    ds12_dir = raw_pool_root / "DS12_Generated_AI_natural_v1"

    ds06_dir, ds06_manifest = build_ds06_external_core_balanced(
        ds01_hashes=ds01_hashes,
        ds04_dir=ds04_dir,
        ds11_dir=ds11_dir,
    )
    register("DS06", ds06_dir, ds06_manifest)

    ds07_dir, ds07_manifest = build_ds07_external_long(
        ds01_hashes=ds01_hashes,
        ds04_dir=ds04_dir,
        ds12_dir=ds12_dir,
    )
    register("DS07", ds07_dir, ds07_manifest)

    log("Building P2 candidate audit...")
    p2_audit = build_p2_candidate_audit()

    log("Writing experiment views...")
    write_experiment_views(ds_paths)

    log("Writing manifests + validation reports...")
    for file_name, payload in (
        ("dataset_manifests.json", dataset_manifests),
        ("overlap_audit.json", OVERLAP_AUDIT),
        ("p2_candidate_audit_copy.json", p2_audit),
    ):
        write_json(manifest_dir / file_name, payload)
    schema_report = validate_schema_sample(ds_paths)
    write_json(manifest_dir / "schema_validation_report.json", schema_report)
    write_exclusion_log()

    log("Collecting source inventory (after)...")
    inv_after = collect_source_inventory(source_roots)
    write_json(manifest_dir / "source_inventory_after.json", inv_after)

    digest_before = inv_before.get("global_digest")
    digest_after = inv_after.get("global_digest")
    compare = {
        "generated_at": iso_now(),
        "before_digest": digest_before,
        "after_digest": digest_after,
        "same_digest": digest_before == digest_after,
        "before": inv_before,
        "after": inv_after,
    }
    write_json(manifest_dir / "source_inventory_compare.json", compare)
    if not compare["same_digest"]:
        # The comparison file is already on disk; the summary is not written.
        log("ERROR: source inventory changed during build. Stopping.")
        return 2

    dataset_summary = {
        ds_id: {"path": str(out_dir), "manifest": dataset_manifests.get(ds_id, {})}
        for ds_id, out_dir in ds_paths.items()
    }
    summary = {
        "generated_at": iso_now(),
        "dataset_root": str(DATASET_ROOT),
        "datasets": dataset_summary,
        "overlap_audit_file": str(manifest_dir / "overlap_audit.json"),
        "exclusion_log_file": str(manifest_dir / "exclusion_log.csv"),
        "schema_validation_report_file": str(manifest_dir / "schema_validation_report.json"),
    }
    write_json(manifest_dir / "build_summary.json", summary)
    log("Dataset build completed successfully.")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())