Spaces:
Running
Running
fix: agent_run param mismatch (send agent_name) + add GitHub push-update (3 inputs: repo name, token, username; --force-with-lease)
"""FastAPI / Gradio Server routes.

Defines all HTTP and API endpoints:
- GET /                   → serves the index.html frontend
- GET /api/model-status   → model loading status
- GET /images/{f}         → serve generated plot images
- GET /download/{f}       → serve project ZIP downloads
- API web_search          → Google search scraping
- API chat                → streaming chat with code execution
- API push_hf             → push to HuggingFace Hub
- API switch_model        → switch between loaded models
- API upload_image        → upload image for VLM inference
- API hf_auth             → get HF OAuth profile & organizations
- API agent_run           → Claude Code-style agent loop with tools
- API list_skills         → list available skills
- API list_commands       → list available slash commands
- API list_hooks          → list configured hooks
- API workspace_tree      → list workspace files
- API workspace_read      → read a workspace file
- API workspace_write     → write a workspace file
- API workspace_bash      → run a bash command in workspace
- API todo_read           → read current todo list
- API todo_write          → update todo list
- API import_github       → clone a GitHub repo into the workspace
- API github_url_examples → return accepted GitHub URL formats
- API push_github         → push the current workspace to a GitHub repo
"""
| from __future__ import annotations | |
| import base64 | |
| import json | |
| import logging | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any | |
| from fastapi.responses import HTMLResponse, FileResponse | |
try:
    from gradio import Server
except ImportError:
    # Not every Gradio release exports ``Server`` at the package root. Fall
    # back to a small stand-in so this module stays importable (e.g. in tests).
    class Server:  # type: ignore
        """Stand-in used when ``gradio.Server`` cannot be imported."""

        def __init__(self, *args, **kwargs):
            # Imported lazily so that defining the shim does not need FastAPI.
            from fastapi import FastAPI

            self._fastapi = FastAPI()

        def get(self, path: str, **kwargs):
            """Return the wrapped FastAPI app's GET-route decorator for ``path``."""
            route_decorator = self._fastapi.get(path, **kwargs)
            return route_decorator

        def api(self, name: str = None, concurrency_limit: int = 1):
            """Return a decorator that tags a function with its API metadata."""

            def decorator(fn):
                # The metadata lives on the function itself so it can be inspected.
                setattr(fn, "_api_name", name)
                setattr(fn, "_concurrency_limit", concurrency_limit)
                return fn

            return decorator
| from code.config.constants import ( | |
| APP_TITLE, | |
| DEFAULT_MODEL_KEY, | |
| EXAMPLE_PROMPTS, | |
| LANGUAGE_OPTIONS, | |
| MODEL_CONFIGS, | |
| MODEL_URL, | |
| PY_TIMEOUT_S, | |
| ) | |
| from code.execution.code_extractor import ( | |
| build_iframe, | |
| extract_code, | |
| extract_multi_file, | |
| is_gradio_code, | |
| normalize_language, | |
| strip_thinking_blocks, | |
| ) | |
| from code.execution.gradio_runner import run_gradio_app, stop_gradio_app | |
| from code.execution.python_runner import run_python | |
| from code.huggingface.push import create_project_zip, push_to_huggingface | |
| from code.model.loader import ( | |
| get_model_status, | |
| is_model_loaded, | |
| get_current_model_key, | |
| get_current_model_type, | |
| switch_model, | |
| ) | |
| from code.model.inference import call_model | |
| from code.server.chat_helpers import chat_history_to_messages, targeted_prompt | |
| from code.websearch.google_scraper import web_search_google, format_search_results | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# ─── Served Files Registry ──────────────────────────────────────────────
# Keys are "img:<filename>" or "dl:<filename>"; values are paths on disk.
_served_files: dict[str, str] = {}

# ─── Uploaded Images Registry ───────────────────────────────────────────
# Maps an uploaded image's ID to the path where it was saved.
_uploaded_images: dict[str, str] = {}

# ─── Server Instance ────────────────────────────────────────────────────
app = Server()
# ─── HTTP Routes ────────────────────────────────────────────────────────
async def homepage():
    """Serve the index.html frontend with runtime config injected.

    Reads ``index.html`` from two directories above this module, builds a
    JSON configuration object (app title, model metadata, languages, example
    prompts, skills, commands, hooks, agents) and substitutes it for the
    ``__RUNTIME_CONFIG__`` placeholder.

    The skills / commands / hooks / agents registries are optional: if one
    fails to import or list, the page is still served with an empty value
    for it, and the failure is logged at debug level instead of vanishing.

    Returns:
        The HTML document as a string.
    """
    html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "index.html")
    with open(html_path, "r", encoding="utf-8") as f:
        content = f.read()

    # Load skills, commands, hooks and agents for the frontend (best effort).
    try:
        from code.skills import list_skills
        skills_list = list_skills()
    except Exception:
        logger.debug("Skills unavailable for frontend config", exc_info=True)
        skills_list = []
    try:
        from code.commands import list_commands
        commands_list = list_commands()
    except Exception:
        logger.debug("Commands unavailable for frontend config", exc_info=True)
        commands_list = []
    try:
        from code.hooks import list_hooks
        hooks_list = list_hooks()
    except Exception:
        logger.debug("Hooks unavailable for frontend config", exc_info=True)
        hooks_list = []
    try:
        from code.agents import list_agents, get_active_agent
        agents_list = list_agents()
        active_agent = get_active_agent()
    except Exception:
        logger.debug("Agents unavailable for frontend config", exc_info=True)
        agents_list = []
        active_agent = None

    config = json.dumps({
        "app_title": APP_TITLE,
        "model_id": MODEL_CONFIGS[DEFAULT_MODEL_KEY]["id"],
        "model_configs": {
            key: {"name": cfg["name"], "type": cfg["type"], "description": cfg["description"]}
            for key, cfg in MODEL_CONFIGS.items()
        },
        "model_url": MODEL_URL,
        "languages": LANGUAGE_OPTIONS,
        "examples": [
            {"label": label, "prompt": prompt, "language": lang, "framework": fw}
            for label, prompt, lang, fw in EXAMPLE_PROMPTS
        ],
        # NOTE(review): hard-coded, while "model_id" above is derived from
        # DEFAULT_MODEL_KEY — confirm the two are meant to agree.
        "default_model": "minicpm5-1b",
        "skills": skills_list,
        "commands": commands_list,
        "hooks": hooks_list,
        "agents": agents_list,
        "active_agent": active_agent,
    })
    # "<\/" is an equivalent JSON/JS escape for "</". Using it keeps a literal
    # "</script>" inside any config string from closing an enclosing <script>
    # element (presumably where the placeholder lives — confirm in index.html).
    config = config.replace("</", "<\\/")
    content = content.replace("__RUNTIME_CONFIG__", config)
    return content
