|
|
"""QuotaCache 属性测试和单元测试 |
|
|
|
|
|
Property 1: 缓存存储完整性 - 存储后读取应返回完整数据 |
|
|
Property 2: 缓存持久化往返 - 保存后加载应产生等价状态 |
|
|
""" |
|
|
import os |
|
|
import time |
|
|
import tempfile |
|
|
from pathlib import Path |
|
|
|
|
|
import pytest |
|
|
from hypothesis import given, strategies as st, settings, assume |
|
|
|
|
|
|
|
|
import sys |
|
|
sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
|
|
|
from kiro_proxy.core.quota_cache import ( |
|
|
QuotaCache, CachedQuota, DEFAULT_CACHE_MAX_AGE |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FIXED_MAX_TIMESTAMP = 2000000000.0 |
|
|
|
|
|
|
|
|
@st.composite |
|
|
def valid_quota_strategy(draw): |
|
|
"""生成有效的 CachedQuota 数据""" |
|
|
usage_limit = draw(st.floats(min_value=0.0, max_value=10000.0, allow_nan=False, allow_infinity=False)) |
|
|
current_usage = draw(st.floats(min_value=0.0, max_value=usage_limit, allow_nan=False, allow_infinity=False)) |
|
|
balance = usage_limit - current_usage |
|
|
usage_percent = (current_usage / usage_limit * 100) if usage_limit > 0 else 0.0 |
|
|
|
|
|
free_trial_limit = draw(st.floats(min_value=0.0, max_value=1000.0, allow_nan=False, allow_infinity=False)) |
|
|
free_trial_usage = draw(st.floats(min_value=0.0, max_value=free_trial_limit, allow_nan=False, allow_infinity=False)) |
|
|
|
|
|
bonus_limit = draw(st.floats(min_value=0.0, max_value=500.0, allow_nan=False, allow_infinity=False)) |
|
|
bonus_usage = draw(st.floats(min_value=0.0, max_value=bonus_limit, allow_nan=False, allow_infinity=False)) |
|
|
|
|
|
return CachedQuota( |
|
|
account_id=draw(st.text(alphabet=st.characters(whitelist_categories=('L', 'N'), whitelist_characters='_-'), min_size=1, max_size=32)), |
|
|
usage_limit=usage_limit, |
|
|
current_usage=current_usage, |
|
|
balance=balance, |
|
|
usage_percent=round(usage_percent, 2), |
|
|
is_low_balance=balance < usage_limit * 0.2 if usage_limit > 0 else False, |
|
|
subscription_title=draw(st.text(min_size=0, max_size=50)), |
|
|
free_trial_limit=free_trial_limit, |
|
|
free_trial_usage=free_trial_usage, |
|
|
bonus_limit=bonus_limit, |
|
|
bonus_usage=bonus_usage, |
|
|
updated_at=draw(st.floats(min_value=0.0, max_value=FIXED_MAX_TIMESTAMP, allow_nan=False, allow_infinity=False)), |
|
|
error=draw(st.one_of(st.none(), st.text(min_size=1, max_size=100))) |
|
|
) |
|
|
|
|
|
|
|
|
@st.composite |
|
|
def account_id_strategy(draw): |
|
|
"""生成有效的账号ID""" |
|
|
return draw(st.text( |
|
|
alphabet=st.characters(whitelist_categories=('L', 'N'), whitelist_characters='_-'), |
|
|
min_size=1, |
|
|
max_size=32 |
|
|
)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestCacheStorageIntegrity: |
|
|
"""Property 1: 缓存存储完整性测试""" |
|
|
|
|
|
@given(quota=valid_quota_strategy()) |
|
|
@settings(max_examples=100) |
|
|
def test_set_then_get_returns_complete_data(self, quota: CachedQuota): |
|
|
""" |
|
|
Property 1: 缓存存储完整性 |
|
|
*对于任意*有效的额度信息,当存储到 QuotaCache 后, |
|
|
读取该账号的缓存应返回包含所有必要字段的完整数据。 |
|
|
|
|
|
**Validates: Requirements 1.2, 2.3** |
|
|
""" |
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f: |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
cache = QuotaCache(cache_file=cache_file) |
|
|
|
|
|
|
|
|
cache.set(quota.account_id, quota) |
|
|
|
|
|
|
|
|
retrieved = cache.get(quota.account_id) |
|
|
|
|
|
|
|
|
assert retrieved is not None, "缓存应该存在" |
|
|
assert retrieved.account_id == quota.account_id, "account_id 应该一致" |
|
|
assert retrieved.usage_limit == quota.usage_limit, "usage_limit 应该一致" |
|
|
assert retrieved.current_usage == quota.current_usage, "current_usage 应该一致" |
|
|
assert retrieved.balance == quota.balance, "balance 应该一致" |
|
|
assert retrieved.updated_at == quota.updated_at, "updated_at 应该一致" |
|
|
assert retrieved.error == quota.error, "error 应该一致" |
|
|
|
|
|
finally: |
|
|
|
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
@given(quotas=st.lists(valid_quota_strategy(), min_size=1, max_size=10, unique_by=lambda q: q.account_id)) |
|
|
@settings(max_examples=50) |
|
|
def test_multiple_accounts_stored_independently(self, quotas: list): |
|
|
"""多个账号的缓存应该独立存储""" |
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f: |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
cache = QuotaCache(cache_file=cache_file) |
|
|
|
|
|
|
|
|
for quota in quotas: |
|
|
cache.set(quota.account_id, quota) |
|
|
|
|
|
|
|
|
for quota in quotas: |
|
|
retrieved = cache.get(quota.account_id) |
|
|
assert retrieved is not None |
|
|
assert retrieved.account_id == quota.account_id |
|
|
assert retrieved.balance == quota.balance |
|
|
|
|
|
finally: |
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestCachePersistenceRoundTrip: |
|
|
"""Property 2: 缓存持久化往返测试""" |
|
|
|
|
|
@given(quotas=st.lists(valid_quota_strategy(), min_size=1, max_size=10, unique_by=lambda q: q.account_id)) |
|
|
@settings(max_examples=100) |
|
|
def test_save_then_load_preserves_data(self, quotas: list): |
|
|
""" |
|
|
Property 2: 缓存持久化往返 |
|
|
*对于任意*有效的 QuotaCache 状态,保存到文件后再加载, |
|
|
应产生等价的缓存状态(所有账号的额度信息保持一致)。 |
|
|
|
|
|
**Validates: Requirements 7.1, 7.2** |
|
|
""" |
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f: |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
|
|
|
cache1 = QuotaCache(cache_file=cache_file) |
|
|
for quota in quotas: |
|
|
cache1.set(quota.account_id, quota) |
|
|
|
|
|
|
|
|
success = cache1.save_to_file() |
|
|
assert success, "保存应该成功" |
|
|
|
|
|
|
|
|
cache2 = QuotaCache(cache_file=cache_file) |
|
|
|
|
|
|
|
|
all_cache1 = cache1.get_all() |
|
|
all_cache2 = cache2.get_all() |
|
|
|
|
|
assert len(all_cache1) == len(all_cache2), "账号数量应该一致" |
|
|
|
|
|
for account_id, quota1 in all_cache1.items(): |
|
|
quota2 = all_cache2.get(account_id) |
|
|
assert quota2 is not None, f"账号 {account_id} 应该存在" |
|
|
assert quota1.usage_limit == quota2.usage_limit |
|
|
assert quota1.current_usage == quota2.current_usage |
|
|
assert quota1.balance == quota2.balance |
|
|
assert quota1.updated_at == quota2.updated_at |
|
|
assert quota1.error == quota2.error |
|
|
|
|
|
finally: |
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
@given(quota=valid_quota_strategy()) |
|
|
@settings(max_examples=50) |
|
|
def test_dict_roundtrip(self, quota: CachedQuota): |
|
|
"""CachedQuota 的字典序列化往返""" |
|
|
|
|
|
quota_dict = quota.to_dict() |
|
|
|
|
|
|
|
|
restored = CachedQuota.from_dict(quota_dict) |
|
|
|
|
|
|
|
|
assert restored.account_id == quota.account_id |
|
|
assert restored.usage_limit == quota.usage_limit |
|
|
assert restored.current_usage == quota.current_usage |
|
|
assert restored.balance == quota.balance |
|
|
assert restored.updated_at == quota.updated_at |
|
|
assert restored.error == quota.error |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestCacheExpiration: |
|
|
"""缓存过期检测单元测试""" |
|
|
|
|
|
def test_fresh_cache_not_stale(self): |
|
|
"""新缓存不应该过期""" |
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f: |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
cache = QuotaCache(cache_file=cache_file) |
|
|
quota = CachedQuota( |
|
|
account_id="test_account", |
|
|
usage_limit=1000.0, |
|
|
current_usage=500.0, |
|
|
balance=500.0, |
|
|
updated_at=time.time() |
|
|
) |
|
|
cache.set("test_account", quota) |
|
|
|
|
|
assert not cache.is_stale("test_account"), "新缓存不应该过期" |
|
|
|
|
|
finally: |
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
def test_old_cache_is_stale(self): |
|
|
"""旧缓存应该过期""" |
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f: |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
cache = QuotaCache(cache_file=cache_file) |
|
|
quota = CachedQuota( |
|
|
account_id="test_account", |
|
|
usage_limit=1000.0, |
|
|
current_usage=500.0, |
|
|
balance=500.0, |
|
|
updated_at=time.time() - DEFAULT_CACHE_MAX_AGE - 1 |
|
|
) |
|
|
cache.set("test_account", quota) |
|
|
|
|
|
assert cache.is_stale("test_account"), "旧缓存应该过期" |
|
|
|
|
|
finally: |
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
def test_nonexistent_account_is_stale(self): |
|
|
"""不存在的账号应该被视为过期""" |
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f: |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
cache = QuotaCache(cache_file=cache_file) |
|
|
assert cache.is_stale("nonexistent"), "不存在的账号应该被视为过期" |
|
|
|
|
|
finally: |
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestFileErrorHandling: |
|
|
"""文件读写错误处理单元测试""" |
|
|
|
|
|
def test_load_nonexistent_file(self): |
|
|
"""加载不存在的文件应该返回 False""" |
|
|
cache = QuotaCache(cache_file="/nonexistent/path/cache.json") |
|
|
result = cache.load_from_file() |
|
|
assert result is False |
|
|
|
|
|
def test_load_invalid_json(self): |
|
|
"""加载无效 JSON 应该返回 False""" |
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False, mode='w') as f: |
|
|
f.write("invalid json {{{") |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
cache = QuotaCache(cache_file=cache_file) |
|
|
|
|
|
assert len(cache.get_all()) == 0 |
|
|
|
|
|
finally: |
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
def test_remove_account(self): |
|
|
"""移除账号应该正常工作""" |
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f: |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
cache = QuotaCache(cache_file=cache_file) |
|
|
quota = CachedQuota( |
|
|
account_id="test_account", |
|
|
usage_limit=1000.0, |
|
|
updated_at=time.time() |
|
|
) |
|
|
cache.set("test_account", quota) |
|
|
assert cache.get("test_account") is not None |
|
|
|
|
|
cache.remove("test_account") |
|
|
assert cache.get("test_account") is None |
|
|
|
|
|
finally: |
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
def test_clear_cache(self): |
|
|
"""清空缓存应该正常工作""" |
|
|
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f: |
|
|
cache_file = f.name |
|
|
|
|
|
try: |
|
|
cache = QuotaCache(cache_file=cache_file) |
|
|
for i in range(5): |
|
|
quota = CachedQuota( |
|
|
account_id=f"account_{i}", |
|
|
usage_limit=1000.0, |
|
|
updated_at=time.time() |
|
|
) |
|
|
cache.set(f"account_{i}", quota) |
|
|
|
|
|
assert len(cache.get_all()) == 5 |
|
|
|
|
|
cache.clear() |
|
|
assert len(cache.get_all()) == 0 |
|
|
|
|
|
finally: |
|
|
if os.path.exists(cache_file): |
|
|
os.unlink(cache_file) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestCachedQuotaMethods: |
|
|
"""CachedQuota 辅助方法测试""" |
|
|
|
|
|
def test_has_error(self): |
|
|
"""has_error 方法测试""" |
|
|
quota_ok = CachedQuota(account_id="test", error=None) |
|
|
quota_err = CachedQuota(account_id="test", error="Some error") |
|
|
|
|
|
assert not quota_ok.has_error() |
|
|
assert quota_err.has_error() |
|
|
|
|
|
def test_is_exhausted(self): |
|
|
"""is_exhausted 属性测试""" |
|
|
quota_ok = CachedQuota(account_id="test", balance=100.0, usage_limit=1000.0) |
|
|
quota_zero = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0) |
|
|
quota_negative = CachedQuota(account_id="test", balance=-10.0, usage_limit=1000.0) |
|
|
quota_error = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0, error="Error") |
|
|
|
|
|
assert not quota_ok.is_exhausted |
|
|
assert quota_zero.is_exhausted |
|
|
assert quota_negative.is_exhausted |
|
|
assert not quota_error.is_exhausted |
|
|
|
|
|
def test_balance_status(self): |
|
|
"""balance_status 属性测试""" |
|
|
|
|
|
quota_normal = CachedQuota(account_id="test", balance=500.0, usage_limit=1000.0) |
|
|
assert quota_normal.balance_status == "normal" |
|
|
assert not quota_normal.is_low_balance |
|
|
assert not quota_normal.is_exhausted |
|
|
|
|
|
|
|
|
quota_low = CachedQuota(account_id="test", balance=100.0, usage_limit=1000.0) |
|
|
assert quota_low.balance_status == "low" |
|
|
assert quota_low.is_low_balance |
|
|
assert not quota_low.is_exhausted |
|
|
|
|
|
|
|
|
quota_exhausted = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0) |
|
|
assert quota_exhausted.balance_status == "exhausted" |
|
|
assert not quota_exhausted.is_low_balance |
|
|
assert quota_exhausted.is_exhausted |
|
|
|
|
|
def test_is_available(self): |
|
|
"""is_available 方法测试""" |
|
|
quota_ok = CachedQuota(account_id="test", balance=100.0, usage_limit=1000.0) |
|
|
quota_exhausted = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0) |
|
|
quota_error = CachedQuota(account_id="test", balance=100.0, error="Error") |
|
|
|
|
|
assert quota_ok.is_available() |
|
|
assert not quota_exhausted.is_available() |
|
|
assert not quota_error.is_available() |
|
|
|
|
|
def test_from_error(self): |
|
|
"""from_error 工厂方法测试""" |
|
|
quota = CachedQuota.from_error("test_account", "Connection failed") |
|
|
|
|
|
assert quota.account_id == "test_account" |
|
|
assert quota.error == "Connection failed" |
|
|
assert quota.has_error() |
|
|
assert quota.updated_at > 0 |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
pytest.main([__file__, "-v"]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestBalanceStatusDistinction: |
|
|
"""Property 10: 低额度与无额度区分测试""" |
|
|
|
|
|
@given( |
|
|
balance=st.floats(min_value=-100.0, max_value=1000.0, allow_nan=False, allow_infinity=False), |
|
|
usage_limit=st.floats(min_value=100.0, max_value=1000.0, allow_nan=False, allow_infinity=False) |
|
|
) |
|
|
@settings(max_examples=100) |
|
|
def test_balance_status_distinction(self, balance: float, usage_limit: float): |
|
|
""" |
|
|
Property 10: 低额度与无额度区分 |
|
|
*对于任意*账号,当剩余额度大于0但低于总额度的20%时,应标记为"低额度"状态; |
|
|
当剩余额度为0或负数时,应标记为"无额度"状态。 |
|
|
|
|
|
**Validates: Requirements 5.5, 5.6** |
|
|
""" |
|
|
quota = CachedQuota( |
|
|
account_id="test_account", |
|
|
balance=balance, |
|
|
usage_limit=usage_limit, |
|
|
updated_at=time.time() |
|
|
) |
|
|
|
|
|
remaining_percent = (balance / usage_limit) * 100 if usage_limit > 0 else 0 |
|
|
|
|
|
if balance <= 0: |
|
|
|
|
|
assert quota.balance_status == "exhausted", f"余额 {balance} 应该是 exhausted 状态" |
|
|
assert quota.is_exhausted, f"余额 {balance} 应该标记为 is_exhausted" |
|
|
assert not quota.is_low_balance, f"余额 {balance} 不应该标记为 is_low_balance" |
|
|
assert not quota.is_available(), f"余额 {balance} 不应该可用" |
|
|
elif remaining_percent <= 20: |
|
|
|
|
|
assert quota.balance_status == "low", f"余额 {balance}/{usage_limit} ({remaining_percent:.1f}%) 应该是 low 状态" |
|
|
assert quota.is_low_balance, f"余额 {balance}/{usage_limit} 应该标记为 is_low_balance" |
|
|
assert not quota.is_exhausted, f"余额 {balance} 不应该标记为 is_exhausted" |
|
|
assert quota.is_available(), f"余额 {balance} 应该可用" |
|
|
else: |
|
|
|
|
|
assert quota.balance_status == "normal", f"余额 {balance}/{usage_limit} ({remaining_percent:.1f}%) 应该是 normal 状态" |
|
|
assert not quota.is_low_balance, f"余额 {balance}/{usage_limit} 不应该标记为 is_low_balance" |
|
|
assert not quota.is_exhausted, f"余额 {balance} 不应该标记为 is_exhausted" |
|
|
assert quota.is_available(), f"余额 {balance} 应该可用" |
|
|
|
|
|
def test_boundary_values(self): |
|
|
"""边界值测试""" |
|
|
|
|
|
quota_20 = CachedQuota(account_id="test", balance=200.0, usage_limit=1000.0) |
|
|
assert quota_20.balance_status == "low" |
|
|
|
|
|
|
|
|
quota_21 = CachedQuota(account_id="test", balance=210.0, usage_limit=1000.0) |
|
|
assert quota_21.balance_status == "normal" |
|
|
|
|
|
|
|
|
quota_0 = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0) |
|
|
assert quota_0.balance_status == "exhausted" |
|
|
|
|
|
|
|
|
quota_neg = CachedQuota(account_id="test", balance=-10.0, usage_limit=1000.0) |
|
|
assert quota_neg.balance_status == "exhausted" |
|
|
|