# Source: general-reasoning-agent / test_persistence_integration.py
# Commit 4454066 (chmielvu): feat: add production refinements (Phase 1-3)
"""
Integration test for workflow persistence with actual workflow execution
"""
import os
import shutil
import logging
from workflows import WorkflowTask, WorkflowDefinition, WorkflowExecutor, WorkflowStore
# Configure the root logger at INFO level.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module; used by the __main__ entry point.
logger = logging.getLogger(__name__)
# Mock tool for testing
class MockTool:
    """Minimal stand-in for a workflow tool, used to drive test workflows."""

    def __init__(self, name):
        self.name = name

    def forward(self, **kwargs):
        """Run the mock tool and return a result dictionary.

        Dispatch order:
          * ``value`` present      -> echoed back as ``{"result": value}``.
          * ``expression`` present -> evaluated; ``{"result": ...}`` on success,
            ``{"error": message}`` if evaluation raises.
          * neither                -> ``{"result": "mock_result"}``.
        """
        # An explicit value takes precedence over an expression.
        if "value" in kwargs:
            return {"result": kwargs["value"]}

        if "expression" not in kwargs:
            return {"result": "mock_result"}

        # NOTE: eval() executes arbitrary Python. The expressions passed in this
        # file are hard-coded literals; never route untrusted input through here.
        try:
            return {"result": eval(kwargs["expression"])}
        except Exception as exc:
            return {"error": str(exc)}