async def model_status_endpoint():
    """Report the model loader's current status, as given by ``get_model_status``."""
    status = get_model_status()
    return status
async def serve_image(filename: str):
    """Return a registered plot image, or a 404 response.

    The file is looked up under the ``"img:<filename>"`` key of the
    served-files registry and is only sent if that path still exists.
    """
    registered = _served_files.get(f"img:{filename}")
    if not registered or not os.path.exists(registered):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(registered, media_type="image/png")
async def serve_download(filename: str):
    """Return a registered download, or a 404 response.

    The file is looked up under the ``"dl:<filename>"`` key of the
    served-files registry and sent under the requested filename.
    """
    registered = _served_files.get(f"dl:{filename}")
    if not registered or not os.path.exists(registered):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(registered, filename=filename, media_type="application/octet-stream")
async def serve_uploaded_image(image_id: str):
    """Return a previously uploaded image by ID, or a 404 response.

    The response is always labelled ``image/png``.
    """
    stored = _uploaded_images.get(image_id)
    if not stored or not os.path.exists(stored):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(stored, media_type="image/png")
# ─── Gradio API Endpoints ───────────────────────────────────────────────
def handle_switch_model(model_key: str) -> str:
    """Switch the active model and yield the result as one JSON string."""
    outcome = switch_model(model_key)
    yield json.dumps(outcome)
def handle_upload_image(image_data: str) -> str:
    """Upload a base64-encoded image for VLM inference.

    Accepts either raw base64 or a ``data:<mime>;base64,<payload>`` URI. The
    decoded bytes are written to a fresh temp directory as ``<image_id>.png``
    and registered so they can be served under ``/uploaded-images/<image_id>``.

    Input that is not valid base64, or that decodes to zero bytes, is
    rejected. Previously the lenient decoder dropped invalid characters, so
    garbage input was stored as an empty "image" and reported as success.

    Yields:
        One JSON string. On success it carries ``image_id``, ``image_url``
        and ``file_url``; on failure ``success`` is false with a ``message``.
    """
    import binascii

    try:
        if not image_data:
            yield json.dumps({"success": False, "message": "No image data provided"})
            return

        # Data URI format (data:image/png;base64,...): keep only the payload.
        if image_data.startswith("data:"):
            _, comma, payload = image_data.partition(",")
            if comma:
                image_data = payload

        # Strict decoding rejects whitespace too, so strip line breaks first
        # to keep accepting wrapped base64.
        compact = "".join(image_data.split())
        try:
            image_bytes = base64.b64decode(compact, validate=True)
        except (binascii.Error, ValueError):
            yield json.dumps({
                "success": False,
                "message": "Upload failed: invalid base64 image data",
            })
            return
        if not image_bytes:
            yield json.dumps({"success": False, "message": "No image data provided"})
            return

        # Save to a temp file
        img_dir = tempfile.mkdtemp(prefix="uploaded_img_")
        image_id = f"img_{os.getpid()}_{int(os.urandom(4).hex(), 16)}"
        img_path = os.path.join(img_dir, f"{image_id}.png")
        Path(img_path).write_bytes(image_bytes)

        # Register for serving
        _uploaded_images[image_id] = img_path

        yield json.dumps({
            "success": True,
            "image_id": image_id,
            # URL the frontend / VLM can request from this server
            "image_url": f"/uploaded-images/{image_id}",
            # file:// URL for local VLM access
            "file_url": f"file://{img_path}",
            "message": "Image uploaded successfully",
        })
    except Exception as exc:
        logger.exception("Image upload failed")
        yield json.dumps({
            "success": False,
            "message": f"Upload failed: {str(exc)}",
        })
def handle_web_search(query: str) -> str:
    """Run a Google-scrape web search (no API key) and yield one JSON result.

    A blank query short-circuits with a failure payload. Any exception raised
    while searching or formatting is logged and reported as a failure.
    """
    cleaned = (query or "").strip()
    if not cleaned:
        yield json.dumps({"success": False, "results": [], "message": "Empty search query"})
        return

    try:
        hits = web_search_google(cleaned, num_results=8)
        payload = {
            "success": True,
            "results": hits,
            "formatted": format_search_results(hits),
            "message": f"Found {len(hits)} results",
        }
        yield json.dumps(payload)
    except Exception as exc:
        logger.exception("Web search failed")
        failure = {
            "success": False,
            "results": [],
            "message": f"Search failed: {str(exc)}",
        }
        yield json.dumps(failure)
