bbkdevops's picture
download
raw
10.1 kB
from __future__ import annotations
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import json
from pathlib import Path
from typing import Any, Iterable
from train.dataset_quality_governor import (
_canonical_text,
_domain,
_norm_for_hash,
_quality_reject_reason,
_source,
_token_estimate,
)
@dataclass(frozen=True)
class PurityConcentratorPolicy:
    """Immutable thresholds and caps that steer the purity concentrator.

    Instances are frozen, so one policy can be shared safely between calls.
    The ``*_share`` fields are fractions of ``max_records``.
    """

    # Global ceiling on the number of records written to the output mix.
    max_records: int = 120_000

    # Records whose estimated token count falls outside this inclusive
    # window are rejected as "too_short" / "too_long".
    min_estimated_tokens: int = 16
    max_estimated_tokens: int = 2048

    # Default per-domain cap, as a fraction of ``max_records``.
    max_domain_share: float = 0.18
    # Tighter caps applied to the "coverage_100k" and "cve_intelligence"
    # domains respectively.
    coverage_max_share: float = 0.03
    cve_max_share: float = 0.07

    # Minimum share of kept records that must come from weak-axis domains
    # for the manifest's "weak_axis_enrichment_ready" gate to pass.
    weak_axis_min_share: float = 0.20

    # Records scoring below this density score are rejected.
    min_quality_score: float = 0.48
# Domains treated as "weak axes". Membership is consulted when scoring a
# record, when choosing its loss weight, and when computing the manifest's
# weak-axis share metric.
WEAK_AXIS_DOMAINS = {
    "alignment_constraint_following",
    "alignment_tool_calling",
    "data_tooling",
    "logic_data_tooling",
    "logic_instruction_following",
    "logic_tool_grounding",
    "sandbox_tools",
}
def _read_jsonl(path: Path) -> Iterable[tuple[int, dict[str, Any] | None]]:
    """Lazily yield ``(line_no, record)`` pairs from a JSON Lines file.

    Blank lines are skipped but still advance the 1-based line counter.
    ``record`` is ``None`` when the line cannot be used as a record: either
    it is not valid JSON, or it is valid JSON whose top-level value is not an
    object (a list, number, string, ...). Callers treat every record as a
    dict, so such values are reported as unusable instead of being passed on.

    Args:
        path: Location of the ``.jsonl`` file to read.

    Yields:
        Tuples of the physical line number and the decoded object, or
        ``None`` for an unusable line.
    """
    # strict=False lets raw control characters appear inside JSON strings.
    decoder = json.JSONDecoder(strict=False)
    # "utf-8-sig" decodes exactly like "utf-8" but drops a leading BOM, which
    # would otherwise make the first record of the file undecodable.
    with path.open("r", encoding="utf-8-sig", errors="replace") as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue
            try:
                record = decoder.decode(line)
            except json.JSONDecodeError:
                yield line_no, None
                continue
            yield line_no, (record if isinstance(record, dict) else None)
def _semantic_hash(item: dict[str, Any], text: str, domain: str) -> str:
    """Return the de-duplication key for a record.

    Lookup order:

    1. ``item["quality_governor"]["semantic_sha256"]`` when present, with a
       ``"<domain>:"`` prefix added unless it already has one.
    2. ``item["metadata"]["fingerprint_sha256"]`` when present, prefixed with
       ``"<domain>:"``.
    3. Otherwise the SHA-256 hex digest of the normalised ``text``.

    ``quality_governor`` / ``metadata`` values that are not dicts are ignored
    rather than raising ``AttributeError``.

    Args:
        item: The decoded record.
        text: Canonical text of the record, used only for the fallback hash.
        domain: Domain label used as the key prefix.

    Returns:
        The key string used to detect duplicate records.
    """
    prefix = f"{domain}:"

    governor = item.get("quality_governor")
    if isinstance(governor, dict) and governor.get("semantic_sha256"):
        existing = str(governor["semantic_sha256"])
        return existing if existing.startswith(prefix) else prefix + existing

    metadata = item.get("metadata")
    if isinstance(metadata, dict) and metadata.get("fingerprint_sha256"):
        return prefix + str(metadata["fingerprint_sha256"])

    # NOTE(review): unlike the two branches above, this fallback digest is not
    # domain-prefixed, so identical text is keyed the same in every domain.
    # Confirm that cross-domain de-duplication is intended here.
    return hashlib.sha256(_norm_for_hash(text).encode("utf-8", errors="ignore")).hexdigest()
