|
|
|
|
|
import httpx
|
|
|
import asyncio
|
|
|
import json
|
|
|
|
|
|
|
|
|
|
|
|
async def test_pool_service():
    """Probe the service at http://localhost:8019 and print the results.

    Three requests are issued one after another over a single shared
    ``httpx.AsyncClient``:

    1. ``GET /`` (5 second timeout)
    2. ``GET /api/status`` (5 second timeout)
    3. ``POST /api/accounts/allocate`` with a JSON body (10 second timeout)

    Every probe is wrapped in its own ``try``/``except Exception`` so that a
    failure in one is reported on stdout and the remaining probes still run.
    Nothing is returned; all output goes through ``print``.
    """
    base_url = "http://localhost:8019"
    status_url = f"{base_url}/api/status"
    allocate_url = f"{base_url}/api/accounts/allocate"
    allocation_request = {"count": 1, "session_duration": 1800}

    async with httpx.AsyncClient() as client:
        # Probe 1: root path. The status line is printed before the body is
        # decoded, so a non-JSON body still shows the status code first.
        try:
            root_response = await client.get(base_url, timeout=5)
            print(f"根路径测试: {root_response.status_code}")
            print(f"响应: {root_response.json()}")
        except Exception as exc:
            print(f"根路径测试失败: {exc}")

        # Probe 2: status endpoint, body pretty-printed as indented JSON.
        try:
            status_response = await client.get(status_url, timeout=5)
            print(f"\n状态接口测试: {status_response.status_code}")
            pretty_body = json.dumps(status_response.json(), indent=2)
            print(f"响应: {pretty_body}")
        except Exception as exc:
            print(f"状态接口测试失败: {exc}")

        # Probe 3: account allocation. Only a 200 response is treated as
        # success; anything else prints the raw response text.
        try:
            allocate_response = await client.post(
                allocate_url,
                json=allocation_request,
                timeout=10,
            )
            print(f"\n分配账号测试: {allocate_response.status_code}")
            if allocate_response.status_code != 200:
                print(f"分配失败: {allocate_response.text}")
            else:
                payload = allocate_response.json()
                session_id = payload.get("session_id")
                print(f"成功分配,会话ID: {session_id}")
                account_count = len(payload.get("accounts", []))
                print(f"账号数量: {account_count}")
        except Exception as exc:
            print(f"分配账号测试失败: {exc}")
|
|
|
|
|
|
|
|
|
async def main() -> None:
    """Async entry point: run the pool-service probes once."""
    await test_pool_service()
|
|
|
|
|
|
|
|
|
# Run the probes only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
|
|
|
|