#!/usr/bin/env python3
"""
WayyDB Read/Write Performance Benchmarks

Run as a script to print timing tables for table creation, column reads,
aggregations, window functions, as-of joins, persistence and threaded reads.

The ``wayy_db`` extension is looked up in a local build directory. Set the
``WAYYDB_BUILD_DIR`` environment variable to point at a different build.
"""

import os
import sys
import tempfile
import threading
import time

import numpy as np

# Add local build to path. The default is the original developer location;
# WAYYDB_BUILD_DIR overrides it so the script can run on other machines.
DEFAULT_BUILD_DIR = "/home/rcgalbo/wayy-research/wf/wayyDB/build"
sys.path.insert(0, os.environ.get("WAYYDB_BUILD_DIR", DEFAULT_BUILD_DIR))

try:
    import wayy_db as wdb
except ImportError as exc:
    # Defer the failure to main() so it can be reported with a helpful
    # message, and so the pure formatting helpers stay importable.
    wdb = None
    _WDB_IMPORT_ERROR = exc
else:
    _WDB_IMPORT_ERROR = None


def format_rate(rows: int, elapsed: float) -> str:
    """Format a row throughput as a human-readable ``rows/sec`` string.

    Args:
        rows: Number of rows processed.
        elapsed: Wall-clock seconds the work took.

    Returns:
        The rate scaled to M/K/plain rows per second with two decimals, or
        ``"n/a"`` when *elapsed* is not positive (rate is undefined).
    """
    if elapsed <= 0:
        return "n/a"
    rate = rows / elapsed
    if rate >= 1e6:
        return f"{rate/1e6:.2f}M rows/sec"
    if rate >= 1e3:
        return f"{rate/1e3:.2f}K rows/sec"
    return f"{rate:.2f} rows/sec"


def format_bytes(bytes_count: int, elapsed: float) -> str:
    """Format a byte throughput as a human-readable ``GB/MB/KB per sec`` string.

    Args:
        bytes_count: Number of bytes processed.
        elapsed: Wall-clock seconds the work took.

    Returns:
        The rate scaled to GB/MB/KB per second (decimal units) with two
        decimals, or ``"n/a"`` when *elapsed* is not positive.
    """
    if elapsed <= 0:
        return "n/a"
    rate = bytes_count / elapsed
    if rate >= 1e9:
        return f"{rate/1e9:.2f} GB/sec"
    if rate >= 1e6:
        return f"{rate/1e6:.2f} MB/sec"
    return f"{rate/1e3:.2f} KB/sec"


def _time_repeated(func, col, iterations: int) -> float:
    """Return the seconds spent calling ``func(col)`` *iterations* times.

    One untimed warm-up call is made first.
    """
    func(col)  # Warm up
    start = time.perf_counter()
    for _ in range(iterations):
        func(col)
    return time.perf_counter() - start


def _print_op_rows(ops, col, n_rows: int, iterations: int, name_width: int) -> None:
    """Time each ``(name, func)`` in *ops* on *col* and print one table row each.

    The time column shows milliseconds per call, padded to the same
    12-character width as the table headers so the rate column lines up.
    """
    for name, func in ops:
        elapsed = _time_repeated(func, col, iterations)
        per_op_ms = elapsed * 1000 / iterations
        rate = format_rate(n_rows * iterations, elapsed)
        print(f"{name:<{name_width}} {per_op_ms:<12.3f} {rate}")