def _assistant_text(item: dict[str, Any]) -> str:
    """Extract the assistant-authored text from a record.

    For chat-style records (``item["messages"]`` is a list) the contents of
    all messages whose role is ``"assistant"`` are joined with newlines.
    Entries that are not dicts are skipped, and a ``None`` content (for
    example a ``"content": null`` field) contributes an empty string instead
    of the literal text ``"None"``.

    For all other records the first truthy value among ``answer``,
    ``response`` and ``text`` is returned, or ``""`` when none is set.

    Args:
        item: The decoded record.

    Returns:
        The assistant text; possibly an empty string.
    """
    messages = item.get("messages")
    if isinstance(messages, list):
        parts: list[str] = []
        for message in messages:
            if not isinstance(message, dict) or message.get("role") != "assistant":
                continue
            content = message.get("content")
            parts.append("" if content is None else str(content))
        return "\n".join(parts)
    return str(item.get("answer") or item.get("response") or item.get("text") or "")
def _quality_score(item: dict[str, Any], text: str, domain: str, tokens: int) -> float:
    """Compute a heuristic density score for a record, clamped to [0.0, 1.0].

    The score starts at 0.35 and a fixed list of bonuses and penalties is
    applied in order, based on the assistant reply, the full text, the domain
    label and the estimated token count.

    Args:
        item: The decoded record (used to extract the assistant reply).
        text: Canonical text of the whole record.
        domain: Domain label of the record.
        tokens: Estimated token count of ``text``.

    Returns:
        The clamped score.
    """
    raw_reply = _assistant_text(item)
    reply = raw_reply.strip()
    reply_len = len(reply)
    lowered_text = text.lower()

    code_marks = ("```", "def ", "class ", "return ", "{", "}")
    evidence_marks = ("verify", "evidence", "test", "ตรวจ", "หลักฐาน", "schema", "json")

    tooling_domain = any(tag in domain for tag in ("code", "tool", "data_tooling", "sandbox"))
    thai_domain = "thai" in domain or "general" in domain

    # (condition, delta) pairs. They are applied strictly in this order so the
    # floating-point accumulation is reproducible.
    adjustments = (
        # Substantive replies: two stacking length tiers.
        (reply_len >= 80, 0.14),
        (reply_len >= 240, 0.08),
        # Code-looking reply in a code/tooling flavoured domain. The marks are
        # searched in the unstripped reply.
        (tooling_domain and any(mark in raw_reply for mark in code_marks), 0.14),
        # Thai script (U+0E00..U+0E7F) present in a thai/general domain.
        (thai_domain and any("\u0e00" <= ch <= "\u0e7f" for ch in text), 0.10),
        # Verification / evidence / structured-data vocabulary.
        (any(mark in lowered_text for mark in evidence_marks), 0.10),
        (domain in WEAK_AXIS_DOMAINS, 0.10),
        # Domain-specific penalties.
        (domain == "coverage_100k", -0.18),
        (domain == "cve_intelligence", -0.05),
        # Very short or very long records.
        (tokens < 32, -0.15),
        (tokens > 1536, -0.10),
        # One-word acknowledgement replies.
        (reply.lower() in {"ok", "done", "yes", "no"}, -0.30),
    )

    score = 0.35
    for applies, delta in adjustments:
        if applies:
            score += delta
    return max(0.0, min(1.0, score))
def _loss_weight(domain: str, score: float) -> float:
    """Pick the training loss weight for a record.

    Exact domain names are resolved first, then weak-axis membership, then
    domain-name prefixes; anything unmatched gets a neutral weight of 1.0.

    Args:
        domain: Domain label of the record.
        score: Density score of the record; only used for domains starting
            with ``"claude_reasoning_"``.

    Returns:
        The loss weight to store alongside the record.
    """
    exact_weights = {
        "coverage_100k": 0.06,
        "cve_intelligence": 0.85,
        "reverse_engineering": 0.85,
        "thai_grounding": 1.0,
        "general": 1.0,
    }
    if domain in exact_weights:
        return exact_weights[domain]

    # Weak-axis membership is checked before the prefix rules, so a weak-axis
    # domain gets 1.55 even when it also starts with "logic_" or "alignment_".
    if domain in WEAK_AXIS_DOMAINS:
        return 1.55

    for prefix, weight in (("logic_", 1.35), ("alignment_", 1.60)):
        if domain.startswith(prefix):
            return weight

    if domain.startswith("claude_reasoning_"):
        # Scales with the score but never exceeds the neutral weight.
        return min(1.0, 0.70 + score * 0.25)
    return 1.0
