Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /data /purity_concentrator.py
| from __future__ import annotations | |
| from collections import Counter, defaultdict | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| from typing import Any, Iterable | |
| from train.dataset_quality_governor import ( | |
| _canonical_text, | |
| _domain, | |
| _norm_for_hash, | |
| _quality_reject_reason, | |
| _source, | |
| _token_estimate, | |
| ) | |
@dataclass
class PurityConcentratorPolicy:
    """Tunable thresholds for the purity concentrator.

    Declared as a dataclass so individual thresholds can be overridden per
    instance, e.g. ``PurityConcentratorPolicy(max_records=10_000)``; calling
    it with no arguments yields the defaults below. The dataclass is left
    mutable on purpose so code that assigns to attributes keeps working.
    """

    # Global ceiling on the number of records written to the output mix.
    max_records: int = 120_000
    # Records whose estimated token count is below this are rejected ("too_short").
    min_estimated_tokens: int = 16
    # Records whose estimated token count is above this are rejected ("too_long").
    max_estimated_tokens: int = 2048
    # Default per-domain cap, expressed as a fraction of ``max_records``;
    # also the threshold for the dominant-domain balance claim gate.
    max_domain_share: float = 0.18
    # Cap for the "coverage_100k" domain, as a fraction of ``max_records``.
    coverage_max_share: float = 0.03
    # Cap for the "cve_intelligence" domain, as a fraction of ``max_records``.
    cve_max_share: float = 0.07
    # Minimum share of weak-axis domains among kept records for the
    # "weak_axis_enrichment_ready" claim gate to pass.
    weak_axis_min_share: float = 0.20
    # Records scoring below this are rejected ("low_density_score").
    min_quality_score: float = 0.48
# Domains treated as "weak axes" by this module: they get a score bonus in
# _quality_score, a raised loss weight in _loss_weight, and are summed into
# the weak_axis_share metric of the manifest.
WEAK_AXIS_DOMAINS = {
    # alignment_* domains
    "alignment_constraint_following",
    "alignment_tool_calling",
    # logic_* domains
    "logic_data_tooling",
    "logic_instruction_following",
    "logic_tool_grounding",
    # un-prefixed tooling domains
    "data_tooling",
    "sandbox_tools",
}
def _read_jsonl(path: Path) -> Iterable[tuple[int, dict[str, Any] | None]]:
    """Yield ``(line_no, record)`` for every non-blank line of a JSONL file.

    Args:
        path: File to read; one JSON document per line.

    Yields:
        The 1-based physical line number and the decoded JSON object.
        ``record`` is ``None`` when the line is not valid JSON *or* decodes
        to something other than a JSON object (list, number, string, ...),
        so callers can rely on getting a ``dict`` or ``None``.

    Blank / whitespace-only lines are skipped but still advance ``line_no``.
    """
    # strict=False lets raw control characters (e.g. literal tabs) appear
    # inside JSON strings instead of failing the whole line.
    decoder = json.JSONDecoder(strict=False)
    # utf-8-sig drops a leading byte-order mark, which would otherwise make
    # the first record undecodable; plain UTF-8 files read identically.
    # errors="replace" keeps malformed bytes from aborting the whole file.
    with path.open("r", encoding="utf-8-sig", errors="replace") as f:
        for line_no, line in enumerate(f, start=1):
            stripped = line.strip()
            if not stripped:
                continue
            try:
                record = decoder.decode(stripped)
            except json.JSONDecodeError:
                record = None
            if not isinstance(record, dict):
                # Valid JSON that is not an object is unusable as a record.
                record = None
            yield line_no, record
def _semantic_hash(item: dict[str, Any], text: str, domain: str) -> str:
    """Return the de-duplication key for a record.

    Resolution order:
      1. ``item["quality_governor"]["semantic_sha256"]`` if truthy, prefixed
         with ``"<domain>:"`` unless it already starts with that prefix.
      2. ``item["metadata"]["fingerprint_sha256"]`` if truthy, prefixed with
         ``"<domain>:"``.
      3. SHA-256 hex digest of the normalised ``text``.

    NOTE(review): the fallback in step 3 is returned without the domain
    prefix, unlike steps 1 and 2 - confirm this asymmetry is intended.
    """
    prefix = f"{domain}:"

    governor = item.get("quality_governor") or {}
    prior = governor.get("semantic_sha256")
    if prior:
        prior_key = str(prior)
        if prior_key.startswith(prefix):
            return prior_key
        return prefix + prior_key

    fingerprint = (item.get("metadata") or {}).get("fingerprint_sha256")
    if fingerprint:
        return prefix + str(fingerprint)

    digest = hashlib.sha256()
    digest.update(_norm_for_hash(text).encode("utf-8", errors="ignore"))
    return digest.hexdigest()
