""" Ledger Tests ============= Tests for ledger store, ECDSA signing, and parallel execution. """ import pytest import sys import os import tempfile import json from pathlib import Path # Add src to path sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) from ledger.store import LedgerStore, LedgerEntry, LedgerBlock from ledger.signing import ECDSASigner, SignatureVerifier, verify_block_signature from ledger.merkle import hash_leaf class TestLedgerStore: """Tests for LedgerStore.""" def test_create_empty_store(self): """Test creating an empty ledger store.""" store = LedgerStore() assert len(store.blocks) == 0 assert len(store.pending_entries) == 0 def test_add_entry(self): """Test adding entries to the ledger.""" store = LedgerStore() entry = store.add_entry( action_type="search", agent="research", input_data={"query": "test"}, output_data={"results": []} ) assert entry.action_type == "search" assert entry.agent == "research" assert len(entry.entry_hash) == 64 assert len(store.pending_entries) == 1 def test_create_block(self): """Test creating a block from pending entries.""" store = LedgerStore() # Add some entries store.add_entry("search", "research", {"q": "1"}, {"r": "1"}) store.add_entry("summarize", "summarizer", {"d": "2"}, {"s": "2"}) # Create block block = store.create_block() assert block.block_id == 1 assert len(block.entries) == 2 assert len(block.merkle_root) == 64 assert len(block.block_hash) == 64 assert len(store.pending_entries) == 0 assert len(store.blocks) == 1 def test_block_chain_linking(self): """Test that blocks are properly linked.""" store = LedgerStore() # Create first block store.add_entry("action1", "agent1", {}, {}) block1 = store.create_block() # Create second block store.add_entry("action2", "agent2", {}, {}) block2 = store.create_block() assert block2.previous_block_hash == block1.block_hash def test_verify_chain(self): """Test chain verification.""" store = LedgerStore() # Create multiple blocks for i in range(3): store.add_entry(f"action{i}", f"agent{i}", {"i": i}, {"o": i}) store.create_block() is_valid, message = store.verify_chain() assert is_valid is True assert "valid" in message.lower() def test_persistence(self): """Test saving and loading ledger.""" with tempfile.TemporaryDirectory() as tmpdir: storage_path = Path(tmpdir) / "ledger.json" # Create and populate store store1 = LedgerStore(storage_path=str(storage_path)) store1.add_entry("test", "agent", {"input": "data"}, {"output": "result"}) store1.create_block() # Create new store from same file store2 = LedgerStore(storage_path=str(storage_path)) assert len(store2.blocks) == 1 assert store2.blocks[0].block_hash == store1.blocks[0].block_hash def test_empty_block_raises(self): """Test that creating a block with no entries raises.""" store = LedgerStore() with pytest.raises(ValueError): store.create_block() class TestECDSASigning: """Tests for ECDSA signing functionality.""" def test_generate_keypair(self): """Test key pair generation.""" keypair = ECDSASigner.generate_keypair() assert keypair.private_key is not None assert keypair.public_key is not None assert len(keypair.get_public_key_hex()) > 0 def test_sign_and_verify(self): """Test signing data and verifying signature.""" signer = ECDSASigner() data = "test block hash 12345" signature = signer.sign(data) assert len(signature) > 0 assert signer.verify(data, signature) is True def test_invalid_signature_fails(self): """Test that invalid signatures are rejected.""" signer = ECDSASigner() data = "original data" signature = signer.sign(data) # Verify with different data should fail assert signer.verify("different data", signature) is False def test_signature_verifier(self): """Test standalone signature verifier.""" signer = ECDSASigner() data = "test data for verification" signature = signer.sign(data) public_key_hex = signer.get_public_key_hex() # Create verifier from public key verifier = SignatureVerifier(public_key_hex) assert verifier.verify(data, signature) is True def test_verify_block_signature_convenience(self): """Test the convenience function for block signature verification.""" signer = ECDSASigner() block_hash = hash_leaf("sample block content") signature = signer.sign(block_hash) public_key = signer.get_public_key_hex() assert verify_block_signature(block_hash, signature, public_key) is True def test_signed_block(self): """Test creating a signed block.""" store = LedgerStore() signer = ECDSASigner() store.add_entry("signed_action", "agent", {}, {}) block = store.create_block(signer=signer) assert len(block.signature) > 0 assert signer.verify(block.block_hash, block.signature) is True class TestParallelExecution: """Tests for parallel agent execution.""" @pytest.mark.asyncio async def test_parallel_executor_single_task(self): """Test executing a single task.""" from agents.parallel import ParallelExecutor, AgentTask from agents.research_agent import ResearchAgent executor = ParallelExecutor() agent = ResearchAgent(use_real_search=False) task = AgentTask(agent=agent, input_data={"query": "test"}) result = await executor.execute_single(task) assert result.success is True assert result.agent_role == "research" assert "results" in result.output_data @pytest.mark.asyncio async def test_parallel_executor_multiple_tasks(self): """Test executing multiple tasks in parallel.""" from agents.parallel import ParallelExecutor, AgentTask from agents.research_agent import ResearchAgent executor = ParallelExecutor(max_concurrency=5) tasks = [ AgentTask( agent=ResearchAgent(use_real_search=False), input_data={"query": f"query {i}"} ) for i in range(3) ] results = await executor.execute_parallel(tasks) assert len(results) == 3 assert all(r.success for r in results) @pytest.mark.asyncio async def test_parallel_executor_handles_errors(self): """Test that executor handles task errors gracefully.""" from agents.parallel import ParallelExecutor, AgentTask from agents.base import BaseAgent from dataclasses import dataclass @dataclass class FailingAgent(BaseAgent): role: str = "failing" async def run(self, input): raise ValueError("Intentional failure") executor = ParallelExecutor() task = AgentTask(agent=FailingAgent(), input_data={}) result = await executor.execute_single(task) assert result.success is False assert "failure" in result.error.lower() @pytest.mark.asyncio async def test_merge_results(self): """Test merging multiple agent results.""" from agents.parallel import ParallelExecutor, AgentTask, merge_results from agents.research_agent import ResearchAgent from agents.summarizer_agent import SummarizerAgent executor = ParallelExecutor() tasks = [ AgentTask(agent=ResearchAgent(use_real_search=False), input_data={"query": "test"}), AgentTask(agent=SummarizerAgent(), input_data={"documents": []}) ] results = await executor.execute_parallel(tasks) merged = merge_results(results) assert merged["total_agents"] == 2 assert merged["successful"] == 2 assert merged["failed"] == 0 assert len(merged["results"]) == 2 class TestLedgerIntegration: """Integration tests for ledger with signing.""" def test_full_audit_trail(self): """Test complete audit trail with signed blocks.""" store = LedgerStore() signer = ECDSASigner() # Simulate agent actions actions = [ ("search", "research", {"query": "AI health"}, {"results": ["r1", "r2"]}), ("summarize", "summarizer", {"docs": ["d1"]}, {"summary": "test"}), ("validate", "critic", {"summary": "test"}, {"valid": True}) ] for action_type, agent, inp, out in actions: store.add_entry(action_type, agent, inp, out) block = store.create_block(signer=signer) # Verify complete audit trail assert len(block.entries) == 3 assert len(block.merkle_root) == 64 assert signer.verify(block.block_hash, block.signature) # Verify chain integrity is_valid, _ = store.verify_chain() assert is_valid if __name__ == "__main__": pytest.main([__file__, "-v"])