BOSS commited on
Commit
2f8c384
·
0 Parent(s):

客户端

Browse files
Files changed (4) hide show
  1. .gitignore +3 -0
  2. README.md +36 -0
  3. client.py +250 -0
  4. requirements.txt +3 -0
.gitignore ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ .env
2
+ venv/
3
+ node_modules/
README.md ADDED
@@ -0,0 +1,36 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Client agent demo
2
+
3
+ This is a minimal client agent that connects to the control server WebSocket and forwards TCP connections from a local port to the server using a simple message framing protocol. This demo is for prototyping only and not production-ready.
4
+
5
+ ## Features
6
+ - Connects to control server WebSocket (`/ws`) and performs handshake
7
+ - Supports a simple multiplexing prototype: multiple local connections are assigned short connection IDs and multiplexed over a single WebSocket
8
+ - Can forward local TCP connections to the server using a framed protocol
9
+ - Offers an optional simple HTTP page response for browser tests (not full HTTP proxy)
10
+
11
+ ## Running locally
12
+
13
+ 1. Create virtual environment and install dependencies:
14
+
15
+ ```bash
16
+ cd client
17
+ python -m venv .venv
18
+ source .venv/bin/activate
19
+ pip install -r requirements.txt
20
+ ```
21
+
22
+ 2. Run the client pointing to local server:
23
+
24
+ ```bash
25
+ python client.py --server ws://localhost:8000/ws --local-port 9000
26
+ ```
27
+
28
+ 3. Test with a simple TCP client (keeps connection open):
29
+
30
+ ```bash
31
+ python test_local_tcp.py
32
+ ```
33
+
34
+ 4. For browser test: open `http://localhost:9000` — the client will return a simple demo page (prototype only).
35
+
36
+ Note: For remote HF Space server, use `wss://<your-space-host>/ws` as the `--server` URL.
client.py ADDED
@@ -0,0 +1,250 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import asyncio
3
+ import struct
4
+ import websockets
5
+ import socket
6
+ import uuid
7
+
8
+ # Simple length-prefixed framing
9
+ # frame: 4-byte big-endian length + payload
10
+
11
+ async def websocket_send_frame(ws, data: bytes, frame_type: bytes = b'D'): # frame_type: b'D' for data, b'O' for open, etc.
12
+ # frame format: 1-byte type + 4-byte length + payload
13
+ header = frame_type + struct.pack('>I', len(data))
14
+ await ws.send(header + data)
15
+
16
+ async def websocket_recv_frame(ws, recv_lock: asyncio.Lock):
17
+ # Serialize calls to ws.recv using recv_lock to avoid concurrent recv errors
18
+ async with recv_lock:
19
+ data = await ws.recv()
20
+ if isinstance(data, str):
21
+ data = data.encode()
22
+ if len(data) < 5:
23
+ raise ValueError('frame too short')
24
+ frame_type = data[0:1]
25
+ length = struct.unpack('>I', data[1:5])[0]
26
+ payload = data[5:5+length]
27
+ return frame_type, payload
28
+
29
+ async def handle_local_connection(local_reader, local_writer, ws, recv_lock: asyncio.Lock):
30
+ peername = None
31
+ try:
32
+ sock = local_writer.get_extra_info('socket')
33
+ if sock:
34
+ peername = sock.getpeername()
35
+ except Exception:
36
+ peername = None
37
+
38
+ print(f'New local connection from {peername}')
39
+
40
+ async def read_local_then_send():
41
+ try:
42
+ while True:
43
+ chunk = await local_reader.read(4096)
44
+ if not chunk:
45
+ # EOF
46
+ print('local EOF, sending close frame')
47
+ await websocket_send_frame(ws, b'__CLOSE__')
48
+ break
49
+ await websocket_send_frame(ws, chunk)
50
+ except asyncio.CancelledError:
51
+ pass
52
+ except Exception as e:
53
+ print('local read/send error', e)
54
+
55
+ async def recv_then_write_local():
56
+ try:
57
+ while True:
58
+ try:
59
+ frame_type, payload = await websocket_recv_frame(ws, recv_lock)
60
+ except Exception as e:
61
+ print('ws recv/local write error', e)
62
+ break
63
+ if payload == b'__CLOSE__':
64
+ print('received close frame from server')
65
+ break
66
+ try:
67
+ local_writer.write(payload)
68
+ await local_writer.drain()
69
+ except Exception as e:
70
+ print('local writer error while writing payload', e)
71
+ break
72
+ finally:
73
+ try:
74
+ local_writer.close()
75
+ await local_writer.wait_closed()
76
+ except Exception:
77
+ pass
78
+
79
+ # run both tasks and ensure exceptions are handled
80
+ task_send = asyncio.create_task(read_local_then_send())
81
+ task_recv = asyncio.create_task(recv_then_write_local())
82
+ done, pending = await asyncio.wait({task_send, task_recv}, return_when=asyncio.FIRST_EXCEPTION)
83
+ for t in pending:
84
+ t.cancel()
85
+ for t in done:
86
+ if t.exception():
87
+ print('task exception in handle_local_connection:', t.exception())
88
+ print(f'Connection from {peername} closed')
89
+
90
+
91
+ async def handle_local_connection_multiplex(local_reader, local_writer, ws, recv_lock: asyncio.Lock, conn_id: bytes, conn_queue: asyncio.Queue, conn_queues: dict):
92
+ # Similar to handle_local_connection but prefixes payload with conn_id
93
+ peername = None
94
+ try:
95
+ sock = local_writer.get_extra_info('socket')
96
+ if sock:
97
+ peername = sock.getpeername()
98
+ except Exception:
99
+ peername = None
100
+
101
+ print(f'New multiplexed connection {conn_id.decode()} from {peername}')
102
+
103
+ async def read_local_then_send():
104
+ try:
105
+ while True:
106
+ chunk = await local_reader.read(4096)
107
+ if not chunk:
108
+ # EOF
109
+ print(f'local EOF for {conn_id.decode()}, sending close frame')
110
+ await websocket_send_frame(ws, conn_id + b'__CLOSE__', frame_type=b'D')
111
+ break
112
+ await websocket_send_frame(ws, conn_id + chunk, frame_type=b'D')
113
+ except asyncio.CancelledError:
114
+ pass
115
+ except Exception as e:
116
+ print('local read/send error', e)
117
+
118
+ async def recv_then_write_local():
119
+ try:
120
+ while True:
121
+ try:
122
+ payload = await conn_queue.get()
123
+ except Exception as e:
124
+ print('conn_queue get error', e)
125
+ break
126
+ if payload == b'__CLOSE__':
127
+ print(f'received close for {conn_id.decode()}')
128
+ break
129
+ try:
130
+ local_writer.write(payload)
131
+ await local_writer.drain()
132
+ except Exception as e:
133
+ print('local writer error while writing payload', e)
134
+ break
135
+ finally:
136
+ try:
137
+ local_writer.close()
138
+ await local_writer.wait_closed()
139
+ except Exception:
140
+ pass
141
+
142
+ task_send = asyncio.create_task(read_local_then_send())
143
+ task_recv = asyncio.create_task(recv_then_write_local())
144
+ done, pending = await asyncio.wait({task_send, task_recv}, return_when=asyncio.FIRST_EXCEPTION)
145
+ for t in pending:
146
+ t.cancel()
147
+ for t in done:
148
+ if t.exception():
149
+ print('task exception in handle_local_connection_multiplex:', t.exception())
150
+ # cleanup queue registration
151
+ try:
152
+ conn_queues.pop(conn_id, None)
153
+ except Exception:
154
+ pass
155
+ print(f'Multiplexed connection {conn_id.decode()} from {peername} closed')
156
+
157
+
158
+ async def run_client(server_url: str, local_port: int):
159
+ async with websockets.connect(server_url) as ws:
160
+ print('Connected to server')
161
+ # create a lock to serialize ws.recv calls
162
+ recv_lock = asyncio.Lock()
163
+
164
+ # handshake using the same recv_lock
165
+ await ws.send('ping')
166
+ async with recv_lock:
167
+ resp = await ws.recv()
168
+ print('handshake response:', resp)
169
+
170
+ # multiplexing: assign a unique client session id
171
+ session_id = uuid.uuid4().hex[:8].encode()
172
+ print('session id:', session_id.decode())
173
+
174
+ # mapping of conn_id bytes -> asyncio.Queue for demuxing incoming frames
175
+ conn_queues: dict[bytes, asyncio.Queue] = {}
176
+
177
+ async def demuxer_loop():
178
+ try:
179
+ while True:
180
+ frame_type, payload = await websocket_recv_frame(ws, recv_lock)
181
+ # only handle data frames (type 'D') for demuxing
182
+ if frame_type != b'D':
183
+ # ignore other frame types for now
184
+ continue
185
+ if len(payload) < 8:
186
+ print('demux: payload too short')
187
+ continue
188
+ cid = payload[:8]
189
+ body = payload[8:]
190
+ q = conn_queues.get(cid)
191
+ if q:
192
+ await q.put(body)
193
+ else:
194
+ print('demux: no queue for', cid)
195
+ except Exception as e:
196
+ print('demuxer exiting:', e)
197
+
198
+ demux_task = asyncio.create_task(demuxer_loop())
199
+
200
+ # start local server to accept connections and forward (each connection gets an ephemeral id)
201
+ async def accept_callback(r, w):
202
+ try:
203
+ data = await asyncio.wait_for(r.read(1024), timeout=1.5)
204
+ except Exception:
205
+ data = b''
206
+ # Browser quick response
207
+ if data.startswith((b'GET ', b'POST ', b'HEAD ')):
208
+ body = b"<html><body><h1>cpolar demo</h1><p>Connected</p></body></html>"
209
+ html = b"HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\nContent-Length: " + str(len(body)).encode() + b"\r\n\r\n" + body
210
+ try:
211
+ w.write(html)
212
+ await w.drain()
213
+ except Exception:
214
+ pass
215
+ try:
216
+ w.close()
217
+ await w.wait_closed()
218
+ except Exception:
219
+ pass
220
+ return
221
+
222
+ # when a new local connection arrives, send an OPEN frame with connection id
223
+ conn_id = uuid.uuid4().hex[:8].encode()
224
+ q: asyncio.Queue = asyncio.Queue()
225
+ conn_queues[conn_id] = q
226
+ await websocket_send_frame(ws, conn_id, frame_type=b'O')
227
+ # First chunk (if any) is data, send it immediately as conn_id+data (if non-empty)
228
+ if data:
229
+ await websocket_send_frame(ws, conn_id + data, frame_type=b'D')
230
+ # start handler with conn-specific id by passing queue and full conn_queues
231
+ asyncio.create_task(handle_local_connection_multiplex(r, w, ws, recv_lock, conn_id, q, conn_queues))
232
+
233
+ server = await asyncio.start_server(accept_callback, '127.0.0.1', local_port)
234
+ addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
235
+ print(f'Listening on {addrs}, forward to server (multiplexed)')
236
+
237
+ async with server:
238
+ await server.serve_forever()
239
+
240
+ if __name__ == '__main__':
241
+ parser = argparse.ArgumentParser()
242
+ parser.add_argument('--server', required=True, help='WebSocket server URL, e.g. ws://localhost:8000/ws')
243
+ parser.add_argument('--local-port', type=int, default=9000, help='Local port to listen on')
244
+ args = parser.parse_args()
245
+
246
+ try:
247
+ asyncio.run(run_client(args.server, args.local_port))
248
+ except KeyboardInterrupt:
249
+ print('client exiting')
250
+
requirements.txt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ websockets==11.0.3
2
+ anyio==3.7.0
3
+ python-socketio==5.9.0