kiroproxy / tests /test_quota_cache.py
KiroProxy User
chore: repo cleanup and maintenance
0edbd7b
"""QuotaCache 属性测试和单元测试
Property 1: 缓存存储完整性 - 存储后读取应返回完整数据
Property 2: 缓存持久化往返 - 保存后加载应产生等价状态
"""
import os
import time
import tempfile
from pathlib import Path
import pytest
from hypothesis import given, strategies as st, settings, assume
# 添加项目路径
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from kiro_proxy.core.quota_cache import (
QuotaCache, CachedQuota, DEFAULT_CACHE_MAX_AGE
)
# ============== 数据生成策略 ==============
# 固定的时间戳范围,避免 hypothesis 的 flaky 问题
FIXED_MAX_TIMESTAMP = 2000000000.0 # 约 2033 年
@st.composite
def valid_quota_strategy(draw):
"""生成有效的 CachedQuota 数据"""
usage_limit = draw(st.floats(min_value=0.0, max_value=10000.0, allow_nan=False, allow_infinity=False))
current_usage = draw(st.floats(min_value=0.0, max_value=usage_limit, allow_nan=False, allow_infinity=False))
balance = usage_limit - current_usage
usage_percent = (current_usage / usage_limit * 100) if usage_limit > 0 else 0.0
free_trial_limit = draw(st.floats(min_value=0.0, max_value=1000.0, allow_nan=False, allow_infinity=False))
free_trial_usage = draw(st.floats(min_value=0.0, max_value=free_trial_limit, allow_nan=False, allow_infinity=False))
bonus_limit = draw(st.floats(min_value=0.0, max_value=500.0, allow_nan=False, allow_infinity=False))
bonus_usage = draw(st.floats(min_value=0.0, max_value=bonus_limit, allow_nan=False, allow_infinity=False))
return CachedQuota(
account_id=draw(st.text(alphabet=st.characters(whitelist_categories=('L', 'N'), whitelist_characters='_-'), min_size=1, max_size=32)),
usage_limit=usage_limit,
current_usage=current_usage,
balance=balance,
usage_percent=round(usage_percent, 2),
is_low_balance=balance < usage_limit * 0.2 if usage_limit > 0 else False,
subscription_title=draw(st.text(min_size=0, max_size=50)),
free_trial_limit=free_trial_limit,
free_trial_usage=free_trial_usage,
bonus_limit=bonus_limit,
bonus_usage=bonus_usage,
updated_at=draw(st.floats(min_value=0.0, max_value=FIXED_MAX_TIMESTAMP, allow_nan=False, allow_infinity=False)),
error=draw(st.one_of(st.none(), st.text(min_size=1, max_size=100)))
)
@st.composite
def account_id_strategy(draw):
"""生成有效的账号ID"""
return draw(st.text(
alphabet=st.characters(whitelist_categories=('L', 'N'), whitelist_characters='_-'),
min_size=1,
max_size=32
))
# ============== Property 1: 缓存存储完整性 ==============
# **Validates: Requirements 1.2, 2.3**
class TestCacheStorageIntegrity:
"""Property 1: 缓存存储完整性测试"""
@given(quota=valid_quota_strategy())
@settings(max_examples=100)
def test_set_then_get_returns_complete_data(self, quota: CachedQuota):
"""
Property 1: 缓存存储完整性
*对于任意*有效的额度信息,当存储到 QuotaCache 后,
读取该账号的缓存应返回包含所有必要字段的完整数据。
**Validates: Requirements 1.2, 2.3**
"""
# 使用临时文件避免影响真实缓存
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
cache_file = f.name
try:
cache = QuotaCache(cache_file=cache_file)
# 存储
cache.set(quota.account_id, quota)
# 读取
retrieved = cache.get(quota.account_id)
# 验证完整性
assert retrieved is not None, "缓存应该存在"
assert retrieved.account_id == quota.account_id, "account_id 应该一致"
assert retrieved.usage_limit == quota.usage_limit, "usage_limit 应该一致"
assert retrieved.current_usage == quota.current_usage, "current_usage 应该一致"
assert retrieved.balance == quota.balance, "balance 应该一致"
assert retrieved.updated_at == quota.updated_at, "updated_at 应该一致"
assert retrieved.error == quota.error, "error 应该一致"
finally:
# 清理临时文件
if os.path.exists(cache_file):
os.unlink(cache_file)
@given(quotas=st.lists(valid_quota_strategy(), min_size=1, max_size=10, unique_by=lambda q: q.account_id))
@settings(max_examples=50)
def test_multiple_accounts_stored_independently(self, quotas: list):
"""多个账号的缓存应该独立存储"""
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
cache_file = f.name
try:
cache = QuotaCache(cache_file=cache_file)
# 存储所有账号
for quota in quotas:
cache.set(quota.account_id, quota)
# 验证每个账号都能正确读取
for quota in quotas:
retrieved = cache.get(quota.account_id)
assert retrieved is not None
assert retrieved.account_id == quota.account_id
assert retrieved.balance == quota.balance
finally:
if os.path.exists(cache_file):
os.unlink(cache_file)
# ============== Property 2: 缓存持久化往返 ==============
# **Validates: Requirements 7.1, 7.2**
class TestCachePersistenceRoundTrip:
"""Property 2: 缓存持久化往返测试"""
@given(quotas=st.lists(valid_quota_strategy(), min_size=1, max_size=10, unique_by=lambda q: q.account_id))
@settings(max_examples=100)
def test_save_then_load_preserves_data(self, quotas: list):
"""
Property 2: 缓存持久化往返
*对于任意*有效的 QuotaCache 状态,保存到文件后再加载,
应产生等价的缓存状态(所有账号的额度信息保持一致)。
**Validates: Requirements 7.1, 7.2**
"""
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
cache_file = f.name
try:
# 创建并填充缓存
cache1 = QuotaCache(cache_file=cache_file)
for quota in quotas:
cache1.set(quota.account_id, quota)
# 保存到文件
success = cache1.save_to_file()
assert success, "保存应该成功"
# 创建新缓存实例并加载
cache2 = QuotaCache(cache_file=cache_file)
# 验证数据一致性
all_cache1 = cache1.get_all()
all_cache2 = cache2.get_all()
assert len(all_cache1) == len(all_cache2), "账号数量应该一致"
for account_id, quota1 in all_cache1.items():
quota2 = all_cache2.get(account_id)
assert quota2 is not None, f"账号 {account_id} 应该存在"
assert quota1.usage_limit == quota2.usage_limit
assert quota1.current_usage == quota2.current_usage
assert quota1.balance == quota2.balance
assert quota1.updated_at == quota2.updated_at
assert quota1.error == quota2.error
finally:
if os.path.exists(cache_file):
os.unlink(cache_file)
@given(quota=valid_quota_strategy())
@settings(max_examples=50)
def test_dict_roundtrip(self, quota: CachedQuota):
"""CachedQuota 的字典序列化往返"""
# 转换为字典
quota_dict = quota.to_dict()
# 从字典恢复
restored = CachedQuota.from_dict(quota_dict)
# 验证一致性
assert restored.account_id == quota.account_id
assert restored.usage_limit == quota.usage_limit
assert restored.current_usage == quota.current_usage
assert restored.balance == quota.balance
assert restored.updated_at == quota.updated_at
assert restored.error == quota.error
# ============== 单元测试:缓存过期检测 ==============
# **Validates: Requirements 7.3**
class TestCacheExpiration:
"""缓存过期检测单元测试"""
def test_fresh_cache_not_stale(self):
"""新缓存不应该过期"""
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
cache_file = f.name
try:
cache = QuotaCache(cache_file=cache_file)
quota = CachedQuota(
account_id="test_account",
usage_limit=1000.0,
current_usage=500.0,
balance=500.0,
updated_at=time.time() # 当前时间
)
cache.set("test_account", quota)
assert not cache.is_stale("test_account"), "新缓存不应该过期"
finally:
if os.path.exists(cache_file):
os.unlink(cache_file)
def test_old_cache_is_stale(self):
"""旧缓存应该过期"""
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
cache_file = f.name
try:
cache = QuotaCache(cache_file=cache_file)
quota = CachedQuota(
account_id="test_account",
usage_limit=1000.0,
current_usage=500.0,
balance=500.0,
updated_at=time.time() - DEFAULT_CACHE_MAX_AGE - 1 # 超过过期时间
)
cache.set("test_account", quota)
assert cache.is_stale("test_account"), "旧缓存应该过期"
finally:
if os.path.exists(cache_file):
os.unlink(cache_file)
def test_nonexistent_account_is_stale(self):
"""不存在的账号应该被视为过期"""
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
cache_file = f.name
try:
cache = QuotaCache(cache_file=cache_file)
assert cache.is_stale("nonexistent"), "不存在的账号应该被视为过期"
finally:
if os.path.exists(cache_file):
os.unlink(cache_file)
# ============== 单元测试:文件读写错误处理 ==============
# **Validates: Requirements 7.3**
class TestFileErrorHandling:
"""文件读写错误处理单元测试"""
def test_load_nonexistent_file(self):
"""加载不存在的文件应该返回 False"""
cache = QuotaCache(cache_file="/nonexistent/path/cache.json")
result = cache.load_from_file()
assert result is False
def test_load_invalid_json(self):
"""加载无效 JSON 应该返回 False"""
with tempfile.NamedTemporaryFile(suffix='.json', delete=False, mode='w') as f:
f.write("invalid json {{{")
cache_file = f.name
try:
cache = QuotaCache(cache_file=cache_file)
# 构造函数会尝试加载,但应该处理错误
assert len(cache.get_all()) == 0
finally:
if os.path.exists(cache_file):
os.unlink(cache_file)
def test_remove_account(self):
"""移除账号应该正常工作"""
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
cache_file = f.name
try:
cache = QuotaCache(cache_file=cache_file)
quota = CachedQuota(
account_id="test_account",
usage_limit=1000.0,
updated_at=time.time()
)
cache.set("test_account", quota)
assert cache.get("test_account") is not None
cache.remove("test_account")
assert cache.get("test_account") is None
finally:
if os.path.exists(cache_file):
os.unlink(cache_file)
def test_clear_cache(self):
"""清空缓存应该正常工作"""
with tempfile.NamedTemporaryFile(suffix='.json', delete=False) as f:
cache_file = f.name
try:
cache = QuotaCache(cache_file=cache_file)
for i in range(5):
quota = CachedQuota(
account_id=f"account_{i}",
usage_limit=1000.0,
updated_at=time.time()
)
cache.set(f"account_{i}", quota)
assert len(cache.get_all()) == 5
cache.clear()
assert len(cache.get_all()) == 0
finally:
if os.path.exists(cache_file):
os.unlink(cache_file)
# ============== 单元测试:CachedQuota 辅助方法 ==============
class TestCachedQuotaMethods:
"""CachedQuota 辅助方法测试"""
def test_has_error(self):
"""has_error 方法测试"""
quota_ok = CachedQuota(account_id="test", error=None)
quota_err = CachedQuota(account_id="test", error="Some error")
assert not quota_ok.has_error()
assert quota_err.has_error()
def test_is_exhausted(self):
"""is_exhausted 属性测试"""
quota_ok = CachedQuota(account_id="test", balance=100.0, usage_limit=1000.0)
quota_zero = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0)
quota_negative = CachedQuota(account_id="test", balance=-10.0, usage_limit=1000.0)
quota_error = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0, error="Error")
assert not quota_ok.is_exhausted
assert quota_zero.is_exhausted
assert quota_negative.is_exhausted
assert not quota_error.is_exhausted # 有错误时不更新状态
def test_balance_status(self):
"""balance_status 属性测试"""
# 正常状态 (>20%)
quota_normal = CachedQuota(account_id="test", balance=500.0, usage_limit=1000.0)
assert quota_normal.balance_status == "normal"
assert not quota_normal.is_low_balance
assert not quota_normal.is_exhausted
# 低额度状态 (0-20%)
quota_low = CachedQuota(account_id="test", balance=100.0, usage_limit=1000.0)
assert quota_low.balance_status == "low"
assert quota_low.is_low_balance
assert not quota_low.is_exhausted
# 无额度状态 (<=0)
quota_exhausted = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0)
assert quota_exhausted.balance_status == "exhausted"
assert not quota_exhausted.is_low_balance
assert quota_exhausted.is_exhausted
def test_is_available(self):
"""is_available 方法测试"""
quota_ok = CachedQuota(account_id="test", balance=100.0, usage_limit=1000.0)
quota_exhausted = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0)
quota_error = CachedQuota(account_id="test", balance=100.0, error="Error")
assert quota_ok.is_available()
assert not quota_exhausted.is_available()
assert not quota_error.is_available()
def test_from_error(self):
"""from_error 工厂方法测试"""
quota = CachedQuota.from_error("test_account", "Connection failed")
assert quota.account_id == "test_account"
assert quota.error == "Connection failed"
assert quota.has_error()
assert quota.updated_at > 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])
# ============== Property 10: 低额度与无额度区分 ==============
# **Validates: Requirements 5.5, 5.6**
class TestBalanceStatusDistinction:
"""Property 10: 低额度与无额度区分测试"""
@given(
balance=st.floats(min_value=-100.0, max_value=1000.0, allow_nan=False, allow_infinity=False),
usage_limit=st.floats(min_value=100.0, max_value=1000.0, allow_nan=False, allow_infinity=False)
)
@settings(max_examples=100)
def test_balance_status_distinction(self, balance: float, usage_limit: float):
"""
Property 10: 低额度与无额度区分
*对于任意*账号,当剩余额度大于0但低于总额度的20%时,应标记为"低额度"状态;
当剩余额度为0或负数时,应标记为"无额度"状态。
**Validates: Requirements 5.5, 5.6**
"""
quota = CachedQuota(
account_id="test_account",
balance=balance,
usage_limit=usage_limit,
updated_at=time.time()
)
remaining_percent = (balance / usage_limit) * 100 if usage_limit > 0 else 0
if balance <= 0:
# 无额度状态
assert quota.balance_status == "exhausted", f"余额 {balance} 应该是 exhausted 状态"
assert quota.is_exhausted, f"余额 {balance} 应该标记为 is_exhausted"
assert not quota.is_low_balance, f"余额 {balance} 不应该标记为 is_low_balance"
assert not quota.is_available(), f"余额 {balance} 不应该可用"
elif remaining_percent <= 20:
# 低额度状态
assert quota.balance_status == "low", f"余额 {balance}/{usage_limit} ({remaining_percent:.1f}%) 应该是 low 状态"
assert quota.is_low_balance, f"余额 {balance}/{usage_limit} 应该标记为 is_low_balance"
assert not quota.is_exhausted, f"余额 {balance} 不应该标记为 is_exhausted"
assert quota.is_available(), f"余额 {balance} 应该可用"
else:
# 正常状态
assert quota.balance_status == "normal", f"余额 {balance}/{usage_limit} ({remaining_percent:.1f}%) 应该是 normal 状态"
assert not quota.is_low_balance, f"余额 {balance}/{usage_limit} 不应该标记为 is_low_balance"
assert not quota.is_exhausted, f"余额 {balance} 不应该标记为 is_exhausted"
assert quota.is_available(), f"余额 {balance} 应该可用"
def test_boundary_values(self):
"""边界值测试"""
# 正好 20%
quota_20 = CachedQuota(account_id="test", balance=200.0, usage_limit=1000.0)
assert quota_20.balance_status == "low"
# 刚好超过 20%
quota_21 = CachedQuota(account_id="test", balance=210.0, usage_limit=1000.0)
assert quota_21.balance_status == "normal"
# 正好 0
quota_0 = CachedQuota(account_id="test", balance=0.0, usage_limit=1000.0)
assert quota_0.balance_status == "exhausted"
# 负数
quota_neg = CachedQuota(account_id="test", balance=-10.0, usage_limit=1000.0)
assert quota_neg.balance_status == "exhausted"