def _assistant_text(item: dict[str, Any]) -> str:
    """Return the assistant-authored text of a record.

    For chat-style records (``item["messages"]`` is a list) the contents of
    all messages with ``role == "assistant"`` are joined with newlines.
    Otherwise the first truthy value among ``answer``, ``response`` and
    ``text`` is returned as a string, or ``""`` if none is set.

    Entries in ``messages`` that are not dicts are skipped rather than
    raising, and a ``content`` of ``None`` contributes an empty string
    instead of the literal text ``"None"``.
    """
    messages = item.get("messages")
    if isinstance(messages, list):
        parts: list[str] = []
        for message in messages:
            if not isinstance(message, dict) or message.get("role") != "assistant":
                continue
            content = message.get("content", "")
            # str(None) would inject the word "None" into the answer text.
            parts.append("" if content is None else str(content))
        return "\n".join(parts)
    return str(item.get("answer") or item.get("response") or item.get("text") or "")
def _quality_score(item: dict[str, Any], text: str, domain: str, tokens: int) -> float:
    """Compute a heuristic density score for a record, clamped to [0.0, 1.0].

    Starts from a base of 0.35 and applies additive bonuses and penalties
    derived from the assistant text, the full record text, the domain name
    and the estimated token count.

    Args:
        item: The decoded record; only its assistant text is read here.
        text: Canonical text of the whole record.
        domain: Domain label assigned to the record.
        tokens: Estimated token count of ``text``.
    """
    answer = _assistant_text(item)
    answer_body = answer.strip()
    answer_len = len(answer_body)
    lowered_text = text.lower()

    code_markers = ("```", "def ", "class ", "return ", "{", "}")
    evidence_markers = ("verify", "evidence", "test", "ตรวจ", "หลักฐาน", "schema", "json")

    code_like_domain = any(tag in domain for tag in ("code", "tool", "data_tooling", "sandbox"))
    has_code_marker = any(marker in answer for marker in code_markers)
    thai_friendly_domain = "thai" in domain or "general" in domain
    # U+0E00..U+0E7F is the Thai Unicode block.
    has_thai_char = any("\u0e00" <= ch <= "\u0e7f" for ch in text)
    has_evidence_marker = any(marker in lowered_text for marker in evidence_markers)

    # (condition, delta) pairs, applied strictly in this order so the
    # floating-point accumulation matches the sequential if-chain.
    adjustments = (
        (answer_len >= 80, 0.14),
        (answer_len >= 240, 0.08),
        (has_code_marker and code_like_domain, 0.14),
        (has_thai_char and thai_friendly_domain, 0.10),
        (has_evidence_marker, 0.10),
        (domain in WEAK_AXIS_DOMAINS, 0.10),
        (domain == "coverage_100k", -0.18),
        (domain == "cve_intelligence", -0.05),
        (tokens < 32, -0.15),
        (tokens > 1536, -0.10),
        # One-word acknowledgements carry essentially no training signal.
        (answer_body.lower() in {"ok", "done", "yes", "no"}, -0.30),
    )

    score = 0.35
    for applies, delta in adjustments:
        if applies:
            score += delta
    return max(0.0, min(1.0, score))
def _loss_weight(domain: str, score: float) -> float:
    """Return the per-sample loss weight for a record.

    Exact domain names are matched first, then membership in
    ``WEAK_AXIS_DOMAINS``, then the ``logic_`` / ``alignment_`` prefixes.
    ``claude_reasoning_*`` domains get a score-dependent weight capped at
    1.0. Anything else weighs 1.0.

    Args:
        domain: Domain label assigned to the record.
        score: Density score of the record; only used for
            ``claude_reasoning_*`` domains.
    """
    fixed_weights = {
        "coverage_100k": 0.06,
        "cve_intelligence": 0.85,
        "reverse_engineering": 0.85,
        "thai_grounding": 1.0,
        "general": 1.0,
    }
    if domain in fixed_weights:
        return fixed_weights[domain]

    # Weak-axis membership is checked before the prefix rules, so a weak-axis
    # domain gets 1.55 even when it also carries a logic_/alignment_ prefix.
    if domain in WEAK_AXIS_DOMAINS:
        return 1.55

    for prefix, weight in (("logic_", 1.35), ("alignment_", 1.60)):
        if domain.startswith(prefix):
            return weight

    if domain.startswith("claude_reasoning_"):
        scaled = 0.70 + score * 0.25
        return min(1.0, scaled)

    return 1.0
