File size: 9,786 Bytes
c87f72b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""
Genesis Boiler - File Auditing and Archive Management System

This module provides functionality to audit files from configured source directories,
create JSON inventories, and generate compressed tar archives.
"""

import os
import json
import tarfile
import hashlib
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Any
import yaml


class GenesisBoiler:
    """
    Audits files from configured source directories and creates compressed archives.

    This class handles:
    - File system traversal and auditing
    - JSON inventory creation
    - Gzip-compressed tar archive generation
    - File metadata collection (size, hash, modified time)

    Configuration is read from the ``audit`` section of a YAML file. The keys
    consulted are ``enabled``, ``source_directories``, ``exclude_from_audit``
    and ``create_archive``; every one of them is optional.
    """

    # Read size used while hashing; larger reads mean fewer calls per file.
    _HASH_CHUNK_SIZE = 65536

    def __init__(self, config_path: str = "config.yaml"):
        """
        Initialize GenesisBoiler with configuration.

        Args:
            config_path: Path to the YAML configuration file

        Raises:
            ValueError: If the configuration file's top level is not a mapping.
        """
        self.config_path = config_path
        self.config = self._load_config()
        self.inventory: List[Dict[str, Any]] = []

        audit_cfg = self._audit_config()
        # NOTE: loaded from the config, but no method of this class consults it.
        self.audit_enabled = audit_cfg.get('enabled', True)
        self.source_dirs = self._as_list(audit_cfg.get('source_directories'), ['.'])
        self.exclude_patterns = self._as_list(audit_cfg.get('exclude_from_audit'), [])

    @staticmethod
    def _as_list(value: Any, default: List[Any]) -> List[Any]:
        """Normalize a config value to a list (None -> default, str -> [str])."""
        if value is None:
            return list(default)
        if isinstance(value, str):
            # A bare string would otherwise be iterated character by character.
            return [value]
        return list(value)

    def _audit_config(self) -> Dict[str, Any]:
        """Return the ``audit`` config section, or {} when absent or not a mapping."""
        section = self.config.get('audit') if isinstance(self.config, dict) else None
        return section if isinstance(section, dict) else {}

    def _load_config(self) -> Dict[str, Any]:
        """
        Load configuration from YAML file.

        Returns:
            The parsed mapping, or an empty dict when the file is missing or empty.

        Raises:
            ValueError: If the file parses to something other than a mapping.
        """
        try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
                loaded = yaml.safe_load(f)
        except FileNotFoundError:
            print(f"Config file {self.config_path} not found, using defaults")
            return {}

        if loaded is None:
            # An empty YAML document parses to None; treat it like no config.
            return {}
        if not isinstance(loaded, dict):
            raise ValueError(
                f"Config file {self.config_path} must contain a mapping at the top level"
            )
        return loaded

    def _should_exclude(self, path: str) -> bool:
        """
        Check if a path should be excluded based on patterns.

        A pattern excludes the path when it occurs as a substring of the path
        or when ``Path(path).match(pattern)`` succeeds. Empty and non-string
        patterns are ignored so a stray blank entry cannot exclude everything.
        """
        candidate = Path(path)
        for pattern in self.exclude_patterns:
            if not isinstance(pattern, str) or not pattern:
                continue
            if pattern in path:
                return True
            try:
                if candidate.match(pattern):
                    return True
            except ValueError:
                # Path.match rejects patterns that normalize to nothing (e.g. ".").
                continue
        return False

    def _calculate_file_hash(self, file_path: str) -> str:
        """
        Calculate SHA256 hash of a file.

        Returns:
            The hex digest, or an empty string if the file could not be read.
        """
        sha256_hash = hashlib.sha256()
        try:
            with open(file_path, "rb") as f:
                for byte_block in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b""):
                    sha256_hash.update(byte_block)
        except (OSError, ValueError) as e:
            print(f"Error hashing {file_path}: {e}")
            return ""
        return sha256_hash.hexdigest()

    def audit_files(self) -> List[Dict[str, Any]]:
        """
        Audit files from configured source directories.

        The previous inventory is discarded. Source directories that do not
        exist are skipped with a message; excluded directories are pruned from
        the walk so their contents are never visited.

        Returns:
            List of dictionaries containing file metadata with the keys
            ``path`` (relative to its source directory), ``full_path``,
            ``size``, ``modified``, ``hash`` and ``source_dir``.
        """
        self.inventory = []

        for source_dir in self.source_dirs:
            source_path = Path(source_dir).resolve()

            if not source_path.exists():
                print(f"Source directory {source_dir} does not exist, skipping")
                continue

            for root, dirs, files in os.walk(source_path):
                # In-place assignment is what makes os.walk skip the pruned directories.
                dirs[:] = [d for d in dirs if not self._should_exclude(os.path.join(root, d))]

                for file in files:
                    file_path = os.path.join(root, file)

                    if self._should_exclude(file_path):
                        continue

                    try:
                        stat_info = os.stat(file_path)
                        file_info = {
                            "path": os.path.relpath(file_path, source_path),
                            "full_path": file_path,
                            "size": stat_info.st_size,
                            "modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
                            "hash": self._calculate_file_hash(file_path),
                            "source_dir": source_dir,
                        }
                    except (OSError, ValueError, OverflowError) as e:
                        # Broken symlinks, vanished files or odd timestamps must
                        # not abort the whole audit.
                        print(f"Error processing {file_path}: {e}")
                        continue

                    self.inventory.append(file_info)

        return self.inventory

    def write_inventory(self, output_path: str = "inventory.json") -> str:
        """
        Write the file inventory to a JSON file.

        If the current inventory is empty, an audit is run first.

        Args:
            output_path: Path where the JSON inventory will be written

        Returns:
            Path to the created inventory file
        """
        if not self.inventory:
            self.audit_files()

        inventory_data = {
            "timestamp": datetime.now().isoformat(),
            "total_files": len(self.inventory),
            "total_size": sum(f["size"] for f in self.inventory),
            "files": self.inventory,
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(inventory_data, f, indent=2)

        print(f"Inventory written to {output_path}")
        return output_path

    def create_archive(self, archive_path: str = None) -> str:
        """
        Create a gzip-compressed tar archive of the audited files.

        If the current inventory is empty, an audit is run first. Members are
        stored under their path relative to their source directory, so files
        from different source directories that share a relative path end up
        with the same member name.

        Args:
            archive_path: Path for the output archive (default: timestamped)

        Returns:
            Path to the created archive
        """
        if archive_path is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            archive_path = f"genesis_archive_{timestamp}.tar.gz"

        if not self.inventory:
            self.audit_files()

        archive_real = os.path.realpath(archive_path)

        with tarfile.open(archive_path, "w:gz") as tar:
            for file_info in self.inventory:
                # An archive from an earlier run at the same path may be in the
                # inventory; adding the file being written to itself would
                # store a truncated, meaningless member.
                if os.path.realpath(file_info["full_path"]) == archive_real:
                    continue
                try:
                    tar.add(file_info["full_path"], arcname=file_info["path"])
                except (OSError, tarfile.TarError) as e:
                    print(f"Error adding {file_info['path']} to archive: {e}")

        print(f"Archive created at {archive_path}")
        return archive_path

    def run_full_audit(self, inventory_path: str = "inventory.json",
                       archive_path: str = None) -> Dict[str, Any]:
        """
        Run complete audit: scan files, write inventory, create archive.

        The archive step is skipped when the ``audit.create_archive`` config
        value is falsy.

        Args:
            inventory_path: Path for the JSON inventory
            archive_path: Path for the tar.gz archive

        Returns:
            Dictionary with ``inventory`` (path), ``archive`` (path, or None
            when archiving is disabled) and ``file_count`` (int).
        """
        print("Starting full audit...")

        # Audit files
        files = self.audit_files()
        print(f"Audited {len(files)} files")

        # Write inventory
        inv_path = self.write_inventory(inventory_path)

        # Create archive if enabled
        arch_path = None
        if self._audit_config().get('create_archive', True):
            arch_path = self.create_archive(archive_path)

        return {
            "inventory": inv_path,
            "archive": arch_path,
            "file_count": len(files),
        }


if __name__ == "__main__":
    # Script entry point: run the whole pipeline with default settings and
    # print a short summary of what was produced.
    summary = GenesisBoiler().run_full_audit()
    print("\nAudit complete:")
    report_lines = (
        ("Inventory", "inventory"),
        ("Archive", "archive"),
        ("Files processed", "file_count"),
    )
    for label, key in report_lines:
        print(f"  {label}: {summary[key]}")
# DJ GOANNA CODING - GENESIS BOILER (SOVEREIGN EDITION)
# Purpose: Consolidating the 321GB Substrate for TIA-ARCHITECT-CORE
import os
import tarfile
import json
from datetime import datetime

class GenesisBoiler:
    """Inventory a fixed set of source trees and pack them into one gzip tarball.

    Attributes are plain instance fields so callers may override them after
    construction:
        sources: directories to inventory and archive.
        output_bin: path of the gzip-compressed tar archive to create.
        inventory_path: path of the JSON inventory file to write.
    """

    def __init__(self):
        self.sources = [
            "./Research/GENESIS_VAULT/",         # GDrive Partitions
            "/data/Mapping-and-Inventory-storage/", # HF Persistent
            "./pioneer-trader/vortex_cache/"      # Internal Engine Logic
        ]
        self.output_bin = "/data/genesis_monolith.bin"
        self.inventory_path = "./INVENTORY.json"

    @staticmethod
    def _report_walk_error(err):
        """Report a directory that os.walk could not list (it is otherwise skipped silently)."""
        print(f"[T.I.A.] WARNING: Could not access {err.filename}: {err}")

    @staticmethod
    def _archive_name(src):
        """Return the top-level archive name for a source directory.

        The path is normalized first because os.path.basename() returns an
        empty string for paths that end in a separator, which would dump every
        source's contents into the archive root. Falls back to None (tarfile's
        default naming) when no name can be derived, e.g. for "/".
        """
        return os.path.basename(os.path.normpath(src)) or None

    def audit_territory(self):
        """Map every file before consolidation (Visibility before Velocity).

        Walks each existing source directory and writes a JSON document with a
        ``timestamp`` and the list of discovered ``files`` to
        ``self.inventory_path``. Sources that do not exist are skipped.

        Raises:
            OSError: If the inventory file cannot be written.
        """
        inventory = {"timestamp": str(datetime.now()), "files": []}
        for src in self.sources:
            if not os.path.exists(src):
                continue
            # os.walk ignores listing errors unless given an onerror callback.
            for root, _, files in os.walk(src, onerror=self._report_walk_error):
                inventory["files"].extend(os.path.join(root, name) for name in files)

        try:
            with open(self.inventory_path, 'w', encoding='utf-8') as f:
                json.dump(inventory, f, indent=4)
            print(f"[T.I.A.] TERRITORY AUDITED. {len(inventory['files'])} FILES MARKED.")
        except OSError as e:
            print(f"[T.I.A.] ERROR: Could not write inventory file: {e}")
            raise

    def boil_and_weld(self):
        """Consolidate sources into the Monolith.

        Creates the output directory if needed, then adds every existing
        source directory to a gzip-compressed tar archive at
        ``self.output_bin``, each under its own directory name. A source that
        cannot be added is reported and skipped.

        Raises:
            OSError: If the output directory or the archive cannot be created.
            tarfile.TarError: If writing the archive fails.
        """
        print("[T.I.A.] INITIALIZING BOILER... COMPRESSING SUBSTRATE.")

        output_dir = os.path.dirname(self.output_bin)
        if output_dir:
            try:
                os.makedirs(output_dir, exist_ok=True)
            except OSError as e:
                print(f"[T.I.A.] ERROR: Could not create output directory {output_dir}: {e}")
                raise

        try:
            with tarfile.open(self.output_bin, "w:gz") as tar:
                for src in self.sources:
                    if not os.path.exists(src):
                        continue
                    try:
                        tar.add(src, arcname=self._archive_name(src))
                    except OSError as e:
                        print(f"[T.I.A.] WARNING: Could not add {src} to archive: {e}")
            print(f"[T.I.A.] BOILER COMPLETE: {self.output_bin} IS READY.")
        except (OSError, tarfile.TarError) as e:
            print(f"[T.I.A.] ERROR: Could not create tarball: {e}")
            raise

# FIELD EXECUTION
if __name__ == "__main__":
    # Inventory the sources first, then build the archive; a failure in the
    # first stage propagates and prevents the second from running.
    monolith_builder = GenesisBoiler()
    for stage in (monolith_builder.audit_territory, monolith_builder.boil_and_weld):
        stage()