Spaces:
Sleeping
Sleeping
File size: 6,509 Bytes
cca2eda 9c03dbc cca2eda edade8f cca2eda 8c0c8d2 cca2eda 9c03dbc cca2eda 9c03dbc cca2eda 8c0c8d2 cca2eda 8c0c8d2 cca2eda 8c0c8d2 cca2eda 8c0c8d2 cca2eda 818060f 6836b48 cca2eda 6836b48 cca2eda 818060f 6836b48 cca2eda 6836b48 cca2eda 9c03dbc cca2eda 6836b48 cca2eda 6260432 cca2eda 6836b48 9c03dbc cca2eda 818060f 6836b48 cca2eda 818060f 9c03dbc 6836b48 cca2eda 9c03dbc cca2eda 6836b48 9c03dbc 6836b48 cca2eda 6836b48 cca2eda 818060f 6836b48 818060f cca2eda 6836b48 9c03dbc 6836b48 cca2eda 6836b48 cca2eda dbd7e8a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | import asyncio
import aiohttp
from aiohttp import web
import subprocess
import shlex
async def handle_terminal(websocket, path):
prompt_active = False
prompt_response = None
try:
async for message in websocket:
command = shlex.split(message.data)
if not command or command[0] in ['rm', 'sudo', 'halt', 'reboot']:
await websocket.send_str("Command not allowed")
continue
try:
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
async for line in process.stdout:
# Check if line contains a prompt
line_str = line.decode('utf-8').rstrip()
if line_str.endswith('[Y/n]') or line_str.endswith('[y/n]'):
await websocket.send_str(line_str)
prompt_active = True
# Wait for user response
while prompt_active:
prompt_response = await websocket.receive_str()
if prompt_response.lower() in ['y', 'n']:
prompt_active = False
await websocket.send_str("") # Signal end of prompt
else:
await websocket.send_str(line_str)
async for line in process.stderr:
await websocket.send_str(line.decode('utf-8').rstrip())
await process.wait()
await websocket.send_str("")
except FileNotFoundError:
await websocket.send_str(f"Command not found: {command[0]}")
except Exception as e:
await websocket.send_str(f"Error: {str(e)}")
except aiohttp.WSCloseCode:
pass
async def index(request):
return web.Response(
text="""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Web Terminal</title>
<style>
body {
background-color: #000;
color: #fff;
font-family: 'Courier New', monospace;
margin: 0;
padding: 10px;
height: 100vh;
overflow: hidden;
}
#terminal {
background-color: #000;
color: #fff;
height: 90vh;
overflow-y: auto;
padding: 10px;
white-space: pre-wrap;
word-wrap: break-word;
}
.line {
margin-bottom: 5px;
}
.prompt {
color: #0f0;
}
#input {
background-color: transparent;
color: #fff;
border: none;
font-family: 'Courier New', monospace;
font-size: 16px;
outline: none;
width: 100%;
caret-color: #fff;
display: none; /* Hidden by default, shown on prompt */
}
</style>
</head>
<body>
<div id="terminal">
<div class="line">zoyo@zoyos-MacBook-Air 10% <span class="prompt">$ </span></div>
</div>
<input id="input" type="text" autofocus>
<script>
const ws = new WebSocket('wss://' + location.host + '/ws');
const terminal = document.getElementById('terminal');
const input = document.getElementById('input');
let awaitingPrompt = false;
ws.onopen = () => {
input.focus();
};
ws.onmessage = (event) => {
const data = event.data;
if (data.endsWith('[Y/n]') || data.endsWith('[y/n]')) {
// Display prompt and show input
const promptLine = document.createElement('div');
promptLine.className = 'line';
promptLine.textContent = data + ' ';
terminal.appendChild(promptLine);
input.style.display = 'block';
input.focus();
awaitingPrompt = true;
} else if (data === '') {
// End of prompt or command, hide input and show new prompt
input.style.display = 'none';
awaitingPrompt = false;
const newLine = document.createElement('div');
newLine.className = 'line';
newLine.textContent = 'zoyo@zoyos-MacBook-Air 10% ';
const prompt = document.createElement('span');
prompt.className = 'prompt';
prompt.textContent = '$ ';
newLine.appendChild(prompt);
terminal.appendChild(newLine);
input.value = '';
} else {
// Regular output
const output = document.createElement('div');
output.className = 'line';
output.textContent = data;
terminal.appendChild(output);
}
terminal.scrollTop = terminal.scrollHeight;
};
ws.onclose = () => {
const disconnect = document.createElement('div');
disconnect.className = 'line';
disconnect.textContent = 'Disconnected from terminal server';
terminal.appendChild(disconnect);
terminal.scrollTop = terminal.scrollHeight;
};
input.addEventListener('keydown', (event) => {
if (event.key === 'Enter' && awaitingPrompt) {
const response = input.value.trim().toLowerCase();
if (response === 'y' || response === 'n') {
ws.send(response);
input.value = '';
}
} else if (event.key === 'Enter' && !awaitingPrompt) {
const command = input.value.trim();
if (command) {
ws.send(command);
input.value = '';
}
}
});
</script>
</body>
</html>""",
content_type='text/html'
)
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
await handle_terminal(ws, None)
return ws
app = web.Application()
app.add_routes([
web.get('/', index),
web.get('/ws', websocket_handler),
])
if __name__ == '__main__':
web.run_app(app, host='0.0.0.0', port=7860) |