Spaces:
Sleeping
Sleeping
"""
Comprehensive tests for genesis_boiler.py
Tests cover:
- Initialization
- Territory auditing
- File consolidation
- Error handling
- Path validation
"""
| import pytest | |
| import json | |
| import tarfile | |
| import os | |
| from pathlib import Path | |
| from unittest.mock import Mock, patch, MagicMock | |
| import sys | |
| # Add parent directory to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| from genesis_boiler import GenesisBoiler | |
class TestGenesisBoilerInit:
    """Checks on the attribute values exposed by a freshly built GenesisBoiler."""

    def test_init_default_values(self):
        """A new instance has a three-entry source list and the stock paths."""
        instance = GenesisBoiler()

        assert instance.sources is not None
        assert isinstance(instance.sources, list)
        assert len(instance.sources) == 3
        assert instance.output_bin == "/data/genesis_monolith.bin"
        assert instance.inventory_path == "./INVENTORY.json"

    def test_init_sources_contain_expected_paths(self):
        """Each of the three expected directories is present in ``sources``."""
        instance = GenesisBoiler()

        expected_directories = (
            "./Research/GENESIS_VAULT/",
            "/data/Mapping-and-Inventory-storage/",
            "./pioneer-trader/vortex_cache/",
        )
        for directory in expected_directories:
            assert directory in instance.sources