def bench_write(sizes: list[int]) -> "wdb.Table | None":
    """Benchmark table creation/write performance.

    For each row count in *sizes*, random column data is generated with
    NumPy (untimed) and then the time to build a four-column table from it
    is measured and printed.

    Args:
        sizes: Row counts to benchmark, in order.

    Returns:
        The table built for the last entry of *sizes*, or ``None`` when
        *sizes* is empty.
    """
    print("\n=== WRITE PERFORMANCE ===\n")
    print(f"{'Rows':<12} {'Time (ms)':<12} {'Rate':<20} {'Throughput':<15}")
    print("-" * 60)

    # Calculate bytes (8+8+8+4 = 28 bytes per row)
    bytes_per_row = 28

    table = None
    for n in sizes:
        # Generate data
        timestamps = np.arange(n, dtype=np.int64)
        prices = np.random.uniform(100, 200, n).astype(np.float64)
        volumes = np.random.randint(100, 10000, n).astype(np.int64)
        symbols = np.random.randint(0, 100, n).astype(np.uint32)

        # Time table creation
        start = time.perf_counter()
        table = wdb.Table(f"bench_{n}")
        table.add_column_from_numpy("timestamp", timestamps, wdb.DType.Timestamp)
        table.add_column_from_numpy("price", prices, wdb.DType.Float64)
        table.add_column_from_numpy("volume", volumes, wdb.DType.Int64)
        table.add_column_from_numpy("symbol", symbols, wdb.DType.Symbol)
        table.set_sorted_by("timestamp")
        elapsed = time.perf_counter() - start

        total_bytes = n * bytes_per_row
        print(
            f"{n:<12,} {elapsed*1000:<12.2f} "
            f"{format_rate(n, elapsed):<20} {format_bytes(total_bytes, elapsed):<15}"
        )

    return table  # Return last table for further tests


def bench_read(table: "wdb.Table") -> None:
    """Benchmark read/access performance on the ``price`` column of *table*.

    Times repeated column lookups, ``to_numpy()`` calls and ``wdb.ops.sum``
    scans, and prints total and per-call timings.
    """
    print("\n=== READ PERFORMANCE ===\n")

    n = table.num_rows
    lookups = 100
    scans = 10

    # Column access
    start = time.perf_counter()
    for _ in range(lookups):
        _ = table["price"]
    elapsed = time.perf_counter() - start
    print(
        f"Column lookup ({lookups}x): {elapsed*1000:.3f}ms "
        f"({elapsed*1000/lookups:.3f}ms per lookup)"
    )

    # Zero-copy numpy access (as described by the original author)
    start = time.perf_counter()
    for _ in range(lookups):
        _ = table["price"].to_numpy()
    elapsed = time.perf_counter() - start
    print(
        f"to_numpy() ({lookups}x): {elapsed*1000:.3f}ms "
        f"({elapsed*1000/lookups:.3f}ms per call)"
    )

    # Full table scan (sum all values)
    col = table["price"]
    start = time.perf_counter()
    for _ in range(scans):
        wdb.ops.sum(col)
    elapsed = time.perf_counter() - start
    print(
        f"Full scan sum ({scans}x): {elapsed*1000:.3f}ms "
        f"({elapsed*1000/scans:.3f}ms per scan)"
    )
    print(f" -> {format_rate(n * scans, elapsed)}")


def bench_aggregations(table: "wdb.Table") -> None:
    """Benchmark aggregation operations on the ``price`` column of *table*.

    Each of sum/avg/min/max/std is warmed up once, then timed over 100
    calls. The printed time is milliseconds per call.
    """
    print("\n=== AGGREGATION PERFORMANCE ===\n")

    n = table.num_rows
    col = table["price"]

    ops = [
        ("sum", wdb.ops.sum),
        ("avg", wdb.ops.avg),
        ("min", wdb.ops.min),
        ("max", wdb.ops.max),
        ("std", wdb.ops.std),
    ]

    print(f"{'Operation':<12} {'Time (ms)':<12} {'Rate':<20}")
    print("-" * 45)

    _print_op_rows(ops, col, n, iterations=100, name_width=12)


def bench_window_functions(table: "wdb.Table") -> None:
    """Benchmark window functions on the ``price`` column of *table*.

    Each operation is warmed up once, then timed over 10 calls. The printed
    time is milliseconds per call.
    """
    print("\n=== WINDOW FUNCTION PERFORMANCE ===\n")

    n = table.num_rows
    col = table["price"]

    ops = [
        ("mavg(20)", lambda c: wdb.ops.mavg(c, 20)),
        ("msum(20)", lambda c: wdb.ops.msum(c, 20)),
        ("mstd(20)", lambda c: wdb.ops.mstd(c, 20)),
        ("ema(0.1)", lambda c: wdb.ops.ema(c, 0.1)),
        ("diff(1)", lambda c: wdb.ops.diff(c, 1)),
        ("pct_change", lambda c: wdb.ops.pct_change(c, 1)),
    ]

    print(f"{'Operation':<15} {'Time (ms)':<12} {'Rate':<20}")
    print("-" * 50)

    _print_op_rows(ops, col, n, iterations=10, name_width=15)


