owominal / app.py
ozone-research's picture
Update app.py
9c03dbc verified
import asyncio
import aiohttp
from aiohttp import web
import subprocess
import shlex
# Executables that are refused outright.
# NOTE(security): this is a deny-list over the first argv entry only. It is
# trivially bypassed (e.g. `bash -c "rm ..."`, `env rm ...`) and must not be
# treated as a sandbox; every connected client can run arbitrary programs.
_BLOCKED_COMMANDS = frozenset({'rm', 'sudo', 'halt', 'reboot'})

# Line endings that mark an interactive yes/no question from the child.
_PROMPT_SUFFIXES = ('[Y/n]', '[y/n]')


async def _read_lines(stream):
    """Drain *stream* to EOF and return every line as raw bytes."""
    return [raw async for raw in stream]


async def _receive_yes_no(websocket):
    """Wait until the client sends 'y' or 'n' (any case) and return it lower-cased.

    Raises:
        ConnectionResetError: if a non-text frame arrives instead of an answer.
    """
    while True:
        try:
            reply = await websocket.receive_str()
        except TypeError:
            # receive_str() raises TypeError for any non-text frame, which
            # includes the close frames delivered when the client goes away.
            raise ConnectionResetError(
                "client disconnected while a prompt was pending"
            ) from None
        reply = reply.strip().lower()
        if reply in ('y', 'n'):
            return reply


