warp2api / test /test_pool_api.py
baohuixiao's picture
Upload 115 files
8551878 verified
# test_pool_api.py
import httpx
import asyncio
import json
# 测试账号池服务连接
async def test_pool_service():
base_url = "http://localhost:8019"
async with httpx.AsyncClient() as client:
# 1. 测试根路径
try:
resp = await client.get(base_url, timeout=5)
print(f"根路径测试: {resp.status_code}")
print(f"响应: {resp.json()}")
except Exception as e:
print(f"根路径测试失败: {e}")
# 2. 测试状态接口
try:
resp = await client.get(f"{base_url}/api/status", timeout=5)
print(f"\n状态接口测试: {resp.status_code}")
print(f"响应: {json.dumps(resp.json(), indent=2)}")
except Exception as e:
print(f"状态接口测试失败: {e}")
# 3. 测试分配账号
try:
resp = await client.post(
f"{base_url}/api/accounts/allocate",
json={"count": 1, "session_duration": 1800},
timeout=10
)
print(f"\n分配账号测试: {resp.status_code}")
if resp.status_code == 200:
data = resp.json()
print(f"成功分配,会话ID: {data.get('session_id')}")
print(f"账号数量: {len(data.get('accounts', []))}")
else:
print(f"分配失败: {resp.text}")
except Exception as e:
print(f"分配账号测试失败: {e}")
async def main():
await test_pool_service()
if __name__ == "__main__":
asyncio.run(main())