Spaces:
Sleeping
Sleeping
| import asyncio | |
| import aiohttp | |
| from aiohttp import web | |
| import subprocess | |
| import shlex | |
# Executables refused outright. NOTE(security): a denylist is not a security
# boundary -- `bash -c "rm ..."`, `env rm ...` and many other indirections
# still get through. Anyone who can reach this socket can run arbitrary
# programs as the server user; put authentication in front of it.
_BLOCKED_COMMANDS = frozenset({'rm', 'sudo', 'halt', 'reboot'})

# Output lines ending with one of these are treated as interactive prompts.
_PROMPT_SUFFIXES = ('[Y/n]', '[y/n]')


def _is_blocked(executable):
    """Return True when *executable* (bare name or path) is on the denylist."""
    # Compare the basename so that "/bin/rm" is rejected just like "rm".
    return executable.rsplit('/', 1)[-1] in _BLOCKED_COMMANDS


async def _await_prompt_answer(websocket, prompt_text):
    """Block until the client answers 'y' or 'n'; re-send the prompt otherwise."""
    while True:
        answer = (await websocket.receive_str()).strip().lower()
        if answer in ('y', 'n'):
            return answer
        await websocket.send_str(prompt_text)


async def _stream_command(websocket, command):
    """Run *command* and relay its output to the client line by line.

    stderr is merged into stdout so that both pipes are drained together;
    reading one pipe to EOF before touching the other can deadlock once the
    unread pipe's buffer fills up.
    """
    process = await asyncio.create_subprocess_exec(
        *command,
        # A pipe is required so that prompt answers can reach the child.
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    try:
        async for raw_line in process.stdout:
            # errors='replace': binary output must not abort the command.
            text = raw_line.decode('utf-8', errors='replace').rstrip()
            if text.endswith(_PROMPT_SUFFIXES):
                await websocket.send_str(text)
                answer = await _await_prompt_answer(websocket, text)
                try:
                    process.stdin.write(f"{answer}\n".encode('utf-8'))
                    await process.stdin.drain()
                except (BrokenPipeError, ConnectionResetError):
                    # The child already exited; there is nobody to answer.
                    pass
                await websocket.send_str("")  # Signal end of prompt
            else:
                # The client treats "" as the end-of-command marker, so a
                # blank output line is sent as a single space instead.
                await websocket.send_str(text or " ")
        await process.wait()
    finally:
        # Do not leave the child running if the client vanished or relaying
        # failed part-way through.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()


async def handle_terminal(websocket, path):
    """Serve one terminal session over an already-prepared websocket.

    Each text frame is parsed with ``shlex`` and executed without a shell.
    Output is relayed one line per frame; an empty frame marks the end of a
    command (or of a [Y/n] prompt exchange).

    Args:
        websocket: Object supporting ``async for`` over incoming messages
            (each exposing ``.data``) plus ``send_str`` / ``receive_str``.
        path: Unused; kept for signature compatibility with callers.
    """
    try:
        async for message in websocket:
            payload = message.data
            if not isinstance(payload, str):
                # Binary / error frames carry no command text.
                continue
            try:
                command = shlex.split(payload)
            except ValueError as exc:
                # e.g. unbalanced quotes -- report it, keep the session alive.
                await websocket.send_str(f"Error: {exc}")
                continue
            if not command or _is_blocked(command[0]):
                await websocket.send_str("Command not allowed")
                continue
            try:
                await _stream_command(websocket, command)
                await websocket.send_str("")
            except FileNotFoundError:
                await websocket.send_str(f"Command not found: {command[0]}")
            except Exception as e:
                await websocket.send_str(f"Error: {str(e)}")
    except ConnectionResetError:
        # The client went away while we were writing; nothing left to do.
        pass
# Single-page terminal client served at "/".
# Wire protocol as used by the script below: every non-empty frame from the
# server is one line of output; a frame ending in [Y/n] / [y/n] is a prompt
# that must be answered with "y" or "n"; an empty frame ends the current
# command or prompt exchange.
_INDEX_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Web Terminal</title>
<style>
body {
    background-color: #000;
    color: #fff;
    font-family: 'Courier New', monospace;
    margin: 0;
    padding: 10px;
    height: 100vh;
    overflow: hidden;
}
#terminal {
    background-color: #000;
    color: #fff;
    height: 90vh;
    overflow-y: auto;
    padding: 10px;
    white-space: pre-wrap;
    word-wrap: break-word;
}
.line {
    margin-bottom: 5px;
}
.prompt {
    color: #0f0;
}
#input {
    background-color: transparent;
    color: #fff;
    border: none;
    font-family: 'Courier New', monospace;
    font-size: 16px;
    outline: none;
    width: 100%;
    caret-color: #fff;
    /* Always visible: a hidden input cannot take focus, so no command could be typed. */
}
</style>
</head>
<body>
<div id="terminal">
<div class="line">zoyo@zoyos-MacBook-Air 10% <span class="prompt">$ </span></div>
</div>
<input id="input" type="text" autofocus>
<script>
// Match the page's scheme so the socket also works over plain HTTP.
const wsScheme = location.protocol === 'https:' ? 'wss://' : 'ws://';
const ws = new WebSocket(wsScheme + location.host + '/ws');
const terminal = document.getElementById('terminal');
const input = document.getElementById('input');
let awaitingPrompt = false;
ws.onopen = () => {
    input.focus();
};
ws.onmessage = (event) => {
    const data = event.data;
    if (data.endsWith('[Y/n]') || data.endsWith('[y/n]')) {
        // Display prompt and wait for a y/n answer
        const promptLine = document.createElement('div');
        promptLine.className = 'line';
        promptLine.textContent = data + ' ';
        terminal.appendChild(promptLine);
        input.focus();
        awaitingPrompt = true;
    } else if (data === '') {
        // End of prompt or command: show a new shell prompt
        awaitingPrompt = false;
        const newLine = document.createElement('div');
        newLine.className = 'line';
        newLine.textContent = 'zoyo@zoyos-MacBook-Air 10% ';
        const prompt = document.createElement('span');
        prompt.className = 'prompt';
        prompt.textContent = '$ ';
        newLine.appendChild(prompt);
        terminal.appendChild(newLine);
        input.value = '';
        input.focus();
    } else {
        // Regular output
        const output = document.createElement('div');
        output.className = 'line';
        output.textContent = data;
        terminal.appendChild(output);
    }
    terminal.scrollTop = terminal.scrollHeight;
};
ws.onclose = () => {
    const disconnect = document.createElement('div');
    disconnect.className = 'line';
    disconnect.textContent = 'Disconnected from terminal server';
    terminal.appendChild(disconnect);
    terminal.scrollTop = terminal.scrollHeight;
};
input.addEventListener('keydown', (event) => {
    if (event.key === 'Enter' && awaitingPrompt) {
        const response = input.value.trim().toLowerCase();
        if (response === 'y' || response === 'n') {
            ws.send(response);
            input.value = '';
        }
    } else if (event.key === 'Enter' && !awaitingPrompt) {
        const command = input.value.trim();
        if (command) {
            ws.send(command);
            input.value = '';
        }
    }
});
</script>
</body>
</html>"""


async def index(request):
    """Serve the single-page terminal UI.

    Args:
        request: The incoming HTTP request (not inspected).

    Returns:
        A ``web.Response`` carrying the HTML page with ``text/html`` content type.
    """
    return web.Response(text=_INDEX_HTML, content_type='text/html')
async def websocket_handler(request):
    """Upgrade *request* to a websocket and run a terminal session on it.

    Returns the websocket response once the session loop has finished.
    """
    socket = web.WebSocketResponse()
    # The handshake must complete before any frame can be sent or received.
    await socket.prepare(request)
    await handle_terminal(socket, None)
    return socket
# Application wiring: "/" serves the HTML client, "/ws" the terminal socket.
app = web.Application()
app.router.add_get('/', index)
app.router.add_get('/ws', websocket_handler)

if __name__ == '__main__':
    # NOTE(review): binding to 0.0.0.0 exposes an unauthenticated command
    # runner on every interface -- confirm this is only reachable from a
    # trusted network or sits behind an authenticating proxy.
    web.run_app(app, host='0.0.0.0', port=7860)