Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /data /reverse_engineering_corpus.py
| """Build a defensive reverse-engineering learning corpus from vendored sources.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
# Directory one level above the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]
# Default location of the vendored course sources that the builder scans.
DEFAULT_SOURCE = ROOT / "third_party" / "z0f_reverse_engineering"
# Lower-cased file suffixes that are eligible to become text training records.
# The empty string matches files that have no suffix at all.
TEXT_EXTENSIONS = {
    "",
    ".asm",
    ".bat",
    ".c",
    ".cfg",
    ".cmd",
    ".cpp",
    ".cs",
    ".filters",
    ".gradle",
    ".h",
    ".hpp",
    ".java",
    ".js",
    ".json",
    ".kt",
    ".kts",
    ".md",
    ".properties",
    ".ps1",
    ".py",
    ".sh",
    ".sln",
    ".ts",
    ".txt",
    ".vcxproj",
    ".xml",
    ".yaml",
    ".yml",
}
# Lower-cased file suffixes that are recorded in the binary manifest as
# size + SHA-256 only; their contents are never written to the JSONL files.
BINARY_EXTENSIONS = {
    ".7z",
    ".apk",
    ".db",
    ".dex",
    ".dll",
    ".exe",
    ".exp",
    ".fzz",
    ".i64",
    ".id0",
    ".id1",
    ".id2",
    ".idb",
    ".ilk",
    ".iobj",
    ".ipch",
    ".ipdb",
    ".jar",
    ".lib",
    ".nam",
    ".obj",
    ".pdb",
    ".pdf",
    ".png",
    ".recipe",
    ".suo",
    ".til",
    ".xlsx",
    ".z0f",
}
# Path component names (compared after lower-casing) that cause a path under
# the source root to be skipped by both the text and the binary scan.
EXCLUDE_DIRS = {
    ".git",
    ".gradle",
    ".idea",
    ".vs",
    "__pycache__",
    "bin",
    "build",
    "coverage",
    "debug",
    "dist",
    "node_modules",
    "obj",
    "out",
    "target",
    "x64",
}
def _sha256(data: bytes) -> str:
    """Return the lowercase hexadecimal SHA-256 digest of *data*."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def _read_text(path: Path) -> str:
    """Return the contents of *path* decoded as UTF-8.

    Byte sequences that are not valid UTF-8 are replaced with U+FFFD instead
    of raising. The file is opened in text mode, so line endings are
    normalised to ``\\n``.
    """
    with path.open("r", encoding="utf-8", errors="replace") as handle:
        return handle.read()
def _domain_for(path: Path) -> str:
    """Map *path* to a corpus domain label by keyword matching.

    The POSIX form of the path is lower-cased and tested against the rules
    below in order; the first rule with any keyword occurring as a substring
    wins. Paths that match no rule fall back to the foundations domain.
    """
    haystack = path.as_posix().lower()
    # Order matters: earlier rules take precedence over later ones.
    ordered_rules = (
        (("assembly",), "reverse_engineering_assembly"),
        (("binarybasics",), "reverse_engineering_binary_basics"),
        (("tools",), "reverse_engineering_tools"),
        (("basicreversing",), "reverse_engineering_basic_analysis"),
        (("dll",), "reverse_engineering_dll_analysis"),
        (("windows",), "reverse_engineering_windows_internals"),
        (("android", "apk", "droid"), "reverse_engineering_android"),
        (("ghidra",), "reverse_engineering_ghidra"),
        (("il2cpp", "unity"), "reverse_engineering_il2cpp_unity"),
        (("doubleagent",), "reverse_engineering_persistence_defense"),
    )
    for keywords, domain in ordered_rules:
        if any(keyword in haystack for keyword in keywords):
            return domain
    return "reverse_engineering_foundations"
@dataclass
class ReverseEngineeringCorpusPolicy:
    """Settings that control how the reverse-engineering corpus is built.

    The class is a (non-frozen) dataclass so that individual settings can be
    overridden through the constructor, e.g.
    ``ReverseEngineeringCorpusPolicy(max_chars_per_record=8_000)``, while
    attribute assignment on an existing instance keeps working.
    """

    # Recorded in the manifest; the corpus is only reported as "safe" when
    # this flag is False.
    include_binary_payloads: bool = False
    # Upper bound on the number of characters of source text per record.
    max_chars_per_record: int = 24_000
    # Label copied into every record and into the manifest.
    safety_scope: str = "authorized_defensive_analysis_only"
