# Spaces: Running
# File size: 11,047 Bytes
# c87f72b
"""
Comprehensive tests for genesis_boiler.py
Tests cover:
- Initialization
- Territory auditing
- File consolidation
- Error handling
- Path validation
"""
import pytest
import json
import tarfile
import os
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import sys
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from genesis_boiler import GenesisBoiler
class TestGenesisBoilerInit:
    """Checks on the attributes of a freshly constructed GenesisBoiler."""

    def test_init_default_values(self):
        """A new boiler has a three-entry source list and the default paths."""
        boiler = GenesisBoiler()

        # The source list must exist, be a real list, and hold three entries.
        assert boiler.sources is not None
        assert isinstance(boiler.sources, list)
        assert len(boiler.sources) == 3

        # Output locations used when the caller overrides nothing.
        assert boiler.output_bin == "/data/genesis_monolith.bin"
        assert boiler.inventory_path == "./INVENTORY.json"

    def test_init_sources_contain_expected_paths(self):
        """Each expected source directory appears in the default sources."""
        boiler = GenesisBoiler()
        expected_paths = (
            "./Research/GENESIS_VAULT/",
            "/data/Mapping-and-Inventory-storage/",
            "./pioneer-trader/vortex_cache/",
        )
        for expected in expected_paths:
            assert expected in boiler.sources
