LUCIFerace's picture
Add files using upload-large-folder tool
4a0f6a5 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import hashlib
import json
import os
import math
import re
import shutil
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Data root; overridable through the ENHANCED_REPLICA_DATA_ROOT environment
# variable, otherwise falls back to the hard-coded path below.
ROOT = Path(os.environ.get("ENHANCED_REPLICA_DATA_ROOT", r"F:\codex开发\研究"))
# Output tree for this experiment route; every generated dataset lives under
# DATASET_ROOT.
ROUTE_ROOT = ROOT / "研究成果" / "实验路线" / "20260408_SOTA路线重构"
DATASET_ROOT = ROUTE_ROOT / "data" / "dataset"
# Input locations.
SRC_HUMAN = ROOT / "data" / "human"
SRC_QUARANTINE = ROOT / "data" / "human_quarantine"
SRC_RAW_EXTERNAL = ROOT / "data" / "raw_external"
NLPCC_DIR = SRC_RAW_EXTERNAL / "NLPCC-2025-Task1-main" / "data"
HC3_PLUS_DIR = SRC_RAW_EXTERNAL / "human_longform_20260406" / "HC3_Plus" / "raw" / "files" / "zh"
HC3_RAW_FILE = SRC_RAW_EXTERNAL / "human_longform_20260406" / "HC3" / "raw" / "modelscope_zh" / "all.jsonl"
# Quarantine sub-directories that are merged into the human pool.
QUARANTINE_HC_DIRS = [
    SRC_QUARANTINE / "failed_platforms_20260406" / "weibo_quality_v2" / "high_confidence",
    SRC_QUARANTINE / "failed_platforms_20260406" / "weibo_quality_v3" / "exports" / "high_confidence",
]
# Candidate sources that are only audited (file inventory), not converted.
P2_CLTS_DIR = SRC_RAW_EXTERNAL / "CLTS数据集"
P2_ZHIHU_DIR = SRC_RAW_EXTERNAL / "huggingface_zhihu"
# Fields (and their order) written for every record in the output JSONL files.
REQUIRED_FIELDS = ["record_id", "text", "label", "source", "split", "length_char", "topic", "model_slug"]
SPLITS = ["train", "dev", "test"]
LABEL_MAP = {"human": 0, "ai": 1}
# Records whose cleaned, whitespace-free length is below this are dropped.
MIN_CHAR_LEN = 30
# Inclusive (low, high, name) ranges used by length_bucket().
LENGTH_BUCKETS = [
    (0, 29, "000_029"),
    (30, 99, "030_099"),
    (100, 299, "100_299"),
    (300, 599, "300_599"),
    (600, 999, "600_999"),
    (1000, 1499, "1000_1499"),
    (1500, 2199, "1500_2199"),
    (2200, 10**9, "2200_plus"),
]
# Module-level accumulators: one row per dropped record, and per-dataset
# filter counters filled by the DS06/DS07 builders.
EXCLUSION_LOG_ROWS: List[Dict[str, str]] = []
OVERLAP_AUDIT: Dict[str, Dict[str, int]] = {}
def now_str() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
def iso_now() -> str:
    """Return the current local time in ISO-8601 form, truncated to seconds."""
    stamp = datetime.now().replace(microsecond=0)
    return stamp.isoformat()
def log(msg: str) -> None:
    """Print *msg* to stdout prefixed with a ``[timestamp]`` tag."""
    print("[{}] {}".format(now_str(), msg))
def ensure_dir(path: Path) -> None:
    """Create directory *path* (and any missing parents); no-op if it exists."""
    path.mkdir(exist_ok=True, parents=True)
def write_json(path: Path, obj: dict) -> None:
    """Serialize *obj* as pretty-printed UTF-8 JSON at *path*.

    The parent directory is created first. Non-ASCII characters are written
    verbatim rather than escaped.
    """
    ensure_dir(path.parent)
    with path.open("w", encoding="utf-8") as sink:
        sink.write(json.dumps(obj, ensure_ascii=False, indent=2))
def write_jsonl(path: Path, records: List[dict]) -> None:
    """Write *records* to *path* as JSON Lines, one object per line.

    Only the keys listed in REQUIRED_FIELDS are emitted, in that order; a
    record missing one of them raises KeyError. Lines always end with LF.
    """
    ensure_dir(path.parent)
    with path.open("w", encoding="utf-8", newline="\n") as sink:
        for record in records:
            projected = {field: record[field] for field in REQUIRED_FIELDS}
            sink.write(json.dumps(projected, ensure_ascii=False))
            sink.write("\n")
def write_csv(path: Path, fieldnames: List[str], rows: List[dict]) -> None:
    """Write *rows* as a CSV file with a header row.

    The file is encoded as UTF-8 with a BOM (``utf-8-sig``). The parent
    directory is created first.
    """
    ensure_dir(path.parent)
    with path.open("w", encoding="utf-8-sig", newline="") as sink:
        table = csv.DictWriter(sink, fieldnames=fieldnames)
        table.writeheader()
        table.writerows(rows)
def read_text_with_fallback(path: Path) -> str:
    """Read *path* and decode it, trying several encodings in turn.

    The codecs ``utf-8-sig``, ``utf-8`` and ``gb18030`` are attempted in that
    order; the first that decodes without error wins. If all fail, the bytes
    are decoded as UTF-8 with undecodable sequences replaced.
    """
    data = path.read_bytes()
    for codec in ("utf-8-sig", "utf-8", "gb18030"):
        try:
            decoded = data.decode(codec)
        except UnicodeDecodeError:
            continue
        else:
            return decoded
    return data.decode("utf-8", errors="replace")
def load_json_with_fallback(path: Path):
    """Parse the JSON document at *path*, using the encoding-fallback reader."""
    decoded = read_text_with_fallback(path)
    return json.loads(decoded)
def iter_jsonl(path: Path):
    """Yield ``(lineno, obj)`` for every parseable line of a JSON Lines file.

    Blank lines are skipped. A line that fails to parse is not raised; it is
    recorded in EXCLUSION_LOG_ROWS under dataset id ``GLOBAL`` with reason
    ``json_decode_error`` and iteration continues.

    Lines are split on CR, LF and CRLF only. ``str.splitlines()`` is avoided
    on purpose: it also breaks on U+0085, U+2028 and U+2029, which may appear
    unescaped inside JSON strings (``write_jsonl`` emits them raw because it
    uses ``ensure_ascii=False``), so a valid record would be torn in two.
    """
    text = read_text_with_fallback(path)
    for lineno, line in enumerate(re.split(r"\r\n|\r|\n", text), 1):
        line = line.strip()
        if not line:
            continue
        try:
            yield lineno, json.loads(line)
        except json.JSONDecodeError:
            EXCLUSION_LOG_ROWS.append(
                {
                    "dataset_id": "GLOBAL",
                    "stage": "read_jsonl",
                    "source_file": str(path),
                    "record_ref": f"line_{lineno}",
                    "label": "",
                    "drop_reason": "json_decode_error",
                    "raw_length": "",
                    "clean_length": "",
                    "text_hash": "",
                }
            )
