""" Comprehensive tests for genesis_boiler.py Tests cover: - Initialization - Territory auditing - File consolidation - Error handling - Path validation """ import pytest import json import tarfile import os from pathlib import Path from unittest.mock import Mock, patch, MagicMock import sys # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) from genesis_boiler import GenesisBoiler class TestGenesisBoilerInit: """Test GenesisBoiler initialization""" def test_init_default_values(self): """Test that GenesisBoiler initializes with correct default values""" boiler = GenesisBoiler() assert boiler.sources is not None assert isinstance(boiler.sources, list) assert len(boiler.sources) == 3 assert boiler.output_bin == "/data/genesis_monolith.bin" assert boiler.inventory_path == "./INVENTORY.json" def test_init_sources_contain_expected_paths(self): """Test that source paths contain expected directories""" boiler = GenesisBoiler() assert "./Research/GENESIS_VAULT/" in boiler.sources assert "/data/Mapping-and-Inventory-storage/" in boiler.sources assert "./pioneer-trader/vortex_cache/" in boiler.sources class TestGenesisBoilerAuditTerritory: """Test the audit_territory method""" def test_audit_territory_creates_inventory_file(self, temp_dir, monkeypatch): """Test that audit_territory creates an inventory JSON file""" boiler = GenesisBoiler() # Override paths to use temp directory test_source = temp_dir / "test_source" test_source.mkdir() (test_source / "file1.txt").write_text("content") (test_source / "file2.txt").write_text("content") boiler.sources = [str(test_source)] boiler.inventory_path = str(temp_dir / "INVENTORY.json") boiler.audit_territory() # Check inventory file was created assert Path(boiler.inventory_path).exists() def test_audit_territory_inventory_structure(self, temp_dir): """Test that inventory has correct structure""" boiler = GenesisBoiler() # Create test structure test_source = temp_dir / "test_source" test_source.mkdir() (test_source / "file1.txt").write_text("content") boiler.sources = [str(test_source)] boiler.inventory_path = str(temp_dir / "INVENTORY.json") boiler.audit_territory() # Load and verify inventory with open(boiler.inventory_path, 'r') as f: inventory = json.load(f) assert "timestamp" in inventory assert "files" in inventory assert isinstance(inventory["files"], list) def test_audit_territory_counts_files_correctly(self, temp_dir): """Test that audit_territory counts all files correctly""" boiler = GenesisBoiler() # Create test structure with known number of files test_source = temp_dir / "test_source" test_source.mkdir() (test_source / "subdir").mkdir() (test_source / "file1.txt").write_text("content") (test_source / "file2.txt").write_text("content") (test_source / "subdir" / "file3.txt").write_text("content") boiler.sources = [str(test_source)] boiler.inventory_path = str(temp_dir / "INVENTORY.json") boiler.audit_territory() with open(boiler.inventory_path, 'r') as f: inventory = json.load(f) assert len(inventory["files"]) == 3 def test_audit_territory_handles_nonexistent_source(self, temp_dir): """Test that audit_territory handles non-existent sources gracefully""" boiler = GenesisBoiler() boiler.sources = [str(temp_dir / "nonexistent")] boiler.inventory_path = str(temp_dir / "INVENTORY.json") # Should not raise an exception boiler.audit_territory() # Inventory should still be created, just empty assert Path(boiler.inventory_path).exists() def test_audit_territory_handles_permission_error(self, temp_dir, monkeypatch): """Test that audit_territory handles permission errors gracefully""" boiler = GenesisBoiler() test_source = temp_dir / "test_source" test_source.mkdir() boiler.sources = [str(test_source)] boiler.inventory_path = str(temp_dir / "INVENTORY.json") # Mock os.walk to raise PermissionError def mock_walk(*args, **kwargs): raise PermissionError("Access denied") with patch('os.walk', side_effect=mock_walk): # Should handle error gracefully boiler.audit_territory() # Inventory should still be created assert Path(boiler.inventory_path).exists() def test_audit_territory_raises_on_write_error(self, temp_dir, monkeypatch): """Test that audit_territory raises IOError when unable to write""" boiler = GenesisBoiler() test_source = temp_dir / "test_source" test_source.mkdir() boiler.sources = [str(test_source)] # Set to an invalid path boiler.inventory_path = str(temp_dir / "nonexistent_dir" / "INVENTORY.json") with pytest.raises(IOError): boiler.audit_territory() class TestGenesisBoilerBoilAndWeld: """Test the boil_and_weld method""" def test_boil_and_weld_creates_tarball(self, temp_dir): """Test that boil_and_weld creates a tarball""" boiler = GenesisBoiler() # Create test source test_source = temp_dir / "test_source" test_source.mkdir() (test_source / "file1.txt").write_text("content") boiler.sources = [str(test_source)] boiler.output_bin = str(temp_dir / "output.tar.gz") boiler.boil_and_weld() assert Path(boiler.output_bin).exists() def test_boil_and_weld_tarball_is_valid(self, temp_dir): """Test that created tarball is valid and can be opened""" boiler = GenesisBoiler() test_source = temp_dir / "test_source" test_source.mkdir() (test_source / "file1.txt").write_text("content") boiler.sources = [str(test_source)] boiler.output_bin = str(temp_dir / "output.tar.gz") boiler.boil_and_weld() # Verify tarball can be opened with tarfile.open(boiler.output_bin, "r:gz") as tar: members = tar.getmembers() assert len(members) > 0 def test_boil_and_weld_creates_output_directory(self, temp_dir): """Test that boil_and_weld creates output directory if needed""" boiler = GenesisBoiler() test_source = temp_dir / "test_source" test_source.mkdir() (test_source / "file1.txt").write_text("content") output_dir = temp_dir / "output_dir" boiler.sources = [str(test_source)] boiler.output_bin = str(output_dir / "output.tar.gz") assert not output_dir.exists() boiler.boil_and_weld() assert output_dir.exists() assert Path(boiler.output_bin).exists() def test_boil_and_weld_handles_nonexistent_source(self, temp_dir): """Test that boil_and_weld handles non-existent sources""" boiler = GenesisBoiler() boiler.sources = [str(temp_dir / "nonexistent")] boiler.output_bin = str(temp_dir / "output.tar.gz") # Should not raise an exception boiler.boil_and_weld() # Tarball should still be created (just empty) assert Path(boiler.output_bin).exists() def test_boil_and_weld_handles_permission_error(self, temp_dir, monkeypatch): """Test that boil_and_weld handles permission errors""" boiler = GenesisBoiler() test_source = temp_dir / "test_source" test_source.mkdir() boiler.sources = [str(test_source)] boiler.output_bin = str(temp_dir / "output.tar.gz") # Mock tarfile to raise PermissionError original_open = tarfile.open def mock_tarfile_open(*args, **kwargs): tar = original_open(*args, **kwargs) original_add = tar.add def mock_add(*add_args, **add_kwargs): raise PermissionError("Access denied") tar.add = mock_add return tar with patch('tarfile.open', side_effect=mock_tarfile_open): # Should handle error gracefully boiler.boil_and_weld() def test_boil_and_weld_raises_on_write_error(self, temp_dir): """Test that boil_and_weld raises error when cannot write""" boiler = GenesisBoiler() test_source = temp_dir / "test_source" test_source.mkdir() boiler.sources = [str(test_source)] # Try to write to root (should fail without permissions) boiler.output_bin = "/invalid_path/output.tar.gz" with pytest.raises((IOError, OSError, tarfile.TarError)): boiler.boil_and_weld() class TestGenesisBoilerIntegration: """Integration tests for GenesisBoiler""" def test_full_workflow(self, temp_dir): """Test complete workflow: audit then boil""" boiler = GenesisBoiler() # Create test structure test_source = temp_dir / "test_source" test_source.mkdir() (test_source / "subdir").mkdir() (test_source / "file1.txt").write_text("content1") (test_source / "subdir" / "file2.txt").write_text("content2") boiler.sources = [str(test_source)] boiler.inventory_path = str(temp_dir / "INVENTORY.json") boiler.output_bin = str(temp_dir / "output.tar.gz") # Run full workflow boiler.audit_territory() boiler.boil_and_weld() # Verify both operations completed assert Path(boiler.inventory_path).exists() assert Path(boiler.output_bin).exists() # Verify inventory content with open(boiler.inventory_path, 'r') as f: inventory = json.load(f) assert len(inventory["files"]) == 2 def test_multiple_sources(self, temp_dir): """Test with multiple source directories""" boiler = GenesisBoiler() # Create multiple sources source1 = temp_dir / "source1" source2 = temp_dir / "source2" source1.mkdir() source2.mkdir() (source1 / "file1.txt").write_text("content1") (source2 / "file2.txt").write_text("content2") boiler.sources = [str(source1), str(source2)] boiler.inventory_path = str(temp_dir / "INVENTORY.json") boiler.output_bin = str(temp_dir / "output.tar.gz") boiler.audit_territory() boiler.boil_and_weld() # Verify all files inventoried with open(boiler.inventory_path, 'r') as f: inventory = json.load(f) assert len(inventory["files"]) == 2 # Verify tarball contains both sources with tarfile.open(boiler.output_bin, "r:gz") as tar: members = tar.getmembers() assert len(members) >= 2