Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-colab-handoff /bundle /evaluation /code_recovery.py
| """Pure code/document recovery for owned reversible encodings. | |
| The module detects and reverses common non-destructive layers such as base64, | |
| hex, URL encoding, gzip/zlib, and ROT13. It intentionally does not crack | |
| strong encryption, bypass DRM, or deobfuscate third-party protected code | |
| without authorization. | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| import gzip | |
| import hashlib | |
| import json | |
| from pathlib import Path | |
| import re | |
| import urllib.parse | |
| import zlib | |
# Byte values treated as printable text: ASCII whitespace controls 9-13
# (tab, LF, VT, FF, CR) plus the visible ASCII range 32-126.
PRINTABLE = set(range(9, 14)) | set(range(32, 127))
| def _sha256(data: bytes) -> str: | |
| return hashlib.sha256(data).hexdigest() | |
| def _printable_ratio(data: bytes) -> float: | |
| if not data: | |
| return 0.0 | |
| return sum(1 for b in data if b in PRINTABLE or b >= 0x80) / len(data) | |
| def _looks_b64(data: bytes) -> bool: | |
| text = data.strip() | |
| return len(text) >= 8 and len(text) % 4 == 0 and re.fullmatch(rb"[A-Za-z0-9+/=\r\n]+", text) is not None | |
| def _looks_hex(data: bytes) -> bool: | |
| text = re.sub(rb"\s+", b"", data.strip()) | |
| return len(text) >= 8 and len(text) % 2 == 0 and re.fullmatch(rb"[0-9a-fA-F]+", text) is not None | |
| def _rot13(data: bytes) -> bytes: | |
| text = data.decode("utf-8", errors="strict") | |
| return text.translate(str.maketrans( | |
| "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz", | |
| "NOPQRSTUVWXYZABCDEFGHIJKLMnopqrstuvwxyzabcdefghijklm", | |
| )).encode("utf-8") | |
@dataclass
class RecoveryStep:
    """Record of one encoding layer that was reversed.

    Instances are built with keyword arguments and later serialised through
    ``__dict__``, so the class must be a dataclass (a plain class with bare
    annotations has no generated ``__init__``) and must not use ``slots``.
    """

    # Name of the reversed layer, e.g. "gzip", "zlib", "base64", "hex".
    layer: str
    # Hex SHA-256 of the bytes before the layer was reversed.
    input_sha256: str
    # Hex SHA-256 of the bytes after the layer was reversed.
    output_sha256: str
    # Length in bytes before the layer was reversed.
    input_bytes: int
    # Length in bytes after the layer was reversed.
    output_bytes: int
# Magic prefixes used to recognise compressed payloads.
_GZIP_MAGIC = b"\x1f\x8b"
_ZLIB_MAGICS = (b"x\x9c", b"x\xda", b"x\x01")
_COMPRESSED_MAGICS = (_GZIP_MAGIC, *_ZLIB_MAGICS)
# Substrings whose presence makes a ROT13 result look like code or a document.
_ROT13_TOKENS = (b"def ", b"function", b"class ", b"import ", b"# ", b"document")
# Minimum text-like ratio for a decoded hex/base64 payload to be accepted.
_MIN_PRINTABLE_RATIO = 0.55


def _is_plausible_payload(decoded: bytes) -> bool:
    """Return True if *decoded* is non-empty and text-like or compressed."""
    if not decoded:
        return False
    if decoded.startswith(_COMPRESSED_MAGICS):
        return True
    return _printable_ratio(decoded) >= _MIN_PRINTABLE_RATIO


def _peel_layer(blob: bytes) -> tuple[str | None, bytes]:
    """Try to reverse one layer; return ``(layer_name, output)`` or ``(None, blob)``.

    Detectors run from most to least specific. A detector whose output is
    rejected does not end the search; the next detector is still tried.
    Decoder errors (corrupt gzip/zlib, invalid base64 padding) propagate to
    the caller.
    """
    if blob.startswith(_GZIP_MAGIC):
        return "gzip", gzip.decompress(blob)
    if blob.startswith(_ZLIB_MAGICS):
        return "zlib", zlib.decompress(blob)

    # Text encodings are commonly line-wrapped or newline-terminated, so
    # detect and decode on a whitespace-free copy.
    compact = re.sub(rb"\s+", b"", blob)

    # Hex goes before base64: every hex digit is also a valid base64
    # character, so a hex dump would otherwise be mis-decoded as base64.
    if _looks_hex(compact):
        decoded = bytes.fromhex(compact.decode("ascii"))
        if _is_plausible_payload(decoded):
            return "hex", decoded

    if _looks_b64(compact):
        decoded = base64.b64decode(compact, validate=True)
        if _is_plausible_payload(decoded):
            return "base64", decoded

    if b"%" in blob:
        # unquote_to_bytes accepts bytes directly; no UTF-8 decode is needed,
        # so binary data that merely contains '%' does not raise.
        decoded = urllib.parse.unquote_to_bytes(blob)
        if decoded != blob:
            return "url_percent", decoded

    try:
        rotated = _rot13(blob)
    except UnicodeDecodeError:
        # Not valid UTF-8, hence not ROT13 text; nothing left to try.
        return None, blob
    if rotated != blob:
        lowered = rotated.lower()
        if any(token in lowered for token in _ROT13_TOKENS):
            return "rot13", rotated
    return None, blob


def recover_bytes(data: bytes, max_layers: int = 8) -> dict:
    """Peel reversible encoding layers off *data* and report what was done.

    Up to *max_layers* layers are reversed one at a time (gzip, zlib, hex,
    base64, URL percent-encoding, ROT13). The loop stops when no detector
    matches, when a step leaves the bytes unchanged, or when a decoder
    raises.

    Args:
        data: The encoded input bytes.
        max_layers: Upper bound on the number of layers to reverse.

    Returns:
        A report dict with the input/output SHA-256 digests and sizes, the
        list of reversed layers (``layers``, one dict per step), the
        ``layer_count``, a ``blocked_reason`` string
        (``"<ExceptionType>:<message>"``) when a decoder failed or ``None``
        otherwise, and the final bytes under ``recovered_bytes``.
    """
    current = data
    steps: list[RecoveryStep] = []
    blocked_reason = None
    for _ in range(max_layers):
        before = current
        try:
            layer, candidate = _peel_layer(before)
        except Exception as exc:
            # A recognised layer that cannot be reversed stops the run and is
            # reported to the caller; ``current`` keeps the last good bytes.
            blocked_reason = f"{type(exc).__name__}:{exc}"
            break
        if layer is None or candidate == before:
            break
        current = candidate
        steps.append(
            RecoveryStep(
                layer=layer,
                input_sha256=_sha256(before),
                output_sha256=_sha256(current),
                input_bytes=len(before),
                output_bytes=len(current),
            )
        )
    return {
        "schema_version": "tinymind-code-recovery-v1",
        "created_at": datetime.now(timezone.utc).isoformat(),
        "input_sha256": _sha256(data),
        "output_sha256": _sha256(current),
        "input_bytes": len(data),
        "output_bytes": len(current),
        "layers": [step.__dict__ for step in steps],
        "layer_count": len(steps),
        "blocked_reason": blocked_reason,
        "claim_scope": "reversible_owned_encoding_recovery_not_encryption_cracking",
        "world_best_claim_allowed": False,
        "recovered_bytes": current,
    }
def recover_file(input_path: str | Path, out_dir: str | Path) -> dict:
    """Recover the file at *input_path* and write the results into *out_dir*.

    The output directory is created if needed. The recovered bytes are
    written to ``<input name>.recovered`` and a JSON report to
    ``code_recovery_report.json`` inside it.

    Returns:
        The report dict from ``recover_bytes`` without ``recovered_bytes``
        and with ``input_path``, ``recovered_path`` and ``report_path``
        added as strings.
    """
    source = Path(input_path)
    target_dir = Path(out_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    report = recover_bytes(source.read_bytes())

    payload_file = target_dir / f"{source.name}.recovered"
    report_file = target_dir / "code_recovery_report.json"

    # The raw bytes go to their own file and are removed from the report,
    # since bytes are not JSON-serialisable.
    payload = report.pop("recovered_bytes")
    payload_file.write_bytes(payload)

    report["input_path"] = str(source)
    report["recovered_path"] = str(payload_file)
    report["report_path"] = str(report_file)

    serialized = json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True)
    report_file.write_text(serialized, encoding="utf-8")
    return report
Xet Storage Details
- Size:
- 5.02 kB
- Xet hash:
- 72f8decadab57df6103859f892b3406a0b79ebf9cc38fd6092d4252602c113e7
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.