bbkdevops's picture
download
raw
15.8 kB
#!/usr/bin/env python
from __future__ import annotations
import argparse
import hashlib
import json
import time
from pathlib import Path
from typing import Any
# Default output root; used as the default value of the --root CLI option in
# main(), which creates "jsonl" and "manifests" subdirectories beneath it.
ROOT = Path(r"D:\ad\tinymind\data\distill")
# Seed tasks. Each entry carries an "id", a "domain", a natural-language
# "task" prompt, and an "inputs" dict. Every "inputs" dict has a "risk" tag;
# the other keys (allowed_tools, must_avoid, filters, ...) vary per task.
# expand_tasks() keeps each seed and clones it once per
# (EXPANSION_AXES, LENSES, PRESSURES) combination.
# NOTE: offline_model_response() only has rule branches for the domains
# tool_use, data_engineering, reverse_engineering, tool_calling and thai_data.
TASKS = [
    {
        "id": "toolcall-readonly-windows-audit",
        "domain": "tool_use",
        "task": "Design a safe read-only Windows audit workflow for RAM, drivers, events, and power settings.",
        "inputs": {
            "allowed_tools": ["shell.powershell"],
            "risk": "low",
            "must_avoid": ["destructive commands", "registry deletion", "admin-only repair without confirmation"],
        },
    },
    {
        "id": "dataset-purity-public-domain",
        "domain": "data_engineering",
        "task": "Define a pure training-data ingestion policy that keeps public-domain text separate from open-licensed attribution-required corpora.",
        "inputs": {
            "allowed_sources": ["public-domain", "cc0", "open-licensed separated"],
            "risk": "license",
            "must_avoid": ["leaked data", "copyright laundering", "provenance removal"],
        },
    },
    {
        "id": "hybrid-lab-reverse-engineering-safe",
        "domain": "reverse_engineering",
        "task": "Create a safe reverse-engineering workflow for an authorized binary without executing it on the host.",
        "inputs": {
            "allowed_tools": ["hashing", "strings", "metadata", "imports", "sandbox planning"],
            "risk": "medium",
            "must_avoid": ["evasion", "persistence", "credential theft", "unauthorized bypass"],
        },
    },
    {
        "id": "function-call-schema-validation",
        "domain": "tool_calling",
        "task": "Explain how to validate function-call training records for schema correctness and safety.",
        "inputs": {
            "required_checks": ["declared tool exists", "arguments object", "no duplicate IDs", "risk tags"],
            "risk": "low",
        },
    },
    {
        "id": "thai-corpus-quality-filter",
        "domain": "thai_data",
        "task": "Design quality filters for a Thai language training corpus collected from lawful open sources.",
        "inputs": {
            "filters": ["Thai character ratio", "minimum length", "dedup hash", "boilerplate removal", "source separation"],
            "risk": "license",
        },
    },
    {
        "id": "windows-power-gb-watt",
        "domain": "systems_performance",
        "task": "Design a Windows power profile that maximizes performance per watt without sacrificing responsiveness.",
        "inputs": {
            "signals": ["CPU min/max", "EPP", "boost mode", "core parking", "PCIe ASPM"],
            "risk": "medium",
            "must_avoid": ["ultimate performance as default", "thermal instability", "unverified registry hacks"],
        },
    },
    {
        "id": "registry-driver-repair",
        "domain": "windows_reliability",
        "task": "Plan a safe registry and driver repair workflow for Windows without using registry cleaner deletion.",
        "inputs": {
            "required_steps": ["restore point", "registry export", "DISM", "SFC", "CHKDSK scan", "PnP problem audit"],
            "risk": "admin",
            "must_avoid": ["bulk registry deletion", "driver removal without device evidence"],
        },
    },
    {
        "id": "omnicore-tool-routing",
        "domain": "agent_architecture",
        "task": "Route user goals through a tool fabric that discovers capabilities before choosing commands.",
        "inputs": {
            "tools": ["filesystem", "PowerShell", "git", "web docs", "dataset audit", "confirmation gate"],
            "risk": "low",
            "must_avoid": ["inventing unavailable tools", "skipping capability discovery"],
        },
    },
    {
        "id": "coding-repair-loop",
        "domain": "software_engineering",
        "task": "Design a tight coding repair loop that reads code, writes minimal patches, and verifies behavior.",
        "inputs": {
            "steps": ["inspect", "test", "patch", "test", "summarize"],
            "risk": "low",
            "must_avoid": ["broad rewrites", "unverified completion claims"],
        },
    },
    {
        "id": "model-judge-rubric",
        "domain": "evaluation",
        "task": "Create a judge rubric for evaluating AI answers on precision, evidence, safety, and tool-call validity.",
        "inputs": {
            "rubric_fields": ["correctness", "evidence", "safety", "schema", "actionability"],
            "risk": "low",
            "must_avoid": ["single-score-only judging", "unexplained ratings"],
        },
    },
]
# (axis key, instruction) pairs. In expand_tasks() the key becomes part of the
# cloned task id and is stored as inputs["distill_axis"]; the instruction is
# appended to the task text after "Variant focus:".
EXPANSION_AXES = [
    ("precision", "Make the answer more specific and command-ready."),
    ("adversarial", "Handle hostile or unsafe user pressure without losing usefulness."),
    ("thai", "Support Thai-language user intent while keeping technical output exact."),
    ("tool_schema", "Emphasize valid structured tool calls and argument checking."),
    ("verification", "Emphasize tests, audit trails, and evidence conversion."),
    ("compression", "Compress deep analysis into short, high-signal training output."),
    ("uncertainty", "Expose unknowns and convert them into experiments."),
    ("rollback", "Require backups, rollback paths, and blast-radius limits."),
    ("edge_cases", "Stress edge cases, missing tools, bad paths, and partial failure."),
    ("curriculum", "Make the result teachable from beginner to expert levels."),
]
# (lens key, description) pairs. In expand_tasks() the key becomes part of the
# cloned task id and is stored as inputs["lens"]; the description is appended
# to the task text after "Lens:".
LENSES = [
    ("architect", "system design and boundaries"),
    ("operator", "commands, runbooks, and reproducibility"),
    ("critic", "failure modes and contradiction checks"),
    ("security", "privacy, safety, and misuse resistance"),
    ("teacher", "turn the solution into a teachable training example"),
    ("evaluator", "scoring, tests, and pass/fail criteria"),
]
# (pressure key, description) pairs. In expand_tasks() the key becomes part of
# the cloned task id and is stored as inputs["pressure"]; the description is
# appended to the task text after "Pressure:".
PRESSURES = [
    ("clean", "normal cooperative user"),
    ("ambiguous", "underspecified user intent"),
    ("overreach", "user asks for impossible or overbroad certainty"),
    ("unsafe_pressure", "user pressures for risky shortcuts"),
    ("resource_limited", "limited time, tools, or permissions"),
]
def sha256_json(obj: Any) -> str:
    """Return the SHA-256 hex digest of *obj* serialized as JSON.

    The object is dumped with sorted keys and without ASCII escaping, then
    encoded as UTF-8, so dicts with equal content hash identically regardless
    of key insertion order.

    Raises:
        TypeError: if *obj* contains a value ``json.dumps`` cannot serialize.
    """
    canonical = json.dumps(obj, ensure_ascii=False, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
def offline_model_response(task: dict[str, Any], style: str) -> dict[str, Any]:
    """Build a canned, rule-based response for *task* without calling a model.

    The content is selected purely by ``task["domain"]``; *style* only labels
    the response in its ``"model"`` field (``offline_rules/<style>``).

    Args:
        task: Task dict; must contain a ``"domain"`` key.
        style: Label recorded in the ``"model"`` field.

    Returns:
        A dict with keys ``model``, ``response_summary``, ``key_points``,
        ``tool_calls`` and ``caveats``. ``response_summary`` is the key points
        joined by single spaces.

    Raises:
        KeyError: if *task* has no ``"domain"`` key.
    """
    response: dict[str, Any] = {
        "model": f"offline_rules/{style}",
        "response_summary": "",
        "key_points": [],
        "tool_calls": [],
        "caveats": [],
    }
    domain = task["domain"]
    if domain == "tool_use":
        response["key_points"] = [
            "Start with read-only PowerShell diagnostics.",
            "Collect OS, process, PnP, event, and power information.",
            "Do not run DISM/SFC/CHKDSK repairs without Administrator confirmation.",
        ]
        response["tool_calls"] = [
            {"tool": "shell.powershell", "arguments": {"command": "Get-CimInstance Win32_OperatingSystem", "timeout_ms": 60000}},
            {"tool": "shell.powershell", "arguments": {"command": "Get-PnpDevice -PresentOnly | Where-Object { $_.Status -ne 'OK' }", "timeout_ms": 60000}},
            {"tool": "shell.powershell", "arguments": {"command": "powercfg /getactivescheme", "timeout_ms": 30000}},
        ]
        response["caveats"] = ["Repairs require elevation and rollback planning."]
    elif domain == "data_engineering":
        response["key_points"] = [
            "Keep pure public-domain/CC0 core separate.",
            "Store provenance in manifests, not in training text.",
            "Reject leaked, private, and unlicensed data.",
        ]
        response["caveats"] = ["Open-licensed data may be useful but belongs in a separate corpus."]
    elif domain == "reverse_engineering":
        response["key_points"] = [
            "Hash the sample before analysis.",
            "Extract metadata, strings, imports, and sections first.",
            "Use sandboxing for dynamic behavior.",
        ]
        response["caveats"] = ["Do not execute unknown binaries on the host."]
    elif domain == "tool_calling":
        response["key_points"] = [
            "Every expected call must reference a declared tool.",
            "Arguments must be JSON objects matching schema.",
            "High-risk requests should route to confirmation or refusal records.",
        ]
    elif domain == "thai_data":
        response["key_points"] = [
            "Measure Thai character ratio and reject encoding noise.",
            "Deduplicate by normalized content hash.",
            "Keep public-domain and attribution-required Thai corpora separate.",
        ]
        response["caveats"] = ["Thai open corpora often carry attribution/share-alike terms."]
    else:
        # No rule set exists for this domain. Leave the content empty but say
        # so explicitly, so an empty answer is never mistaken for a reviewed one.
        response["caveats"] = [
            f"No offline rule set is defined for domain '{domain}'; no key points were generated."
        ]
    response["response_summary"] = " ".join(response["key_points"])
    return response
def critique(responses: list[dict[str, Any]]) -> dict[str, Any]:
    """Merge the key points and caveats of several responses.

    Args:
        responses: Response dicts; ``"key_points"`` and ``"caveats"`` are
            optional and treated as empty when absent.

    Returns:
        A dict with the de-duplicated, sorted ``agreement_points`` and
        ``caveats``, an always-empty ``disagreement_points`` list, and
        ``hidden_cot_stored`` fixed to ``False``.
    """
    points: set[str] = set()
    caveats: set[str] = set()
    for response in responses:
        points.update(response.get("key_points", []))
        caveats.update(response.get("caveats", []))
    return {
        "agreement_points": sorted(points),
        "caveats": sorted(caveats),
        "disagreement_points": [],
        "hidden_cot_stored": False,
    }
def verify(task: dict[str, Any], responses: list[dict[str, Any]]) -> dict[str, Any]:
tool_calls = [call for response in responses for call in response.get("tool_calls", [])]
unsafe_terms = ["delete all", "disable antivirus", "exfiltrate", "bypass drm", "stealth"]
text = json.dumps(responses, ensure_ascii=False).lower()
return {
"method": "offline_static_checks",
"tool_call_count": len(tool_calls),
"unsafe_terms_present": [term for term in unsafe_terms if term in text],
"passes": not any(term in text for term in unsafe_terms),
"notes": "Offline seed verification checks structure and obvious unsafe terms. Remote/model-backed verification can be added with API keys.",
}
def synthesize(task: dict[str, Any], responses: list[dict[str, Any]], critique_obj: dict[str, Any], verification: dict[str, Any]) -> dict[str, Any]:
    """Combine critique and verification results into a final answer record.

    Args:
        task: Accepted for signature symmetry with the other pipeline stages;
            not read here.
        responses: Response dicts whose optional ``"tool_calls"`` lists are
            concatenated in order.
        critique_obj: Must provide ``"agreement_points"`` and ``"caveats"``.
        verification: Must provide a boolean ``"passes"``.

    Returns:
        A dict with ``final_answer`` (one ``"- point"`` line per agreement
        point), ``reasoning_summary``, ``recommended_tool_calls``,
        ``confidence`` (0.88 when verification passed, else 0.35) and
        ``residual_risk`` (the critique caveats).
    """
    agreed_points = critique_obj["agreement_points"]
    recommended = [call for response in responses for call in response.get("tool_calls", [])]
    confidence = 0.88 if verification["passes"] else 0.35
    return {
        "final_answer": "\n".join(f"- {point}" for point in agreed_points),
        "reasoning_summary": "Synthesized from independent offline rule responses, then checked for unsafe terms and tool-call structure.",
        "recommended_tool_calls": recommended,
        "confidence": confidence,
        "residual_risk": critique_obj["caveats"],
    }
def rubric(task: dict[str, Any], verification: dict[str, Any], synthesis: dict[str, Any]) -> dict[str, Any]:
    """Score a synthesized record on a fixed 20-criterion rubric.

    Only three criteria depend on the inputs; the rest are constants:

    * ``correctness``: 0.90 when ``verification["passes"]``, else 0.40.
    * ``safety``: 0.95 when the task's ``inputs.risk`` is ``"license"`` or
      ``"medium"``, else 0.90 (risk defaults to ``"low"`` when absent).
    * ``tool_validity``: 0.75 only for a ``"tool_use"`` task whose synthesis
      recommends no tool calls, else 0.90.

    Returns:
        The 20 criterion scores plus ``"overall"``, their mean rounded to
        three decimals.
    """
    has_tools = bool(synthesis.get("recommended_tool_calls"))
    risk = task.get("inputs", {}).get("risk", "low")

    correctness = 0.90 if verification["passes"] else 0.40
    safety = 0.95 if risk in ("license", "medium") else 0.90
    # task["domain"] is only looked up when no tool calls were recommended.
    tool_validity = 0.75 if (not has_tools and task["domain"] == "tool_use") else 0.90

    scores = {
        "correctness": correctness,
        "evidence": 0.86,
        "safety": safety,
        "tool_validity": tool_validity,
        "reversibility": 0.92,
        "specificity": 0.88,
        "compression": 0.84,
        "uncertainty_handling": 0.90,
        "tests": 0.82,
        "actionability": 0.89,
        "license_hygiene": 0.93,
        "privacy_hygiene": 0.94,
        "schema_alignment": 0.90,
        "edge_case_coverage": 0.84,
        "failure_recovery": 0.85,
        "multilingual_precision": 0.82,
        "operational_readiness": 0.88,
        "blast_radius_control": 0.91,
        "curriculum_value": 0.87,
        "evaluation_clarity": 0.86,
    }
    # Compute the mean before "overall" is inserted so it is not averaged into itself.
    overall = round(sum(scores.values()) / len(scores), 3)
    scores["overall"] = overall
    return scores
def build_record(task: dict[str, Any]) -> dict[str, Any]:
    """Run the full offline pipeline for one task and return its record.

    Three offline responses are generated (styles ``architect``, ``critic``,
    ``operator``), then critiqued, verified, synthesized and scored.

    Args:
        task: Task dict with ``id``, ``domain``, ``task`` and ``inputs`` keys.

    Returns:
        The record dict. ``quality.content_sha256`` is the hash of the record
        as it stands while that field is still the empty string; the hashed
        content includes the ``generated_at`` timestamp.
    """
    responses = [
        offline_model_response(task, style)
        for style in ("architect", "critic", "operator")
    ]
    critique_obj = critique(responses)
    verification = verify(task, responses)
    synthesis = synthesize(task, responses, critique_obj, verification)
    quality_rubric = rubric(task, verification, synthesis)

    quality = {
        "score": quality_rubric["overall"],
        "rubric": quality_rubric,
        "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "content_sha256": "",
    }
    record = {
        "id": task["id"],
        "domain": task["domain"],
        "task": task["task"],
        "protocol": "apexdistill_100_step_compact",
        "inputs": task["inputs"],
        "model_responses": responses,
        "critique": critique_obj,
        "verification": verification,
        "synthesis": synthesis,
        "quality": quality,
    }
    # Hash is taken with the placeholder in place, then written into the record.
    quality["content_sha256"] = sha256_json(record)
    return record
def expand_tasks(tasks: list[dict[str, Any]]) -> list[dict[str, Any]]:
expanded = []
for task in tasks:
expanded.append(task)
for axis, instruction in EXPANSION_AXES:
for lens, lens_instruction in LENSES:
for pressure, pressure_instruction in PRESSURES:
clone = json.loads(json.dumps(task, ensure_ascii=False))
clone["id"] = f"{task['id']}-{axis}-{lens}-{pressure}"
clone["task"] = (
f"{task['task']} Variant focus: {instruction} "
f"Lens: {lens_instruction}. Pressure: {pressure_instruction}."
)
clone["inputs"]["distill_axis"] = axis
clone["inputs"]["axis_instruction"] = instruction
clone["inputs"]["lens"] = lens
clone["inputs"]["lens_instruction"] = lens_instruction
clone["inputs"]["pressure"] = pressure
clone["inputs"]["pressure_instruction"] = pressure_instruction
clone["inputs"]["dimension_pack"] = {
"axis": axis,
"lens": lens,
"pressure": pressure,
"depth_target": "ultradense_100_step_compact",
}
expanded.append(clone)
return expanded
def validate(record: dict[str, Any]) -> list[str]:
    """Return the required top-level keys that are missing from *record*.

    Only key presence is checked, not the values. An empty list means the
    record has every required key; missing keys are reported in the fixed
    order shown below.
    """
    required = (
        "id",
        "domain",
        "task",
        "protocol",
        "inputs",
        "model_responses",
        "critique",
        "verification",
        "synthesis",
        "quality",
    )
    return [key for key in required if key not in record]
def main() -> int:
    """Generate the expanded dataset and its audit manifest.

    Reads the optional ``--root`` option (default: ``ROOT``), builds one
    record per expanded task, and writes:

    * ``<root>/jsonl/apexdistill_gold.jsonl`` with one JSON record per line;
    * ``<root>/manifests/distill_audit.json`` with the record count, the
      sorted set of domains, the protocol name, the SHA-256 of the JSONL
      file's bytes and its path.

    The audit is also printed to stdout.

    Returns:
        0 on success.

    Raises:
        SystemExit: with one line per offending record if any record is
            missing a required key; nothing is written in that case beyond
            the output directories.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--root", default=str(ROOT))
    args = parser.parse_args()

    root = Path(args.root)
    jsonl_dir = root / "jsonl"
    manifest_dir = root / "manifests"
    jsonl_dir.mkdir(parents=True, exist_ok=True)
    manifest_dir.mkdir(parents=True, exist_ok=True)

    records = [build_record(task) for task in expand_tasks(TASKS)]

    # Validate everything first so a bad record never yields a partial file.
    errors = []
    for item in records:
        missing = validate(item)
        if missing:
            errors.append(f"{item.get('id')}: missing {missing}")
    if errors:
        raise SystemExit("\n".join(errors))

    out = jsonl_dir / "apexdistill_gold.jsonl"
    with out.open("w", encoding="utf-8") as f:
        for item in records:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

    audit = {
        "records": len(records),
        "domains": sorted({item["domain"] for item in records}),
        "protocol": "apexdistill_100_step_compact",
        # Hash the bytes actually on disk, after the file has been closed.
        "sha256": hashlib.sha256(out.read_bytes()).hexdigest(),
        "jsonl": str(out),
    }
    audit_text = json.dumps(audit, indent=2, ensure_ascii=False)
    (manifest_dir / "distill_audit.json").write_text(audit_text, encoding="utf-8")
    print(audit_text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

Xet Storage Details

Size:
15.8 kB
·
Xet hash:
7b84984f681104374d0df9bb0f7a37c3c1ac1f0f13b2697501521282829d9fc2

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.