wayydb-api / benchmarks /bench_api.py
rcgalbo's picture
Fix: add missing #include <cstdint> for uint32_t/uint8_t
2923639
#!/usr/bin/env python3
"""
WayyDB Read/Write Performance Benchmarks
"""
import time
import sys
import numpy as np
# Add local build to path
sys.path.insert(0, "/home/rcgalbo/wayy-research/wf/wayyDB/build")
import wayy_db as wdb
def format_rate(rows: int, elapsed: float) -> str:
    """Format a row throughput as a human-readable string.

    Args:
        rows: Number of rows processed during the timed interval.
        elapsed: Length of the interval in seconds.

    Returns:
        The rate scaled to M/K rows per second (e.g. ``"1.50M rows/sec"``),
        or plain rows per second below 1K. When ``elapsed`` is not positive
        (the timer did not register the work), ``"inf rows/sec"`` is returned
        instead of raising ``ZeroDivisionError``.
    """
    if elapsed <= 0:
        # A zero-length interval would otherwise abort the whole report.
        return "inf rows/sec"
    rate = rows / elapsed
    if rate >= 1e6:
        return f"{rate / 1e6:.2f}M rows/sec"
    if rate >= 1e3:
        return f"{rate / 1e3:.2f}K rows/sec"
    return f"{rate:.2f} rows/sec"
def format_bytes(bytes_count: int, elapsed: float) -> str:
    """Format a byte throughput as a human-readable string.

    Args:
        bytes_count: Number of bytes processed during the timed interval.
        elapsed: Length of the interval in seconds.

    Returns:
        The rate in GB/sec, MB/sec or KB/sec (decimal units). Everything
        below 1 MB/sec, including sub-kilobyte rates, is shown in KB/sec.
        When ``elapsed`` is not positive, ``"inf B/sec"`` is returned
        instead of raising ``ZeroDivisionError``.
    """
    if elapsed <= 0:
        # A zero-length interval would otherwise abort the whole report.
        return "inf B/sec"
    rate = bytes_count / elapsed
    if rate >= 1e9:
        return f"{rate / 1e9:.2f} GB/sec"
    if rate >= 1e6:
        return f"{rate / 1e6:.2f} MB/sec"
    return f"{rate / 1e3:.2f} KB/sec"
def bench_write(sizes: list[int]):
    """Benchmark table creation/write performance.

    For each row count in ``sizes`` this generates a sequential int64
    timestamp column plus random float64 prices, int64 volumes and uint32
    symbol ids, then times building a ``wdb.Table`` from those arrays and
    prints one report row. Data generation happens before the clock starts,
    so only table construction is measured.

    Args:
        sizes: Row counts to benchmark, in the order they should run.

    Returns:
        The table built for the last entry of ``sizes``, or ``None`` when
        ``sizes`` is empty.
    """
    print("\n=== WRITE PERFORMANCE ===\n")
    print(f"{'Rows':<12} {'Time (ms)':<12} {'Rate':<20} {'Throughput':<15}")
    print("-" * 60)

    # Raw column bytes per row: int64 + float64 + int64 + uint32.
    bytes_per_row = 8 + 8 + 8 + 4
    # Initialised so that an empty `sizes` returns None instead of raising
    # UnboundLocalError at the final return.
    table = None
    for n in sizes:
        # Generate data (not timed)
        timestamps = np.arange(n, dtype=np.int64)
        prices = np.random.uniform(100, 200, n).astype(np.float64)
        volumes = np.random.randint(100, 10000, n).astype(np.int64)
        symbols = np.random.randint(0, 100, n).astype(np.uint32)

        # Time table creation
        start = time.perf_counter()
        table = wdb.Table(f"bench_{n}")
        table.add_column_from_numpy("timestamp", timestamps, wdb.DType.Timestamp)
        table.add_column_from_numpy("price", prices, wdb.DType.Float64)
        table.add_column_from_numpy("volume", volumes, wdb.DType.Int64)
        table.add_column_from_numpy("symbol", symbols, wdb.DType.Symbol)
        table.set_sorted_by("timestamp")
        elapsed = time.perf_counter() - start

        total_bytes = n * bytes_per_row
        print(f"{n:<12,} {elapsed*1000:<12.2f} {format_rate(n, elapsed):<20} {format_bytes(total_bytes, elapsed):<15}")
    return table  # Return last table for further tests
