#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 修复说明: 1. 核心问题: 容器环境下缺乏对/app/runs目录的写入权限(PermissionError: [Errno 13]) 2. 解决方案: - 实现更健壮的存储路径选择机制 - 增加权限验证和自动回退逻辑 - 使用临时目录作为最终回退路径 - 修复路径处理中的绝对/相对路径问题 - 增强错误处理和日志记录 3. 主要修改点: - _ensure_dir(): 增加权限验证 - pick_storage_root(): 增强路径选择逻辑 - 目录初始化: 使用绝对路径并验证可写性 - run_in_sandbox(): 使用更安全的临时目录 - 增加多级回退机制(从/data → ~/.cache → /tmp) """ try: DEBUG_BUFFER except NameError: DEBUG_BUFFER = [] # 早期 debug 存根,避免在真正 debug 定义前发生 NameError from datetime import datetime def _sanitize(o): return o def debug(level, msg, details=None): try: ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] except Exception: ts = "time" line = f"[{ts}] [{str(level).upper():<8}] {msg}" try: print(line) if details is not None: print(details) except Exception: pass try: DEBUG_BUFFER.append(line) if len(DEBUG_BUFFER) > 1000: del DEBUG_BUFFER[: len(DEBUG_BUFFER) - 1000] except Exception: pass def get_debug_text() -> str: try: return "\n".join(DEBUG_BUFFER[-2000:]) except Exception: return "" ## 第一部分(导入和基础设置): # requirements: gradio>=4.44.1, requests, google-generativeai, GitPython # app.py import gradio as gr import requests, os, re, subprocess, json, sys, html, time, shutil, zipfile, shlex, threading, hashlib, inspect, traceback, urllib.parse from pathlib import Path from datetime import datetime import tempfile import shutil import concurrent.futures from typing import List, Dict, Any, Tuple, Optional from dataclasses import dataclass, field import ast, difflib from collections import deque APP_NAME = ( "AI 自主编程修复代理(团队规划 + 持久化 + 实时日志 + 选择修复 + 审计 + 智能补丁)" ) # ========================== 存储与运行根 ========================== def _ensure_dir(d: str) -> bool: try: path = Path(d) if not path.is_absolute(): path = (Path.cwd() / path).resolve() path.mkdir(parents=True, exist_ok=True) # 写测试文件 t = path / ".wtest" t.write_text("ok", encoding="utf-8") try: t.unlink() except Exception: pass # 读写权限 if not os.access(str(path), os.R_OK) or not os.access(str(path), os.W_OK): return False return True except Exception as e: try: debug("WARN", f"目录验证失败: {d}", {"error": str(e), "path": str(d)}) except Exception: print(f"[WARN] 目录验证失败: {d} -> {e}") return False def pick_storage_root() -> str: candidates = [ os.getenv("SPACE_STORAGE", "") or "", "/data", str(Path.home() / ".cache" / "pyzdh"), str(Path.cwd() / "data"), str(Path.cwd()), ] seen = set() for candidate in candidates: if not candidate: continue try: norm_path = str(Path(candidate).resolve()) except Exception: norm_path = candidate if norm_path in seen: continue seen.add(norm_path) if _ensure_dir(norm_path): try: debug("INFO", f"选择存储根目录: {norm_path}") except Exception: pass return norm_path fallback = str(Path(tempfile.gettempdir()) / "pyzdh_storage") _ensure_dir(fallback) try: debug("WARN", f"回退到临时目录: {fallback}") except Exception: pass return fallback # 初始化存储根目录 STORAGE_ROOT = pick_storage_root() storage_path = Path(STORAGE_ROOT) # 定义所有子目录路径(使用绝对路径) RUN_ROOT = str((storage_path / "runs").resolve()) LOGS_DIR = str((storage_path / "logs").resolve()) PROJECT_ROOT = str((storage_path / "projects").resolve()) TOOLS_ROOT = str((storage_path / "mcp_tools").resolve()) UPLOADS_DIR = str((storage_path / "uploads").resolve()) TASKS_FILE = str((storage_path / "tasks.json").resolve()) KEYS_FILE = str((storage_path / "keys.json").resolve()) # 验证并创建所有必要目录 essential_dirs = [ ("runs", RUN_ROOT), ("logs", LOGS_DIR), ("projects", PROJECT_ROOT), ("mcp_tools", TOOLS_ROOT), ("uploads", UPLOADS_DIR) ] for dir_name, dir_path in essential_dirs: if not _ensure_dir(dir_path): # 如果目录不可用,回退到临时目录 fallback_path = str(Path(tempfile.gettempdir()) / "pyzdh" / dir_name) debug("WARN", f"目录{dir_path}不可用,回退到{fallback_path}") _ensure_dir(fallback_path) # 更新全局变量 if dir_name == "runs": RUN_ROOT = fallback_path elif dir_name == "logs": LOGS_DIR = fallback_path elif dir_name == "projects": PROJECT_ROOT = fallback_path elif dir_name == "mcp_tools": TOOLS_ROOT = fallback_path elif dir_name == "uploads": UPLOADS_DIR = fallback_path debug("INFO", "存储目录初始化完成", { "storage_root": STORAGE_ROOT, "runs": RUN_ROOT, "logs": LOGS_DIR, "projects": PROJECT_ROOT, "uploads": UPLOADS_DIR }) # ========================== Git 可选 ========================== try: os.environ["GIT_PYTHON_REFRESH"] = "quiet" from git import Repo, GitCommandError GIT_AVAILABLE = True except Exception as e: print(f"[WARN] GitPython unavailable: {e}") GIT_AVAILABLE = False class Repo: ... class GitCommandError(Exception): ... # ========================== 常量与模型 ========================== HF_API_URL = "https://api-inference.huggingface.co/models/" DEFAULT_BASES = { "openai": "https://api.openai.com/v1", "groq": "https://api.groq.com/openai/v1", "mistral": "https://api.mistral.ai/v1", "deepseek": "https://api.deepseek.com/v1", "openrouter": "https://openrouter.ai/api/v1", "perplexity": "https://api.perplexity.ai/v1", "xai": "https://api.x.ai/v1", "azure": "https://YOUR-RESOURCE.openai.azure.com", "anthropic": "https://api.anthropic.com/v1", "siliconflow": "https://api.siliconflow.cn/v1", } RECOMMENDED_MODELS = { "gemini": ["gemini-1.5-flash", "gemini-1.5-pro"], "openai": ["gpt-4o-mini", "gpt-4o"], "anthropic": ["claude-3-5-sonnet-20240620", "claude-3-haiku-20240307"], # "cohere": ["command-r-plus"], # 暂时禁用,llm_call 未实现 "groq": ["llama-3.1-70b-versatile"], "mistral": ["mistral-large-latest"], "deepseek": ["deepseek-chat"], "openrouter": ["openai/gpt-4o"], "hf": ["Qwen/Qwen1.5-7B-Chat"], "azure": ["<你的部署名>"], "perplexity": ["llama-3-sonar-large-32k-online"], "xai": ["grok-1"], "mock": ["mock-echo"], "siliconflow": ["deepseek-ai/DeepSeek-V2", "alibaba/Qwen2-7B-Instruct"], } OPENAI_LIKE = { "openai", "groq", "mistral", "deepseek", "openrouter", "perplexity", "xai", "siliconflow", } MODEL_OK_CACHE: Dict[str, bool] = {} DEFAULT_GLOBAL_HINT_STR = "你是一个自动化编程维修测试员,根据要求自动运行测试修复以下代码,为正常没有问题所有功能完整实现" DEFAULT_REQ_TIMEOUT = int(os.getenv("REQ_TIMEOUT", "1200")) # ========================== 日志工具 ========================== DEBUG_BUFFER: List[str] = [] def _sanitize(o): try: if isinstance(o, dict): d = {} for k, v in o.items(): if any(s in k.lower() for s in ["key", "token", "secret", "auth"]): d[k] = ( (str(v)[:4] + "..." + str(v)[-4:]) if isinstance(v, str) and len(str(v)) > 8 else "***" ) else: d[k] = _sanitize(v) return d if isinstance(o, list): return [_sanitize(x) for x in o] return o except Exception: return o def debug(level, msg, details=None): try: fr = inspect.stack()[1] loc = f"{os.path.basename(fr.filename)}:{fr.lineno}({fr.function})" except Exception: loc = "unknown" ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] line = f"[{ts}] [{level.upper():<8}] [{loc}] {msg}" if details is not None: try: d = _sanitize(details) line += os.linesep + (json.dumps(d, ensure_ascii=False, indent=2) if isinstance(d, (dict, list)) else str(d)) except Exception: line += os.linesep + str(details) try: Path(LOGS_DIR).mkdir(parents=True, exist_ok=True) log_file = Path(LOGS_DIR) / f"debug-{datetime.now().strftime('%Y%m%d')}.log" with open(log_file, "a", encoding="utf-8") as f: f.write(line + os.linesep + "=" * 80 + os.linesep) except Exception as e: print(f"DEBUG_LOG_ERROR: {e}", file=sys.stderr) print(line, file=sys.stderr) DEBUG_BUFFER.append(line) if len(DEBUG_BUFFER) > 4000: del DEBUG_BUFFER[: len(DEBUG_BUFFER) - 4000] print(line) def get_debug_text() -> str: return "\n".join(DEBUG_BUFFER[-2000:]) # ========================== 配置持久化 ========================== AUTO_SAVE_ENABLED = True _LAST_SAVE_TS = 0.0 def load_all() -> dict: try: if Path(KEYS_FILE).exists(): txt = Path(KEYS_FILE).read_text("utf-8") debug("CONF", "载入 keys.json", {"path": KEYS_FILE, "bytes": len(txt)}) return json.loads(txt) except Exception as e: debug( "CONF_ERR", "读取 keys.json 失败", {"err": str(e), "trace": traceback.format_exc()}, ) return {} def _merge_non_empty(old: dict, new: dict) -> dict: out = dict(old or {}) for k, v in (new or {}).items(): if v is None: continue if isinstance(v, str) and v.strip() == "": continue out[k] = v return out def save_json(obj: dict, auto=False) -> str: try: tmp = KEYS_FILE + ".tmp" s = json.dumps(obj, ensure_ascii=False, indent=2) Path(tmp).write_text(s, "utf-8") os.replace(tmp, KEYS_FILE) debug( "CONF", "写入 keys.json", {"path": KEYS_FILE, "bytes": len(s), "auto": auto} ) return f"✅ {'自动保存成功' if auto else '已保存到本地'}({datetime.now().strftime('%H:%M:%S')})" except Exception as e: debug( "CONF_ERR", "写入 keys.json 失败", {"err": str(e), "trace": traceback.format_exc()}, ) return f"❌ 保存失败:{e}" def get_custom_providers() -> Dict[str, Dict[str, Any]]: cps = load_all().get("custom_providers", {}) return cps if isinstance(cps, dict) else {} def provider_choices() -> List[str]: return list(RECOMMENDED_MODELS.keys()) + list(get_custom_providers().keys()) def save_custom_provider( name: str, base: str, key: str, referer: str = "", title: str = "" ) -> str: name, base, key = (name or "").strip(), (base or "").strip(), (key or "").strip() debug( "UI_ACTION", "尝试保存自定义API", {"name": name, "base": base, "referer": referer, "title": title}, ) if not name: return "❗ 请输入自定义提供商 ID" if not base or not key: return "❗ Base URL 和 API Key 不能为空" try: data = load_all() cps = data.get("custom_providers", {}) or {} if not isinstance(cps, dict): cps = {} cps[name] = { "base": base, "key": key, "referer": (referer or ""), "title": (title or ""), } data["custom_providers"] = cps return save_json(data, auto=True) except Exception as e: debug( "EXCEPTION", "保存自定义API异常", {"err": str(e), "trace": traceback.format_exc()}, ) return f"❌ 内部错误,保存失败: {e}" def remove_custom_provider(name: str) -> str: data = load_all() cps = data.get("custom_providers", {}) or {} if name in cps: cps.pop(name, None) data["custom_providers"] = cps return save_json(data, auto=True) return "❌ 未找到该自定义提供商" def save_all_ui( keys: dict, provider: str, model: str, github_token: str, global_hint: str = "", last_task: Optional[str] = None, auto=False, ) -> str: old = load_all() merged = _merge_non_empty(old, keys or {}) merged["ui_provider"] = provider or old.get("ui_provider", "") merged["ui_model"] = model or old.get("ui_model", "") if github_token: merged["github_token"] = github_token merged["global_hint"] = global_hint or old.get("global_hint", "") if last_task is not None: lt = last_task.strip() merged["last_task"] = lt[:4000] + "…(截断)" if len(lt) > 4000 else lt debug( "CONF", "保存UI与键值", {"provider": merged.get("ui_provider"), "model": merged.get("ui_model")}, ) return save_json(merged, auto=auto) def auto_save_ui( keys: dict, provider: str, model: str, github_token: str, global_hint: str = "", last_task: str = "", ): global _LAST_SAVE_TS if not AUTO_SAVE_ENABLED: return "" now = time.time() if now - _LAST_SAVE_TS >= 1.2: _LAST_SAVE_TS = now return save_all_ui( keys, provider, model, github_token, global_hint, last_task=last_task, auto=True, ) return "" def get_models_cache(provider: str) -> List[str]: data = load_all() cache = data.get("models_cache", {}).get(provider, []) if isinstance(cache, list): debug("MODELS", "读取模型缓存", {"provider": provider, "count": len(cache)}) return cache return [] def set_models_cache(provider: str, models: List[str]): data = load_all() mc = data.get("models_cache", {}) mc[provider] = models or [] data["models_cache"] = mc debug("MODELS", "写入模型缓存", {"provider": provider, "count": len(models or [])}) save_json(data, auto=True) def get_saved_rpm(provider: str, model: str) -> int: data = load_all() return int(data.get("rate_limits", {}).get(f"{provider}:{model}", 0) or 0) def set_saved_rpm(provider: str, model: str, rpm: int): data = load_all() rl = data.get("rate_limits", {}) rl[f"{provider}:{model}"] = int(max(0, rpm)) data["rate_limits"] = rl debug("RL", "保存RPM限制", {"key": f"{provider}:{model}", "rpm": int(max(0, rpm))}) save_json(data, auto=True) ## 第二部分(核心功能函数): # ========================== 任务/对话/附件持久化(分离两种对话) ========================== def load_tasks_db() -> dict: try: if Path(TASKS_FILE).exists(): txt = Path(TASKS_FILE).read_text("utf-8") debug("TASK", "加载任务库", {"path": TASKS_FILE, "bytes": len(txt)}) return json.loads(txt) except Exception as e: debug( "TASK_ERR", "读取任务库失败", {"err": str(e), "trace": traceback.format_exc()}, ) return {"history": [], "last": {}} def save_tasks_db(db: dict): try: tmp = TASKS_FILE + ".tmp" s = json.dumps(db, ensure_ascii=False, indent=2) Path(tmp).write_text(s, "utf-8") os.replace(tmp, TASKS_FILE) debug("TASK", "写入任务库", {"path": TASKS_FILE, "bytes": len(s)}) return True except Exception as e: debug( "TASK_ERR", "写入任务库失败", {"err": str(e), "trace": traceback.format_exc()}, ) return False def save_task_state( task: str, files: List[str], baseline_code: str, req: str, exp: str, cli: str, provider: str, model: str, add_history=False, ) -> str: db = load_tasks_db() now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") payload = { "task": task or "", "files": files or [], "baseline_code": baseline_code or "", "required_kws": req or "", "expected_stdout": exp or "", "cli_args": cli or "", "provider": provider or "", "model": model or "", "time": now, "chat_history": db.get("last", {}).get("chat_history", ""), "chat_history_general": db.get("last", {}).get("chat_history_general", ""), } db["last"] = payload if add_history: hist = db.get("history", []) hist.insert(0, payload) db["history"] = hist[:20] save_tasks_db(db) debug("TASK", "保存任务快照", {"time": now, "files": len(files or [])}) return f"✅ 任务已保存({now})" def persist_chat_history(hist: str) -> str: db = load_tasks_db() last = db.get("last", {}) last["chat_history"] = hist or "" db["last"] = last save_tasks_db(db) debug("CHAT", "保存对话历史(编程)", {"chars": len(hist or "")}) return "✔️ 对话已保存(编程)" def persist_general_chat_history(hist: str) -> str: db = load_tasks_db() last = db.get("last", {}) last["chat_history_general"] = hist or "" db["last"] = last save_tasks_db(db) debug("CHAT", "保存对话历史(智能)", {"chars": len(hist or "")}) return "✔️ 对话已保存(智能)" def restore_last_task_state() -> dict: return load_tasks_db().get("last", {}) def get_task_history_titles() -> List[str]: db = load_tasks_db() titles = [] for i, item in enumerate(db.get("history", [])): t = item.get("time", "?") goal = (item.get("task", "") or "").strip() goal = (goal[:30] + ("…" if len(goal) > 30 else "")) if goal else "(空任务)" titles.append(f"{i+1:02d} | {t} | {goal}") return titles def get_task_history_by_index(idx: int) -> dict: db = load_tasks_db() hist = db.get("history", []) return hist[idx] if 0 <= idx < len(hist) else {} # ========================== 附件工具 ========================== TEXT_EXTS = { ".py", ".txt", ".json", ".csv", ".yml", ".yaml", ".ini", ".conf", ".md", ".xml", ".html", ".css", ".js", ".ts", ".tsx", ".jsx", ".toml", ".cfg", ".log", ".ipynb", ".java", ".kt", ".go", ".rs", ".c", ".h", ".cpp", ".hpp", ".cs", ".php", ".rb", ".sh", ".ps1", ".r", ".m", ".scala", ".pl", ".erl", ".ex", ".exs", ".dart", ".swift", ".sql", } def resolve_file_path(f) -> str: try: if not f: return "" if isinstance(f, (str, Path)): return str(f) if isinstance(f, dict): for k in ("name", "path"): v = f.get(k) if isinstance(v, str) and v: return v return "" for attr in ("name", "path", "orig_name", "tmp_path"): v = getattr(f, attr, None) if isinstance(v, str) and v: return v fo = getattr(f, "file", None) or getattr(f, "tempfile", None) if fo is not None: fn = getattr(fo, "name", "") if isinstance(fn, str) and fn: return fn except Exception: return "" return "" def looks_text(path: str, max_probe=4096) -> bool: try: b = Path(path).read_bytes()[:max_probe] if not b: return True text_chars = bytes(range(32, 127)) + b"\n\r\t\f\b" printable = sum(1 for ch in b if ch in text_chars) return printable / max(1, len(b)) > 0.85 except Exception: return False def persist_files_to_uploads(files: List[str]) -> List[str]: global UPLOADS_DIR debug("ENTRY", "开始持久化文件", {"input_files_count": len(files or [])}) saved = [] upload_path = Path(UPLOADS_DIR) if not _ensure_dir(str(upload_path)): upload_path = Path(tempfile.gettempdir()) / "pyzdh_uploads" try: upload_path.mkdir(parents=True, exist_ok=True) except Exception: pass debug("WARN", f"上传目录不可写,回退到: {upload_path}") UPLOADS_DIR = str(upload_path) try: upload_path.mkdir(parents=True, exist_ok=True) except Exception: pass for f in files or []: if not f: continue rp = str(f) if not os.path.isfile(rp): debug("SKIP", "路径非文件", {"path": rp}) continue try: bn = Path(rp).name if re.match(r"^\d{10}-[0-9a-f]{10}-", bn): persisted = upload_path / bn if persisted.exists(): saved.append(str(persisted)) debug("SKIP", "文件已持久化", {"path": str(persisted)}) continue except Exception: pass try: if str(Path(rp).resolve()).startswith(str(upload_path.resolve())): saved.append(rp) debug("SKIP", "文件已在上传目录", {"path": rp}) continue original_basename = Path(rp).name ts = int(time.time()) h = hashlib.sha1((rp + str(Path(rp).stat().st_mtime)).encode("utf-8")).hexdigest()[:10] dest = str(upload_path / f"{ts}-{h}-{original_basename}") shutil.copy2(rp, dest) saved.append(dest) debug("SUCCESS", "成功持久化文件", {"src": rp, "dest": dest}) except Exception as e: debug("EXCEPTION", "持久化文件失败", { "src": rp, "error": str(e), "traceback": traceback.format_exc() }) debug("EXIT", "完成文件持久化", { "saved_count": len(saved), "upload_dir": UPLOADS_DIR }) return saved def build_attachments_preview( paths: List[str], per_file_chars=1200, max_files=5, max_total_chars=8000 ) -> str: previews, total, cnt = [], 0, 0 for p in paths or []: if not p or not os.path.exists(p): continue name = Path(p).name is_text = Path(p).suffix.lower() in TEXT_EXTS or looks_text(p) if not is_text: previews.append(f"- {name}(非文本文件,路径:{p})") continue try: raw = Path(p).read_text("utf-8", errors="replace") except Exception: try: raw = Path(p).read_text("latin-1", errors="replace") except Exception: raw = "" chunk = raw[:per_file_chars] if not chunk.strip(): previews.append(f"- {name}(读取为空)") continue piece = f"文件: {name}\n```\n{chunk}\n```" if total + len(piece) > max_total_chars: previews.append(f"- {name}(内容过长,已略)") continue previews.append(piece) total += len(piece) cnt += 1 if cnt >= max_files: break return ("以下为已上传附件的内容预览(截断展示,仅供参考):\n" + "\n\n".join(previews) if previews else "") # ========================== 运行控制与实时日志 ========================== STOP_FLAG = False CURRENT_PROCS: List[subprocess.Popen] = [] WATCH_STOP = False CURRENT_RUN_DIR = "" def _register_proc(p: subprocess.Popen | None): if p: CURRENT_PROCS.append(p) def stop_all(): global STOP_FLAG STOP_FLAG = True for p in list(CURRENT_PROCS): try: if p and p.poll() is None: p.terminate() time.sleep(0.1) if p.poll() is None: p.kill() except Exception as e: try: debug("WARN", "停止进程失败", {"error": str(e)}) except Exception: pass CURRENT_PROCS.clear() try: debug("WARN", "所有进程已停止") except Exception: pass return "⏹️ 已停止所有进程" def read_tail(path: str, max_bytes=800_000) -> str: try: p = Path(path) if not p.exists(): return "" b = p.read_bytes() if len(b) > max_bytes: b = b[-max_bytes:] return b.decode("utf-8", "replace") except Exception: return "" # ========================== AST/Prompt/校验 ========================== @dataclass class CodeFingerprint: imports: List[str] = field(default_factory=list) classes: List[str] = field(default_factory=list) functions: List[str] = field(default_factory=list) domain_keywords: List[str] = field(default_factory=list) TRADING_KWS = ["order", "trade", "price", "buy", "sell", "position", "market", "exchange", "okx", "binance", "leverage", "symbol", "instrument", "algo"] API_KWS = ["request", "http", "api", "endpoint", "token", "headers", "payload"] ML_KWS = ["train", "predict", "model", "fit", "dataset"] def extract_fingerprint(code: str) -> CodeFingerprint: fp = CodeFingerprint() try: tree = ast.parse(code) for n in ast.walk(tree): if isinstance(n, ast.Import): for a in n.names: fp.imports.append(a.name.split(".")[0]) elif isinstance(n, ast.ImportFrom): if n.module: fp.imports.append(n.module.split(".")[0]) elif isinstance(n, ast.ClassDef): fp.classes.append(n.name) elif isinstance(n, ast.FunctionDef): fp.functions.append(n.name) except Exception: pass low = (code or "").lower() domain = [] for kw in TRADING_KWS: if kw in low: domain.append(kw) for kw in API_KWS: if kw in low and kw not in domain: domain.append(kw) for kw in ML_KWS: if kw in low and kw not in domain: domain.append(kw) fp.domain_keywords = domain[:20] fp.imports = sorted(set(fp.imports))[:30] fp.classes = sorted(set(fp.classes))[:50] fp.functions = sorted(set(fp.functions))[:120] return fp def domain_name(fp: CodeFingerprint) -> str: if any(k in fp.domain_keywords for k in TRADING_KWS): return "交易/金融" if any(k in fp.domain_keywords for k in API_KWS): return "API/网络" if any(k in fp.domain_keywords for k in ML_KWS): return "机器学习" return "通用" def relevance_check(orig: CodeFingerprint, new: CodeFingerprint) -> Tuple[bool, str]: if not any([orig.imports, orig.classes, orig.functions, orig.domain_keywords]): return True, "无基准,不做相关性约束" score, total = 0, 3 if orig.imports: if len(set(orig.imports) & set(new.imports)) >= max(1, len(orig.imports) // 3): score += 1 else: score += 1 kept_main = any(c in new.classes for c in orig.classes) or any(f in new.functions for f in orig.functions[:10]) if kept_main: score += 1 if set(orig.domain_keywords) & set(new.domain_keywords): score += 1 return score >= 2, f"相关性评分 {score}/{total}" def compile_syntax_ok(code: str) -> Tuple[bool, str]: try: compile(code, "", "exec") return True, "" except SyntaxError as e: return False, f"SyntaxError: {e}" except Exception: return True, "" def detect_markdown_fence(s: str) -> bool: return "```" in (s or "") def strip_code_fences(s: str) -> str: """修复: 正确处理代码围栏提取""" if not s: return s # 查找 python 代码块 m = re.findall(r"```(?:python|py)?\s*\n(.*?)```", s, re.S | re.I) if m: # m 是列表,取第一个匹配 return m[0].strip() if m else s.strip() # 查找通用代码块 m2 = re.findall(r"```\s*\n(.*?)```", s, re.S) if m2: return m2[0].strip() if m2 else s.strip() return s.strip() def parse_requirements(code: str) -> List[str]: first = (code or "").splitlines()[:1] if not first: return [] m = re.match(r"^\s*#\s*requirements\s*:\s*(.+)$", first[0], re.I) if not m: return [] raw = [p.strip() for p in re.split(r"[,\s]+", m.group(1)) if p.strip()] stdlibs = { "re", "json", "sys", "os", "time", "pathlib", "subprocess", "tempfile", "math", "typing", "datetime", "zipfile", "shutil", "hashlib", "hmac", "base64", "itertools", "ast", "html", "shlex", "threading", } return [p for p in raw if p.split("==")[0] not in stdlibs][:40] def build_initial_plan(goal: str, fp: CodeFingerprint) -> List[str]: return [ "最小修复优先:先清理格式错误(如 ``` 标记、缩进、语法)", "保持原有结构与领域:不得偏离领域:" + domain_name(fp), "仅修改与报错直接相关的行,避免大规模重写", "必要时补充缺失依赖;若需 CLI 参数,提供默认值", ] def build_error_analysis_prompt(error: str, code: str) -> str: return f""" 分析以下错误,按优先级严格执行: 1) 是否包含 Markdown 代码块标记(```python)混入代码?若是,仅移除标记。 2) 是否为编码/缩进/语法格式问题? 3) 是否缺少必要命令行参数?如是,为参数提供默认值或交互输入。 4) 才考虑逻辑问题(保持原功能不变,最小修改)。 错误: {error} 当前代码(前120字): {(code or '')[:120]} """ def build_fix_prompt( goal: str, original_fp: CodeFingerprint, last_err: str, current_code: str, attempt: int, required_keywords: List[str], hint: str = "", ) -> str: strategy = ( "最小修改;清理格式/语法;保留原有导入/类/函数;提供缺省参数;" if attempt == 1 else ( "检查依赖与版本;为必需参数设置默认;必要时加 try/except;" if attempt == 2 else "深度修复,但不得改变核心功能/领域;如需重构,解释重构理由(但最终仅输出代码)。" ) ) req_kw = ( ("必须保留/包含以下关键词或结构:" + ", ".join(required_keywords) + "。") if required_keywords else "" ) hint2 = (hint.strip() + "\n") if hint.strip() else "" return f""" {hint2} 任务目标(保持不变):{goal} 领域约束:{domain_name(original_fp)}。不得将程序改为与领域无关的示例代码。 修复策略(第{attempt}次):{strategy} {req_kw} 若之前报错: {last_err} 请输出完整可运行的 main.py(只要代码,不要解释)。 若需第三方库,在首行写:# requirements: 包1, 包2 """ def build_reanchor_prompt(goal: str, original_code: str, warning: str, hint: str = "") -> str: hint2 = (hint.strip() + "\n") if hint.strip() else "" return f""" {hint2} 警告:你生成的代码偏离了原始任务/领域({warning})。 请基于原始代码进行"最小必要修改"的修复,绝不要替换为通用示例。 原始任务:{goal} 原始代码(片段): ```python {(original_code or '')[:4000]} ``` 请仅输出修复后的完整 main.py(不要包含```标记)。 """ # ====== 规划师(单人)Prompt 与解析 ====== def build_planner_decision_prompt( task: str, plan: list[str], have_code: bool, last_err: str, last_stdout: str, last_stderr: str, required_kws: list[str], expected_out: str, ) -> str: plan_text = "\n".join(f"- {p}" for p in (plan or [])) or "(无计划)" last_res = "" if last_err or last_stdout or last_stderr: last_res = f"【上一步结果】\nerr:{(last_err or '')[:600]}\nstdout:{(last_stdout or '')[:600]}\nstderr:{(last_stderr or '')[:600]}" req = ", ".join(required_kws or []) exp = expected_out or "(未指定)" return f""" 你是"规划师(Manager)"。任务:{task} 当前计划: {plan_text} 是否已有候选代码:{"是" if have_code else "否"} {last_res} 约束: - 必须尽量少改代码,先修语法/运行问题,再谈重构 - 必须满足关键词: [{req}](若有) - 期望 stdout 片段: {exp} 请只输出 JSON,字段: {{ "action": "code | run | reflect | stop", "reason": "为什么这么做", "hints": "给编码专家的具体指示(仅当 action=code 时)" }} 不要任何解释。 """ def build_planner_reflect_prompt(task: str, old_plan: list[str], last_err: str) -> str: old = "\n".join(f"- {p}" for p in (old_plan or [])) or "(无)" return f""" 我们原计划: {old} 但遇到错误/阻碍: {(last_err or '')[:800]} 请你给出新的分步计划(越简越好,每行一步),只输出纯文本每行一个要点。 """ def parse_planner_action(text: str) -> dict: try: import json as _json, re as _re m = _re.search(r"\{.*\}", text or "", _re.S) if m: d = _json.loads(m.group(0)) a = (d.get("action", "") or "").lower().strip() if a in {"code", "run", "reflect", "stop"}: return {"action": a, "reason": d.get("reason", ""), "hints": d.get("hints", "")} except Exception: pass low = (text or "").lower() if "reflect" in low: return {"action": "reflect", "reason": text, "hints": ""} if "run" in low: return {"action": "run", "reason": text, "hints": ""} if "stop" in low: return {"action": "stop", "reason": text, "hints": ""} return {"action": "code", "reason": text, "hints": text} # ====== 团队规划(多代理协作) ====== DEFAULT_TEAM = [ {"name": "Manager", "persona": "统筹目标,做出行动决策;严控最小修改;优先修复运行/语法问题", "provider": "", "model": ""}, {"name": "Architect", "persona": "审视结构与依赖,提出稳妥的小步修改建议;避免大改动", "provider": "", "model": ""}, {"name": "QA", "persona": "基于报错和输出提出验证要求,指出潜在风险与遗漏", "provider": "", "model": ""}, {"name": "Ops", "persona": "关注依赖、环境、参数与超时等运行因素,提出运行建议", "provider": "", "model": ""}, ] def load_team_from_conf() -> Tuple[bool, List[Dict[str, str]], str, int]: d = load_all() enabled = bool(d.get("planner_team_enabled", False)) members = d.get("planner_team_members") or [] if not isinstance(members, list) or not members: members = DEFAULT_TEAM.copy() max_conc = int(d.get("planner_team_max_concurrency", 4)) priority = d.get("planner_team_priority", "run>code>reflect>stop") return enabled, members, priority, max_conc def save_team_to_conf(enabled: bool, members: List[Dict[str, str]], priority: str, max_conc: int) -> str: d = load_all() d["planner_team_enabled"] = bool(enabled) clean = [] for m in members or []: if not isinstance(m, list) and not isinstance(m, dict): continue if isinstance(m, list): if len(m) < 4: continue if not any(str(x).strip() for x in m): continue clean.append({ "name": str(m[0]).strip(), "provider": str(m[1]).strip(), "model": str(m[2]).strip(), "persona": str(m[3]).strip(), }) else: if not any(str(v).strip() for v in m.values()): continue clean.append({ "name": str(m.get("name", "")).strip(), "provider": str(m.get("provider", "")).strip(), "model": str(m.get("model", "")).strip(), "persona": str(m.get("persona", "")).strip(), }) if not clean: clean = DEFAULT_TEAM.copy() d["planner_team_members"] = clean d["planner_team_priority"] = priority or "run>code>reflect>stop" d["planner_team_max_concurrency"] = int(max(1, min(16, max_conc or 4))) return save_json(d, auto=True) def team_vote_priority_list(pr: str) -> List[str]: parts = [p.strip().lower() for p in (pr or "").split(">") if p.strip()] known = ["run", "code", "reflect", "stop"] out = [p for p in parts if p in known] for k in known: if k not in out: out.append(k) return out def aggregate_team_decisions(decisions: List[Dict[str, str]], priority_order: List[str]) -> Dict[str, str]: counts = {} for d in decisions: a = (d.get("action", "") or "").lower().strip() if not a: continue counts[a] = counts.get(a, 0) + 1 best_action = None best_votes = -1 for a, v in counts.items(): if v > best_votes: best_action = a best_votes = v elif v == best_votes: if priority_order.index(a) < priority_order.index(best_action): best_action = a if not best_action: best_action = priority_order[0] reasons = "\n".join([f"[{d.get('by', '?')}] {d.get('reason', '')}" for d in decisions if d.get("reason")]) hints = "\n".join([f"[{d.get('by', '?')}] {d.get('hints', '')}" for d in decisions if d.get("hints")]) return {"action": best_action, "reason": reasons[:1200], "hints": hints[:2000]} ## 第三部分A(LLM调用、运行器和核心调度): # ========================== 速率限制器 & LLM 聚合 ========================== class RateLimiter: def __init__(self): self.buckets: Dict[str, deque] = {} def wait(self, provider: str, model: str): rpm = get_saved_rpm(provider, model) if rpm == 0 and provider == "gemini" and "1.5" in (model or "").lower() and "pro" in (model or "").lower(): rpm = 5 if rpm <= 0: return key = f"{provider}:{model}" dq = self.buckets.setdefault(key, deque()) now = time.time() while dq and now - dq[0] > 60: dq.popleft() if len(dq) >= rpm: sleep_sec = 60 - (now - dq[0]) if sleep_sec > 0: debug("RL", "速率限制等待", {"key": key, "sleep": round(sleep_sec, 2)}) time.sleep(sleep_sec) now = time.time() while dq and now - dq[0] > 60: dq.popleft() dq.append(time.time()) RATE_LIMITER = RateLimiter() def _parse_retry_seconds(text: str) -> int: try: if not text: return 10 for pat in [ r"retry_delay\s*\{\s*seconds:\s*(\d+)", r"retry[- ]?after[^0-9]*(\d+)", r"after\s+(\d+)\s*second", ]: m = re.search(pat, text, re.I) if m: return max(1, int(m.group(1))) except Exception: pass return 10 def llm_call( provider: str, model: str, prompt: str, keys: dict, req_timeout: int = DEFAULT_REQ_TIMEOUT, ) -> str: t0 = time.time() debug("LLM", "调用开始", {"provider": provider, "model": model, "prompt_len": len(prompt), "timeout": req_timeout}) try: if provider == "mock": m = re.findall(r"```python\s*(.*?)```", prompt, re.S | re.I) return m[0].strip() if m else "print('Hello from mock LLM')" RATE_LIMITER.wait(provider, model) custom = get_custom_providers() is_custom = provider in custom if provider == "gemini": key = keys.get("gemini_key") or os.getenv("GOOGLE_API_KEY") if not key: return "❗ 请配置 GOOGLE_API_KEY" try: import google.generativeai as genai try: from google.api_core.exceptions import ResourceExhausted as GE_ResourceExhausted except Exception: class GE_ResourceExhausted(Exception): ... genai.configure(api_key=key) mdl = genai.GenerativeModel(model) try: resp = mdl.generate_content(prompt) except GE_ResourceExhausted as e: wait_s = _parse_retry_seconds(str(e)) debug("RL", "Gemini 429 等待重试", {"wait": wait_s}) time.sleep(wait_s + 2) resp = mdl.generate_content(prompt) text = "" try: text = resp.text except Exception: cands = getattr(resp, "candidates", []) or [] if cands and getattr(cands[0], "content", None): parts = getattr(cands[0].content, "parts", []) or [] text = "".join([getattr(p, "text", "") for p in parts if hasattr(p, "text")]) debug("LLM_DONE", "Gemini响应", {"elapsed": round(time.time() - t0, 3), "len": len(text or "")}) return text or "❗ Gemini异常:无有效文本返回" except ImportError: return "❗ 未安装 google-generativeai" except Exception as e: debug("LLM_ERR", "Gemini调用异常", {"err": str(e), "trace": traceback.format_exc()}) return f"❗ Gemini异常:{e}" if provider == "anthropic": key = keys.get("anthropic_key") if not key: return "❗ 请配置 ANTHROPIC_API_KEY" url = "https://api.anthropic.com/v1/messages" headers = { "x-api-key": key, "anthropic-version": "2023-06-01", "content-type": "application/json", } payload = { "model": model, "messages": [{"role": "user", "content": prompt}], "max_tokens": 4096, "temperature": 0.2, } r = requests.post(url, headers=headers, json=payload, timeout=req_timeout) debug("HTTP", "RESP", {"status": r.status_code, "elapsed": round(time.time() - t0, 3)}) if r.status_code != 200: return f"❗ Anthropic错误 {r.status_code}:{r.text[:400]}" data = r.json() blocks = data.get("content", []) if isinstance(blocks, list) and blocks: return blocks[0].get("text", "") return json.dumps(data, ensure_ascii=False) if provider == "azure": key = keys.get("azure_key") base = keys.get("azure_base") or DEFAULT_BASES["azure"] deployment = keys.get("azure_deployment") version = keys.get("azure_version", "2024-02-15-preview") if not all([key, base, deployment]): return "❗ 请配置 Azure OpenAI 的 Key/Base/Deployment" url = f"{base.rstrip('/')}/openai/deployments/{deployment}/chat/completions?api-version={version}" headers = {"api-key": key, "Content-Type": "application/json"} payload = { "messages": [{"role": "user", "content": prompt}], "temperature": 0.2, "max_tokens": 4096, } r = requests.post(url, headers=headers, json=payload, timeout=req_timeout) debug("HTTP", "RESP", {"status": r.status_code, "elapsed": round(time.time() - t0, 3)}) if r.status_code != 200: return f"❗ Azure错误 {r.status_code}:{r.text[:400]}" data = r.json() return data.get("choices", [{}])[0].get("message", {}).get("content", json.dumps(data, ensure_ascii=False)) if provider == "hf": token = keys.get("hf_token") if not token: return "❗ 请配置 HF_TOKEN" url = HF_API_URL + model headers = {"Authorization": f"Bearer {token}"} payload = {"inputs": prompt, "parameters": {"max_new_tokens": 2048, "temperature": 0.2}} r = requests.post(url, headers=headers, json=payload, timeout=req_timeout) debug("HTTP", "RESP", {"status": r.status_code, "elapsed": round(time.time() - t0, 3)}) if r.status_code != 200: return f"❗ HuggingFace错误 {r.status_code}:{r.text[:400]}" data = r.json() if isinstance(data, list) and data: return data[0].get("generated_text", str(data)) return json.dumps(data, ensure_ascii=False) if provider in OPENAI_LIKE or is_custom: if is_custom: cinfo = custom.get(provider, {}) base = cinfo.get("base", "") key = cinfo.get("key", "") extra = {} if cinfo.get("referer"): extra["HTTP-Referer"] = cinfo["referer"] if cinfo.get("title"): extra["X-Title"] = cinfo["title"] else: base = keys.get(f"{provider}_base") or DEFAULT_BASES.get(provider, "") key = keys.get(f"{provider}_key") extra = {} if provider == "openrouter": if keys.get("openrouter_referer"): extra["HTTP-Referer"] = keys["openrouter_referer"] if keys.get("openrouter_title"): extra["X-Title"] = keys["openrouter_title"] if not base or not key: return f"❗ 请配置 {provider.upper()} Base/Key" url = base.rstrip("/") + "/chat/completions" headers = {"Authorization": f"Bearer {key}", "Content-Type": "application/json", **extra} payload = { "model": model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2, "max_tokens": 4096, } r = requests.post(url, headers=headers, json=payload, timeout=req_timeout) if r.status_code == 429: ra = r.headers.get("Retry-After", "") try: wait_s = max(1, int(float(ra))) if ra else _parse_retry_seconds(r.text) except Exception: wait_s = _parse_retry_seconds(r.text) debug("RL", "429 等待后重试", {"wait": wait_s, "url": url}) time.sleep((wait_s or 10) + 2) r = requests.post(url, headers=headers, json=payload, timeout=req_timeout) debug("HTTP", "RESP", {"status": r.status_code, "elapsed": round(time.time() - t0, 3), "len": len(r.text or "")}) if r.status_code == 401: return "❗ 401 未授权" if r.status_code != 200: return f"❗ API错误 {r.status_code}:{r.text[:400]}" data = r.json() return data.get("choices", [{}])[0].get("message", {}).get("content", json.dumps(data, ensure_ascii=False)) return "❗ 未支持的提供商" except Exception as e: debug("LLM_ERR", "请求异常", {"err": str(e), "trace": traceback.format_exc()}) return f"❗ 请求异常:{str(e)}" # ========================== 模型列表/检测 ========================== def get_models_list(provider: str, keys: dict): err = "" rec = RECOMMENDED_MODELS.get(provider, []) fetched = [] try: custom = get_custom_providers() if provider in custom: base = custom[provider].get("base", "") key = custom[provider].get("key", "") if base and key: r = requests.get(f"{base.rstrip('/')}/models", headers={"Authorization": f"Bearer {key}"}, timeout=20) if r.status_code == 200: data = r.json() fetched = [m.get("id") for m in (data.get("data") or data) if isinstance(m, dict) and m.get("id")] else: err = f"自定义API {r.status_code}: {r.text[:120]}" else: err = "请先在自定义API中配置 Base URL 和 Key" elif provider in OPENAI_LIKE: base = keys.get(f"{provider}_base") or DEFAULT_BASES.get(provider, "") key = keys.get(f"{provider}_key") if base and key: r = requests.get(f"{base.rstrip('/')}/models", headers={"Authorization": f"Bearer {key}"}, timeout=20) if r.status_code == 200: data = r.json() fetched = [m.get("id") for m in (data.get("data") or data) if isinstance(m, dict) and m.get("id")] else: err = f"API {r.status_code}: {r.text[:120]}" else: err = "请先配置 API Key 和 Base URL" elif provider == "gemini": key = keys.get("gemini_key") or os.getenv("GOOGLE_API_KEY") if key: try: import google.generativeai as genai genai.configure(api_key=key) fetched = [getattr(m, "name", "").replace("models/", "") for m in genai.list_models() if getattr(m, "name", "")] except ImportError: err = "未安装 google-generativeai" else: err = "请先配置 GOOGLE_API_KEY" except Exception as e: err = f"异常: {e}" api_success = (err == "") and bool(fetched) models_to_use = sorted(set(fetched)) if api_success else (get_models_cache(provider) or rec) if api_success: set_models_cache(provider, models_to_use) debug("MODELS", "获取模型列表", {"provider": provider, "count": len(models_to_use), "err": err}) return models_to_use, rec, err def refresh_models(provider: str, current_model: str, keys: dict): models, rec, err = get_models_list(provider, keys) info = (f"⚠️ {err}\n" if err else "") + f"ℹ️ 共 {len(models)} 个模型(列表来自 {'缓存/推荐' if err else '在线/缓存'})" fallback = next((m for m in rec if m in models), (models[0] if models else "")) value = current_model if current_model else fallback return (gr.update(choices=models, value=value), info, [["-", "-", "-"]], "\n".join(models or [])) def quick_test_one(provider: str, model: str, keys: dict) -> bool: ck = f"{provider}:{model}" if ck in MODEL_OK_CACHE: return MODEL_OK_CACHE[ck] try: res = llm_call(provider, model, "请仅回复:pong", keys, req_timeout=12) ok = bool(res) and (not str(res).startswith("❗")) debug("DETECT", "模型检测结果", {"key": ck, "ok": ok}) except Exception as e: ok = False debug("DETECT_ERR", "检测异常", {"key": ck, "err": str(e), "trace": traceback.format_exc()}) MODEL_OK_CACHE[ck] = ok return ok def test_models(provider: str, keys: dict): t0 = time.time() models = get_models_cache(provider) if not models: models, _, _ = get_models_list(provider, load_all()) rec = RECOMMENDED_MODELS.get(provider, []) if not models: return [["-", "-", "-"]], "❌ 未获取到模型列表", [] rows = [] max_workers = min(16, len(models)) default_workers = 8 rpm_hint = get_saved_rpm(provider, models[0] if models else "") max_workers = max(2, min(max_workers, rpm_hint)) if rpm_hint and rpm_hint > 0 else min(max_workers, default_workers) debug("DETECT", "开始批量检测", {"provider": provider, "count": len(models), "workers": max_workers}) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex: fut = {ex.submit(quick_test_one, provider, m, load_all()): m for m in models} for f in concurrent.futures.as_completed(fut): m = fut[f] try: ok = f.result() except Exception: ok = False rows.append([m, "⭐" if m in rec else "", "✅" if ok else "❌"]) status_map = {r[0]: r[2] for r in rows} ordered = [[m, "⭐" if m in rec else "", status_map.get(m, "❓")] for m in models] ok_models = [m for m in models if status_map.get(m) == "✅"] dt = time.time() - t0 debug("DETECT", "批量检测完成", {"provider": provider, "ok": len(ok_models), "elapsed": round(dt, 2)}) info = f"✅ 已检测 {len(models)} 个模型。通过: {len(ok_models)} 并发: {max_workers} ⏱ {dt:.2f}s" return ordered, info, ok_models # ========================== 运行器与打包 ========================== def static_validate(code: str) -> Tuple[bool, str]: if not (code or "").strip(): return False, "代码为空" ok, msg = compile_syntax_ok(code) if not ok: return False, msg return True, "" def dynamic_validate(rc: int, out: str, err: str) -> Tuple[bool, str]: if rc == 0: return True, "" msg = f"进程退出码 {rc}" if "Traceback" in (err or "") or "Error" in (err or "") or "Exception" in (err or ""): msg += "(检测到异常)" return False, msg def semantic_validate( original_fp: CodeFingerprint, code: str, required_keywords: List[str], expected_stdout_contains: str, ) -> Tuple[bool, str]: try: new_fp = extract_fingerprint(code or "") ok_rel, rel_msg = relevance_check(original_fp, new_fp) if not ok_rel: return False, f"候选代码可能偏离原始领域/结构:{rel_msg}" low = (code or "").lower() for kw in required_keywords or []: if kw and kw.lower() not in low: return False, f"缺少必须关键词/结构:{kw}" return True, "通过" except Exception as e: return False, f"语义校验异常:{e}" def run_in_sandbox(code: str, attach_paths: List[str], cli_args: str, timeout: int) -> Tuple[int, str, str, str, str]: global CURRENT_RUN_DIR pip_target = "" try: # 用系统临时目录 temp_dir = Path(tempfile.gettempdir()) / "pyzdh_runs" temp_dir.mkdir(parents=True, exist_ok=True) rid = datetime.now().strftime("%Y%m%d-%H%M%S") + "-" + hashlib.sha1(code[:2000].encode("utf-8")).hexdigest()[:8] workdir = str(temp_dir / rid) Path(workdir).mkdir(parents=True, exist_ok=True) CURRENT_RUN_DIR = workdir # 写主程序 main_py = Path(workdir) / "main.py" main_py.write_text(code, "utf-8") # 附件 in_dir = Path(workdir) / "inputs" in_dir.mkdir(exist_ok=True) for pth in attach_paths or []: try: if os.path.isfile(pth): shutil.copy2(pth, in_dir / Path(pth).name) except Exception as e: try: debug("WARN", f"复制附件失败: {pth}", {"error": str(e)}) except Exception: pass # 依赖 pip_log = "" pip_log_path = Path(workdir) / "pip.log" reqs = parse_requirements(code) if reqs: pip_target = str(Path(workdir) / ".pip_packages") cmd = [sys.executable, "-m", "pip", "install", "--target", pip_target, "-U", *reqs] try: env = os.environ.copy() env["PIP_TARGET"] = pip_target p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env) _register_proc(p) out, err = p.communicate(timeout=min(900, max(60, int(timeout or 40) * 5))) pip_log = (out or "") + "\n" + (err or "") except Exception as e: pip_log = f"pip 安装异常: {e}\n" try: pip_log_path.write_text(pip_log, "utf-8") except Exception: pass # 运行 env = os.environ.copy() if pip_target and Path(pip_target).exists(): env["PYTHONPATH"] = pip_target argv = [sys.executable, str(main_py)] if (cli_args or "").strip(): try: argv += shlex.split(cli_args) except Exception: argv += [cli_args] try: debug("RUN", "开始执行", {"argv": argv, "cwd": workdir}) except Exception: pass p = subprocess.Popen(argv, cwd=workdir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env) _register_proc(p) deadline = time.time() + int(timeout or 40) out_lines = [] err_lines = [] while True: if STOP_FLAG: try: p.terminate() time.sleep(0.1) if p.poll() is None: p.kill() except Exception: pass break if p.stdout: line = p.stdout.readline() if line: out_lines.append(line) if p.stderr: el = p.stderr.readline() if el: err_lines.append(el) if p.poll() is not None: try: ro, re_ = p.communicate(timeout=0.2) if ro: out_lines.append(ro) if re_: err_lines.append(re_) except Exception: pass break if time.time() > deadline: try: p.terminate() time.sleep(0.1) if p.poll() is None: p.kill() except Exception: pass err_lines.append(f"\n[执行超时, 超时={timeout}s]") break time.sleep(0.01) out = "".join(out_lines) err = "".join(err_lines) rc = p.poll() if p else 0 try: (Path(workdir) / "stdout.txt").write_text(out, "utf-8") (Path(workdir) / "stderr.txt").write_text(err, "utf-8") except Exception: pass return rc, out, err, pip_log, workdir except Exception as e: error_msg = f"run_in_sandbox 异常: {e}" try: debug("RUN_ERR", "沙箱执行失败", {"error": error_msg, "workdir": CURRENT_RUN_DIR}) except Exception: pass return 998, "", error_msg, "", CURRENT_RUN_DIR or "" def package_run_dir(run_dir: str) -> str: try: if not run_dir or not os.path.isdir(run_dir): return "" zname = f"package-{Path(run_dir).name}.zip" zpath = str(Path(LOGS_DIR) / zname) with zipfile.ZipFile(zpath, "w", zipfile.ZIP_DEFLATED) as zf: for root, dirs, files in os.walk(run_dir): for fn in files: fp = os.path.join(root, fn) arc = os.path.relpath(fp, run_dir) zf.write(fp, arcname=arc) return zpath except Exception as e: debug("ZIP", "打包运行目录失败", {"err": str(e)}) return "" def package_all_logs(current_run_dir: str = "") -> str: try: ts = datetime.now().strftime("%Y%m%d-%H%M%S") zpath = str(Path(LOGS_DIR) / f"all-logs-{ts}.zip") with zipfile.ZipFile(zpath, "w", zipfile.ZIP_DEFLATED) as zf: for p in Path(LOGS_DIR).glob("*.log"): zf.write(str(p), arcname=p.name) if current_run_dir and os.path.isdir(current_run_dir): for root, dirs, files in os.walk(current_run_dir): for fn in files: fp = os.path.join(root, fn) arc = os.path.join("run_dir", os.path.relpath(fp, current_run_dir)) zf.write(fp, arcname=arc) return zpath except Exception as e: debug("ZIP", "打包日志失败", {"err": str(e)}) return "" # ========================== 核心调度器 ========================== def orchestrate(**kwargs) -> dict: global STOP_FLAG STOP_FLAG = False task = kwargs.get("task", "") global_hint = kwargs.get("global_hint", "") provider = kwargs.get("provider", "gemini") model = kwargs.get("model", "gemini-1.5-flash") keys = kwargs.get("keys", {}) files = kwargs.get("files", []) max_attempts = kwargs.get("max_attempts", 3) timeout = kwargs.get("timeout", 40) required_kws_text = kwargs.get("required_kws_text", "") expected_out_contains = kwargs.get("expected_out_contains", "") cli_args = kwargs.get("cli_args", "") baseline_code = kwargs.get("baseline_code", "") # 团队规划相关 team_enabled = kwargs.get("team_enabled", False) team_members = kwargs.get("team_members", []) team_priority = kwargs.get("team_priority", "run>code>reflect>stop") team_max_conc = kwargs.get("team_max_conc", 4) # 初始化 current_code = "" original_fp = CodeFingerprint() last_err, last_stdout, last_stderr = "", "", "" workdir = "" status = "开始" # 1. 确定初始代码 if (baseline_code or "").strip(): current_code = baseline_code elif files: py_files = [f for f in files if str(f).endswith((".py", ".pyw"))] target = py_files[0] if py_files else files[0] try: current_code = Path(target).read_text("utf-8") except Exception as e: return {"status": f"❌ 读取初始文件失败: {e}", "logs": get_debug_text()} if not (current_code or "").strip() and not (task or "").strip(): return {"status": "❌ 任务和初始代码均为空", "logs": get_debug_text()} original_fp = extract_fingerprint(current_code) plan = build_initial_plan(task, original_fp) req_kws = [k.strip() for k in (required_kws_text or "").split(",") if k.strip()] # 2. 循环 for attempt in range(1, max_attempts + 1): if STOP_FLAG: status = "⏹️ 用户手动停止" break debug("ORCH", f"第 {attempt}/{max_attempts} 次尝试", {"plan": plan}) # 3. 规划 planner_pvd = provider planner_mdl = model if team_enabled: debug("PLAN", "团队决策模式", {"members": len(team_members), "conc": team_max_conc}) decisions = [] def get_team_decision(member: dict): p = member.get("provider") or provider m = member.get("model") or model persona_prompt = f"你是 {member.get('name', '成员')},你的职责是:{member.get('persona', '规划')}\n" prompt = persona_prompt + build_planner_decision_prompt( task, plan, bool(current_code), last_err, last_stdout, last_stderr, req_kws, expected_out_contains ) resp = llm_call(p, m, prompt, keys) action_parsed = parse_planner_action(resp) action_parsed['by'] = member.get('name', '?') return action_parsed with concurrent.futures.ThreadPoolExecutor(max_workers=team_max_conc) as executor: futures = [executor.submit(get_team_decision, member) for member in team_members] for future in concurrent.futures.as_completed(futures): try: decisions.append(future.result()) except Exception as e: debug("TEAM_ERR", "获取团队成员决策异常", {"err": str(e)}) action = aggregate_team_decisions(decisions, team_vote_priority_list(team_priority)) else: prompt = build_planner_decision_prompt( task, plan, bool(current_code), last_err, last_stdout, last_stderr, req_kws, expected_out_contains ) resp = llm_call(planner_pvd, planner_mdl, prompt, keys) action = parse_planner_action(resp) debug("PLAN", "规划结果", {"action": action.get("action"), "reason": action.get("reason")}) # 4. 执行 act = action.get("action") if act == "stop": status = f"ℹ️ 规划师决定停止:{action.get('reason', '')}" break elif act == "reflect": prompt = build_planner_reflect_prompt(task, plan, last_err) new_plan_text = llm_call(planner_pvd, planner_mdl, prompt, keys) plan = [p.strip() for p in new_plan_text.splitlines() if p.strip()] continue elif act == "run": if not current_code: last_err = "代码为空,无法运行" continue rc, out, err, pip_log, w_dir = run_in_sandbox(current_code, files, cli_args, timeout) workdir = w_dir last_stdout, last_stderr = out, err ok, msg = dynamic_validate(rc, out, err) if ok and (not expected_out_contains or expected_out_contains in out): status = f"✅ 成功 (第 {attempt} 次)" break else: last_err = f"{msg}\nSTDOUT:\n{out[-1000:]}\nSTDERR:\n{err[-1000:]}\nPIP.LOG:\n{pip_log[-1000:]}" elif act == "code": coder_hint = action.get("hints", "") prompt = build_fix_prompt(task, original_fp, last_err, current_code, attempt, req_kws, global_hint + "\n" + coder_hint) new_code_raw = llm_call(provider, model, prompt, keys) if str(new_code_raw).startswith("❗"): last_err = f"生成代码失败: {new_code_raw}" continue new_code = strip_code_fences(new_code_raw) ok, msg = static_validate(new_code) if not ok: last_err = f"生成了无效代码: {msg}" continue ok, msg = semantic_validate(original_fp, new_code, req_kws, expected_out_contains) if not ok: last_err = f"代码偏离目标: {msg}" prompt_reanchor = build_reanchor_prompt(task, current_code, msg, global_hint) new_code_raw = llm_call(provider, model, prompt_reanchor, keys) new_code = strip_code_fences(new_code_raw) current_code = new_code last_err = "" else: status = f"❌ 达到最大尝试次数 ({max_attempts})" # 5. 返回结果 zip_path = package_run_dir(workdir) if workdir else "" main_path = str(Path(workdir) / "main.py") if workdir else "" return { "code": current_code, "status": status, "attempts": attempt, "stdout": last_stdout, "stderr": last_stderr, "download_main": main_path, "zip_path": zip_path, "workdir": workdir, "logs": get_debug_text(), } ## 第三部分B(MCP管理器、工具函数、智能补丁等): # ========================== MCP 简易管理器 ========================== class _MCPManager: def __init__(self, root: str): self.root = Path(root) self.root.mkdir(parents=True, exist_ok=True) def install(self, tool_id: str) -> str: try: td = self.root / str(tool_id) td.mkdir(parents=True, exist_ok=True) (td / "installed.txt").write_text(datetime.now().isoformat(), "utf-8") return f"✅ 已安装: {tool_id}" except Exception as e: return f"❌ 安装失败: {e}" def uninstall(self, tool_id: str) -> str: try: td = self.root / str(tool_id) if td.exists(): shutil.rmtree(td) return f"✅ 已卸载: {tool_id}" except Exception as e: return f"❌ 卸载失败: {e}" def save_config(self, tool_id: str, config_json_str: str) -> str: try: td = self.root / str(tool_id) td.mkdir(parents=True, exist_ok=True) (td / "config.json").write_text(config_json_str or "{}", "utf-8") return f"✅ 已保存配置: {tool_id}" except Exception as e: return f"❌ 配置保存失败: {e}" mcp = _MCPManager(TOOLS_ROOT) # ========================== GitHub 项目管理工具 ========================== def _project_path(name: str) -> Path: return Path(PROJECT_ROOT) / name def list_projects() -> List[str]: try: items = [] for p in sorted(Path(PROJECT_ROOT).glob("*")): if p.is_dir() and (p / ".git").exists(): items.append(p.name) return items except Exception: return [] def list_branches(project_name: str) -> List[str]: try: if not (project_name or "").strip(): return [] repo = Repo(str(_project_path(project_name))) return [h.name for h in repo.heads] except Exception: return [] def list_files_in_project(project_name: str) -> List[List[str]]: rows = [] try: if not (project_name or "").strip(): return [["-", "-", "-"]] root = _project_path(project_name) for p in sorted(root.rglob("*")): if p.is_file() and ".git" not in str(p): try: size = p.stat().st_size mtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.stat().st_mtime)) rel = str(p.relative_to(root)) rows.append([rel, f"{size//1024}KB", mtime]) except Exception: rows.append([str(p), "?", "?"]) return rows[:1000] or [["-", "-", "-"]] except Exception: return [["-", "-", "-"]] def read_file_content(project_name: str, file_rel_path: str) -> str: try: root = _project_path(project_name) fp = root / file_rel_path if not fp.exists(): return "" return fp.read_text("utf-8", errors="replace") except Exception as e: return f"读取失败: {e}" def save_file_content(project_name: str, file_rel_path: str, content: str) -> str: try: root = _project_path(project_name) fp = root / file_rel_path fp.parent.mkdir(parents=True, exist_ok=True) fp.write_text(content or "", "utf-8") return "✅ 已保存文件" except Exception as e: return f"❌ 保存失败: {e}" def pull_github_repo(url: str, token: str) -> str: try: if not (url or "").strip(): return "❌ 请输入仓库 URL" clone_url = url.strip() if token and clone_url.startswith("https://"): clone_url = re.sub(r"^https://", f"https://{token}@", clone_url) name = Path(urllib.parse.urlparse(url).path).name.replace(".git", "") or ("repo-" + datetime.now().strftime("%H%M%S")) dest = _project_path(name) if dest.exists(): repo = Repo(str(dest)) repo.remote().pull() return f"✅ 已更新项目:{name}" else: Repo.clone_from(clone_url, str(dest)) return f"✅ 已拉取项目:{name}" except GitCommandError as e: return f"❌ Git 错误: {e}" except Exception as e: return f"❌ 拉取失败: {e}" def create_and_switch_branch(project_name: str, new_branch: str) -> Tuple[str, List[str]]: try: if not new_branch: return "❌ 请输入新分支名", list_branches(project_name) repo = Repo(str(_project_path(project_name))) if new_branch in [h.name for h in repo.heads]: repo.git.checkout(new_branch) else: repo.git.checkout("-b", new_branch) return f"✅ 已切换到分支:{new_branch}", list_branches(project_name) except Exception as e: return f"❌ 创建/切换分支失败: {e}", list_branches(project_name) def commit_changes(project_name: str, msg: str) -> str: try: repo = Repo(str(_project_path(project_name))) repo.git.add("--all") if not msg: msg = f"update at {datetime.now().isoformat()}" if repo.is_dirty(index=True, working_tree=True, untracked_files=True): c = repo.index.commit(msg) return f"✅ 已提交:{c.hexsha[:8]}" return "ℹ️ 无变更可提交" except Exception as e: return f"❌ 提交失败: {e}" # ========================== 智能补丁功能 ========================== def read_text_auto(p: Path) -> Tuple[str, str]: for enc in ("utf-8", "utf-8-sig", "gb18030", "gbk", "big5", "latin-1"): try: return p.read_text(encoding=enc), enc except Exception: continue data = p.read_bytes() return data.decode("latin-1", errors="ignore"), "latin-1" def save_text_keep(p: Path, text: str, original_enc: str, backup=True): if backup and p.exists(): shutil.copy2(p, str(p) + ".bak") p.write_text(text, encoding=original_enc) def unified_diff_str(old: str, new: str, file: Path, limit_kb: int = 512): a = old.splitlines(keepends=True) b = new.splitlines(keepends=True) gen = difflib.unified_diff(a, b, fromfile=str(file), tofile=str(file)) cap = limit_kb * 1024 buf, size = [], 0 for line in gen: buf.append(line) size += len(line) if size > cap: buf.append("\n... [diff truncated]\n") break return "".join(buf) def find_line_bounds(text: str, pos: int) -> Tuple[int, int, int]: ln = text.count("\n", 0, pos) + 1 ls = text.rfind("\n", 0, pos) ls = 0 if ls == -1 else ls + 1 le = text.find("\n", pos) le = len(text) if le == -1 else le return ln, ls, le def preview_line(text: str, s: int, e: int, max_len=160) -> str: _, ls, le = find_line_bounds(text, s) line = text[ls:le] rs, re = s - ls, min(len(line), e - ls) marked = line[:rs] + "<<<" + line[rs:re] + ">>>" + line[re:] if len(marked) > max_len: keep = max_len // 2 - 3 marked = marked[:keep] + "..." + marked[-keep:] return marked def get_indent(line: str) -> int: return len(line) - len(line.lstrip(" \t")) def expand_paragraph(text: str, hit: int) -> Tuple[int, int]: s = hit while s > 0: prev_nl = text.rfind("\n", 0, s) ps = 0 if prev_nl == -1 else prev_nl + 1 if text[ps:s].strip() == "": s = ps break s = ps e = hit n = len(text) while e < n: nl = text.find("\n", e) nl = n if nl == -1 else nl + 1 if text[e:nl].strip() == "": e = nl break e = nl return s, e def expand_python_block(text: str, hit: int) -> Tuple[int, int]: i = hit header_start = None deco_start = None while i >= 0: ln, ls, le = find_line_bounds(text, i) line = text[ls:le] if line.lstrip().startswith(("def ", "class ", "async def ")): header_start = ls j = ls while True: prev_nl = text.rfind("\n", 0, j - 1) ps = 0 if prev_nl == -1 else prev_nl + 1 l2 = text[ps:j] if l2.lstrip().startswith("@"): deco_start = ps j = ps continue break start = deco_start if deco_start is not None else header_start base_indent = get_indent(line) k = le end = len(text) while k < len(text): nl = text.find("\n", k) nl = len(text) if nl == -1 else nl + 1 l3 = text[k:nl] if k > header_start and l3.strip() != "" and not l3.lstrip().startswith("@"): if get_indent(l3) < base_indent: end = k break k = nl return start, end i = ls - 1 return expand_paragraph(text, hit) def expand_brace_block(text: str, hit: int) -> Tuple[int, int]: i = hit while i >= 0: if text[i] == "{": depth = 0 k = i while k < len(text): ch = text[k] if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: return i, k + 1 k += 1 break i -= 1 return expand_paragraph(text, hit) def auto_expand(text: str, hit: int, ext: str) -> Tuple[int, int]: if ext in (".py", ".pyw"): return expand_python_block(text, hit) if "{" in text and "}" in text: return expand_brace_block(text, hit) return expand_paragraph(text, hit) def build_pattern_from_input(s: str) -> re.Pattern: s = s.strip() if len(s) >= 2 and s[0] == "/" and s.count("/") >= 2: last = s.rfind("/") pat = s[1:last] flags_s = s[last + 1:].lower() flags = 0 if "i" in flags_s: flags |= re.I if "m" in flags_s: flags |= re.M if "s" in flags_s: flags |= re.S return re.compile(pat, flags) return re.compile(re.escape(s), re.S) # ========================== 辅助:文件直链与聊天渲染/导出 ========================== def build_file_link_html(path: str, label: str = "") -> str: try: if not path or not os.path.exists(path): return "(暂无可下载文件)" href = f"/file={urllib.parse.quote(path)}" lab = label or Path(path).name return f"📥 {html.escape(lab)}" except Exception as e: return f"生成链接失败: {e}" def render_chat_html(hist: str) -> str: parts = (hist or "").split("|||") htmls = ["
"] for i in range(1, len(parts), 2): u = parts[i] a = parts[i + 1] if i + 1 < len(parts) else "" htmls.append(f"
🧑
{u}
") htmls.append(f"
🤖
{a}
") htmls.append("
") return "\n".join(htmls) def parse_chat_history(hist: str) -> List[Dict[str, str]]: parts = (hist or "").split("|||") conv = [] for i in range(1, len(parts), 2): u = html.unescape(parts[i]) a = html.unescape(parts[i + 1]) if i + 1 < len(parts) else "" conv.append({"id": (i // 2) + 1, "user": u, "assistant": a}) return conv def chat_choices_from_hist(hist: str) -> List[str]: conv = parse_chat_history(hist) out = [] for turn in conv: u_snip = (turn["user"].replace("\n", " ")[:30] + ("…" if len(turn["user"]) > 30 else "")) or "(空)" out.append(f"{turn['id']:02d} | {u_snip}") return out def preview_chat_by_choice(hist: str, choice: str) -> str: try: if not choice: return "(未选择)" idx = int(choice.split("|")[0].strip()) conv = parse_chat_history(hist) item = next((t for t in conv if t["id"] == idx), None) if not item: return "(未找到该轮对话)" u = item["user"] or "(空)" a = item["assistant"] or "(空)" return f"【用户】\n{u}\n\n【AI】\n{a}" except Exception as e: return f"(预览失败:{e})" def plain_chat_text(hist: str) -> str: conv = parse_chat_history(hist) lines = [] for t in conv: lines.append(f"【用户】\n{t['user']}\n\n【AI】\n{t['assistant']}") lines.append("-" * 60) return "\n".join(lines).strip() or "(空)" # ========================== 主题/背景构建 ========================== def build_theme_css(mode: str, img_path: str) -> str: common = """ /* 紧凑聊天工具条与容器间距 */ .chat-container{margin-bottom:8px} .chat-toolbar{margin-top:6px;gap:8px} /* 顶部Logo按钮已美化,见静态CSS */ """ if mode == "美化": return common + """ body{ background: radial-gradient(1200px 600px at 10% 10%, rgba(255,255,255,.15), transparent 60%), radial-gradient(1200px 600px at 90% 30%, rgba(255,255,255,.12), transparent 60%), linear-gradient(135deg, #0ea5e9 0%, #9333ea 100%); background-attachment: fixed; } """ if mode == "白色": return common + """ body{ background:#ffffff!important; } """ if mode == "自定义背景" and img_path and os.path.exists(img_path): href = f"/file={urllib.parse.quote(img_path)}" return common + f""" body{{ background: url('{href}') center/cover no-repeat fixed; }} """ return common + """ body{background:linear-gradient(135deg,#667eea 0%,#764ba2 100%)} """ def set_ui_theme(mode: str, img_path: str) -> str: data = load_all() data["ui_background_mode"] = mode data["ui_background_img"] = img_path or "" return save_json(data, auto=True) def preload_theme_settings(): d = load_all() mode = d.get("ui_background_mode", "默认") img = d.get("ui_background_img", "") css_txt = build_theme_css(mode, img) return gr.update(value=mode), f"" # ========================== 通用聊天发送(用于编程/智能,对话分离) ========================== def chat_send_common( hist, message, chat_extra_files, task_files, pvd, mdl, last_summary, task_text, req_kws_txt, exp_txt, allow_run, allow_full_shell, *kv, ): if not (message or "").strip(): safe = render_chat_html(hist or "") return hist or "", safe, "", {} extra_paths = [resolve_file_path(p) for p in (chat_extra_files or []) if resolve_file_path(p)] task_paths = [resolve_file_path(p) for p in (task_files or []) if resolve_file_path(p)] merged = [] seen = set() for p in task_paths + extra_paths: if p and p not in seen: merged.append(p) seen.add(p) sys_hint = "" if (last_summary or "").strip(): sys_hint += f"【最近任务摘要】\n{last_summary}\n" if (task_text or "").strip(): sys_hint += f"【当前任务】\n{task_text}\n" if (req_kws_txt or "").strip(): sys_hint += f"【必须包含】{req_kws_txt}\n" if (exp_txt or "").strip(): sys_hint += f"【期望输出片段】{exp_txt}\n" if sys_hint: sys_hint += "\n" chat_ctx = build_attachments_preview(merged, per_file_chars=1200, max_files=5, max_total_chars=8000) user_prompt = sys_hint + (message or "") + ("\n\n" + chat_ctx if chat_ctx else "") # 修复: 正确调用 gather_keys_func keys = {} keys_map = [ "gemini_key", "openai_key", "openai_base", "anthropic_key", "cohere_key", "groq_key", "groq_base", "mistral_key", "mistral_base", "deepseek_key", "deepseek_base", "openrouter_key", "openrouter_base", "openrouter_referer", "openrouter_title", "perplexity_key", "perplexity_base", "xai_key", "xai_base", "azure_key", "azure_base", "azure_deployment", "azure_version", "hf_token", "github_token", "siliconflow_key", "siliconflow_base" ] for i, v in enumerate(kv): if i < len(keys_map): keys[keys_map[i]] = v ans = llm_call(pvd, mdl, user_prompt, keys, req_timeout=60) exec_report = "" try: if allow_run: blocks = re.findall(r"```json\s*(\{.*?\})\s*```", str(ans), re.S) + re.findall(r"(\{.*?\})", str(ans), re.S) cmds = [] for b in blocks: try: j = json.loads(b) if isinstance(j, dict) and isinstance(j.get("commands"), list): cmds = [str(c).strip() for c in j["commands"] if str(c).strip()] break except Exception: continue if cmds: logs = run_commands(cmds, bool(allow_full_shell)) exec_report += "\n\n[已根据AI建议执行命令]\n" + (logs[-4000:] if len(logs) > 4000 else logs) mcp_ops = {} for b in blocks: try: j = json.loads(b) if isinstance(j, dict) and isinstance(j.get("mcp"), dict): mcp_ops = j["mcp"] break except Exception: continue if mcp_ops: msgs = [] if isinstance(mcp_ops.get("install"), list): for tid in mcp_ops["install"]: msgs.append(f"安装 {tid}: {mcp.install(str(tid))}") if isinstance(mcp_ops.get("uninstall"), list): for tid in mcp_ops["uninstall"]: msgs.append(f"卸载 {tid}: {mcp.uninstall(str(tid))}") if isinstance(mcp_ops.get("config"), dict): for tid, cfg in mcp_ops["config"].items(): try: msgs.append(f"配置 {tid}: {mcp.save_config(str(tid), json.dumps(cfg, ensure_ascii=False))}") except Exception as e: msgs.append(f"配置 {tid}: 失败 {e}") if msgs: exec_report += "\n\n[已执行MCP操作]\n" + "\n".join(msgs[-50:]) except Exception as e: exec_report += f"\n\n[执行阶段异常] {e}" display_ans = str(ans) + (exec_report if exec_report else "") safe_msg = html.escape((message or "") + ("\n\n[已附加附件上下文]" if chat_ctx else "")) safe_ans = html.escape(display_ans) new_hist = (hist or "") + "|||" + safe_msg + "|||" + safe_ans html_render = render_chat_html(new_hist) payload = {"message": message or "", "extra_paths": extra_paths, "ts": time.time()} return new_hist, html_render, (display_ans if not str(ans).startswith("❗") else ""), payload def chat_retry_common( payload, hist, task_files, pvd, mdl, last_summary, task_text, req_kws_txt, exp_txt, allow_run, allow_full_shell, *kv, ): msg = (payload or {}).get("message", "") extra_paths = (payload or {}).get("extra_paths", []) if not (msg or "").strip(): safe = render_chat_html(hist or "") return hist or "", safe, "" task_paths = [resolve_file_path(p) for p in (task_files or []) if resolve_file_path(p)] merged = [] seen = set() for p in task_paths + extra_paths: if p and p not in seen: merged.append(p) seen.add(p) sys_hint = "" if (last_summary or "").strip(): sys_hint += f"【最近任务摘要】\n{last_summary}\n" if (task_text or "").strip(): sys_hint += f"【当前任务】\n{task_text}\n" if (req_kws_txt or "").strip(): sys_hint += f"【必须包含】{req_kws_txt}\n" if (exp_txt or "").strip(): sys_hint += f"【期望输出片段】{exp_txt}\n" if sys_hint: sys_hint += "\n" chat_ctx = build_attachments_preview(merged, per_file_chars=1200, max_files=5, max_total_chars=8000) user_prompt = sys_hint + msg + ("\n\n" + chat_ctx if chat_ctx else "") # 修复: 正确调用 gather_keys_func keys = {} keys_map = [ "gemini_key", "openai_key", "openai_base", "anthropic_key", "cohere_key", "groq_key", "groq_base", "mistral_key", "mistral_base", "deepseek_key", "deepseek_base", "openrouter_key", "openrouter_base", "openrouter_referer", "openrouter_title", "perplexity_key", "perplexity_base", "xai_key", "xai_base", "azure_key", "azure_base", "azure_deployment", "azure_version", "hf_token", "github_token", "siliconflow_key", "siliconflow_base" ] for i, v in enumerate(kv): if i < len(keys_map): keys[keys_map[i]] = v ans = llm_call(pvd, mdl, user_prompt, keys, req_timeout=60) exec_report = "" try: if allow_run: blocks = re.findall(r"```json\s*(\{.*?\})\s*```", str(ans), re.S) + re.findall(r"(\{.*?\})", str(ans), re.S) cmds = [] for b in blocks: try: j = json.loads(b) if isinstance(j, dict) and isinstance(j.get("commands"), list): cmds = [str(c).strip() for c in j["commands"] if str(c).strip()] break except Exception: continue if cmds: logs = run_commands(cmds, bool(allow_full_shell)) exec_report += "\n\n[已根据AI建议执行命令]\n" + (logs[-4000:] if len(logs) > 4000 else logs) mcp_ops = {} for b in blocks: try: j = json.loads(b) if isinstance(j, dict) and isinstance(j.get("mcp"), dict): mcp_ops = j["mcp"] break except Exception: continue if mcp_ops: msgs = [] if isinstance(mcp_ops.get("install"), list): for tid in mcp_ops["install"]: msgs.append(f"安装 {tid}: {mcp.install(str(tid))}") if isinstance(mcp_ops.get("uninstall"), list): for tid in mcp_ops["uninstall"]: msgs.append(f"卸载 {tid}: {mcp.uninstall(str(tid))}") if isinstance(mcp_ops.get("config"), dict): for tid, cfg in mcp_ops["config"].items(): try: msgs.append(f"配置 {tid}: {mcp.save_config(str(tid), json.dumps(cfg, ensure_ascii=False))}") except Exception as e: msgs.append(f"配置 {tid}: 失败 {e}") if msgs: exec_report += "\n\n[已执行MCP操作]\n" + "\n".join(msgs[-50:]) except Exception as e: exec_report += f"\n\n[执行阶段异常] {e}" display_ans = str(ans) + (exec_report if exec_report else "") safe_msg = html.escape(msg + ("\n\n[已附加附件上下文]" if chat_ctx else "")) safe_ans = html.escape(display_ans) new_hist = (hist or "") + "|||" + safe_msg + "|||" + safe_ans html_render = render_chat_html(new_hist) return new_hist, html_render, (display_ans if not str(ans).startswith("❗") else "") def run_commands(cmds: List[str], allow_full_shell: bool, timeout: int = 900) -> str: if not cmds: return "ℹ️ 无可执行命令" logs = [] SAFE_PREFIXES = ("python -m pip", "pip ", "pip3 ", sys.executable + " -m pip") for cmd in cmds: try: if not allow_full_shell and not cmd.startswith(SAFE_PREFIXES): logs.append(f"⛔ 已拦截(需启用完全Shell): {cmd}") continue if allow_full_shell: p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) else: parts = shlex.split(cmd) p = subprocess.Popen(parts, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) _register_proc(p) deadline = time.time() + timeout out_lines = [] err_lines = [] while True: if STOP_FLAG or time.time() > deadline: try: p.kill() except Exception: pass break line = p.stdout.readline() if line: out_lines.append(line) el = p.stderr.readline() if el: err_lines.append(el) if not line and not el and p.poll() is not None: break time.sleep(0.01) rc = p.poll() try: o, e = p.communicate(timeout=0.2) if o: out_lines.append(o) if e: err_lines.append(e) except Exception: pass out_txt = "".join(out_lines) err_txt = "".join(err_lines) if time.time() > deadline and rc is None: rc = 124 err_txt += "\n执行超时({}s)".format(timeout) if STOP_FLAG and (rc is None or rc == -9): rc = 130 err_txt += "\n[已手动停止]" logs.append(f"$ {cmd}\nrc={rc}\n{out_txt[-4000:]}\n{err_txt[-4000:]}") except Exception as e: logs.append(f"$ {cmd}\n异常: {e}") return "\n\n".join(logs) ## 第四部分A(CSS/JS和Gradio UI定义开始): # ========================== CSS & JS ========================== css = r""" *{font-family:Inter,system-ui,-apple-system,sans-serif!important;-webkit-tap-highlight-color:transparent} body{background:linear-gradient(135deg,#667eea 0%,#764ba2 100%)} .gradio-container{max-width:min(1400px,100vw)!important;margin:0 auto!important;padding:18px!important} .custom-card{background:rgba(255,255,255,.98);border-radius:16px;padding:18px;box-shadow:0 8px 15px -3px rgba(0,0,0,.08);border:1px solid rgba(0,0,0,.06)} .chat-container{min-height:300px;max-height:600px;overflow:auto;background:#fff;border-radius:14px;border:1px solid rgba(0,0,0,.06);padding:12px;margin-bottom:8px} .user-msg,.bot-msg{display:flex;gap:10px;margin:10px 0;align-items:flex-start} .msg-avatar{flex:0 0 36px;width:36px;height:36px;border-radius:50%;display:flex;align-items:center;justify-content:center;background:#e5e7eb} .user-msg .msg-avatar{background:#667eea;color:#fff} .bot-msg .msg-avatar{background:#f093fb;color:#fff} .msg-content{flex:1;padding:10px 12px;border-radius:12px;max-width:100%;white-space:pre-wrap;word-wrap:break-word;background:#f7f7f9} .chat-toolbar{gap:8px;margin-top:6px} .chat-toolbar .gr-button{padding:6px 10px;border-radius:10px} .chat-toolbar .gr-file{max-height:42px;overflow:hidden} .tiny-files .label,.tiny-files label{font-size:12px;opacity:.8} .debug-logs{font-family:monospace!important;font-size:11px;background:#0b1021;color:#aaf255;padding:10px;border-radius:8px} pre, code{white-space:pre;overflow-x:auto} /* 顶部浮动Logo按钮 */ #global-stop-btn{position:fixed;top:10px;right:10px;z-index:9999} #global-stop-btn button{ background:linear-gradient(135deg,#8b5cf6 0%,#06b6d4 100%); color:#fff;width:36px;height:36px;min-height:36px;padding:0;border-radius:10px;line-height:36px;border:0; box-shadow:0 6px 16px rgba(139,92,246,.35);font-size:18px } #global-stop-btn button:hover{filter:brightness(1.05)} /* 移动端优化 */ @media (max-width: 900px){ .gradio-container{max-width:100vw!important;padding:10px!important} .gr-row{flex-direction:column!important} .gr-column{min-width:100%!important;max-width:100%!important} .custom-card{padding:12px} .chat-container{max-height:60vh} } """ js_code = r""" """ # 预加载最近任务与对话(分离:编程/智能) _last = load_tasks_db().get("last", {}) LAST_TASK = _last LAST_FILES = [f for f in _last.get("files", []) if f and os.path.exists(f)] LAST_CHAT_CODER = _last.get("chat_history", "") LAST_CHAT_GENERAL = _last.get("chat_history_general", "") # ========================== Gradio UI(Blocks 作用域内) ========================== with gr.Blocks( title=APP_NAME, css=css, theme=gr.themes.Soft(primary_hue="purple", secondary_hue="blue"), ) as app: gr.HTML(js_code) dynamic_css_html = gr.HTML("") # 全局状态 last_ai_response = gr.State("") last_run_summary = gr.State("") last_chat_payload_coder = gr.State({}) last_chat_payload_general = gr.State({}) selected_tool_id = gr.State("") ai_cmds_state = gr.State([]) history_coder_str = gr.State(LAST_CHAT_CODER) history_general_str = gr.State(LAST_CHAT_GENERAL) # 顶部全局按钮与提示 with gr.Row(): global_stop_btn = gr.Button("⏹", elem_id="global-stop-btn") top_hint = gr.Markdown("### 🧭 团队规划可自定义:在左侧'团队规划'中开启/修改/增减成员") gr.Markdown("## ⚙️ 模型与密钥配置(缓存持久化 + 可用性检测 + 选择修复)") with gr.Row(): with gr.Column(scale=4): # 外观与背景 with gr.Group(elem_classes="custom-card"): gr.Markdown("### 🎨 外观与背景") bg_mode = gr.Radio( choices=["默认", "白色", "美化", "自定义背景"], value=load_all().get("ui_background_mode", "默认"), label="背景模式", interactive=True, ) with gr.Row(): bg_file = gr.File( label="上传自定义背景(图片)", type="filepath", file_types=[".png", ".jpg", ".jpeg", ".webp"], scale=3, ) bg_apply_btn = gr.Button("应用背景", scale=1) bg_reset_btn = gr.Button("恢复默认", scale=1) bg_status = gr.Textbox(label="主题状态", interactive=False) # 主模型 with gr.Group(elem_classes="custom-card"): with gr.Row(): provider = gr.Radio( provider_choices(), value=load_all().get("ui_provider", "gemini"), label="提供商", interactive=True, ) with gr.Row(): cached_init = get_models_cache(load_all().get("ui_provider", "gemini")) or RECOMMENDED_MODELS.get(load_all().get("ui_provider", "gemini"), []) model = gr.Dropdown( choices=cached_init, allow_custom_value=True, label="选择或输入模型(显示全部)", interactive=True, value=(load_all().get("ui_model") or (cached_init[0] if cached_init else "")), ) copy_model_btn = gr.Button("📋 复制", scale=0, min_width=60) with gr.Row(): refresh_btn = gr.Button("🔄 刷新模型列表(自动保存)") test_btn = gr.Button("🔌 测试连接") with gr.Row(): rpm_input = gr.Number( label="每分钟请求上限 (RPM, 0 表示不限)", value=(get_saved_rpm(load_all().get("ui_provider", "gemini"), load_all().get("ui_model", "")) if load_all().get("ui_model") else 0), precision=0, ) save_rpm_btn = gr.Button("💾 保存当前模型 RPM") model_status = gr.Textbox(label="连接与设置状态", interactive=False, lines=2) with gr.Accordion("可用性批量检测", open=False): check_btn = gr.Button("🧪 开始检测(自动刷新后检测)") model_info = gr.Textbox(label="检测信息", interactive=False) model_table = gr.Dataframe( headers=["模型", "推荐", "可用"], datatype=["str", "str", "str"], interactive=True, ) tested_model_select = gr.Dropdown( label="从检测结果选择模型(仅显示可用)", choices=[], interactive=True, ) apply_tested_model_btn = gr.Button("应用所选模型") model_list_text = gr.Textbox( label="模型全列表(仅显示可用,可复制)", lines=5, interactive=False, ) copy_list_btn = gr.Button("📋 复制全部模型名") # 团队规划 team_enabled_default, team_members_default, team_pri_default, team_conc_default = load_team_from_conf() with gr.Group(elem_classes="custom-card"): gr.Markdown("### 👥 团队规划(多代理协作)") with gr.Row(): team_enable_ck = gr.Checkbox( value=team_enabled_default, label="启用团队规划(默认关闭,可在此开启/关闭)", ) team_conc = gr.Slider(1, 16, team_conc_default, step=1, label="最大并发成员数") team_pri = gr.Textbox( label="行动优先级(高→低)", value=team_pri_default or "run>code>reflect>stop", placeholder="run>code>reflect>stop", ) gr.Markdown("成员列表(可自定义增/删/改;provider/model 为空则使用上方规划师设置)") team_df = gr.Dataframe( headers=["name", "provider", "model", "persona"], value=[[m["name"], m.get("provider", ""), m.get("model", ""), m.get("persona", "")] for m in (team_members_default or [])], interactive=True, row_count=(len(team_members_default) or 4), ) with gr.Row(): team_add_row_btn = gr.Button("➕ 新增成员行") team_load_default_btn = gr.Button("📥 载入默认四人") team_save_btn = gr.Button("💾 保存团队配置") team_status = gr.Textbox(label="团队状态", interactive=False) # API 凭据 with gr.Group(elem_classes="custom-card"): gr.Markdown("### 🔐 API 凭据(自动保存,空值不覆盖已保存)") remember_ck = gr.Checkbox(value=True, label="🧠 变更时自动保存") save_status = gr.Textbox(label="保存状态", interactive=False) with gr.Accordion("🧩 自定义 API 提供商 (OpenAI 兼容)", open=False): with gr.Row(): custom_list = gr.Dropdown(label="已保存的自定义提供商", choices=list(get_custom_providers().keys()), interactive=True) custom_api_name = gr.Textbox(label="自定义提供商 ID", placeholder="例如 my-custom-api") custom_api_base = gr.Textbox(label="API Base URL", placeholder="https://api.example.com/v1") custom_api_key = gr.Textbox(label="API Key", type="password") custom_api_referer = gr.Textbox(label="HTTP-Referer (可选)") custom_api_title = gr.Textbox(label="X-Title (可选)") with gr.Row(): save_custom_api_btn = gr.Button("💾 保存/更新") delete_custom_api_btn = gr.Button("🗑 删除选中") reload_providers_btn = gr.Button("🔄 刷新提供商列表") custom_api_status = gr.Textbox(label="自定义API状态", interactive=False) with gr.Accordion("展开/折叠所有凭据", open=False): with gr.Accordion("🪄 Gemini", open=False): gemini_key = gr.Textbox(type="password", label="GOOGLE_API_KEY", value=load_all().get("gemini_key", "")) with gr.Accordion("🧠 OpenAI (openai_like)", open=False): openai_key = gr.Textbox(type="password", label="OPENAI_API_KEY", value=load_all().get("openai_key", "")) openai_base = gr.Textbox(label="OPENAI_BASE_URL", value=load_all().get("openai_base", DEFAULT_BASES["openai"])) with gr.Accordion("✨ Anthropic", open=False): anthropic_key = gr.Textbox(type="password", label="ANTHROPIC_API_KEY", value=load_all().get("anthropic_key", "")) with gr.Accordion("🧩 Cohere", open=False): cohere_key = gr.Textbox(type="password", label="COHERE_API KEY", value=load_all().get("cohere_key", "")) with gr.Accordion("🚀 Groq", open=False): groq_key = gr.Textbox(type="password", label="GROQ_API_KEY", value=load_all().get("groq_key", "")) groq_base = gr.Textbox(label="GROQ_BASE_URL", value=load_all().get("groq_base", DEFAULT_BASES["groq"])) with gr.Accordion("🌪 Mistral", open=False): mistral_key = gr.Textbox(type="password", label="MISTRAL_API KEY", value=load_all().get("mistral_key", "")) mistral_base = gr.Textbox(label="MISTRAL_BASE_URL", value=load_all().get("mistral_base", DEFAULT_BASES["mistral"])) with gr.Accordion("🧠 DeepSeek", open=False): deepseek_key = gr.Textbox(type="password", label="DEEPSEEK_API KEY", value=load_all().get("deepseek_key", "")) deepseek_base = gr.Textbox(label="DEEPSEEK_BASE_URL", value=load_all().get("deepseek_base", DEFAULT_BASES["deepseek"])) with gr.Accordion("🌐 OpenRouter", open=False): openrouter_key = gr.Textbox(type="password", label="OPENROUTER_API KEY", value=load_all().get("openrouter_key", "")) openrouter_base = gr.Textbox(label="OPENROUTER_BASE URL", value=load_all().get("openrouter_base", DEFAULT_BASES["openrouter"])) openrouter_referer = gr.Textbox(label="Referer(可选)", value=load_all().get("openrouter_referer", "")) openrouter_title = gr.Textbox(label="Title(可选)", value=load_all().get("openrouter_title", "")) with gr.Accordion("🔎 Perplexity", open=False): perplexity_key = gr.Textbox(type="password", label="PPLX_API KEY", value=load_all().get("perplexity_key", "")) perplexity_base = gr.Textbox(label="PPLX_BASE_URL", value=load_all().get("perplexity_base", DEFAULT_BASES["perplexity"])) with gr.Accordion("🤖 xAI", open=False): xai_key = gr.Textbox(type="password", label="XAI_API KEY", value=load_all().get("xai_key", "")) xai_base = gr.Textbox(label="XAI_BASE_URL", value=load_all().get("xai_base", DEFAULT_BASES["xai"])) with gr.Accordion("💠 Azure OpenAI", open=False): azure_key = gr.Textbox(type="password", label="AZURE_KEY", value=load_all().get("azure_key", "")) azure_base = gr.Textbox(label="AZURE_BASE", value=load_all().get("azure_base", DEFAULT_BASES["azure"])) azure_deployment = gr.Textbox(label="DEPLOYMENT", value=load_all().get("azure_deployment", "")) azure_version = gr.Textbox(label="API_VERSION", value=load_all().get("azure_version", "2024-02-15-preview")) with gr.Accordion("🤗 Hugging Face", open=False): hf_token = gr.Textbox(type="password", label="HF_TOKEN", value=load_all().get("hf_token", "")) with gr.Accordion("🐙 GitHub", open=False): github_token = gr.Textbox(type="password", label="GitHub PAT", value=load_all().get("github_token", "")) with gr.Accordion("🧪 SiliconFlow", open=False): siliconflow_key = gr.Textbox(type="password", label="SILICONFLOW_API KEY", value=load_all().get("siliconflow_key", "")) siliconflow_base = gr.Textbox(label="SILICONFLOW_BASE_URL", value=load_all().get("siliconflow_base", DEFAULT_BASES["siliconflow"])) # 右侧主操作区 with gr.Column(scale=8): with gr.Tabs() as main_tabs: # ===== 自动编程(修复引擎) ===== with gr.Tab("🛠️ 自动编程(修复引擎)"): with gr.Group(elem_classes="custom-card"): global_hint = gr.Textbox( label="🧭 全局提示词(修改后自动永久保存)", value=load_all().get("global_hint", DEFAULT_GLOBAL_HINT_STR), lines=3, ) task_input = gr.Textbox( label="🧩 任务描述", lines=5, placeholder="例如:修复 OKX 交易脚本下单报错;保持原有功能与结构", value=LAST_TASK.get("task", ""), elem_id="task_input", ) with gr.Row(): paste_btn = gr.Button("📋 粘贴到任务") attach_files = gr.Files( label="📎 附件(自动保存副本至 uploads/)", type="filepath", file_types=None, file_count="multiple", elem_classes="tiny-files", ) with gr.Accordion("🗂 历史附件(从 uploads/ 选择)", open=False): with gr.Row(): hist_files_dropdown = gr.Dropdown( label="选择历史附件(多选,默认最新)", choices=[], multiselect=True, scale=6, value=[], ) hist_files_refresh_btn = gr.Button("🔄 刷新", scale=1) hist_files_apply_btn = gr.Button("✅ 使用所选", scale=1) with gr.Accordion("📄 基线代码(可选)", open=False): baseline_code = gr.Code( label="若填入,将优先以此作为起始 main.py", language="python", value=LAST_TASK.get("baseline_code", ""), lines=14, ) with gr.Row(): required_kws = gr.Textbox(label="必须包含的关键词/结构(逗号分隔)", value=LAST_TASK.get("required_kws", "")) expected_stdout = gr.Textbox(label="期望 stdout 片段(可选)", value=LAST_TASK.get("expected_stdout", "")) cli_args = gr.Textbox(label="运行参数(可选)", value=LAST_TASK.get("cli_args", ""), placeholder="如: --symbol BTC-USDT-SWAP --size 1") with gr.Row(elem_classes="chat-toolbar"): max_attempts = gr.Slider(1, 10, 3, step=1, label="最大尝试次数") timeout_sec = gr.Slider(5, 600, 40, step=1, label="运行超时(秒/次)") run_btn = gr.Button("▶️ 开始自主修复", variant="primary") stop_btn = gr.Button("⏹ 停止") hist_init = get_task_history_titles() with gr.Row(): new_task_btn = gr.Button("🆕 新建任务") save_task_btn = gr.Button("💾 保存任务") restore_last_btn = gr.Button("📥 恢复最近任务") history_select = gr.Dropdown(label="任务历史(最近20条)", choices=hist_init, value=(hist_init[0] if hist_init else None), interactive=True) load_history_btn = gr.Button("📂 载入所选历史") with gr.Accordion("💬 任务对话(独立于'智能对话')", open=True): coder_chat_html = gr.HTML(render_chat_html(LAST_CHAT_CODER)) with gr.Row(): coder_user_in = gr.Textbox(label="", placeholder="与AI讨论任务、追加需求、追问原因…(共享自动编程附件)", lines=3, scale=6) coder_chat_files = gr.Files(label="📎", type="filepath", file_types=None, file_count="multiple", scale=1, elem_classes="tiny-files") coder_insert_files_btn = gr.Button("📥 插入附件内容", scale=1) with gr.Row(elem_classes="chat-toolbar"): coder_enable_shell = gr.Checkbox(label="允许AI执行命令", value=False) coder_full_shell = gr.Checkbox(label="允许完整 shell(危险)", value=False) coder_send_btn = gr.Button("📨 发送", variant="secondary") coder_retry_btn = gr.Button("🔁 重试发送") coder_stop_chat_btn = gr.Button("⏹ 停止聊天") coder_send_to_coder_btn = gr.Button("➡️ 发送到编程任务") coder_copy_msg_btn = gr.Button("📋 复制消息") coder_copy_chat_btn = gr.Button("📋 复制会话") coder_download_chat_btn = gr.Button("💾 下载会话") coder_chat_status = gr.Textbox(label="对话状态", interactive=False) with gr.Row(): coder_download_chat_file = gr.File(label="🗎 会话TXT(任务)", interactive=False) coder_chat_download_link = gr.HTML("") with gr.Accordion("🧾 修复后的 main.py(默认收起)", open=False): code_out = gr.Code(language="python") with gr.Row(): status_out = gr.Textbox(label="状态", interactive=False) attempts_out = gr.Number(label="尝试次数", interactive=False) with gr.Row(): stdout_out = gr.Textbox(label="📤 STDOUT(最后一次)", lines=8) stderr_out = gr.Textbox(label="📥 STDERR(最后一次)", lines=8) with gr.Row(): download_file = gr.File(label="⬇️ 下载 main.py", interactive=False) download_zip = gr.File(label="⬇️ 下载运行目录 ZIP", interactive=False) download_main_link = gr.HTML("") download_zip_link = gr.HTML("") run_folder = gr.Textbox(label="📂 本次运行目录", interactive=False) with gr.Row(): watch_btn = gr.Button("👀 查看实时日志") stop_watch_btn = gr.Button("🛑 停止查看") live_log = gr.Textbox(label="📟 实时日志(仅当前任务 STDOUT/STDERR/pip.log)", lines=20) with gr.Accordion("🪵 详细调试日志(完整记录)", open=False): debug_logs_display = gr.Textbox(label="完整运行日志(包含所有提示与响应、执行细节)", lines=18, interactive=False, elem_classes="debug-logs") with gr.Row(): refresh_debug_btn = gr.Button("🔄 刷新") copy_debug_btn = gr.Button("📋 复制") clear_debug_btn = gr.Button("🗑 清空") with gr.Row(): with gr.Column(scale=2): gr.Markdown("### 🧾 日志管理(调试/运行)") log_selector = gr.Dropdown(label="选择日志文件", choices=[], interactive=True) log_info = gr.Textbox(label="文件信息", interactive=False) with gr.Row(): logs_refresh_btn = gr.Button("🔄 刷新列表") logs_download_btn = gr.Button("⬇️ 下载所选") logs_clear_btn = gr.Button("🗑 清空全部") log_preview = gr.Textbox(label="📄 日志预览(尾部,可复制)", lines=12, interactive=False) download_log = gr.File(label="⬇️ 日志文件", interactive=False) logs_download_link = gr.HTML("") with gr.Row(): download_all_btn = gr.Button("📦 下载全部日志(ZIP)") download_all_file = gr.File(label="⬇️ 全部日志 ZIP", interactive=False) download_all_link = gr.HTML("") with gr.Column(scale=1): gr.Markdown("### 📦 打包运行目录") pack_status = gr.Textbox(label="打包状态", interactive=False) pack_btn = gr.Button("📦 重新打包") # ===== 智能对话 ===== with gr.Tab("🧠 智能对话"): chat_html = gr.HTML(render_chat_html(LAST_CHAT_GENERAL)) with gr.Row(): user_in = gr.Textbox(label="", placeholder="输入你的问题…(共享自动编程附件;支持多行)", lines=3, scale=6) chat_files = gr.Files(label="📎", type="filepath", file_types=None, file_count="multiple", elem_classes="tiny-files") insert_files_btn = gr.Button("📥 插入附件内容") with gr.Row(): import_coder_turn_select = gr.Dropdown(label="从编程对话选择一条引用", choices=chat_choices_from_hist(LAST_CHAT_CODER), interactive=True, scale=3) import_coder_turn_preview = gr.Textbox(label="引用预览", lines=4, interactive=False, scale=5) import_coder_turn_btn = gr.Button("↘️ 插入到输入框", scale=1) with gr.Row(elem_classes="chat-toolbar"): chat_enable_shell = gr.Checkbox(label="允许AI执行命令", value=False) chat_full_shell = gr.Checkbox(label="允许完整 shell(危险)", value=False) send_btn = gr.Button("📨 发送", variant="primary") chat_retry_btn = gr.Button("🔁 重试发送") stop_chat_btn = gr.Button("⏹ 停止聊天") send_to_coder_btn = gr.Button("➡️ 发送到编程任务") clear_btn = gr.Button("🗑 清空") copy_msg_btn = gr.Button("📋 复制消息") copy_chat_btn = gr.Button("📋 复制会话") download_chat_btn = gr.Button("💾 下载会话") send_status = gr.Textbox(label="发送状态", interactive=False) with gr.Row(): download_chat_file = gr.File(label="🗎 会话TXT(智能对话)", interactive=False) chat_download_link = gr.HTML("") with gr.Tab("🖥️ 命令执行"): with gr.Group(elem_classes="custom-card"): gr.Markdown("### 🖥️ 命令执行(AI 生成 + 审阅 + 执行)") with gr.Row(): cmd_ctx_src = gr.Radio(choices=["编程对话", "智能对话"], value="编程对话", label="上下文来源", scale=1) chat_for_cmd_select = gr.Dropdown(label="选择对话轮次", choices=[], interactive=True, scale=2) selected_chat_preview = gr.Textbox(label="引用预览", lines=5, interactive=False) with gr.Row(): ai_task = gr.Textbox(label="任务描述(自然语言)", placeholder="例如:安装 requests pydantic;清理缓存;导出日志", lines=3, scale=3) with gr.Column(scale=1): ai_suggest_btn = gr.Button("🤖 从描述生成命令") ai_from_chat_btn = gr.Button("🧠 从所选对话生成命令") ai_from_all_chat_btn = gr.Button("📚 从整段对话生成命令") with gr.Row(elem_classes="chat-toolbar"): ai_enable_shell = gr.Checkbox(label="允许执行命令", value=False) ai_full_shell = gr.Checkbox(label="允许完整 shell(危险)", value=False) ai_run_btn = gr.Button("▶ 执行命令", variant="primary") with gr.Row(): ai_cmds_json = gr.Code(label="命令 JSON(可编辑)", language="json", value="", lines=10, scale=2) shell_output = gr.Textbox(label="执行输出", lines=12, interactive=False, scale=3) ai_cmds_state = gr.State([]) with gr.Tab("📦 依赖管理"): with gr.Group(elem_classes="custom-card"): gr.Markdown("### 📦 依赖管理(pip)") dep_pkg_text = gr.Textbox(label="包名(可多个,空格分隔)", placeholder="如:requests pydantic", lines=2) with gr.Row(): dep_install_btn = gr.Button("⬇ 安装/升级") dep_uninstall_btn= gr.Button("🗑 卸载") dep_upgrade_btn = gr.Button("⬆ 升级(同 安装/升级)") dep_freeze_btn = gr.Button("📋 列出已安装(pip list)") with gr.Row(elem_classes="chat-toolbar"): dep_enable_shell = gr.Checkbox(label="允许执行命令", value=False) dep_full_shell = gr.Checkbox(label="允许完整 shell(危险)", value=False) dep_out = gr.Textbox(label="输出", lines=14, interactive=False) with gr.Tab("📁 项目管理"): with gr.Group(elem_classes="custom-card"): gr.Markdown("### 📁 GitHub 仓库管理") gh_url = gr.Textbox(label="仓库 URL", placeholder="https://github.com/user/repo.git") gh_pull_btn= gr.Button("⬇ 拉取/更新") gh_status = gr.Textbox(label="状态", interactive=False) with gr.Row(): gh_refresh_btn = gr.Button("🔄 刷新项目列表") projects_dd = gr.Dropdown(label="本地项目(含 .git)", choices=[], interactive=True) branch_dd = gr.Dropdown(label="分支", choices=[], interactive=True) with gr.Row(): new_branch = gr.Textbox(label="新分支名", placeholder="feature/x") branch_create_btn = gr.Button("🌿 创建/切换") files_df = gr.Dataframe(headers=["文件", "大小", "修改时间"], datatype=["str","str","str"], interactive=False) file_sel = gr.Dropdown(label="选择文件", choices=[], interactive=True) file_view = gr.Code(label="文件内容", language="python", lines=16) with gr.Row(): file_save_btn = gr.Button("💾 保存文件") commit_msg = gr.Textbox(label="提交信息", placeholder="update ...") commit_btn = gr.Button("✅ 提交") # -------------------------- 事件与逻辑绑定 # === 依赖管理 事件 === def _dep_make_cmds(action, pkgs): pkgs = (pkgs or "").strip() if action in ("install", "upgrade"): if not pkgs: return [f"echo 请输入要安装/升级的包名"] return [f"{sys.executable} -m pip install -U {pkgs}"] if action == "uninstall": if not pkgs: return [f"echo 请输入要卸载的包名"] return [f"{sys.executable} -m pip uninstall -y {pkgs}"] if action == "freeze": return [f"{sys.executable} -m pip list --format=freeze"] return ["echo 未知动作"] dep_install_btn.click( lambda pkgs, enable, full: ("❗ 未勾选“允许执行命令”" if not enable else run_commands(_dep_make_cmds("install", pkgs), bool(full))), inputs=[dep_pkg_text, dep_enable_shell, dep_full_shell], outputs=[dep_out], ) dep_uninstall_btn.click( lambda pkgs, enable, full: ("❗ 未勾选“允许执行命令”" if not enable else run_commands(_dep_make_cmds("uninstall", pkgs), bool(full))), inputs=[dep_pkg_text, dep_enable_shell, dep_full_shell], outputs=[dep_out], ) dep_upgrade_btn.click( lambda pkgs, enable, full: ("❗ 未勾选“允许执行命令”" if not enable else run_commands(_dep_make_cmds("upgrade", pkgs), bool(full))), inputs=[dep_pkg_text, dep_enable_shell, dep_full_shell], outputs=[dep_out], ) dep_freeze_btn.click( lambda enable, full: ("❗ 未勾选“允许执行命令”" if not enable else run_commands(_dep_make_cmds("freeze", ""), bool(full))), inputs=[dep_enable_shell, dep_full_shell], outputs=[dep_out], ) # === GitHub 项目管理 事件 === def _gh_refresh_projects_ui(): projs = list_projects() return gr.update(choices=projs, value=(projs[0] if projs else None)) def _gh_refresh_branches_files(proj): brs = list_branches(proj) if proj else [] files = list_files_in_project(proj) if proj else [["-", "-", "-"]] file_choices = [row[0] for row in files if row and row[0] not in ("-", "")] return (gr.update(choices=brs, value=(brs[0] if brs else None)), files, gr.update(choices=file_choices, value=(file_choices[0] if file_choices else None)), "") gh_refresh_btn.click(_gh_refresh_projects_ui, outputs=[projects_dd]) projects_dd.change(_gh_refresh_branches_files, inputs=[projects_dd], outputs=[branch_dd, files_df, file_sel, file_view]) branch_dd.change( lambda proj, br: create_and_switch_branch(proj, br)[0], inputs=[projects_dd, branch_dd], outputs=[gh_status], ) gh_pull_btn.click( lambda url, token: (pull_github_repo(url, token),), inputs=[gh_url, github_token], outputs=[gh_status], ).then(_gh_refresh_projects_ui, outputs=[projects_dd]) file_sel.change(lambda proj, rel: read_file_content(proj, rel), inputs=[projects_dd, file_sel], outputs=[file_view]) file_save_btn.click(lambda proj, rel, txt: save_file_content(proj, rel, txt), inputs=[projects_dd, file_sel, file_view], outputs=[gh_status]) commit_btn.click(lambda proj, msg: commit_changes(proj, msg), inputs=[projects_dd, commit_msg], outputs=[gh_status]) # -------------------------- 事件与逻辑绑定(全部在 Blocks 作用域内) -------------------------- # 工具函数(UI链路需要) def update_run_links( code_out, status_out, attempts_out, stdout_out, stderr_out, download_file_val, download_zip_val, run_folder_val, debug_logs_display ): main_link_html = build_file_link_html(download_file_val, "下载 main.py") if download_file_val else "(无 main.py)" zip_link_html = build_file_link_html(download_zip_val, "下载运行目录 ZIP") if download_zip_val else "(无 ZIP)" return main_link_html, zip_link_html def gather_keys_func(*kv): keys_map = [ "gemini_key", "openai_key", "openai_base", "anthropic_key", "cohere_key", "groq_key", "groq_base", "mistral_key", "mistral_base", "deepseek_key", "deepseek_base", "openrouter_key", "openrouter_base", "openrouter_referer", "openrouter_title", "perplexity_key", "perplexity_base", "xai_key", "xai_base", "azure_key", "azure_base", "azure_deployment", "azure_version", "hf_token", "github_token", "siliconflow_key", "siliconflow_base" ] return {keys_map[i]: v for i, v in enumerate(kv) if i < len(keys_map)} # 0) 顶部停止 # 修复 Gradio 上下文 try: from gradio.context import Context as _GrCx _GrCx.root_block = app except: pass global_stop_btn.click(lambda: stop_all()).then( None, None, None, js="(...args) => { console.log('Stop button clicked', args); return args; }" ) # 1) 主题与背景 def apply_background(mode, f): img_path = "" if mode == "自定义背景" and f: persisted = persist_files_to_uploads([resolve_file_path(f)]) img_path = persisted[0] if persisted else "" css_txt = build_theme_css(mode, img_path) set_ui_theme(mode, img_path) return ( f"✅ 已应用背景:{mode}" + (f"({Path(img_path).name})" if img_path else ""), f"", ) def reset_background(): css_txt = build_theme_css("默认", "") set_ui_theme("默认", "") return "✅ 已恢复默认背景", f"" bg_apply_btn.click(apply_background, inputs=[bg_mode, bg_file], outputs=[bg_status, dynamic_css_html]) bg_reset_btn.click(reset_background, outputs=[bg_status, dynamic_css_html]) # 2) 自定义API管理 def ui_refresh_provider_and_list(): choices = provider_choices() data = load_all() cur = data.get("ui_provider", "gemini") val = cur if cur in choices else (choices[0] if choices else "gemini") clist = list(get_custom_providers().keys()) debug("UI", "刷新提供商+自定义列表", {"providers": len(choices), "customs": len(clist)}) return gr.update(choices=choices, value=val), gr.update(choices=clist, value=(clist[0] if clist else None)) def ui_save_custom_api(name, base, key, ref, title): msg = save_custom_provider(name, base, key, referer=ref, title=title) p_upd, list_upd = ui_refresh_provider_and_list() try: if name and isinstance(p_upd, dict) and name in (p_upd.get("choices") or []): p_upd = gr.update(choices=p_upd.get("choices"), value=name) except Exception: pass return msg, list_upd, p_upd def ui_delete_custom_api(sel): msg = remove_custom_provider(sel) p_upd, list_upd = ui_refresh_provider_and_list() return msg, list_upd, p_upd def ui_load_custom_fields(sel): c = get_custom_providers().get(sel or "", {}) return (sel or "", c.get("base", ""), c.get("key", ""), c.get("referer", ""), c.get("title", "")) save_custom_api_btn.click( ui_save_custom_api, inputs=[custom_api_name, custom_api_base, custom_api_key, custom_api_referer, custom_api_title], outputs=[custom_api_status, custom_list, provider], ) delete_custom_api_btn.click(ui_delete_custom_api, inputs=[custom_list], outputs=[custom_api_status, custom_list, provider]) reload_providers_btn.click(ui_refresh_provider_and_list, outputs=[provider, custom_list]) custom_list.change( ui_load_custom_fields, inputs=[custom_list], outputs=[custom_api_name, custom_api_base, custom_api_key, custom_api_referer, custom_api_title], ) # 3) Keys 自动保存 all_key_inputs = [ gemini_key, openai_key, openai_base, anthropic_key, cohere_key, groq_key, groq_base, mistral_key, mistral_base, deepseek_key, deepseek_base, openrouter_key, openrouter_base, openrouter_referer, openrouter_title, perplexity_key, perplexity_base, xai_key, xai_base, azure_key, azure_base, azure_deployment, azure_version, hf_token, github_token, siliconflow_key, siliconflow_base, ] all_inputs_for_save = [remember_ck, provider, model, github_token, global_hint, task_input] + all_key_inputs def on_any_change_proxy(remember, pvd, mdl, token, hint, task, *kv): debug("UI", "on_change 触发", {"remember": bool(remember), "pvd": pvd, "mdl": mdl}) if not remember: return "" keys = gather_keys_func(*kv) return auto_save_ui(keys, pvd, mdl, token, hint, last_task=task) # 自动保存事件绑定 def on_any_change_proxy(remember, pvd, mdl, token, hint, task, *kv): debug("UI", "on_change 触发", {"remember": bool(remember), "pvd": pvd, "mdl": mdl}) if not remember: return "" keys = gather_keys_func(*kv) return auto_save_ui(keys, pvd, mdl, token, hint, last_task=task) all_inputs_for_save = [remember_ck, provider, model, github_token, global_hint, task_input] + all_key_inputs for comp in all_inputs_for_save: comp.change(on_any_change_proxy, inputs=all_inputs_for_save, outputs=save_status) for comp in all_inputs_for_save: comp.change(on_any_change_proxy, inputs=all_inputs_for_save, outputs=save_status) def save_hint_now(hint, pvd, mdl, token, task, *kv): debug("UI", "保存全局提示词", {"len": len(hint or "")}) keys = gather_keys_func(*kv) return save_all_ui(keys, pvd, mdl, token, hint, last_task=task, auto=True) global_hint.change( save_hint_now, inputs=[global_hint, provider, model, github_token, task_input] + all_key_inputs, outputs=[save_status], ) # 4) Provider/Model 选择与检测 def recover_provider_model(): data = load_all() pvd = data.get("ui_provider", "gemini") models = get_models_cache(pvd) or RECOMMENDED_MODELS.get(pvd, []) mdl_saved = data.get("ui_model", "") mdl_val = mdl_saved if mdl_saved in models else (models[0] if models else "") return gr.update(choices=provider_choices(), value=pvd), gr.update(choices=models, value=mdl_val) app.load(recover_provider_model, outputs=[provider, model]) def on_provider_change(pvd, cur_m): choices = get_models_cache(pvd) or RECOMMENDED_MODELS.get(pvd, []) data = load_all() saved_mdl = data.get("ui_model", "") value = saved_mdl if saved_mdl in choices else (cur_m if cur_m in choices else (choices[0] if choices else "")) debug("UI", "提供商切换", {"provider": pvd, "model_value": value}) return gr.update(choices=choices, value=value), gr.update(choices=choices, value=value) provider.change(on_provider_change, inputs=[provider, model], outputs=[model, tested_model_select]) def ui_refresh_models(pvd, mdl, *kv): mdl_update, info, table, list_text = refresh_models(pvd, mdl, gather_keys_func(*kv)) models_cached = get_models_cache(pvd) tested_dd = gr.update( choices=models_cached, value=(mdl if mdl in models_cached else (models_cached[0] if models_cached else "")), ) return mdl_update, info, table, list_text, tested_dd refresh_btn.click( ui_refresh_models, inputs=[provider, model] + all_key_inputs, outputs=[model, model_info, model_table, model_list_text, tested_model_select], ) def load_rpm_for_current(pvd, mdl): return gr.update(value=int(get_saved_rpm(pvd, mdl) if mdl else 0)) def save_current_rpm(pvd, mdl, rpm): if not (pvd and mdl): return "❗ 请先选择提供商与模型" set_saved_rpm(pvd, mdl, int(max(0, rpm or 0))) return f"✅ 已保存 {pvd}:{mdl} 的 RPM = {int(max(0, rpm or 0))}" model.change(load_rpm_for_current, inputs=[provider, model], outputs=[rpm_input]) save_rpm_btn.click(save_current_rpm, inputs=[provider, model, rpm_input], outputs=[model_status]) def run_detection_with_refresh(pvd, *kv): models, rec, err = get_models_list(pvd, gather_keys_func(*kv)) if models: set_models_cache(pvd, models) data = load_all() saved_mdl = data.get("ui_model", "") mdl_val = saved_mdl if saved_mdl in models else (models[0] if models else "") mdl_update = gr.update(choices=models, value=mdl_val) rows, info, ok_models = test_models(pvd, gather_keys_func(*kv)) ok_choices = ok_models ok_value = ok_choices[0] if ok_choices else None ok_list_text = "\n".join(ok_choices) return (mdl_update, rows, info, gr.update(choices=ok_choices, value=ok_value), ok_list_text) check_btn.click( run_detection_with_refresh, inputs=[provider] + all_key_inputs, outputs=[model, model_table, model_info, tested_model_select, model_list_text], ) def on_model_df_select(evt: gr.SelectData, table, pvd, token, hint, task, *kv): try: ridx = evt.index[0] m = table[ridx][0] if m and m not in {"-", ""}: debug("MODELS", "表格选择模型", {"model": m}) save_all_ui(gather_keys_func(*kv), pvd, m, token, hint, last_task=task, auto=True) return gr.update(value=m), f"✅ 已选: {m}" except Exception as e: return gr.update(), f"❌ 选择失败: {e}" return gr.update(), "" model_table.select( on_model_df_select, inputs=[model_table, provider, github_token, global_hint, task_input] + all_key_inputs, outputs=[model, model_status], ) apply_tested_model_btn.click( lambda m: ((gr.update(value=m), f"✅ 已应用模型: {m}") if m else (gr.update(), "❗ 请选择模型")), inputs=[tested_model_select], outputs=[model, model_status], ) test_btn.click( lambda pvd, mdl, *kv: ( "✅ 连接成功!" if not str(llm_call(pvd, mdl, "请仅回复:pong", gather_keys_func(*kv), req_timeout=30)).startswith("❗") else str(llm_call(pvd, mdl, "请仅回复:pong", gather_keys_func(*kv), req_timeout=30)) ), inputs=[provider, model] + all_key_inputs, outputs=[model_status], ) # JS 小工具 copy_model_btn.click(None, inputs=[model], js="(m)=>window.copyText(String(m||''))") copy_list_btn.click(None, inputs=[model_list_text], js="(t)=>window.copyText(String(t||''))") paste_btn.click( None, None, js=""" ()=>{navigator.clipboard.readText().then(t=>{ const ta=document.querySelector('#task_input textarea'); if(ta){ta.value=(ta.value||'')+t;ta.dispatchEvent(new Event('input',{bubbles:true}))} })} """ ) # 5) 团队配置 team_add_row_btn.click(lambda rows: (rows or []) + [["", "", "", ""]], inputs=[team_df], outputs=[team_df]) team_load_default_btn.click(lambda: [[m["name"], m.get("provider", ""), m.get("model", ""), m.get("persona", "")] for m in DEFAULT_TEAM], outputs=[team_df]) def save_team_cfg(enabled, df_rows, pri, conc): msg = save_team_to_conf(bool(enabled), df_rows, pri, int(conc)) return msg team_save_btn.click(save_team_cfg, inputs=[team_enable_ck, team_df, team_pri, team_conc], outputs=[team_status]) # 6) 历史附件与任务状态 def list_uploads_files_ui(limit=200) -> List[str]: try: items = [] for p in sorted(Path(UPLOADS_DIR).glob("*"), key=lambda x: x.stat().st_mtime, reverse=True): if p.is_file(): items.append(str(p)) return items[:limit] except Exception as e: debug("UPLOADS", "列出失败", {"err": str(e)}) return [] def refresh_hist_files(): choices = list_uploads_files_ui() default_val = [choices[0]] if choices else [] return gr.update(choices=choices, value=default_val) hist_files_refresh_btn.click(refresh_hist_files, outputs=[hist_files_dropdown]) def apply_hist_files(selected, task, base, req, exp, cli, pvd, mdl): sel = [resolve_file_path(p) for p in (selected or []) if resolve_file_path(p)] msg = save_task_state(task, sel, base, req, exp, cli, pvd, mdl) titles = get_task_history_titles() return (gr.update(value=sel), msg, gr.update(choices=titles, value=(titles[0] if titles else None))) hist_files_apply_btn.click( apply_hist_files, inputs=[hist_files_dropdown, task_input, baseline_code, required_kws, expected_stdout, cli_args, provider, model], outputs=[attach_files, save_status, history_select], ) def new_task(): return "", gr.update(value=[]), "", "", "", "", "✅ 已新建任务草稿(未保存)" new_task_btn.click( new_task, outputs=[task_input, attach_files, baseline_code, required_kws, expected_stdout, cli_args, save_status], ) def on_files_change(files, task, base, req, exp, cli, pvd, mdl): debug("UI", "附件变更", {"count": len(files or [])}) saved = persist_files_to_uploads([resolve_file_path(p) for p in (files or []) if resolve_file_path(p)]) msg = save_task_state(task, saved, base, req, exp, cli, pvd, mdl) titles = get_task_history_titles() return (gr.update(value=saved), msg, gr.update(choices=titles, value=(titles[0] if titles else None))) attach_files.change( on_files_change, inputs=[attach_files, task_input, baseline_code, required_kws, expected_stdout, cli_args, provider, model], outputs=[attach_files, save_status, history_select], ) def auto_save_task(task, files, base, req, exp, cli, pvd, mdl): saved = [resolve_file_path(p) for p in (files or []) if resolve_file_path(p)] msg = save_task_state(task, saved, base, req, exp, cli, pvd, mdl) titles = get_task_history_titles() return msg, gr.update(choices=titles, value=(titles[0] if titles else None)) for comp in [task_input, baseline_code, required_kws, expected_stdout, cli_args, provider, model]: comp.change( auto_save_task, inputs=[task_input, attach_files, baseline_code, required_kws, expected_stdout, cli_args, provider, model], outputs=[save_status, history_select], ) def do_restore_last(): data = restore_last_task_state() coder_hist = data.get("chat_history", "") general_hist = data.get("chat_history_general", "") return ( data.get("task", ""), gr.update(value=data.get("files", [])), data.get("baseline_code", ""), data.get("required_kws", ""), data.get("expected_stdout", ""), data.get("cli_args", ""), "✅ 已恢复最近任务", coder_hist, render_chat_html(coder_hist), general_hist, render_chat_html(general_hist), ) restore_last_btn.click( do_restore_last, outputs=[task_input, attach_files, baseline_code, required_kws, expected_stdout, cli_args, save_status, history_coder_str, coder_chat_html, history_general_str, chat_html], ) def do_load_history(title): titles = get_task_history_titles() if not titles: return (gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), "ℹ️ 暂无历史", history_coder_str.value, render_chat_html(history_coder_str.value), history_general_str.value, render_chat_html(history_general_str.value)) try: idx = int(title.split("|")[0].strip()) - 1 if title else 0 except Exception: idx = 0 data = get_task_history_by_index(idx) coder_hist = data.get("chat_history", "") general_hist = data.get("chat_history_general", "") return ( data.get("task", ""), gr.update(value=data.get("files", [])), data.get("baseline_code", ""), data.get("required_kws", ""), data.get("expected_stdout", ""), data.get("cli_args", ""), "✅ 已载入历史", coder_hist, render_chat_html(coder_hist), general_hist, render_chat_html(general_hist), ) load_history_btn.click( do_load_history, inputs=[history_select], outputs=[task_input, attach_files, baseline_code, required_kws, expected_stdout, cli_args, save_status, history_coder_str, coder_chat_html, history_general_str, chat_html], ) # 7) 对话功能 def insert_files_into_message(paths, prev_text): blocks = [] for p in paths or []: pp = resolve_file_path(p) if not pp: continue try: name = Path(pp).name if Path(pp).suffix.lower() not in TEXT_EXTS and not looks_text(pp): blocks.append(f"文件: {name}\n<非文本/二进制文件,已记录路径:{pp}>") continue b = Path(pp).read_bytes()[:200_000] t = b.decode("utf-8", "replace") lang = "python" if name.endswith(".py") else "" blocks.append(f"文件: {name}\n```{lang}\n{t}\n```") except Exception as e: blocks.append(f"文件: {Path(pp).name}\n<读取失败: {e}>") new_text = prev_text or "" if blocks: new_text = (new_text + ("\n\n" if new_text else "") + "\n\n".join(blocks)).strip() return gr.update(value=new_text) insert_files_btn.click(insert_files_into_message, inputs=[chat_files, user_in], outputs=[user_in]) coder_insert_files_btn.click(insert_files_into_message, inputs=[coder_chat_files, coder_user_in], outputs=[coder_user_in]) coder_send_btn.click( chat_send_common, inputs=[history_coder_str, coder_user_in, coder_chat_files, attach_files, provider, model, last_run_summary, task_input, required_kws, expected_stdout, coder_enable_shell, coder_full_shell] + all_key_inputs, outputs=[history_coder_str, coder_chat_html, last_ai_response, last_chat_payload_coder], ).then( lambda h: persist_chat_history(h), inputs=[history_coder_str], outputs=[coder_chat_status], ).then(lambda: "", outputs=[coder_user_in]) coder_retry_btn.click( chat_retry_common, inputs=[last_chat_payload_coder, history_coder_str, attach_files, provider, model, last_run_summary, task_input, required_kws, expected_stdout, coder_enable_shell, coder_full_shell] + all_key_inputs, outputs=[history_coder_str, coder_chat_html, last_ai_response], ).then(lambda h: persist_chat_history(h), inputs=[history_coder_str], outputs=[coder_chat_status]) coder_stop_chat_btn.click(lambda: "⏹ 已请求停止聊天(当前请求无法中断,将尽快返回)", outputs=[coder_chat_status]) coder_copy_msg_btn.click(None, inputs=[coder_user_in], js="(t)=>window.copyText(String(t||''))") coder_copy_chat_btn.click( None, inputs=[history_coder_str], js=""" (h)=>{ const parts=String(h||'').split('|||'); let lines=[]; for(let i=1;i导出失败: {e}" coder_download_chat_btn.click( lambda h: export_chat(h, "任务对话"), inputs=[history_coder_str], outputs=[coder_download_chat_file, coder_chat_download_link], ) # 智能对话 send_btn.click( chat_send_common, inputs=[history_general_str, user_in, chat_files, attach_files, provider, model, last_run_summary, task_input, required_kws, expected_stdout, chat_enable_shell, chat_full_shell] + all_key_inputs, outputs=[history_general_str, chat_html, last_ai_response, last_chat_payload_general], ).then(lambda h: persist_general_chat_history(h), inputs=[history_general_str], outputs=[send_status]).then(lambda: "", outputs=[user_in]) chat_retry_btn.click( chat_retry_common, inputs=[last_chat_payload_general, history_general_str, attach_files, provider, model, last_run_summary, task_input, required_kws, expected_stdout, chat_enable_shell, chat_full_shell] + all_key_inputs, outputs=[history_general_str, chat_html, last_ai_response], ).then(lambda h: persist_general_chat_history(h), inputs=[history_general_str], outputs=[send_status]) stop_chat_btn.click(lambda: "⏹ 已请求停止聊天(当前请求无法中断,将尽快返回)", outputs=[send_status]) def refresh_coder_turn_choices(hist: str): choices = chat_choices_from_hist(hist) default = choices[0] if choices else None preview = (preview_chat_by_choice(hist, default) if default else "(没有编程对话历史)") return gr.update(choices=choices, value=default), preview app.load(lambda: refresh_coder_turn_choices(LAST_CHAT_CODER), outputs=[import_coder_turn_select, import_coder_turn_preview]) history_coder_str.change(refresh_coder_turn_choices, inputs=[history_coder_str], outputs=[import_coder_turn_select, import_coder_turn_preview]) import_coder_turn_select.change( lambda h, c: preview_chat_by_choice(h, c), inputs=[history_coder_str, import_coder_turn_select], outputs=[import_coder_turn_preview], ) import_coder_turn_btn.click( lambda prev, txt: gr.update(value=((prev or "") + ("\n\n【引用自编程对话】\n" + txt) if txt else "")), inputs=[user_in, import_coder_turn_preview], outputs=[user_in], ) copy_msg_btn.click(None, inputs=[user_in], js="(t)=>window.copyText(String(t||''))") copy_chat_btn.click( None, inputs=[history_general_str], js=""" (h)=>{ const parts=String(h||'').split('|||'); let lines=[]; for(let i=1;icode>reflect>stop"), team_max_conc=int(max(1, min(16, team_conc_val or 4))), ) code_path = res.get("download_main") or "" zip_path = res.get("zip_path") or "" return ( res.get("code", ""), res.get("status", ""), res.get("attempts", 0), res.get("stdout", ""), res.get("stderr", ""), gr.update(value=code_path if code_path and os.path.exists(code_path) else None), gr.update(value=zip_path if zip_path and os.path.exists(zip_path) else None), res.get("workdir", ""), res.get("logs", ""), ) run_evt = run_btn.click( do_run, inputs=[provider, model, task_input, attach_files, max_attempts, timeout_sec, required_kws, expected_stdout, cli_args, baseline_code, global_hint, team_enable_ck, team_df, team_pri, team_conc] + all_key_inputs, outputs=[code_out, status_out, attempts_out, stdout_out, stderr_out, download_file, download_zip, run_folder, debug_logs_display], ) run_evt.then( update_run_links, inputs=[code_out, status_out, attempts_out, stdout_out, stderr_out, download_file, download_zip, run_folder, debug_logs_display], outputs=[download_main_link, download_zip_link], ) run_evt.then( lambda status, attempts, stdout, stderr, run_dir, code: ( (lambda out_snip, err_snip, run_name: f"状态: {status} | 尝试: {attempts}\n运行目录: {run_name}\n--- STDOUT 片段 ---\n{out_snip}\n--- STDERR 片段 ---\n{err_snip}")( (stdout or "").strip()[:400] + ("…" if len((stdout or "").strip()) > 400 else ""), (stderr or "").strip()[:400] + ("…" if len((stderr or "").strip()) > 400 else ""), (Path(run_dir).name if (run_dir or "").strip() else "(未生成)"), ) ), inputs=[status_out, attempts_out, stdout_out, stderr_out, run_folder, code_out], outputs=[last_run_summary], ) def stop_watch(): global WATCH_STOP WATCH_STOP = True return "🛑 已停止查看实时日志" stop_watch_btn.click(stop_watch, outputs=[status_out]) def stream_logs(): global WATCH_STOP WATCH_STOP = False last_emit = "" while not WATCH_STOP: stdout_t = read_tail(str(Path(CURRENT_RUN_DIR, "stdout.txt")), 800_000) if CURRENT_RUN_DIR else "" stderr_t = read_tail(str(Path(CURRENT_RUN_DIR, "stderr.txt")), 800_000) if CURRENT_RUN_DIR else "" pip_t = read_tail(str(Path(CURRENT_RUN_DIR, "pip.log")), 400_000) if CURRENT_RUN_DIR else "" composed = [ "=== [STDOUT] ===\n" + (stdout_t or "(空)"), "\n=== [STDERR] ===\n" + (stderr_t or "(空)"), "\n=== [pip.log] ===\n" + (pip_t or "(空)"), ] txt = "\n".join(composed) if txt != last_emit: last_emit = txt yield txt time.sleep(0.5) yield last_emit or "(暂无日志)" watch_btn.click(stream_logs, outputs=[live_log]) stop_btn.click(lambda: stop_all(), outputs=[status_out]) # 9) 调试日志/日志管理/打包 refresh_debug_btn.click(lambda: get_debug_text(), outputs=[debug_logs_display]) copy_debug_btn.click(None, inputs=[debug_logs_display], js="(t)=>window.copyText(String(t||''))") clear_debug_btn.click(lambda: (DEBUG_BUFFER.clear() or True) and "", outputs=[debug_logs_display]) def list_log_files_ui() -> List[str]: files = [] try: for p in sorted(Path(LOGS_DIR).glob("*.log"), key=lambda x: x.stat().st_mtime, reverse=True): files.append(p.name) except Exception as e: debug("LOGS", "列出失败", {"err": str(e)}) return files def get_log_info_ui(fn: str) -> str: try: p = Path(LOGS_DIR) / fn if p.exists(): size = p.stat().st_size size_str = f"{size/1024:.2f}KB" if size > 1024 else f"{size}B" mtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.stat().st_mtime)) return f"🗎 {fn} | 大小: {size_str} | 修改: {mtime}" return "文件不存在" except Exception as e: return f"获取信息失败: {e}" def download_log_file_ui(fn: str): p = Path(LOGS_DIR) / fn return str(p) if p.exists() else None def read_log_tail_ui(fn: str, max_bytes=500_000) -> str: try: p = Path(LOGS_DIR) / fn if not p.exists(): return "文件不存在" b = p.read_bytes() if len(b) > max_bytes: b = b[-max_bytes:] return b.decode("utf-8", "replace") except Exception as e: return f"<读取失败: {e}>" def clear_all_logs_ui(): cnt = 0 for p in Path(LOGS_DIR).glob("*.log"): try: p.unlink() cnt += 1 except Exception: pass return (f"🧹 已清空 {cnt} 个日志", list_log_files_ui(), "", None, "(已清空)") def refresh_log_list(): files = list_log_files_ui() debug("LOGS", "刷新日志列表", {"count": len(files)}) return gr.update(choices=files, value=(files[0] if files else None)) logs_refresh_btn.click(refresh_log_list, outputs=[log_selector]) def log_selector_change(fn): info = get_log_info_ui(fn) prev = read_log_tail_ui(fn) p = download_log_file_ui(fn) link = build_file_link_html(p, f"下载 {fn}") if p else "(请选择日志)" return info, prev, gr.update(value=p) if p else gr.update(), link log_selector.change(log_selector_change, inputs=[log_selector], outputs=[log_info, log_preview, download_log, logs_download_link]) logs_download_btn.click( lambda fn: ( gr.update(value=download_log_file_ui(fn)) if fn else gr.update(), (build_file_link_html(download_log_file_ui(fn), f"下载 {fn}") if fn and download_log_file_ui(fn) else "(无文件)"), ), inputs=[log_selector], outputs=[download_log, logs_download_link], ) logs_clear_btn.click(clear_all_logs_ui, outputs=[status_out, log_selector, log_info, download_log, logs_download_link]) download_all_btn.click( lambda: (lambda z: (gr.update(value=z if z and os.path.exists(z) else None), (build_file_link_html(z, "下载全部日志 ZIP") if z else "(打包失败)")))(package_all_logs(CURRENT_RUN_DIR)), outputs=[download_all_file, download_all_link], ) def pack_and_show(run_dir): z = package_run_dir(run_dir) link = (build_file_link_html(z, "下载运行目录 ZIP") if z else "(无可下载ZIP)") return (f"{'✅ 已打包: ' + z if z else '❌ 打包失败/无目录'}", gr.update(value=z if z and os.path.exists(z) else None), link) pack_btn.click(pack_and_show, inputs=[run_folder], outputs=[pack_status, download_zip, download_zip_link]) try: # 10) 命令执行 def ai_suggest_commands(nl_task: str, provider: str, model: str, keys: dict) -> Tuple[str, List[str]]: if not (nl_task or "").strip(): return "❗ 请输入要执行的任务描述", [] prompt = f"""你是DevOps助手。根据用户需求,生成在Linux环境中执行的命令列表(尽量使用python -m pip而非pip),只返回JSON: {{ "commands": ["python -m pip install 包1", "python -m pip install 包2"] }} 不要解释。用户需求:{nl_task}""" out = llm_call(provider, model, prompt, keys, req_timeout=60) try: j = json.loads(out) cmds = j.get("commands", []) if not isinstance(cmds, list): cmds = [] return json.dumps(j, ensure_ascii=False, indent=2), cmds except Exception: lines = [s.strip() for s in str(out).splitlines() if s.strip() and not s.strip().startswith("#")] return out, lines def refresh_cmd_choices(src, coder_hist, general_hist): hist = coder_hist if src == "编程对话" else general_hist choices = chat_choices_from_hist(hist) default = choices[0] if choices else None preview = (preview_chat_by_choice(hist, default) if default else "(没有可用历史)") return gr.update(choices=choices, value=default), preview app.load(lambda: refresh_cmd_choices("编程对话", LAST_CHAT_CODER, LAST_CHAT_GENERAL), outputs=[chat_for_cmd_select, selected_chat_preview]) cmd_ctx_src.change( refresh_cmd_choices, inputs=[cmd_ctx_src, history_coder_str, history_general_str], outputs=[chat_for_cmd_select, selected_chat_preview], ) chat_for_cmd_select.change( lambda src, coder_hist, general_hist, choice: preview_chat_by_choice(coder_hist if src == "编程对话" else general_hist, choice), inputs=[cmd_ctx_src, history_coder_str, history_general_str, chat_for_cmd_select], outputs=[selected_chat_preview], ) ai_from_chat_btn.click( lambda src, coder_hist, general_hist, choice, pvd, mdl, last_sum, *kv: ( (lambda hist: (lambda j, cmds: ("✅ 已基于对话生成命令(请审阅后执行)", j, cmds))( *ai_suggest_commands( f"""根据以下对话内容,生成用于“修复/运行/准备环境”的命令列表(Linux),请尽量使用 python -m pip 而非 pip,仅返回JSON: {{ "commands": ["python -m pip install 包1", "python -m pip install 包2"] }} 不要解释。 【最近任务摘要】\n{(last_sum or '').strip() or '(无)'}\n\n{preview_chat_by_choice(hist, choice)}""", pvd, mdl, gather_keys_func(*kv), ) ))(coder_hist if src == "编程对话" else general_hist) ), inputs=[cmd_ctx_src, history_coder_str, history_general_str, chat_for_cmd_select, provider, model, last_run_summary] + all_key_inputs, outputs=[shell_output, ai_cmds_json, ai_cmds_state], ) ai_from_all_chat_btn.click( lambda src, coder_hist, general_hist, pvd, mdl, last_sum, *kv: (lambda content: ai_suggest_commands( f"""根据整段历史对话,生成用于“修复/运行/准备环境”的命令列表(Linux),要求: - 尽量使用 python -m pip 而非 pip - 谨慎使用 &&、|、重定向,默认不使用,除非必须 - 只返回JSON(包含 commands 数组) 输入对话: {content} 最近任务摘要(可选): {last_sum or '(无)'}""", pvd, mdl, gather_keys_func(*kv), ))("\n\n".join([f"【用户】\n{t['user']}\n\n【AI】\n{t['assistant']}" for t in parse_chat_history(coder_hist if src == "编程对话" else general_hist)])), inputs=[cmd_ctx_src, history_coder_str, history_general_str, provider, model, last_run_summary] + all_key_inputs, outputs=[ai_cmds_json, shell_output], ) ai_suggest_btn.click( lambda nl, pvd, mdl, *kv: (lambda res: ("✅ 已生成命令(请审阅后执行)", res[0], res[1]))( ai_suggest_commands(nl, pvd, mdl, gather_keys_func(*kv))), inputs=[ai_task, provider, model] + all_key_inputs, outputs=[shell_output, ai_cmds_json, ai_cmds_state], ) ai_run_btn.click( lambda json_text, cmds, enable, full_shell: ("❗ 未勾选“允许AI执行命令”" if not enable else (lambda commands: run_commands(commands, bool(full_shell)))( json.loads(json_text).get("commands", cmds) if (json_text or "").strip() else (cmds or []) )), inputs=[ai_cmds_json, ai_cmds_state, ai_enable_shell, ai_full_shell], outputs=[shell_output], ) except NameError as e: debug("UI", "命令执行面板组件未定义,相关事件未绑定", {"err": str(e)}) # 11) 页面加载预设 def list_log_files_loader(): files = [] try: for p in sorted(Path(LOGS_DIR).glob("*.log"), key=lambda x: x.stat().st_mtime, reverse=True): files.append(p.name) except Exception as e: debug("LOGS", "列出失败", {"err": str(e)}) return gr.update(choices=files, value=(files[0] if files else None)) app.load(list_log_files_loader, outputs=[log_selector]) app.load( lambda: (read_tail(str(Path(LOGS_DIR) / f"debug-{datetime.now().strftime('%Y%m%d')}.log")) if Path(LOGS_DIR).exists() else ""), outputs=[debug_logs_display], ) app.load( lambda: (lambda ch: gr.update(choices=ch, value=([ch[0]] if ch else [])))(list_uploads_files_ui()), outputs=[hist_files_dropdown], ) app.load( lambda: (render_chat_html(LAST_CHAT_CODER), render_chat_html(LAST_CHAT_GENERAL), LAST_CHAT_CODER, LAST_CHAT_GENERAL), outputs=[coder_chat_html, chat_html, history_coder_str, history_general_str], ) app.load(ui_refresh_provider_and_list, outputs=[provider, custom_list]) app.load(preload_theme_settings, outputs=[bg_mode, dynamic_css_html]) app.load( lambda: (lambda k=load_all(): [ k.get("gemini_key", ""), k.get("openai_key", ""), k.get("openai_base", DEFAULT_BASES["openai"]), k.get("anthropic_key", ""), k.get("cohere_key", ""), k.get("groq_key", ""), k.get("groq_base", DEFAULT_BASES["groq"]), k.get("mistral_key", ""), k.get("mistral_base", DEFAULT_BASES["mistral"]), k.get("deepseek_key", ""), k.get("deepseek_base", DEFAULT_BASES["deepseek"]), k.get("openrouter_key", ""), k.get("openrouter_base", DEFAULT_BASES["openrouter"]), k.get("openrouter_referer", ""), k.get("openrouter_title", ""), k.get("perplexity_key", ""), k.get("perplexity_base", DEFAULT_BASES["perplexity"]), k.get("xai_key", ""), k.get("xai_base", DEFAULT_BASES["xai"]), k.get("azure_key", ""), k.get("azure_base", DEFAULT_BASES["azure"]), k.get("azure_deployment", ""), k.get("azure_version", "2024-02-15-preview"), k.get("hf_token", ""), k.get("github_token", ""), k.get("siliconflow_key", ""), k.get("siliconflow_base", DEFAULT_BASES["siliconflow"]), ])(), outputs=all_key_inputs, ) # ====== 修正 llm_call(防止外层未捕获异常导致崩溃) ====== def _llm_call_fixed(provider: str, model: str, prompt: str, keys: dict, req_timeout: int = DEFAULT_REQ_TIMEOUT) -> str: try: return llm_call(provider, model, prompt, keys, req_timeout=req_timeout) except Exception as e: debug("LLM_ERR", "外层异常", {"err": str(e), "trace": traceback.format_exc()}) return f"❗ 请求异常:{e}" # 如需严格覆盖上面的 llm_call,可启用: # llm_call = _llm_call_fixed # ========================== 启动 ========================== # 全局错误调试 app.load(None, None, None, js=""" () => { window.addEventListener('error', (e) => { console.error('Global error:', e.message, e.filename, e.lineno); }); console.log('Error logging enabled'); } """) if __name__ == "__main__": debug("INFO", "应用启动", {"storage_root": STORAGE_ROOT}) app.queue() port = int(os.getenv("PORT", "7860")) try: app.launch( server_name="0.0.0.0", server_port=port, allowed_paths=[STORAGE_ROOT, RUN_ROOT, LOGS_DIR, PROJECT_ROOT, UPLOADS_DIR], ) except TypeError: app.launch(server_name="0.0.0.0", server_port=port)