def handle_chat(
    prompt: str,
    target_language: str,
    target_framework: str,
    history_json: str,
    exec_context_json: str,
    search_enabled: str = "false",
    image_url: str = "",
) -> str:
    """Stream chat responses with code execution. Yields JSON strings.

    Every yielded event has ``type``, ``status_text``, ``status_state``,
    ``history`` and ``execution`` keys (``search_results`` events also carry
    ``search_results``). The flow is: validate input, check the model is
    ready, optionally search the web, stream the model output, extract code,
    run it (plain Python or a Gradio app), register any plot image and a
    project ZIP for download, then emit a final ``complete`` event.

    Args:
        prompt: User prompt; blank prompts produce an error event.
        target_language: Requested language, reconciled with the fence language.
        target_framework: Requested framework; ``"Gradio"`` forces the Gradio runner.
        history_json: JSON list of prior chat messages (may be empty).
        exec_context_json: JSON object holding the previous execution context.
        search_enabled: ``"true"`` enables a web search before generation.
        image_url: Optional image URL forwarded to the model.
    """
    # Malformed payloads become an error event instead of an exception
    # escaping the generator.
    try:
        history = json.loads(history_json) if history_json else []
        execution_context = json.loads(exec_context_json) if exec_context_json else {}
    except (TypeError, ValueError) as exc:
        yield json.dumps({
            "type": "error",
            "status_text": f"Invalid request payload: {exc}",
            "status_state": "error",
            "history": [],
            "execution": {},
        })
        return

    def emit(kind, status_text, status_state, **extra):
        """Serialize one event using the current history and execution context."""
        return json.dumps({
            "type": kind,
            "status_text": status_text,
            "status_state": status_state,
            "history": history,
            "execution": execution_context,
            **extra,
        })

    prompt = (prompt or "").strip()
    if not prompt:
        yield emit("error", "Enter a prompt to get started.", "info")
        return

    # Check model status: "loading" is reported as still working, anything
    # else that is not "ready" as an error.
    model_status = get_model_status()
    if model_status["status"] != "ready":
        state = "working" if model_status["status"] == "loading" else "error"
        yield emit("error", model_status["message"], state)
        return

    # Add the user message and a placeholder assistant message
    history = list(history) + [
        {"role": "user", "content": prompt},
        {"role": "assistant", "content": ""},
    ]
    yield emit("status", "Thinking...", "working")

    # Web search if enabled. A failing search must not abort the chat, which
    # matches how handle_agent_run treats it.
    search_context = ""
    if str(search_enabled).lower() == "true":
        yield emit("status", "Searching the web...", "working")
        search_results = []
        try:
            search_results = web_search_google(prompt, num_results=6)
            if search_results:
                search_context = format_search_results(search_results)
        except Exception as exc:
            logger.warning("Web search failed: %s", exc)
            search_results = []
        if search_results:
            yield emit(
                "search_results",
                f"Found {len(search_results)} results, generating code...",
                "working",
                search_results=search_results,
            )

    # Build messages for the model: drop the assistant placeholder and swap
    # the raw user prompt for the targeted one.
    model_history = list(history[:-1])
    model_history[-1] = {
        "role": "user",
        "content": targeted_prompt(
            prompt, target_language, target_framework, execution_context, search_context
        ),
    }
    messages = chat_history_to_messages(model_history)

    # Determine image URL for VLM
    vlm_image_url = image_url.strip() if image_url else None

    final_response = ""
    for partial in call_model(messages, image_url=vlm_image_url):
        final_response = partial
        # Strip thinking blocks so chat only shows clean output
        history[-1]["content"] = strip_thinking_blocks(partial)
        yield emit("streaming", "Generating...", "working")

    if not final_response:
        history[-1]["content"] = "The model did not return a response."
        yield emit("error", "No model response.", "error")
        return

    # Extract code from the cleaned response (single block and multi-file)
    clean_response = strip_thinking_blocks(final_response)
    code, fence_lang = extract_code(clean_response)
    target = normalize_language(target_language, fence_lang)
    multi_files = extract_multi_file(clean_response)

    if not code and not multi_files:
        yield emit("complete", "Answered without running code.", "info")
        return

    yield emit("status", "Running...", "working")

    # Execute code
    stdout, stderr, image_path = "", "", None
    status_text, status_state = "Preview ready", "success"
    is_gradio = False
    gradio_url = None
    if target == "python" and code:
        if is_gradio_code(code) or target_framework == "Gradio":
            is_gradio = True
            gradio_result = run_gradio_app(code)
            if gradio_result["success"]:
                gradio_url = gradio_result["url"]
                status_text = f"Gradio app running at {gradio_url}"
                status_state = "success"
                stderr = f"Gradio app launched successfully at {gradio_url}"
            else:
                status_text = "Gradio launch failed"
                status_state = "error"
                stderr = gradio_result.get("stderr", gradio_result.get("message", "Launch failed"))
        else:
            result = run_python(code)
            # The captured output is reported the same way for every outcome.
            stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
            if result.timed_out:
                status_text, status_state = f"Timed out after {PY_TIMEOUT_S}s", "error"
            elif result.returncode:
                status_text, status_state = "Finished with errors", "error"
            else:
                status_text, status_state = "Ran successfully", "success"

    # Register image for serving
    image_url_out = None
    if image_path:
        filename = os.path.basename(image_path)
        _served_files[f"img:{filename}"] = image_path
        image_url_out = f"/images/{filename}"

    # Collect the project files offered for download
    project_files = dict(multi_files) if multi_files else {}
    # Rename main.py → app.py for Python/Gradio projects (HF Spaces expects app.py)
    if "main.py" in project_files and "app.py" not in project_files:
        if target == "python" or is_gradio:
            project_files["app.py"] = project_files.pop("main.py")
    # If project_files is empty but we have a single code block, add it
    if not project_files and code:
        if target == "python":
            fname = "app.py" if (is_gradio or is_gradio_code(code)) else "main.py"
        elif target in {"web", "html", "javascript"}:
            fname = "index.html"
        else:
            fname = f"main.{fence_lang or 'txt'}"
        project_files = {fname: code}

    # Register the ZIP for download. project_files is non-empty whenever
    # `code` is truthy, so the old single-file `elif code:` fallback could
    # never run and has been dropped.
    download_url = None
    if project_files:
        project_name = "generated-project"
        zip_path = create_project_zip(project_files, project_name)
        zip_filename = f"{project_name}.zip"
        _served_files[f"dl:{zip_filename}"] = zip_path
        download_url = f"/download/{zip_filename}"

    # Determine if this is web previewable
    is_web = target in {"web", "javascript", "typescript", "html"} or (fence_lang or "") in {"html", "web"}
    web_code = code if is_web else None

    execution_context = {
        "code": code,
        "target": target,
        "fence_lang": fence_lang or target,
        "stdout": stdout,
        "stderr": stderr,
        "image_url": image_url_out,
        "image_path": image_path,
        "status": status_text,
        "language": fence_lang or target,
        "suggested_tab": "preview" if (image_path or is_web or is_gradio) else "console",
        "download_url": download_url,
        "project_files": project_files,
        "is_web": is_web,
        "web_code": web_code,
        "is_gradio": is_gradio,
        "gradio_url": gradio_url,
    }
    yield emit("complete", status_text, status_state)
