"""ArcisVLM concurrent camera stream simulator.

Simulates N cameras, each sending async inference queries to the ArcisVLM
API, and reports throughput / latency statistics in real time.

Usage:
    python scale_test.py --api-url http://localhost:8000 --num-cameras 100 --duration-secs 60
"""

import argparse
import asyncio
import json
import math
import random
import sys
import time
from dataclasses import dataclass, field
from typing import List

try:
    import aiohttp
except ImportError:
    # The failure is reported by _require_aiohttp() when the test is actually
    # started, so that merely importing this module has no side effects.
    aiohttp = None


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

QUERY_INTERVAL_MIN = 2.0   # seconds between camera queries (min)
QUERY_INTERVAL_MAX = 5.0   # seconds between camera queries (max)
DASHBOARD_INTERVAL = 5.0   # seconds between live dashboard prints
REQUEST_TIMEOUT = 30.0     # per-request timeout in seconds

# Sample payloads — camera sends a still frame description query
SAMPLE_QUERIES = [
    {"prompt": "Describe any unusual activity in the scene.", "stream": False},
    {"prompt": "Detect persons and report count.", "stream": False},
    {"prompt": "Is there a vehicle in the frame?", "stream": False},
    {"prompt": "Report any detected anomalies.", "stream": False},
    {"prompt": "Describe objects in the foreground.", "stream": False},
]


def _require_aiohttp() -> None:
    """Print an install hint and exit with status 1 if aiohttp is missing."""
    if aiohttp is None:
        print("aiohttp is required. Install with: pip install aiohttp")
        sys.exit(1)


# ---------------------------------------------------------------------------
# Stats tracker (shared across all camera coroutines)
# ---------------------------------------------------------------------------

@dataclass
class Stats:
    """Running counters and latency samples for one test run.

    ``start_time`` is a ``time.monotonic()`` reading taken when the instance
    is created. ``latencies_ms`` only holds samples of successful requests.
    """

    start_time: float = field(default_factory=time.monotonic)
    total_queries: int = 0
    success_count: int = 0
    fail_count: int = 0
    latencies_ms: List[float] = field(default_factory=list)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def record(self, success: bool, latency_ms: float):
        """Count one finished query; keep its latency only if it succeeded."""
        async with self._lock:
            self.total_queries += 1
            if success:
                self.success_count += 1
                self.latencies_ms.append(latency_ms)
            else:
                self.fail_count += 1

    def elapsed(self) -> float:
        """Return the seconds elapsed since ``start_time``."""
        return time.monotonic() - self.start_time

    def avg_latency(self) -> float:
        """Return the mean recorded latency in ms, or 0.0 with no samples."""
        if not self.latencies_ms:
            return 0.0
        return sum(self.latencies_ms) / len(self.latencies_ms)

    def percentile(self, p: float) -> float:
        """Return the nearest-rank ``p``-th percentile of recorded latencies.

        ``p`` is clamped to [0, 100]. Returns 0.0 when there are no samples.
        """
        if not self.latencies_ms:
            return 0.0
        ordered = sorted(self.latencies_ms)
        p = min(100.0, max(0.0, p))
        # Nearest-rank: the smallest sample such that at least p% of the
        # samples are <= it.  The rank must be rounded *up*; flooring it
        # would pick one sample too low (e.g. P50 of 3 samples -> minimum).
        rank = math.ceil(len(ordered) * p / 100)
        return ordered[max(0, rank - 1)]

    def queries_per_sec(self) -> float:
        """Return total queries divided by elapsed seconds (0.0 if none elapsed)."""
        elapsed = self.elapsed()
        if elapsed <= 0:
            return 0.0
        return self.total_queries / elapsed


# ---------------------------------------------------------------------------
# Single camera coroutine
# ---------------------------------------------------------------------------

async def camera_worker(
    camera_id: int,
    api_url: str,
    duration_secs: float,
    stats: Stats,
    session: "aiohttp.ClientSession",
):
    """Simulate one camera: sends a query every 2-5 seconds until time expires.

    Each query POSTs a random entry of ``SAMPLE_QUERIES`` (plus ``camera_id``)
    to ``<api_url>/v1/chat`` and records the outcome in ``stats``. A request
    counts as successful when it completes with an HTTP status below 400.
    """
    endpoint = api_url.rstrip("/") + "/v1/chat"
    deadline = stats.start_time + duration_secs
    request_timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT)

    # Stagger start so cameras don't all fire at t=0.  The window is capped
    # at the test duration so that short tests still send queries instead of
    # sleeping past their own deadline.
    stagger_window = max(0.0, min(QUERY_INTERVAL_MAX, duration_secs))
    await asyncio.sleep(random.uniform(0, stagger_window))

    while time.monotonic() < deadline:
        payload = random.choice(SAMPLE_QUERIES).copy()
        payload["camera_id"] = camera_id

        t0 = time.monotonic()
        success = False
        latency_ms = 0.0

        try:
            async with session.post(
                endpoint,
                json=payload,
                timeout=request_timeout,
            ) as resp:
                # Read the full body so the latency covers the whole response.
                _ = await resp.text()
                latency_ms = (time.monotonic() - t0) * 1000
                success = resp.status < 400
        except asyncio.TimeoutError:
            latency_ms = REQUEST_TIMEOUT * 1000
        except aiohttp.ClientError:
            latency_ms = (time.monotonic() - t0) * 1000

        await stats.record(success, latency_ms)

        # Wait a random interval before the next query
        interval = random.uniform(QUERY_INTERVAL_MIN, QUERY_INTERVAL_MAX)
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        await asyncio.sleep(min(interval, remaining))


# ---------------------------------------------------------------------------
# Live dashboard
# ---------------------------------------------------------------------------

