bbkdevops's picture
download
raw
10.2 kB
"""Build a defensive reverse-engineering learning corpus from vendored sources."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import json
from pathlib import Path
# Parent of the directory that contains this file, after resolving symlinks.
ROOT = Path(__file__).resolve().parents[1]
# Location of the vendored course checkout; used as the builder's default source_root.
DEFAULT_SOURCE = ROOT / "third_party" / "z0f_reverse_engineering"
# Lower-case file suffixes (leading dot included) that are treated as text and
# turned into training records. Compared against ``path.suffix.lower()``.
TEXT_EXTENSIONS = {
    # Files whose name has no suffix at all.
    "",
    # Assembly, C and C++ sources/headers.
    ".asm", ".c", ".cpp", ".h", ".hpp",
    # Other programming-language sources.
    ".cs", ".java", ".js", ".kt", ".kts", ".py", ".ts",
    # Shell / batch / PowerShell scripts.
    ".bat", ".cmd", ".ps1", ".sh",
    # Project, build and configuration files.
    ".cfg", ".filters", ".gradle", ".json", ".properties",
    ".sln", ".vcxproj", ".xml", ".yaml", ".yml",
    # Prose.
    ".md", ".txt",
}
# Lower-case file suffixes that are indexed by hash and size only; their
# contents never enter the training JSONL. Grouping below is for readability.
BINARY_EXTENSIONS = {
    # Archives and Android/Java packages.
    ".7z", ".apk", ".dex", ".jar",
    # Native binaries plus compiler/linker/IDE by-products.
    ".dll", ".exe", ".exp", ".ilk", ".iobj", ".ipch",
    ".ipdb", ".lib", ".obj", ".pdb", ".recipe", ".suo",
    # Disassembler database files.
    ".i64", ".id0", ".id1", ".id2", ".idb", ".nam", ".til",
    # Documents, images and other opaque data.
    ".db", ".fzz", ".pdf", ".png", ".xlsx", ".z0f",
}
# Lower-case path-component names that cause a path to be skipped while
# scanning the source tree.
EXCLUDE_DIRS = {
    # Version-control, IDE and tool caches.
    ".git", ".gradle", ".idea", ".vs", "__pycache__", "node_modules",
    # Build and report output folders.
    "bin", "build", "coverage", "debug", "dist", "obj", "out", "target", "x64",
}
def _sha256(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def _read_text(path: Path) -> str:
return path.read_text(encoding="utf-8", errors="replace")
def _domain_for(path: Path) -> str:
raw = path.as_posix().lower()
if "assembly" in raw:
return "reverse_engineering_assembly"
if "binarybasics" in raw:
return "reverse_engineering_binary_basics"
if "tools" in raw:
return "reverse_engineering_tools"
if "basicreversing" in raw:
return "reverse_engineering_basic_analysis"
if "dll" in raw:
return "reverse_engineering_dll_analysis"
if "windows" in raw:
return "reverse_engineering_windows_internals"
if "android" in raw or "apk" in raw or "droid" in raw:
return "reverse_engineering_android"
if "ghidra" in raw:
return "reverse_engineering_ghidra"
if "il2cpp" in raw or "unity" in raw:
return "reverse_engineering_il2cpp_unity"
if "doubleagent" in raw:
return "reverse_engineering_persistence_defense"
return "reverse_engineering_foundations"
@dataclass(frozen=True)
class ReverseEngineeringCorpusPolicy:
    """Immutable settings consulted by the corpus builder."""

    # Echoed into the manifest; the manifest's "safe_learning_corpus_ready"
    # gate is only true while this stays False.
    include_binary_payloads: bool = False
    # Maximum number of characters of lesson text kept in a single record.
    max_chars_per_record: int = 24_000
    # Label copied onto every record and into the manifest's policy section.
    safety_scope: str = "authorized_defensive_analysis_only"
class ReverseEngineeringCorpusBuilder:
    """Build chat-style JSONL train/eval files from a vendored course tree.

    Files under ``source_root`` whose suffix is in ``TEXT_EXTENSIONS`` become
    training records (Markdown files yield one record per ``## `` section).
    Files whose suffix is in ``BINARY_EXTENSIONS`` are never embedded; only
    their relative path, size and SHA-256 are written to a separate manifest.
    Anything located inside a directory named in ``EXCLUDE_DIRS`` is skipped.
    """

    def __init__(
        self,
        source_root: str | Path = DEFAULT_SOURCE,
        policy: ReverseEngineeringCorpusPolicy | None = None,
        source_name: str = "Z0FCourse_ReverseEngineering",
        upstream: str = "https://github.com/0xZ0F/Z0FCourse_ReverseEngineering.git",
        source_label: str = "z0f_reverse_engineering_course",
    ):
        """Store the scan location, policy and provenance labels.

        Args:
            source_root: Directory to scan; resolved to an absolute path.
            policy: Corpus policy; a default-constructed policy is used when
                omitted.
            source_name: Source name written to the corpus manifest.
            upstream: Upstream repository URL written to the corpus manifest.
            source_label: Value of the ``source`` field on every record.
        """
        self.source_root = Path(source_root).resolve()
        self.policy = policy or ReverseEngineeringCorpusPolicy()
        self.source_name = source_name
        self.upstream = upstream
        self.source_label = source_label

    def build(self, out_dir: str | Path) -> dict:
        """Write the corpus files into ``out_dir`` and return the manifest.

        Four files are produced: the train JSONL, the eval JSONL, the binary
        manifest and the corpus manifest. ``out_dir`` is created if missing.
        Records at index 0, 5, 10, ... go to the eval file; all others go to
        the train file.

        Args:
            out_dir: Destination directory for all generated files.

        Returns:
            The corpus manifest dictionary, identical to what is written to
            ``reverse_engineering_corpus_manifest.json``.
        """
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        train_path = out / "reverse_engineering_train.jsonl"
        eval_path = out / "reverse_engineering_eval.jsonl"
        binary_manifest_path = out / "reverse_engineering_binary_manifest.json"
        manifest_path = out / "reverse_engineering_corpus_manifest.json"

        records = self._text_records()
        binaries = self._binary_records()

        eval_every = 5
        train_count = 0
        eval_count = 0
        # newline="\n" keeps the JSONL line endings identical on every platform.
        with train_path.open("w", encoding="utf-8", newline="\n") as train_f, \
                eval_path.open("w", encoding="utf-8", newline="\n") as eval_f:
            for idx, record in enumerate(records):
                line = json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n"
                if idx % eval_every == 0:
                    eval_f.write(line)
                    eval_count += 1
                else:
                    train_f.write(line)
                    train_count += 1

        binary_manifest_path.write_text(
            json.dumps(binaries, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )

        manifest = {
            "schema_version": "tinymind-reverse-engineering-corpus-v1",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "source": {
                "name": self.source_name,
                "upstream": self.upstream,
                "local_path": str(self.source_root),
            },
            "policy": {
                "safety_scope": self.policy.safety_scope,
                "include_binary_payloads": self.policy.include_binary_payloads,
                "binary_handling": "hash_metadata_only",
                "allowed_training_extensions": sorted(TEXT_EXTENSIONS),
            },
            "records_written": len(records),
            "train_records": train_count,
            "eval_records": eval_count,
            "binary_files_indexed": len(binaries["files"]),
            "train_path": str(train_path),
            "eval_path": str(eval_path),
            "binary_manifest_path": str(binary_manifest_path),
            "domain_counts": self._domain_counts(records),
            "claim_gate": {
                "safe_learning_corpus_ready": len(records) > 0 and not self.policy.include_binary_payloads,
                "malware_execution_or_bypass_training_allowed": False,
                "reason": "Course text/source snippets are indexed for authorized defensive analysis; executable/binary payloads are stored only as hashes and metadata.",
            },
        }
        manifest["manifest_path"] = str(manifest_path)
        manifest_path.write_text(
            json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        return manifest

    def _text_records(self) -> list[dict]:
        """Collect one record per text file (or per Markdown section).

        Paths are visited in sorted order so the output is deterministic.
        Files that are empty after stripping whitespace produce no record.
        """
        records: list[dict] = []
        max_chars = self.policy.max_chars_per_record
        for path in sorted(self.source_root.rglob("*")):
            if self._excluded(path):
                continue
            suffix = path.suffix.lower()
            if not path.is_file() or suffix not in TEXT_EXTENSIONS:
                continue
            rel = path.relative_to(self.source_root).as_posix()
            text = _read_text(path).strip()
            if not text:
                continue
            # Hash the file once here rather than once per emitted section.
            source_sha256 = _sha256(path.read_bytes())
            if suffix == ".md":
                sections = self._markdown_sections(text)
            else:
                sections = [(None, text)]
            for section_title, section_text in sections:
                clipped = section_text[:max_chars]
                # Pass the unclipped *section* (not the whole file) so the
                # record's "truncated" flag reflects clipping only.
                records.append(
                    self._make_text_record(
                        path,
                        rel,
                        clipped,
                        section_text,
                        section_title,
                        source_sha256=source_sha256,
                    )
                )
        return records

    def _markdown_sections(self, text: str) -> list[tuple[str | None, str]]:
        """Split Markdown text into ``(title, body)`` pairs at ``## `` headings.

        Text before the first ``## `` heading forms a section whose title is
        ``None``. Each body keeps its heading line. Sections that are empty
        after stripping are dropped.
        """
        grouped: list[tuple[str | None, list[str]]] = []
        current_title: str | None = None
        current: list[str] = []
        for line in text.splitlines():
            if line.startswith("## "):
                if current:
                    grouped.append((current_title, current))
                current_title = line.lstrip("#").strip()
                current = [line]
            else:
                current.append(line)
        if current:
            grouped.append((current_title, current))

        sections: list[tuple[str | None, str]] = []
        for title, body_lines in grouped:
            body = "\n".join(body_lines).strip()
            if body:
                sections.append((title, body))
        return sections

    def _make_text_record(
        self,
        path: Path,
        rel: str,
        clipped: str,
        full_text: str,
        section_title: str | None,
        source_sha256: str | None = None,
    ) -> dict:
        """Assemble one chat-format training record.

        Args:
            path: Path of the source file on disk.
            rel: POSIX-style path of the file relative to ``source_root``.
            clipped: Lesson text after applying the per-record size limit.
            full_text: The text ``clipped`` was cut from; used only to decide
                whether the record is flagged as truncated.
            section_title: Markdown section title, or ``None``.
            source_sha256: Precomputed SHA-256 of the file's bytes. When
                omitted the file is read and hashed here.

        Returns:
            A dictionary with the chat ``messages`` plus provenance fields.
        """
        if source_sha256 is None:
            source_sha256 = _sha256(path.read_bytes())
        title = f"\n\nSection: {section_title}" if section_title else ""
        return {
            "messages": [
                {
                    "role": "system",
                    "content": "You are TinyMind defensive reverse-engineering tutor. Explain only authorized analysis, tooling, file formats, assembly, debugging concepts, and safety boundaries.",
                },
                {
                    "role": "user",
                    "content": f"Study and explain this reverse-engineering lesson safely:\n\nSource: {rel}{title}\n\n{clipped}",
                },
                {
                    "role": "assistant",
                    "content": "I will treat this as authorized defensive learning material, preserve source context, explain concepts plainly, and avoid instructions for unauthorized bypass or misuse.",
                },
            ],
            "source": self.source_label,
            "source_path": rel,
            "section": section_title,
            "source_sha256": source_sha256,
            # Classify on the path relative to the source root so that the
            # location of the checkout (e.g. ".../tools/...") cannot leak
            # keywords into the domain label.
            "domain": _domain_for(Path(rel)),
            "topic": "reverse_engineering",
            "safety_scope": self.policy.safety_scope,
            "truncated": len(full_text) > len(clipped),
        }

    def _binary_records(self) -> dict:
        """Index binary files by relative path, suffix, size and SHA-256.

        Returns:
            A dictionary with a ``policy`` note and a ``files`` list; file
            contents themselves are never included.
        """
        files: list[dict] = []
        for path in sorted(self.source_root.rglob("*")):
            if self._excluded(path):
                continue
            suffix = path.suffix.lower()
            if not path.is_file() or suffix not in BINARY_EXTENSIONS:
                continue
            data = path.read_bytes()
            files.append(
                {
                    "path": path.relative_to(self.source_root).as_posix(),
                    "extension": suffix,
                    "bytes": len(data),
                    "sha256": _sha256(data),
                    "training_payload_included": False,
                }
            )
        return {
            "policy": "binary files are indexed for provenance only; payloads are not inserted into text training JSONL",
            "files": files,
        }

    def _domain_counts(self, records: list[dict]) -> dict:
        """Count records per ``domain`` value, in first-seen order."""
        counts: dict[str, int] = {}
        for record in records:
            domain = record["domain"]
            counts[domain] = counts.get(domain, 0) + 1
        return counts

    def _excluded(self, path: Path) -> bool:
        """Return True when ``path`` is, or lies inside, an excluded directory.

        Names are compared case-insensitively against ``EXCLUDE_DIRS``. The
        final path component is only matched when it is itself a directory,
        so a regular file that happens to be called ``build`` or ``out`` is
        not discarded.

        Raises:
            ValueError: If ``path`` is not located under ``source_root``.
        """
        rel = path.relative_to(self.source_root)
        ancestors = {part.lower() for part in rel.parts[:-1]}
        if ancestors & EXCLUDE_DIRS:
            return True
        return rel.name.lower() in EXCLUDE_DIRS and path.is_dir()

Xet Storage Details

Size:
10.2 kB
·
Xet hash:
efea9a1b1e7d2998ca40ec835cbb9758f88a6660e1ef32de02dadf9a7d606577

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.