class ReverseEngineeringCorpusBuilder:
    """Turn a vendored source tree into train/eval JSONL files plus manifests.

    Text files become chat-style records; binary files are only listed with
    their size and SHA-256 digest and never contribute content to the JSONL.
    """

    # Number of leading bytes inspected when deciding whether a file that has
    # a text-like suffix actually holds binary data.
    _SNIFF_BYTES = 8192

    def __init__(
        self,
        source_root: str | Path = DEFAULT_SOURCE,
        policy: ReverseEngineeringCorpusPolicy | None = None,
        source_name: str = "Z0FCourse_ReverseEngineering",
        upstream: str = "https://github.com/0xZ0F/Z0FCourse_ReverseEngineering.git",
        source_label: str = "z0f_reverse_engineering_course",
    ):
        """Store the source location, policy and provenance labels.

        Args:
            source_root: Directory to scan; resolved to an absolute path.
            policy: Build settings; a default policy is created when omitted.
            source_name: Human-readable source name written to the manifest.
            upstream: Upstream URL written to the manifest.
            source_label: Value of the ``source`` field on every text record.
        """
        self.source_root = Path(source_root).resolve()
        self.policy = policy if policy is not None else ReverseEngineeringCorpusPolicy()
        self.source_name = source_name
        self.upstream = upstream
        self.source_label = source_label

    def build(self, out_dir: str | Path) -> dict:
        """Write the corpus files into *out_dir* and return the manifest.

        Four files are produced: the train JSONL, the eval JSONL, the binary
        manifest and the corpus manifest. Every fifth text record (indices
        0, 5, 10, ...) goes to the eval file; the rest go to the train file.

        Args:
            out_dir: Output directory; created (with parents) if missing.

        Returns:
            The corpus manifest dictionary that was written to disk.
        """
        out = Path(out_dir)
        out.mkdir(parents=True, exist_ok=True)
        train_path = out / "reverse_engineering_train.jsonl"
        eval_path = out / "reverse_engineering_eval.jsonl"
        binary_manifest_path = out / "reverse_engineering_binary_manifest.json"
        records = self._text_records()
        binaries = self._binary_records()
        eval_every = 5
        train_count = 0
        eval_count = 0
        with train_path.open("w", encoding="utf-8", newline="\n") as train_f, \
                eval_path.open("w", encoding="utf-8", newline="\n") as eval_f:
            for idx, record in enumerate(records):
                line = json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n"
                if idx % eval_every == 0:
                    eval_f.write(line)
                    eval_count += 1
                else:
                    train_f.write(line)
                    train_count += 1
        binary_manifest_path.write_text(
            json.dumps(binaries, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        manifest = {
            "schema_version": "tinymind-reverse-engineering-corpus-v1",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "source": {
                "name": self.source_name,
                "upstream": self.upstream,
                "local_path": str(self.source_root),
            },
            "policy": {
                "safety_scope": self.policy.safety_scope,
                "include_binary_payloads": self.policy.include_binary_payloads,
                "binary_handling": "hash_metadata_only",
                "allowed_training_extensions": sorted(TEXT_EXTENSIONS),
            },
            "records_written": len(records),
            "train_records": train_count,
            "eval_records": eval_count,
            "binary_files_indexed": len(binaries["files"]),
            "train_path": str(train_path),
            "eval_path": str(eval_path),
            "binary_manifest_path": str(binary_manifest_path),
            "domain_counts": self._domain_counts(records),
            "claim_gate": {
                "safe_learning_corpus_ready": len(records) > 0 and not self.policy.include_binary_payloads,
                "malware_execution_or_bypass_training_allowed": False,
                "reason": "Course text/source snippets are indexed for authorized defensive analysis; executable/binary payloads are stored only as hashes and metadata.",
            },
        }
        manifest_path = out / "reverse_engineering_corpus_manifest.json"
        manifest["manifest_path"] = str(manifest_path)
        manifest_path.write_text(
            json.dumps(manifest, ensure_ascii=False, indent=2, sort_keys=True),
            encoding="utf-8",
        )
        return manifest

    def _text_records(self) -> list[dict]:
        """Collect one record per text file, or per ``##`` section for markdown.

        Files are visited in sorted path order so the output is stable.
        Excluded paths, non-files, suffixes outside ``TEXT_EXTENSIONS``, files
        that look binary and files that are empty after stripping are skipped.
        """
        records: list[dict] = []
        for path in sorted(self.source_root.rglob("*")):
            if self._excluded(path):
                continue
            suffix = path.suffix.lower()
            if not path.is_file() or suffix not in TEXT_EXTENSIONS:
                continue
            # A text-like (or missing) suffix does not guarantee text content;
            # keep binary payloads out of the training JSONL.
            if self._looks_binary(path):
                continue
            text = _read_text(path).strip()
            if not text:
                continue
            rel = path.relative_to(self.source_root).as_posix()
            # Hash once per file instead of once per section.
            digest = _sha256(path.read_bytes())
            sections = self._markdown_sections(text) if suffix == ".md" else [(None, text)]
            for section_title, section_text in sections:
                clipped = section_text[: self.policy.max_chars_per_record]
                # Pass the section text (not the whole file) so that
                # "truncated" reflects clipping of this record only.
                records.append(
                    self._make_text_record(
                        path, rel, clipped, section_text, section_title, source_sha256=digest
                    )
                )
        return records

    def _markdown_sections(self, text: str) -> list[tuple[str | None, str]]:
        """Split markdown *text* at lines that start with ``"## "``.

        Returns ``(title, body)`` pairs in document order. Content before the
        first such heading gets the title ``None``. Each body includes its
        heading line; bodies that are empty after stripping are dropped.
        """
        grouped: list[tuple[str | None, list[str]]] = []
        current_title: str | None = None
        current: list[str] = []
        for line in text.splitlines():
            if line.startswith("## "):
                if current:
                    grouped.append((current_title, current))
                current_title = line.lstrip("#").strip()
                current = [line]
            else:
                current.append(line)
        if current:
            grouped.append((current_title, current))
        sections: list[tuple[str | None, str]] = []
        for heading, body_lines in grouped:
            body = "\n".join(body_lines).strip()
            if body:
                sections.append((heading, body))
        return sections

    def _make_text_record(
        self,
        path: Path,
        rel: str,
        clipped: str,
        full_text: str,
        section_title: str | None,
        source_sha256: str | None = None,
    ) -> dict:
        """Build one chat-style training record.

        Args:
            path: Absolute path of the source file.
            rel: POSIX path of the file relative to the source root.
            clipped: Text placed in the user message.
            full_text: The unclipped text that *clipped* was cut from; the
                record is marked truncated when it is longer than *clipped*.
            section_title: Markdown section title, or ``None``.
            source_sha256: Precomputed SHA-256 of the file's bytes; computed
                from *path* when omitted.

        Returns:
            The record dictionary (messages plus provenance metadata).
        """
        title = f"\n\nSection: {section_title}" if section_title else ""
        if source_sha256 is None:
            source_sha256 = _sha256(path.read_bytes())
        # Classify from the source directory name plus the relative path so
        # the label does not depend on where the repository is checked out.
        domain = _domain_for(Path(self.source_root.name) / rel)
        return {
            "messages": [
                {
                    "role": "system",
                    "content": "You are TinyMind defensive reverse-engineering tutor. Explain only authorized analysis, tooling, file formats, assembly, debugging concepts, and safety boundaries.",
                },
                {
                    "role": "user",
                    "content": f"Study and explain this reverse-engineering lesson safely:\n\nSource: {rel}{title}\n\n{clipped}",
                },
                {
                    "role": "assistant",
                    "content": "I will treat this as authorized defensive learning material, preserve source context, explain concepts plainly, and avoid instructions for unauthorized bypass or misuse.",
                },
            ],
            "source": self.source_label,
            "source_path": rel,
            "section": section_title,
            "source_sha256": source_sha256,
            "domain": domain,
            "topic": "reverse_engineering",
            "safety_scope": self.policy.safety_scope,
            "truncated": len(full_text) > len(clipped),
        }

    def _binary_records(self) -> dict:
        """Index binary files by relative path, suffix, size and SHA-256.

        A file is indexed when its suffix is in ``BINARY_EXTENSIONS``, or when
        it has a text-like suffix but its leading bytes look binary (such
        files are skipped by the text scan, so they are listed here instead).
        """
        files: list[dict] = []
        for path in sorted(self.source_root.rglob("*")):
            if self._excluded(path) or not path.is_file():
                continue
            suffix = path.suffix.lower()
            if suffix not in BINARY_EXTENSIONS:
                if suffix not in TEXT_EXTENSIONS or not self._looks_binary(path):
                    continue
            data = path.read_bytes()
            files.append(
                {
                    "path": path.relative_to(self.source_root).as_posix(),
                    "extension": suffix,
                    "bytes": len(data),
                    "sha256": _sha256(data),
                    "training_payload_included": False,
                }
            )
        return {
            "policy": "binary files are indexed for provenance only; payloads are not inserted into text training JSONL",
            "files": files,
        }

    def _domain_counts(self, records: list[dict]) -> dict:
        """Return how many records carry each ``domain`` value."""
        counts: dict[str, int] = {}
        for record in records:
            domain = record["domain"]
            counts[domain] = counts.get(domain, 0) + 1
        return counts

    def _excluded(self, path: Path) -> bool:
        """Return True when *path* is, or lies inside, an excluded directory.

        Only directory components are compared (case-insensitively) against
        ``EXCLUDE_DIRS``; a regular file that merely shares a name with an
        excluded directory (for example a file called ``build``) is kept.
        """
        parts = path.relative_to(self.source_root).parts
        dir_parts = parts if path.is_dir() else parts[:-1]
        return any(part.lower() in EXCLUDE_DIRS for part in dir_parts)

    def _looks_binary(self, path: Path) -> bool:
        """Return True when the first ``_SNIFF_BYTES`` of *path* contain a NUL byte."""
        with path.open("rb") as handle:
            return b"\x00" in handle.read(self._SNIFF_BYTES)
Xet Storage Details
- Size:
- 10.2 kB
- Xet hash:
- efea9a1b1e7d2998ca40ec835cbb9758f88a6660e1ef32de02dadf9a7d606577
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.