Spaces:
Sleeping
Sleeping
File size: 5,155 Bytes
8a08300 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
"""
Tests for Redis Feature Store
Tests sliding window logic, EMA computation, and Redis operations.
"""
import time
import pytest
from src.features.store import RedisFeatureStore
class TestRedisFeatureStore:
"""Test suite for RedisFeatureStore."""
def test_connection(self, feature_store):
"""Test that Redis connection is established."""
health = feature_store.health_check()
assert health["status"] == "healthy"
assert "ping_ms" in health
def test_add_transaction(self, feature_store):
"""Test adding a transaction."""
feature_store.add_transaction(user_id="test_user_1", amount=100.00, timestamp=1000000)
# Verify transaction was added
features = feature_store.get_features("test_user_1", current_timestamp=1000000)
assert features["trans_count_24h"] == 1.0
assert features["avg_spend_24h"] == 100.00
def test_sliding_window_count(self, feature_store):
"""Test that transaction count respects 24-hour window."""
base_time = 1000000
# Add 3 transactions within 24 hours
for i in range(3):
feature_store.add_transaction(
user_id="test_user_2",
amount=50.00,
timestamp=base_time + (i * 3600), # 1 hour apart
)
# Check count
features = feature_store.get_features(
"test_user_2",
current_timestamp=base_time + 10800, # 3 hours later
)
assert features["trans_count_24h"] == 3.0
# Add transaction 25 hours later - old ones should be excluded
future_time = base_time + (25 * 3600)
feature_store.add_transaction(user_id="test_user_2", amount=50.00, timestamp=future_time)
features = feature_store.get_features("test_user_2", current_timestamp=future_time)
# Should only count the new transaction
assert features["trans_count_24h"] == 1.0
def test_exponential_moving_average(self, feature_store):
"""Test EMA computation."""
base_time = 1000000
# First transaction: EMA = amount
feature_store.add_transaction(user_id="test_user_3", amount=100.00, timestamp=base_time)
features = feature_store.get_features("test_user_3", current_timestamp=base_time)
assert features["avg_spend_24h"] == 100.00
# Second transaction: EMA updates
feature_store.add_transaction(
user_id="test_user_3", amount=200.00, timestamp=base_time + 3600
)
features = feature_store.get_features("test_user_3", current_timestamp=base_time + 3600)
# EMA = alpha * 200 + (1-alpha) * 100
# With alpha = 0.08: 0.08 * 200 + 0.92 * 100 = 16 + 92 = 108
assert 107 < features["avg_spend_24h"] < 109 # Allow small floating point error
def test_get_transaction_history(self, feature_store):
"""Test retrieving transaction history."""
base_time = 1000000
# Add multiple transactions
amounts = [100.00, 150.00, 200.00]
for i, amt in enumerate(amounts):
feature_store.add_transaction(
user_id="test_user_4", amount=amt, timestamp=base_time + (i * 3600)
)
history = feature_store.get_transaction_history("test_user_4", lookback_hours=24)
assert len(history) == 3
# Should be sorted newest first
assert history[0][1] == 200.00
assert history[1][1] == 150.00
assert history[2][1] == 100.00
def test_delete_user_data(self, feature_store):
"""Test GDPR-compliant data deletion."""
feature_store.add_transaction(user_id="test_user_5", amount=100.00, timestamp=1000000)
# Verify data exists
features = feature_store.get_features("test_user_5", current_timestamp=1000000)
assert features["trans_count_24h"] > 0
# Delete user data
deleted_count = feature_store.delete_user_data("test_user_5")
assert deleted_count == 2 # tx_history + avg_spend
# Verify data is gone
features = feature_store.get_features("test_user_5", current_timestamp=1000000)
assert features["trans_count_24h"] == 0.0
assert features["avg_spend_24h"] == 0.0
def test_concurrent_transactions(self, feature_store):
"""Test that multiple users can be tracked concurrently."""
base_time = 1000000
# Add transactions for different users
for user_id in ["user_a", "user_b", "user_c"]:
feature_store.add_transaction(user_id=user_id, amount=100.00, timestamp=base_time)
# Verify each user has independent state
for user_id in ["user_a", "user_b", "user_c"]:
features = feature_store.get_features(user_id, current_timestamp=base_time)
assert features["trans_count_24h"] == 1.0
def test_empty_user(self, feature_store):
"""Test getting features for user with no history."""
features = feature_store.get_features("nonexistent_user", current_timestamp=1000000)
assert features["trans_count_24h"] == 0.0
assert features["avg_spend_24h"] == 0.0
|