def test_persistence_integration():
    """Run workflows through ``WorkflowExecutor`` and check they are persisted.

    The test executes a three-task workflow against a mock tool, then opens a
    ``WorkflowStore`` on the same path and compares the persisted record with
    the in-memory execution result. A second workflow is executed afterwards
    and the store is expected to list two entries.

    The on-disk cache directory is removed in a ``finally`` block so that a
    failing check does not leave it behind for later runs.

    Returns:
        True when every check passes. The value lets the ``__main__`` entry
        point derive a process exit code.

    Raises:
        AssertionError: If workflow execution fails or any persistence check
            does not hold. All checks signal failure the same way.
    """
    test_cache = "./test_integration_cache"

    # Start from a clean slate: a cache left by an earlier run would skew the
    # persisted-workflow counts checked below.
    if os.path.exists(test_cache):
        shutil.rmtree(test_cache)

    print("=" * 60)
    print("Testing Workflow Persistence Integration")
    print("=" * 60)

    try:
        # Step 1: Create tools registry
        print("\n[1] Setting up tools...")
        mock_tool = MockTool("mock_tool")
        tools_registry = {
            "mock_tool": mock_tool,
        }
        print("[PASS] Tools registry created")

        # Step 2: Create workflow executor with custom cache path
        print("\n[2] Creating workflow executor...")
        executor = WorkflowExecutor(
            tools_registry=tools_registry,
            store_path=test_cache,
        )
        print("[PASS] Executor created with persistence enabled")
        print(f" - Cache path: {test_cache}")

        # Step 3: Create a simple workflow
        print("\n[3] Creating workflow...")
        workflow = WorkflowDefinition(
            name="math_workflow",
            description="Simple math computation workflow",
            tasks=[
                WorkflowTask(
                    id="add_task",
                    tool="mock_tool",
                    args={"expression": "10 + 5"},
                    depends_on=[],
                ),
                WorkflowTask(
                    id="multiply_task",
                    tool="mock_tool",
                    args={"expression": "3 * 7"},
                    depends_on=[],
                ),
                WorkflowTask(
                    id="combine_task",
                    tool="mock_tool",
                    # Literal stand-ins for the two upstream results (21 and 15).
                    args={"expression": "21 + 15"},
                    depends_on=["add_task", "multiply_task"],
                ),
            ],
            final_task="combine_task",
            max_parallel=2,
        )
        print(f"[PASS] Workflow created: {workflow.name}")
        print(f" - Tasks: {len(workflow.tasks)}")
        print(" - Dependencies: add_task, multiply_task -> combine_task")

        # Step 4: Execute workflow
        print("\n[4] Executing workflow...")
        result = executor.execute(workflow)
        if not result["success"]:
            failure = f"Workflow execution failed: {result.get('error')}"
            print(f"[FAIL] {failure}")
            raise AssertionError(failure)
        print("[PASS] Workflow executed successfully")
        print(f" - Execution time: {result['execution_time']:.2f}s")
        print(f" - Final result: {result['result']}")
        print(f" - Trace entries: {len(result['trace'])}")

        # Step 5: Verify workflow was persisted
        print("\n[5] Verifying persistence...")
        store = WorkflowStore(store_path=test_cache)
        workflow_ids = store.list_workflows()
        if len(workflow_ids) != 1:
            failure = f"Expected 1 persisted workflow, found {len(workflow_ids)}"
            print(f"[FAIL] {failure}")
            raise AssertionError(failure)
        print(f"[PASS] Found {len(workflow_ids)} persisted workflow")
        print(f" - Workflow ID: {workflow_ids[0]}")

        # Step 6: Load and verify persisted workflow
        print("\n[6] Loading persisted workflow...")
        workflow_id = workflow_ids[0]
        loaded_data = store.load_workflow(workflow_id)
        if loaded_data is None:
            failure = "Failed to load persisted workflow"
            print(f"[FAIL] {failure}")
            raise AssertionError(failure)
        print("[PASS] Workflow loaded successfully")
        print(f" - Workflow ID: {loaded_data['workflow_id']}")
        print(f" - Workflow name: {loaded_data['workflow']['name']}")

        # Step 7: Verify loaded data matches execution
        print("\n[7] Validating persisted data...")
        # Check workflow definition
        persisted_workflow = loaded_data["workflow"]
        assert persisted_workflow["name"] == workflow.name, "Workflow name mismatch"
        assert len(persisted_workflow["tasks"]) == len(workflow.tasks), "Task count mismatch"
        assert persisted_workflow["final_task"] == workflow.final_task, "Final task mismatch"
        # Check execution result
        persisted_result = loaded_data["result"]
        assert persisted_result["success"] == result["success"], "Success status mismatch"
        assert persisted_result["result"] == result["result"], "Final result mismatch"
        assert "execution_time" in persisted_result, "Missing execution time"
        assert "trace" in persisted_result, "Missing trace"
        print("[PASS] All data validated successfully")
        print(" - Workflow definition: OK")
        print(" - Execution result: OK")
        print(" - Trace: OK")

        # Step 8: Execute another workflow to test multiple workflows
        print("\n[8] Testing multiple workflow persistence...")
        workflow2 = WorkflowDefinition(
            name="simple_calc",
            description="Simple calculation",
            tasks=[
                WorkflowTask(
                    id="calc",
                    tool="mock_tool",
                    args={"expression": "100 * 2"},
                    depends_on=[],
                ),
            ],
            final_task="calc",
        )
        result2 = executor.execute(workflow2)
        assert result2["success"], "Second workflow execution failed"
        workflow_ids = store.list_workflows()
        assert len(workflow_ids) == 2, f"Expected 2 workflows, found {len(workflow_ids)}"
        print("[PASS] Multiple workflows persisted correctly")
        print(f" - Total workflows: {len(workflow_ids)}")
    finally:
        # Always remove the on-disk cache, including when a check above failed.
        print("\n[Cleanup] Removing test cache...")
        if os.path.exists(test_cache):
            shutil.rmtree(test_cache)
        print("[PASS] Test cache removed")

    print("\n" + "=" * 60)
    print("[SUCCESS] INTEGRATION TEST PASSED")
    print("=" * 60)
    print("\nVerified:")
    print(" - Workflows auto-persist after successful execution")
    print(" - Persisted data matches execution results")
    print(" - Multiple workflows can be stored")
    print(" - Workflows can be loaded and validated")
    return True
# Script entry point: run the integration test and report the outcome through
# the process exit status (0 = passed, 1 = failed or raised).
if __name__ == "__main__":
    try:
        succeeded = test_persistence_integration()
    except Exception as exc:
        # Log the message together with the traceback, then report failure.
        logger.error("Test failed with exception: %s", exc, exc_info=True)
        succeeded = False
    # Raise SystemExit directly instead of calling the ``exit`` helper, which is
    # injected by the ``site`` module and is not guaranteed to exist.
    raise SystemExit(0 if succeeded else 1)