arcisvlm / scripts /scale_test.py
Hardik Sanghvi
feat: add Gradio demo, ONNX export, scale test + Stages 4-7 complete
6ea918e
Raw
History Blame Contribute Delete
9.78 kB
"""
ArcisVLM Concurrent Camera Stream Simulator

Simulates N cameras, each sending asynchronous inference queries to the ArcisVLM
API, and reports throughput and latency statistics in real time.

Usage:
    python scale_test.py --api-url http://localhost:8000 --num-cameras 100 --duration-secs 60
"""
import argparse
import asyncio
import json
import random
import sys
import time
from dataclasses import dataclass, field
from typing import List
try:
import aiohttp
except ImportError:
print("aiohttp is required. Install with: pip install aiohttp")
sys.exit(1)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Each camera pauses for a random time in [MIN, MAX] seconds between queries.
QUERY_INTERVAL_MIN = 2.0
QUERY_INTERVAL_MAX = 5.0
# Seconds between live dashboard prints.
DASHBOARD_INTERVAL = 5.0
# Per-request timeout in seconds.
REQUEST_TIMEOUT = 30.0

# Prompts a simulated camera may send with a still-frame description query.
_SAMPLE_PROMPTS = (
    "Describe any unusual activity in the scene.",
    "Detect persons and report count.",
    "Is there a vehicle in the frame?",
    "Report any detected anomalies.",
    "Describe objects in the foreground.",
)
# Sample payloads: one non-streaming query per prompt above.
SAMPLE_QUERIES = [
    {"prompt": prompt, "stream": False} for prompt in _SAMPLE_PROMPTS
]
# ---------------------------------------------------------------------------
# Stats tracker (shared across all camera coroutines)
# ---------------------------------------------------------------------------
@dataclass
class Stats:
    """Request counters and latency samples shared by all camera coroutines.

    Updates go through :meth:`record`, which holds an ``asyncio.Lock`` while
    mutating the counters. Latency samples are kept for successful requests
    only, so every latency statistic describes successful requests.
    """

    # Monotonic-clock timestamp captured when the instance is created.
    start_time: float = field(default_factory=time.monotonic)
    total_queries: int = 0
    success_count: int = 0
    fail_count: int = 0
    # Latencies (milliseconds) of successful requests, in arrival order.
    latencies_ms: List[float] = field(default_factory=list)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def record(self, success: bool, latency_ms: float):
        """Count one finished request; keep its latency only if it succeeded."""
        async with self._lock:
            self.total_queries += 1
            if success:
                self.success_count += 1
                self.latencies_ms.append(latency_ms)
            else:
                self.fail_count += 1

    def elapsed(self) -> float:
        """Return the seconds elapsed since ``start_time``."""
        return time.monotonic() - self.start_time

    def avg_latency(self) -> float:
        """Return the mean recorded latency in ms, or 0.0 with no samples."""
        if not self.latencies_ms:
            return 0.0
        return sum(self.latencies_ms) / len(self.latencies_ms)

    def percentile(self, p: float) -> float:
        """Return the ``p``-th percentile latency in ms (nearest-rank method).

        The rank is ``ceil(n * p / 100)``: the smallest sample such that at
        least ``p`` percent of the samples are less than or equal to it.
        Returns 0.0 when there are no samples. Values of ``p`` outside
        0..100 are clamped to the smallest / largest sample.
        """
        if not self.latencies_ms:
            return 0.0
        ordered = sorted(self.latencies_ms)
        # Ceiling division via negated floor division: rounding the rank down
        # instead would under-report (e.g. the median of three samples would
        # come out as the smallest one).
        rank = int(-(-len(ordered) * p // 100))
        idx = min(len(ordered) - 1, max(0, rank - 1))
        return ordered[idx]

    def queries_per_sec(self) -> float:
        """Return all recorded queries (ok + failed) per elapsed second."""
        elapsed = self.elapsed()
        if elapsed <= 0:
            return 0.0
        return self.total_queries / elapsed
# ---------------------------------------------------------------------------
# Single camera coroutine
# ---------------------------------------------------------------------------
async def camera_worker(
    camera_id: int,
    api_url: str,
    duration_secs: float,
    stats: Stats,
    session: aiohttp.ClientSession,
):
    """Simulate one camera posting random sample queries until the deadline.

    Args:
        camera_id: Identifier added to every payload as ``camera_id``.
        api_url: Base URL of the API; requests go to ``<api_url>/v1/chat``.
        duration_secs: Test length in seconds, measured from
            ``stats.start_time``.
        stats: Shared tracker that receives one ``record`` call per request.
        session: Shared aiohttp client session used for every request.

    A request counts as successful when the response status is below 400.
    Timeouts and ``aiohttp.ClientError`` failures are recorded as failed
    requests; any other exception propagates to the caller.
    """
    endpoint = api_url.rstrip("/") + "/v1/chat"
    deadline = stats.start_time + duration_secs
    # The per-request timeout never changes, so build it once.
    request_timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)

    # Stagger start so cameras don't all fire at t=0, but never sleep past
    # the deadline: a run shorter than QUERY_INTERVAL_MAX must not overrun.
    stagger_cap = min(QUERY_INTERVAL_MAX, max(0.0, deadline - time.monotonic()))
    await asyncio.sleep(random.uniform(0, stagger_cap))

    while time.monotonic() < deadline:
        payload = random.choice(SAMPLE_QUERIES).copy()
        payload["camera_id"] = camera_id
        t0 = time.monotonic()
        success = False
        latency_ms = 0.0
        try:
            async with session.post(
                endpoint,
                json=payload,
                timeout=request_timeout,
            ) as resp:
                # Drain the body as raw bytes. Decoding it is unnecessary for
                # timing and could raise UnicodeDecodeError on a malformed
                # response, which would abort the whole worker.
                await resp.read()
                latency_ms = (time.monotonic() - t0) * 1000
                success = resp.status < 400
        except asyncio.TimeoutError:
            latency_ms = REQUEST_TIMEOUT * 1000
        except aiohttp.ClientError:
            latency_ms = (time.monotonic() - t0) * 1000
        await stats.record(success, latency_ms)

        # Wait a random interval before the next query, capped at the deadline.
        interval = random.uniform(QUERY_INTERVAL_MIN, QUERY_INTERVAL_MAX)
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        await asyncio.sleep(min(interval, remaining))
# ---------------------------------------------------------------------------
# Live dashboard
# ---------------------------------------------------------------------------
def render_dashboard(stats: Stats, num_cameras: int, duration_secs: float):
    """Print a snapshot of the running test: progress bar, counts, latency, QPS.

    Args:
        stats: Tracker to read counters and latency statistics from.
        num_cameras: Number of simulated cameras, shown as-is.
        duration_secs: Requested test duration, used for the progress bar and
            the remaining-time figure.
    """
    elapsed = stats.elapsed()
    remaining = max(0.0, duration_secs - elapsed)
    avg = stats.avg_latency()
    p95 = stats.percentile(95)
    qps = stats.queries_per_sec()
    success_rate = (
        100.0 * stats.success_count / stats.total_queries
        if stats.total_queries > 0 else 0.0
    )
    bar_width = 30
    # A non-positive duration means the run is already over; show a full bar
    # instead of dividing by zero.
    progress = min(1.0, elapsed / duration_secs) if duration_secs > 0 else 1.0
    filled = int(bar_width * progress)
    bar = "#" * filled + "-" * (bar_width - filled)
    lines = [
        "",
        f" ArcisVLM Scale Test [{bar}] {elapsed:5.1f}s / {duration_secs}s ({remaining:.1f}s left)",
        f" Cameras: {num_cameras}",
        f" Queries : total={stats.total_queries} ok={stats.success_count} fail={stats.fail_count} success={success_rate:.1f}%",
        f" Latency : avg={avg:.1f}ms p95={p95:.1f}ms",
        f" QPS : {qps:.2f} queries/sec",
        "",
    ]
    print("\n".join(lines), flush=True)
async def dashboard_task(stats: Stats, num_cameras: int, duration_secs: float, stop_event: asyncio.Event):
    """Re-render the dashboard every DASHBOARD_INTERVAL seconds until stopped.

    The stop flag is checked before each render, and the pause between
    renders ends early as soon as ``stop_event`` is set.
    """
    while True:
        if stop_event.is_set():
            return
        render_dashboard(stats, num_cameras, duration_secs)
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=DASHBOARD_INTERVAL)
        except asyncio.TimeoutError:
            # Interval elapsed without a stop signal: go around again.
            continue
