"""
Test async vs sync query performance
Demonstrates speed improvements from async endpoints and FAISS IVF optimization
"""
import asyncio
import time
from src.core.dual_rag_pipeline import DualStoreRAGPipeline
def test_sync_queries():
    """Benchmark the synchronous ``pipeline.query`` path.

    Creates a ``DualStoreRAGPipeline``, loads its vector stores, runs each
    sample query once with ``top_k=3`` and prints the source, confidence and
    measured latency of every query, followed by aggregate figures.

    Returns:
        float: Mean latency per query in milliseconds (total elapsed time
        divided by the number of queries).
    """
    print("\n" + "="*60)
    print("π SYNC QUERY PERFORMANCE TEST")
    print("="*60)

    pipeline = DualStoreRAGPipeline()
    pipeline.load_vector_stores()  # Load saved vector stores

    test_queries = [
        "How do I track my order?",
        "I want to cancel my subscription",
        "My payment was declined",
        "How do I reset my password?",
        "Where is my refund?"
    ]

    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # intervals are not affected by system clock adjustments (unlike
    # time.time()).
    start = time.perf_counter()

    for i, query in enumerate(test_queries, 1):
        q_start = time.perf_counter()
        result = pipeline.query(query, top_k=3)
        q_time = (time.perf_counter() - q_start) * 1000  # seconds -> ms

        print(f"\n{i}. Query: {query}")
        print(f" Source: {result['source']}")
        print(f" Confidence: {result['confidence']:.1%}")
        print(f" Latency: {q_time:.0f}ms")

    # The total covers the whole loop, including the time spent printing.
    total_time = (time.perf_counter() - start) * 1000
    avg_time = total_time / len(test_queries)

    print(f"\n{'β'*60}")
    print(f"Total Time: {total_time:.0f}ms")
    print(f"Average Latency: {avg_time:.0f}ms")
    print(f"Queries/sec: {1000/avg_time:.2f}")

    return avg_time
async def test_async_queries():
    """Benchmark the asynchronous ``pipeline.aquery`` path, one query at a time.

    Creates a ``DualStoreRAGPipeline``, loads its vector stores, then awaits
    each sample query in turn with ``top_k=3`` (no overlap between queries)
    and prints the source, confidence and measured latency of every query,
    followed by aggregate figures.

    Returns:
        float: Mean latency per query in milliseconds (total elapsed time
        divided by the number of queries).
    """
    print("\n" + "="*60)
    print("β‘ ASYNC QUERY PERFORMANCE TEST")
    print("="*60)

    pipeline = DualStoreRAGPipeline()
    pipeline.load_vector_stores()  # Load saved vector stores

    test_queries = [
        "How do I track my order?",
        "I want to cancel my subscription",
        "My payment was declined",
        "How do I reset my password?",
        "Where is my refund?"
    ]

    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # intervals are not affected by system clock adjustments (unlike
    # time.time()).
    start = time.perf_counter()

    # Sequential async queries: each one is awaited before the next starts.
    for i, query in enumerate(test_queries, 1):
        q_start = time.perf_counter()
        result = await pipeline.aquery(query, top_k=3)
        q_time = (time.perf_counter() - q_start) * 1000  # seconds -> ms

        print(f"\n{i}. Query: {query}")
        print(f" Source: {result['source']}")
        print(f" Confidence: {result['confidence']:.1%}")
        print(f" Latency: {q_time:.0f}ms")

    # The total covers the whole loop, including the time spent printing.
    total_time = (time.perf_counter() - start) * 1000
    avg_time = total_time / len(test_queries)

    print(f"\n{'β'*60}")
    print(f"Total Time: {total_time:.0f}ms")
    print(f"Average Latency: {avg_time:.0f}ms")
    print(f"Queries/sec: {1000/avg_time:.2f}")

    return avg_time
