Spaces:
Sleeping
Sleeping
"""
Ledger Tests
============
Tests for ledger store, ECDSA signing, and parallel execution.
"""
| import pytest | |
| import sys | |
| import os | |
| import tempfile | |
| import json | |
| from pathlib import Path | |
| # Add src to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) | |
| from ledger.store import LedgerStore, LedgerEntry, LedgerBlock | |
| from ledger.signing import ECDSASigner, SignatureVerifier, verify_block_signature | |
| from ledger.merkle import hash_leaf | |
class TestLedgerStore:
    """Exercise LedgerStore: entries, block creation, chaining, persistence."""

    def test_create_empty_store(self):
        """A freshly constructed store has no blocks and no pending entries."""
        ledger = LedgerStore()

        assert len(ledger.blocks) == 0
        assert len(ledger.pending_entries) == 0

    def test_add_entry(self):
        """add_entry returns the recorded entry and queues it as pending."""
        ledger = LedgerStore()
        fields = {
            "action_type": "search",
            "agent": "research",
            "input_data": {"query": "test"},
            "output_data": {"results": []},
        }

        recorded = ledger.add_entry(**fields)

        assert recorded.action_type == "search"
        assert recorded.agent == "research"
        assert len(recorded.entry_hash) == 64
        assert len(ledger.pending_entries) == 1

    def test_create_block(self):
        """create_block consumes all pending entries into the first block."""
        ledger = LedgerStore()

        # Queue two entries so the block has something to contain.
        ledger.add_entry("search", "research", {"q": "1"}, {"r": "1"})
        ledger.add_entry("summarize", "summarizer", {"d": "2"}, {"s": "2"})

        sealed = ledger.create_block()

        assert sealed.block_id == 1
        assert len(sealed.entries) == 2
        assert len(sealed.merkle_root) == 64
        assert len(sealed.block_hash) == 64
        # The pending queue is drained and the block is stored.
        assert len(ledger.pending_entries) == 0
        assert len(ledger.blocks) == 1

    def test_block_chain_linking(self):
        """The second block records the hash of the first as its predecessor."""
        ledger = LedgerStore()
        sealed = []

        # One entry per block, two blocks in sequence.
        for action, agent in (("action1", "agent1"), ("action2", "agent2")):
            ledger.add_entry(action, agent, {}, {})
            sealed.append(ledger.create_block())

        first, second = sealed
        assert second.previous_block_hash == first.block_hash

    def test_verify_chain(self):
        """verify_chain reports a three-block chain as valid."""
        ledger = LedgerStore()

        for idx in range(3):
            ledger.add_entry(
                f"action{idx}", f"agent{idx}", {"i": idx}, {"o": idx}
            )
            ledger.create_block()

        chain_ok, report = ledger.verify_chain()

        assert chain_ok is True
        assert "valid" in report.lower()

    def test_persistence(self):
        """A second store opened on the same path sees the first store's block."""
        with tempfile.TemporaryDirectory() as workdir:
            ledger_file = str(Path(workdir) / "ledger.json")

            # Populate a store backed by the file.
            writer = LedgerStore(storage_path=ledger_file)
            writer.add_entry(
                "test", "agent", {"input": "data"}, {"output": "result"}
            )
            writer.create_block()

            # Open an independent store on the same file.
            reader = LedgerStore(storage_path=ledger_file)

            assert len(reader.blocks) == 1
            assert reader.blocks[0].block_hash == writer.blocks[0].block_hash

    def test_empty_block_raises(self):
        """create_block raises ValueError when nothing is pending."""
        ledger = LedgerStore()

        with pytest.raises(ValueError):
            ledger.create_block()
class TestECDSASigning:
    """Exercise ECDSA key generation, signing and signature verification."""

    def test_generate_keypair(self):
        """generate_keypair yields both keys and a non-empty hex public key."""
        pair = ECDSASigner.generate_keypair()

        assert pair.private_key is not None
        assert pair.public_key is not None
        assert len(pair.get_public_key_hex()) > 0

    def test_sign_and_verify(self):
        """A signature verifies against the data it was produced from."""
        signer = ECDSASigner()
        payload = "test block hash 12345"

        sig = signer.sign(payload)

        assert len(sig) > 0
        assert signer.verify(payload, sig) is True

    def test_invalid_signature_fails(self):
        """A signature does not verify against different data."""
        signer = ECDSASigner()
        sig = signer.sign("original data")

        # Same signature, different message: verification must be rejected.
        assert signer.verify("different data", sig) is False

    def test_signature_verifier(self):
        """SignatureVerifier built from the hex public key accepts the signature."""
        signer = ECDSASigner()
        payload = "test data for verification"
        sig = signer.sign(payload)

        # The verifier only receives the public half of the key.
        checker = SignatureVerifier(signer.get_public_key_hex())

        assert checker.verify(payload, sig) is True

    def test_verify_block_signature_convenience(self):
        """verify_block_signature accepts a hash signed by the matching key."""
        signer = ECDSASigner()
        digest = hash_leaf("sample block content")
        sig = signer.sign(digest)
        pub_hex = signer.get_public_key_hex()

        assert verify_block_signature(digest, sig, pub_hex) is True

    def test_signed_block(self):
        """A block created with a signer carries a signature over its hash."""
        ledger = LedgerStore()
        signer = ECDSASigner()
        ledger.add_entry("signed_action", "agent", {}, {})

        sealed = ledger.create_block(signer=signer)

        assert len(sealed.signature) > 0
        assert signer.verify(sealed.block_hash, sealed.signature) is True