# ---------------------------------------------------------------------------
# Main runner
# ---------------------------------------------------------------------------
async def run_test(api_url: str, num_cameras: int, duration_secs: float):
    """Run the full scale test and return the populated ``Stats``.

    Prints a start banner, launches one ``camera_worker`` task per camera plus
    the dashboard task on a shared aiohttp session, and waits for all of them.
    """
    stats = Stats()
    stop_event = asyncio.Event()
    connector = aiohttp.TCPConnector(limit=num_cameras + 10, ttl_dns_cache=300)
    session_timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT + 5)

    banner = [
        "\nStarting ArcisVLM scale test:",
        f" API URL : {api_url}",
        f" Cameras : {num_cameras}",
        f" Duration : {duration_secs}s",
        f" Query interval: {QUERY_INTERVAL_MIN}-{QUERY_INTERVAL_MAX}s per camera",
        "",
    ]
    print("\n".join(banner))

    async with aiohttp.ClientSession(connector=connector, timeout=session_timeout) as session:
        workers = []
        for cam_id in range(num_cameras):
            worker = camera_worker(cam_id, api_url, duration_secs, stats, session)
            workers.append(asyncio.create_task(worker))
        dashboard = asyncio.create_task(
            dashboard_task(stats, num_cameras, duration_secs, stop_event)
        )

        # Block until every camera has run to its deadline.
        await asyncio.gather(*workers)

        # Tell the dashboard loop to exit, then wait for it to finish.
        stop_event.set()
        await dashboard

    return stats