async def _run_command(websocket, command):
    """Run *command* without a shell, relaying output and [Y/n] prompts.

    stdout lines are forwarded as they arrive, stderr lines follow once stdout
    is exhausted, and an empty frame marks the end of the command.

    stdin is a pipe that stays open so prompt answers can be delivered; a
    command that reads stdin without printing a recognised prompt line will
    therefore block this connection.
    """
    process = await asyncio.create_subprocess_exec(
        *command,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Drain stderr in the background: reading it only after stdout hits EOF
    # would deadlock as soon as the child fills the stderr pipe buffer.
    stderr_task = asyncio.ensure_future(_read_lines(process.stderr))
    try:
        async for raw in process.stdout:
            text = raw.decode('utf-8', errors='replace').rstrip()
            await websocket.send_str(text)
            if text.endswith(_PROMPT_SUFFIXES):
                answer = await _receive_yes_no(websocket)
                try:
                    process.stdin.write(answer.encode('utf-8') + b'\n')
                    await process.stdin.drain()
                except (BrokenPipeError, ConnectionResetError):
                    # The child exited before reading the answer; the stdout
                    # loop will reach EOF on its own.
                    pass
                await websocket.send_str("")  # Signal end of prompt
        for raw in await stderr_task:
            await websocket.send_str(raw.decode('utf-8', errors='replace').rstrip())
        await process.wait()
        await websocket.send_str("")
    finally:
        # On any early exit make sure neither the reader task nor the child
        # outlives this call. cancel() is a no-op on a finished task.
        stderr_task.cancel()
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()


async def handle_terminal(websocket, path):
    """Execute commands received over *websocket* and stream back their output.

    Every text frame is split with ``shlex.split`` and executed directly (no
    shell). Frames sent to the client:

    * one frame per output line (stdout first, then stderr);
    * an empty frame after a prompt has been answered and after the command
      has finished;
    * ``"Command not allowed"``, ``"Command not found: <name>"`` or
      ``"Error: <details>"`` when a command is rejected or fails.

    Args:
        websocket: Connection object that yields messages with a ``data``
            attribute under ``async for`` and provides ``send_str`` and
            ``receive_str``.
        path: Unused; kept so existing callers keep working.
    """
    try:
        async for message in websocket:
            payload = message.data
            if not isinstance(payload, str):
                # Binary, control and error frames carry no command line.
                continue
            try:
                command = shlex.split(payload)
            except ValueError as e:
                # e.g. unbalanced quotes; report it instead of dropping the
                # whole connection.
                await websocket.send_str(f"Error: {e}")
                continue
            # Compare the basename so "/bin/rm" is refused just like "rm".
            if not command or command[0].rsplit('/', 1)[-1] in _BLOCKED_COMMANDS:
                await websocket.send_str("Command not allowed")
                continue
            try:
                await _run_command(websocket, command)
            except FileNotFoundError:
                await websocket.send_str(f"Command not found: {command[0]}")
            except ConnectionError:
                # The client is gone; there is nobody left to report to.
                raise
            except Exception as e:
                await websocket.send_str(f"Error: {str(e)}")
    except ConnectionError:
        # Peer disconnected mid-session: end the handler quietly.
        pass
async def index(request):
    """Serve the single-page terminal UI.

    The page opens a WebSocket to ``/ws`` on the same host, using ``wss://``
    when the page itself was loaded over HTTPS and ``ws://`` otherwise, so it
    works both behind a TLS proxy and when served directly over plain HTTP.
    The input box stays visible so commands can be typed; an empty frame from
    the server makes the page print a fresh prompt line.

    Args:
        request: The incoming HTTP request (not inspected).

    Returns:
        An HTML ``web.Response`` containing the whole page.
    """
    return web.Response(
        text="""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Web Terminal</title>
<style>
body {
background-color: #000;
color: #fff;
font-family: 'Courier New', monospace;
margin: 0;
padding: 10px;
height: 100vh;
overflow: hidden;
}
#terminal {
background-color: #000;
color: #fff;
height: 90vh;
overflow-y: auto;
padding: 10px;
white-space: pre-wrap;
word-wrap: break-word;
}
.line {
margin-bottom: 5px;
}
.prompt {
color: #0f0;
}
#input {
background-color: transparent;
color: #fff;
border: none;
font-family: 'Courier New', monospace;
font-size: 16px;
outline: none;
width: 100%;
caret-color: #fff;
display: block; /* Always visible so commands can be typed */
}
</style>
</head>
<body>
<div id="terminal">
<div class="line">zoyo@zoyos-MacBook-Air 10% <span class="prompt">$ </span></div>
</div>
<input id="input" type="text" autofocus>
<script>
const scheme = location.protocol === 'https:' ? 'wss://' : 'ws://';
const ws = new WebSocket(scheme + location.host + '/ws');
const terminal = document.getElementById('terminal');
const input = document.getElementById('input');
let awaitingPrompt = false;
ws.onopen = () => {
input.focus();
};
ws.onmessage = (event) => {
const data = event.data;
if (data.endsWith('[Y/n]') || data.endsWith('[y/n]')) {
// Display prompt and show input
const promptLine = document.createElement('div');
promptLine.className = 'line';
promptLine.textContent = data + ' ';
terminal.appendChild(promptLine);
input.style.display = 'block';
input.focus();
awaitingPrompt = true;
} else if (data === '') {
// End of prompt or command: show new prompt and keep the input ready
input.focus();
awaitingPrompt = false;
const newLine = document.createElement('div');
newLine.className = 'line';
newLine.textContent = 'zoyo@zoyos-MacBook-Air 10% ';
const prompt = document.createElement('span');
prompt.className = 'prompt';
prompt.textContent = '$ ';
newLine.appendChild(prompt);
terminal.appendChild(newLine);
input.value = '';
} else {
// Regular output
const output = document.createElement('div');
output.className = 'line';
output.textContent = data;
terminal.appendChild(output);
}
terminal.scrollTop = terminal.scrollHeight;
};
ws.onclose = () => {
const disconnect = document.createElement('div');
disconnect.className = 'line';
disconnect.textContent = 'Disconnected from terminal server';
terminal.appendChild(disconnect);
terminal.scrollTop = terminal.scrollHeight;
};
input.addEventListener('keydown', (event) => {
if (event.key === 'Enter' && awaitingPrompt) {
const response = input.value.trim().toLowerCase();
if (response === 'y' || response === 'n') {
ws.send(response);
input.value = '';
}
} else if (event.key === 'Enter' && !awaitingPrompt) {
const command = input.value.trim();
if (command) {
ws.send(command);
input.value = '';
}
}
});
</script>
</body>
</html>""",
        content_type='text/html',
    )
async def websocket_handler(request):
    """Upgrade *request* to a WebSocket and run a terminal session on it.

    Returns the WebSocket response object once the session has ended.
    """
    socket = web.WebSocketResponse()
    await socket.prepare(request)
    # The terminal loop takes a second "path" argument; None is passed for it.
    await handle_terminal(socket, None)
    return socket
# Application wiring: the terminal page at "/" and its WebSocket endpoint at "/ws".
app = web.Application()
app.add_routes((
    web.get('/', index),
    web.get('/ws', websocket_handler),
))

if __name__ == '__main__':
    # Listen on every interface, port 7860.
    web.run_app(app, port=7860, host='0.0.0.0')