async def test_parallel_async_queries():
    """Benchmark ``pipeline.aquery`` with all sample queries issued together.

    Creates a ``DualStoreRAGPipeline``, loads its vector stores, schedules
    every sample query with ``top_k=3`` through ``asyncio.gather`` and times
    the batch as a whole. Per-query latency is taken from the
    ``'latency_ms'`` entry of each result rather than measured here.

    Returns:
        tuple[float, float]: ``(avg_time, total_time)`` in milliseconds,
        where ``total_time`` is the elapsed time of the whole batch and
        ``avg_time`` is that total divided by the number of queries.
    """
    print("\n" + "="*60)
    print("π PARALLEL ASYNC QUERY PERFORMANCE TEST")
    print("="*60)

    pipeline = DualStoreRAGPipeline()
    pipeline.load_vector_stores()  # Load saved vector stores

    test_queries = [
        "How do I track my order?",
        "I want to cancel my subscription",
        "My payment was declined",
        "How do I reset my password?",
        "Where is my refund?"
    ]

    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # interval is not affected by system clock adjustments (unlike
    # time.time()).
    start = time.perf_counter()

    # Run all queries in parallel; gather() returns results in input order,
    # which is what makes the zip() below line up.
    tasks = [pipeline.aquery(query, top_k=3) for query in test_queries]
    results = await asyncio.gather(*tasks)

    # Stop the clock before printing so the total reflects query time only.
    total_time = (time.perf_counter() - start) * 1000

    for i, (query, result) in enumerate(zip(test_queries, results), 1):
        print(f"\n{i}. Query: {query}")
        print(f" Source: {result['source']}")
        print(f" Confidence: {result['confidence']:.1%}")
        print(f" Latency: {result['latency_ms']:.0f}ms")

    avg_time = total_time / len(test_queries)

    print(f"\n{'β'*60}")
    # Report the real batch size instead of a hard-coded "5".
    print(f"Total Time: {total_time:.0f}ms (All {len(test_queries)} queries in parallel!)")
    print(f"Average Latency per Query: {avg_time:.0f}ms")
    print(f"Effective Throughput: {1000/avg_time:.2f} queries/sec")

    return avg_time, total_time
def main():
    """Run all three benchmarks in order and print a comparison summary.

    Runs the synchronous test, then the sequential-async and parallel-async
    tests (each in its own ``asyncio.run`` event loop), and prints the
    average latencies, the relative improvement of async over sync, the
    parallel speedup and whether the async average is below 300 ms.
    """
    print("\n" + "π― "+"="*58)
    print(" RAG PERFORMANCE BENCHMARK")
    print(" Dual Vector Stores: 10,580 FAQs + 5,000 Tickets")
    print(" FAISS Optimization: IVF with 205/141 clusters")
    print(" "+"="*58)

    # Test 1: Sync queries
    sync_avg = test_sync_queries()

    # Test 2: Async queries (sequential)
    async_avg = asyncio.run(test_async_queries())

    # Test 3: Async queries (parallel)
    parallel_avg, parallel_total = asyncio.run(test_parallel_async_queries())

    # Summary
    print("\n" + "π "+"="*58)
    print(" PERFORMANCE SUMMARY")
    print(" "+"="*58)

    print(f"\n Sync Query (baseline): {sync_avg:.0f}ms avg")
    print(f" Async Query (sequential): {async_avg:.0f}ms avg")
    print(f" Async Query (parallel): {parallel_total:.0f}ms total for 5 queries")
    print(f" ({parallel_avg:.0f}ms per query)")

    # Percentage by which the sequential-async average beats the sync one.
    sync_improvement = ((sync_avg - async_avg) / sync_avg) * 100
    # Estimated time for 5 sync queries divided by the parallel batch time;
    # the 5 must match the number of queries used by the test functions.
    parallel_speedup = (sync_avg * 5) / parallel_total

    # The status literal was previously split across two source lines, which
    # is a syntax error; it is kept on one line here.
    # NOTE(review): the leading glyphs in these strings (and the other
    # non-ASCII markers in this function) look mis-encoded, presumably
    # check/cross emoji -- confirm against the original file.
    target_status = 'β ACHIEVED' if async_avg < 300 else 'β NOT YET'

    print("\n π Improvements:")
    print(f" β’ Async vs Sync: {sync_improvement:.1f}% faster")
    print(f" β’ Parallel Speedup: {parallel_speedup:.1f}x")
    print(f" β’ Target Latency (<300ms): {target_status}")
    print("\n " + "="*58 + "\n")
# Run the benchmark only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()