SupportRAG / tests /test_async_performance.py
Sakshamyadav15's picture
Revision
e7dfc31
"""
Test async vs sync query performance
Demonstrates speed improvements from async endpoints and FAISS IVF optimization
"""
import asyncio
import time
from src.core.dual_rag_pipeline import DualStoreRAGPipeline
def test_sync_queries():
"""Test synchronous query performance"""
print("\n" + "="*60)
print("πŸ”„ SYNC QUERY PERFORMANCE TEST")
print("="*60)
pipeline = DualStoreRAGPipeline()
pipeline.load_vector_stores() # Load saved vector stores
test_queries = [
"How do I track my order?",
"I want to cancel my subscription",
"My payment was declined",
"How do I reset my password?",
"Where is my refund?"
]
start = time.time()
for i, query in enumerate(test_queries, 1):
q_start = time.time()
result = pipeline.query(query, top_k=3)
q_time = (time.time() - q_start) * 1000
print(f"\n{i}. Query: {query}")
print(f" Source: {result['source']}")
print(f" Confidence: {result['confidence']:.1%}")
print(f" Latency: {q_time:.0f}ms")
total_time = (time.time() - start) * 1000
avg_time = total_time / len(test_queries)
print(f"\n{'─'*60}")
print(f"Total Time: {total_time:.0f}ms")
print(f"Average Latency: {avg_time:.0f}ms")
print(f"Queries/sec: {1000/avg_time:.2f}")
return avg_time
async def test_async_queries():
"""Test asynchronous query performance"""
print("\n" + "="*60)
print("⚑ ASYNC QUERY PERFORMANCE TEST")
print("="*60)
pipeline = DualStoreRAGPipeline()
pipeline.load_vector_stores() # Load saved vector stores
test_queries = [
"How do I track my order?",
"I want to cancel my subscription",
"My payment was declined",
"How do I reset my password?",
"Where is my refund?"
]
start = time.time()
# Sequential async queries
for i, query in enumerate(test_queries, 1):
q_start = time.time()
result = await pipeline.aquery(query, top_k=3)
q_time = (time.time() - q_start) * 1000
print(f"\n{i}. Query: {query}")
print(f" Source: {result['source']}")
print(f" Confidence: {result['confidence']:.1%}")
print(f" Latency: {q_time:.0f}ms")
total_time = (time.time() - start) * 1000
avg_time = total_time / len(test_queries)
print(f"\n{'─'*60}")
print(f"Total Time: {total_time:.0f}ms")
print(f"Average Latency: {avg_time:.0f}ms")
print(f"Queries/sec: {1000/avg_time:.2f}")
return avg_time
async def test_parallel_async_queries():
"""Test parallel async query performance"""
print("\n" + "="*60)
print("πŸš€ PARALLEL ASYNC QUERY PERFORMANCE TEST")
print("="*60)
pipeline = DualStoreRAGPipeline()
pipeline.load_vector_stores() # Load saved vector stores
test_queries = [
"How do I track my order?",
"I want to cancel my subscription",
"My payment was declined",
"How do I reset my password?",
"Where is my refund?"
]
start = time.time()
# Run all queries in parallel
tasks = [pipeline.aquery(query, top_k=3) for query in test_queries]
results = await asyncio.gather(*tasks)
total_time = (time.time() - start) * 1000
for i, (query, result) in enumerate(zip(test_queries, results), 1):
print(f"\n{i}. Query: {query}")
print(f" Source: {result['source']}")
print(f" Confidence: {result['confidence']:.1%}")
print(f" Latency: {result['latency_ms']:.0f}ms")
avg_time = total_time / len(test_queries)
print(f"\n{'─'*60}")
print(f"Total Time: {total_time:.0f}ms (All 5 queries in parallel!)")
print(f"Average Latency per Query: {avg_time:.0f}ms")
print(f"Effective Throughput: {1000/avg_time:.2f} queries/sec")
return avg_time, total_time
def main():
"""Run all performance tests"""
print("\n" + "🎯 "+"="*58)
print(" RAG PERFORMANCE BENCHMARK")
print(" Dual Vector Stores: 10,580 FAQs + 5,000 Tickets")
print(" FAISS Optimization: IVF with 205/141 clusters")
print(" "+"="*58)
# Test 1: Sync queries
sync_avg = test_sync_queries()
# Test 2: Async queries (sequential)
async_avg = asyncio.run(test_async_queries())
# Test 3: Async queries (parallel)
parallel_avg, parallel_total = asyncio.run(test_parallel_async_queries())
# Summary
print("\n" + "πŸ† "+"="*58)
print(" PERFORMANCE SUMMARY")
print(" "+"="*58)
print(f"\n Sync Query (baseline): {sync_avg:.0f}ms avg")
print(f" Async Query (sequential): {async_avg:.0f}ms avg")
print(f" Async Query (parallel): {parallel_total:.0f}ms total for 5 queries")
print(f" ({parallel_avg:.0f}ms per query)")
sync_improvement = ((sync_avg - async_avg) / sync_avg) * 100
parallel_speedup = (sync_avg * 5) / parallel_total
print(f"\n πŸ“Š Improvements:")
print(f" β€’ Async vs Sync: {sync_improvement:.1f}% faster")
print(f" β€’ Parallel Speedup: {parallel_speedup:.1f}x")
print(f" β€’ Target Latency (<300ms): {'βœ… ACHIEVED' if async_avg < 300 else '❌ NOT YET'}")
print("\n " + "="*58 + "\n")
if __name__ == "__main__":
main()