def handle_hf_auth(
    oauth_token: str = "",
) -> str:
    """Get the HuggingFace OAuth profile and list of organizations.

    If ``oauth_token`` is provided (from Gradio OAuth) it is used to fetch the
    user info via ``huggingface_hub.whoami``; otherwise an unauthenticated
    payload is returned. ``huggingface_hub`` is only imported once a token is
    present, so a signed-out visitor never triggers an import failure.

    Yields:
        One JSON string with ``authenticated``, ``username``, ``name``,
        ``picture``, ``organizations`` and ``message``. A successful lookup
        also echoes the supplied ``token``.
    """
    try:
        token = oauth_token.strip() if oauth_token else ""
        if not token:
            yield json.dumps({
                "authenticated": False,
                "username": "",
                "name": "",
                "picture": "",
                "organizations": [],
                "message": "Not signed in. Click Sign In to authenticate with HuggingFace.",
            })
            return

        from huggingface_hub import whoami

        # Get user info using the OAuth token
        user_info = whoami(token=token)
        username = user_info.get("name", "")
        fullname = user_info.get("fullname", username)
        avatar_url = user_info.get("avatarUrl", "") or ""

        # Get organizations
        orgs = [
            {"name": org.get("name", ""), "avatar": org.get("avatarUrl", "")}
            for org in user_info.get("orgs", [])
        ]

        # Attach role info from orgRoles. As before, a role goes to the first
        # organization that carries the matching name.
        first_org_by_name = {}
        for org in orgs:
            first_org_by_name.setdefault(org["name"], org)
        for role_info in user_info.get("orgRoles", []):
            matched = first_org_by_name.get(role_info.get("org", ""))
            if matched is not None:
                matched["role"] = role_info.get("role", "member")

        yield json.dumps({
            "authenticated": True,
            "username": username,
            "name": fullname,
            "picture": avatar_url,
            "organizations": orgs,
            "token": token,
            "message": f"Signed in as {username}",
        })
    except Exception as exc:
        logger.exception("HF auth check failed")
        yield json.dumps({
            "authenticated": False,
            "username": "",
            "name": "",
            "picture": "",
            "organizations": [],
            "message": f"Auth check failed: {str(exc)}",
        })
def handle_push_hf(
    exec_context_json: str,
    repo_name: str,
    hf_token: str,
    space_sdk: str = "auto",
    is_space: str = "true",
) -> str:
    """Push the generated project to the HuggingFace Hub.

    Files come from the execution context's ``project_files``. When that is
    empty but a single ``code`` string exists, one entry file is synthesized
    from the context's ``language``.

    Args:
        exec_context_json: JSON object produced by the chat endpoint.
        repo_name: Target repo; the part after the last ``/`` is the project name.
        hf_token: HuggingFace access token, passed through to the push helper.
        space_sdk: SDK name; ``"auto"`` is forwarded as ``"static"``.
        is_space: ``"true"`` to push as a Space.

    Yields:
        One JSON string: the push helper's result, or a failure payload with
        ``success``, ``message`` and ``url``.
    """
    try:
        execution_context = json.loads(exec_context_json) if exec_context_json else {}
        project_files = dict(execution_context.get("project_files", {}) or {})
        code = execution_context.get("code", "")

        # If project_files is empty but we have code, build files from code
        if not project_files and code:
            lang = execution_context.get("language", "python")
            is_gradio = execution_context.get("is_gradio", False)
            if lang in ("javascript", "js", "typescript", "ts"):
                # Single-file JS/TS output that is really an HTML page stays index.html
                lowered = code.lower()
                looks_like_html = "<!doctype" in lowered or "<html" in lowered
                filename = "index.html" if looks_like_html else "index.js"
            elif lang in ("html", "web"):
                filename = "index.html"
            else:
                # Every remaining language, Python included, is pushed as app.py.
                filename = "app.py"
            project_files = {filename: code}
            # Auto-detect SDK for Gradio apps
            if is_gradio or is_gradio_code(code):
                space_sdk = "gradio"

        # The block above always fills project_files when `code` is truthy, so
        # the former "extract from raw response" retry was unreachable and has
        # been removed.
        if not project_files:
            yield json.dumps({
                "success": False,
                "message": "No code to push. Generate some code first.",
                "url": "",
            })
            return

        # "auto" SDK means let push_to_huggingface decide
        if space_sdk == "auto":
            space_sdk = "static"  # push_to_huggingface will auto-detect from files

        project_name = repo_name.rsplit("/", 1)[-1]
        result = push_to_huggingface(
            files=project_files,
            project_name=project_name,
            repo_name=repo_name,
            hf_token=hf_token,
            space_sdk=space_sdk,
            is_space=is_space.lower() == "true",
        )
        yield json.dumps(result)
    except Exception as exc:
        logger.exception("Push to HuggingFace failed")
        yield json.dumps({
            "success": False,
            "message": f"Push failed: {str(exc)}",
            "url": "",
        })
