| """ | |
| Genesis Boiler - File Auditing and Archive Management System | |
| This module provides functionality to audit files from configured source directories, | |
| create JSON inventories, and generate compressed tar archives. | |
| """ | |

import os
import json
import tarfile
import hashlib
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any, Optional

import yaml
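
# Note: `yaml` comes from the third-party PyYAML package (pip install pyyaml);
# all other imports above are from the standard library.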


class GenesisBoiler:
    """
    Audits files from configured source directories and creates compressed archives.

    This class handles:
    - File system traversal and auditing
    - JSON inventory creation
    - Gzip-compressed tar archive generation
    - File metadata collection (size, hash, modified time)
    """

    def __init__(self, config_path: str = "config.yaml"):
        """
        Initialize GenesisBoiler with configuration.

        Args:
            config_path: Path to the YAML configuration file
        """
        self.config_path = config_path
        self.config = self._load_config()
        self.inventory = []
        audit_cfg = self.config.get('audit', {})
        self.audit_enabled = audit_cfg.get('enabled', True)
        self.source_dirs = audit_cfg.get('source_directories', ['.'])
        self.exclude_patterns = audit_cfg.get('exclude_from_audit', [])

    def _load_config(self) -> Dict[str, Any]:
        """Load configuration from the YAML file, falling back to defaults."""
        try:
            with open(self.config_path, 'r') as f:
                # safe_load returns None for an empty file; normalize to a dict
                return yaml.safe_load(f) or {}
        except FileNotFoundError:
            print(f"Config file {self.config_path} not found, using defaults")
            return {}
        except yaml.YAMLError as e:
            print(f"Config file {self.config_path} is invalid YAML ({e}), using defaults")
            return {}

    def _should_exclude(self, path: str) -> bool:
        """Check if a path should be excluded based on configured patterns."""
        for pattern in self.exclude_patterns:
            # Match either as a plain substring or as a glob-style pattern
            if pattern in path or Path(path).match(pattern):
                return True
        return False

    def _calculate_file_hash(self, file_path: str) -> str:
        """Calculate the SHA-256 hash of a file, reading it in 4 KiB blocks."""
        sha256_hash = hashlib.sha256()
        try:
            with open(file_path, "rb") as f:
                for byte_block in iter(lambda: f.read(4096), b""):
                    sha256_hash.update(byte_block)
            return sha256_hash.hexdigest()
        except OSError as e:
            print(f"Error hashing {file_path}: {e}")
            return ""

    def audit_files(self) -> List[Dict[str, Any]]:
        """
        Audit files from configured source directories.

        Returns:
            List of dictionaries containing file metadata
        """
        self.inventory = []
        for source_dir in self.source_dirs:
            source_path = Path(source_dir).resolve()
            if not source_path.exists():
                print(f"Source directory {source_dir} does not exist, skipping")
                continue
            for root, dirs, files in os.walk(source_path):
                # Prune excluded directories in place so os.walk skips them
                dirs[:] = [d for d in dirs if not self._should_exclude(os.path.join(root, d))]
                for file in files:
                    file_path = os.path.join(root, file)
                    if self._should_exclude(file_path):
                        continue
                    try:
                        stat_info = os.stat(file_path)
                        relative_path = os.path.relpath(file_path, source_path)
                        file_info = {
                            "path": relative_path,
                            "full_path": file_path,
                            "size": stat_info.st_size,
                            "modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
                            "hash": self._calculate_file_hash(file_path),
                            "source_dir": source_dir
                        }
                        self.inventory.append(file_info)
                    except OSError as e:
                        print(f"Error processing {file_path}: {e}")
        return self.inventory

    def write_inventory(self, output_path: str = "inventory.json") -> str:
        """
        Write the file inventory to a JSON file.

        Args:
            output_path: Path where the JSON inventory will be written

        Returns:
            Path to the created inventory file
        """
        if not self.inventory:
            self.audit_files()
        inventory_data = {
            "timestamp": datetime.now().isoformat(),
            "total_files": len(self.inventory),
            "total_size": sum(f["size"] for f in self.inventory),
            "files": self.inventory
        }
        with open(output_path, 'w') as f:
            json.dump(inventory_data, f, indent=2)
        print(f"Inventory written to {output_path}")
        return output_path

    def create_archive(self, archive_path: Optional[str] = None) -> str:
        """
        Create a gzip-compressed tar archive of the audited files.

        Args:
            archive_path: Path for the output archive (default: timestamped)

        Returns:
            Path to the created archive
        """
        if archive_path is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            archive_path = f"genesis_archive_{timestamp}.tar.gz"
        if not self.inventory:
            self.audit_files()
        with tarfile.open(archive_path, "w:gz") as tar:
            for file_info in self.inventory:
                try:
                    tar.add(file_info["full_path"], arcname=file_info["path"])
                except (OSError, tarfile.TarError) as e:
                    print(f"Error adding {file_info['path']} to archive: {e}")
        print(f"Archive created at {archive_path}")
        return archive_path

    def run_full_audit(self, inventory_path: str = "inventory.json",
                       archive_path: Optional[str] = None) -> Dict[str, Any]:
        """
        Run the complete audit: scan files, write the inventory, create the archive.

        Args:
            inventory_path: Path for the JSON inventory
            archive_path: Path for the tar.gz archive

        Returns:
            Dictionary with paths to the created files and the file count
        """
        print("Starting full audit...")

        # Audit files
        files = self.audit_files()
        print(f"Audited {len(files)} files")

        # Write inventory
        inv_path = self.write_inventory(inventory_path)

        # Create archive if enabled
        arch_path = None
        if self.config.get('audit', {}).get('create_archive', True):
            arch_path = self.create_archive(archive_path)

        return {
            "inventory": inv_path,
            "archive": arch_path,
            "file_count": len(files)
        }
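
# Illustrative shape of the inventory JSON produced by write_inventory()
# (reconstructed from the code above; the values are hypothetical):
#
# {
#   "timestamp": "2024-01-01T12:00:00",
#   "total_files": 2,
#   "total_size": 2048,
#   "files": [
#     {
#       "path": "notes/readme.txt",
#       "full_path": "/home/user/project/notes/readme.txt",
#       "size": 1024,
#       "modified": "2024-01-01T11:59:00",
#       "hash": "<sha256 hex digest>",
#       "source_dir": "."
#     },
#     ...
#   ]
# }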


if __name__ == "__main__":
    # Run the audit when executed directly
    boiler = GenesisBoiler()
    results = boiler.run_full_audit()
    print("\nAudit complete:")
    print(f"  Inventory: {results['inventory']}")
    print(f"  Archive: {results['archive']}")
    print(f"  Files processed: {results['file_count']}")


# DJ GOANNA CODING - GENESIS BOILER (SOVEREIGN EDITION)
# Purpose: Consolidating the 321GB Substrate for TIA-ARCHITECT-CORE

import os
import tarfile
import json
from datetime import datetime


class GenesisBoiler:
    def __init__(self):
        self.sources = [
            "./Research/GENESIS_VAULT/",             # GDrive Partitions
            "/data/Mapping-and-Inventory-storage/",  # HF Persistent
            "./pioneer-trader/vortex_cache/"         # Internal Engine Logic
        ]
        self.output_bin = "/data/genesis_monolith.bin"
        self.inventory_path = "./INVENTORY.json"

    def audit_territory(self):
        """Map every file before consolidation (Visibility before Velocity)."""
        inventory = {"timestamp": str(datetime.now()), "files": []}
        for src in self.sources:
            if os.path.exists(src):
                try:
                    for root, _, files in os.walk(src):
                        for f in files:
                            inventory["files"].append(os.path.join(root, f))
                except OSError as e:
                    # PermissionError is a subclass of OSError, so this covers both
                    print(f"[T.I.A.] WARNING: Could not access {src}: {e}")
        try:
            with open(self.inventory_path, 'w') as f:
                json.dump(inventory, f, indent=4)
            print(f"[T.I.A.] TERRITORY AUDITED. {len(inventory['files'])} FILES MARKED.")
        except OSError as e:
            print(f"[T.I.A.] ERROR: Could not write inventory file: {e}")
            raise

    def boil_and_weld(self):
        """Consolidate sources into the Monolith."""
        print("[T.I.A.] INITIALIZING BOILER... COMPRESSING SUBSTRATE.")
        output_dir = os.path.dirname(self.output_bin)
        if output_dir and not os.path.exists(output_dir):
            try:
                os.makedirs(output_dir, exist_ok=True)
            except OSError as e:
                print(f"[T.I.A.] ERROR: Could not create output directory {output_dir}: {e}")
                raise
        try:
            with tarfile.open(self.output_bin, "w:gz") as tar:
                for src in self.sources:
                    if os.path.exists(src):
                        try:
                            # normpath strips the trailing slash so basename
                            # yields the directory name instead of ""
                            tar.add(src, arcname=os.path.basename(os.path.normpath(src)))
                        except OSError as e:
                            print(f"[T.I.A.] WARNING: Could not add {src} to archive: {e}")
            print(f"[T.I.A.] BOILER COMPLETE: {self.output_bin} IS READY.")
        except (OSError, tarfile.TarError) as e:
            print(f"[T.I.A.] ERROR: Could not create tarball: {e}")
            raise
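
# A small verification sketch (not part of the original flow): cross-checks the
# monolith against INVENTORY.json. The default paths simply mirror the
# GenesisBoiler attributes above.
def verify_monolith(monolith_path: str = "/data/genesis_monolith.bin",
                    inventory_path: str = "./INVENTORY.json") -> None:
    """Compare the archive's file count against the audited inventory."""
    with open(inventory_path) as f:
        inventory = json.load(f)
    with tarfile.open(monolith_path, "r:gz") as tar:
        # Count regular-file members only, mirroring what audit_territory lists
        member_files = [m for m in tar.getmembers() if m.isfile()]
    print(f"[T.I.A.] VERIFY: {len(member_files)} files in monolith, "
          f"{len(inventory['files'])} files in inventory.")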


# FIELD EXECUTION
if __name__ == "__main__":
    boiler = GenesisBoiler()
    boiler.audit_territory()
    boiler.boil_and_weld()