class TestGenesisBoilerAuditTerritory:
    """Tests for ``GenesisBoiler.audit_territory``.

    Each test points ``sources`` and ``inventory_path`` at locations under
    the directory supplied by the ``temp_dir`` fixture before calling
    ``audit_territory``.
    """

    def test_audit_territory_creates_inventory_file(self, temp_dir):
        """audit_territory writes a file at ``inventory_path``."""
        boiler = GenesisBoiler()

        # Override paths to use the temp directory.
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        (test_source / "file2.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        assert Path(boiler.inventory_path).exists()

    def test_audit_territory_inventory_structure(self, temp_dir):
        """The inventory JSON has a ``timestamp`` key and a ``files`` list."""
        boiler = GenesisBoiler()

        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        with open(boiler.inventory_path, "r", encoding="utf-8") as f:
            inventory = json.load(f)
        assert "timestamp" in inventory
        assert "files" in inventory
        assert isinstance(inventory["files"], list)

    def test_audit_territory_counts_files_correctly(self, temp_dir):
        """Files in the source root and in a subdirectory are all listed."""
        boiler = GenesisBoiler()

        # Two files at the top level plus one nested file: three in total.
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "subdir").mkdir()
        (test_source / "file1.txt").write_text("content")
        (test_source / "file2.txt").write_text("content")
        (test_source / "subdir" / "file3.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        with open(boiler.inventory_path, "r", encoding="utf-8") as f:
            inventory = json.load(f)
        assert len(inventory["files"]) == 3

    def test_audit_territory_handles_nonexistent_source(self, temp_dir):
        """A missing source directory does not stop the inventory being written."""
        boiler = GenesisBoiler()
        boiler.sources = [str(temp_dir / "nonexistent")]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        # Should not raise an exception.
        boiler.audit_territory()

        assert Path(boiler.inventory_path).exists()

    def test_audit_territory_handles_permission_error(self, temp_dir):
        """A PermissionError from ``os.walk`` does not stop the inventory being written."""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        boiler.sources = [str(test_source)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        # An exception instance as side_effect is raised on every call to
        # the patched os.walk, so no hand-written raising helper is needed.
        with patch("os.walk", side_effect=PermissionError("Access denied")):
            boiler.audit_territory()

        assert Path(boiler.inventory_path).exists()

    def test_audit_territory_raises_on_write_error(self, temp_dir):
        """audit_territory raises IOError when the inventory cannot be written."""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        boiler.sources = [str(test_source)]

        # The parent directory of the inventory file is never created.
        boiler.inventory_path = str(temp_dir / "nonexistent_dir" / "INVENTORY.json")

        with pytest.raises(IOError):
            boiler.audit_territory()
class TestGenesisBoilerBoilAndWeld:
    """Tests for ``GenesisBoiler.boil_and_weld``.

    Each test points ``sources`` and ``output_bin`` at locations under the
    directory supplied by the ``temp_dir`` fixture before calling
    ``boil_and_weld``.
    """

    def test_boil_and_weld_creates_tarball(self, temp_dir):
        """boil_and_weld writes a file at ``output_bin``."""
        boiler = GenesisBoiler()

        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.boil_and_weld()

        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_tarball_is_valid(self, temp_dir):
        """The output opens as a gzip tarball with at least one member."""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.boil_and_weld()

        with tarfile.open(boiler.output_bin, "r:gz") as tar:
            members = tar.getmembers()
        assert len(members) > 0

    def test_boil_and_weld_creates_output_directory(self, temp_dir):
        """A missing parent directory of ``output_bin`` is created."""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        output_dir = temp_dir / "output_dir"
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(output_dir / "output.tar.gz")

        # Precondition: the directory must not exist before the call.
        assert not output_dir.exists()

        boiler.boil_and_weld()

        assert output_dir.exists()
        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_handles_nonexistent_source(self, temp_dir):
        """A missing source directory does not stop the tarball being written."""
        boiler = GenesisBoiler()
        boiler.sources = [str(temp_dir / "nonexistent")]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        # Should not raise an exception.
        boiler.boil_and_weld()

        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_handles_permission_error(self, temp_dir):
        """A PermissionError raised by ``TarFile.add`` does not propagate."""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        # Keep a handle on the real opener before patching so the wrapper
        # can still produce a genuine TarFile object.
        original_open = tarfile.open

        def mock_tarfile_open(*args, **kwargs):
            tar = original_open(*args, **kwargs)

            def mock_add(*add_args, **add_kwargs):
                raise PermissionError("Access denied")

            # Only ``add`` is sabotaged; opening and closing still work.
            tar.add = mock_add
            return tar

        with patch("tarfile.open", side_effect=mock_tarfile_open):
            # Passes as long as no exception escapes boil_and_weld.
            boiler.boil_and_weld()

    def test_boil_and_weld_raises_on_write_error(self, temp_dir):
        """boil_and_weld raises when the output location cannot be written."""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        boiler.sources = [str(test_source)]

        # Use a regular file as the would-be parent directory. Neither
        # creating that directory nor opening a file beneath it can succeed,
        # whatever the user's privileges, and nothing is written outside
        # temp_dir. An absolute path under "/" would only fail for
        # unprivileged users.
        blocker = temp_dir / "not_a_directory"
        blocker.write_text("content")
        boiler.output_bin = str(blocker / "output.tar.gz")

        with pytest.raises((IOError, OSError, tarfile.TarError)):
            boiler.boil_and_weld()
class TestGenesisBoilerIntegration:
    """End-to-end tests that run audit_territory and boil_and_weld together."""

    def test_full_workflow(self, temp_dir):
        """Audit then boil: both artifacts exist and two files are inventoried."""
        boiler = GenesisBoiler()

        # One file at the top level and one inside a subdirectory.
        source_root = temp_dir / "test_source"
        nested_dir = source_root / "subdir"
        source_root.mkdir()
        nested_dir.mkdir()
        (source_root / "file1.txt").write_text("content1")
        (nested_dir / "file2.txt").write_text("content2")

        boiler.sources = [str(source_root)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.audit_territory()
        boiler.boil_and_weld()

        inventory_file = Path(boiler.inventory_path)
        assert inventory_file.exists()
        assert Path(boiler.output_bin).exists()

        with open(inventory_file, "r") as handle:
            recorded = json.load(handle)
        assert len(recorded["files"]) == 2

    def test_multiple_sources(self, temp_dir):
        """Two source directories both end up in the inventory and the tarball."""
        boiler = GenesisBoiler()

        first_source = temp_dir / "source1"
        second_source = temp_dir / "source2"
        first_source.mkdir()
        second_source.mkdir()
        (first_source / "file1.txt").write_text("content1")
        (second_source / "file2.txt").write_text("content2")

        boiler.sources = [str(first_source), str(second_source)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.audit_territory()
        boiler.boil_and_weld()

        # One file per source directory: two inventory entries.
        with open(boiler.inventory_path, "r") as handle:
            recorded = json.load(handle)
        assert len(recorded["files"]) == 2

        # The archive must hold at least two members.
        with tarfile.open(boiler.output_bin, "r:gz") as archive:
            member_count = len(archive.getmembers())
        assert member_count >= 2
|