def get_app() -> Server:
    """Expose the module-level ``app`` server instance to callers."""
    return app
# ─── Agent / Skills / Commands / Hooks / Workspace Endpoints ────────────
def handle_agent_run(
    prompt: str,
    target_language: str = "",
    target_framework: str = "",
    history_json: str = "[]",
    skills_json: str = "[]",
    search_enabled: str = "false",
    image_url: str = "",
    agent_name: str = "",
) -> str:
    """Run the Claude Code-style agent loop with tools.

    Yields JSON events: status, tool_call, tool_result, streaming, complete, error.

    `agent_name` (optional) overrides the session-active agent for this run.
    The `/agent use`, `/agent reset`, `/agent delete` and `/agent list` slash
    commands are intercepted here and dispatched to the agents module before
    the model runs; a bare `/agent` is treated as a reset. Other `/agent`
    subcommands fall through to the model.

    The agent loop module is imported only when a run actually starts, so the
    slash commands and the input checks do not depend on it. An import
    failure is reported as an ``error`` event.
    """
    # Malformed payloads become an error event instead of an exception
    # escaping the generator.
    try:
        history = json.loads(history_json) if history_json else []
        skills = json.loads(skills_json) if skills_json else []
    except (TypeError, ValueError) as exc:
        yield json.dumps({
            "type": "error",
            "message": f"Invalid request payload: {exc}",
        })
        return

    prompt = (prompt or "").strip()
    if not prompt:
        yield json.dumps({
            "type": "error",
            "message": "Empty prompt",
        })
        return

    # ── Intercept /agent use|reset|delete|list (session-state mutations) ──
    # These need to happen server-side BEFORE the model runs so the very
    # next prompt reflects the change.
    if prompt == "/agent" or prompt.startswith("/agent "):
        from code.agents import (
            set_active_agent,
            delete_agent as _delete_agent,
            list_agents as _list_agents,
        )

        parts = prompt.split(None, 2)  # ["/agent", <sub>, <rest>]
        sub = parts[1] if len(parts) > 1 else ""
        arg = parts[2].strip() if len(parts) > 2 else ""

        if sub == "use" and arg:
            result = set_active_agent(arg)
            if result.get("success"):
                content = (
                    f"**Agent activated: `{result.get('active_agent')}`**\n\n"
                    "Subsequent prompts will use this agent's persona and tool whitelist."
                )
            else:
                # The old code printed the "Agent activated" heading even on
                # failure; report the failure explicitly instead.
                reason = result.get("message") or result.get("error") or ""
                content = f"**Could not activate agent `{arg}`.**\n\n{reason}"
            yield json.dumps({
                "type": "complete",
                "content": content,
                "agent": result.get("active_agent"),
                "agent_op": "use",
            })
            return

        # A bare "/agent" (no subcommand) resets, same as "/agent reset".
        if sub in ("reset", ""):
            set_active_agent(None)
            yield json.dumps({
                "type": "complete",
                "content": "**Active agent reset.** Subsequent prompts will use the default SoniCoder persona.",
                "agent": None,
                "agent_op": "reset",
            })
            return

        if sub == "delete" and arg:
            result = _delete_agent(arg)
            if result.get("success"):
                yield json.dumps({
                    "type": "complete",
                    "content": f"**Agent `{arg}` deleted.**",
                    "agent": None,
                    "agent_op": "delete",
                })
            else:
                yield json.dumps({
                    "type": "error",
                    "message": result.get("error", f"Failed to delete agent '{arg}'"),
                    "agent_op": "delete",
                })
            return

        if sub == "list":
            agents_list = _list_agents()
            if not agents_list:
                content = "_No agents available._ Create one with `/agent create <description>`."
            else:
                def cell(value):
                    """Keep a value on one table row: escape pipes, flatten newlines."""
                    return str(value).replace("|", "\\|").replace("\n", " ")

                lines = ["| Name | Description | Author | Tools |", "|------|-------------|--------|-------|"]
                for a in agents_list:
                    tools = ", ".join(a.get("tools") or []) or "(all)"
                    active_marker = " **(active)**" if a.get("active") else ""
                    description = (a.get("description") or "")[:80]
                    lines.append(
                        f"| `{a['name']}`{active_marker} | {cell(description)} "
                        f"| {cell(a.get('author', ''))} | {cell(tools)} |"
                    )
                content = "\n".join(lines)
            yield json.dumps({
                "type": "complete",
                "content": content,
                "agents": agents_list,
                "agent_op": "list",
            })
            return
        # /agent create|show — fall through to the model (handled by slash command expansion)

    # Optional web search; a failure here only costs the extra context.
    search_context = ""
    if str(search_enabled).lower() == "true":
        try:
            search_results = web_search_google(prompt, num_results=6)
            if search_results:
                search_context = format_search_results(search_results)
                yield json.dumps({
                    "type": "search_results",
                    "results": search_results,
                    "status_text": f"Found {len(search_results)} results, running agent...",
                })
        except Exception as exc:
            logger.warning("Web search failed: %s", exc)

    try:
        from code.agent import run_agent

        for event in run_agent(
            user_input=prompt,
            history=history,
            target_language=target_language,
            target_framework=target_framework,
            skills=skills,
            search_context=search_context,
            image_url=(image_url or "").strip() or None,
            agent_name=(agent_name or "").strip() or None,
        ):
            yield json.dumps(event, default=str)
    except Exception as exc:
        logger.exception("Agent run failed")
        yield json.dumps({
            "type": "error",
            "message": str(exc),
        })