def _collect_purity_candidates(
    input_paths: list[str | Path],
    policy: PurityConcentratorPolicy,
    reject_counts: Counter[str],
    source_counts: Counter[str],
) -> tuple[dict[str, dict[str, Any]], int]:
    """Read, filter, score and de-duplicate the records of every input file.

    ``reject_counts`` and ``source_counts`` are updated in place. Returns the
    surviving candidates keyed by semantic hash, plus the number of non-blank
    input lines seen.
    """
    candidates: dict[str, dict[str, Any]] = {}
    records_seen = 0

    for raw_path in input_paths:
        path = Path(raw_path)
        if not path.exists():
            reject_counts["missing_input_path"] += 1
            continue

        for line_no, item in _read_jsonl(path):
            records_seen += 1
            # Covers both undecodable lines (None) and valid JSON whose
            # top-level value is not an object; neither can be used below.
            if not isinstance(item, dict):
                reject_counts["invalid_json"] += 1
                continue

            # A null / non-dict "quality_governor" is treated as empty so the
            # lookups and the update() further down cannot fail on it.
            governor = item.get("quality_governor")
            if not isinstance(governor, dict):
                governor = {}

            text = _canonical_text(item)
            tokens = _token_estimate(text)
            source = _source(item)
            domain = str(governor.get("domain") or _domain(item, source))

            if tokens < policy.min_estimated_tokens:
                reject_counts["too_short"] += 1
                continue
            if tokens > policy.max_estimated_tokens:
                reject_counts["too_long"] += 1
                continue

            reason = _quality_reject_reason(
                text,
                skip_repetition_scan=domain.startswith(("logic_", "alignment_")),
            )
            if reason:
                reject_counts[reason] += 1
                continue

            item.setdefault("metadata", {})
            item["quality_governor"] = governor

            h = _semantic_hash(item, text, domain)
            score = _quality_score(item, text, domain, tokens)
            if score < policy.min_quality_score:
                reject_counts["low_density_score"] += 1
                continue

            governor.update(
                {
                    "domain": domain,
                    "source": source,
                    "estimated_tokens": tokens,
                    "semantic_sha256": h,
                    "purity_density_score": score,
                    "loss_weight": _loss_weight(domain, score),
                    "purity_concentrator": "puremax-v1",
                }
            )

            prev = candidates.get(h)
            if prev is not None:
                # Exactly one of the two colliding records is dropped, whether
                # it is the newcomer or the previously stored one, so count it
                # either way to keep the reject totals balanced.
                reject_counts["duplicate_lower_score"] += 1
            if prev is None or score > prev["_score"]:
                # Underscore-prefixed keys are bookkeeping only; they are
                # stripped before the record is written out.
                item["_score"] = score
                item["_input_path"] = str(path)
                item["_line_no"] = line_no
                candidates[h] = item
            source_counts[source] += 1

    return candidates, records_seen


def _select_within_caps(
    ranked: list[dict[str, Any]],
    policy: PurityConcentratorPolicy,
    reject_counts: Counter[str],
) -> tuple[list[dict[str, Any]], Counter[str]]:
    """Walk ``ranked`` in order, keeping records until a cap is reached.

    Returns the kept records (without underscore-prefixed keys) and the
    per-domain counts of what was kept. ``reject_counts`` is updated in place.
    """
    default_cap = max(1, int(policy.max_records * policy.max_domain_share))
    special_caps = {
        "coverage_100k": max(1, int(policy.max_records * policy.coverage_max_share)),
        "cve_intelligence": max(1, int(policy.max_records * policy.cve_max_share)),
    }

    selected: list[dict[str, Any]] = []
    domain_counts: Counter[str] = Counter()
    for row in ranked:
        domain = row["quality_governor"]["domain"]
        # Keep iterating after the global cap is hit so every leftover
        # candidate is still accounted for in the reject counts.
        if len(selected) >= policy.max_records:
            reject_counts["global_cap"] += 1
            continue
        if domain_counts[domain] >= special_caps.get(domain, default_cap):
            reject_counts["domain_share_cap"] += 1
            continue
        selected.append({k: v for k, v in row.items() if not k.startswith("_")})
        domain_counts[domain] += 1
    return selected, domain_counts


