water3 / test_websocket_client.py
onewayto's picture
Upload 187 files
070daf8 verified
#!/usr/bin/env python3
"""
WebSocket Client Test for Factor Agent
Tests the WebSocket endpoint with LLM response handling
"""
import asyncio
import json
import logging
from typing import Optional
from urllib.parse import urlparse, urlunparse
try:
import websockets
except ImportError:
print("Installing websockets...")
import subprocess
subprocess.check_call(["pip", "install", "websockets"])
import websockets
import httpx
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
BASE_URL = "http://localhost:7860"
def get_ws_url(base_url: str) -> str:
"""Convert HTTP URL to WebSocket URL"""
parsed = urlparse(base_url)
# Convert http -> ws, https -> wss
scheme = "wss" if parsed.scheme == "https" else "ws"
return urlunparse((scheme, parsed.netloc, parsed.path, parsed.params, parsed.query, parsed.fragment))
async def create_session() -> Optional[str]:
"""Create a new session via HTTP"""
try:
async with httpx.AsyncClient() as client:
response = await client.post(f"{BASE_URL}/api/session")
if response.status_code == 201:
data = response.json()
session_id = data.get("session_id")
logger.info(f"βœ… Session created: {session_id}")
return session_id
else:
logger.error(f"❌ Failed to create session: {response.text}")
return None
except Exception as e:
logger.error(f"❌ Error creating session: {e}")
return None
async def test_websocket(session_id: str):
"""Test WebSocket communication with LLM responses"""
ws_base = get_ws_url(BASE_URL)
ws_path = f"{ws_base}/api/ws/{session_id}"
logger.info(f"πŸ”— Connecting to WebSocket: {ws_path}")
try:
async with websockets.connect(ws_path) as websocket:
# Receive ready message
ready_msg = await websocket.recv()
logger.info(f"πŸ“¨ Ready message: {ready_msg}")
# Test messages
test_messages = [
"Hello, how are you?",
"What is 2+2?",
"Tell me a joke",
]
for test_msg in test_messages:
logger.info(f"\nπŸ“€ Sending: {test_msg}")
# Send message
await websocket.send(json.dumps({
"type": "message",
"text": test_msg
}))
# Receive acknowledgment
ack_msg = await asyncio.wait_for(websocket.recv(), timeout=5)
ack_data = json.loads(ack_msg)
logger.info(f"βœ“ ACK: {ack_data.get('event_type')}")
# Receive response
try:
response_msg = await asyncio.wait_for(websocket.recv(), timeout=30)
response_data = json.loads(response_msg)
if response_data.get("event_type") == "message_response":
response_text = response_data["data"]["message"]
logger.info(f"πŸ€– Response: {response_text[:200]}...")
elif response_data.get("event_type") == "error":
logger.error(f"❌ Error: {response_data['data']['message']}")
else:
logger.info(f"πŸ“© Message: {response_data}")
except asyncio.TimeoutError:
logger.error("⏱️ Timeout waiting for response")
# Wait between messages
await asyncio.sleep(1)
logger.info("\nβœ… WebSocket test completed successfully!")
except Exception as e:
logger.error(f"❌ WebSocket error: {e}")
async def main():
"""Main test function"""
logger.info("πŸš€ Starting WebSocket Client Test\n")
# Create session
session_id = await create_session()
if not session_id:
logger.error("Failed to create session")
return
# Test WebSocket
await test_websocket(session_id)
if __name__ == "__main__":
asyncio.run(main())