import asyncio
import codecs
import io
import json
import locale
import logging
import shlex
import subprocess
import threading
import time
from datetime import datetime
from queue import Queue
from typing import Any, Dict, Set

import websockets
| |
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
| |
|
class TerminalStreamManager:
    """Manage real-time terminal streaming over WebSocket connections.

    Clients talk JSON: ``{"type": "command", "command": "..."}`` runs a
    program and ``{"type": "interrupt"}`` terminates the one currently
    running. Everything the program prints is broadcast to all connected
    clients as ``output`` messages.

    Every coroutine here, and ``current_process``, must be used from the
    event loop that serves the WebSocket connections. ``stop_server`` is the
    only method intended to be called from another thread.
    """

    # Upper bound (seconds) on how long stop_server() blocks while waiting for
    # the asynchronous cleanup when it is called from outside the server loop.
    _SHUTDOWN_TIMEOUT = 5.0

    # Number of bytes requested per read from the child's stdout/stderr pipes.
    _READ_CHUNK = 4096

    def __init__(self):
        # Currently connected client sockets.
        self.clients: Set[websockets.WebSocketServerProtocol] = set()
        # Kept for callers that access it; nothing in this class consumes it.
        self.command_queue = Queue()
        self.is_running = False
        # asyncio subprocess handle of the running command, or None.
        self.current_process = None
        self.server = None
        self.server_thread = None
        # Event loop dedicated to the server, when one was created for it.
        self.loop = None
        # Created lazily inside a coroutine so it belongs to the serving loop.
        self._command_lock = None
        # Strong references to in-flight command tasks (the loop only keeps
        # weak ones), also used to cancel them on shutdown.
        self._command_tasks = set()
        self._shutdown_task = None

    async def register_client(self, websocket):
        """Track *websocket* and send it the initial ``connected`` message."""
        self.clients.add(websocket)
        await websocket.send(json.dumps({
            'type': 'connected',
            'message': '🚀 Terminal connected successfully',
            'timestamp': datetime.now().isoformat()
        }))
        logger.info(f"Terminal client connected. Total clients: {len(self.clients)}")

    async def unregister_client(self, websocket):
        """Stop tracking *websocket*; unknown sockets are ignored."""
        self.clients.discard(websocket)
        logger.info(f"Terminal client disconnected. Total clients: {len(self.clients)}")

    async def broadcast(self, message: Dict[str, Any]):
        """Send *message*, stamped with the current time, to every client.

        The caller's dict is left untouched; the ``timestamp`` key is added to
        a copy. Clients whose send fails are dropped from ``self.clients``.
        """
        if not self.clients:
            return

        # Serialise once instead of once per client.
        payload = json.dumps({**message, 'timestamp': datetime.now().isoformat()})
        disconnected = set()

        # Iterate over a snapshot: clients may connect or disconnect while a
        # send is awaited, and mutating a set during iteration raises.
        for client in list(self.clients):
            try:
                await client.send(payload)
            except websockets.exceptions.ConnectionClosed:
                disconnected.add(client)
            except Exception as e:
                logger.error(f"Error broadcasting to client: {e}")
                disconnected.add(client)

        self.clients -= disconnected

    async def execute_command(self, command: str):
        """Run *command* and stream its output to all clients as it arrives.

        Broadcasts ``command_start``, then ``output`` messages tagged with
        ``stream`` ``stdout``/``stderr``, then ``command_complete`` with the
        exit code. Failures to start or run the program are broadcast as an
        ``error`` message instead of being raised. Commands run one at a
        time; a call made while another command is running waits its turn.
        """
        if self._command_lock is None:
            self._command_lock = asyncio.Lock()
        async with self._command_lock:
            await self._run_command(command)

    async def _run_command(self, command):
        """Spawn the program for *command* and relay its output until it exits."""
        await self.broadcast({
            'type': 'command_start',
            'command': command,
            'message': f'$ {command}'
        })

        process = None
        try:
            # Split into argv so the string is never interpreted by a shell.
            argv = shlex.split(command)
            if not argv:
                raise ValueError('Empty command')

            process = await asyncio.create_subprocess_exec(
                *argv,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            self.current_process = process

            # Both pipes are drained concurrently so a program that fills one
            # of them cannot stall while the other is being read.
            encoding = locale.getpreferredencoding(False)
            await asyncio.gather(
                self._pump_stream(process.stdout, 'stdout', encoding),
                self._pump_stream(process.stderr, 'stderr', encoding),
            )
            exit_code = await process.wait()

            await self.broadcast({
                'type': 'command_complete',
                'exit_code': exit_code,
                'message': f'Process exited with code {exit_code}'
            })
        except Exception as e:
            await self.broadcast({
                'type': 'error',
                'data': str(e),
                'stream': 'system'
            })
        finally:
            # Reached with a live child only on failure or cancellation;
            # do not leave the program running unattended.
            self._terminate(process)
            self.current_process = None

    async def _pump_stream(self, reader, stream_name, encoding):
        """Broadcast everything read from *reader* as ``output`` messages."""
        # Incremental decoding keeps multi-byte characters and "\r\n" pairs
        # intact across chunk boundaries; newlines are normalised to "\n".
        decoder = io.IncrementalNewlineDecoder(
            codecs.getincrementaldecoder(encoding)(errors='replace'),
            translate=True,
        )
        while True:
            raw = await reader.read(self._READ_CHUNK)
            text = decoder.decode(raw, final=not raw)
            if text:
                await self.broadcast({
                    'type': 'output',
                    'data': text,
                    'stream': stream_name
                })
            if not raw:  # EOF
                return

    @staticmethod
    def _terminate(process):
        """Ask *process* to terminate; return True if the request was sent."""
        if process is None or process.returncode is not None:
            return False
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the check and the call.
            return False
        return True

    def _spawn_command(self, command):
        """Run *command* in the background so the caller can keep reading."""
        task = asyncio.ensure_future(self.execute_command(command))
        self._command_tasks.add(task)
        task.add_done_callback(self._command_tasks.discard)

    async def handle_client(self, websocket, path=None):
        """Serve one client connection until it closes.

        Args:
            websocket: The client connection.
            path: Request path. Optional, because newer ``websockets``
                releases call connection handlers with the connection only.
        """
        try:
            # Registered inside the try so a failed greeting still unregisters.
            await self.register_client(websocket)
            async for message in websocket:
                try:
                    data = json.loads(message)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': 'Invalid JSON message'
                    }))
                    continue

                if not isinstance(data, dict):
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': 'Message must be a JSON object'
                    }))
                    continue

                message_type = data.get('type')
                if message_type == 'command':
                    command = data.get('command', '')
                    if isinstance(command, str) and command.strip():
                        # Not awaited: this loop must stay free to receive an
                        # 'interrupt' while the command is still running.
                        self._spawn_command(command.strip())
                elif message_type == 'interrupt':
                    if self._terminate(self.current_process):
                        await self.broadcast({
                            'type': 'interrupted',
                            'message': 'Process interrupted by user'
                        })
        except websockets.exceptions.ConnectionClosed:
            pass
        finally:
            await self.unregister_client(websocket)

    async def _shutdown(self):
        """Release clients, commands and the listening server (loop thread only)."""
        try:
            pending = list(self._command_tasks)
            for task in pending:
                task.cancel()
            self._terminate(self.current_process)
            self.current_process = None
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

            clients = list(self.clients)
            self.clients.clear()
            if clients:
                outcomes = await asyncio.gather(
                    *(client.close() for client in clients),
                    return_exceptions=True,
                )
                for outcome in outcomes:
                    if isinstance(outcome, Exception):
                        logger.warning(f"Error closing client connection: {outcome}")

            server, self.server = self.server, None
            if server is not None and hasattr(server, 'close'):
                server.close()
                if hasattr(server, 'wait_closed'):
                    await server.wait_closed()

            # A later restart may use a different loop; let the lock be rebuilt.
            self._command_lock = None
            logger.info("Terminal WebSocket server stopped")
        except Exception as e:
            logger.error(f"Error stopping WebSocket server: {e}")

    def stop_server(self):
        """Stop the WebSocket server gracefully.

        Safe to call from any thread. Cleanup runs on the server's own event
        loop. When called from another thread this blocks until cleanup
        finishes (at most ``_SHUTDOWN_TIMEOUT`` seconds) and then stops the
        dedicated loop. When called from the loop's own thread, cleanup is
        scheduled and the call returns immediately.
        """
        if not self.server:
            logger.info("Terminal WebSocket server was not running")
            return

        logger.info("Stopping terminal WebSocket server...")
        self.is_running = False

        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            current_loop = None

        owned_loop = None
        if self.loop is not None and self.loop.is_running():
            owned_loop = self.loop
        target_loop = owned_loop if owned_loop is not None else current_loop

        if target_loop is None:
            # No loop is running, so nothing can be awaited: best effort only.
            self.clients.clear()
            self._terminate(self.current_process)
            self.current_process = None
            try:
                if hasattr(self.server, 'close'):
                    self.server.close()
                logger.info("Terminal WebSocket server stopped")
            except Exception as e:
                logger.error(f"Error stopping WebSocket server: {e}")
            self.server = None
            return

        if target_loop is current_loop:
            # Already on the loop thread: blocking here would deadlock it.
            task = target_loop.create_task(self._shutdown())
            if owned_loop is not None:
                task.add_done_callback(lambda _done: target_loop.stop())
            self._shutdown_task = task
            return

        future = asyncio.run_coroutine_threadsafe(self._shutdown(), target_loop)
        try:
            future.result(timeout=self._SHUTDOWN_TIMEOUT)
        except Exception as e:
            logger.warning(f"Terminal WebSocket server cleanup did not finish: {e!r}")
        finally:
            try:
                target_loop.call_soon_threadsafe(target_loop.stop)
            except RuntimeError:
                # The loop was closed in the meantime; nothing left to stop.
                pass