def normalize_label(label) -> Optional[int]:
    """Map a raw label value to 0 (human), 1 (AI) or None if unrecognised.

    Booleans map to their integer value; integers must be exactly 0 or 1.
    Anything else is stringified, trimmed and lower-cased, then matched
    against the aliases ``0/human/h`` and ``1/ai/machine/gpt``.
    """
    if label is None:
        return None
    if isinstance(label, bool):
        return int(label)
    if isinstance(label, int):
        return {0: 0, 1: 1}.get(label)
    aliases = {
        "0": 0,
        "human": 0,
        "h": 0,
        "1": 1,
        "ai": 1,
        "machine": 1,
        "gpt": 1,
    }
    return aliases.get(str(label).strip().lower())
def text_hash(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def non_ws_len(text: str) -> int:
    """Count the characters of *text* that are not whitespace."""
    return len(re.findall(r"\S", text))
def length_bucket(length_char: int) -> str:
    """Return the name of the LENGTH_BUCKETS range containing *length_char*.

    Bounds are inclusive on both ends; ``"unknown"`` is returned when no
    range matches.
    """
    matches = (name for low, high, name in LENGTH_BUCKETS if low <= length_char <= high)
    return next(matches, "unknown")
# Patterns used by clean_text() to strip Markdown markup.
# Fenced code block, non-greedy across lines. The MULTILINE flag has no effect
# here because the pattern contains no ^ or $ anchors.
MD_CODE_BLOCK_RE = re.compile(r"```[\s\S]*?```", flags=re.MULTILINE)
# Inline code span; group 1 is the code text.
MD_INLINE_CODE_RE = re.compile(r"`([^`]+)`")
# Image: group 1 is the alt text, group 2 the target.
MD_IMG_RE = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)")
# Link: group 1 is the link text, group 2 the target.
MD_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
# Leading heading markers (1-6 '#'). The trailing whitespace is optional, so a
# line starting with '#' directly followed by text also matches.
# NOTE(review): this would also strip the leading '#' of hashtag-style text —
# confirm that is intended.
MD_HEADING_RE = re.compile(r"^\s{0,3}#{1,6}\s*")
# Leading block-quote marker.
MD_QUOTE_RE = re.compile(r"^\s{0,3}>\s?")
# Leading bullet ('-', '*', '+') or numbered ('1.') list marker.
MD_LIST_RE = re.compile(r"^\s{0,3}(?:[-*+]|\d+\.)\s+")
def clean_text(text: str) -> str:
    """Normalise raw text: unify newlines, strip Markdown, collapse whitespace.

    Returns an empty string for ``None``. The substitutions below are applied
    in a fixed order and later steps rely on the output of earlier ones.
    """
    if text is None:
        return ""
    t = str(text)
    # Unify CRLF / CR line endings to LF.
    t = t.replace("\r\n", "\n").replace("\r", "\n")
    # Turn literal two-character escapes (backslash + n / r) into real newlines.
    t = t.replace("\\n", "\n").replace("\\r", "\n")
    # Treat a mistyped "/n" sitting between two non-space characters as a newline.
    # NOTE(review): this also matches "/n" inside URLs or paths (e.g. ".../news")
    # — confirm that is acceptable for the sources being cleaned.
    t = re.sub(r"(?<=\S)\s*/n\s*(?=\S)", "\n", t)
    # Blank out lines that consist solely of "/n".
    t = re.sub(r"^\s*/n\s*$", "", t, flags=re.MULTILINE)
    t = t.replace("/n/n", "\n\n")
    # Second "/n" pass restricted to CJK / alphanumeric / punctuation neighbours.
    t = re.sub(
        r"(?<=[\u4e00-\u9fffA-Za-z0-9,。!?;:,.!?;:])\s*/n\s*(?=[\u4e00-\u9fffA-Za-z0-9])",
        "\n",
        t,
    )
    # Drop fenced code blocks entirely; keep the text of inline code,
    # image alt text and link text.
    t = MD_CODE_BLOCK_RE.sub(" ", t)
    t = MD_INLINE_CODE_RE.sub(r"\1", t)
    t = MD_IMG_RE.sub(r"\1", t)
    t = MD_LINK_RE.sub(r"\1", t)
    lines = []
    for line in t.split("\n"):
        # Per-line removal of heading, quote and list markers, then of
        # bold markers and any leftover backticks.
        line = MD_HEADING_RE.sub("", line)
        line = MD_QUOTE_RE.sub("", line)
        line = MD_LIST_RE.sub("", line)
        line = line.replace("**", "").replace("__", "")
        line = line.replace("`", "")
        lines.append(line.strip())
    t = "\n".join(lines)
    # Ideographic space -> ASCII space, then collapse horizontal whitespace runs.
    t = t.replace("\u3000", " ")
    t = re.sub(r"[ \t\f\v]+", " ", t)
    # At most one blank line in a row; no spaces hugging a newline.
    t = re.sub(r"\n{3,}", "\n\n", t)
    t = re.sub(r"[ ]+\n", "\n", t)
    t = re.sub(r"\n[ ]+", "\n", t)
    return t.strip()
def severe_artifact_reason(cleaned_text: str, length_char: int) -> Optional[str]:
    """Return a drop-reason string if *cleaned_text* looks like junk, else None.

    Checks, in order: empty text; text made only of non-word characters or
    underscores; fewer than 10% CJK/alphanumeric characters (only when
    *length_char* is at least MIN_CHAR_LEN); and, for texts of 120 or more
    non-whitespace characters, a single character making up over 60% of them.
    """
    if not cleaned_text:
        return "empty_after_clean"
    if re.fullmatch(r"[\W_]+", cleaned_text):
        return "non_content_symbols_only"

    content_count = len(re.findall(r"[\u4e00-\u9fffA-Za-z0-9]", cleaned_text))
    content_ratio = content_count / max(len(cleaned_text), 1)
    if content_ratio < 0.10 and length_char >= MIN_CHAR_LEN:
        return "severe_artifact_low_content_ratio"

    squeezed = "".join(re.findall(r"\S", cleaned_text))
    if len(squeezed) >= 120:
        top_count = max(Counter(squeezed).values())
        if top_count / len(squeezed) > 0.60:
            return "severe_artifact_repetitive_chars"
    return None
def add_exclusion(
    dataset_id: str,
    stage: str,
    source_file: str,
    record_ref: str,
    label,
    drop_reason: str,
    raw_length,
    clean_length,
    text_hash_value: str = "",
) -> None:
    """Append one dropped-record row to EXCLUSION_LOG_ROWS.

    *label*, *raw_length* and *clean_length* are stored as strings, with
    ``None`` recorded as an empty string.
    """

    def as_cell(value) -> str:
        return "" if value is None else str(value)

    row = dict(
        dataset_id=dataset_id,
        stage=stage,
        source_file=source_file,
        record_ref=record_ref,
        label=as_cell(label),
        drop_reason=drop_reason,
        raw_length=as_cell(raw_length),
        clean_length=as_cell(clean_length),
        text_hash=text_hash_value,
    )
    EXCLUSION_LOG_ROWS.append(row)