def bench_joins() -> None:
    """Benchmark temporal join operations.

    For each size pair, builds a "trades" and a "quotes" table with sorted
    random timestamps and random symbols, then times a single
    ``wdb.ops.aj`` call keyed on ``symbol`` and ``timestamp``.
    """
    print("\n=== JOIN PERFORMANCE ===\n")

    sizes = [(10_000, 10_000), (100_000, 100_000), (1_000_000, 1_000_000)]

    print(f"{'Left x Right':<20} {'aj (ms)':<12} {'Rate':<20}")
    print("-" * 55)

    for left_n, right_n in sizes:
        # Create left table (trades)
        left = wdb.Table("trades")
        left.add_column_from_numpy(
            "timestamp",
            np.sort(np.random.randint(0, left_n * 10, left_n)).astype(np.int64),
            wdb.DType.Timestamp,
        )
        left.add_column_from_numpy(
            "symbol",
            np.random.randint(0, 10, left_n).astype(np.uint32),
            wdb.DType.Symbol,
        )
        left.add_column_from_numpy(
            "price",
            np.random.uniform(100, 200, left_n).astype(np.float64),
            wdb.DType.Float64,
        )
        left.set_sorted_by("timestamp")

        # Create right table (quotes)
        right = wdb.Table("quotes")
        right.add_column_from_numpy(
            "timestamp",
            np.sort(np.random.randint(0, right_n * 10, right_n)).astype(np.int64),
            wdb.DType.Timestamp,
        )
        right.add_column_from_numpy(
            "symbol",
            np.random.randint(0, 10, right_n).astype(np.uint32),
            wdb.DType.Symbol,
        )
        right.add_column_from_numpy(
            "bid",
            np.random.uniform(99, 199, right_n).astype(np.float64),
            wdb.DType.Float64,
        )
        right.add_column_from_numpy(
            "ask",
            np.random.uniform(101, 201, right_n).astype(np.float64),
            wdb.DType.Float64,
        )
        right.set_sorted_by("timestamp")

        # Warm up (skipped for the largest size to keep the run short)
        if left_n <= 100_000:
            wdb.ops.aj(left, right, ["symbol"], "timestamp")

        # Benchmark as-of join
        start = time.perf_counter()
        wdb.ops.aj(left, right, ["symbol"], "timestamp")
        elapsed = time.perf_counter() - start

        size_str = f"{left_n//1000}K x {right_n//1000}K"
        print(f"{size_str:<20} {elapsed*1000:<12.2f} {format_rate(left_n, elapsed)}")


def bench_persistence(n: int = 1_000_000) -> None:
    """Benchmark save/load/mmap performance.

    Builds a three-column table of *n* rows, then times ``table.save``,
    ``wdb.Table.load`` and ``wdb.Table.mmap`` against a temporary directory
    that is removed afterwards.

    Args:
        n: Number of rows in the test table.
    """
    print("\n=== PERSISTENCE PERFORMANCE ===\n")

    # Create table
    table = wdb.Table("persist_test")
    table.add_column_from_numpy(
        "timestamp", np.arange(n, dtype=np.int64), wdb.DType.Timestamp
    )
    table.add_column_from_numpy(
        "price", np.random.uniform(100, 200, n).astype(np.float64), wdb.DType.Float64
    )
    table.add_column_from_numpy(
        "volume", np.random.randint(100, 10000, n).astype(np.int64), wdb.DType.Int64
    )
    table.set_sorted_by("timestamp")

    bytes_total = n * (8 + 8 + 8)  # 24 bytes per row

    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "test_table")

        # Benchmark save
        start = time.perf_counter()
        table.save(path)
        save_elapsed = time.perf_counter() - start
        print(
            f"Save {n:,} rows: {save_elapsed*1000:.2f}ms "
            f"({format_bytes(bytes_total, save_elapsed)})"
        )

        # Benchmark load (copies data, per the original author's note)
        start = time.perf_counter()
        loaded = wdb.Table.load(path)
        load_elapsed = time.perf_counter() - start
        print(
            f"Load {n:,} rows: {load_elapsed*1000:.2f}ms "
            f"({format_bytes(bytes_total, load_elapsed)})"
        )

        # Benchmark mmap (zero-copy, per the original author's note)
        start = time.perf_counter()
        mmapped = wdb.Table.mmap(path)
        mmap_elapsed = time.perf_counter() - start
        print(
            f"Mmap {n:,} rows: {mmap_elapsed*1000:.2f}ms "
            f"({format_bytes(bytes_total, mmap_elapsed)})"
        )
        # Guard the ratio: a zero mmap timing would divide by zero.
        if mmap_elapsed > 0:
            print(f" -> mmap is {load_elapsed/mmap_elapsed:.0f}x faster than load")

        # Drop the file-backed tables before the temporary directory is
        # removed. NOTE(review): assumes releasing these references is what
        # closes any open files/mappings - confirm against wayy_db.
        del loaded, mmapped


