| |
|
| | """
|
| | WebSocket Client Test for Factor Agent
|
| | Tests the WebSocket endpoint with LLM response handling
|
| | """
|
| |
|
| | import asyncio
|
| | import json
|
| | import logging
|
| | from typing import Optional
|
| | from urllib.parse import urlparse, urlunparse
|
| |
|
try:
    import websockets
except ImportError:
    # Best-effort bootstrap so the script can be run on a fresh environment:
    # install the missing client library, then retry the import.
    print("Installing websockets...")
    import subprocess
    import sys

    # Invoke pip through the interpreter that is running this script. A bare
    # "pip" found on PATH can belong to a different Python installation or
    # virtualenv, in which case the retry import below would still fail.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets"])
    import websockets
|
| |
|
| | import httpx
|
| |
|
| |
|
# Emit timestamped, level-tagged records at INFO and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# HTTP root of the server under test; the WebSocket URL is derived from it.
BASE_URL = "http://localhost:7860"
|
| |
|
| |
|
def get_ws_url(base_url: str) -> str:
    """Return *base_url* with its scheme swapped for the WebSocket equivalent.

    ``https`` becomes ``wss``; any other scheme (including ``http``) becomes
    ``ws``. Host, path, params, query and fragment are carried over unchanged.
    """
    parts = urlparse(base_url)
    if parts.scheme == "https":
        ws_scheme = "wss"
    else:
        ws_scheme = "ws"
    return urlunparse(parts._replace(scheme=ws_scheme))
|
| |
|
| |
|
async def create_session() -> Optional[str]:
    """Create a new session via HTTP.

    Sends a POST to ``{BASE_URL}/api/session`` and, on a 201 response, reads
    ``session_id`` from the JSON body.

    Returns:
        The session id, or ``None`` when the status is not 201, the body has
        no usable ``session_id``, or any exception occurs. Failures are logged
        rather than raised so the caller can simply test the result.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{BASE_URL}/api/session")

            if response.status_code != 201:
                logger.error(f"❌ Failed to create session: {response.text}")
                return None

            session_id = response.json().get("session_id")
            if not session_id:
                # A 201 without an id is still a failure; do not report it
                # as "Session created: None".
                logger.error(
                    f"❌ Failed to create session: no session_id in response: {response.text}"
                )
                return None

            logger.info(f"✅ Session created: {session_id}")
            return session_id
    except Exception as e:
        # Deliberately broad: this is the script's top-level boundary for
        # session creation, and any failure means "no session".
        logger.error(f"❌ Error creating session: {e}")
        return None
|
| |
|
| |
|
async def test_websocket(session_id: str):
    """Test WebSocket communication with LLM responses.

    Connects to ``/api/ws/{session_id}`` on the server at ``BASE_URL``, reads
    the first frame the server sends, then for each test message sends a JSON
    ``{"type": "message", "text": ...}`` frame and reads two frames back: the
    first is logged as the ACK, the second as the response.

    Nothing is raised to the caller: connection problems, malformed frames and
    timeouts are logged. The closing "completed successfully" line is only
    logged when every message got a non-error response in time.

    Args:
        session_id: Session identifier placed in the WebSocket path.
    """
    # Tolerate a trailing slash in BASE_URL so the path never starts with "//".
    ws_base = get_ws_url(BASE_URL).rstrip("/")
    ws_path = f"{ws_base}/api/ws/{session_id}"
    logger.info(f"🔌 Connecting to WebSocket: {ws_path}")

    test_messages = [
        "Hello, how are you?",
        "What is 2+2?",
        "Tell me a joke",
    ]
    failures = 0

    try:
        async with websockets.connect(ws_path) as websocket:
            # Bounded wait: without a timeout a server that never sends its
            # first frame would hang this script forever.
            ready_msg = await asyncio.wait_for(websocket.recv(), timeout=10)
            logger.info(f"📨 Ready message: {ready_msg}")

            for test_msg in test_messages:
                logger.info(f"\n📤 Sending: {test_msg}")
                await websocket.send(json.dumps({
                    "type": "message",
                    "text": test_msg,
                }))

                # The first frame after a send is treated as the acknowledgement.
                ack_msg = await asyncio.wait_for(websocket.recv(), timeout=5)
                ack_data = json.loads(ack_msg)
                logger.info(f"✓ ACK: {ack_data.get('event_type')}")

                try:
                    response_msg = await asyncio.wait_for(websocket.recv(), timeout=30)
                except asyncio.TimeoutError:
                    # A slow response fails this message only; keep going.
                    logger.error("⏱️ Timeout waiting for response")
                    failures += 1
                else:
                    response_data = json.loads(response_msg)
                    event_type = response_data.get("event_type")

                    if event_type == "message_response":
                        response_text = response_data["data"]["message"]
                        logger.info(f"🤖 Response: {response_text[:200]}...")
                    elif event_type == "error":
                        logger.error(f"❌ Error: {response_data['data']['message']}")
                        failures += 1
                    else:
                        logger.info(f"📩 Message: {response_data}")

                # Short pause between messages.
                await asyncio.sleep(1)

            if failures:
                logger.error(
                    f"\n❌ WebSocket test finished with {failures} failed message(s)"
                )
            else:
                logger.info("\n✅ WebSocket test completed successfully!")

    except asyncio.TimeoutError:
        # str() of a timeout error is empty, so the generic handler below
        # would log a blank reason; say what happened instead.
        logger.error("⏱️ Timeout waiting for the server (connect, ready message, or ACK)")
    except Exception as e:
        logger.error(f"❌ WebSocket error: {e}")
|
| |
|
| |
|
async def main():
    """Main test function.

    Creates a session over HTTP and, if that succeeds, runs the WebSocket
    test against it. Returns early (after logging) when no session id could
    be obtained.
    """
    logger.info("🚀 Starting WebSocket Client Test\n")

    session_id = await create_session()
    if not session_id:
        logger.error("Failed to create session")
        return

    await test_websocket(session_id)


if __name__ == "__main__":
    asyncio.run(main())
|
| |
|