def bench_read(table: wdb.Table):
    """Report read-path timings for the ``"price"`` column of ``table``.

    Three measurements are printed, each as a total time and a per-call
    time:
      * 100 column lookups via ``table["price"]``;
      * 100 ``to_numpy()`` conversions of that column;
      * 10 full-column sums via ``wdb.ops.sum``, followed by the implied
        row rate across all 10 scans.
    """
    print("\n=== READ PERFORMANCE ===\n")
    row_count = table.num_rows

    # 1) Column lookup by name.
    t0 = time.perf_counter()
    for _ in range(100):
        price_col = table["price"]
    dt = time.perf_counter() - t0
    print(f"Column lookup (100x): {dt*1000:.3f}ms ({dt*10:.3f}ms per lookup)")

    # 2) NumPy export (the original author describes it as zero-copy;
    #    not verifiable from this file).
    t0 = time.perf_counter()
    for _ in range(100):
        price_arr = table["price"].to_numpy()
    dt = time.perf_counter() - t0
    print(f"to_numpy() (100x): {dt*1000:.3f}ms ({dt*10:.3f}ms per call)")

    # 3) Full scan: sum the whole column ten times.
    price_col = table["price"]
    t0 = time.perf_counter()
    for _ in range(10):
        price_sum = wdb.ops.sum(price_col)
    dt = time.perf_counter() - t0
    print(f"Full scan sum (10x): {dt*1000:.3f}ms ({dt*100:.3f}ms per scan)")
    print(f" -> {format_rate(row_count * 10, dt)}")
def bench_aggregations(table: wdb.Table):
    """Benchmark aggregation operations on the ``"price"`` column.

    Each of sum/avg/min/max/std is called once as a warm-up and then
    ``iterations`` more times under the timer. The "Time (ms)" column shows
    the mean milliseconds per call. The rate counts every row touched
    across all timed calls.

    Args:
        table: Table containing a ``"price"`` column.
    """
    print("\n=== AGGREGATION PERFORMANCE ===\n")
    n = table.num_rows
    col = table["price"]
    iterations = 100
    ops = [
        ("sum", wdb.ops.sum),
        ("avg", wdb.ops.avg),
        ("min", wdb.ops.min),
        ("max", wdb.ops.max),
        ("std", wdb.ops.std),
    ]
    print(f"{'Operation':<12} {'Time (ms)':<12} {'Rate':<20}")
    print("-" * 45)
    for name, func in ops:
        # Warm up (excluded from timing)
        func(col)
        # Benchmark
        start = time.perf_counter()
        for _ in range(iterations):
            func(col)
        elapsed = time.perf_counter() - start
        per_call_ms = elapsed * 1000 / iterations
        # Pad the time to the header's 12-char width so the columns line up.
        print(f"{name:<12} {per_call_ms:<12.3f} {format_rate(n * iterations, elapsed)}")
def bench_window_functions(table: wdb.Table):
    """Benchmark window functions on the ``"price"`` column.

    Each operation (moving average/sum/std over 20 rows, EMA with alpha 0.1,
    lag-1 diff and pct_change) is called once as a warm-up and then
    ``iterations`` more times under the timer. The "Time (ms)" column shows
    the mean milliseconds per call. The rate counts every row touched across
    all timed calls.

    Args:
        table: Table containing a ``"price"`` column.
    """
    print("\n=== WINDOW FUNCTION PERFORMANCE ===\n")
    n = table.num_rows
    col = table["price"]
    iterations = 10
    ops = [
        ("mavg(20)", lambda c: wdb.ops.mavg(c, 20)),
        ("msum(20)", lambda c: wdb.ops.msum(c, 20)),
        ("mstd(20)", lambda c: wdb.ops.mstd(c, 20)),
        ("ema(0.1)", lambda c: wdb.ops.ema(c, 0.1)),
        ("diff(1)", lambda c: wdb.ops.diff(c, 1)),
        ("pct_change", lambda c: wdb.ops.pct_change(c, 1)),
    ]
    print(f"{'Operation':<15} {'Time (ms)':<12} {'Rate':<20}")
    print("-" * 50)
    for name, func in ops:
        # Warm up (excluded from timing)
        func(col)
        # Benchmark
        start = time.perf_counter()
        for _ in range(iterations):
            func(col)
        elapsed = time.perf_counter() - start
        per_call_ms = elapsed * 1000 / iterations
        # Pad the time to the header's 12-char width so the columns line up.
        print(f"{name:<15} {per_call_ms:<12.3f} {format_rate(n * iterations, elapsed)}")
