# bfh-studadmin-assist / test_async_performance.py
# Commit e20e6e8 (awellis): Implement proper async parallelism for multi-agent pipeline
"""Test script to verify async parallel performance improvements."""
import asyncio
import time
import logging
from src.config import get_config, reset_config
from src.indexing.memory_indexer import MemoryDocumentIndexer
from src.pipeline.memory_orchestrator import MemoryRAGOrchestrator
# Module-wide logging: timestamped records at INFO level and above.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

logger = logging.getLogger(__name__)
def _load_documents(indexer) -> bool:
    """Load pre-embedded documents from JSON into the indexer's store.

    Returns:
        True when documents were loaded, False (after logging an error)
        when the JSON file is missing.
    """
    # Local imports: json/pathlib are only needed here, and haystack is a
    # heavyweight third-party dependency we defer until actually required.
    import json
    from pathlib import Path
    from haystack import Document as HaystackDoc

    json_path = Path("data/embedded_documents.json")
    if not json_path.exists():
        logger.error(f"Document store not found at {json_path}")
        logger.error("Please run the ingestion script first")
        return False

    logger.info(f"Loading documents from {json_path}...")
    with open(json_path, encoding="utf-8") as f:
        docs_data = json.load(f)
    documents = [
        HaystackDoc(
            id=doc_data.get("id"),
            content=doc_data["content"],
            embedding=doc_data.get("embedding"),
            meta=doc_data.get("meta", {}),
        )
        for doc_data in docs_data
    ]
    indexer.document_store.write_documents(documents)
    logger.info(f"Loaded {len(documents)} documents\n")
    return True


async def _timed_run(indexer, query, *, parallel):
    """Process *query* through a fresh orchestrator and time it.

    Args:
        indexer: shared document indexer (already populated).
        query: user query string to process.
        parallel: whether to enable parallel agent processing.

    Returns:
        (result, elapsed_seconds) tuple.
    """
    reset_config()
    config = get_config()
    config.use_parallel_processing = parallel
    orchestrator = MemoryRAGOrchestrator(config, indexer)
    # perf_counter is monotonic and high-resolution — correct for intervals,
    # unlike time.time() which can jump with wall-clock adjustments.
    start = time.perf_counter()
    result = await orchestrator.process_query(query)
    return result, time.perf_counter() - start


async def test_parallel_vs_sequential():
    """Compare parallel vs sequential execution performance.

    Loads the pre-embedded document store, runs the same query once with
    parallelism disabled and once enabled, then prints a timing and
    output-consistency comparison.

    Returns:
        dict with ``sequential_time``, ``parallel_time``,
        ``speedup_percent`` and ``time_saved`` keys, or ``None`` when the
        document store JSON is missing.
    """
    print("\n" + "="*80)
    print("ASYNC PERFORMANCE TEST: Parallel vs Sequential")
    print("="*80 + "\n")

    # Test query
    test_query = "Wie kann ich mich exmatrikulieren?"

    # Initialize indexer (shared for both tests)
    logger.info("Initializing document indexer...")
    config = get_config()
    indexer = MemoryDocumentIndexer(llm_config=config.llm)

    if not _load_documents(indexer):
        return None

    # ========================================================================
    # TEST 1: SEQUENTIAL MODE
    # ========================================================================
    print("\n" + "-"*80)
    print("TEST 1: SEQUENTIAL MODE (baseline)")
    print("-"*80 + "\n")
    result_seq, sequential_time = await _timed_run(
        indexer, test_query, parallel=False
    )
    print(f"\n⏱️ Sequential Time: {sequential_time:.2f}s")
    print(f"πŸ“§ Subject: {result_seq.email_draft.subject}")
    print(f"βœ“ Accuracy: {result_seq.fact_check.accuracy_score:.0%}")

    # ========================================================================
    # TEST 2: PARALLEL MODE
    # ========================================================================
    print("\n" + "-"*80)
    print("TEST 2: PARALLEL MODE (optimized)")
    print("-"*80 + "\n")
    result_par, parallel_time = await _timed_run(
        indexer, test_query, parallel=True
    )
    print(f"\n⏱️ Parallel Time: {parallel_time:.2f}s")
    print(f"πŸ“§ Subject: {result_par.email_draft.subject}")
    print(f"βœ“ Accuracy: {result_par.fact_check.accuracy_score:.0%}")

    # ========================================================================
    # RESULTS COMPARISON
    # ========================================================================
    print("\n" + "="*80)
    print("RESULTS SUMMARY")
    print("="*80 + "\n")
    # Guard against division by zero for a (theoretical) sub-resolution run.
    speedup = (
        (sequential_time / parallel_time - 1) * 100 if parallel_time > 0 else 0.0
    )
    time_saved = sequential_time - parallel_time
    print(f"Sequential: {sequential_time:.2f}s 🐌")
    print(f"Parallel: {parallel_time:.2f}s ⚑")
    print(f"\n{'βœ… IMPROVEMENT' if speedup > 0 else '❌ SLOWER'}")
    print(f"Time saved: {time_saved:.2f}s")
    print(f"Speedup: {speedup:+.1f}%")

    # Verify outputs are identical across the two modes.
    print(f"\nπŸ“‹ Output Verification:")
    subject_match = result_seq.email_draft.subject == result_par.email_draft.subject
    body_match = result_seq.email_draft.body == result_par.email_draft.body
    print(f" Subject matches: {'βœ“' if subject_match else 'βœ—'}")
    print(f" Body matches: {'βœ“' if body_match else 'βœ—'}")
    if subject_match and body_match:
        print(f"\nβœ… PASS: Parallel execution produces identical results")
    else:
        print(f"\n⚠️ WARNING: Outputs differ (may be due to non-deterministic LLM)")
    print("\n" + "="*80 + "\n")

    return {
        "sequential_time": sequential_time,
        "parallel_time": parallel_time,
        "speedup_percent": speedup,
        "time_saved": time_saved,
    }
async def test_timeout_handling():
    """Verify that an unrealistically short agent timeout surfaces as a timeout error."""
    banner = "=" * 80
    print("\n" + banner)
    print("TIMEOUT HANDLING TEST")
    print(banner + "\n")

    reset_config()
    config = get_config()
    config.agent_timeout = 1  # Very short timeout (will likely fail)

    indexer = MemoryDocumentIndexer(llm_config=config.llm)
    orchestrator = MemoryRAGOrchestrator(config, indexer)

    try:
        await orchestrator.process_query("Test query")
    except (TimeoutError, asyncio.TimeoutError):
        # Expected outcome: the short timeout fired.
        print("βœ… PASS: Timeout handled correctly")
    except Exception as e:
        print(f"⚠️ Unexpected error: {e}")
    else:
        # The query completed — the timeout never triggered.
        print("❌ FAIL: Should have timed out")
if __name__ == "__main__":
    # Entry point: benchmark parallel vs. sequential pipeline execution.
    summary = asyncio.run(test_parallel_vs_sequential())

    # Optional: Run timeout test
    # asyncio.run(test_timeout_handling())