def render_dashboard(stats: Stats, num_cameras: int, duration_secs: float):
    """Print one snapshot of progress, counters, latency and QPS to stdout."""
    elapsed = stats.elapsed()
    remaining = max(0.0, duration_secs - elapsed)
    avg = stats.avg_latency()
    p95 = stats.percentile(95)
    qps = stats.queries_per_sec()
    success_rate = (
        100.0 * stats.success_count / stats.total_queries
        if stats.total_queries > 0
        else 0.0
    )

    bar_width = 30
    # A non-positive duration is treated as already complete instead of
    # dividing by zero.
    progress = min(1.0, elapsed / duration_secs) if duration_secs > 0 else 1.0
    filled = int(bar_width * progress)
    bar = "#" * filled + "-" * (bar_width - filled)

    lines = [
        "",
        f" ArcisVLM Scale Test [{bar}] {elapsed:5.1f}s / {duration_secs}s ({remaining:.1f}s left)",
        f" Cameras: {num_cameras}",
        f" Queries : total={stats.total_queries} ok={stats.success_count} fail={stats.fail_count} success={success_rate:.1f}%",
        f" Latency : avg={avg:.1f}ms p95={p95:.1f}ms",
        f" QPS : {qps:.2f} queries/sec",
        "",
    ]
    print("\n".join(lines), flush=True)


async def dashboard_task(
    stats: Stats,
    num_cameras: int,
    duration_secs: float,
    stop_event: asyncio.Event,
):
    """Render the dashboard every ``DASHBOARD_INTERVAL`` s until stopped."""
    while not stop_event.is_set():
        render_dashboard(stats, num_cameras, duration_secs)
        try:
            # Waiting on the event (rather than sleeping) lets the task exit
            # as soon as the stop signal arrives.
            await asyncio.wait_for(stop_event.wait(), timeout=DASHBOARD_INTERVAL)
        except asyncio.TimeoutError:
            pass


# ---------------------------------------------------------------------------
# Main runner
# ---------------------------------------------------------------------------

async def run_test(api_url: str, num_cameras: int, duration_secs: float):
    """Run ``num_cameras`` camera workers for ``duration_secs`` seconds.

    Returns the populated ``Stats`` object once every worker has finished.
    Exits with status 1 if aiohttp is not installed.
    """
    _require_aiohttp()

    stats = Stats()
    stop_event = asyncio.Event()

    connector = aiohttp.TCPConnector(limit=num_cameras + 10, ttl_dns_cache=300)
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT + 5)

    print("\nStarting ArcisVLM scale test:")
    print(f" API URL : {api_url}")
    print(f" Cameras : {num_cameras}")
    print(f" Duration : {duration_secs}s")
    print(f" Query interval: {QUERY_INTERVAL_MIN}-{QUERY_INTERVAL_MAX}s per camera")
    print()

    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        camera_tasks = [
            asyncio.create_task(
                camera_worker(i, api_url, duration_secs, stats, session)
            )
            for i in range(num_cameras)
        ]
        dash_task = asyncio.create_task(
            dashboard_task(stats, num_cameras, duration_secs, stop_event)
        )

        try:
            # Wait for all cameras to finish
            await asyncio.gather(*camera_tasks)
        finally:
            # Signal the dashboard to stop and wait for it to exit, even if
            # a camera task failed, so it is never left running.
            stop_event.set()
            await dash_task

    return stats


def print_final_summary(stats: Stats, num_cameras: int, duration_secs: float):
    """Print the end-of-run totals and latency percentiles to stdout."""
    elapsed = stats.elapsed()
    avg = stats.avg_latency()
    p50 = stats.percentile(50)
    p95 = stats.percentile(95)
    p99 = stats.percentile(99)
    qps = stats.queries_per_sec()
    success_rate = (
        100.0 * stats.success_count / stats.total_queries
        if stats.total_queries > 0
        else 0.0
    )

    sep = "=" * 60
    print(sep)
    print(" FINAL SUMMARY")
    print(sep)
    print(f" API target : {num_cameras} simulated cameras")
    print(f" Test duration : {elapsed:.1f}s (requested {duration_secs}s)")
    print()
    print(f" Total queries : {stats.total_queries}")
    print(f" Successful : {stats.success_count} ({success_rate:.1f}%)")
    print(f" Failed : {stats.fail_count}")
    print(f" Queries/sec : {qps:.2f}")
    print()
    print(" Latency (successful requests only):")
    print(f" Mean : {avg:.1f} ms")
    print(f" P50 : {p50:.1f} ms")
    print(f" P95 : {p95:.1f} ms")
    print(f" P99 : {p99:.1f} ms")
    print(sep)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def parse_args():
    """Parse the command-line options (--api-url, --num-cameras, --duration-secs)."""
    p = argparse.ArgumentParser(
        description="Concurrent camera stream simulator for ArcisVLM API load testing."
    )
    p.add_argument(
        "--api-url",
        default="http://localhost:8000",
        help="Base URL of the ArcisVLM API (default: http://localhost:8000).",
    )
    p.add_argument(
        "--num-cameras",
        type=int,
        default=100,
        help="Number of concurrent simulated cameras (default: 100).",
    )
    p.add_argument(
        "--duration-secs",
        type=int,
        default=60,
        help="Total test duration in seconds (default: 60).",
    )
    return p.parse_args()


def main():
    """CLI entry point: run the load test, then print the final summary."""
    # Checked before argument parsing so a missing dependency is reported
    # first.
    _require_aiohttp()

    args = parse_args()
    stats = asyncio.run(
        run_test(
            api_url=args.api_url,
            num_cameras=args.num_cameras,
            duration_secs=args.duration_secs,
        )
    )
    print_final_summary(stats, args.num_cameras, args.duration_secs)


if __name__ == "__main__":
    main()