def build_record(
    ds_id: str,
    stage: str,
    source_file: Path,
    record_ref: str,
    raw_text,
    label: int,
    source: str,
    split: str,
    topic: str = "",
    model_slug: str = "",
    seen_hashes: Optional[set] = None,
) -> Optional[dict]:
    """Clean *raw_text* and wrap it in a schema record, or log why it was dropped.

    A record is rejected (None returned, exclusion row logged) when the
    cleaned text is empty, shorter than MIN_CHAR_LEN non-whitespace
    characters, flagged by severe_artifact_reason(), or — when *seen_hashes*
    is given — a duplicate of an earlier text. Accepted hashes are added to
    *seen_hashes*. The returned dict has an empty ``record_id`` and an extra
    private ``_hash`` key.
    """
    raw = "" if raw_text is None else str(raw_text)
    raw_len = non_ws_len(raw)
    cleaned = clean_text(raw)
    clean_len = non_ws_len(cleaned)

    def reject(reason: str, digest: str = "") -> None:
        add_exclusion(ds_id, stage, str(source_file), record_ref, label, reason, raw_len, clean_len, digest)
        return None

    if not cleaned:
        return reject("empty_text")
    if clean_len < MIN_CHAR_LEN:
        return reject(f"too_short_lt_{MIN_CHAR_LEN}")
    artifact = severe_artifact_reason(cleaned, clean_len)
    if artifact:
        return reject(artifact)

    digest = text_hash(cleaned)
    if seen_hashes is not None:
        if digest in seen_hashes:
            return reject("duplicate_text", digest)
        seen_hashes.add(digest)

    record = {
        "record_id": "",
        "text": cleaned,
        "label": int(label),
        "source": source,
        "split": split,
        "length_char": clean_len,
        "topic": "" if topic is None else str(topic),
        "model_slug": "" if model_slug is None else str(model_slug),
        "_hash": digest,
    }
    return record
def split_counts(records: List[dict]) -> Dict[str, int]:
    """Count records per split, reporting every name in SPLITS (0 if absent)."""
    tally = Counter(rec["split"] for rec in records)
    return {name: int(tally[name]) for name in SPLITS}
def label_counts(records: List[dict]) -> Dict[str, int]:
    """Count records with label 0 and label 1, keyed by ``"0"`` and ``"1"``."""
    tally = Counter(rec["label"] for rec in records)
    return {str(value): int(tally[value]) for value in (0, 1)}
def write_distribution_files(ds_dir: Path, records: List[dict]) -> None:
    """Write label, source and length-bucket distribution CSVs into *ds_dir*.

    Produces ``label_dist.csv`` (per split plus an ``all`` total),
    ``source_dist.csv`` (sorted by split, source, label) and
    ``length_dist.csv`` (every split x every bucket, zero-filled).
    """
    # Label distribution.
    by_split_label = Counter((rec["split"], rec["label"]) for rec in records)
    label_rows = [
        {"split": sp, "label": lb, "count": by_split_label.get((sp, lb), 0)}
        for sp in SPLITS
        for lb in (0, 1)
    ]
    overall = Counter(rec["label"] for rec in records)
    label_rows.extend({"split": "all", "label": lb, "count": overall.get(lb, 0)} for lb in (0, 1))
    write_csv(ds_dir / "label_dist.csv", ["split", "label", "count"], label_rows)

    # Source distribution.
    by_source = Counter((rec["split"], rec["source"], rec["label"]) for rec in records)
    source_rows = [
        {"split": sp, "source": src, "label": lb, "count": cnt}
        for (sp, src, lb), cnt in sorted(by_source.items())
    ]
    write_csv(ds_dir / "source_dist.csv", ["split", "source", "label", "count"], source_rows)

    # Length-bucket distribution.
    by_bucket = Counter((rec["split"], length_bucket(rec["length_char"])) for rec in records)
    length_rows = [
        {"split": sp, "length_bucket": bucket, "count": by_bucket.get((sp, bucket), 0)}
        for sp in SPLITS
        for _low, _high, bucket in LENGTH_BUCKETS
    ]
    write_csv(ds_dir / "length_dist.csv", ["split", "length_bucket", "count"], length_rows)
def finalize_dataset(
    ds_id: str,
    ds_dir: Path,
    records_by_split: Dict[str, List[dict]],
    build_note: str,
    source_inputs: List[str],
) -> dict:
    """Assign record ids, write per-split JSONL files and a manifest.

    Each record is mutated in place: its ``split`` is set to the split it is
    filed under and its ``record_id`` becomes
    ``<ds_id lower-cased>_<split>_<7-digit index>``. The per-split lists are
    sorted in place by that id. Returns the manifest dict that is also
    written to ``manifest.json``.
    """
    ensure_dir(ds_dir)
    merged: List[dict] = []
    for split_name in SPLITS:
        bucket = records_by_split.get(split_name, [])
        id_prefix = f"{ds_id.lower()}_{split_name}_"
        for seq, record in enumerate(bucket, start=1):
            record["split"] = split_name
            record["record_id"] = f"{id_prefix}{seq:07d}"
        bucket.sort(key=lambda item: item["record_id"])
        write_jsonl(ds_dir / f"{split_name}.jsonl", bucket)
        merged += bucket

    # Distribution CSVs are intentionally not generated here: the statistics
    # are covered in 审阅报告.md and dataset directories are kept clean, so
    # write_distribution_files(ds_dir, merged) is not called.

    manifest = {
        "dataset_id": ds_id,
        "created_at": iso_now(),
        "schema_fields": REQUIRED_FIELDS,
        "label_mapping": LABEL_MAP,
        "build_note": build_note,
        "source_inputs": source_inputs,
        "record_count_total": len(merged),
        "record_count_by_split": split_counts(merged),
        "record_count_by_label": label_counts(merged),
        "source_set": sorted({record["source"] for record in merged}),
    }
    write_json(ds_dir / "manifest.json", manifest)
    return manifest
def stratified_split(records: List[dict], ratios: Tuple[float, float, float], strata_fn) -> Dict[str, List[dict]]:
    """Deterministically split *records* into train/dev/test within each stratum.

    Records are grouped by ``strata_fn(record)``; each group is ordered by its
    ``_hash`` value and cut into a train prefix, a dev slice and a test
    remainder. Train and dev sizes are truncated toward zero, so the test
    split absorbs the rounding remainder. Each output list is sorted by
    ``_hash``.

    Raises:
        ValueError: if *ratios* do not sum to 1.0.
    """
    result: Dict[str, List[dict]] = {"train": [], "dev": [], "test": []}
    if not records:
        return result

    train_ratio, dev_ratio, test_ratio = ratios
    if not math.isclose(train_ratio + dev_ratio + test_ratio, 1.0, rel_tol=1e-9):
        raise ValueError("ratios must sum to 1.0")

    strata = defaultdict(list)
    for record in records:
        strata[strata_fn(record)].append(record)

    for members in strata.values():
        ordered = sorted(members, key=lambda item: item["_hash"])
        size = len(ordered)
        train_end = int(size * train_ratio)
        dev_end = train_end + int(size * dev_ratio)
        result["train"] += ordered[:train_end]
        result["dev"] += ordered[train_end:dev_end]
        result["test"] += ordered[dev_end:]

    for part in result.values():
        part.sort(key=lambda item: item["_hash"])
    return result