def _collect_candidates(
    input_paths: list[str | Path],
    policy: PurityConcentratorPolicy,
) -> tuple[dict[str, dict[str, Any]], Counter[str], Counter[str], int]:
    """Read, filter, score and de-duplicate records from every input file.

    Returns ``(candidates, reject_counts, source_counts, input_records_seen)``
    where ``candidates`` maps semantic hash -> best-scoring enriched record.
    """
    candidates: dict[str, dict[str, Any]] = {}
    reject_counts: Counter[str] = Counter()
    source_counts: Counter[str] = Counter()
    input_records_seen = 0

    for raw_path in input_paths:
        path = Path(raw_path)
        # A directory "exists" but cannot be opened as a file; treat it like
        # a missing input instead of crashing in open().
        if not path.exists() or path.is_dir():
            reject_counts["missing_input_path"] += 1
            continue
        for line_no, item in _read_jsonl(path):
            input_records_seen += 1
            # Anything that is not a JSON object cannot be a record.
            if not isinstance(item, dict):
                reject_counts["invalid_json"] += 1
                continue

            text = _canonical_text(item)
            tokens = _token_estimate(text)
            source = _source(item)
            prior_governor = item.get("quality_governor")
            prior_domain = prior_governor.get("domain") if isinstance(prior_governor, dict) else None
            # A domain assigned by an earlier governor pass wins over re-derivation.
            domain = str(prior_domain or _domain(item, source))

            if tokens < policy.min_estimated_tokens:
                reject_counts["too_short"] += 1
                continue
            if tokens > policy.max_estimated_tokens:
                reject_counts["too_long"] += 1
                continue
            reason = _quality_reject_reason(
                text,
                skip_repetition_scan=domain.startswith(("logic_", "alignment_")),
            )
            if reason:
                reject_counts[reason] += 1
                continue

            h = _semantic_hash(item, text, domain)
            score = _quality_score(item, text, domain, tokens)
            if score < policy.min_quality_score:
                reject_counts["low_density_score"] += 1
                continue

            # The record was decoded fresh from disk, so enriching it in
            # place does not affect any caller-owned object.
            enriched = item
            enriched.setdefault("metadata", {})
            # setdefault() would keep an explicit ``"quality_governor": null``
            # and the update() below would then fail on None.
            if not isinstance(enriched.get("quality_governor"), dict):
                enriched["quality_governor"] = {}
            enriched["quality_governor"].update(
                {
                    "domain": domain,
                    "source": source,
                    "estimated_tokens": tokens,
                    "semantic_sha256": h,
                    "purity_density_score": score,
                    "loss_weight": _loss_weight(domain, score),
                    "purity_concentrator": "puremax-v1",
                }
            )

            prev = candidates.get(h)
            if prev is not None and score <= prev["_score"]:
                reject_counts["duplicate_lower_score"] += 1
            else:
                if prev is not None:
                    # The displaced candidate is dropped as well; count it so
                    # the reject totals account for every discarded record.
                    reject_counts["duplicate_lower_score"] += 1
                # Underscore-prefixed keys are bookkeeping only and are
                # stripped again before the record is written out.
                enriched["_score"] = score
                enriched["_input_path"] = str(path)
                enriched["_line_no"] = line_no
                candidates[h] = enriched
            source_counts[source] += 1

    return candidates, reject_counts, source_counts, input_records_seen


def _select_under_caps(
    candidates: dict[str, dict[str, Any]],
    policy: PurityConcentratorPolicy,
    reject_counts: Counter[str],
) -> tuple[list[dict[str, Any]], Counter[str]]:
    """Pick the best candidates subject to the global and per-domain caps.

    Updates ``reject_counts`` in place for records dropped by a cap and
    returns ``(selected, domain_counts)``.
    """
    # Highest score first; among equal scores, shorter records first.
    ranked = sorted(
        candidates.values(),
        key=lambda row: (-row["_score"], row["quality_governor"]["estimated_tokens"]),
    )
    default_cap = max(1, int(policy.max_records * policy.max_domain_share))
    special_caps = {
        "coverage_100k": max(1, int(policy.max_records * policy.coverage_max_share)),
        "cve_intelligence": max(1, int(policy.max_records * policy.cve_max_share)),
    }

    selected: list[dict[str, Any]] = []
    domain_counts: Counter[str] = Counter()
    for row in ranked:
        domain = row["quality_governor"]["domain"]
        # Keep iterating after the cap is hit so every overflow is counted.
        if len(selected) >= policy.max_records:
            reject_counts["global_cap"] += 1
            continue
        if domain_counts[domain] >= special_caps.get(domain, default_cap):
            reject_counts["domain_share_cap"] += 1
            continue
        selected.append({k: v for k, v in row.items() if not k.startswith("_")})
        domain_counts[domain] += 1
    return selected, domain_counts