def bench_joins():
    """Time ``wdb.ops.aj`` joining trades to quotes on ``symbol`` by ``timestamp``.

    For each (left, right) size pair this builds a synthetic "trades" table
    (timestamp, symbol, price) and a "quotes" table (timestamp, symbol, bid,
    ask), both marked sorted by timestamp. It then times a single join call
    and prints the elapsed time and the left-row rate. A warm-up join runs
    first only when the left side has at most 100K rows.
    """
    print("\n=== JOIN PERFORMANCE ===\n")
    size_pairs = [(10_000, 10_000), (100_000, 100_000), (1_000_000, 1_000_000)]
    print(f"{'Left x Right':<20} {'aj (ms)':<12} {'Rate':<20}")
    print("-" * 55)

    def sorted_timestamps(count):
        # Random (possibly repeated) values in [0, 10*count), sorted ascending.
        return np.sort(np.random.randint(0, count * 10, count)).astype(np.int64)

    def symbol_ids(count):
        # Ten distinct symbol ids, 0..9.
        return np.random.randint(0, 10, count).astype(np.uint32)

    for n_left, n_right in size_pairs:
        # Left side: trades
        trades = wdb.Table("trades")
        trades.add_column_from_numpy("timestamp", sorted_timestamps(n_left),
                                     wdb.DType.Timestamp)
        trades.add_column_from_numpy("symbol", symbol_ids(n_left),
                                     wdb.DType.Symbol)
        trades.add_column_from_numpy("price",
                                     np.random.uniform(100, 200, n_left).astype(np.float64),
                                     wdb.DType.Float64)
        trades.set_sorted_by("timestamp")

        # Right side: quotes
        quotes = wdb.Table("quotes")
        quotes.add_column_from_numpy("timestamp", sorted_timestamps(n_right),
                                     wdb.DType.Timestamp)
        quotes.add_column_from_numpy("symbol", symbol_ids(n_right),
                                     wdb.DType.Symbol)
        quotes.add_column_from_numpy("bid",
                                     np.random.uniform(99, 199, n_right).astype(np.float64),
                                     wdb.DType.Float64)
        quotes.add_column_from_numpy("ask",
                                     np.random.uniform(101, 201, n_right).astype(np.float64),
                                     wdb.DType.Float64)
        quotes.set_sorted_by("timestamp")

        # Warm-up only for the smaller sizes; its result is discarded.
        if n_left <= 100_000:
            wdb.ops.aj(trades, quotes, ["symbol"], "timestamp")

        t0 = time.perf_counter()
        joined = wdb.ops.aj(trades, quotes, ["symbol"], "timestamp")
        dt = time.perf_counter() - t0

        label = f"{n_left//1000}K x {n_right//1000}K"
        print(f"{label:<20} {dt*1000:<12.2f} {format_rate(n_left, dt)}")
def bench_persistence(n: int = 1_000_000):
    """Benchmark save/load/mmap performance.

    Builds an ``n``-row table (int64 timestamp, float64 price, int64 volume)
    and saves it into a temporary directory. ``Table.save``, ``Table.load``
    and ``Table.mmap`` are each timed once on that path, and every duration
    is printed with a throughput based on the raw column size, followed by
    the load/mmap time ratio.

    Args:
        n: Number of rows in the test table.
    """
    print("\n=== PERSISTENCE PERFORMANCE ===\n")
    import tempfile
    import os

    # Create table
    table = wdb.Table("persist_test")
    table.add_column_from_numpy("timestamp",
        np.arange(n, dtype=np.int64), wdb.DType.Timestamp)
    table.add_column_from_numpy("price",
        np.random.uniform(100, 200, n).astype(np.float64), wdb.DType.Float64)
    table.add_column_from_numpy("volume",
        np.random.randint(100, 10000, n).astype(np.int64), wdb.DType.Int64)
    table.set_sorted_by("timestamp")
    bytes_total = n * (8 + 8 + 8)  # 24 bytes per row (three 8-byte columns)

    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "test_table")

        # Benchmark save
        start = time.perf_counter()
        table.save(path)
        save_elapsed = time.perf_counter() - start
        print(f"Save {n:,} rows: {save_elapsed*1000:.2f}ms ({format_bytes(bytes_total, save_elapsed)})")

        # Benchmark load (original note: copies data)
        start = time.perf_counter()
        loaded = wdb.Table.load(path)
        load_elapsed = time.perf_counter() - start
        print(f"Load {n:,} rows: {load_elapsed*1000:.2f}ms ({format_bytes(bytes_total, load_elapsed)})")

        # Benchmark mmap (original note: zero-copy). NOTE(review): if mmap
        # maps lazily, this "throughput" is not bytes actually read.
        start = time.perf_counter()
        mmapped = wdb.Table.mmap(path)
        mmap_elapsed = time.perf_counter() - start
        print(f"Mmap {n:,} rows: {mmap_elapsed*1000:.2f}ms ({format_bytes(bytes_total, mmap_elapsed)})")

        # Guard against a zero-length timer reading.
        if mmap_elapsed > 0:
            print(f" -> mmap is {load_elapsed/mmap_elapsed:.0f}x faster than load")

        # Release the loaded/mapped tables before the directory is removed.
        # Presumably the mmapped table keeps the saved files mapped for its
        # lifetime (not visible here); some platforms (e.g. Windows) refuse
        # to delete files that are still mapped, which would make the
        # TemporaryDirectory cleanup fail.
        del loaded, mmapped
