|
|
import os
|
|
|
import subprocess
|
|
|
import json
|
|
|
import threading
|
|
|
import time
|
|
|
import signal
|
|
|
import queue
|
|
|
from datetime import datetime
|
|
|
from flask import Flask, render_template, request, jsonify, send_file, abort, redirect, url_for
|
|
|
from flask_socketio import SocketIO, emit, join_room, leave_room
|
|
|
import sys
|
|
|
import logging
|
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
app = Flask(__name__)
|
|
|
app.config['SECRET_KEY'] = 'webshell-secret-key-2024'
|
|
|
socketio = SocketIO(app, cors_allowed_origins="*")
|
|
|
|
|
|
|
|
|
shell_process = None
|
|
|
|
|
|
def execute_command(command):
|
|
|
"""执行单个命令并返回输出"""
|
|
|
try:
|
|
|
logger.info(f"执行命令: {repr(command)}")
|
|
|
|
|
|
if os.name == 'nt':
|
|
|
|
|
|
result = subprocess.run(
|
|
|
command,
|
|
|
shell=True,
|
|
|
capture_output=True,
|
|
|
text=True,
|
|
|
cwd=os.getcwd(),
|
|
|
timeout=30
|
|
|
)
|
|
|
|
|
|
output = result.stdout + result.stderr
|
|
|
logger.debug(f"命令输出: {repr(output)}")
|
|
|
return output
|
|
|
else:
|
|
|
|
|
|
result = subprocess.run(
|
|
|
command,
|
|
|
shell=True,
|
|
|
capture_output=True,
|
|
|
text=True,
|
|
|
cwd=os.getcwd(),
|
|
|
timeout=30
|
|
|
)
|
|
|
|
|
|
output = result.stdout + result.stderr
|
|
|
logger.debug(f"命令输出: {repr(output)}")
|
|
|
return output
|
|
|
|
|
|
except subprocess.TimeoutExpired:
|
|
|
return "命令执行超时\n"
|
|
|
except Exception as e:
|
|
|
logger.error(f"执行命令错误: {e}")
|
|
|
return f"错误: {e}\n"
|
|
|
|
|
|
def init_terminal():
|
|
|
"""初始化全局终端"""
|
|
|
logger.info("终端初始化")
|
|
|
return True
|
|
|
|
|
|
@app.route('/')
|
|
|
def index():
|
|
|
return render_template('index.html')
|
|
|
|
|
|
@app.route('/files')
|
|
|
def file_manager():
|
|
|
return render_template('files.html')
|
|
|
|
|
|
@app.route('/terminal')
|
|
|
def terminal():
|
|
|
return render_template('terminal.html')
|
|
|
|
|
|
@app.route('/api/files')
|
|
|
def list_files():
|
|
|
path = request.args.get('path', '.')
|
|
|
try:
|
|
|
|
|
|
abs_path = os.path.abspath(path)
|
|
|
|
|
|
|
|
|
if not os.path.exists(abs_path):
|
|
|
abs_path = os.getcwd()
|
|
|
|
|
|
items = []
|
|
|
|
|
|
|
|
|
parent_path = os.path.dirname(abs_path)
|
|
|
if abs_path != parent_path:
|
|
|
items.append({
|
|
|
'name': '..',
|
|
|
'type': 'directory',
|
|
|
'size': 0,
|
|
|
'modified': '',
|
|
|
'path': parent_path.replace('\\', '/')
|
|
|
})
|
|
|
|
|
|
|
|
|
try:
|
|
|
for item in os.listdir(abs_path):
|
|
|
item_path = os.path.join(abs_path, item)
|
|
|
try:
|
|
|
stat = os.stat(item_path)
|
|
|
items.append({
|
|
|
'name': item,
|
|
|
'type': 'directory' if os.path.isdir(item_path) else 'file',
|
|
|
'size': stat.st_size,
|
|
|
'modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
|
|
|
'path': item_path.replace('\\', '/')
|
|
|
})
|
|
|
except (PermissionError, OSError):
|
|
|
|
|
|
continue
|
|
|
except PermissionError:
|
|
|
return jsonify({'success': False, 'error': '没有权限访问此目录'})
|
|
|
|
|
|
return jsonify({
|
|
|
'success': True,
|
|
|
'path': abs_path.replace('\\', '/'),
|
|
|
'items': sorted(items, key=lambda x: (x['name'] != '..', x['type'] != 'directory', x['name'].lower()))
|
|
|
})
|
|
|
except Exception as e:
|
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
@app.route('/api/files/create', methods=['POST'])
|
|
|
def create_file():
|
|
|
data = request.json
|
|
|
try:
|
|
|
file_path = data['path']
|
|
|
abs_path = os.path.abspath(file_path)
|
|
|
|
|
|
if data['type'] == 'directory':
|
|
|
os.makedirs(abs_path, exist_ok=True)
|
|
|
else:
|
|
|
os.makedirs(os.path.dirname(abs_path), exist_ok=True)
|
|
|
with open(abs_path, 'w', encoding='utf-8') as f:
|
|
|
f.write(data.get('content', ''))
|
|
|
return jsonify({'success': True})
|
|
|
except Exception as e:
|
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
@app.route('/api/files/delete', methods=['POST'])
|
|
|
def delete_file():
|
|
|
data = request.json
|
|
|
try:
|
|
|
file_path = data['path']
|
|
|
abs_path = os.path.abspath(file_path)
|
|
|
|
|
|
if os.path.isdir(abs_path):
|
|
|
import shutil
|
|
|
shutil.rmtree(abs_path)
|
|
|
else:
|
|
|
os.remove(abs_path)
|
|
|
return jsonify({'success': True})
|
|
|
except Exception as e:
|
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
@app.route('/api/files/rename', methods=['POST'])
|
|
|
def rename_file():
|
|
|
data = request.json
|
|
|
try:
|
|
|
old_path = os.path.abspath(data['oldPath'])
|
|
|
new_path = os.path.abspath(data['newPath'])
|
|
|
|
|
|
os.rename(old_path, new_path)
|
|
|
return jsonify({'success': True})
|
|
|
except Exception as e:
|
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
@app.route('/api/files/read')
|
|
|
def read_file():
|
|
|
file_path = request.args.get('path')
|
|
|
try:
|
|
|
abs_path = os.path.abspath(file_path)
|
|
|
|
|
|
with open(abs_path, 'r', encoding='utf-8') as f:
|
|
|
content = f.read()
|
|
|
return jsonify({'success': True, 'content': content})
|
|
|
except UnicodeDecodeError:
|
|
|
try:
|
|
|
with open(abs_path, 'r', encoding='gbk') as f:
|
|
|
content = f.read()
|
|
|
return jsonify({'success': True, 'content': content})
|
|
|
except:
|
|
|
return jsonify({'success': False, 'error': '无法读取文件,可能是二进制文件'})
|
|
|
except Exception as e:
|
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
@app.route('/api/files/write', methods=['POST'])
|
|
|
def write_file():
|
|
|
data = request.json
|
|
|
try:
|
|
|
file_path = data['path']
|
|
|
content = data['content']
|
|
|
abs_path = os.path.abspath(file_path)
|
|
|
|
|
|
with open(abs_path, 'w', encoding='utf-8') as f:
|
|
|
f.write(content)
|
|
|
return jsonify({'success': True})
|
|
|
except Exception as e:
|
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
@app.route('/api/files/download')
|
|
|
def download_file():
|
|
|
file_path = request.args.get('path')
|
|
|
try:
|
|
|
abs_path = os.path.abspath(file_path)
|
|
|
return send_file(abs_path, as_attachment=True)
|
|
|
except Exception as e:
|
|
|
abort(404)
|
|
|
|
|
|
@app.route('/api/files/upload', methods=['POST'])
|
|
|
def upload_file():
|
|
|
try:
|
|
|
file = request.files['file']
|
|
|
path = request.form.get('path', '.')
|
|
|
abs_path = os.path.abspath(path)
|
|
|
|
|
|
file_path = os.path.join(abs_path, file.filename)
|
|
|
file.save(file_path)
|
|
|
return jsonify({'success': True})
|
|
|
except Exception as e:
|
|
|
return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
|
|
|
|
@socketio.on('connect')
|
|
|
def on_connect():
|
|
|
logger.info(f'客户端连接: {request.sid}')
|
|
|
|
|
|
@socketio.on('disconnect')
|
|
|
def on_disconnect():
|
|
|
logger.info(f'客户端断开: {request.sid}')
|
|
|
|
|
|
@socketio.on('start_terminal')
|
|
|
def on_start_terminal():
|
|
|
logger.info('启动终端请求')
|
|
|
emit('output', '=== WebShell Terminal Started ===\n')
|
|
|
emit('output', f'Working Directory: {os.getcwd()}\n')
|
|
|
emit('output', 'Type commands below:\n')
|
|
|
emit('terminal_ready')
|
|
|
|
|
|
@socketio.on('input')
|
|
|
def on_terminal_input(data):
|
|
|
command = data
|
|
|
logger.info(f"收到终端输入: {repr(command)}")
|
|
|
|
|
|
try:
|
|
|
output = execute_command(command)
|
|
|
|
|
|
if output and not output.endswith('\n'):
|
|
|
output += '\n'
|
|
|
emit('output', output)
|
|
|
except Exception as e:
|
|
|
logger.error(f"执行命令错误: {e}")
|
|
|
emit('output', f'错误: {e}\n')
|
|
|
|
|
|
@socketio.on('terminal_resize')
|
|
|
def on_terminal_resize(data):
|
|
|
logger.debug(f'终端调整大小: {data["rows"]}x{data["cols"]}')
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
@app.errorhandler(404)
|
|
|
def not_found(error):
|
|
|
return jsonify({'success': False, 'error': '页面未找到'}), 404
|
|
|
|
|
|
@app.errorhandler(500)
|
|
|
def internal_error(error):
|
|
|
return jsonify({'success': False, 'error': '内部服务器错误'}), 500
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
|
port = int(os.environ.get('PORT', 7860))
|
|
|
host = os.environ.get('HOST', '0.0.0.0')
|
|
|
debug = os.environ.get('DEBUG', 'False').lower() == 'true'
|
|
|
|
|
|
logger.info(f"🚀 WebShell 启动在 {host}:{port}")
|
|
|
|
|
|
|
|
|
if init_terminal():
|
|
|
logger.info("终端初始化成功")
|
|
|
else:
|
|
|
logger.error("终端初始化失败")
|
|
|
|
|
|
socketio.run(app, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True) |