def collect_source_inventory(source_roots: Dict[str, Path]) -> dict:
    """Fingerprint every file under the given source roots.

    For each named root the result records whether it exists, the file count,
    total size, min/max modification time (ns) and a SHA-256 digest computed
    over ``name|relative path|size|mtime`` signatures of its files, visited in
    sorted path order. ``global_digest`` hashes the same signatures across all
    roots. File contents are not read.
    """
    inventory = {"generated_at": iso_now(), "roots": {}, "global_digest": ""}
    overall_hasher = hashlib.sha256()

    for name, base in source_roots.items():
        present = base.exists()
        sizes: List[int] = []
        mtimes: List[int] = []
        root_hasher = hashlib.sha256()

        if present:
            for entry in sorted(base.rglob("*")):
                if not entry.is_file():
                    continue
                info = entry.stat()
                rel = str(entry.relative_to(base)).replace("\\", "/")
                token = f"{name}|{rel}|{info.st_size}|{info.st_mtime_ns}".encode("utf-8")
                root_hasher.update(token)
                overall_hasher.update(token)
                sizes.append(int(info.st_size))
                mtimes.append(info.st_mtime_ns)

        inventory["roots"][name] = {
            "path": str(base),
            "exists": present,
            "file_count": len(sizes),
            "total_size": sum(sizes),
            "mtime_min_ns": min(mtimes) if mtimes else None,
            "mtime_max_ns": max(mtimes) if mtimes else None,
            # A missing root reports an empty digest rather than the hash of nothing.
            "digest": root_hasher.hexdigest() if present else "",
        }

    inventory["global_digest"] = overall_hasher.hexdigest()
    return inventory
def extract_text_from_obj(obj: dict) -> str:
    """Return the first non-blank string found under a known text key.

    Keys are tried in the order ``text, content, body, answer, article,
    message``. Returns ``""`` when *obj* is not a dict or no key qualifies.
    """
    if not isinstance(obj, dict):
        return ""
    candidates = (obj.get(key) for key in ("text", "content", "body", "answer", "article", "message"))
    return next((value for value in candidates if isinstance(value, str) and value.strip()), "")
def iter_dataset_records(ds_dir: Path):
    """Yield every record of a built dataset, split by split.

    Reads ``<split>.jsonl`` for each name in SPLITS that exists in *ds_dir*
    and overwrites each record's ``split`` with the file it came from.
    """
    for split_name in SPLITS:
        split_file = ds_dir / f"{split_name}.jsonl"
        if split_file.exists():
            for _lineno, record in iter_jsonl(split_file):
                record["split"] = split_name
                yield record
def coerce_answer_text(ans) -> str:
    """Turn an answer of arbitrary JSON shape into a single string.

    Strings pass through unchanged. For a dict, the first non-blank string
    under ``text/answer/content/body`` is used. A list is joined with
    newlines after dropping items whose string form is blank. ``None``
    becomes ``""``; anything else is stringified.
    """
    if isinstance(ans, str):
        return ans
    if isinstance(ans, dict):
        for key in ("text", "answer", "content", "body"):
            value = ans.get(key)
            if isinstance(value, str) and value.strip():
                return value
    elif isinstance(ans, list):
        pieces = [str(item) for item in ans]
        return "\n".join(piece for piece in pieces if piece.strip())
    if ans is None:
        return ""
    return str(ans)
def build_ds01_nlpcc() -> Tuple[Path, dict, set]:
    """Build DS01 from the NLPCC train/dev/test JSON files.

    The source split assignment is kept. Texts are cleaned and de-duplicated
    across all three splits; rows with an unrecognised label are logged and
    skipped. Returns the dataset directory, its manifest and the set of text
    hashes that were kept.
    """
    ds_id = "DS01_NLPCC_core_v1"
    ds_dir = DATASET_ROOT / "10_P0_主线必用" / ds_id
    log(f"Building {ds_id}")

    split_files = {
        "train": NLPCC_DIR / "train.json",
        "dev": NLPCC_DIR / "dev.json",
        "test": NLPCC_DIR / "test_with_label.json",
    }
    seen_hashes = set()
    records_by_split = {name: [] for name in split_files}

    for split_name, json_path in split_files.items():
        for position, entry in enumerate(load_json_with_fallback(json_path)):
            ref = f"{split_name}_{position}"
            label = normalize_label(entry.get("label"))
            if label is None:
                add_exclusion(ds_id, "parse_nlpcc", str(json_path), ref, None, "invalid_label", "", "", "")
                continue
            record = build_record(
                ds_id=ds_id,
                stage="clean_nlpcc",
                source_file=json_path,
                record_ref=ref,
                raw_text=entry.get("text", ""),
                label=label,
                source="nlpcc",
                split=split_name,
                topic=entry.get("source", ""),
                model_slug=entry.get("model", ""),
                seen_hashes=seen_hashes,
            )
            if record is None:
                continue
            records_by_split[split_name].append(record)

    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        records_by_split,
        build_note="NLPCC official train/dev/test_with_label with unified cleaning + dedupe.",
        source_inputs=[str(json_path) for json_path in split_files.values()],
    )
    return ds_dir, manifest, seen_hashes
def build_ds02_hc3plus() -> Tuple[Path, dict]:
    """Build DS02 from the HC3_Plus JSONL files.

    ``train.jsonl`` feeds train; the two ``val_*`` files are merged into dev
    and the two ``test_*`` files into test. Texts are cleaned and
    de-duplicated across all files. Rows with an unrecognised label are
    logged and skipped. The source file stem is stored as the record topic.

    Returns the dataset directory and its manifest.
    """
    ds_id = "DS02_HC3Plus_clean_v1"
    ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id
    log(f"Building {ds_id}")
    file_map = {
        "train": [HC3_PLUS_DIR / "train.jsonl"],
        "dev": [HC3_PLUS_DIR / "val_hc3_QA.jsonl", HC3_PLUS_DIR / "val_hc3_si.jsonl"],
        "test": [HC3_PLUS_DIR / "test_hc3_QA.jsonl", HC3_PLUS_DIR / "test_hc3_si.jsonl"],
    }
    seen_hashes = set()
    records_by_split = {"train": [], "dev": [], "test": []}
    for sp, fplist in file_map.items():
        for fp in fplist:
            for lineno, row in iter_jsonl(fp):
                # One reference format for every exclusion row of this file,
                # matching what build_record() and iter_jsonl() log.
                record_ref = f"line_{lineno}"
                lb = normalize_label(row.get("label"))
                if lb is None:
                    add_exclusion(ds_id, "parse_hc3_plus", str(fp), record_ref, None, "invalid_label", "", "", "")
                    continue
                rec = build_record(
                    ds_id=ds_id,
                    stage="clean_hc3_plus",
                    source_file=fp,
                    record_ref=record_ref,
                    raw_text=row.get("text", ""),
                    label=lb,
                    source="hc3_plus",
                    split=sp,
                    topic=fp.stem,
                    model_slug="",
                    seen_hashes=seen_hashes,
                )
                if rec is not None:
                    records_by_split[sp].append(rec)
    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        records_by_split,
        build_note="HC3_Plus train + merged val->dev + merged test with unified cleaning + dedupe.",
        source_inputs=[str(x) for arr in file_map.values() for x in arr],
    )
    return ds_dir, manifest