def print_final_summary(stats: Stats, num_cameras: int, duration_secs: float):
    """Print the end-of-run report: totals, success rate, QPS and latency.

    Latency figures come from ``stats`` and therefore cover whatever samples
    it recorded; the success rate is 0.0 when no query was made.
    """
    elapsed = stats.elapsed()
    mean_ms = stats.avg_latency()
    p50_ms, p95_ms, p99_ms = (stats.percentile(rank) for rank in (50, 95, 99))
    qps = stats.queries_per_sec()

    if stats.total_queries > 0:
        success_rate = 100.0 * stats.success_count / stats.total_queries
    else:
        success_rate = 0.0

    rule = "=" * 60
    report = [
        rule,
        " FINAL SUMMARY",
        rule,
        f" API target : {num_cameras} simulated cameras",
        f" Test duration : {elapsed:.1f}s (requested {duration_secs}s)",
        "",
        f" Total queries : {stats.total_queries}",
        f" Successful : {stats.success_count} ({success_rate:.1f}%)",
        f" Failed : {stats.fail_count}",
        f" Queries/sec : {qps:.2f}",
        "",
        " Latency (successful requests only):",
        f" Mean : {mean_ms:.1f} ms",
        f" P50 : {p50_ms:.1f} ms",
        f" P95 : {p95_ms:.1f} ms",
        f" P99 : {p99_ms:.1f} ms",
        rule,
    ]
    print("\n".join(report))
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args():
    """Parse the command line and return the resulting namespace.

    Recognised options: ``--api-url`` (string), ``--num-cameras`` (int) and
    ``--duration-secs`` (int), each with a default.
    """
    parser = argparse.ArgumentParser(
        description="Concurrent camera stream simulator for ArcisVLM API load testing."
    )
    # (flag, add_argument keyword options) for every supported option.
    option_table = (
        (
            "--api-url",
            {
                "default": "http://localhost:8000",
                "help": "Base URL of the ArcisVLM API (default: http://localhost:8000).",
            },
        ),
        (
            "--num-cameras",
            {
                "type": int,
                "default": 100,
                "help": "Number of concurrent simulated cameras (default: 100).",
            },
        ),
        (
            "--duration-secs",
            {
                "type": int,
                "default": 60,
                "help": "Total test duration in seconds (default: 60).",
            },
        ),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def main():
    """Script entry point: parse options, run the test, print the summary."""
    cli = parse_args()
    test_run = run_test(
        api_url=cli.api_url,
        num_cameras=cli.num_cameras,
        duration_secs=cli.duration_secs,
    )
    final_stats = asyncio.run(test_run)
    print_final_summary(final_stats, cli.num_cameras, cli.duration_secs)


if __name__ == "__main__":
    main()