def build_purity_concentrator(
    input_paths: list[str | Path],
    out_dir: str | Path,
    *,
    policy: PurityConcentratorPolicy | None = None,
) -> dict[str, Any]:
    """Build the concentrated training mix and its manifest.

    Reads every JSONL file in ``input_paths``, rejects records that fail the
    token-length, quality and density checks, keeps the best-scoring record
    per semantic hash, then selects records in score order under the global
    and per-domain caps of ``policy``.

    Args:
        input_paths: JSONL files to read. Paths that do not exist (or are
            directories) are counted under ``missing_input_path`` and skipped.
        out_dir: Output directory; created if necessary.
        policy: Thresholds to apply; defaults to ``PurityConcentratorPolicy()``.

    Returns:
        The manifest dict, which is also written to
        ``<out_dir>/purity_concentrator_manifest.json``. The selected records
        are written to ``<out_dir>/tinymind_puremax_concentrated_mix.jsonl``.
    """
    if policy is None:
        policy = PurityConcentratorPolicy()
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)

    candidates, reject_counts, source_counts, input_records_seen = _collect_candidates(input_paths, policy)
    selected, domain_counts = _select_under_caps(candidates, policy, reject_counts)

    out_jsonl = out / "tinymind_puremax_concentrated_mix.jsonl"
    with out_jsonl.open("w", encoding="utf-8", newline="\n") as f:
        for row in selected:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")

    kept_records = len(selected)
    # max(..., 1) guards every ratio against division by zero on an empty mix.
    denominator = max(kept_records, 1)
    weak_axis_share = sum(domain_counts[d] for d in WEAK_AXIS_DOMAINS) / denominator
    coverage_share = domain_counts["coverage_100k"] / denominator
    dominant_domain_share = max(domain_counts.values(), default=0) / denominator
    avg_score = sum(float(row["quality_governor"]["purity_density_score"]) for row in selected) / denominator

    report = {
        "schema": "tinymind.purity_concentrator.v1",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "input_paths": [str(Path(p)) for p in input_paths],
        "output_jsonl": str(out_jsonl),
        "input_records_seen": input_records_seen,
        "candidate_records_after_quality": len(candidates),
        "kept_records": kept_records,
        "rejected_records": sum(reject_counts.values()),
        "reject_counts": dict(sorted(reject_counts.items())),
        "domain_counts": dict(sorted(domain_counts.items())),
        "source_counts_top": dict(source_counts.most_common(40)),
        "metrics": {
            "avg_purity_density_score": avg_score,
            "weak_axis_share": weak_axis_share,
            "coverage_share": coverage_share,
            "dominant_domain_share": dominant_domain_share,
        },
        "training_contract": {
            "loss_normalization": "per_sample_token_normalized",
            "targeting": "assistant_final_answer_only",
            "coverage_100k_loss_weight": 0.06,
            "weak_axis_loss_weight_floor": 1.35,
            "domain_share_cap": policy.max_domain_share,
        },
        "claim_gate": {
            "puremax_dataset_ready": kept_records > 0,
            "round_domain_balance_ready": dominant_domain_share <= policy.max_domain_share
            and coverage_share <= policy.coverage_max_share,
            "weak_axis_enrichment_ready": weak_axis_share >= policy.weak_axis_min_share,
            "world_purest_claim_allowed": False,
            "reason": "This is local data purity evidence. It improves filtering and balance but cannot prove world's purest data.",
        },
    }
    report_path = out / "purity_concentrator_manifest.json"
    report["manifest_path"] = str(report_path)
    report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    return report
Xet Storage Details
- Size:
- 10.1 kB
- Xet hash:
- 2171c241ea10e14e9e45541a2b31186d388445476a562a2f342038617321ea89
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.