def build_ds03_hc3raw_expanded() -> Tuple[Path, dict]:
    """Build DS03 by expanding every answer in the raw HC3 file into a record.

    For each input row, all ``human_answers`` (label 0) are processed before
    all ``chatgpt_answers`` (label 1); a non-list value is treated as a
    single answer. Records are de-duplicated, then split 80/10/10 with
    strata defined by ``<row source>|<label>``.

    Returns the dataset directory and its manifest.
    """
    ds_id = "DS03_HC3raw_expanded_v1"
    ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id
    log(f"Building {ds_id}")

    # (row field, stage name, record_ref tag, label, model slug)
    answer_kinds = (
        ("human_answers", "expand_hc3_raw_human", "h", 0, ""),
        ("chatgpt_answers", "expand_hc3_raw_ai", "a", 1, "chatgpt"),
    )
    seen_hashes = set()
    staged: List[dict] = []

    for lineno, row in iter_jsonl(HC3_RAW_FILE):
        topic = row.get("source", "")
        for field, stage, tag, label, model in answer_kinds:
            answers = row.get(field) or []
            if not isinstance(answers, list):
                answers = [answers]
            for position, answer in enumerate(answers):
                record = build_record(
                    ds_id=ds_id,
                    stage=stage,
                    source_file=HC3_RAW_FILE,
                    record_ref=f"line_{lineno}_{tag}_{position}",
                    raw_text=coerce_answer_text(answer),
                    label=label,
                    source="hc3_raw",
                    split="train",
                    topic=topic,
                    model_slug=model,
                    seen_hashes=seen_hashes,
                )
                if record is None:
                    continue
                record["_stratum"] = f"{topic}|{label}"
                staged.append(record)

    split_map = stratified_split(staged, (0.8, 0.1, 0.1), strata_fn=lambda r: r["_stratum"])
    for split_name in SPLITS:
        for record in split_map[split_name]:
            record["split"] = split_name
            # The stratum key was only needed for splitting.
            record.pop("_stratum", None)

    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        split_map,
        build_note="Expanded HC3 raw answers (human_answers/chatgpt_answers), then stratified hash split 80/10/10.",
        source_inputs=[str(HC3_RAW_FILE)],
    )
    return ds_dir, manifest
def build_ds04_human_pools_merged() -> Tuple[Path, dict]:
    """Build DS04, a human-only (label 0) pool placed entirely in ``train``.

    Two inputs are merged and de-duplicated together:

    * every ``*.json`` file under SRC_HUMAN (source ``human``; the default
      topic is the first path component below SRC_HUMAN);
    * every ``*.json`` file under each QUARANTINE_HC_DIRS entry (source
      ``quarantine_hc``; the default topic is the version tag ``v2``/``v3``).

    Files that cannot be read or parsed are logged with reason
    ``json_decode_error`` and skipped. Returns the dataset directory and its
    manifest.
    """
    ds_id = "DS04_Human_pools_merged_v1"
    ds_dir = DATASET_ROOT / "30_P2_单标签原料池" / ds_id
    log(f"Building {ds_id}")
    seen_hashes = set()
    staged_records: List[dict] = []

    for fp in sorted(SRC_HUMAN.rglob("*.json")):
        try:
            row = load_json_with_fallback(fp)
        except Exception:
            # Deliberately best-effort: any read/parse failure skips the file.
            add_exclusion(ds_id, "read_human", str(fp), "", 0, "json_decode_error", "", "", "")
            continue
        rel = fp.relative_to(SRC_HUMAN)
        top = rel.parts[0] if rel.parts else "unknown"
        rec = build_record(
            ds_id=ds_id,
            stage="clean_human",
            source_file=fp,
            record_ref=str(rel).replace("\\", "/"),
            raw_text=extract_text_from_obj(row),
            label=0,
            source="human",
            split="train",
            topic=row.get("topic", top) if isinstance(row, dict) else top,
            model_slug="",
            seen_hashes=seen_hashes,
        )
        if rec is not None:
            staged_records.append(rec)

    for base in QUARANTINE_HC_DIRS:
        # NOTE(review): the tag is derived from the full path string, so a
        # "v2" anywhere above the quarantine directory would also match.
        version = "v2" if "v2" in str(base).lower() else "v3"
        for fp in sorted(base.rglob("*.json")):
            try:
                row = load_json_with_fallback(fp)
            except Exception:
                add_exclusion(ds_id, "read_quarantine", str(fp), "", 0, "json_decode_error", "", "", "")
                continue
            # Computed outside the f-string: a backslash inside an f-string
            # replacement field is a SyntaxError before Python 3.12.
            rel_posix = str(fp.relative_to(base)).replace("\\", "/")
            rec = build_record(
                ds_id=ds_id,
                stage="clean_quarantine_hc",
                source_file=fp,
                record_ref=f"{version}/{rel_posix}",
                raw_text=extract_text_from_obj(row),
                label=0,
                source="quarantine_hc",
                split="train",
                topic=row.get("topic", version) if isinstance(row, dict) else version,
                model_slug="",
                seen_hashes=seen_hashes,
            )
            if rec is not None:
                staged_records.append(rec)

    # Single-label pool: everything goes to train, dev/test stay empty.
    split_map: Dict[str, List[dict]] = {"train": staged_records, "dev": [], "test": []}
    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        split_map,
        build_note="Merged human pool from data/human (core) + quarantine v2+v3 high_confidence. All records go to train (pure single-label pool).",
        source_inputs=[str(SRC_HUMAN)] + [str(x) for x in QUARANTINE_HC_DIRS],
    )
    return ds_dir, manifest
def balanced_round_robin_select(records: List[dict], n: int, key: str = "source") -> List[dict]:
    """Pick up to *n* records, cycling evenly over the groups defined by *key*.

    Records are grouped by ``record.get(key, "unknown")``. Groups are visited
    in sorted key order and each group is ordered by ``_hash``; one record is
    taken from every group that still has one, round after round, until *n*
    records are chosen or all groups are exhausted.
    """
    if n <= 0:
        return []

    buckets = defaultdict(list)
    for record in records:
        buckets[record.get(key, "unknown")].append(record)
    ordered = [sorted(buckets[name], key=lambda item: item["_hash"]) for name in sorted(buckets)]

    picked: List[dict] = []
    deepest = max((len(bucket) for bucket in ordered), default=0)
    for rank in range(deepest):
        for bucket in ordered:
            if rank >= len(bucket):
                continue
            picked.append(bucket[rank])
            if len(picked) >= n:
                return picked
    return picked
def collect_pool_from_datasets(ds_dirs: List[Path], desired_label: int) -> List[dict]:
    """Load every record with *desired_label* from the given built datasets.

    Each returned record gains a ``_hash`` key holding the hash of its text.
    """
    pool: List[dict] = []
    for ds_dir in ds_dirs:
        for record in iter_dataset_records(ds_dir):
            if record.get("label") != desired_label:
                continue
            record["_hash"] = text_hash(record["text"])
            pool.append(record)
    return pool
