Spaces:
Runtime error
Runtime error
| """Tests for WayyDB streaming functionality with PubSub integration.""" | |
| import asyncio | |
| import time | |
| from datetime import datetime, timezone | |
| import numpy as np | |
| import pytest | |
| import wayy_db as wdb | |
| import sys | |
| sys.path.insert(0, str(__file__).replace("/tests/python/test_streaming.py", "")) | |
| from api.pubsub import InMemoryPubSub | |
| from api.streaming import StreamingManager, TickBuffer | |
| class TestTickBuffer: | |
| """Tests for TickBuffer data structure.""" | |
| def test_empty_buffer(self): | |
| buffer = TickBuffer() | |
| assert len(buffer) == 0 | |
| def test_append_single_tick(self): | |
| buffer = TickBuffer() | |
| buffer.append( | |
| timestamp=1704067200000000000, | |
| symbol="BTC-USD", | |
| price=42150.50, | |
| volume=1.5, | |
| bid=42150.00, | |
| ask=42151.00, | |
| ) | |
| assert len(buffer) == 1 | |
| assert buffer.timestamps[0] == 1704067200000000000 | |
| assert buffer.symbols[0] == "BTC-USD" | |
| assert buffer.prices[0] == 42150.50 | |
| def test_append_multiple_ticks(self): | |
| buffer = TickBuffer() | |
| for i in range(100): | |
| buffer.append( | |
| timestamp=1704067200000000000 + i * 1000000, | |
| symbol=f"SYM-{i % 5}", | |
| price=100.0 + i * 0.1, | |
| volume=float(i), | |
| ) | |
| assert len(buffer) == 100 | |
| def test_clear(self): | |
| buffer = TickBuffer() | |
| for i in range(10): | |
| buffer.append( | |
| timestamp=i, | |
| symbol="BTC-USD", | |
| price=100.0, | |
| ) | |
| assert len(buffer) == 10 | |
| buffer.clear() | |
| assert len(buffer) == 0 | |
| def test_to_columnar(self): | |
| buffer = TickBuffer() | |
| buffer.append(1000, "BTC-USD", 42150.0, 1.0, 42149.0, 42151.0) | |
| buffer.append(2000, "ETH-USD", 2250.0, 10.0, 2249.0, 2251.0) | |
| data = buffer.to_columnar() | |
| assert "timestamp" in data | |
| assert "symbol" in data | |
| assert "price" in data | |
| assert "volume" in data | |
| assert "bid" in data | |
| assert "ask" in data | |
| assert data["timestamp"].dtype == np.int64 | |
| assert data["symbol"].dtype == np.uint32 | |
| assert data["price"].dtype == np.float64 | |
| assert len(data["timestamp"]) == 2 | |
| np.testing.assert_array_equal(data["timestamp"], [1000, 2000]) | |
| np.testing.assert_array_equal(data["price"], [42150.0, 2250.0]) | |
| class TestStreamingManager: | |
| """Tests for StreamingManager with PubSub integration.""" | |
| def pubsub(self): | |
| return InMemoryPubSub(max_buffer_per_channel=10000) | |
| def manager(self, pubsub): | |
| """Create a streaming manager with in-memory pubsub.""" | |
| manager = StreamingManager( | |
| flush_interval=0.1, | |
| max_buffer_size=100, | |
| batch_broadcast_interval=0.01, | |
| pubsub=pubsub, | |
| ) | |
| return manager | |
| def temp_db(self, temp_dir): | |
| """Create a temporary database for testing.""" | |
| return wdb.Database(temp_dir) | |
| async def test_ingest_single_tick(self, manager): | |
| """Test ingesting a single tick.""" | |
| await manager.start() | |
| await manager.ingest_tick( | |
| table="ticks", | |
| symbol="BTC-USD", | |
| price=42150.50, | |
| volume=1.5, | |
| ) | |
| stats = manager.get_stats() | |
| assert stats["ticks_received"] == 1 | |
| assert stats["buffer_sizes"]["ticks"] == 1 | |
| await manager.stop() | |
| async def test_ingest_publishes_to_pubsub(self, manager): | |
| """Test that ingestion publishes to PubSub channels.""" | |
| await manager.start() | |
| received = [] | |
| async def on_tick(msg): | |
| received.append(msg) | |
| await manager._pubsub.subscribe("ticks:BTC-USD", on_tick, "test") | |
| await manager.ingest_tick( | |
| table="ticks", | |
| symbol="BTC-USD", | |
| price=42150.50, | |
| ) | |
| assert len(received) == 1 | |
| assert received[0]["price"] == 42150.50 | |
| assert received[0]["_channel"] == "ticks:BTC-USD" | |
| assert received[0]["_seq"] == 1 | |
| await manager.stop() | |
| async def test_ingest_batch(self, manager): | |
| """Test ingesting a batch of ticks.""" | |
| await manager.start() | |
| ticks = [ | |
| {"symbol": "BTC-USD", "price": 42150.0, "volume": 1.0}, | |
| {"symbol": "ETH-USD", "price": 2250.0, "volume": 10.0}, | |
| {"symbol": "SOL-USD", "price": 100.0, "volume": 100.0}, | |
| ] | |
| await manager.ingest_batch(table="ticks", ticks=ticks) | |
| stats = manager.get_stats() | |
| assert stats["ticks_received"] == 3 | |
| assert stats["buffer_sizes"]["ticks"] == 3 | |
| await manager.stop() | |
| async def test_batch_publishes_to_channels(self, manager): | |
| """Test that batch ingestion publishes to per-symbol channels.""" | |
| await manager.start() | |
| btc_received = [] | |
| eth_received = [] | |
| async def on_btc(msg): | |
| btc_received.append(msg) | |
| async def on_eth(msg): | |
| eth_received.append(msg) | |
| await manager._pubsub.subscribe("ticks:BTC-USD", on_btc, "btc_sub") | |
| await manager._pubsub.subscribe("ticks:ETH-USD", on_eth, "eth_sub") | |
| ticks = [ | |
| {"symbol": "BTC-USD", "price": 42150.0}, | |
| {"symbol": "ETH-USD", "price": 2250.0}, | |
| {"symbol": "BTC-USD", "price": 42160.0}, | |
| ] | |
| await manager.ingest_batch(table="ticks", ticks=ticks) | |
| assert len(btc_received) == 2 | |
| assert len(eth_received) == 1 | |
| assert btc_received[0]["price"] == 42150.0 | |
| assert btc_received[1]["price"] == 42160.0 | |
| await manager.stop() | |
| async def test_latest_quotes(self, manager): | |
| """Test that latest quotes are cached.""" | |
| await manager.start() | |
| await manager.ingest_tick(table="ticks", symbol="BTC-USD", price=42150.50) | |
| await manager.ingest_tick(table="ticks", symbol="BTC-USD", price=42200.00) | |
| quote = manager.get_latest_quote("ticks", "BTC-USD") | |
| assert quote is not None | |
| assert quote["price"] == 42200.00 | |
| await manager.stop() | |
| async def test_get_all_quotes(self, manager): | |
| """Test getting all quotes for a table.""" | |
| await manager.start() | |
| await manager.ingest_tick(table="ticks", symbol="BTC-USD", price=42150.0) | |
| await manager.ingest_tick(table="ticks", symbol="ETH-USD", price=2250.0) | |
| await manager.ingest_tick(table="ticks", symbol="SOL-USD", price=100.0) | |
| quotes = manager.get_all_quotes("ticks") | |
| assert len(quotes) == 3 | |
| assert "BTC-USD" in quotes | |
| assert "ETH-USD" in quotes | |
| assert "SOL-USD" in quotes | |
| await manager.stop() | |
| async def test_flush_to_database(self, manager, temp_db): | |
| """Test flushing buffered data to database.""" | |
| manager.set_database(temp_db) | |
| await manager.start() | |
| for i in range(10): | |
| await manager.ingest_tick( | |
| table="test_ticks", | |
| symbol="BTC-USD", | |
| price=42000.0 + i, | |
| timestamp=1704067200000000000 + i * 1000000000, | |
| ) | |
| await manager._flush_table("test_ticks") | |
| assert temp_db.has_table("test_ticks") | |
| table = temp_db["test_ticks"] | |
| assert table.num_rows == 10 | |
| prices = table["price"].to_numpy() | |
| assert prices[0] == pytest.approx(42000.0) | |
| assert prices[9] == pytest.approx(42009.0) | |
| await manager.stop() | |
| async def test_append_to_existing_table(self, manager, temp_db): | |
| """Test appending to an existing table.""" | |
| manager.set_database(temp_db) | |
| await manager.start() | |
| for i in range(5): | |
| await manager.ingest_tick( | |
| table="test_ticks", | |
| symbol="BTC-USD", | |
| price=42000.0 + i, | |
| timestamp=1704067200000000000 + i * 1000000000, | |
| ) | |
| await manager._flush_table("test_ticks") | |
| for i in range(5, 10): | |
| await manager.ingest_tick( | |
| table="test_ticks", | |
| symbol="BTC-USD", | |
| price=42000.0 + i, | |
| timestamp=1704067200000000000 + i * 1000000000, | |
| ) | |
| await manager._flush_table("test_ticks") | |
| table = temp_db["test_ticks"] | |
| assert table.num_rows == 10 | |
| prices = table["price"].to_numpy() | |
| for i in range(10): | |
| assert prices[i] == pytest.approx(42000.0 + i) | |
| await manager.stop() | |
| async def test_auto_flush_on_buffer_full(self, manager, temp_db): | |
| """Test that buffer auto-flushes when full.""" | |
| manager.set_database(temp_db) | |
| manager.max_buffer_size = 50 | |
| await manager.start() | |
| for i in range(60): | |
| await manager.ingest_tick( | |
| table="test_ticks", | |
| symbol="BTC-USD", | |
| price=42000.0 + i, | |
| timestamp=1704067200000000000 + i * 1000000000, | |
| ) | |
| stats = manager.get_stats() | |
| assert stats["ticks_flushed"] >= 50 | |
| await manager.stop() | |
| async def test_start_stop(self, manager, temp_db): | |
| """Test starting and stopping the streaming manager.""" | |
| manager.set_database(temp_db) | |
| await manager.start() | |
| assert manager._running | |
| await manager.ingest_tick(table="ticks", symbol="BTC-USD", price=42150.0) | |
| await manager.stop() | |
| assert not manager._running | |
| async def test_stats_includes_pubsub(self, manager): | |
| """Test that stats include pubsub info.""" | |
| await manager.start() | |
| for i in range(10): | |
| await manager.ingest_tick( | |
| table="ticks", | |
| symbol=f"SYM-{i % 3}", | |
| price=100.0 + i, | |
| ) | |
| stats = manager.get_stats() | |
| assert stats["ticks_received"] == 10 | |
| assert "buffer_sizes" in stats | |
| assert "subscriber_counts" in stats | |
| assert stats["latest_quotes"] == 3 | |
| assert "pubsub" in stats | |
| assert stats["pubsub"]["backend"] == "in_memory" | |
| assert stats["pubsub"]["messages_published"] == 10 | |
| await manager.stop() | |
| class TestStreamingManagerNoPubSub: | |
| """Tests for StreamingManager without PubSub (backward compat).""" | |
| def manager(self): | |
| return StreamingManager( | |
| flush_interval=0.1, | |
| max_buffer_size=100, | |
| batch_broadcast_interval=0.01, | |
| pubsub=None, | |
| ) | |
| async def test_works_without_pubsub(self, manager): | |
| """Test that streaming still works without a PubSub backend.""" | |
| await manager.start() | |
| await manager.ingest_tick(table="ticks", symbol="BTC-USD", price=42150.0) | |
| stats = manager.get_stats() | |
| assert stats["ticks_received"] == 1 | |
| assert "pubsub" not in stats | |
| await manager.stop() | |
| class TestStreamingPerformance: | |
| """Performance tests for streaming with PubSub.""" | |
| async def test_high_throughput_ingestion(self): | |
| """Test high-throughput tick ingestion.""" | |
| pubsub = InMemoryPubSub(max_buffer_per_channel=100000) | |
| manager = StreamingManager( | |
| flush_interval=10.0, | |
| max_buffer_size=100000, | |
| pubsub=pubsub, | |
| ) | |
| await manager.start() | |
| num_ticks = 10000 | |
| start = time.time() | |
| for i in range(num_ticks): | |
| await manager.ingest_tick( | |
| table="benchmark", | |
| symbol=f"SYM-{i % 100}", | |
| price=100.0 + (i % 1000) * 0.01, | |
| timestamp=1704067200000000000 + i * 1000, | |
| ) | |
| elapsed = time.time() - start | |
| ticks_per_second = num_ticks / elapsed | |
| print(f"\nIngestion w/ PubSub: {ticks_per_second:.0f} ticks/second") | |
| print(f" Total ticks: {num_ticks}") | |
| print(f" Elapsed: {elapsed:.3f}s") | |
| assert ticks_per_second > 5000 | |
| stats = manager.get_stats() | |
| assert stats["ticks_received"] == num_ticks | |
| assert stats["pubsub"]["messages_published"] == num_ticks | |
| await manager.stop() | |
| async def test_batch_vs_single_ingestion(self): | |
| """Compare batch vs single tick ingestion performance.""" | |
| pubsub = InMemoryPubSub(max_buffer_per_channel=100000) | |
| manager = StreamingManager( | |
| flush_interval=10.0, | |
| max_buffer_size=100000, | |
| pubsub=pubsub, | |
| ) | |
| await manager.start() | |
| num_ticks = 5000 | |
| # Single tick ingestion | |
| start = time.time() | |
| for i in range(num_ticks): | |
| await manager.ingest_tick( | |
| table="single", | |
| symbol="BTC-USD", | |
| price=42000.0 + i * 0.01, | |
| ) | |
| single_elapsed = time.time() - start | |
| # Batch ingestion | |
| batch_size = 100 | |
| batches = num_ticks // batch_size | |
| start = time.time() | |
| for b in range(batches): | |
| ticks = [ | |
| {"symbol": "BTC-USD", "price": 42000.0 + (b * batch_size + i) * 0.01} | |
| for i in range(batch_size) | |
| ] | |
| await manager.ingest_batch(table="batch", ticks=ticks) | |
| batch_elapsed = time.time() - start | |
| print(f"\nSingle ingestion: {num_ticks / single_elapsed:.0f} ticks/second") | |
| print(f"Batch ingestion: {num_ticks / batch_elapsed:.0f} ticks/second") | |
| print(f"Batch speedup: {single_elapsed / batch_elapsed:.1f}x") | |
| assert batch_elapsed <= single_elapsed * 1.5 | |
| await manager.stop() | |
| class TestMultipleSymbols: | |
| """Tests for multi-symbol streaming.""" | |
| async def test_multiple_symbols_separate_quotes(self): | |
| pubsub = InMemoryPubSub() | |
| manager = StreamingManager(pubsub=pubsub) | |
| await manager.start() | |
| symbols = ["BTC-USD", "ETH-USD", "SOL-USD", "ADA-USD", "DOT-USD"] | |
| for i, symbol in enumerate(symbols): | |
| await manager.ingest_tick( | |
| table="ticks", | |
| symbol=symbol, | |
| price=1000.0 * (i + 1), | |
| ) | |
| for i, symbol in enumerate(symbols): | |
| quote = manager.get_latest_quote("ticks", symbol) | |
| assert quote is not None | |
| assert quote["price"] == pytest.approx(1000.0 * (i + 1)) | |
| await manager.stop() | |
| async def test_pubsub_channels_per_symbol(self): | |
| """Test that each symbol gets its own PubSub channel.""" | |
| pubsub = InMemoryPubSub() | |
| manager = StreamingManager(pubsub=pubsub) | |
| await manager.start() | |
| btc_msgs = [] | |
| eth_msgs = [] | |
| async def on_btc(msg): | |
| btc_msgs.append(msg) | |
| async def on_eth(msg): | |
| eth_msgs.append(msg) | |
| await pubsub.subscribe("ticks:BTC-USD", on_btc, "btc") | |
| await pubsub.subscribe("ticks:ETH-USD", on_eth, "eth") | |
| await manager.ingest_tick(table="ticks", symbol="BTC-USD", price=42000.0) | |
| await manager.ingest_tick(table="ticks", symbol="ETH-USD", price=2200.0) | |
| await manager.ingest_tick(table="ticks", symbol="BTC-USD", price=42100.0) | |
| assert len(btc_msgs) == 2 | |
| assert len(eth_msgs) == 1 | |
| await manager.stop() | |
| class TestTimestamps: | |
| """Tests for timestamp handling.""" | |
| async def test_auto_timestamp(self): | |
| pubsub = InMemoryPubSub() | |
| manager = StreamingManager(pubsub=pubsub) | |
| await manager.start() | |
| before = int(datetime.now(timezone.utc).timestamp() * 1e9) | |
| await manager.ingest_tick(table="ticks", symbol="BTC-USD", price=42000.0) | |
| after = int(datetime.now(timezone.utc).timestamp() * 1e9) | |
| quote = manager.get_latest_quote("ticks", "BTC-USD") | |
| assert before <= quote["timestamp"] <= after | |
| await manager.stop() | |
| async def test_explicit_timestamp(self): | |
| pubsub = InMemoryPubSub() | |
| manager = StreamingManager(pubsub=pubsub) | |
| await manager.start() | |
| ts = 1704067200000000000 | |
| await manager.ingest_tick( | |
| table="ticks", | |
| symbol="BTC-USD", | |
| price=42000.0, | |
| timestamp=ts, | |
| ) | |
| quote = manager.get_latest_quote("ticks", "BTC-USD") | |
| assert quote["timestamp"] == ts | |
| await manager.stop() | |
| async def test_timestamp_ordering_in_flush(self, temp_dir): | |
| pubsub = InMemoryPubSub() | |
| manager = StreamingManager(pubsub=pubsub) | |
| db = wdb.Database(temp_dir) | |
| manager.set_database(db) | |
| await manager.start() | |
| timestamps = [3000, 1000, 4000, 2000, 5000] | |
| for ts in timestamps: | |
| await manager.ingest_tick( | |
| table="ticks", | |
| symbol="BTC-USD", | |
| price=42000.0, | |
| timestamp=ts, | |
| ) | |
| await manager._flush_table("ticks") | |
| table = db["ticks"] | |
| stored_ts = table["timestamp"].to_numpy() | |
| np.testing.assert_array_equal(stored_ts, timestamps) | |
| await manager.stop() | |