class TestParallelExecution:
    """Tests for parallel agent execution.

    Every test here is a coroutine and is marked with ``pytest.mark.asyncio``
    so that it is actually awaited. Without the marker, pytest-asyncio in its
    default strict mode does not run ``async def`` tests, and their assertions
    never execute. This assumes the pytest-asyncio plugin is installed in the
    test environment; confirm against the project's test requirements.

    The ``agents`` imports are kept inside each test, as in the original, so
    that importing this module does not require the ``agents`` package.
    """

    @pytest.mark.asyncio
    async def test_parallel_executor_single_task(self):
        """A single research task succeeds and returns a ``results`` key."""
        from agents.parallel import ParallelExecutor, AgentTask
        from agents.research_agent import ResearchAgent

        executor = ParallelExecutor()
        agent = ResearchAgent(use_real_search=False)
        task = AgentTask(agent=agent, input_data={"query": "test"})

        result = await executor.execute_single(task)

        assert result.success is True
        assert result.agent_role == "research"
        assert "results" in result.output_data

    @pytest.mark.asyncio
    async def test_parallel_executor_multiple_tasks(self):
        """Three tasks run through execute_parallel all succeed."""
        from agents.parallel import ParallelExecutor, AgentTask
        from agents.research_agent import ResearchAgent

        executor = ParallelExecutor(max_concurrency=5)
        tasks = [
            AgentTask(
                agent=ResearchAgent(use_real_search=False),
                input_data={"query": f"query {i}"},
            )
            for i in range(3)
        ]

        results = await executor.execute_parallel(tasks)

        assert len(results) == 3
        assert all(r.success for r in results)

    @pytest.mark.asyncio
    async def test_parallel_executor_handles_errors(self):
        """An exception raised by an agent is reported as a failed result."""
        from agents.parallel import ParallelExecutor, AgentTask
        from agents.base import BaseAgent

        class FailingAgent(BaseAgent):
            """Agent whose ``run`` always raises."""

            role: str = "failing"

            # Parameter name kept exactly as in the original; presumably it
            # mirrors BaseAgent.run -- confirm before renaming.
            async def run(self, input):
                raise ValueError("Intentional failure")

        executor = ParallelExecutor()
        task = AgentTask(agent=FailingAgent(), input_data={})

        result = await executor.execute_single(task)

        assert result.success is False
        assert "failure" in result.error.lower()

    @pytest.mark.asyncio
    async def test_merge_results(self):
        """merge_results summarises two successful results."""
        from agents.parallel import ParallelExecutor, AgentTask, merge_results
        from agents.research_agent import ResearchAgent
        from agents.summarizer_agent import SummarizerAgent

        executor = ParallelExecutor()
        tasks = [
            AgentTask(
                agent=ResearchAgent(use_real_search=False),
                input_data={"query": "test"},
            ),
            AgentTask(agent=SummarizerAgent(), input_data={"documents": []}),
        ]

        results = await executor.execute_parallel(tasks)
        merged = merge_results(results)

        assert merged["total_agents"] == 2
        assert merged["successful"] == 2
        assert merged["failed"] == 0
        assert len(merged["results"]) == 2
class TestLedgerIntegration:
    """End-to-end checks combining the ledger store with block signing."""

    def test_full_audit_trail(self):
        """Three recorded actions form one signed block in a valid chain."""
        ledger = LedgerStore()
        signer = ECDSASigner()

        # Each step is (action_type, agent, input, output), in call order.
        steps = (
            ("search", "research", {"query": "AI health"}, {"results": ["r1", "r2"]}),
            ("summarize", "summarizer", {"docs": ["d1"]}, {"summary": "test"}),
            ("validate", "critic", {"summary": "test"}, {"valid": True}),
        )
        for step in steps:
            ledger.add_entry(*step)

        sealed = ledger.create_block(signer=signer)

        # The block holds every action and is signed over its own hash.
        assert len(sealed.entries) == 3
        assert len(sealed.merkle_root) == 64
        assert signer.verify(sealed.block_hash, sealed.signature)

        # The chain as a whole must still verify.
        chain_ok, _report = ledger.verify_chain()
        assert chain_ok
if __name__ == "__main__":
    # Pass pytest's exit code to the shell so that running this file as a
    # script reports failure (non-zero status) when any test fails.
    sys.exit(pytest.main([__file__, "-v"]))