def bench_concurrent():
    """Measure ``wdb.ops.sum`` throughput with 1, 2, 4 and 8 threads.

    All threads share one 1M-row float64 ``"price"`` column, and each thread
    sums it 10 times. Thread objects are created before the clock starts,
    so the reported time covers starting and joining the threads only.
    Whether throughput scales depends on whether ``wdb.ops.sum`` releases
    the GIL, which is not visible from this file.
    """
    print("\n=== CONCURRENT READ PERFORMANCE ===\n")
    import threading

    row_count = 1_000_000
    table = wdb.Table("concurrent_test")
    table.add_column_from_numpy("price",
        np.random.uniform(100, 200, row_count).astype(np.float64), wdb.DType.Float64)
    price_col = table["price"]

    def sum_repeatedly(sums, slot):
        # Each thread writes only to its own slot of `sums`.
        for _ in range(10):
            sums[slot] = wdb.ops.sum(price_col)

    for thread_count in (1, 2, 4, 8):
        sums = [0.0] * thread_count
        pool = [
            threading.Thread(target=sum_repeatedly, args=(sums, slot))
            for slot in range(thread_count)
        ]

        t0 = time.perf_counter()
        for worker in pool:
            worker.start()
        for worker in pool:
            worker.join()
        dt = time.perf_counter() - t0

        sums_per_sec = (thread_count * 10) / dt
        print(f"{thread_count} threads: {dt*1000:.2f}ms ({sums_per_sec:.1f} ops/sec, {format_rate(row_count * thread_count * 10, dt)})")
def main():
    """Run every benchmark section in order, framed by a start/end banner."""
    print("=" * 60)
    print(" WayyDB Performance Benchmarks")
    print("=" * 60)

    # Write benchmarks with increasing sizes. The returned table (the
    # largest, 10M rows) is not used afterwards, so it is deliberately not
    # bound. Binding it would keep it alive for every remaining benchmark.
    sizes = [10_000, 100_000, 1_000_000, 10_000_000]
    bench_write(sizes)

    # Dedicated 1M-row table (same schema as bench_write) for the read tests
    n = 1_000_000
    table_1m = wdb.Table("bench_1m")
    table_1m.add_column_from_numpy("timestamp", np.arange(n, dtype=np.int64), wdb.DType.Timestamp)
    table_1m.add_column_from_numpy("price", np.random.uniform(100, 200, n).astype(np.float64), wdb.DType.Float64)
    table_1m.add_column_from_numpy("volume", np.random.randint(100, 10000, n).astype(np.int64), wdb.DType.Int64)
    table_1m.add_column_from_numpy("symbol", np.random.randint(0, 100, n).astype(np.uint32), wdb.DType.Symbol)
    table_1m.set_sorted_by("timestamp")

    bench_read(table_1m)
    bench_aggregations(table_1m)
    bench_window_functions(table_1m)
    # The remaining sections build their own tables. Release this one first
    # so it does not inflate their peak memory.
    del table_1m

    bench_joins()
    bench_persistence()
    bench_concurrent()

    print("\n" + "=" * 60)
    print(" Benchmarks Complete")
    print("=" * 60)


if __name__ == "__main__":
    main()