AgentMask / tests /test_ledger.py
b2230765034
stage4: ledger persistence, signing, parallel agents, UI improvements
631d977
"""
Ledger Tests
=============
Tests for ledger store, ECDSA signing, and parallel execution.
"""
import pytest
import sys
import os
import tempfile
import json
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
from ledger.store import LedgerStore, LedgerEntry, LedgerBlock
from ledger.signing import ECDSASigner, SignatureVerifier, verify_block_signature
from ledger.merkle import hash_leaf
class TestLedgerStore:
"""Tests for LedgerStore."""
def test_create_empty_store(self):
"""Test creating an empty ledger store."""
store = LedgerStore()
assert len(store.blocks) == 0
assert len(store.pending_entries) == 0
def test_add_entry(self):
"""Test adding entries to the ledger."""
store = LedgerStore()
entry = store.add_entry(
action_type="search",
agent="research",
input_data={"query": "test"},
output_data={"results": []}
)
assert entry.action_type == "search"
assert entry.agent == "research"
assert len(entry.entry_hash) == 64
assert len(store.pending_entries) == 1
def test_create_block(self):
"""Test creating a block from pending entries."""
store = LedgerStore()
# Add some entries
store.add_entry("search", "research", {"q": "1"}, {"r": "1"})
store.add_entry("summarize", "summarizer", {"d": "2"}, {"s": "2"})
# Create block
block = store.create_block()
assert block.block_id == 1
assert len(block.entries) == 2
assert len(block.merkle_root) == 64
assert len(block.block_hash) == 64
assert len(store.pending_entries) == 0
assert len(store.blocks) == 1
def test_block_chain_linking(self):
"""Test that blocks are properly linked."""
store = LedgerStore()
# Create first block
store.add_entry("action1", "agent1", {}, {})
block1 = store.create_block()
# Create second block
store.add_entry("action2", "agent2", {}, {})
block2 = store.create_block()
assert block2.previous_block_hash == block1.block_hash
def test_verify_chain(self):
"""Test chain verification."""
store = LedgerStore()
# Create multiple blocks
for i in range(3):
store.add_entry(f"action{i}", f"agent{i}", {"i": i}, {"o": i})
store.create_block()
is_valid, message = store.verify_chain()
assert is_valid is True
assert "valid" in message.lower()
def test_persistence(self):
"""Test saving and loading ledger."""
with tempfile.TemporaryDirectory() as tmpdir:
storage_path = Path(tmpdir) / "ledger.json"
# Create and populate store
store1 = LedgerStore(storage_path=str(storage_path))
store1.add_entry("test", "agent", {"input": "data"}, {"output": "result"})
store1.create_block()
# Create new store from same file
store2 = LedgerStore(storage_path=str(storage_path))
assert len(store2.blocks) == 1
assert store2.blocks[0].block_hash == store1.blocks[0].block_hash
def test_empty_block_raises(self):
"""Test that creating a block with no entries raises."""
store = LedgerStore()
with pytest.raises(ValueError):
store.create_block()
class TestECDSASigning:
"""Tests for ECDSA signing functionality."""
def test_generate_keypair(self):
"""Test key pair generation."""
keypair = ECDSASigner.generate_keypair()
assert keypair.private_key is not None
assert keypair.public_key is not None
assert len(keypair.get_public_key_hex()) > 0
def test_sign_and_verify(self):
"""Test signing data and verifying signature."""
signer = ECDSASigner()
data = "test block hash 12345"
signature = signer.sign(data)
assert len(signature) > 0
assert signer.verify(data, signature) is True
def test_invalid_signature_fails(self):
"""Test that invalid signatures are rejected."""
signer = ECDSASigner()
data = "original data"
signature = signer.sign(data)
# Verify with different data should fail
assert signer.verify("different data", signature) is False
def test_signature_verifier(self):
"""Test standalone signature verifier."""
signer = ECDSASigner()
data = "test data for verification"
signature = signer.sign(data)
public_key_hex = signer.get_public_key_hex()
# Create verifier from public key
verifier = SignatureVerifier(public_key_hex)
assert verifier.verify(data, signature) is True
def test_verify_block_signature_convenience(self):
"""Test the convenience function for block signature verification."""
signer = ECDSASigner()
block_hash = hash_leaf("sample block content")
signature = signer.sign(block_hash)
public_key = signer.get_public_key_hex()
assert verify_block_signature(block_hash, signature, public_key) is True
def test_signed_block(self):
"""Test creating a signed block."""
store = LedgerStore()
signer = ECDSASigner()
store.add_entry("signed_action", "agent", {}, {})
block = store.create_block(signer=signer)
assert len(block.signature) > 0
assert signer.verify(block.block_hash, block.signature) is True
class TestParallelExecution:
"""Tests for parallel agent execution."""
@pytest.mark.asyncio
async def test_parallel_executor_single_task(self):
"""Test executing a single task."""
from agents.parallel import ParallelExecutor, AgentTask
from agents.research_agent import ResearchAgent
executor = ParallelExecutor()
agent = ResearchAgent(use_real_search=False)
task = AgentTask(agent=agent, input_data={"query": "test"})
result = await executor.execute_single(task)
assert result.success is True
assert result.agent_role == "research"
assert "results" in result.output_data
@pytest.mark.asyncio
async def test_parallel_executor_multiple_tasks(self):
"""Test executing multiple tasks in parallel."""
from agents.parallel import ParallelExecutor, AgentTask
from agents.research_agent import ResearchAgent
executor = ParallelExecutor(max_concurrency=5)
tasks = [
AgentTask(
agent=ResearchAgent(use_real_search=False),
input_data={"query": f"query {i}"}
)
for i in range(3)
]
results = await executor.execute_parallel(tasks)
assert len(results) == 3
assert all(r.success for r in results)
@pytest.mark.asyncio
async def test_parallel_executor_handles_errors(self):
"""Test that executor handles task errors gracefully."""
from agents.parallel import ParallelExecutor, AgentTask
from agents.base import BaseAgent
from dataclasses import dataclass
@dataclass
class FailingAgent(BaseAgent):
role: str = "failing"
async def run(self, input):
raise ValueError("Intentional failure")
executor = ParallelExecutor()
task = AgentTask(agent=FailingAgent(), input_data={})
result = await executor.execute_single(task)
assert result.success is False
assert "failure" in result.error.lower()
@pytest.mark.asyncio
async def test_merge_results(self):
"""Test merging multiple agent results."""
from agents.parallel import ParallelExecutor, AgentTask, merge_results
from agents.research_agent import ResearchAgent
from agents.summarizer_agent import SummarizerAgent
executor = ParallelExecutor()
tasks = [
AgentTask(agent=ResearchAgent(use_real_search=False), input_data={"query": "test"}),
AgentTask(agent=SummarizerAgent(), input_data={"documents": []})
]
results = await executor.execute_parallel(tasks)
merged = merge_results(results)
assert merged["total_agents"] == 2
assert merged["successful"] == 2
assert merged["failed"] == 0
assert len(merged["results"]) == 2
class TestLedgerIntegration:
"""Integration tests for ledger with signing."""
def test_full_audit_trail(self):
"""Test complete audit trail with signed blocks."""
store = LedgerStore()
signer = ECDSASigner()
# Simulate agent actions
actions = [
("search", "research", {"query": "AI health"}, {"results": ["r1", "r2"]}),
("summarize", "summarizer", {"docs": ["d1"]}, {"summary": "test"}),
("validate", "critic", {"summary": "test"}, {"valid": True})
]
for action_type, agent, inp, out in actions:
store.add_entry(action_type, agent, inp, out)
block = store.create_block(signer=signer)
# Verify complete audit trail
assert len(block.entries) == 3
assert len(block.merkle_root) == 64
assert signer.verify(block.block_hash, block.signature)
# Verify chain integrity
is_valid, _ = store.verify_chain()
assert is_valid
if __name__ == "__main__":
pytest.main([__file__, "-v"])