| |
|
| | |
# Shared manager instance used by start_websocket_server() and
# run_websocket_server() below.
terminal_manager = TerminalStreamManager()
| |
|
async def start_websocket_server(host='localhost', port=8765):
    """Start the WebSocket server for terminal streaming.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Returns:
        The server object returned by ``websockets.serve``. It is also stored
        on ``terminal_manager.server``, and ``terminal_manager.is_running`` is
        set to True.
    """
    logger.info(f"Starting terminal WebSocket server on {host}:{port}")

    # ``path`` is optional: older websockets releases pass (websocket, path),
    # newer ones pass the connection only. A required second parameter makes
    # every connection fail with a TypeError on the newer releases.
    async def handler(websocket, path=None):
        await terminal_manager.handle_client(websocket, path)

    server = await websockets.serve(handler, host, port)
    terminal_manager.server = server
    terminal_manager.is_running = True
    return server
| |
|
def run_websocket_server():
    """Run the WebSocket server on its own event loop in a daemon thread.

    Returns:
        The started ``threading.Thread``; it is also stored on
        ``terminal_manager.server_thread``.
    """
    def start_server():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        terminal_manager.loop = loop

        try:
            loop.run_until_complete(start_websocket_server())
            logger.info("Terminal WebSocket server started successfully")
            loop.run_forever()
        except Exception as e:
            # logger.exception keeps the traceback, which is otherwise lost
            # because nothing joins this thread.
            logger.exception(f"Error starting WebSocket server: {e}")
        finally:
            # Only clear the reference if it still points at this loop; a
            # restarted server may already have published its own.
            if terminal_manager.loop is loop:
                terminal_manager.loop = None
            try:
                loop.run_until_complete(loop.shutdown_asyncgens())
            finally:
                # Release the loop's selector and self-pipe instead of
                # leaking them for the rest of the process lifetime.
                loop.close()
            logger.info("WebSocket server loop ended")

    thread = threading.Thread(target=start_server, daemon=True)
    terminal_manager.server_thread = thread
    thread.start()
    return thread