class TestGenesisBoilerAuditTerritory:
    """Exercise GenesisBoiler.audit_territory against throwaway directory trees.

    Every test takes a ``temp_dir`` fixture that is not defined in this file
    (presumably supplied by a conftest.py); it is used here as a
    pathlib-style directory path.
    """

    def test_audit_territory_creates_inventory_file(self, temp_dir, monkeypatch):
        """Auditing a populated source leaves a file at ``inventory_path``."""
        boiler = GenesisBoiler()

        # Point the boiler at a scratch tree instead of its built-in paths.
        src_dir = temp_dir / "test_source"
        src_dir.mkdir()
        for filename in ("file1.txt", "file2.txt"):
            (src_dir / filename).write_text("content")
        boiler.sources = [str(src_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        inventory_file = Path(boiler.inventory_path)
        assert inventory_file.exists()

    def test_audit_territory_inventory_structure(self, temp_dir):
        """The inventory JSON has a `timestamp` key and a list under `files`."""
        boiler = GenesisBoiler()

        src_dir = temp_dir / "test_source"
        src_dir.mkdir()
        (src_dir / "file1.txt").write_text("content")
        boiler.sources = [str(src_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        report = json.loads(Path(boiler.inventory_path).read_text())
        for required_key in ("timestamp", "files"):
            assert required_key in report
        assert isinstance(report["files"], list)

    def test_audit_territory_counts_files_correctly(self, temp_dir):
        """Three files, one of them in a subdirectory, yield three entries."""
        boiler = GenesisBoiler()

        src_dir = temp_dir / "test_source"
        src_dir.mkdir()
        nested_dir = src_dir / "subdir"
        nested_dir.mkdir()
        known_files = (
            src_dir / "file1.txt",
            src_dir / "file2.txt",
            nested_dir / "file3.txt",
        )
        for known_file in known_files:
            known_file.write_text("content")
        boiler.sources = [str(src_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        report = json.loads(Path(boiler.inventory_path).read_text())
        assert len(report["files"]) == 3

    def test_audit_territory_handles_nonexistent_source(self, temp_dir):
        """A source path that does not exist must not make the audit raise."""
        boiler = GenesisBoiler()
        boiler.sources = [str(temp_dir / "nonexistent")]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        # Any exception escaping this call fails the test.
        boiler.audit_territory()

        # The inventory file is still expected to be written.
        assert Path(boiler.inventory_path).exists()

    def test_audit_territory_handles_permission_error(self, temp_dir, monkeypatch):
        """A PermissionError from ``os.walk`` must not escape the audit."""
        boiler = GenesisBoiler()

        src_dir = temp_dir / "test_source"
        src_dir.mkdir()
        boiler.sources = [str(src_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        # Every call to os.walk raises while the patch is active.
        with patch("os.walk", side_effect=PermissionError("Access denied")):
            boiler.audit_territory()

        # The inventory file is still expected to be written.
        assert Path(boiler.inventory_path).exists()

    def test_audit_territory_raises_on_write_error(self, temp_dir, monkeypatch):
        """An inventory path inside a missing directory makes the audit raise IOError."""
        boiler = GenesisBoiler()

        src_dir = temp_dir / "test_source"
        src_dir.mkdir()
        boiler.sources = [str(src_dir)]

        # The parent directory is never created, so the inventory cannot be written.
        unwritable_target = temp_dir / "nonexistent_dir" / "INVENTORY.json"
        boiler.inventory_path = str(unwritable_target)

        with pytest.raises(IOError):
            boiler.audit_territory()
class TestGenesisBoilerBoilAndWeld:
    """Exercise GenesisBoiler.boil_and_weld against throwaway directory trees.

    Every test takes a ``temp_dir`` fixture that is not defined in this file
    (presumably supplied by a conftest.py); it is used here as a
    pathlib-style directory path.
    """

    def test_boil_and_weld_creates_tarball(self, temp_dir):
        """Welding a populated source leaves a file at ``output_bin``."""
        boiler = GenesisBoiler()

        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.boil_and_weld()

        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_tarball_is_valid(self, temp_dir):
        """The output opens as a gzip tarball and has at least one member."""
        boiler = GenesisBoiler()

        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.boil_and_weld()

        # Opening in "r:gz" mode fails unless the file is a real gzip tarball.
        with tarfile.open(boiler.output_bin, "r:gz") as tar:
            members = tar.getmembers()
            assert len(members) > 0

    def test_boil_and_weld_creates_output_directory(self, temp_dir):
        """A missing parent directory of ``output_bin`` is created on demand."""
        boiler = GenesisBoiler()

        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        output_dir = temp_dir / "output_dir"
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(output_dir / "output.tar.gz")

        # Precondition: the directory must not exist before the call.
        assert not output_dir.exists()

        boiler.boil_and_weld()

        assert output_dir.exists()
        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_handles_nonexistent_source(self, temp_dir):
        """A source path that does not exist must not make the weld raise."""
        boiler = GenesisBoiler()
        boiler.sources = [str(temp_dir / "nonexistent")]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        # Any exception escaping this call fails the test.
        boiler.boil_and_weld()

        # The output file is still expected to be written.
        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_handles_permission_error(self, temp_dir, monkeypatch):
        """A PermissionError raised by ``TarFile.add`` must not escape the weld."""
        boiler = GenesisBoiler()

        test_source = temp_dir / "test_source"
        test_source.mkdir()
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        # Capture the real opener before patching so the wrapper can delegate
        # to it instead of recursing into the mock.
        real_open = tarfile.open

        def deny_add(*add_args, **add_kwargs):
            raise PermissionError("Access denied")

        def open_with_failing_add(*args, **kwargs):
            # Hand back a genuine TarFile whose add() always fails.
            tar = real_open(*args, **kwargs)
            tar.add = deny_add
            return tar

        with patch("tarfile.open", side_effect=open_with_failing_add):
            # Any exception escaping this call fails the test.
            boiler.boil_and_weld()

    def test_boil_and_weld_raises_on_write_error(self, temp_dir):
        """An unusable output location makes the weld raise.

        The output path is placed beneath a regular file. Unlike a path
        under the filesystem root, this cannot be created even by a
        privileged user, so the outcome does not depend on who runs the
        suite and nothing is written outside ``temp_dir``.
        """
        boiler = GenesisBoiler()

        test_source = temp_dir / "test_source"
        test_source.mkdir()
        boiler.sources = [str(test_source)]

        # A regular file occupies the spot where the output directory would go.
        blocker = temp_dir / "not_a_directory"
        blocker.write_text("occupied")
        boiler.output_bin = str(blocker / "output.tar.gz")

        with pytest.raises((IOError, OSError, tarfile.TarError)):
            boiler.boil_and_weld()
class TestGenesisBoilerIntegration:
    """End-to-end runs that call audit_territory and then boil_and_weld.

    Every test takes a ``temp_dir`` fixture that is not defined in this file
    (presumably supplied by a conftest.py); it is used here as a
    pathlib-style directory path.
    """

    def test_full_workflow(self, temp_dir):
        """Audit then weld one nested source; both outputs exist, two files listed."""
        boiler = GenesisBoiler()

        # Two files: one at the top level and one in a subdirectory.
        root = temp_dir / "test_source"
        root.mkdir()
        nested = root / "subdir"
        nested.mkdir()
        (root / "file1.txt").write_text("content1")
        (nested / "file2.txt").write_text("content2")

        boiler.sources = [str(root)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.audit_territory()
        boiler.boil_and_weld()

        # Both stages must have produced their output file.
        for produced in (boiler.inventory_path, boiler.output_bin):
            assert Path(produced).exists()

        report = json.loads(Path(boiler.inventory_path).read_text())
        assert len(report["files"]) == 2

    def test_multiple_sources(self, temp_dir):
        """Two single-file sources give two inventory entries and two or more tar members."""
        boiler = GenesisBoiler()

        first_source = temp_dir / "source1"
        second_source = temp_dir / "source2"
        first_source.mkdir()
        second_source.mkdir()
        (first_source / "file1.txt").write_text("content1")
        (second_source / "file2.txt").write_text("content2")

        boiler.sources = [str(first_source), str(second_source)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.audit_territory()
        boiler.boil_and_weld()

        # The inventory must list the file from each source.
        report = json.loads(Path(boiler.inventory_path).read_text())
        assert len(report["files"]) == 2

        # The archive must hold at least one member per source.
        with tarfile.open(boiler.output_bin, "r:gz") as archive:
            entries = archive.getmembers()
            assert len(entries) >= 2