Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| MCP Server Control Plugin - 全功能版 | |
| 功能: | |
| 1. 系统控制:命令执行、服务管理、进程查看 | |
| 2. 文件操作:读写、复制移动、权限管理 | |
| 3. 精准编辑:行号编辑、正则替换、Diff/Patch 应用 (新增) | |
| 4. 文件服务:HTTP 上传/下载、临时文件分享 | |
| 5. 信息查询:系统信息、网络状态、搜索 | |
| 依赖: pip install "fastmcp>=2.3.1" fastapi python-multipart uvicorn | |
| """ | |
| import os | |
| import subprocess | |
| import base64 | |
| import shutil | |
| import pwd | |
| import grp | |
| import stat | |
| import asyncio | |
| import secrets | |
| import mimetypes | |
| import re | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Optional, List, Dict, Any, Tuple | |
| from datetime import datetime, timedelta | |
| from dataclasses import dataclass | |
| from fastmcp import FastMCP | |
| from fastapi import FastAPI, UploadFile, File, Query | |
| from fastapi.responses import JSONResponse, FileResponse | |
| import uvicorn | |
# ============ Configuration ============
HOST = "0.0.0.0"
PORT = 8000
SERVER_NAME = "Server Control MCP"
DEFAULT_UPLOAD_DIR = "/tmp/mcp_uploads"

_MIB = 1024 * 1024
MAX_UPLOAD_SIZE = 500 * _MIB  # 500MB
MAX_EDIT_FILE_SIZE = 10 * _MIB  # 10MB (limit for the precise-editing tools)
MAX_MATCHES_RETURN = 50  # maximum number of matches reported by search/replace

# Public base URL (HuggingFace Spaces or another environment); trailing
# slashes are removed so paths can be appended with a single "/".
BASE_URL = os.environ.get("BASE_URL", f"http://localhost:{PORT}").rstrip('/')

# Make sure the upload directory exists.
os.makedirs(DEFAULT_UPLOAD_DIR, exist_ok=True)
| # ============ 数据结构 ============ | |
@dataclass
class FileShare:
    """Record describing one temporary download share.

    Declared as a dataclass so it can be constructed positionally
    (``FileShare(id, source_path, filename, created_at, expires_at)``).
    """
    # Short identifier used as the key in the share registry and in the URL.
    id: str
    # Path of the file that is served.
    source_path: str
    # File name presented to the downloader.
    filename: str
    # Time the share was created.
    created_at: datetime
    # Expiry time; None is treated as "never expires" by the code that checks it.
    expires_at: Optional[datetime]
    # Number of downloads served so far.
    download_count: int = 0
    # Optional download cap (stored only; nothing visible here enforces it).
    max_downloads: Optional[int] = None
@dataclass
class DiffHunk:
    """One hunk of a unified diff.

    Declared as a dataclass so it can be constructed positionally
    (``DiffHunk(old_start, old_count, new_start, new_count, lines)``).
    """
    # 1-based start line and line count on the "old" side of the hunk header.
    old_start: int
    old_count: int
    # 1-based start line and line count on the "new" side of the hunk header.
    new_start: int
    new_count: int
    # Hunk body as (marker, text) pairs; marker is ' ', '-' or '+'.
    lines: List[Tuple[str, str]]
| # 全局存储 | |
| file_shares: Dict[str, FileShare] = {} | |
| # ============ 辅助函数: 文件分享 ============ | |
def generate_short_id(length: int = 6) -> str:
    """Return a random alphanumeric ID that is not currently a key of file_shares."""
    alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

    def draw() -> str:
        return ''.join(secrets.choice(alphabet) for _ in range(length))

    candidate = draw()
    # Re-draw on the (unlikely) collision with an existing share ID.
    while candidate in file_shares:
        candidate = draw()
    return candidate
def cleanup_expired_shares():
    """Remove every share whose expiry time lies before the current time."""
    now = datetime.now()
    # Iterate over a snapshot of the keys so entries can be deleted safely.
    for share_id in list(file_shares):
        deadline = file_shares[share_id].expires_at
        if deadline and deadline < now:
            del file_shares[share_id]
| # ============ 辅助函数: 精准编辑 ============ | |
def safe_read_file(path: str) -> Tuple[bool, str, List[str]]:
    """Read a text file for editing, detecting its encoding.

    Args:
        path: File to read ("~" is expanded and the path is resolved).

    Returns:
        ``(True, encoding, lines)`` on success, where ``lines`` keep their
        line endings and the last line is guaranteed to end with a newline;
        ``(False, error_message, [])`` otherwise.

    A UTF-8 file starting with a byte-order mark is reported as
    ``'utf-8-sig'`` with the BOM stripped from the content, so that writing
    the lines back with the returned encoding restores the BOM instead of
    leaving a stray U+FEFF inside the first line.
    """
    file_path = Path(path).expanduser().resolve()
    if not file_path.exists():
        return False, f"File not found: {path}", []
    if not file_path.is_file():
        return False, f"Not a file: {path}", []
    size = file_path.stat().st_size
    if size > MAX_EDIT_FILE_SIZE:
        return False, f"File too large: {size} bytes (max {MAX_EDIT_FILE_SIZE})", []

    # latin-1 can decode any byte sequence, so it is the final fallback;
    # nothing listed after it could ever be reached.
    for encoding in ('utf-8', 'latin-1'):
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                content = f.read()
        except UnicodeDecodeError:
            continue
        if encoding == 'utf-8' and content.startswith('\ufeff'):
            content = content[1:]
            encoding = 'utf-8-sig'
        if '\x00' in content:
            return False, "Binary file detected", []
        lines = content.splitlines(keepends=True)
        if lines and not lines[-1].endswith('\n'):
            lines[-1] += '\n'
        return True, encoding, lines
    return False, "Unable to decode file", []
def safe_write_file(path: str, lines: List[str], encoding: str = 'utf-8') -> Tuple[bool, str]:
    """Atomically write ``lines`` to ``path``.

    The data is written to a temporary file in the target's directory and
    then renamed over the target, so readers never observe a partial file.

    Args:
        path: Destination file ("~" is expanded and the path is resolved).
        lines: Strings written verbatim, one after another.
        encoding: Text encoding used for the output.

    Returns:
        ``(True, "")`` on success, ``(False, error_message)`` on failure.
    """
    file_path = Path(path).expanduser().resolve()
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            mode='w', encoding=encoding, dir=file_path.parent,
            delete=False, suffix='.tmp',
        ) as tmp:
            # Record the name before writing so a failed write is cleaned up.
            tmp_path = tmp.name
            tmp.writelines(lines)
        if file_path.exists():
            # Temp files are created 0600; keep the target's permission bits.
            shutil.copymode(file_path, tmp_path)
        # Same-directory rename; unlike shutil.move it never falls back to
        # "move into the directory" when the target is a directory.
        os.replace(tmp_path, file_path)
        return True, ""
    except Exception as e:
        if tmp_path is not None:
            Path(tmp_path).unlink(missing_ok=True)
        return False, str(e)
def create_backup(path: str) -> Tuple[bool, str]:
    """Copy ``path`` to a sibling with ``.bak`` appended to its suffix.

    Returns ``(True, backup_path)`` on success or ``(False, error_message)``.
    """
    original = Path(path).expanduser().resolve()
    target = original.with_suffix(original.suffix + '.bak')
    try:
        shutil.copy2(original, target)
    except Exception as exc:
        return False, str(exc)
    return True, str(target)
def normalize_line_content(content: str) -> List[str]:
    """Split ``content`` into lines (endings kept); the last line always ends in a newline."""
    if not content:
        return []
    # A non-empty string always yields at least one element here.
    *head, tail = content.splitlines(keepends=True)
    if not tail.endswith('\n'):
        tail += '\n'
    return [*head, tail]
def parse_unified_diff(diff_text: str) -> Tuple[bool, str, List[DiffHunk]]:
    """Parse unified-diff text into a list of hunks.

    Everything before the first line starting with ``@@`` is skipped. Inside
    a hunk, lines starting with ' ', '-' or '+' are recorded with that
    marker, empty lines count as empty context lines, lines starting with a
    backslash are dropped, and any other line is kept as context.

    Returns ``(True, "", hunks)`` or ``(False, error_message, [])`` when no
    hunk header could be parsed.
    """
    header_re = re.compile(r'^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@')
    rows = diff_text.strip().split('\n')
    total = len(rows)
    hunks = []

    # Skip file headers preceding the first hunk.
    pos = 0
    while pos < total and not rows[pos].startswith('@@'):
        pos += 1

    while pos < total:
        header = header_re.match(rows[pos])
        pos += 1
        if header is None:
            continue
        # A missing count in the header means a count of 1.
        old_start = int(header.group(1))
        old_count = int(header.group(2) or 1)
        new_start = int(header.group(3))
        new_count = int(header.group(4) or 1)

        body = []
        while pos < total and not rows[pos].startswith('@@'):
            row = rows[pos]
            pos += 1
            if not row:
                body.append((' ', ''))
                continue
            marker = row[0]
            if marker in (' ', '-', '+'):
                body.append((marker, row[1:]))
            elif marker != '\\':
                body.append((' ', row))
        hunks.append(DiffHunk(old_start, old_count, new_start, new_count, body))

    if not hunks:
        return False, "No valid hunks found", []
    return True, "", hunks
def apply_hunk(lines: List[str], hunk: "DiffHunk", offset: int) -> Tuple[bool, str, List[str], int]:
    """Apply one hunk to ``lines``.

    Args:
        lines: Current file content, one string per line.
        hunk: Hunk to apply (``old_start``, ``old_count`` and ``lines`` are read).
        offset: Net line shift produced by previously applied hunks.

    Returns:
        ``(ok, error, new_lines, new_offset)``. On failure the input
        ``lines`` and ``offset`` are returned unchanged.

    In unified diffs a hunk whose old side is empty (``-N,0``) names the
    line *after which* the new text goes, so such hunks are positioned at
    index ``N`` rather than ``N - 1``; this also makes ``-0,0`` (insert at
    the top / new file) valid.
    """
    old_side_len = sum(1 for tag, _ in hunk.lines if tag in (' ', '-'))
    if old_side_len == 0 and hunk.old_count == 0:
        start_idx = hunk.old_start + offset
    else:
        start_idx = hunk.old_start - 1 + offset
    if start_idx < 0:
        return False, f"Invalid position: {start_idx}", lines, offset
    if start_idx > len(lines):
        return False, "Hunk extends beyond file", lines, offset

    # Verify context and removed lines against the file (line endings ignored).
    check_idx = start_idx
    for tag, expected in hunk.lines:
        if tag in (' ', '-'):
            if check_idx >= len(lines):
                return False, "Hunk extends beyond file", lines, offset
            if lines[check_idx].rstrip('\n\r') != expected.rstrip('\n\r'):
                return False, f"Context mismatch at line {check_idx+1}", lines, offset
            check_idx += 1

    # Build the replacement for the span covered by the hunk.
    replacement = []
    read_idx = start_idx
    added = removed = 0
    for tag, text in hunk.lines:
        if tag == ' ':
            replacement.append(lines[read_idx])
            read_idx += 1
        elif tag == '-':
            read_idx += 1
            removed += 1
        elif tag == '+':
            replacement.append(text if text.endswith('\n') else text + '\n')
            added += 1
    new_lines = lines[:start_idx] + replacement + lines[read_idx:]
    return True, "", new_lines, offset + added - removed
| # ============ MCP 实例 ============ | |
| mcp = FastMCP(name=SERVER_NAME) | |
| # ============ MCP 工具: 精准编辑 (New) ============ | |
def edit_file_lines(
    path: str,
    start_line: int,
    end_line: Optional[int] = None,
    content: str = "",
    action: str = "replace",
    backup: bool = False,
    dry_run: bool = False
) -> Dict[str, Any]:
    """Edit a file by line number.

    Args:
        path: File to edit.
        start_line: First affected line (1-based).
        end_line: Last affected line (inclusive); defaults to ``start_line``.
        content: New text for "replace" and the insert actions.
        action: "replace" | "insert_before" | "insert_after" | "delete".
        backup: Create a backup copy before writing.
        dry_run: Only report what would change; nothing is written.

    Returns:
        Dict with "success" plus either the line counts or an "error".
    """
    if action not in ("replace", "insert_before", "insert_after", "delete"):
        return {"success": False, "error": f"Invalid action: {action}"}
    if start_line < 1:
        return {"success": False, "error": "start_line must be >= 1"}
    if end_line is None:
        end_line = start_line
    if end_line < start_line:
        return {"success": False, "error": "end_line must be >= start_line"}

    ok, enc_or_err, lines = safe_read_file(path)
    if not ok:
        return {"success": False, "error": enc_or_err}

    # Range validation applies only to actions that consume existing lines.
    total = len(lines)
    if action in ("replace", "delete") and max(start_line, end_line) > total:
        return {"success": False, "error": "Line range exceeds file length"}

    work_lines = list(lines)
    new_content = normalize_line_content(content)
    span = end_line - start_line + 1

    if action == "replace":
        work_lines[start_line - 1:end_line] = new_content
        removed, added = span, len(new_content)
    elif action == "delete":
        del work_lines[start_line - 1:end_line]
        removed, added = span, 0
    else:
        # insert_before goes in front of start_line, insert_after behind it.
        at = start_line - 1 if action == "insert_before" else start_line
        work_lines[at:at] = new_content
        removed, added = 0, len(new_content)

    if dry_run:
        return {"success": True, "dry_run": True, "action": action, "lines_removed": removed, "lines_added": added}

    backup_path = None
    if backup:
        made, info = create_backup(path)
        if not made:
            return {"success": False, "error": f"Backup failed: {info}"}
        backup_path = info

    written, write_err = safe_write_file(path, work_lines, enc_or_err)
    if not written:
        return {"success": False, "error": write_err}
    return {"success": True, "action": action, "lines_removed": removed, "lines_added": added, "backup_path": backup_path}
def search_replace_file(
    path: str,
    search: str,
    replace: str,
    regex: bool = False,
    count: int = 0,
    ignore_case: bool = False,
    whole_word: bool = False,
    backup: bool = False,
    dry_run: bool = False
) -> Dict[str, Any]:
    """Search and replace inside a file, line by line.

    Args:
        path: File to edit.
        search: Text or (with ``regex=True``) regular expression to find.
        replace: Replacement. Only in regex mode is it treated as a regex
            template (``\\1``, ``\\g<name>``); in literal mode it is
            inserted verbatim, backslashes included.
        regex: Treat ``search`` as a regular expression.
        count: Maximum number of replacements; 0 or less means unlimited.
        ignore_case: Case-insensitive matching.
        whole_word: Match whole words only (literal mode only).
        backup: Create a backup copy before writing.
        dry_run: Only report what would change; nothing is written.

    Returns:
        Dict with "success", "replacements" and up to MAX_MATCHES_RETURN
        before/after samples, or "success": False with an "error".
    """
    if not search:
        return {"success": False, "error": "Search pattern empty"}
    flags = re.IGNORECASE if ignore_case else 0
    try:
        if regex:
            pattern = re.compile(search, flags)
        elif whole_word:
            pattern = re.compile(r'\b' + re.escape(search) + r'\b', flags)
        else:
            pattern = re.compile(re.escape(search), flags)
    except re.error as e:
        return {"success": False, "error": str(e)}

    success, enc_or_err, lines = safe_read_file(path)
    if not success:
        return {"success": False, "error": enc_or_err}

    # A callable replacement bypasses template processing, keeping literal
    # mode literal (e.g. "C:\new" is not turned into a newline or an error).
    repl = replace if regex else (lambda _match: replace)
    limit = count if count > 0 else None

    matches, new_lines, total_replaced = [], [], 0
    for lineno, line in enumerate(lines, 1):
        if limit is not None and total_replaced >= limit:
            new_lines.append(line)
            continue
        # count=0 tells subn to replace every match in the line.
        remaining = 0 if limit is None else limit - total_replaced
        try:
            new_line, replaced = pattern.subn(repl, line, count=remaining)
        except (re.error, IndexError) as e:
            # Bad escape or unknown group reference in the template.
            return {"success": False, "error": f"Invalid replacement: {e}"}
        new_lines.append(new_line)
        if not replaced:
            continue
        total_replaced += replaced
        if len(matches) < MAX_MATCHES_RETURN:
            matches.append({"line": lineno, "before": line.strip(), "after": new_line.strip()})

    if total_replaced == 0:
        return {"success": True, "replacements": 0, "message": "No matches"}
    if dry_run:
        return {"success": True, "dry_run": True, "replacements": total_replaced, "matches": matches}

    backup_path = None
    if backup:
        s, r = create_backup(path)
        if not s:
            return {"success": False, "error": f"Backup failed: {r}"}
        backup_path = r

    s, e = safe_write_file(path, new_lines, enc_or_err)
    if not s:
        return {"success": False, "error": e}
    return {"success": True, "replacements": total_replaced, "matches": matches, "backup_path": backup_path}
def apply_diff(
    path: str,
    diff: str,
    backup: bool = False,
    dry_run: bool = False,
    reverse: bool = False
) -> Dict[str, Any]:
    """Apply a unified-diff patch to a file.

    Args:
        path: File to patch.
        diff: Unified diff text.
        backup: Create a backup copy before writing.
        dry_run: Validate and count changes without writing.
        reverse: Apply the patch backwards ('+' and '-' swapped).

    Returns:
        Dict with "success", hunk count, totals of added/removed lines and
        per-hunk "details"; or "success": False with an "error". Nothing is
        written unless every hunk applies.
    """
    if not diff.strip():
        return {"success": False, "error": "Empty diff"}
    ok, err, hunks = parse_unified_diff(diff)
    if not ok:
        return {"success": False, "error": err}

    if reverse:
        # Swap the two sides of every hunk: headers and line markers.
        flip = {'+': '-', '-': '+'}
        hunks = [
            DiffHunk(h.new_start, h.new_count, h.old_start, h.old_count,
                     [(flip.get(tag, tag), text) for tag, text in h.lines])
            for h in hunks
        ]

    success, enc_or_err, lines = safe_read_file(path)
    if not success:
        return {"success": False, "error": enc_or_err}

    offset, details, added, removed = 0, [], 0, 0
    for number, hunk in enumerate(hunks, 1):
        ok, err, lines, offset = apply_hunk(lines, hunk, offset)
        if not ok:
            return {"success": False, "error": f"Hunk #{number}: {err}"}
        h_rem = sum(1 for tag, _ in hunk.lines if tag == '-')
        h_add = sum(1 for tag, _ in hunk.lines if tag == '+')
        removed += h_rem
        added += h_add
        details.append({"hunk": number, "removed": h_rem, "added": h_add})

    if dry_run:
        return {"success": True, "dry_run": True, "hunks": len(hunks), "added": added,
                "removed": removed, "details": details}

    backup_path = None
    if backup:
        s, r = create_backup(path)
        if not s:
            return {"success": False, "error": f"Backup failed: {r}"}
        backup_path = r

    s, e = safe_write_file(path, lines, enc_or_err)
    if not s:
        return {"success": False, "error": e}
    return {"success": True, "hunks": len(hunks), "added": added, "removed": removed,
            "details": details, "backup_path": backup_path}
| # ============ MCP 工具: 命令与系统 ============ | |
async def execute_command(command: str, working_dir: Optional[str] = None, timeout: int = 300) -> Dict[str, Any]:
    """Run a shell command and capture its output.

    Args:
        command: Command line passed to the shell.
        working_dir: Directory to run in; defaults to the current directory.
        timeout: Seconds to wait before the command is killed.

    Returns:
        On completion: "success": True with "stdout", "stderr" and
        "return_code". On timeout or any other failure: "success": False
        with an "error" message.
    """
    try:
        cwd = working_dir or os.getcwd()
        proc = await asyncio.create_subprocess_shell(
            command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            # Do not leave the command running after giving up on it.
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # already exited
            try:
                # Bounded reap: descendants holding the pipes open could
                # otherwise keep wait() pending.
                await asyncio.wait_for(proc.wait(), timeout=5)
            except asyncio.TimeoutError:
                pass
            return {"success": False, "error": f"Command timed out after {timeout}s"}
        return {
            "success": True,
            "stdout": stdout.decode('utf-8', errors='replace'),
            "stderr": stderr.decode('utf-8', errors='replace'),
            "return_code": proc.returncode
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
async def execute_script(script_content: str, interpreter: str = "/bin/bash", working_dir: Optional[str] = None) -> Dict[str, Any]:
    """Write ``script_content`` to a temporary file and run it with ``interpreter``.

    The temporary script is removed once the command has finished.
    """
    fd, script_path = tempfile.mkstemp(suffix='.sh')
    with os.fdopen(fd, 'w') as handle:
        handle.write(script_content)
    os.chmod(script_path, 0o755)
    try:
        result = await execute_command(f"{interpreter} {script_path}", working_dir)
    finally:
        os.unlink(script_path)
    return result
| # ============ MCP 工具: 基础文件操作 ============ | |
def read_file(path: str, encoding: str = "utf-8", binary: bool = False) -> Dict[str, Any]:
    """Read a file as text, or as Base64 when ``binary`` is true.

    Returns "success": True with "content" (plus "encoding": "base64" in
    binary mode), or "success": False with an "error".
    """
    try:
        target = Path(path).expanduser().resolve()
        if not binary:
            return {"success": True, "content": target.read_text(encoding=encoding)}
        encoded = base64.b64encode(target.read_bytes()).decode('ascii')
        return {"success": True, "content": encoded, "encoding": "base64"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def write_file(path: str, content: str, binary: bool = False, append: bool = False) -> Dict[str, Any]:
    """Write (or append) text or Base64-encoded binary data to a file.

    Missing parent directories are created. Returns "success": True with
    the resolved "path" and resulting "size", or "success": False with an
    "error".
    """
    try:
        target = Path(path).expanduser().resolve()
        target.parent.mkdir(parents=True, exist_ok=True)
        if binary:
            # Decode before opening so invalid Base64 never truncates the file.
            payload = base64.b64decode(content)
            with open(target, 'ab' if append else 'wb') as handle:
                handle.write(payload)
        else:
            with open(target, 'a' if append else 'w', encoding='utf-8') as handle:
                handle.write(content)
        return {"success": True, "path": str(target), "size": target.stat().st_size}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def delete_file(path: str, force: bool = False) -> Dict[str, Any]:
    """Delete a file or a directory.

    A directory is removed recursively when ``force`` is true and must be
    empty otherwise. Returns "success": True with the resolved "path", or
    "success": False with an "error".
    """
    try:
        target = Path(path).expanduser().resolve()
        if target.is_file():
            target.unlink()
        elif target.is_dir():
            if force:
                shutil.rmtree(target)
            else:
                target.rmdir()
        else:
            return {"success": False, "error": "Not found"}
        return {"success": True, "path": str(target)}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def copy_path(source: str, destination: str, overwrite: bool = False) -> Dict[str, Any]:
    """Copy a file or a directory tree.

    Fails when the source is missing, or when the destination exists and
    ``overwrite`` is false.
    """
    try:
        src = Path(source).expanduser().resolve()
        dst = Path(destination).expanduser().resolve()
        if not src.exists():
            return {"success": False, "error": "Source not found"}
        if not overwrite and dst.exists():
            return {"success": False, "error": "Destination exists"}
        if src.is_file():
            shutil.copy2(src, dst)
        else:
            shutil.copytree(src, dst, dirs_exist_ok=overwrite)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
def move_path(source: str, destination: str) -> Dict[str, Any]:
    """Move a file or directory; returns "success" and, on failure, an "error"."""
    try:
        origin = str(Path(source).expanduser().resolve())
        target = str(Path(destination).expanduser().resolve())
        shutil.move(origin, target)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
def upload_file(path: str, content_base64: str, create_dirs: bool = True) -> Dict[str, Any]:
    """Upload a file from Base64-encoded content.

    Args:
        path: Destination path.
        content_base64: File content, Base64-encoded.
        create_dirs: When false, refuse to write if the parent directory
            does not exist (by default missing parents are created).

    Returns:
        The result dict of ``write_file``, or "success": False with an
        "error" when the parent directory is missing and must not be created.
    """
    if not create_dirs:
        parent = Path(path).expanduser().resolve().parent
        if not parent.is_dir():
            return {"success": False, "error": f"Parent directory does not exist: {parent}"}
    return write_file(path, content_base64, binary=True)
def download_file(path: str) -> Dict[str, Any]:
    """Download a file: return its content Base64-encoded via ``read_file``."""
    result = read_file(path, binary=True)
    return result
| # ============ MCP 工具: 目录与搜索 ============ | |
def list_directory(path: str = ".", recursive: bool = False) -> Dict[str, Any]:
    """List directory entries (optionally recursively) with name, path and type."""
    try:
        root = Path(path).expanduser().resolve()
        entries = root.rglob('*') if recursive else root.iterdir()
        items = [
            {"name": entry.name, "path": str(entry), "type": "dir" if entry.is_dir() else "file"}
            for entry in entries
        ]
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True, "items": items}
def create_directory(path: str) -> Dict[str, Any]:
    """Create a directory including missing parents; an existing directory is fine."""
    try:
        target = Path(path).expanduser().resolve()
        os.makedirs(target, exist_ok=True)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
def search_files(directory: str, pattern: str, max_results: int = 100) -> Dict[str, Any]:
    """Search recursively for paths whose name matches a glob pattern.

    Args:
        directory: Root directory of the search.
        pattern: Glob pattern passed to ``Path.rglob``.
        max_results: Maximum number of paths returned.

    Returns:
        "success": True with "results" (list of path strings), or
        "success": False with an "error".
    """
    from itertools import islice
    try:
        root = Path(directory).expanduser().resolve()
        # islice stops the directory walk once enough results were found
        # instead of scanning the rest of the tree for nothing.
        found = islice(root.rglob(pattern), max(max_results, 0))
        return {"success": True, "results": [str(match) for match in found]}
    except Exception as e:
        return {"success": False, "error": str(e)}
def search_in_files(directory: str, search_text: str, file_pattern: str = "*", max_results: int = 50) -> Dict[str, Any]:
    """Search file contents for a plain-text substring.

    Args:
        directory: Root directory of the recursive search.
        search_text: Substring looked for in every line.
        file_pattern: Glob pattern selecting the files to scan.
        max_results: Stop after this many hits.

    Returns:
        "success": True with "results" (dicts of "file", "line",
        "content"), plus "truncated": True when the limit stopped the
        search; or "success": False with an "error".
    """
    try:
        root = Path(directory).expanduser().resolve()
        results = []
        for f in root.rglob(file_pattern):
            if not f.is_file():
                continue
            try:
                # errors='ignore' skips undecodable bytes so binary-ish
                # files do not abort the scan.
                with open(f, 'r', errors='ignore') as fp:
                    for lineno, line in enumerate(fp, 1):
                        if search_text in line:
                            results.append({"file": str(f), "line": lineno, "content": line.strip()})
                            if len(results) >= max_results:
                                return {"success": True, "results": results, "truncated": True}
            except OSError:
                # Unreadable file (permissions, vanished, ...): skip it,
                # without swallowing unrelated errors or interrupts.
                continue
        return {"success": True, "results": results}
    except Exception as e:
        return {"success": False, "error": str(e)}
| # ============ MCP 工具: 分享与上传 ============ | |
def share_file(path: str, filename: Optional[str] = None, expires_in: int = 3600) -> Dict[str, Any]:
    """Create a temporary download link for a file.

    Args:
        path: File to share.
        filename: Name offered to the downloader; defaults to the file's name.
        expires_in: Lifetime of the link in seconds.

    Returns:
        "success": True with the "url" and ISO "expires_at", or
        "success": False with an "error".
    """
    try:
        target = Path(path).expanduser().resolve()
        if not target.is_file():
            return {"success": False, "error": "Not a file"}
        # Drop stale entries before registering a new one.
        cleanup_expired_shares()
        share_id = generate_short_id()
        entry = FileShare(
            share_id,
            str(target),
            filename or target.name,
            datetime.now(),
            datetime.now() + timedelta(seconds=expires_in),
        )
        file_shares[share_id] = entry
        return {
            "success": True,
            "url": f"{BASE_URL}/f/{share_id}",
            "expires_at": entry.expires_at.isoformat(),
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def list_shares() -> Dict[str, Any]:
    """List every registered share with its id, download URL and file name."""
    shares = []
    for entry in file_shares.values():
        shares.append({"id": entry.id, "url": f"{BASE_URL}/f/{entry.id}", "file": entry.filename})
    return {"success": True, "shares": shares}
def revoke_share(share_id: str) -> Dict[str, Any]:
    """Revoke a share by id; reports an error when the id is unknown."""
    try:
        del file_shares[share_id]
    except KeyError:
        return {"success": False, "error": "Share not found"}
    return {"success": True}
def get_share_info(share_id: str) -> Dict[str, Any]:
    """Return id, URL, file name, download count and expiry of one share."""
    entry = file_shares.get(share_id) if share_id in file_shares else None
    if share_id not in file_shares:
        return {"success": False, "error": "Share not found"}
    expiry = entry.expires_at.isoformat() if entry.expires_at else None
    return {
        "success": True,
        "id": entry.id,
        "url": f"{BASE_URL}/f/{entry.id}",
        "filename": entry.filename,
        "downloads": entry.download_count,
        "expires_at": expiry,
    }
def upload_to_server(content: str, filename: str, directory: str = DEFAULT_UPLOAD_DIR) -> Dict[str, Any]:
    """Upload a small file given as Base64 into ``directory`` via ``write_file``."""
    destination = "/".join((directory, filename))
    return write_file(destination, content, binary=True)
def get_upload_endpoint() -> Dict[str, Any]:
    """Describe the HTTP upload endpoint: URL, method, form field and a curl example."""
    upload_url = f"{BASE_URL}/upload"
    example = f"curl -F 'file=@file.zip' '{upload_url}?dir={DEFAULT_UPLOAD_DIR}'"
    return {
        "success": True,
        "url": upload_url,
        "method": "POST",
        "field": "file",
        "example": example,
    }
| # ============ MCP 工具: 系统信息与管理 ============ | |
def get_system_info() -> Dict[str, Any]:
    """Return basic system information.

    Returns:
        "success": True with platform string, Python version, machine
        type, total/free space of "/" in GiB and the current working
        directory; or "success": False with an "error".
    """
    import platform
    try:
        disk = shutil.disk_usage('/')
        return {
            # Same "success" flag every other tool in this module returns.
            "success": True,
            "platform": platform.platform(),
            "python": platform.python_version(),
            "cpu": platform.machine(),
            "disk_total_gb": round(disk.total / (1024**3), 2),
            "disk_free_gb": round(disk.free / (1024**3), 2),
            "cwd": os.getcwd()
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
def get_process_list(filter_name: Optional[str] = None) -> Dict[str, Any]:
    """Return the output of ``ps aux``, optionally filtered by name.

    Args:
        filter_name: When given, only lines containing this text
            (case-insensitive substring) are kept.

    Returns:
        "success": True with "output" (cut to 2000 characters, followed by
        "..." when truncated), or "success": False with an "error".
    """
    try:
        # No shell and no string interpolation: filter_name can no longer
        # inject shell commands, and no helper "grep" process shows up in
        # the listing.
        res = subprocess.run(["ps", "aux"], capture_output=True, text=True)
        output = res.stdout
        if filter_name:
            needle = filter_name.casefold()
            output = "".join(
                row for row in output.splitlines(keepends=True)
                if needle in row.casefold()
            )
        return {"success": True, "output": output[:2000] + ("..." if len(output) > 2000 else "")}
    except Exception as e:
        return {"success": False, "error": str(e)}
def get_network_info() -> Dict[str, Any]:
    """Return the standard output of ``ip a``."""
    try:
        completed = subprocess.run("ip a", shell=True, capture_output=True, text=True)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True, "output": completed.stdout}
async def check_port(host: str, port: int) -> Dict[str, Any]:
    """Check whether a TCP port accepts connections (3 second timeout).

    Returns:
        ``{"success": True, "status": "open"}`` when the connection
        succeeds, ``{"success": False, "status": "closed/timeout"}`` when
        it is refused, unreachable or times out.
    """
    try:
        _, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=3)
    except (OSError, asyncio.TimeoutError):
        # Only connection failures mean "closed"; task cancellation and
        # programming errors propagate instead of being swallowed.
        return {"success": False, "status": "closed/timeout"}
    writer.close()
    try:
        await writer.wait_closed()
    except OSError:
        pass  # the peer may already have dropped the connection
    return {"success": True, "status": "open"}
def get_environment_variables(prefix: Optional[str] = None) -> Dict[str, Any]:
    """Return environment variables, optionally only those whose name starts with ``prefix``."""
    snapshot = os.environ.copy()
    if not prefix:
        return {"success": True, "env": snapshot}
    selected = {}
    for name, value in snapshot.items():
        if name.startswith(prefix):
            selected[name] = value
    return {"success": True, "env": selected}
def change_permissions(path: str, mode: str, recursive: bool = False) -> Dict[str, Any]:
    """Change permissions (chmod).

    Args:
        path: File or directory to change.
        mode: Permission bits as an octal string, e.g. "755".
        recursive: For a directory, also change everything below it.
    """
    try:
        target = Path(path).expanduser().resolve()
        bits = int(mode, 8)
        # Children first, then the entry itself.
        descendants = target.rglob('*') if recursive and target.is_dir() else ()
        for child in descendants:
            os.chmod(child, bits)
        os.chmod(target, bits)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
def get_file_info(path: str) -> Dict[str, Any]:
    """Return size, mode (octal string), owner ids and modification time of a path."""
    try:
        info = os.stat(Path(path).expanduser().resolve())
        modified = datetime.fromtimestamp(info.st_mtime).isoformat()
        return {
            "success": True,
            "size": info.st_size,
            "mode": oct(info.st_mode),
            "uid": info.st_uid,
            "gid": info.st_gid,
            "mtime": modified,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
async def service_control(service: str, action: str) -> Dict[str, Any]:
    """Control a service through ``systemctl``.

    Args:
        service: Unit name; shell-quoted before it is placed on the command line.
        action: One of "start", "stop", "restart", "status".

    Returns:
        The result dict of ``execute_command``, or "success": False with
        an "error" for an unsupported action.
    """
    import shlex
    if action not in ("start", "stop", "restart", "status"):
        return {"success": False, "error": "Invalid action"}
    # Quote the unit name so it cannot add extra shell commands.
    return await execute_command(f"systemctl {action} {shlex.quote(service)}")
async def get_server_addr():
    """Return the SERVER_URL environment variable as "addr" ("Unknown" when unset)."""
    address = os.environ.get("SERVER_URL", "Unknown")
    return {"addr": address}
| # ============ FastAPI 应用构建 ============ | |
def create_app():
    """Build the FastAPI application.

    Registers the HTTP routes (index, share download, file upload) and
    then mounts the MCP ASGI app at "/". The routes are registered before
    the mount so they are matched first.
    """
    mcp_app = mcp.http_app(path="/mcp")
    app = FastAPI(title=SERVER_NAME, lifespan=mcp_app.lifespan)

    @app.get("/")
    async def index():
        """Describe the server and its endpoints."""
        return {
            "server": SERVER_NAME,
            "endpoints": {
                "mcp": f"{BASE_URL}/mcp/",
                "upload": f"{BASE_URL}/upload",
                "download": f"{BASE_URL}/f/{{id}}"
            }
        }

    @app.get("/f/{sid}")
    async def download(sid: str):
        """Serve the file behind a share id (404 unknown, 410 expired)."""
        share = file_shares.get(sid)
        if share is None:
            return JSONResponse({"error": "Not found"}, 404)
        if share.expires_at and datetime.now() > share.expires_at:
            return JSONResponse({"error": "Expired"}, 410)
        share.download_count += 1
        return FileResponse(share.source_path, filename=share.filename)

    @app.post("/upload")
    async def upload(
        file: UploadFile = File(...),
        dir: str = Query(DEFAULT_UPLOAD_DIR),
        overwrite: bool = Query(False)
    ):
        """Store an uploaded file in ``dir`` (409 exists, 413 too large)."""
        try:
            d = Path(dir).expanduser().resolve()
            d.mkdir(parents=True, exist_ok=True)
            # Keep only the final path component so a crafted filename
            # ("../../x", "/etc/x") cannot escape the target directory.
            safe_name = Path(file.filename or "").name
            if not safe_name:
                return JSONResponse({"error": "Invalid filename"}, 400)
            p = d / safe_name
            if p.exists() and not overwrite:
                return JSONResponse({"error": "Exists"}, 409)
            # Read at most one byte past the limit: enough to detect an
            # oversized upload without loading all of it into memory.
            content = await file.read(MAX_UPLOAD_SIZE + 1)
            if len(content) > MAX_UPLOAD_SIZE:
                return JSONResponse({"error": "Too large"}, 413)
            with open(p, "wb") as f:
                f.write(content)
            return {"success": True, "path": str(p), "size": len(content)}
        except Exception as e:
            return JSONResponse({"error": str(e)}, 500)

    app.mount("/", mcp_app)
    return app
if __name__ == "__main__":
    # Script entry point: announce the address, then serve the app.
    print(f"Starting {SERVER_NAME} on {BASE_URL}")
    application = create_app()
    uvicorn.run(application, host=HOST, port=PORT)