def _filter_external_pool(
    ds_id: str,
    pool: List[dict],
    label: int,
    min_len: int,
    max_len: int,
    ds01_hashes: set,
) -> List[dict]:
    """Filter one single-label pool and record drop counters in OVERLAP_AUDIT.

    A record is dropped, in this order of precedence, when its ``length_char``
    lies outside ``[min_len, max_len]``, when its hash is in *ds01_hashes*,
    or when its hash was already kept from this pool. Each drop is logged via
    add_exclusion().
    """
    kept: List[dict] = []
    local_seen = set()
    overlap_drop = 0
    dedupe_drop = 0
    range_drop = 0
    for rec in pool:
        h = rec["_hash"]
        lch = rec["length_char"]
        if lch < min_len or lch > max_len:
            range_drop += 1
            stage, reason = "filter_len", f"length_out_of_range_{min_len}_{max_len}"
        elif h in ds01_hashes:
            overlap_drop += 1
            stage, reason = "overlap_filter", "overlap_with_ds01"
        elif h in local_seen:
            dedupe_drop += 1
            stage, reason = "dedupe_pool", "duplicate_text_pool"
        else:
            local_seen.add(h)
            kept.append(rec)
            continue
        add_exclusion(ds_id, stage, rec.get("source", ""), rec.get("record_id", ""), label, reason, lch, lch, h)
    audit = OVERLAP_AUDIT.setdefault(ds_id, {})
    audit[f"label_{label}_raw"] = len(pool)
    audit[f"label_{label}_after_filter"] = len(kept)
    audit[f"label_{label}_drop_overlap_ds01"] = overlap_drop
    audit[f"label_{label}_drop_duplicate_pool"] = dedupe_drop
    audit[f"label_{label}_drop_length"] = range_drop
    return kept


def _build_external_balanced(
    ds_id: str,
    max_len: int,
    ds01_hashes: set,
    human_dir: Path,
    ai_dir: Path,
    build_note: str,
) -> Tuple[Path, dict]:
    """Shared pipeline behind the DS06 and DS07 builders.

    Loads label-0 records from *human_dir* and label-1 records from *ai_dir*,
    filters both (length 100..max_len, no overlap with DS01, no duplicates),
    takes the same number from each side via round-robin over ``source``,
    removes hashes that appear under both labels, and writes a 60/20/20
    split stratified by ``source|label``.

    Raises:
        RuntimeError: if either pool is empty after filtering.
    """
    min_len = 100
    ds_dir = DATASET_ROOT / "20_P1_混合标签集" / ds_id
    log(f"Building {ds_id}")
    human_pool_raw = collect_pool_from_datasets([human_dir], desired_label=0)
    ai_pool_raw = collect_pool_from_datasets([ai_dir], desired_label=1)
    human_pool = _filter_external_pool(ds_id, human_pool_raw, 0, min_len, max_len, ds01_hashes)
    ai_pool = _filter_external_pool(ds_id, ai_pool_raw, 1, min_len, max_len, ds01_hashes)

    n = min(len(human_pool), len(ai_pool))
    if n <= 0:
        raise RuntimeError(f"{ds_id}: no balanced data available after filtering")
    combined = balanced_round_robin_select(human_pool, n, "source") + balanced_round_robin_select(ai_pool, n, "source")

    final: List[dict] = []
    seen_final: Dict[str, int] = {}
    conflict_drop = 0
    for rec in sorted(combined, key=lambda x: x["_hash"]):
        h = rec["_hash"]
        lb = rec["label"]
        if h in seen_final:
            # Same text seen before: a conflict if the labels differ,
            # otherwise a plain duplicate. Only conflicts are counted.
            if seen_final[h] != lb:
                conflict_drop += 1
                reason = "cross_label_conflict_hash"
            else:
                reason = "duplicate_text_final"
            add_exclusion(ds_id, "final_dedupe", rec.get("source", ""), rec.get("record_id", ""), lb, reason, rec["length_char"], rec["length_char"], h)
            continue
        seen_final[h] = lb
        final.append(rec)
    OVERLAP_AUDIT[ds_id]["balanced_n_before_conflict"] = 2 * n
    OVERLAP_AUDIT[ds_id]["drop_cross_label_conflict"] = conflict_drop
    OVERLAP_AUDIT[ds_id]["final_n"] = len(final)

    split_map = stratified_split(final, (0.6, 0.2, 0.2), strata_fn=lambda r: f"{r['source']}|{r['label']}")
    for sp in SPLITS:
        for rec in split_map[sp]:
            rec["split"] = sp
    manifest = finalize_dataset(
        ds_id,
        ds_dir,
        split_map,
        build_note=build_note,
        source_inputs=[str(human_dir), str(ai_dir)],
    )
    return ds_dir, manifest


def build_ds06_external_core_balanced(
    ds01_hashes: set,
    ds04_dir: Path,
    ds11_dir: Path,
) -> Tuple[Path, dict]:
    """Build DS06: balanced human (DS04) vs. AI (DS11) set, length 100-1500.

    Returns the dataset directory and its manifest. Raises RuntimeError when
    either pool is empty after filtering.
    """
    max_len = 1500
    return _build_external_balanced(
        ds_id="DS06_External_core_balanced_v1",
        max_len=max_len,
        ds01_hashes=ds01_hashes,
        human_dir=ds04_dir,
        ai_dir=ds11_dir,
        build_note=f"External balanced dataset from DS04 (human) and DS11 (generated ai standard), length 100-{max_len}, leakage-filtered vs DS01, stratified split 60/20/20.",
    )


def build_ds07_external_long(
    ds01_hashes: set,
    ds04_dir: Path,
    ds12_dir: Path,
) -> Tuple[Path, dict]:
    """Build DS07: balanced human (DS04) vs. AI (DS12) set, length 100-2200.

    Returns the dataset directory and its manifest. Raises RuntimeError when
    either pool is empty after filtering.
    """
    max_len = 2200
    return _build_external_balanced(
        ds_id="DS07_External_long_v1",
        max_len=max_len,
        ds01_hashes=ds01_hashes,
        human_dir=ds04_dir,
        ai_dir=ds12_dir,
        build_note=f"External balanced dataset from DS04 (human) and DS12 (generated ai natural), length 100-{max_len}, leakage-filtered vs DS01, stratified split 60/20/20.",
    )
def build_p2_candidate_audit() -> dict:
    """Inventory the P2 candidate sources and write an audit JSON plus summary CSV.

    The candidates are only scanned (file count, total size, extension
    histogram, up to 200 sample paths); no records are produced from them.
    Returns the audit dict that is written to ``candidate_audit.json``.
    """
    p2_dir = DATASET_ROOT / "30_P2_候选待验证"
    ensure_dir(p2_dir)

    def scan_dir(path: Path) -> dict:
        present = path.exists()
        found = [fp for fp in sorted(path.rglob("*")) if fp.is_file()] if present else []
        ext_tally = Counter(fp.suffix.lower() if fp.suffix else "<no_ext>" for fp in found)
        byte_total = sum(int(fp.stat().st_size) for fp in found)
        return {
            "path": str(path),
            "exists": present,
            "file_count": len(found),
            "total_size": byte_total,
            "ext_dist": dict(sorted(ext_tally.items())),
            "sample_files": [str(fp) for fp in found[:200]],
        }

    clts_info = scan_dir(P2_CLTS_DIR)
    zhihu_info = scan_dir(P2_ZHIHU_DIR)

    candidates = {
        "CLTS数据集": dict(
            clts_info,
            why_not_direct_train="parallel src/tgt corpus; no direct human-vs-ai detection labels",
        ),
        "huggingface_zhihu": dict(
            zhihu_info,
            why_not_direct_train="metadata/inspection-focused files; supervised label semantics need redesign",
        ),
    }
    audit = {
        "generated_at": iso_now(),
        "policy": "P2 candidate only; not included in DS01-DS07 trainable main schema this round.",
        "candidates": candidates,
    }
    write_json(p2_dir / "candidate_audit.json", audit)

    # The CSV carries a shorter rationale than the JSON audit.
    summary_specs = (
        ("CLTS数据集", clts_info, "parallel corpus; no direct detection labels"),
        ("huggingface_zhihu", zhihu_info, "metadata/inspection-first; label semantics pending"),
    )
    summary_rows = [
        {
            "candidate": candidate,
            "file_count": info["file_count"],
            "total_size": info["total_size"],
            "why_not_direct_train": why,
        }
        for candidate, info, why in summary_specs
    ]
    write_csv(
        p2_dir / "candidate_summary.csv",
        ["candidate", "file_count", "total_size", "why_not_direct_train"],
        summary_rows,
    )
    return audit
