#!/usr/bin/env python3
"""
WebSocket Client Test for Factor Agent
Tests the WebSocket endpoint with LLM response handling

Flow: create a session over HTTP, open the session's WebSocket, send a few
chat messages and log the acknowledgment and response received for each one.
The process exit status is 0 when every exchange succeeded and 1 otherwise.
"""

import asyncio
import json
import logging
import sys
from typing import Optional
from urllib.parse import urlparse, urlunparse

try:
    import websockets
except ImportError:
    # Best-effort bootstrap for a missing dependency. Install through the
    # running interpreter so the package lands in the environment that is
    # actually executing this script (a bare "pip" may belong to another one).
    print("Installing websockets...")
    import subprocess

    subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets"])
    import websockets

import httpx

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

BASE_URL = "http://localhost:7860"

# Seconds to wait for the ready/acknowledgment frames and for the response.
ACK_TIMEOUT = 5
RESPONSE_TIMEOUT = 30

# Messages sent, in order, during the WebSocket test.
TEST_MESSAGES = (
    "Hello, how are you?",
    "What is 2+2?",
    "Tell me a joke",
)


def get_ws_url(base_url: str) -> str:
    """Convert an HTTP(S) URL to the matching WebSocket URL.

    ``https`` (and an already-secure ``wss``) maps to ``wss``; every other
    scheme maps to ``ws``. All other URL components are preserved.
    """
    parsed = urlparse(base_url)
    scheme = "wss" if parsed.scheme in ("https", "wss") else "ws"
    return urlunparse(parsed._replace(scheme=scheme))


async def create_session() -> Optional[str]:
    """Create a new session via HTTP.

    Returns:
        The ``session_id`` from the JSON body of a 201 response, or ``None``
        when the request fails, the status is not 201, or the body carries no
        session id. Failures are logged, never raised.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{BASE_URL}/api/session")
            if response.status_code != 201:
                logger.error("❌ Failed to create session: %s", response.text)
                return None
            session_id = response.json().get("session_id")
    except Exception as e:
        # Boundary of the HTTP step: report and signal failure via None.
        # %r keeps the exception type visible when its message is empty.
        logger.error("❌ Error creating session: %r", e)
        return None

    if not session_id:
        logger.error("❌ Session response did not contain a session_id")
        return None

    logger.info("✅ Session created: %s", session_id)
    return session_id


async def _exchange(websocket, text: str) -> bool:
    """Send one message and log its acknowledgment and response.

    Returns ``True`` unless the response timed out or was an ``error`` event.
    A timeout while waiting for the acknowledgment propagates as
    ``asyncio.TimeoutError`` so the caller can abort the run.
    """
    logger.info("\n📤 Sending: %s", text)
    await websocket.send(json.dumps({
        "type": "message",
        "text": text
    }))

    # The first frame after a send is treated as the acknowledgment.
    ack_msg = await asyncio.wait_for(websocket.recv(), timeout=ACK_TIMEOUT)
    ack_data = json.loads(ack_msg)
    logger.info("✓ ACK: %s", ack_data.get('event_type'))

    try:
        response_msg = await asyncio.wait_for(
            websocket.recv(), timeout=RESPONSE_TIMEOUT
        )
    except asyncio.TimeoutError:
        logger.error("⏱️ Timeout waiting for response")
        return False

    response_data = json.loads(response_msg)
    event_type = response_data.get("event_type")
    if event_type == "message_response":
        response_text = response_data["data"]["message"]
        logger.info("🤖 Response: %s...", response_text[:200])
        return True
    if event_type == "error":
        logger.error("❌ Error: %s", response_data['data']['message'])
        return False

    # Any other event type is logged verbatim and not counted as a failure.
    logger.info("📩 Message: %s", response_data)
    return True


async def test_websocket(session_id: str) -> bool:
    """Test WebSocket communication with LLM responses.

    Connects to ``/api/ws/<session_id>``, reads the initial ready frame, then
    runs one send/ack/response exchange per entry in ``TEST_MESSAGES``.

    Returns:
        ``True`` when the connection worked and every exchange succeeded,
        ``False`` otherwise. Errors are logged, never raised.
    """
    # Strip a trailing slash so the joined URL never contains "//api".
    ws_base = get_ws_url(BASE_URL).rstrip("/")
    ws_path = f"{ws_base}/api/ws/{session_id}"
    logger.info("🔗 Connecting to WebSocket: %s", ws_path)

    failures = 0
    try:
        async with websockets.connect(ws_path) as websocket:
            # Bounded wait so a silent server cannot hang the test forever.
            ready_msg = await asyncio.wait_for(
                websocket.recv(), timeout=ACK_TIMEOUT
            )
            logger.info("📨 Ready message: %s", ready_msg)

            for test_msg in TEST_MESSAGES:
                if not await _exchange(websocket, test_msg):
                    failures += 1
                # Wait between messages
                await asyncio.sleep(1)
    except asyncio.TimeoutError:
        # str(TimeoutError()) is empty, so give this case its own message.
        logger.error("⏱️ Timeout waiting for ready message or acknowledgment")
        return False
    except Exception as e:
        logger.error("❌ WebSocket error: %r", e)
        return False

    if failures:
        logger.error(
            "\n❌ WebSocket test finished with %d failed exchange(s)", failures
        )
        return False

    logger.info("\n✅ WebSocket test completed successfully!")
    return True


async def main() -> int:
    """Main test function.

    Returns:
        A process exit code: 0 when the session was created and the WebSocket
        test passed, 1 otherwise.
    """
    logger.info("🚀 Starting WebSocket Client Test\n")

    # Create session
    session_id = await create_session()
    if not session_id:
        logger.error("Failed to create session")
        return 1

    # Test WebSocket
    return 0 if await test_websocket(session_id) else 1


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))