File size: 4,506 Bytes
070daf8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
"""

WebSocket Client Test for Factor Agent

Tests the WebSocket endpoint with LLM response handling

"""

import asyncio
import json
import logging
from typing import Optional
from urllib.parse import urlparse, urlunparse

try:
    import websockets
except ImportError:
    # Best-effort bootstrap so the script can run on a machine where the
    # websockets package has not been installed yet.
    print("Installing websockets...")
    import subprocess
    import sys

    # Invoke pip through the running interpreter so the package lands in this
    # environment, not in whichever "pip" executable happens to be first on
    # PATH (which may belong to a different Python installation).
    subprocess.check_call([sys.executable, "-m", "pip", "install", "websockets"])
    import websockets

import httpx

# Base HTTP address of the server under test.
BASE_URL = "http://localhost:7860"

# Log INFO and above as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


def get_ws_url(base_url: str) -> str:
    """Convert an HTTP(S) URL to the matching WebSocket URL.

    Only the scheme is rewritten; host, path, params, query and fragment are
    passed through unchanged.

    Args:
        base_url: URL whose scheme should be mapped to a WebSocket scheme.

    Returns:
        The URL with scheme ``wss`` when the input scheme is ``https`` or
        already ``wss``, and ``ws`` for every other scheme.
    """
    parsed = urlparse(base_url)
    # Treat an already-secure WebSocket URL as secure too, so passing a
    # "wss://" address is not silently downgraded to unencrypted "ws://".
    secure = parsed.scheme in ("https", "wss")
    return urlunparse(parsed._replace(scheme="wss" if secure else "ws"))


async def create_session() -> Optional[str]:
    """Request a new session from the server over HTTP.

    POSTs to ``{BASE_URL}/api/session``.

    Returns:
        The ``session_id`` field of the JSON body when the server answers
        with status 201; ``None`` for any other status or when any exception
        is raised along the way (the failure is logged).
    """
    try:
        async with httpx.AsyncClient() as http:
            resp = await http.post(f"{BASE_URL}/api/session")

            # Anything other than "201 Created" counts as a failure.
            if resp.status_code != 201:
                logger.error(f"❌ Failed to create session: {resp.text}")
                return None

            payload = resp.json()
            new_id = payload.get("session_id")
            logger.info(f"βœ… Session created: {new_id}")
            return new_id
    except Exception as exc:
        logger.error(f"❌ Error creating session: {exc}")
        return None


async def test_websocket(session_id: str):
    """Test WebSocket communication with LLM responses.

    Connects to ``/api/ws/{session_id}`` on the server at ``BASE_URL``, reads
    the first frame (expected to be the server's ready message), then sends
    each test prompt and waits for an acknowledgment followed by a response.

    A timeout or a server ``error`` event for one prompt is logged and counted
    as a failure, and the loop continues with the next prompt. The closing log
    line reports success only when no prompt failed. Any other exception
    (connection failure, malformed JSON, missing keys) is logged and ends the
    test.

    Args:
        session_id: Session identifier used to build the WebSocket path.
    """
    ws_base = get_ws_url(BASE_URL)
    ws_path = f"{ws_base}/api/ws/{session_id}"
    logger.info(f"πŸ”— Connecting to WebSocket: {ws_path}")

    # Test messages
    test_messages = [
        "Hello, how are you?",
        "What is 2+2?",
        "Tell me a joke",
    ]
    failures = 0

    try:
        async with websockets.connect(ws_path) as websocket:
            # Receive ready message
            ready_msg = await websocket.recv()
            logger.info(f"πŸ“¨ Ready message: {ready_msg}")

            for test_msg in test_messages:
                if not await _exchange_message(websocket, test_msg):
                    failures += 1

                # Wait between messages
                await asyncio.sleep(1)

            # Only claim success when every prompt got a non-error response.
            if failures:
                logger.error(
                    f"\n❌ WebSocket test completed with {failures} of "
                    f"{len(test_messages)} message(s) failing"
                )
            else:
                logger.info("\nβœ… WebSocket test completed successfully!")

    except Exception as e:
        logger.error(f"❌ WebSocket error: {e}")


async def _exchange_message(websocket, text: str) -> bool:
    """Send one prompt and log its acknowledgment and response.

    Args:
        websocket: Open connection exposing awaitable ``send`` and ``recv``.
        text: Prompt text to send.

    Returns:
        ``True`` when a response frame arrived and was not an ``error`` event;
        ``False`` when the acknowledgment (5 s) or response (30 s) timed out
        or the server replied with an ``error`` event.
    """
    logger.info(f"\nπŸ“€ Sending: {text}")

    # Send message
    await websocket.send(json.dumps({
        "type": "message",
        "text": text
    }))

    # Receive acknowledgment. Handled here so a slow server is reported as a
    # timeout for this prompt instead of aborting the whole run.
    try:
        ack_msg = await asyncio.wait_for(websocket.recv(), timeout=5)
    except asyncio.TimeoutError:
        logger.error("⏱️ Timeout waiting for acknowledgment")
        return False
    ack_data = json.loads(ack_msg)
    logger.info(f"βœ“ ACK: {ack_data.get('event_type')}")

    # Receive response
    try:
        response_msg = await asyncio.wait_for(websocket.recv(), timeout=30)
    except asyncio.TimeoutError:
        logger.error("⏱️ Timeout waiting for response")
        return False
    response_data = json.loads(response_msg)

    event_type = response_data.get("event_type")
    if event_type == "message_response":
        response_text = response_data["data"]["message"]
        logger.info(f"πŸ€– Response: {response_text[:200]}...")
        return True
    if event_type == "error":
        logger.error(f"❌ Error: {response_data['data']['message']}")
        return False

    # Unrecognized event types are logged but not treated as failures.
    logger.info(f"πŸ“© Message: {response_data}")
    return True


async def main():
    """Create a session over HTTP, then run the WebSocket test against it.

    When no session id is obtained, an error is logged and the WebSocket test
    is skipped.
    """
    logger.info("πŸš€ Starting WebSocket Client Test\n")

    session_id = await create_session()
    if session_id:
        # Session is available: exercise the WebSocket endpoint.
        await test_websocket(session_id)
    else:
        logger.error("Failed to create session")


# Run the async test flow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())