def write_contract_files() -> None:
    """Write the schema contract files into ``DATASET_ROOT / "00_contract"``.

    Three files are produced: ``main_schema.json`` (a JSON-Schema-style
    description of one record), ``label_mapping.json`` (LABEL_MAP) and
    ``schema_contract.md`` (a human-readable summary, written in Chinese).
    """
    cdir = DATASET_ROOT / "00_contract"
    ensure_dir(cdir)
    # Machine-readable description of the record layout.
    schema = {
        "title": "Unified Main Schema for 20260408 SOTA Route",
        "type": "object",
        "required": REQUIRED_FIELDS,
        "properties": {
            "record_id": {"type": "string"},
            "text": {"type": "string"},
            "label": {"type": "integer", "enum": [0, 1]},
            "source": {"type": "string"},
            "split": {"type": "string", "enum": ["train", "dev", "test"]},
            "length_char": {"type": "integer", "minimum": 0},
            "topic": {"type": "string"},
            "model_slug": {"type": "string"},
        },
    }
    write_json(cdir / "main_schema.json", schema)
    write_json(cdir / "label_mapping.json", LABEL_MAP)
    # Markdown contract text. It is emitted verbatim, so its lines must stay
    # flush-left inside the literal.
    md = """# 主 Schema 合同(统一训练格式)
## 字段
- `record_id`:字符串唯一ID
- `text`:清洗后的正文
- `label`:整数标签(human=0, ai=1)
- `source`:数据族来源(如 `nlpcc/hc3_plus/hc3_raw/human/quarantine_hc`)
- `split`:`train/dev/test`
- `length_char`:去空白字符后的长度
- `topic`:主题字段(可空字符串)
- `model_slug`:AI来源模型(可空字符串)
## 全局规则
- 编码回退:`utf-8-sig -> utf-8 -> gb18030`
- 清洗:换行统一、空白压缩、`\\n`与明显`/n`噪声修正、Markdown去痕
- 质量过滤:空文本、`length_char < 30`、严重伪装文本、重复文本剔除
- 标签映射固定:`human=0, ai=1`
"""
    (cdir / "schema_contract.md").write_text(md, encoding="utf-8")
def write_experiment_views(ds_paths: Dict[str, Path]) -> None:
    """Write the experiment -> dataset mapping as JSON and CSV.

    For each experiment id (E00-E15) the datasets it uses are resolved
    through *ds_paths* into their directory and train/dev/test JSONL paths.
    A dataset key missing from *ds_paths* raises KeyError.
    """
    edir = DATASET_ROOT / "40_experiment_views"
    ensure_dir(edir)
    mapping = {
        "E00": {"use": ["DS01"], "detail": "Schema check + distribution"},
        "E01": {"use": ["DS01"], "detail": "BERT baseline train/dev"},
        "E02": {"use": ["DS01"], "detail": "RoBERTa baseline train/dev"},
        "E03": {"use": ["DS01"], "detail": "Rule module on dev/test"},
        "E04": {"use": ["DS01"], "detail": "Static weighted vote"},
        "E05": {"use": ["DS01"], "detail": "Determinative override"},
        "E06": {"use": ["DS01"], "detail": "Dynamic vote by buckets"},
        "E07": {"use": ["DS01"], "detail": "Joint features MLP"},
        "E08": {"use": ["DS01", "DS02", "DS03"], "detail": "Full chain rerun / optional aug"},
        "E09": {"use": ["DS01", "DS06"], "detail": "Train on DS01, transfer eval on DS06"},
        "E10": {"use": ["DS01", "DS06"], "detail": "Calibrate with DS01 dev + DS06 dev/test"},
        "E11": {"use": ["DS01", "DS06"], "detail": "Bucket threshold on DS06"},
        "E12": {"use": ["DS01", "DS06", "DS07"], "detail": "Zero-shot branch"},
        "E13": {"use": ["DS01", "DS06", "DS07"], "detail": "Zero-shot branch"},
        "E14": {"use": ["DS01", "DS06", "DS07"], "detail": "LLM LoRA branch"},
        "E15": {"use": ["DS01", "DS06", "DS07"], "detail": "Combo search"},
    }

    views = {}
    csv_rows = []
    for exp_id, spec in mapping.items():
        datasets = {}
        for ds_key in spec["use"]:
            base = ds_paths[ds_key]
            base_str = str(base)
            split_files = {name: str(base / f"{name}.jsonl") for name in ("train", "dev", "test")}
            datasets[ds_key] = {"dataset_dir": base_str, **split_files}
            csv_rows.append(
                {
                    "experiment_id": exp_id,
                    "dataset_id": ds_key,
                    "dataset_dir": base_str,
                    "train_jsonl": split_files["train"],
                    "dev_jsonl": split_files["dev"],
                    "test_jsonl": split_files["test"],
                }
            )
        views[exp_id] = {"detail": spec["detail"], "datasets": datasets}

    write_json(edir / "experiment_views.json", {"generated_at": iso_now(), "views": views})
    write_csv(
        edir / "E00_E15_dataset_mapping.csv",
        ["experiment_id", "dataset_id", "dataset_dir", "train_jsonl", "dev_jsonl", "test_jsonl"],
        csv_rows,
    )
def validate_schema_sample(ds_paths: Dict[str, Path]) -> dict:
    """Spot-check up to 1000 records per dataset against the output schema.

    Splits are read in SPLITS order until the cap is reached. A record is
    invalid if it lacks a required field, has a label outside {0, 1}, a split
    outside train/dev/test, or a non-integer ``length_char``. At most ten
    example messages are kept per dataset.
    """
    sample_cap = 1000
    example_cap = 10
    report = {"generated_at": iso_now(), "datasets": {}}

    for ds, base in ds_paths.items():
        checked = 0
        invalid = 0
        examples: List[str] = []

        for sp in SPLITS:
            if checked >= sample_cap:
                break
            split_file = base / f"{sp}.jsonl"
            if not split_file.exists():
                continue
            for _lineno, rec in iter_jsonl(split_file):
                checked += 1
                problems = [f"{sp}:missing:{k}" for k in REQUIRED_FIELDS if k not in rec]
                if rec.get("label") not in (0, 1):
                    problems.append(f"{sp}:bad_label:{rec.get('label')}")
                if rec.get("split") not in ("train", "dev", "test"):
                    problems.append(f"{sp}:bad_split:{rec.get('split')}")
                if not isinstance(rec.get("length_char"), int):
                    problems.append(f"{sp}:bad_length_type:{type(rec.get('length_char')).__name__}")
                if problems:
                    invalid += 1
                    # Keep only as many messages as still fit under the cap.
                    room = max(example_cap - len(examples), 0)
                    examples.extend(problems[:room])
                if checked >= sample_cap:
                    break

        report["datasets"][ds] = {
            "checked_records": checked,
            "invalid_records": invalid,
            "valid": invalid == 0,
            "example_errors": examples,
        }
    return report