def build_purity_concentrator(
    input_paths: list[str | Path],
    out_dir: str | Path,
    *,
    policy: PurityConcentratorPolicy | None = None,
) -> dict[str, Any]:
    """Build the concentrated training mix and its manifest.

    Reads every JSONL file in ``input_paths``, rejects records that fail the
    token-length, quality or density-score checks, de-duplicates by semantic
    hash (keeping the highest-scoring copy), ranks the survivors by score and
    keeps them until the global and per-domain caps are reached.

    Two files are written into ``out_dir`` (created if missing):
    ``tinymind_puremax_concentrated_mix.jsonl`` with the kept records and
    ``purity_concentrator_manifest.json`` with the report.

    Args:
        input_paths: JSONL files to read. Paths that do not exist are counted
            under ``reject_counts["missing_input_path"]`` and skipped.
        out_dir: Directory that receives the two output files.
        policy: Thresholds and caps; defaults to ``PurityConcentratorPolicy()``.

    Returns:
        The manifest as a dict (the same data written to the manifest file).
        ``rejected_records`` counts records only, so together with
        ``kept_records`` it adds up to ``input_records_seen``; missing input
        paths stay visible in ``reject_counts`` but are not records.
    """
    if policy is None:
        policy = PurityConcentratorPolicy()
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)

    reject_counts: Counter[str] = Counter()
    source_counts: Counter[str] = Counter()
    candidates, input_records_seen = _collect_purity_candidates(
        input_paths, policy, reject_counts, source_counts
    )

    # Highest score first; among equal scores the shorter record wins.
    ranked = sorted(
        candidates.values(),
        key=lambda row: (-row["_score"], row["quality_governor"]["estimated_tokens"]),
    )
    selected, domain_counts = _select_within_caps(ranked, policy, reject_counts)

    out_jsonl = out / "tinymind_puremax_concentrated_mix.jsonl"
    with out_jsonl.open("w", encoding="utf-8", newline="\n") as f:
        for row in selected:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

    kept_records = len(selected)
    denominator = max(kept_records, 1)  # avoids dividing by zero on an empty mix
    weak_axis_share = sum(domain_counts[d] for d in WEAK_AXIS_DOMAINS) / denominator
    coverage_share = domain_counts["coverage_100k"] / denominator
    dominant_domain_share = max(domain_counts.values(), default=0) / denominator
    avg_score = sum(float(row["quality_governor"]["purity_density_score"]) for row in selected) / denominator

    # Missing paths are tallied next to the record rejections but are not
    # records themselves, so they are left out of the record total.
    rejected_records = sum(
        count for reason, count in reject_counts.items() if reason != "missing_input_path"
    )

    report = {
        "schema": "tinymind.purity_concentrator.v1",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "input_paths": [str(Path(p)) for p in input_paths],
        "output_jsonl": str(out_jsonl),
        "input_records_seen": input_records_seen,
        "candidate_records_after_quality": len(candidates),
        "kept_records": kept_records,
        "rejected_records": rejected_records,
        "reject_counts": dict(sorted(reject_counts.items())),
        "domain_counts": dict(sorted(domain_counts.items())),
        "source_counts_top": dict(source_counts.most_common(40)),
        "metrics": {
            "avg_purity_density_score": avg_score,
            "weak_axis_share": weak_axis_share,
            "coverage_share": coverage_share,
            "dominant_domain_share": dominant_domain_share,
        },
        "training_contract": {
            "loss_normalization": "per_sample_token_normalized",
            "targeting": "assistant_final_answer_only",
            "coverage_100k_loss_weight": 0.06,
            "weak_axis_loss_weight_floor": 1.35,
            "domain_share_cap": policy.max_domain_share,
        },
        "claim_gate": {
            "puremax_dataset_ready": kept_records > 0,
            "round_domain_balance_ready": dominant_domain_share <= policy.max_domain_share and coverage_share <= policy.coverage_max_share,
            "weak_axis_enrichment_ready": weak_axis_share >= policy.weak_axis_min_share,
            "world_purest_claim_allowed": False,
            "reason": "This is local data purity evidence. It improves filtering and balance but cannot prove world's purest data.",
        },
    }
    report_path = out / "purity_concentrator_manifest.json"
    report["manifest_path"] = str(report_path)
    report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    return report

Xet Storage Details

Size:
10.1 kB
·
Xet hash:
2171c241ea10e14e9e45541a2b31186d388445476a562a2f342038617321ea89

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.