def handle_list_skills() -> str:
    """Yield one JSON string containing every available skill."""
    from code.skills import list_skills

    yield json.dumps({"success": True, "skills": list_skills()})
def handle_list_commands() -> str:
    """Yield one JSON string containing every available slash command."""
    from code.commands import list_commands

    yield json.dumps({"success": True, "commands": list_commands()})
def handle_list_hooks() -> str:
    """Yield one JSON string containing every configured hook."""
    from code.hooks import list_hooks

    yield json.dumps({"success": True, "hooks": list_hooks()})
def handle_workspace_tree() -> str:
    """Yield the workspace file tree as one JSON string.

    Values that are not JSON-native are stringified (``default=str``).
    """
    from code.tools.fs import list_workspace_tree

    tree = list_workspace_tree()
    yield json.dumps(tree, default=str)
def handle_workspace_read(path: str, offset: int = 0, limit: int = 0) -> str:
    """Read a workspace file and yield the result as one JSON string.

    ``offset`` and ``limit`` are only forwarded to ``read_file`` when they
    are truthy, so a zero leaves that argument at the reader's own default.
    """
    from code.tools.fs import read_file

    optional = {"offset": offset, "limit": limit}
    call_kwargs = {"path": path, **{key: value for key, value in optional.items() if value}}
    yield json.dumps(read_file(**call_kwargs), default=str)
def handle_workspace_write(path: str, content: str) -> str:
    """Write ``content`` to a workspace file and yield the result as JSON."""
    from code.tools.fs import write_file

    outcome = write_file(path=path, content=content)
    yield json.dumps(outcome, default=str)
def handle_workspace_bash(command: str, timeout: int = 30) -> str:
    """Run a bash command in the workspace and yield the result as JSON.

    NOTE(review): the caller-supplied command is handed to ``run_bash``
    unchanged, so any sandboxing has to live in that helper — confirm.
    """
    from code.tools.bash import run_bash

    outcome = run_bash(command=command, timeout=timeout)
    yield json.dumps(outcome, default=str)
def handle_workspace_edit(
    path: str,
    old_str: str,
    new_str: str,
    replace_all: str = "false",
) -> str:
    """Apply a string replacement to a workspace file; yield the result as JSON.

    ``replace_all`` is a string flag: only ``"true"`` (any casing) enables it.
    """
    from code.tools.fs import edit_file

    replace_every = replace_all.lower() == "true"
    outcome = edit_file(path=path, old_str=old_str, new_str=new_str, replace_all=replace_every)
    yield json.dumps(outcome, default=str)
def handle_workspace_glob(pattern: str, path: str = ".") -> str:
    """Glob for ``pattern`` under ``path`` in the workspace; yield JSON."""
    from code.tools.fs import glob_paths

    matches = glob_paths(pattern=pattern, path=path)
    yield json.dumps(matches, default=str)
def handle_workspace_grep(
    pattern: str,
    path: str = ".",
    include: str = "",
    ignore_case: str = "false",
) -> str:
    """Search workspace file contents for ``pattern``; yield the result as JSON.

    An empty ``include`` is forwarded as ``None``. ``ignore_case`` is a
    string flag: only ``"true"`` (any casing) enables it.
    """
    from code.tools.fs import grep_search

    include_filter = include or None
    case_insensitive = ignore_case.lower() == "true"
    hits = grep_search(
        pattern=pattern,
        path=path,
        include=include_filter,
        ignore_case=case_insensitive,
    )
    yield json.dumps(hits, default=str)
def handle_todo_read(session_id: str = "default") -> str:
    """Yield the todo list of ``session_id`` as one JSON string."""
    from code.tools.todos import todo_read

    current = todo_read(session_id=session_id)
    yield json.dumps(current, default=str)
def handle_todo_write(todos_json: str, session_id: str = "default") -> str:
    """Replace the todo list of ``session_id``; yield the result as JSON.

    ``todos_json`` is a JSON document; an empty string means an empty list.
    """
    from code.tools.todos import todo_write

    if todos_json:
        new_todos = json.loads(todos_json)
    else:
        new_todos = []
    outcome = todo_write(todos=new_todos, session_id=session_id)
    yield json.dumps(outcome, default=str)
