# File: oppo-node/tests/test_genesis_boiler.py
# Uploaded by: DJ-Goanna-Coding
# Commit: "Deploy from GitHub Actions" (c87f72b, verified)
"""
Comprehensive tests for genesis_boiler.py
Tests cover:
- Initialization
- Territory auditing
- File consolidation
- Error handling
- Path validation
"""
import pytest
import json
import tarfile
import os
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
import sys
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from genesis_boiler import GenesisBoiler
class TestGenesisBoilerInit:
    """Checks on the state of a freshly constructed GenesisBoiler."""

    def test_init_default_values(self):
        """A new boiler has a three-item source list and the default paths."""
        fresh = GenesisBoiler()
        configured_sources = fresh.sources

        assert configured_sources is not None
        assert isinstance(configured_sources, list)
        assert len(configured_sources) == 3
        assert fresh.output_bin == "/data/genesis_monolith.bin"
        assert fresh.inventory_path == "./INVENTORY.json"

    def test_init_sources_contain_expected_paths(self):
        """Every expected directory is present in the default source list."""
        fresh = GenesisBoiler()
        expected_directories = (
            "./Research/GENESIS_VAULT/",
            "/data/Mapping-and-Inventory-storage/",
            "./pioneer-trader/vortex_cache/",
        )
        for directory in expected_directories:
            assert directory in fresh.sources