def bench_concurrent() -> None:
    """Benchmark concurrent read performance.

    Runs ``wdb.ops.sum`` on a shared 1M-row column from 1, 2, 4 and 8
    threads (10 calls per thread) and prints the wall-clock time and the
    aggregate throughput for each thread count.
    """
    print("\n=== CONCURRENT READ PERFORMANCE ===\n")

    n = 1_000_000
    calls_per_thread = 10

    table = wdb.Table("concurrent_test")
    table.add_column_from_numpy(
        "price", np.random.uniform(100, 200, n).astype(np.float64), wdb.DType.Float64
    )
    col = table["price"]

    def worker(results, idx):
        # Each thread writes only to its own slot of the shared list.
        for _ in range(calls_per_thread):
            results[idx] = wdb.ops.sum(col)

    for num_threads in [1, 2, 4, 8]:
        results = [0.0] * num_threads
        threads = [
            threading.Thread(target=worker, args=(results, i))
            for i in range(num_threads)
        ]

        start = time.perf_counter()
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        elapsed = time.perf_counter() - start

        total_calls = num_threads * calls_per_thread
        ops_per_sec = total_calls / elapsed
        print(
            f"{num_threads} threads: {elapsed*1000:.2f}ms "
            f"({ops_per_sec:.1f} ops/sec, {format_rate(n * total_calls, elapsed)})"
        )


def main() -> None:
    """Run every benchmark in sequence and print the results."""
    if wdb is None:
        sys.exit(
            f"wayy_db could not be imported ({_WDB_IMPORT_ERROR}). "
            "Build WayyDB or set WAYYDB_BUILD_DIR to its build directory."
        )

    print("=" * 60)
    print(" WayyDB Performance Benchmarks")
    print("=" * 60)

    # Write benchmarks with increasing sizes. The returned table is not
    # kept, so the 10M-row table can be freed before the other benchmarks.
    sizes = [10_000, 100_000, 1_000_000, 10_000_000]
    bench_write(sizes)

    # Use 1M row table for read tests
    table_1m = wdb.Table("bench_1m")
    n = 1_000_000
    table_1m.add_column_from_numpy(
        "timestamp", np.arange(n, dtype=np.int64), wdb.DType.Timestamp
    )
    table_1m.add_column_from_numpy(
        "price", np.random.uniform(100, 200, n).astype(np.float64), wdb.DType.Float64
    )
    table_1m.add_column_from_numpy(
        "volume", np.random.randint(100, 10000, n).astype(np.int64), wdb.DType.Int64
    )
    table_1m.add_column_from_numpy(
        "symbol", np.random.randint(0, 100, n).astype(np.uint32), wdb.DType.Symbol
    )
    table_1m.set_sorted_by("timestamp")

    bench_read(table_1m)
    bench_aggregations(table_1m)
    bench_window_functions(table_1m)
    bench_joins()
    bench_persistence()
    bench_concurrent()

    print("\n" + "=" * 60)
    print(" Benchmarks Complete")
    print("=" * 60)


if __name__ == "__main__":
    main()