import os import subprocess import json import threading import time import signal import queue from datetime import datetime from flask import Flask, render_template, request, jsonify, send_file, abort, redirect, url_for from flask_socketio import SocketIO, emit, join_room, leave_room import sys import logging # 配置日志 logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) app = Flask(__name__) app.config['SECRET_KEY'] = 'webshell-secret-key-2024' socketio = SocketIO(app, cors_allowed_origins="*") # 全局终端变量 shell_process = None def execute_command(command): """执行单个命令并返回输出""" try: logger.info(f"执行命令: {repr(command)}") if os.name == 'nt': # Windows环境 result = subprocess.run( command, shell=True, capture_output=True, text=True, cwd=os.getcwd(), timeout=30 ) output = result.stdout + result.stderr logger.debug(f"命令输出: {repr(output)}") return output else: # Unix/Linux环境 result = subprocess.run( command, shell=True, capture_output=True, text=True, cwd=os.getcwd(), timeout=30 ) output = result.stdout + result.stderr logger.debug(f"命令输出: {repr(output)}") return output except subprocess.TimeoutExpired: return "命令执行超时\n" except Exception as e: logger.error(f"执行命令错误: {e}") return f"错误: {e}\n" def init_terminal(): """初始化全局终端""" logger.info("终端初始化") return True @app.route('/') def index(): return render_template('index.html') @app.route('/files') def file_manager(): return render_template('files.html') @app.route('/terminal') def terminal(): return render_template('terminal.html') @app.route('/api/files') def list_files(): path = request.args.get('path', '.') try: # 获取绝对路径 abs_path = os.path.abspath(path) # 确保路径存在 if not os.path.exists(abs_path): abs_path = os.getcwd() items = [] # 添加返回上级目录的选项(除非已经在根目录) parent_path = os.path.dirname(abs_path) if abs_path != parent_path: # 不在根目录 items.append({ 'name': '..', 'type': 'directory', 'size': 0, 'modified': '', 'path': parent_path.replace('\\', '/') }) # 列出当前目录下的文件和文件夹 try: for item in os.listdir(abs_path): item_path = os.path.join(abs_path, item) try: stat = os.stat(item_path) items.append({ 'name': item, 'type': 'directory' if os.path.isdir(item_path) else 'file', 'size': stat.st_size, 'modified': datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'path': item_path.replace('\\', '/') }) except (PermissionError, OSError): # 跳过无权限访问的文件 continue except PermissionError: return jsonify({'success': False, 'error': '没有权限访问此目录'}) return jsonify({ 'success': True, 'path': abs_path.replace('\\', '/'), 'items': sorted(items, key=lambda x: (x['name'] != '..', x['type'] != 'directory', x['name'].lower())) }) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/files/create', methods=['POST']) def create_file(): data = request.json try: file_path = data['path'] abs_path = os.path.abspath(file_path) if data['type'] == 'directory': os.makedirs(abs_path, exist_ok=True) else: os.makedirs(os.path.dirname(abs_path), exist_ok=True) with open(abs_path, 'w', encoding='utf-8') as f: f.write(data.get('content', '')) return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/files/delete', methods=['POST']) def delete_file(): data = request.json try: file_path = data['path'] abs_path = os.path.abspath(file_path) if os.path.isdir(abs_path): import shutil shutil.rmtree(abs_path) else: os.remove(abs_path) return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/files/rename', methods=['POST']) def rename_file(): data = request.json try: old_path = os.path.abspath(data['oldPath']) new_path = os.path.abspath(data['newPath']) os.rename(old_path, new_path) return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/files/read') def read_file(): file_path = request.args.get('path') try: abs_path = os.path.abspath(file_path) with open(abs_path, 'r', encoding='utf-8') as f: content = f.read() return jsonify({'success': True, 'content': content}) except UnicodeDecodeError: try: with open(abs_path, 'r', encoding='gbk') as f: content = f.read() return jsonify({'success': True, 'content': content}) except: return jsonify({'success': False, 'error': '无法读取文件,可能是二进制文件'}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/files/write', methods=['POST']) def write_file(): data = request.json try: file_path = data['path'] content = data['content'] abs_path = os.path.abspath(file_path) with open(abs_path, 'w', encoding='utf-8') as f: f.write(content) return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @app.route('/api/files/download') def download_file(): file_path = request.args.get('path') try: abs_path = os.path.abspath(file_path) return send_file(abs_path, as_attachment=True) except Exception as e: abort(404) @app.route('/api/files/upload', methods=['POST']) def upload_file(): try: file = request.files['file'] path = request.form.get('path', '.') abs_path = os.path.abspath(path) file_path = os.path.join(abs_path, file.filename) file.save(file_path) return jsonify({'success': True}) except Exception as e: return jsonify({'success': False, 'error': str(e)}) # WebSocket事件处理 @socketio.on('connect') def on_connect(): logger.info(f'客户端连接: {request.sid}') @socketio.on('disconnect') def on_disconnect(): logger.info(f'客户端断开: {request.sid}') @socketio.on('start_terminal') def on_start_terminal(): logger.info('启动终端请求') emit('output', '=== WebShell Terminal Started ===\n') emit('output', f'Working Directory: {os.getcwd()}\n') emit('output', 'Type commands below:\n') emit('terminal_ready') @socketio.on('input') def on_terminal_input(data): command = data # 成功案例直接发送字符串,不是对象 logger.info(f"收到终端输入: {repr(command)}") try: output = execute_command(command) # 确保输出末尾有换行符,以区分命令和输出 if output and not output.endswith('\n'): output += '\n' emit('output', output) # 改为'output'事件 except Exception as e: logger.error(f"执行命令错误: {e}") emit('output', f'错误: {e}\n') @socketio.on('terminal_resize') def on_terminal_resize(data): logger.debug(f'终端调整大小: {data["rows"]}x{data["cols"]}') # 对于全局终端,暂时忽略调整大小 pass # 错误处理 @app.errorhandler(404) def not_found(error): return jsonify({'success': False, 'error': '页面未找到'}), 404 @app.errorhandler(500) def internal_error(error): return jsonify({'success': False, 'error': '内部服务器错误'}), 500 if __name__ == '__main__': # 从环境变量获取端口,默认7860(Hugging Face Spaces标准端口) port = int(os.environ.get('PORT', 7860)) host = os.environ.get('HOST', '0.0.0.0') debug = os.environ.get('DEBUG', 'False').lower() == 'true' logger.info(f"🚀 WebShell 启动在 {host}:{port}") # 初始化终端 if init_terminal(): logger.info("终端初始化成功") else: logger.error("终端初始化失败") socketio.run(app, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True)