def handle_workspace_snapshot() -> str:
    """Yield every workspace file (for ZIP/deploy) plus a count, as JSON."""
    from code.tools.fs import snapshot_workspace

    snapshot = snapshot_workspace()
    payload = {"success": True, "files": snapshot, "count": len(snapshot)}
    yield json.dumps(payload)
def handle_workspace_reset() -> str:
    """Clear the workspace and yield the result as one JSON string."""
    from code.tools.fs import reset_workspace

    outcome = reset_workspace()
    yield json.dumps(outcome, default=str)
def handle_create_hook(
    name: str,
    event: str,
    pattern: str,
    action: str = "warn",
    message: str = "",
    enabled: str = "true",
) -> str:
    """Create a user hook and yield the result as one JSON string.

    ``enabled`` is a string flag: only ``"true"`` (any casing) enables the hook.
    """
    from code.hooks import create_hook

    is_enabled = enabled.lower() == "true"
    outcome = create_hook(
        name=name,
        event=event,
        pattern=pattern,
        action=action,
        message=message,
        enabled=is_enabled,
    )
    yield json.dumps(outcome, default=str)
def handle_delete_hook(name: str) -> str:
    """Delete the user hook called ``name``; yield the result as JSON."""
    from code.hooks import delete_hook

    outcome = delete_hook(name)
    yield json.dumps(outcome, default=str)
# ─── Custom Agent Endpoints ─────────────────────────────────────────────
def handle_list_agents() -> str:
    """Yield all custom agents (builtins + user) and the active one, as JSON."""
    from code.agents import list_agents, get_active_agent

    payload = {
        "success": True,
        "agents": list_agents(),
        "active_agent": get_active_agent(),
    }
    yield json.dumps(payload, default=str)
def handle_get_agent(name: str) -> str:
    """Return the full definition of one agent.

    Generator endpoint: produces exactly one item.

    Parameters
    ----------
    name : str
        Passed positionally to ``get_agent``.

    Yields
    ------
    str
        ``{"success": false, "error": "Agent not found: <name>"}`` when
        ``get_agent`` returns a falsy value; otherwise
        ``{"success": true, "agent": {...}}`` where the agent mapping has its
        ``"path"`` entry removed.
    """
    from code.agents import get_agent

    definition = get_agent(name)
    if definition:
        # The "path" entry is dropped before serialising the definition.
        public_fields = {
            key: value for key, value in definition.items() if key != "path"
        }
        yield json.dumps({"success": True, "agent": public_fields}, default=str)
    else:
        yield json.dumps({"success": False, "error": f"Agent not found: {name}"})
def handle_save_agent(
    name: str,
    description: str,
    body: str,
    tools: str = "",
    skills: str = "",
    temperature: str = "",
    max_iterations: str = "",
    tags: str = "",
    author: str = "user",
) -> str:
    """Create or overwrite a custom agent definition (manual save, no AI).

    Generator endpoint: produces exactly one JSON string.

    Parameters
    ----------
    name, description, body : str
        Forwarded unchanged to ``save_agent``.
    tools, skills, tags : str
        Comma-separated lists; entries are stripped and blank ones dropped.
        An empty ``tools`` falls back to every entry of ``ALL_TOOLS``.
    temperature : str
        Optional. When non-blank it must parse as a finite float.
    max_iterations : str
        Optional. When non-blank it must parse as an int.
    author : str
        Forwarded unchanged to ``save_agent`` (default ``"user"``).

    Yields
    ------
    str
        ``{"success": false, "error": "Invalid temperature: ..."}`` or
        ``{"success": false, "error": "Invalid max_iterations: ..."}`` when a
        numeric field is malformed; otherwise ``json.dumps`` of the
        ``save_agent`` result (non-JSON-native values stringified via
        ``default=str``).
    """
    import math

    def _split(s: str) -> list[str]:
        """Split a comma-separated string, dropping blank entries."""
        return [x.strip() for x in (s or "").split(",") if x.strip()]

    def _parse_optional(raw, cast):
        """Return ``(ok, value)``; blank or None input gives ``(True, None)``."""
        text = "" if raw is None else str(raw).strip()
        if not text:
            return True, None
        try:
            return True, cast(text)
        except (ValueError, OverflowError):
            return False, None

    # Numeric fields are validated before the agent store is imported, so a
    # malformed request is rejected without touching it.
    ok, temp_val = _parse_optional(temperature, float)
    # float() happily accepts "nan" / "inf"; neither is a usable temperature.
    if not ok or (temp_val is not None and not math.isfinite(temp_val)):
        yield json.dumps({"success": False, "error": f"Invalid temperature: {temperature}"})
        return

    ok, iter_val = _parse_optional(max_iterations, int)
    if not ok:
        yield json.dumps({"success": False, "error": f"Invalid max_iterations: {max_iterations}"})
        return

    from code.agents import save_agent, ALL_TOOLS

    result = save_agent(
        name=name,
        description=description,
        body=body,
        tools=_split(tools) or list(ALL_TOOLS),
        skills=_split(skills),
        temperature=temp_val,
        max_iterations=iter_val,
        tags=_split(tags),
        author=author,
    )
    yield json.dumps(result, default=str)
def handle_delete_agent(name: str) -> str:
    """Delete the user-defined agent called ``name``.

    Generator endpoint: produces exactly one item.

    Parameters
    ----------
    name : str
        Passed positionally to ``delete_agent``.

    Yields
    ------
    str
        ``json.dumps`` of the ``delete_agent`` result; values that are not
        JSON-native are stringified via ``default=str``.
    """
    from code.agents import delete_agent

    outcome = delete_agent(name)
    serialized = json.dumps(outcome, default=str)
    yield serialized