class TestGenesisBoilerAuditTerritory:
    """Exercises GenesisBoiler.audit_territory against temporary directories."""

    def test_audit_territory_creates_inventory_file(self, temp_dir, monkeypatch):
        """Running the audit writes a file at the configured inventory path."""
        source_dir = temp_dir / "test_source"
        source_dir.mkdir()
        for filename in ("file1.txt", "file2.txt"):
            (source_dir / filename).write_text("content")

        # Redirect the boiler at the temporary tree instead of its defaults.
        boiler = GenesisBoiler()
        boiler.sources = [str(source_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        inventory_file = Path(boiler.inventory_path)
        assert inventory_file.exists()

    def test_audit_territory_inventory_structure(self, temp_dir):
        """The written inventory has a timestamp and a list under "files"."""
        source_dir = temp_dir / "test_source"
        source_dir.mkdir()
        (source_dir / "file1.txt").write_text("content")

        boiler = GenesisBoiler()
        boiler.sources = [str(source_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        # Parse the JSON the audit just produced.
        inventory = json.loads(Path(boiler.inventory_path).read_text())
        for required_key in ("timestamp", "files"):
            assert required_key in inventory
        assert isinstance(inventory["files"], list)

    def test_audit_territory_counts_files_correctly(self, temp_dir):
        """Files in the source root and in a subdirectory are all listed."""
        source_dir = temp_dir / "test_source"
        nested_dir = source_dir / "subdir"
        source_dir.mkdir()
        nested_dir.mkdir()
        # Two files at the top level plus one nested: three entries expected.
        for target in (
            source_dir / "file1.txt",
            source_dir / "file2.txt",
            nested_dir / "file3.txt",
        ):
            target.write_text("content")

        boiler = GenesisBoiler()
        boiler.sources = [str(source_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        boiler.audit_territory()

        inventory = json.loads(Path(boiler.inventory_path).read_text())
        assert len(inventory["files"]) == 3

    def test_audit_territory_handles_nonexistent_source(self, temp_dir):
        """A missing source directory does not stop the inventory being written."""
        boiler = GenesisBoiler()
        boiler.sources = [str(temp_dir / "nonexistent")]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        # The call itself must complete without raising.
        boiler.audit_territory()

        assert Path(boiler.inventory_path).exists()

    def test_audit_territory_handles_permission_error(self, temp_dir, monkeypatch):
        """A PermissionError from os.walk is tolerated and the inventory is written."""
        source_dir = temp_dir / "test_source"
        source_dir.mkdir()

        boiler = GenesisBoiler()
        boiler.sources = [str(source_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")

        # Every os.walk call made while the patch is active raises.
        with patch('os.walk', side_effect=PermissionError("Access denied")):
            boiler.audit_territory()

        assert Path(boiler.inventory_path).exists()

    def test_audit_territory_raises_on_write_error(self, temp_dir, monkeypatch):
        """An inventory path inside a missing directory makes the audit raise IOError."""
        source_dir = temp_dir / "test_source"
        source_dir.mkdir()

        boiler = GenesisBoiler()
        boiler.sources = [str(source_dir)]
        # The parent directory of this path is never created.
        unwritable_target = temp_dir / "nonexistent_dir" / "INVENTORY.json"
        boiler.inventory_path = str(unwritable_target)

        with pytest.raises(IOError):
            boiler.audit_territory()
class TestGenesisBoilerBoilAndWeld:
    """Test the boil_and_weld method"""

    def test_boil_and_weld_creates_tarball(self, temp_dir):
        """Test that boil_and_weld creates a file at the output path"""
        boiler = GenesisBoiler()
        # Create test source
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.boil_and_weld()

        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_tarball_is_valid(self, temp_dir):
        """Test that the created tarball opens as gzip and is not empty"""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.boil_and_weld()

        # Verify tarball can be opened
        with tarfile.open(boiler.output_bin, "r:gz") as tar:
            members = tar.getmembers()
        assert len(members) > 0

    def test_boil_and_weld_creates_output_directory(self, temp_dir):
        """Test that boil_and_weld creates the output directory if needed"""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        (test_source / "file1.txt").write_text("content")
        output_dir = temp_dir / "output_dir"
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(output_dir / "output.tar.gz")
        # Precondition: the directory must be absent before the call.
        assert not output_dir.exists()

        boiler.boil_and_weld()

        assert output_dir.exists()
        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_handles_nonexistent_source(self, temp_dir):
        """Test that boil_and_weld tolerates a source that does not exist"""
        boiler = GenesisBoiler()
        boiler.sources = [str(temp_dir / "nonexistent")]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        # Should not raise an exception
        boiler.boil_and_weld()

        # The output file should still be created
        assert Path(boiler.output_bin).exists()

    def test_boil_and_weld_handles_permission_error(self, temp_dir, monkeypatch):
        """Test that boil_and_weld survives a PermissionError from tar.add

        There is no assertion: the test passes when the call returns
        without raising.
        """
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        boiler.sources = [str(test_source)]
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        # Keep a handle on the real function before tarfile.open is patched.
        original_open = tarfile.open

        def mock_tarfile_open(*args, **kwargs):
            # Open a real archive, then make every add() call on it fail.
            tar = original_open(*args, **kwargs)

            def mock_add(*add_args, **add_kwargs):
                raise PermissionError("Access denied")

            tar.add = mock_add
            return tar

        with patch('tarfile.open', side_effect=mock_tarfile_open):
            # Should handle error gracefully
            boiler.boil_and_weld()

    def test_boil_and_weld_raises_on_write_error(self, temp_dir):
        """Test that boil_and_weld raises when the output cannot be written"""
        boiler = GenesisBoiler()
        test_source = temp_dir / "test_source"
        test_source.mkdir()
        boiler.sources = [str(test_source)]
        # Use a regular file as the parent of the output path. Neither
        # creating a directory there nor opening a file beneath it can
        # succeed for any user, so the outcome does not depend on whether
        # the tests run with permission to create directories under "/".
        blocker = temp_dir / "not_a_directory"
        blocker.write_text("")
        boiler.output_bin = str(blocker / "output.tar.gz")

        with pytest.raises((IOError, OSError, tarfile.TarError)):
            boiler.boil_and_weld()
class TestGenesisBoilerIntegration:
    """End-to-end checks that run the audit and the archive step together."""

    def test_full_workflow(self, temp_dir):
        """Audit then archive a small tree; both outputs exist afterwards."""
        source_dir = temp_dir / "test_source"
        nested_dir = source_dir / "subdir"
        source_dir.mkdir()
        nested_dir.mkdir()
        (source_dir / "file1.txt").write_text("content1")
        (nested_dir / "file2.txt").write_text("content2")

        boiler = GenesisBoiler()
        boiler.sources = [str(source_dir)]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        # Same order as the original workflow: inventory first, archive second.
        boiler.audit_territory()
        boiler.boil_and_weld()

        inventory_file = Path(boiler.inventory_path)
        archive_file = Path(boiler.output_bin)
        assert inventory_file.exists()
        assert archive_file.exists()

        # One top-level file and one nested file were created above.
        inventory = json.loads(inventory_file.read_text())
        assert len(inventory["files"]) == 2

    def test_multiple_sources(self, temp_dir):
        """Two separate source directories both feed the inventory and archive."""
        source_dirs = []
        for index in (1, 2):
            directory = temp_dir / f"source{index}"
            source_dirs.append(directory)
        for directory in source_dirs:
            directory.mkdir()
        for index, directory in enumerate(source_dirs, start=1):
            (directory / f"file{index}.txt").write_text(f"content{index}")

        boiler = GenesisBoiler()
        boiler.sources = [str(directory) for directory in source_dirs]
        boiler.inventory_path = str(temp_dir / "INVENTORY.json")
        boiler.output_bin = str(temp_dir / "output.tar.gz")

        boiler.audit_territory()
        boiler.boil_and_weld()

        # One file per source directory should be inventoried.
        inventory = json.loads(Path(boiler.inventory_path).read_text())
        assert len(inventory["files"]) == 2

        # The archive must hold at least one member per source.
        with tarfile.open(boiler.output_bin, "r:gz") as archive:
            archived_members = archive.getmembers()
        assert len(archived_members) >= 2