# NOTE(review): the three lines below were page-scrape residue from the hosting
# site ("Spaces: Running on CPU Upgrade"), not Python — kept here as a comment
# so the file parses.
"""Test script to verify async parallel performance improvements."""

import asyncio
import logging
import time

from src.config import get_config, reset_config
from src.indexing.memory_indexer import MemoryDocumentIndexer
from src.pipeline.memory_orchestrator import MemoryRAGOrchestrator

# Module-wide logging: timestamped INFO-level records for every logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
def _load_documents_into(indexer) -> bool:
    """Populate *indexer*'s document store from the pre-embedded JSON dump.

    Returns True when documents were loaded, False when the dump file is
    missing (the caller should abort the benchmark in that case).
    """
    import json
    from pathlib import Path

    from haystack import Document as HaystackDoc

    json_path = Path("data/embedded_documents.json")
    if not json_path.exists():
        logger.error(f"Document store not found at {json_path}")
        logger.error("Please run the ingestion script first")
        return False

    logger.info(f"Loading documents from {json_path}...")
    with open(json_path) as f:
        docs_data = json.load(f)
    documents = [
        HaystackDoc(
            id=doc_data.get("id"),
            content=doc_data["content"],
            embedding=doc_data.get("embedding"),
            meta=doc_data.get("meta", {}),
        )
        for doc_data in docs_data
    ]
    indexer.document_store.write_documents(documents)
    logger.info(f"Loaded {len(documents)} documents\n")
    return True


async def _run_query(indexer, query: str, *, parallel: bool):
    """Run one end-to-end query with parallelism toggled on or off.

    Resets the global config so each run starts from a clean state.
    Returns (result, elapsed_seconds).
    """
    reset_config()
    config = get_config()
    config.use_parallel_processing = parallel
    orchestrator = MemoryRAGOrchestrator(config, indexer)
    # perf_counter() is monotonic and high-resolution — the right clock for
    # elapsed-interval measurement (time.time() can jump on NTP adjustment).
    start = time.perf_counter()
    result = await orchestrator.process_query(query)
    return result, time.perf_counter() - start


async def test_parallel_vs_sequential():
    """Compare parallel vs sequential execution performance.

    Returns a summary dict (timings, speedup) on success, or None when the
    embedded-documents dump is missing and the benchmark cannot run.
    """
    print("\n" + "=" * 80)
    print("ASYNC PERFORMANCE TEST: Parallel vs Sequential")
    print("=" * 80 + "\n")

    test_query = "Wie kann ich mich exmatrikulieren?"

    # The indexer (and its document store) is shared by both runs so the
    # two modes are benchmarked against identical data.
    logger.info("Initializing document indexer...")
    config = get_config()
    indexer = MemoryDocumentIndexer(llm_config=config.llm)
    if not _load_documents_into(indexer):
        return None

    # ------------------------------------------------------------------
    # TEST 1: SEQUENTIAL MODE (baseline)
    # ------------------------------------------------------------------
    print("\n" + "-" * 80)
    print("TEST 1: SEQUENTIAL MODE (baseline)")
    print("-" * 80 + "\n")
    result_seq, sequential_time = await _run_query(indexer, test_query, parallel=False)
    print(f"\n⏱️ Sequential Time: {sequential_time:.2f}s")
    print(f"📧 Subject: {result_seq.email_draft.subject}")
    print(f"✅ Accuracy: {result_seq.fact_check.accuracy_score:.0%}")

    # ------------------------------------------------------------------
    # TEST 2: PARALLEL MODE (optimized)
    # ------------------------------------------------------------------
    print("\n" + "-" * 80)
    print("TEST 2: PARALLEL MODE (optimized)")
    print("-" * 80 + "\n")
    result_par, parallel_time = await _run_query(indexer, test_query, parallel=True)
    print(f"\n⏱️ Parallel Time: {parallel_time:.2f}s")
    print(f"📧 Subject: {result_par.email_draft.subject}")
    print(f"✅ Accuracy: {result_par.fact_check.accuracy_score:.0%}")

    # ------------------------------------------------------------------
    # RESULTS COMPARISON
    # ------------------------------------------------------------------
    print("\n" + "=" * 80)
    print("RESULTS SUMMARY")
    print("=" * 80 + "\n")
    # Guard the division: a (pathological) zero parallel time would
    # otherwise raise ZeroDivisionError.
    speedup = (sequential_time / parallel_time - 1) * 100 if parallel_time > 0 else 0.0
    time_saved = sequential_time - parallel_time
    print(f"Sequential: {sequential_time:.2f}s 🐌")
    print(f"Parallel: {parallel_time:.2f}s ⚡")
    print(f"\n{'✅ IMPROVEMENT' if speedup > 0 else '❌ SLOWER'}")
    print(f"Time saved: {time_saved:.2f}s")
    print(f"Speedup: {speedup:+.1f}%")

    # Verify outputs are identical across modes. A mismatch is a warning,
    # not a failure: LLM sampling may legitimately vary between runs.
    print("\n🔍 Output Verification:")
    subject_match = result_seq.email_draft.subject == result_par.email_draft.subject
    body_match = result_seq.email_draft.body == result_par.email_draft.body
    print(f"  Subject matches: {'✅' if subject_match else '❌'}")
    print(f"  Body matches: {'✅' if body_match else '❌'}")
    if subject_match and body_match:
        print("\n✅ PASS: Parallel execution produces identical results")
    else:
        print("\n⚠️ WARNING: Outputs differ (may be due to non-deterministic LLM)")
    print("\n" + "=" * 80 + "\n")

    return {
        "sequential_time": sequential_time,
        "parallel_time": parallel_time,
        "speedup_percent": speedup,
        "time_saved": time_saved,
    }
async def test_timeout_handling():
    """Test that timeouts work correctly.

    Forces an unrealistically short agent timeout and expects the
    orchestrator to raise a timeout error for the query.
    """
    print("\n" + "=" * 80)
    print("TIMEOUT HANDLING TEST")
    print("=" * 80 + "\n")

    reset_config()
    config = get_config()
    config.agent_timeout = 1  # Very short timeout (will likely fail)

    indexer = MemoryDocumentIndexer(llm_config=config.llm)
    orchestrator = MemoryRAGOrchestrator(config, indexer)

    try:
        await orchestrator.process_query("Test query")
        print("❌ FAIL: Should have timed out")
    except (TimeoutError, asyncio.TimeoutError):
        # asyncio.TimeoutError is an alias of builtin TimeoutError on
        # Python 3.11+; catching both keeps older interpreters covered.
        print("✅ PASS: Timeout handled correctly")
    except Exception as e:
        # Best-effort diagnostic test: report, don't crash the script.
        print(f"⚠️ Unexpected error: {e}")
if __name__ == "__main__":
    # Run the performance comparison. The summary dict is printed inside the
    # test itself, so the return value is deliberately not bound (the previous
    # unused `results` local has been dropped).
    asyncio.run(test_parallel_vs_sequential())

    # Optional: Run timeout test
    # asyncio.run(test_timeout_handling())