webui / app.py
BG5's picture
Upload 7 files
c4bdf8c verified
import os
import subprocess
import json
import threading
import time
import signal
import queue
from datetime import datetime
from flask import Flask, render_template, request, jsonify, send_file, abort, redirect, url_for
from flask_socketio import SocketIO, emit, join_room, leave_room
import sys
import logging
# 配置日志
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['SECRET_KEY'] = 'webshell-secret-key-2024'
socketio = SocketIO(app, cors_allowed_origins="*")
# 全局终端变量
shell_process = None
def execute_command(command):
"""执行单个命令并返回输出"""
try:
logger.info(f"执行命令: {repr(command)}")
if os.name == 'nt':
# Windows环境
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
cwd=os.getcwd(),
timeout=30
)
output = result.stdout + result.stderr
logger.debug(f"命令输出: {repr(output)}")
return output
else:
# Unix/Linux环境
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
cwd=os.getcwd(),
timeout=30
)
output = result.stdout + result.stderr
logger.debug(f"命令输出: {repr(output)}")
return output
except subprocess.TimeoutExpired:
return "命令执行超时\n"
except Exception as e:
logger.error(f"执行命令错误: {e}")
return f"错误: {e}\n"
def init_terminal():
"""初始化全局终端"""
logger.info("终端初始化")
return True
@app.route('/')
def index():
return render_template('index.html')
@app.route('/files')
def file_manager():
return render_template('files.html')
@app.route('/terminal')
def terminal():
return render_template('terminal.html')
@app.route('/api/files')
def list_files():
path = request.args.get('path', '.')
try:
# 获取绝对路径
abs_path = os.path.abspath(path)
# 确保路径存在
if not os.path.exists(abs_path):
abs_path = os.getcwd()
items = []
# 添加返回上级目录的选项(除非已经在根目录)
parent_path = os.path.dirname(abs_path)
if abs_path != parent_path: # 不在根目录
items.append({
'name': '..',
'type': 'directory',
'size': 0,
'modified': '',
'path': parent_path.replace('\\', '/')
})
# 列出当前目录下的文件和文件夹
try:
for item in os.listdir(abs_path):
item_path = os.path.join(abs_path, item)
try:
stat = os.stat(item_path)
items.append({
'name': item,
'type': 'directory' if os.path.isdir(item_path) else 'file',
'size': stat.st_size,
'modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
'path': item_path.replace('\\', '/')
})
except (PermissionError, OSError):
# 跳过无权限访问的文件
continue
except PermissionError:
return jsonify({'success': False, 'error': '没有权限访问此目录'})
return jsonify({
'success': True,
'path': abs_path.replace('\\', '/'),
'items': sorted(items, key=lambda x: (x['name'] != '..', x['type'] != 'directory', x['name'].lower()))
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/files/create', methods=['POST'])
def create_file():
data = request.json
try:
file_path = data['path']
abs_path = os.path.abspath(file_path)
if data['type'] == 'directory':
os.makedirs(abs_path, exist_ok=True)
else:
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
with open(abs_path, 'w', encoding='utf-8') as f:
f.write(data.get('content', ''))
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/files/delete', methods=['POST'])
def delete_file():
data = request.json
try:
file_path = data['path']
abs_path = os.path.abspath(file_path)
if os.path.isdir(abs_path):
import shutil
shutil.rmtree(abs_path)
else:
os.remove(abs_path)
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/files/rename', methods=['POST'])
def rename_file():
data = request.json
try:
old_path = os.path.abspath(data['oldPath'])
new_path = os.path.abspath(data['newPath'])
os.rename(old_path, new_path)
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/files/read')
def read_file():
file_path = request.args.get('path')
try:
abs_path = os.path.abspath(file_path)
with open(abs_path, 'r', encoding='utf-8') as f:
content = f.read()
return jsonify({'success': True, 'content': content})
except UnicodeDecodeError:
try:
with open(abs_path, 'r', encoding='gbk') as f:
content = f.read()
return jsonify({'success': True, 'content': content})
except:
return jsonify({'success': False, 'error': '无法读取文件,可能是二进制文件'})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/files/write', methods=['POST'])
def write_file():
data = request.json
try:
file_path = data['path']
content = data['content']
abs_path = os.path.abspath(file_path)
with open(abs_path, 'w', encoding='utf-8') as f:
f.write(content)
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/files/download')
def download_file():
file_path = request.args.get('path')
try:
abs_path = os.path.abspath(file_path)
return send_file(abs_path, as_attachment=True)
except Exception as e:
abort(404)
@app.route('/api/files/upload', methods=['POST'])
def upload_file():
try:
file = request.files['file']
path = request.form.get('path', '.')
abs_path = os.path.abspath(path)
file_path = os.path.join(abs_path, file.filename)
file.save(file_path)
return jsonify({'success': True})
except Exception as e:
return jsonify({'success': False, 'error': str(e)})
# WebSocket事件处理
@socketio.on('connect')
def on_connect():
logger.info(f'客户端连接: {request.sid}')
@socketio.on('disconnect')
def on_disconnect():
logger.info(f'客户端断开: {request.sid}')
@socketio.on('start_terminal')
def on_start_terminal():
logger.info('启动终端请求')
emit('output', '=== WebShell Terminal Started ===\n')
emit('output', f'Working Directory: {os.getcwd()}\n')
emit('output', 'Type commands below:\n')
emit('terminal_ready')
@socketio.on('input')
def on_terminal_input(data):
command = data # 成功案例直接发送字符串,不是对象
logger.info(f"收到终端输入: {repr(command)}")
try:
output = execute_command(command)
# 确保输出末尾有换行符,以区分命令和输出
if output and not output.endswith('\n'):
output += '\n'
emit('output', output) # 改为'output'事件
except Exception as e:
logger.error(f"执行命令错误: {e}")
emit('output', f'错误: {e}\n')
@socketio.on('terminal_resize')
def on_terminal_resize(data):
logger.debug(f'终端调整大小: {data["rows"]}x{data["cols"]}')
# 对于全局终端,暂时忽略调整大小
pass
# 错误处理
@app.errorhandler(404)
def not_found(error):
return jsonify({'success': False, 'error': '页面未找到'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'success': False, 'error': '内部服务器错误'}), 500
if __name__ == '__main__':
# 从环境变量获取端口,默认7860(Hugging Face Spaces标准端口)
port = int(os.environ.get('PORT', 7860))
host = os.environ.get('HOST', '0.0.0.0')
debug = os.environ.get('DEBUG', 'False').lower() == 'true'
logger.info(f"🚀 WebShell 启动在 {host}:{port}")
# 初始化终端
if init_terminal():
logger.info("终端初始化成功")
else:
logger.error("终端初始化失败")
socketio.run(app, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True)