def handle_set_active_agent(name: str = "") -> str:
    """Select the agent used for subsequent prompts.

    Generator endpoint: produces exactly one item.

    Parameters
    ----------
    name : str
        Agent name. A blank or whitespace-only value is passed to
        ``set_active_agent`` as ``None``, which resets the selection.

    Yields
    ------
    str
        When ``set_active_agent`` reports failure, its result as JSON.
        On success, the same result extended with ``agents``
        (``list_agents()``) and ``active_agent`` (``get_active_agent()``) so
        the frontend can re-render.
    """
    from code.agents import set_active_agent, list_agents, get_active_agent

    target = name.strip() or None
    outcome = set_active_agent(target)

    if outcome.get("success"):
        # Attach a fresh agent list and the active agent to the result.
        refreshed = {
            **outcome,
            "agents": list_agents(),
            "active_agent": get_active_agent(),
        }
        yield json.dumps(refreshed, default=str)
    else:
        yield json.dumps(outcome, default=str)
# ─── GitHub Import Endpoint ────────────────────────────────────────────
def handle_import_github(
    url: str,
    branch: str = "",
    subdir: str = "",
    target_subdir: str = "",
    depth: str = "1",
    timeout: str = "120",
) -> str:
    """Clone a GitHub repo into the sandboxed workspace.

    Generator endpoint: produces exactly one item. The actual work is
    delegated to ``import_github_repo``.

    Parameters
    ----------
    url : str
        GitHub URL. Accepted forms:
        - https://github.com/<owner>/<repo>[.git]
        - https://github.com/<owner>/<repo>/tree/<branch>[/<subdir>]
        - git@github.com:<owner>/<repo>.git
    branch : str
        Optional branch/tag override. When empty, the branch from the URL or
        the repo's default branch is used.
    subdir : str
        Optional sub-directory inside the repo to import.
    target_subdir : str
        Destination inside the workspace. Empty means the workspace root.
    depth : str
        Git clone depth. Blank or unparsable values fall back to 1; the
        result is clamped to the range 1..50.
    timeout : str
        Git clone timeout in seconds. Blank or unparsable values fall back
        to 120; the result is clamped to the range 10..600.

    Yields
    ------
    str
        JSON dict with keys: success, message, url, owner, repo, branch,
        subdir, files_imported, dirs_skipped, workspace_path, tree_preview.
    """
    from code.tools.github import import_github_repo

    def _bounded_int(raw, fallback, low, high):
        """Parse ``raw`` as int (blank or invalid -> fallback), clamp to [low, high]."""
        try:
            number = int(raw) if str(raw).strip() else fallback
        except (ValueError, TypeError):
            return fallback
        return max(low, min(high, number))

    clone_depth = _bounded_int(depth, 1, 1, 50)
    clone_timeout = _bounded_int(timeout, 120, 10, 600)

    outcome = import_github_repo(
        url=url,
        branch=branch,
        subdir=subdir,
        target_subdir=target_subdir,
        depth=clone_depth,
        timeout=clone_timeout,
    )
    yield json.dumps(outcome, default=str)
def handle_github_url_examples() -> str:
    """Return example GitHub URL formats accepted by import_github.

    Generator endpoint: produces exactly one item.

    Yields
    ------
    str
        ``json.dumps`` of the ``list_github_url_examples`` result; values
        that are not JSON-native are stringified via ``default=str``.
    """
    from code.tools.github import list_github_url_examples

    examples = list_github_url_examples()
    serialized = json.dumps(examples, default=str)
    yield serialized
def handle_push_github(
    repo_name: str,
    github_token: str,
    username: str,
    branch: str = "main",
    commit_message: str = "",
    timeout: str = "120",
) -> str:
    """Push the current workspace to a GitHub repo.

    Generator endpoint: produces exactly one item. Only three user inputs
    are required (``repo_name``, ``github_token``, ``username``); ``branch``,
    ``commit_message`` and ``timeout`` are optional.

    The work is delegated to ``push_to_github``. As described for that
    helper: the workspace is snapshotted (via `snapshot_workspace`), written
    into a fresh git repo in a temp dir, committed, and pushed to
    `https://github.com/<username>/<repo_name>.git` using HTTPS basic auth
    with the token. The push uses `--force-with-lease` so it replaces the
    remote tip with the SoniCoder workspace contents; if the remote doesn't
    exist yet (no refs to lease against), it retries with a plain push.

    Parameters
    ----------
    repo_name, github_token, username : str
        Forwarded unchanged to ``push_to_github``.
    branch : str
        Target branch; an empty value falls back to ``"main"``.
    commit_message : str
        Commit message; a falsy value is forwarded as the empty string.
    timeout : str
        Timeout in seconds. Blank or unparsable values fall back to 120; the
        result is clamped to the range 10..600.

    Yields
    ------
    str
        JSON dict with keys: success, message, repo_full_name, branch,
        commit_sha, commit_url, repo_url, files_pushed, error (on failure).
    """
    from code.tools.github import push_to_github

    seconds = 120
    try:
        if str(timeout).strip():
            seconds = int(timeout)
    except (ValueError, TypeError):
        seconds = 120
    # Keep the timeout within 10..600 seconds.
    seconds = min(600, max(10, seconds))

    target_branch = branch or "main"
    message_text = commit_message or ""

    outcome = push_to_github(
        repo_name=repo_name,
        github_token=github_token,
        username=username,
        branch=target_branch,
        commit_message=message_text,
        timeout=seconds,
    )
    yield json.dumps(outcome, default=str)