def write_exclusion_log() -> None:
    """Dump the accumulated exclusion rows and a per-reason tally as CSV files.

    Writes two files into ``DATASET_ROOT / "90_manifests"``:

    * ``exclusion_log.csv`` - every row of ``EXCLUSION_LOG_ROWS``.
    * ``exclusion_reason_stats.csv`` - one row per ``drop_reason`` with its
      count, ordered by descending count and then by reason.
    """
    manifest_dir = DATASET_ROOT / "90_manifests"
    ensure_dir(manifest_dir)

    log_columns = [
        "dataset_id",
        "stage",
        "source_file",
        "record_ref",
        "label",
        "drop_reason",
        "raw_length",
        "clean_length",
        "text_hash",
    ]
    write_csv(manifest_dir / "exclusion_log.csv", log_columns, EXCLUSION_LOG_ROWS)

    tally: Counter = Counter()
    for entry in EXCLUSION_LOG_ROWS:
        tally[entry["drop_reason"]] += 1

    def by_count_then_reason(pair):
        # Most frequent reason first; ties are broken by the reason itself.
        reason, count = pair
        return (-count, reason)

    stats_rows = [
        {"drop_reason": reason, "count": count}
        for reason, count in sorted(tally.items(), key=by_count_then_reason)
    ]
    write_csv(manifest_dir / "exclusion_reason_stats.csv", ["drop_reason", "count"], stats_rows)
def main() -> int:
    """Rebuild the whole dataset tree and its manifests from scratch.

    Steps, in order: delete and recreate ``DATASET_ROOT``; snapshot the source
    inventory; write the contract files; build DS01-DS04, DS06 and DS07; write
    the P2 audit, experiment views, manifests, schema report and exclusion
    log; snapshot the source inventory again and compare the two digests.

    Returns:
        0 on success, or 2 when the source inventory digest differs between
        the two snapshots (the build summary is not written in that case).
    """
    manifest_dir = DATASET_ROOT / "90_manifests"
    source_roots = dict(
        human=SRC_HUMAN,
        human_quarantine=SRC_QUARANTINE,
        raw_external=SRC_RAW_EXTERNAL,
    )

    # Always start from an empty output tree.
    if DATASET_ROOT.exists():
        log(f"Removing old dataset directory: {DATASET_ROOT}")
        shutil.rmtree(DATASET_ROOT)
    ensure_dir(DATASET_ROOT)
    ensure_dir(manifest_dir)

    log("Collecting source inventory (before)...")
    inventory_before = collect_source_inventory(source_roots)
    write_json(manifest_dir / "source_inventory_before.json", inventory_before)

    log("Writing dataset contract files...")
    write_contract_files()

    ds_paths: Dict[str, Path] = {}
    dataset_manifests = {}

    def register(ds_id, ds_dir, manifest):
        # Record one finished dataset in both lookup tables.
        ds_paths[ds_id] = ds_dir
        dataset_manifests[ds_id] = manifest

    ds01_dir, ds01_manifest, ds01_hashes = build_ds01_nlpcc()
    register("DS01", ds01_dir, ds01_manifest)

    simple_builders = (
        ("DS02", build_ds02_hc3plus),
        ("DS03", build_ds03_hc3raw_expanded),
        ("DS04", build_ds04_human_pools_merged),
    )
    for ds_id, build in simple_builders:
        built_dir, built_manifest = build()
        register(ds_id, built_dir, built_manifest)
    ds04_dir = ds_paths["DS04"]

    # NOTE(review): both pool directories live under DATASET_ROOT, which was
    # removed at the top of this function - presumably the DS06/DS07 builders
    # cope with a missing pool; verify against their implementation.
    p2_pool_root = DATASET_ROOT / "30_P2_单标签原料池"
    ds11_dir = p2_pool_root / "DS11_Generated_AI_v1"
    ds12_dir = p2_pool_root / "DS12_Generated_AI_natural_v1"

    ds06_dir, ds06_manifest = build_ds06_external_core_balanced(
        ds01_hashes=ds01_hashes,
        ds04_dir=ds04_dir,
        ds11_dir=ds11_dir,
    )
    register("DS06", ds06_dir, ds06_manifest)

    ds07_dir, ds07_manifest = build_ds07_external_long(
        ds01_hashes=ds01_hashes,
        ds04_dir=ds04_dir,
        ds12_dir=ds12_dir,
    )
    register("DS07", ds07_dir, ds07_manifest)

    log("Building P2 candidate audit...")
    p2_audit = build_p2_candidate_audit()

    log("Writing experiment views...")
    write_experiment_views(ds_paths)

    log("Writing manifests + validation reports...")
    write_json(manifest_dir / "dataset_manifests.json", dataset_manifests)
    write_json(manifest_dir / "overlap_audit.json", OVERLAP_AUDIT)
    write_json(manifest_dir / "p2_candidate_audit_copy.json", p2_audit)
    schema_report = validate_schema_sample(ds_paths)
    write_json(manifest_dir / "schema_validation_report.json", schema_report)
    write_exclusion_log()

    log("Collecting source inventory (after)...")
    inventory_after = collect_source_inventory(source_roots)
    write_json(manifest_dir / "source_inventory_after.json", inventory_after)

    # The build must not have touched the source roots: compare both digests.
    digest_before = inventory_before.get("global_digest")
    digest_after = inventory_after.get("global_digest")
    compare = {
        "generated_at": iso_now(),
        "before_digest": digest_before,
        "after_digest": digest_after,
        "same_digest": digest_before == digest_after,
        "before": inventory_before,
        "after": inventory_after,
    }
    write_json(manifest_dir / "source_inventory_compare.json", compare)
    if not compare["same_digest"]:
        log("ERROR: source inventory changed during build. Stopping.")
        return 2

    datasets_summary = {}
    for ds_id, ds_dir in ds_paths.items():
        datasets_summary[ds_id] = {
            "path": str(ds_dir),
            "manifest": dataset_manifests.get(ds_id, {}),
        }
    summary = {
        "generated_at": iso_now(),
        "dataset_root": str(DATASET_ROOT),
        "datasets": datasets_summary,
        "overlap_audit_file": str(manifest_dir / "overlap_audit.json"),
        "exclusion_log_file": str(manifest_dir / "exclusion_log.csv"),
        "schema_validation_report_file": str(manifest_dir / "schema_validation_report.json"),
    }
    write_json(manifest_dir / "build_summary.json", summary)

    log("Dataset build completed successfully.")
    return 0
# Script entry point: run the build and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())