z.ai / tests /test_admin_stats.py
Gemini CLI
Configure for Hugging Face Spaces
7864524
import json
from datetime import datetime
from urllib.parse import urlencode
import pytest
from starlette.requests import Request
from app.admin import api as admin_api
from app.admin.stats import collect_admin_stats, format_uptime
from app.services import token_dao as token_dao_module
from app.services.request_log_dao import RequestLogDAO
from app.services.token_dao import TokenDAO
from app.utils import token_pool as token_pool_module
from app.utils.token_pool import TokenPool, sync_token_stats_to_db
class DummyPool:
def __init__(self, status):
self._status = status
def get_pool_status(self):
return self._status
def _make_get_request(path: str, query: dict[str, str] | None = None) -> Request:
query_string = urlencode(query or {}).encode()
async def receive():
return {"type": "http.request", "body": b"", "more_body": False}
scope = {
"type": "http",
"http_version": "1.1",
"method": "GET",
"scheme": "http",
"path": path,
"raw_path": path.encode(),
"query_string": query_string,
"headers": [],
"client": ("testclient", 50000),
"server": ("testserver", 80),
}
return Request(scope, receive)
@pytest.mark.asyncio
async def test_collect_admin_stats_uses_request_logs_and_token_inventory(tmp_path):
db_path = tmp_path / "admin_stats.db"
token_dao = TokenDAO(str(db_path))
await token_dao.init_database()
request_log_dao = RequestLogDAO(str(db_path))
await token_dao.add_token("zai", "token-user-1", validate=False)
await token_dao.add_token("zai", "token-user-2", validate=False)
await token_dao.add_token(
"zai",
"token-guest-1",
token_type="guest",
validate=False,
)
unknown_token_id = await token_dao.add_token(
"zai",
"token-unknown-1",
token_type="unknown",
validate=False,
)
await token_dao.update_token_status(int(unknown_token_id), False)
await request_log_dao.add_log(
provider="zai",
endpoint="/v1/chat/completions",
source="pytest",
protocol="openai",
client_name="pytest",
model="glm-5",
status_code=200,
success=True,
duration=0.5,
input_tokens=100,
output_tokens=40,
cache_read_tokens=20,
total_tokens=140,
)
await request_log_dao.add_log(
provider="zai",
endpoint="/v1/chat/completions",
source="pytest",
protocol="openai",
client_name="pytest",
model="glm-5",
status_code=500,
success=False,
duration=1.2,
input_tokens=60,
output_tokens=10,
cache_creation_tokens=15,
total_tokens=70,
error_message="upstream failed",
)
await request_log_dao.add_log(
provider="zai",
endpoint="/v1/messages",
source="pytest",
protocol="anthropic",
client_name="pytest",
model="glm-4.5",
status_code=200,
success=True,
duration=0.9,
input_tokens=30,
output_tokens=20,
total_tokens=50,
)
await request_log_dao.add_log(
provider="other",
endpoint="/ignored",
source="pytest",
protocol="openai",
client_name="pytest",
model="glm-ignored",
status_code=200,
success=True,
duration=0.1,
)
stats = await collect_admin_stats(
"zai",
token_dao=token_dao,
request_log_dao=request_log_dao,
token_pool=DummyPool(
{
"total_tokens": 2,
"available_tokens": 1,
"healthy_tokens": 1,
"unhealthy_tokens": 1,
}
),
)
assert stats["total_tokens"] == 4
assert stats["enabled_tokens"] == 3
assert stats["user_tokens"] == 2
assert stats["guest_tokens"] == 1
assert stats["unknown_tokens"] == 1
assert stats["pool_total_tokens"] == 2
assert stats["available_tokens"] == 1
assert stats["healthy_tokens"] == 1
assert stats["unhealthy_tokens"] == 1
assert stats["total_requests"] == 3
assert stats["successful_requests"] == 2
assert stats["failed_requests"] == 1
assert stats["success_rate"] == pytest.approx(66.7)
assert stats["input_tokens"] == 190
assert stats["output_tokens"] == 70
assert stats["total_consumed_tokens"] == 260
assert stats["cache_creation_tokens"] == 15
assert stats["cache_read_tokens"] == 20
assert stats["total_cache_tokens"] == 35
assert stats["cache_creation_requests"] == 1
assert stats["cache_hit_requests"] == 1
assert stats["average_latency"] == pytest.approx(0.87, rel=1e-2)
assert stats["trend_window"] == "7d"
assert len(stats["usage_trend"]) == 7
assert stats["usage_trend"][-1]["total_tokens"] == 260
assert stats["usage_trend"][-1]["cache_total_tokens"] == 35
@pytest.mark.asyncio
async def test_get_model_stats_from_db_includes_recent_same_day_logs(tmp_path):
dao = RequestLogDAO(str(tmp_path / "request_logs.db"))
await dao.add_log(
provider="zai",
endpoint="/v1/chat/completions",
source="pytest",
protocol="openai",
client_name="pytest",
model="glm-5",
status_code=200,
success=True,
duration=0.25,
input_tokens=10,
output_tokens=20,
)
stats = await dao.get_model_stats_from_db(hours=1)
assert "glm-5" in stats
assert stats["glm-5"]["total"] == 1
assert stats["glm-5"]["success"] == 1
assert stats["glm-5"]["failed"] == 0
@pytest.mark.asyncio
async def test_request_log_dao_supports_count_and_offset_pagination(tmp_path):
dao = RequestLogDAO(str(tmp_path / "request_logs_paging.db"))
for index in range(5):
await dao.add_log(
provider="zai",
endpoint=f"/v1/chat/completions/{index}",
source="pytest",
protocol="openai",
client_name="pytest",
model="glm-5",
status_code=200,
success=True,
duration=0.1,
)
total_count = await dao.count_logs(provider="zai")
paged_logs = await dao.get_recent_logs(
limit=2,
offset=2,
provider="zai",
)
assert total_count == 5
assert len(paged_logs) == 2
assert paged_logs[0]["endpoint"] == "/v1/chat/completions/2"
assert paged_logs[1]["endpoint"] == "/v1/chat/completions/1"
@pytest.mark.asyncio
async def test_request_log_dao_returns_usage_trend_with_missing_days_filled(
tmp_path,
):
dao = RequestLogDAO(str(tmp_path / "request_logs_trend.db"))
await dao.add_log(
provider="zai",
endpoint="/v1/chat/completions",
source="pytest",
protocol="openai",
client_name="pytest",
model="glm-5",
status_code=200,
success=True,
duration=0.2,
input_tokens=12,
output_tokens=8,
cache_read_tokens=3,
total_tokens=20,
)
trend = await dao.get_provider_usage_trend("zai", days=7)
assert len(trend) == 7
assert sum(day["total_requests"] for day in trend) == 1
assert trend[-1]["total_tokens"] == 20
assert trend[-1]["cache_total_tokens"] == 3
@pytest.mark.asyncio
async def test_request_log_dao_returns_hourly_usage_trend_with_missing_hours(
tmp_path,
):
dao = RequestLogDAO(str(tmp_path / "request_logs_hourly_trend.db"))
log_id = await dao.add_log(
provider="zai",
endpoint="/v1/chat/completions",
source="pytest",
protocol="openai",
client_name="pytest",
model="glm-5",
status_code=200,
success=True,
duration=0.2,
input_tokens=18,
output_tokens=7,
cache_creation_tokens=5,
cache_read_tokens=3,
total_tokens=25,
)
async with dao.get_connection() as conn:
await conn.execute(
"UPDATE request_logs SET timestamp = ? WHERE id = ?",
("2026-03-10 12:00:00", log_id),
)
await conn.commit()
trend = await dao.get_provider_usage_trend(
"zai",
window="24h",
now=datetime(2026, 3, 10, 12, 0, 0),
)
assert len(trend) == 24
assert trend[-1]["label"] == "12:00"
assert trend[-1]["tooltip_label"] == "2026-03-10 12:00"
assert trend[-1]["input_tokens"] == 18
assert trend[-1]["output_tokens"] == 7
assert trend[-1]["cache_creation_tokens"] == 5
assert trend[-1]["cache_read_tokens"] == 3
assert sum(point["total_requests"] for point in trend) == 1
assert all(point["total_requests"] == 0 for point in trend[:-1])
@pytest.mark.asyncio
async def test_dashboard_usage_trend_api_returns_requested_window(
tmp_path,
monkeypatch,
):
dao = RequestLogDAO(str(tmp_path / "request_logs_api_trend.db"))
log_id = await dao.add_log(
provider="zai",
endpoint="/v1/chat/completions",
source="pytest",
protocol="openai",
client_name="pytest",
model="glm-5",
status_code=200,
success=True,
duration=0.2,
input_tokens=30,
output_tokens=12,
cache_read_tokens=4,
total_tokens=42,
)
async with dao.get_connection() as conn:
await conn.execute(
"UPDATE request_logs SET timestamp = ? WHERE id = ?",
("2026-03-10 09:00:00", log_id),
)
await conn.commit()
async def fixed_usage_trend(provider, days=None, *, window=None, now=None):
return await RequestLogDAO.get_provider_usage_trend(
dao,
provider,
days=days,
window=window,
now=datetime(2026, 3, 10, 12, 0, 0),
)
monkeypatch.setattr(dao, "get_provider_usage_trend", fixed_usage_trend)
monkeypatch.setattr(admin_api, "get_request_log_dao", lambda: dao)
request = _make_get_request(
"/admin/api/dashboard/usage-trend",
{"window": "24h"},
)
response = await admin_api.get_dashboard_usage_trend(request)
payload = json.loads(response.body.decode("utf-8"))
assert response.status_code == 200
assert payload["window"] == "24h"
assert len(payload["points"]) == 24
assert payload["points"][-4]["input_tokens"] == 30
assert payload["points"][-4]["cache_read_tokens"] == 4
@pytest.mark.asyncio
async def test_recent_logs_component_includes_usage_cache_and_latency_fields(
tmp_path,
monkeypatch,
):
dao = RequestLogDAO(str(tmp_path / "request_logs_recent_component.db"))
await dao.add_log(
provider="zai",
endpoint="/v1/chat/completions",
source="pytest",
protocol="openai",
client_name="pytest-client",
model="glm-5",
status_code=200,
success=True,
duration=1.25,
first_token_time=0.42,
input_tokens=111,
output_tokens=22,
cache_creation_tokens=9,
cache_read_tokens=7,
total_tokens=133,
)
monkeypatch.setattr(admin_api, "get_request_log_dao", lambda: dao)
request = _make_get_request(
"/admin/api/recent-logs",
{"page": "1", "page_size": "12"},
)
response = await admin_api.get_recent_logs(request)
body = response.body.decode("utf-8")
assert response.status_code == 200
assert "请求" in body
assert "标记" in body
assert "输入 / 输出" in body
assert "缓存创建 / 命中" in body
assert "用时 / 首字" in body
assert "111" in body
assert "22" in body
assert "9" in body
assert "7" in body
assert "1.25s" in body
assert "0.42s" in body
@pytest.mark.asyncio
async def test_recent_logs_component_deduplicates_client_and_source_labels(
tmp_path,
monkeypatch,
):
dao = RequestLogDAO(str(tmp_path / "request_logs_recent_dedupe.db"))
await dao.add_log(
provider="zai",
endpoint="/v1/chat/completions",
source="browser",
protocol="openai",
client_name="Browser",
model="glm-5",
status_code=200,
success=True,
duration=1.0,
)
monkeypatch.setattr(admin_api, "get_request_log_dao", lambda: dao)
request = _make_get_request(
"/admin/api/recent-logs",
{"page": "1", "page_size": "12"},
)
response = await admin_api.get_recent_logs(request)
body = response.body.decode("utf-8")
assert response.status_code == 200
assert "Browser" in body
assert "OpenAI" in body
assert "glm-5" in body
assert ">browser<" not in body
assert ">zai<" not in body
@pytest.mark.asyncio
async def test_token_dao_supports_count_and_offset_pagination(tmp_path):
dao = TokenDAO(str(tmp_path / "tokens_paging.db"))
await dao.init_database()
for index in range(5):
await dao.add_token("zai", f"token-{index}", validate=False)
total_count = await dao.count_tokens_by_provider("zai", enabled_only=False)
paged_tokens = await dao.get_tokens_by_provider(
"zai",
enabled_only=False,
limit=2,
offset=2,
)
assert total_count == 5
assert len(paged_tokens) == 2
assert paged_tokens[0]["token"] == "token-2"
assert paged_tokens[1]["token"] == "token-3"
@pytest.mark.asyncio
async def test_token_pool_realtime_usage_stats_sync_to_db(tmp_path, monkeypatch):
dao = TokenDAO(str(tmp_path / "token_usage.db"))
await dao.init_database()
token_id = await dao.add_token("zai", "token-usage", validate=False)
assert token_id is not None
pool = TokenPool([(token_id, "token-usage", "user")])
await pool.record_token_success("token-usage", dao=dao)
await pool.record_token_failure("token-usage", Exception("boom"), dao=dao)
stats = await dao.get_token_stats(token_id)
assert stats is not None
assert stats["total_requests"] == 2
assert stats["successful_requests"] == 1
assert stats["failed_requests"] == 1
monkeypatch.setattr(token_pool_module, "_token_pool", pool)
monkeypatch.setattr(token_dao_module, "_token_dao", dao)
await sync_token_stats_to_db()
stats_after_sync = await dao.get_token_stats(token_id)
assert stats_after_sync is not None
assert stats_after_sync["total_requests"] == 2
assert stats_after_sync["successful_requests"] == 1
assert stats_after_sync["failed_requests"] == 1
def test_format_uptime_formats_seconds_minutes_and_hours():
assert format_uptime(59) == "59秒"
assert format_uptime(3661) == "1小时 1分钟 1秒"