"""Test script to verify async parallel performance improvements."""

import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from src.config import get_config, reset_config
from src.indexing.memory_indexer import MemoryDocumentIndexer
from src.pipeline.memory_orchestrator import MemoryRAGOrchestrator

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Location of the pre-embedded documents read by the performance test.
_DOCUMENTS_PATH = Path("data/embedded_documents.json")

# Width of the separator lines printed around each section title.
_BANNER_WIDTH = 80


def _print_banner(title: str, char: str = "=") -> None:
    """Print *title* framed by two separator lines made of *char*."""
    print("\n" + char * _BANNER_WIDTH)
    print(title)
    print(char * _BANNER_WIDTH + "\n")


def _load_documents(indexer: MemoryDocumentIndexer, json_path: Path) -> bool:
    """Write the documents stored in *json_path* into the indexer's store.

    Args:
        indexer: Indexer whose ``document_store`` receives the documents.
        json_path: JSON file holding a list of document dicts. Each dict must
            have a ``content`` key; ``id``, ``embedding`` and ``meta`` are
            optional.

    Returns:
        ``True`` when the documents were written, ``False`` when *json_path*
        does not exist (an error is logged in that case).
    """
    # Kept function-local, as in the original script, so that merely importing
    # this module does not itself import haystack.
    from haystack import Document as HaystackDoc

    if not json_path.exists():
        logger.error("Document store not found at %s", json_path)
        logger.error("Please run the ingestion script first")
        return False

    logger.info("Loading documents from %s...", json_path)
    # Explicit encoding: without it the platform default is used, which can
    # mis-decode non-ASCII document text.
    with open(json_path, encoding="utf-8") as f:
        docs_data = json.load(f)

    documents = [
        HaystackDoc(
            id=doc_data.get("id"),
            content=doc_data["content"],
            embedding=doc_data.get("embedding"),
            meta=doc_data.get("meta", {}),
        )
        for doc_data in docs_data
    ]
    indexer.document_store.write_documents(documents)
    logger.info("Loaded %d documents\n", len(documents))
    return True


async def _timed_query(
    use_parallel: bool,
    indexer: MemoryDocumentIndexer,
    query: str,
) -> Tuple[Any, float]:
    """Run *query* through a freshly configured orchestrator and time it.

    The configuration is reset first so that each run starts from a new
    config object before ``use_parallel_processing`` is set.

    Returns:
        A ``(result, elapsed_seconds)`` tuple.
    """
    reset_config()
    config = get_config()
    config.use_parallel_processing = use_parallel
    orchestrator = MemoryRAGOrchestrator(config, indexer)

    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # when the system clock is adjusted, which would corrupt the measurement.
    start = time.perf_counter()
    result = await orchestrator.process_query(query)
    return result, time.perf_counter() - start


def _print_run(label: str, result: Any, elapsed: float) -> None:
    """Print the timing, email subject and accuracy score of one run."""
    print(f"\n⏱️ {label} Time: {elapsed:.2f}s")
    print(f"📧 Subject: {result.email_draft.subject}")
    print(f"✓ Accuracy: {result.fact_check.accuracy_score:.0%}")


def _speedup_percent(sequential_time: float, parallel_time: float) -> float:
    """Return how much faster the parallel run was, in percent.

    A positive value means the parallel run was faster. A non-positive
    *parallel_time* yields ``inf`` instead of raising ``ZeroDivisionError``.
    """
    if parallel_time <= 0:
        return float("inf")
    return (sequential_time / parallel_time - 1) * 100


async def test_parallel_vs_sequential() -> Optional[Dict[str, float]]:
    """Compare parallel vs sequential execution performance.

    Loads the embedded documents into a shared in-memory indexer, processes
    the same query once with parallel processing disabled and once with it
    enabled, then prints the timings and whether both runs produced the same
    email subject and body.

    Returns:
        A dict with ``sequential_time``, ``parallel_time``,
        ``speedup_percent`` and ``time_saved``, or ``None`` when the document
        file is missing.
    """
    _print_banner("ASYNC PERFORMANCE TEST: Parallel vs Sequential")

    # Test query
    test_query = "Wie kann ich mich exmatrikulieren?"

    # Initialize indexer (shared for both tests)
    logger.info("Initializing document indexer...")
    config = get_config()
    indexer = MemoryDocumentIndexer(llm_config=config.llm)

    if not _load_documents(indexer, _DOCUMENTS_PATH):
        return None

    # TEST 1: SEQUENTIAL MODE
    _print_banner("TEST 1: SEQUENTIAL MODE (baseline)", char="-")
    result_seq, sequential_time = await _timed_query(False, indexer, test_query)
    _print_run("Sequential", result_seq, sequential_time)

    # TEST 2: PARALLEL MODE
    _print_banner("TEST 2: PARALLEL MODE (optimized)", char="-")
    result_par, parallel_time = await _timed_query(True, indexer, test_query)
    _print_run("Parallel", result_par, parallel_time)

    # RESULTS COMPARISON
    _print_banner("RESULTS SUMMARY")

    speedup = _speedup_percent(sequential_time, parallel_time)
    time_saved = sequential_time - parallel_time
    verdict = "✅ IMPROVEMENT" if speedup > 0 else "❌ SLOWER"

    print(f"Sequential: {sequential_time:.2f}s 🐌")
    print(f"Parallel: {parallel_time:.2f}s ⚔")
    print(f"\n{verdict}")
    print(f"Time saved: {time_saved:.2f}s")
    print(f"Speedup: {speedup:+.1f}%")

    # Verify outputs are identical
    print("\n📋 Output Verification:")
    subject_match = result_seq.email_draft.subject == result_par.email_draft.subject
    body_match = result_seq.email_draft.body == result_par.email_draft.body

    print(f" Subject matches: {'✓' if subject_match else '✗'}")
    print(f" Body matches: {'✓' if body_match else '✗'}")

    if subject_match and body_match:
        print("\n✅ PASS: Parallel execution produces identical results")
    else:
        # A mismatch is only a warning: the two runs are separate LLM calls.
        print("\n⚠️ WARNING: Outputs differ (may be due to non-deterministic LLM)")

    print("\n" + "=" * _BANNER_WIDTH + "\n")

    return {
        "sequential_time": sequential_time,
        "parallel_time": parallel_time,
        "speedup_percent": speedup,
        "time_saved": time_saved,
    }


async def test_timeout_handling() -> None:
    """Test that timeouts work correctly.

    Sets ``agent_timeout`` to 1 and processes a query, reporting PASS when a
    timeout error is raised, FAIL when the query completes, and a warning for
    any other exception.
    """
    _print_banner("TIMEOUT HANDLING TEST")

    reset_config()
    config = get_config()
    config.agent_timeout = 1  # Very short timeout (will likely fail)

    indexer = MemoryDocumentIndexer(llm_config=config.llm)
    orchestrator = MemoryRAGOrchestrator(config, indexer)

    try:
        await orchestrator.process_query("Test query")
    except (TimeoutError, asyncio.TimeoutError):
        # asyncio.TimeoutError is a distinct class before Python 3.11.
        print("✅ PASS: Timeout handled correctly")
    except Exception as e:
        # Deliberately broad: this is a diagnostic script, so any other
        # failure is reported rather than propagated.
        print(f"⚠️ Unexpected error: {e}")
    else:
        print("❌ FAIL: Should have timed out")


if __name__ == "__main__":
    # Run performance comparison
    asyncio.run(test_parallel_vs_sequential())

    # Optional: to also run the timeout test, call
    # asyncio.run(test_timeout_handling()) here.