"""FastAPI / Gradio Server routes. Defines all HTTP and API endpoints: - GET / → serves the index.html frontend - GET /api/model-status → model loading status - GET /images/{f} → serve generated plot images - GET /download/{f} → serve project ZIP downloads - API web_search → Google search scraping - API chat → streaming chat with code execution - API push_hf → push to HuggingFace Hub - API switch_model → switch between loaded models - API upload_image → upload image for VLM inference - API hf_auth → get HF OAuth profile & organizations - API agent_run → Claude Code-style agent loop with tools - API list_skills → list available skills - API list_commands→ list available slash commands - API list_hooks → list configured hooks - API workspace_tree→ list workspace files - API workspace_read→ read a workspace file - API workspace_write→ write a workspace file - API workspace_bash→ run a bash command in workspace - API todo_read → read current todo list - API todo_write → update todo list - API import_github → clone a GitHub repo into the workspace - API github_url_examples → return accepted GitHub URL formats - API push_github → push the current workspace to a GitHub repo """ from __future__ import annotations import base64 import json import logging import os import tempfile from pathlib import Path from typing import Any from fastapi.responses import HTMLResponse, FileResponse try: from gradio import Server except ImportError: # Fallback for older/newer Gradio versions where Server may not be exposed # at the top level. We provide a minimal shim so the module can still be # imported for testing purposes. 
class Server: # type: ignore """Minimal shim for Gradio Server when not available.""" def __init__(self, *args, **kwargs): from fastapi import FastAPI self._fastapi = FastAPI() def get(self, path: str, **kwargs): return self._fastapi.get(path, **kwargs) def api(self, name: str = None, concurrency_limit: int = 1): def decorator(fn): # Store as attribute so it can be inspected fn._api_name = name fn._concurrency_limit = concurrency_limit return fn return decorator from code.config.constants import ( APP_TITLE, DEFAULT_MODEL_KEY, EXAMPLE_PROMPTS, LANGUAGE_OPTIONS, MODEL_CONFIGS, MODEL_URL, PY_TIMEOUT_S, ) from code.execution.code_extractor import ( build_iframe, extract_code, extract_multi_file, is_gradio_code, normalize_language, strip_thinking_blocks, ) from code.execution.gradio_runner import run_gradio_app, stop_gradio_app from code.execution.python_runner import run_python from code.huggingface.push import create_project_zip, push_to_huggingface from code.model.loader import ( get_model_status, is_model_loaded, get_current_model_key, get_current_model_type, switch_model, ) from code.model.inference import call_model from code.server.chat_helpers import chat_history_to_messages, targeted_prompt from code.websearch.google_scraper import web_search_google, format_search_results logger = logging.getLogger(__name__) # ─── Served Files Registry ────────────────────────────────────────────── _served_files: dict[str, str] = {} # ─── Uploaded Images Registry ─────────────────────────────────────────── _uploaded_images: dict[str, str] = {} # ─── Server Instance ──────────────────────────────────────────────────── app = Server() # ─── HTTP Routes ──────────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def homepage(): """Serve the index.html frontend with runtime config injected.""" html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "index.html") with open(html_path, "r", encoding="utf-8") as f: 
content = f.read() # Load skills, commands, hooks for the frontend try: from code.skills import list_skills skills_list = list_skills() except Exception: skills_list = [] try: from code.commands import list_commands commands_list = list_commands() except Exception: commands_list = [] try: from code.hooks import list_hooks hooks_list = list_hooks() except Exception: hooks_list = [] try: from code.agents import list_agents, get_active_agent agents_list = list_agents() active_agent = get_active_agent() except Exception: agents_list = [] active_agent = None config = json.dumps({ "app_title": APP_TITLE, "model_id": MODEL_CONFIGS[DEFAULT_MODEL_KEY]["id"], "model_configs": {k: {"name": v["name"], "type": v["type"], "description": v["description"]} for k, v in MODEL_CONFIGS.items()}, "model_url": MODEL_URL, "languages": LANGUAGE_OPTIONS, "examples": [ {"label": label, "prompt": prompt, "language": lang, "framework": fw} for label, prompt, lang, fw in EXAMPLE_PROMPTS ], "default_model": "minicpm5-1b", "skills": skills_list, "commands": commands_list, "hooks": hooks_list, "agents": agents_list, "active_agent": active_agent, }) content = content.replace("__RUNTIME_CONFIG__", config) return content @app.get("/api/model-status") async def model_status_endpoint(): """Return the current model loading status.""" return get_model_status() @app.get("/images/{filename}") async def serve_image(filename: str): """Serve a generated plot image by filename.""" path = _served_files.get(f"img:{filename}") if path and os.path.exists(path): return FileResponse(path, media_type="image/png") return HTMLResponse("Not found", status_code=404) @app.get("/download/{filename}") async def serve_download(filename: str): """Serve a project ZIP download by filename.""" path = _served_files.get(f"dl:{filename}") if path and os.path.exists(path): return FileResponse(path, filename=filename, media_type="application/octet-stream") return HTMLResponse("Not found", status_code=404) 
@app.get("/uploaded-images/{image_id}")
async def serve_uploaded_image(image_id: str):
    """Serve an uploaded image by its ID.

    Looks the ID up in ``_uploaded_images``; unknown IDs, or IDs whose file
    no longer exists on disk, yield a 404 response.
    """
    path = _uploaded_images.get(image_id)
    if path and os.path.exists(path):
        return FileResponse(path, media_type="image/png")
    return HTMLResponse("Not found", status_code=404)


# ─── Gradio API Endpoints ──────────────────────────────────────────────
# Every handler below is a generator: it yields JSON-encoded strings rather
# than returning a value (the ``-> str`` annotations describe each yielded item).

@app.api(name="switch_model", concurrency_limit=1)
def handle_switch_model(model_key: str) -> str:
    """Switch to a different model and yield the JSON-encoded result."""
    result = switch_model(model_key)
    yield json.dumps(result)


@app.api(name="upload_image", concurrency_limit=4)
def handle_upload_image(image_data: str) -> str:
    """Upload a base64-encoded image for VLM inference.

    Accepts either raw base64 or a ``data:`` URI. The decoded bytes are
    written to a fresh temp directory and registered in ``_uploaded_images``.

    Yields a JSON object with ``success`` and ``message``; on success it also
    carries ``image_id``, ``image_url`` and ``file_url``. Any exception is
    logged and reported as ``success: False``.
    """
    try:
        if not image_data:
            yield json.dumps({"success": False, "message": "No image data provided"})
            return

        # Handle data URI format: data:image/png;base64,...
        if image_data.startswith("data:"):
            # Extract the base64 part
            parts = image_data.split(",", 1)
            if len(parts) == 2:
                image_data = parts[1]

        # Decode base64
        image_bytes = base64.b64decode(image_data)

        # Save to temp file (always with a .png suffix, whatever the data is)
        # NOTE(review): the temp directory is never removed in the code visible here.
        img_dir = tempfile.mkdtemp(prefix="uploaded_img_")
        image_id = f"img_{os.getpid()}_{int(os.urandom(4).hex(), 16)}"
        img_path = os.path.join(img_dir, f"{image_id}.png")
        Path(img_path).write_bytes(image_bytes)

        # Register for serving
        _uploaded_images[image_id] = img_path

        # Create a URL for the image that the VLM can access
        image_url = f"/uploaded-images/{image_id}"
        # Also save as a file:// URL for local VLM access
        file_url = f"file://{img_path}"

        yield json.dumps({
            "success": True,
            "image_id": image_id,
            "image_url": image_url,
            "file_url": file_url,
            "message": "Image uploaded successfully",
        })
    except Exception as exc:
        logger.exception("Image upload failed")
        yield json.dumps({
            "success": False,
            "message": f"Upload failed: {str(exc)}",
        })


@app.api(name="web_search", concurrency_limit=4)
def handle_web_search(query: str) -> str:
    """Search the web using Google scraping. No API key needed.

    Yields one JSON object with ``success``, ``results`` and ``message``
    (plus ``formatted`` on success). An empty query or a failing search is
    reported as ``success: False`` with an empty result list.
    """
    query = (query or "").strip()
    if not query:
        yield json.dumps({"success": False, "results": [], "message": "Empty search query"})
        return

    try:
        results = web_search_google(query, num_results=8)
        formatted = format_search_results(results)
        yield json.dumps({
            "success": True,
            "results": results,
            "formatted": formatted,
            "message": f"Found {len(results)} results",
        })
    except Exception as exc:
        logger.exception("Web search failed")
        yield json.dumps({
            "success": False,
            "results": [],
            "message": f"Search failed: {str(exc)}",
        })


@app.api(name="chat", concurrency_limit=2)
def handle_chat(
    prompt: str,
    target_language: str,
    target_framework: str,
    history_json: str,
    exec_context_json: str,
    search_enabled: str = "false",
    image_url: str = "",
) -> str:
    """Stream chat responses with code execution. Yields JSON strings.

    Each yielded object has ``type`` (one of ``error``, ``status``,
    ``search_results``, ``streaming``, ``complete``), ``status_text``,
    ``status_state``, ``history`` and ``execution``.

    Flow: validate the prompt and model status, optionally run a web search,
    stream the model output, extract code from the final response, run it
    (plain Python or a Gradio app), register any image / download artifact,
    and finish with a ``complete`` event carrying the new execution context.

    ``history_json`` and ``exec_context_json`` are JSON strings (empty means
    no history / empty context); ``search_enabled`` is the string "true" or
    "false".
    """
    # NOTE(review): malformed JSON in either argument raises here, before any event is yielded.
    history = json.loads(history_json) if history_json else []
    execution_context = json.loads(exec_context_json) if exec_context_json else {}

    prompt = (prompt or "").strip()
    if not prompt:
        yield json.dumps({
            "type": "error",
            "status_text": "Enter a prompt to get started.",
            "status_state": "info",
            "history": history,
            "execution": execution_context,
        })
        return

    # Check model status
    model_status = get_model_status()
    if model_status["status"] == "loading":
        yield json.dumps({
            "type": "error",
            "status_text": model_status["message"],
            "status_state": "working",
            "history": history,
            "execution": execution_context,
        })
        return
    if model_status["status"] != "ready":
        yield json.dumps({
            "type": "error",
            "status_text": model_status["message"],
            "status_state": "error",
            "history": history,
            "execution": execution_context,
        })
        return

    # Add user message and placeholder assistant message
    history = list(history) + [
        {"role": "user", "content": prompt},
        {"role": "assistant", "content": ""},
    ]

    yield json.dumps({
        "type": "status",
        "status_text": "Thinking...",
        "status_state": "working",
        "history": history,
        "execution": execution_context,
    })

    # Web search if enabled
    # NOTE(review): unlike handle_agent_run, this search call is not wrapped in
    # try/except, so a search failure propagates out of the generator.
    search_context = ""
    if search_enabled.lower() == "true":
        yield json.dumps({
            "type": "status",
            "status_text": "Searching the web...",
            "status_state": "working",
            "history": history,
            "execution": execution_context,
        })
        search_results = web_search_google(prompt, num_results=6)
        if search_results:
            search_context = format_search_results(search_results)
            yield json.dumps({
                "type": "search_results",
                "status_text": f"Found {len(search_results)} results, generating code...",
                "status_state": "working",
                "history": history,
                "execution": execution_context,
                "search_results": search_results,
            })

    # Build messages for model: drop the empty assistant placeholder and
    # replace the raw user prompt with the targeted (expanded) prompt.
    model_history = list(history[:-1])
    model_history[-1] = {
        "role": "user",
        "content": targeted_prompt(
            prompt, target_language, target_framework, execution_context, search_context
        ),
    }
    messages = chat_history_to_messages(model_history)

    # Determine image URL for VLM
    vlm_image_url = image_url.strip() if image_url else None

    # Each value from call_model is stored whole as the assistant message, so
    # the last one received is treated as the final response.
    final_response = ""
    for partial in call_model(messages, image_url=vlm_image_url):
        final_response = partial
        # Strip thinking blocks so chat only shows clean output
        clean_partial = strip_thinking_blocks(partial)
        history[-1]["content"] = clean_partial
        yield json.dumps({
            "type": "streaming",
            "status_text": "Generating...",
            "status_state": "working",
            "history": history,
            "execution": execution_context,
        })

    if not final_response:
        history[-1]["content"] = "The model did not return a response."
        yield json.dumps({
            "type": "error",
            "status_text": "No model response.",
            "status_state": "error",
            "history": history,
            "execution": execution_context,
        })
        return

    # Extract code from response (use cleaned version)
    clean_response = strip_thinking_blocks(final_response)
    code, fence_lang = extract_code(clean_response)
    target = normalize_language(target_language, fence_lang)

    # Also try multi-file extraction
    multi_files = extract_multi_file(clean_response)

    if not code and not multi_files:
        yield json.dumps({
            "type": "complete",
            "status_text": "Answered without running code.",
            "status_state": "info",
            "history": history,
            "execution": execution_context,
        })
        return

    yield json.dumps({
        "type": "status",
        "status_text": "Running...",
        "status_state": "working",
        "history": history,
        "execution": execution_context,
    })

    # Execute code (only Python targets are executed; others keep these defaults)
    stdout, stderr, image_path, status_text, status_state = "", "", None, "Preview ready", "success"
    is_gradio = False
    gradio_url = None

    if target == "python" and code:
        if is_gradio_code(code) or target_framework == "Gradio":
            is_gradio = True
            gradio_result = run_gradio_app(code)
            if gradio_result["success"]:
                gradio_url = gradio_result["url"]
                status_text = f"Gradio app running at {gradio_url}"
                status_state = "success"
                # The launch notice is reported through the stderr field.
                stderr = f"Gradio app launched successfully at {gradio_url}"
            else:
                status_text = "Gradio launch failed"
                status_state = "error"
                stderr = gradio_result.get("stderr", gradio_result.get("message", "Launch failed"))
        else:
            result = run_python(code)
            # All three branches copy the same outputs; only the status differs.
            if result.timed_out:
                stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
                status_text = f"Timed out after {PY_TIMEOUT_S}s"
                status_state = "error"
            elif result.returncode:
                stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
                status_text = "Finished with errors"
                status_state = "error"
            else:
                stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
                status_text = "Ran successfully"
                status_state = "success"

    # Register image for serving
    image_url_out = None
    if image_path:
        filename = os.path.basename(image_path)
        _served_files[f"img:{filename}"] = image_path
        image_url_out = f"/images/{filename}"

    # Register code for download
    download_url = None
    project_files = dict(multi_files) if multi_files else {}

    # Rename main.py → app.py for Python/Gradio projects (HF Spaces expects app.py)
    if project_files and "main.py" in project_files and "app.py" not in project_files:
        if target == "python" or is_gradio:
            project_files["app.py"] = project_files.pop("main.py")

    # If project_files is empty but we have single code, add it
    if not project_files and code:
        if target == "python":
            fname = "app.py" if (is_gradio or is_gradio_code(code)) else "main.py"
        elif target in {"web", "html", "javascript"}:
            fname = "index.html"
        else:
            fname = f"main.{fence_lang or 'txt'}"
        project_files = {fname: code}

    if project_files:
        project_name = "generated-project"
        zip_path = create_project_zip(project_files, project_name)
        zip_filename = f"{project_name}.zip"
        # NOTE(review): the registry key is always "dl:generated-project.zip",
        # so each run replaces the previous registration under that name.
        _served_files[f"dl:{zip_filename}"] = zip_path
        download_url = f"/download/{zip_filename}"
    elif code:
        # NOTE(review): this branch looks unreachable — whenever `code` is
        # truthy the block above has already populated project_files.
        ext = "py" if target == "python" else "html"
        dl_filename = f"generated.{ext}"
        dl_dir = tempfile.mkdtemp(prefix="fullstack_dl_")
        dl_path = os.path.join(dl_dir, dl_filename)
        Path(dl_path).write_text(code, encoding="utf-8")
        _served_files[f"dl:{dl_filename}"] = dl_path
        download_url = f"/download/{dl_filename}"

    # Determine if this is web previewable
    is_web = target in {"web", "javascript", "typescript", "html"} or (fence_lang or "") in {"html", "web"}
    web_code = code if is_web else None

    # Replace (not merge into) the incoming execution context.
    execution_context = {
        "code": code,
        "target": target,
        "fence_lang": fence_lang or target,
        "stdout": stdout,
        "stderr": stderr,
        "image_url": image_url_out,
        "image_path": image_path,
        "status": status_text,
        "language": fence_lang or target,
        "suggested_tab": "preview" if (image_path or is_web or is_gradio) else "console",
        "download_url": download_url,
        "project_files": project_files,
        "is_web": is_web,
        "web_code": web_code,
        "is_gradio": is_gradio,
        "gradio_url": gradio_url,
    }

    yield json.dumps({
        "type": "complete",
        "status_text": status_text,
        "status_state": status_state,
        "history": history,
        "execution": execution_context,
    })


@app.api(name="hf_auth", concurrency_limit=4)
def handle_hf_auth(
    oauth_token: str = "",
) -> str:
    """Get HuggingFace OAuth profile and list of organizations.

    If oauth_token is provided (from Gradio OAuth), uses it to fetch user info.
    Otherwise, returns empty auth info.

    Yields one JSON object with ``authenticated``, ``username``, ``name``,
    ``picture``, ``organizations`` and ``message``; on success it also echoes
    the ``token`` back. Any exception is logged and reported as
    ``authenticated: False``.
    """
    try:
        # NOTE(review): `gr` is not used in the code visible here; if this import
        # fails, the handler reports "Auth check failed" even for an empty token.
        import gradio as gr
        from huggingface_hub import whoami

        token = oauth_token.strip() if oauth_token else ""

        if not token:
            yield json.dumps({
                "authenticated": False,
                "username": "",
                "name": "",
                "picture": "",
                "organizations": [],
                "message": "Not signed in. Click Sign In to authenticate with HuggingFace.",
            })
            return

        # Get user info using the OAuth token
        user_info = whoami(token=token)

        username = user_info.get("name", "")
        fullname = user_info.get("fullname", username)

        # Get avatar
        avatar_url = ""
        avatar_info = user_info.get("avatarUrl", "")
        if avatar_info:
            avatar_url = avatar_info

        # Get organizations
        orgs = []
        for org in user_info.get("orgs", []):
            orgs.append({
                "name": org.get("name", ""),
                "avatar": org.get("avatarUrl", ""),
            })

        # Also check orgRoles for role info
        org_roles = user_info.get("orgRoles", [])
        for role_info in org_roles:
            org_name = role_info.get("org", "")
            role = role_info.get("role", "member")
            # Add role info to existing org if found
            for org in orgs:
                if org["name"] == org_name:
                    org["role"] = role
                    break

        yield json.dumps({
            "authenticated": True,
            "username": username,
            "name": fullname,
            "picture": avatar_url,
            "organizations": orgs,
            "token": token,
            "message": f"Signed in as {username}",
        })
    except Exception as exc:
        logger.exception("HF auth check failed")
        yield json.dumps({
            "authenticated": False,
            "username": "",
            "name": "",
            "picture": "",
            "organizations": [],
            "message": f"Auth check failed: {str(exc)}",
        })


@app.api(name="push_hf", concurrency_limit=1)
def handle_push_hf(
    exec_context_json: str,
    repo_name: str,
    hf_token: str,
    space_sdk: str = "auto",
    is_space: str = "true",
) -> str:
    """Push generated project to HuggingFace Hub."""
    try:
        execution_context = json.loads(exec_context_json) if exec_context_json else {}
        project_files = dict(execution_context.get("project_files", {}) or {})
        code = execution_context.get("code", "")

        # If project_files is empty but we have code, build files from code
        if not project_files and code:
            lang = execution_context.get("language", "python")
            is_gradio = execution_context.get("is_gradio", False)

            # Map language to entry file — JS/TS single-files get wrapped for Docker
            if lang in ("javascript", "js", "typescript", "ts"):
                # For single-file JS/TS code that is HTML (vanilla), keep as index.html
                # NOTE(review): SOURCE is truncated from this point. The rest of
                # handle_push_hf, its except clause, and the start of the next
                # definition (presumably `def get_app()`) are missing from the
                # text under review and must be restored from the original file.
                # The surviving tokens are reproduced verbatim below.
                if " Server:
    """Return the configured Gradio Server app instance."""
    return app


# ─── Agent / Skills / Commands / Hooks / Workspace Endpoints ──────────

@app.api(name="agent_run", concurrency_limit=2)
def handle_agent_run(
    prompt: str,
    target_language: str = "",
    target_framework: str = "",
    history_json: str = "[]",
    skills_json: str = "[]",
    search_enabled: str = "false",
    image_url: str = "",
    agent_name: str = "",
) -> str:
    """Run the Claude Code-style agent loop with tools.

    Yields JSON events: status, tool_call, tool_result, streaming, complete, error.

    `agent_name` (optional) overrides the session-active agent for this run.
    The `/agent use`, `/agent reset`, `/agent delete`, and `/agent list` slash
    commands are intercepted here and dispatched to the agents module before
    the model runs; a bare `/agent` is treated like `/agent reset`.
    """
    from code.agent import run_agent

    history = json.loads(history_json) if history_json else []
    skills = json.loads(skills_json) if skills_json else []

    prompt = (prompt or "").strip()
    if not prompt:
        yield json.dumps({
            "type": "error",
            "message": "Empty prompt",
        })
        return

    # ── Intercept /agent use|reset|delete (session-state mutations) ────
    # These need to happen server-side BEFORE the model runs so the very
    # next prompt reflects the change.
    stripped = prompt.lstrip()
    if stripped.startswith("/agent ") or stripped == "/agent":
        from code.agents import (
            set_active_agent,
            delete_agent as _delete_agent,
            list_agents as _list_agents,
            get_active_agent,
        )
        parts = stripped.split(None, 2)  # ["/agent", sub-command, argument]
        sub = parts[1] if len(parts) > 1 else ""
        arg = parts[2].strip() if len(parts) > 2 else ""

        if sub == "use" and arg:
            result = set_active_agent(arg)
            # NOTE(review): the "Agent activated" headline is emitted even when
            # result["success"] is falsy; only the text after it changes.
            yield json.dumps({
                "type": "complete",
                "content": (
                    f"**Agent activated: `{result.get('active_agent')}`**\n\n"
                    + (result.get("message", "") if not result.get("success") else
                       "Subsequent prompts will use this agent's persona and tool whitelist.")
                ),
                "agent": result.get("active_agent"),
                "agent_op": "use",
            })
            return

        # A bare "/agent" (no sub-command, no argument) also lands here.
        if sub == "reset" or (sub == "" and arg == ""):
            result = set_active_agent(None)
            yield json.dumps({
                "type": "complete",
                "content": "**Active agent reset.** Subsequent prompts will use the default SoniCoder persona.",
                "agent": None,
                "agent_op": "reset",
            })
            return

        if sub == "delete" and arg:
            result = _delete_agent(arg)
            if result.get("success"):
                yield json.dumps({
                    "type": "complete",
                    "content": f"**Agent `{arg}` deleted.**",
                    "agent": None,
                    "agent_op": "delete",
                })
            else:
                yield json.dumps({
                    "type": "error",
                    "message": result.get("error", f"Failed to delete agent '{arg}'"),
                    "agent_op": "delete",
                })
            return

        if sub == "list":
            agents_list = _list_agents()
            if not agents_list:
                content = "_No agents available._ Create one with `/agent create `."
            else:
                # Render the agents as a Markdown table.
                lines = ["| Name | Description | Author | Tools |", "|------|-------------|--------|-------|"]
                for a in agents_list:
                    tools = ", ".join(a.get("tools", [])) or "(all)"
                    active_marker = " **(active)**" if a.get("active") else ""
                    lines.append(f"| `{a['name']}`{active_marker} | {a.get('description', '')[:80]} | {a.get('author', '')} | {tools} |")
                content = "\n".join(lines)
            yield json.dumps({
                "type": "complete",
                "content": content,
                "agents": agents_list,
                "agent_op": "list",
            })
            return

        # /agent create|show → fall through to the model (handled by slash command expansion)

    # Optional web search (best effort: a failure is logged and the run continues)
    search_context = ""
    if search_enabled.lower() == "true":
        try:
            search_results = web_search_google(prompt, num_results=6)
            if search_results:
                search_context = format_search_results(search_results)
                yield json.dumps({
                    "type": "search_results",
                    "results": search_results,
                    "status_text": f"Found {len(search_results)} results, running agent...",
                })
        except Exception as exc:
            logger.warning("Web search failed: %s", exc)

    try:
        for event in run_agent(
            user_input=prompt,
            history=history,
            target_language=target_language,
            target_framework=target_framework,
            skills=skills,
            search_context=search_context,
            image_url=image_url.strip() or None,
            agent_name=agent_name.strip() or None,
        ):
            # default=str stringifies any event value json cannot encode.
            yield json.dumps(event, default=str)
    except Exception as exc:
        logger.exception("Agent run failed")
        yield json.dumps({
            "type": "error",
            "message": str(exc),
        })


@app.api(name="list_skills", concurrency_limit=4)
def handle_list_skills() -> str:
    """List all available skills."""
    from code.skills import list_skills
    skills = list_skills()
    yield json.dumps({"success": True, "skills": skills})


@app.api(name="list_commands", concurrency_limit=4)
def handle_list_commands() -> str:
    """List all available slash commands."""
    from code.commands import list_commands
    commands = list_commands()
    yield json.dumps({"success": True, "commands": commands})


@app.api(name="list_hooks", concurrency_limit=4)
def handle_list_hooks() -> str:
    """List all configured hooks."""
    from code.hooks import list_hooks
    hooks = list_hooks()
    yield json.dumps({"success": True, "hooks": hooks})


@app.api(name="workspace_tree", concurrency_limit=4)
def handle_workspace_tree() -> str:
    """Return the workspace file tree."""
    from code.tools.fs import list_workspace_tree
    result = list_workspace_tree()
    yield json.dumps(result, default=str)


@app.api(name="workspace_read", concurrency_limit=4)
def handle_workspace_read(path: str, offset: int = 0, limit: int = 0) -> str:
    """Read a file from the workspace.

    ``offset`` and ``limit`` are forwarded to ``read_file`` only when they
    are non-zero.
    """
    from code.tools.fs import read_file
    args = {"path": path}
    if offset:
        args["offset"] = offset
    if limit:
        args["limit"] = limit
    result = read_file(**args)
    yield json.dumps(result, default=str)


@app.api(name="workspace_write", concurrency_limit=1)
def handle_workspace_write(path: str, content: str) -> str:
    """Write a file to the workspace."""
    from code.tools.fs import write_file
    result = write_file(path=path, content=content)
    yield json.dumps(result, default=str)


@app.api(name="workspace_bash", concurrency_limit=1)
def handle_workspace_bash(command: str, timeout: int = 30) -> str:
    """Run a bash command in the workspace."""
    from code.tools.bash import run_bash
    result = run_bash(command=command, timeout=timeout)
    yield json.dumps(result, default=str)


@app.api(name="workspace_edit", concurrency_limit=1)
def handle_workspace_edit(
    path: str,
    old_str: str,
    new_str: str,
    replace_all: str = "false",
) -> str:
    """Edit a file in the workspace.

    ``replace_all`` is a string flag; only "true" (case-insensitive) enables it.
    """
    from code.tools.fs import edit_file
    result = edit_file(
        path=path,
        old_str=old_str,
        new_str=new_str,
        replace_all=replace_all.lower() == "true",
    )
    yield json.dumps(result, default=str)


@app.api(name="workspace_glob", concurrency_limit=4)
def handle_workspace_glob(pattern: str, path: str = ".") -> str:
    """Glob files in the workspace."""
    from code.tools.fs import glob_paths
    result = glob_paths(pattern=pattern, path=path)
    yield json.dumps(result, default=str)


@app.api(name="workspace_grep", concurrency_limit=4)
def handle_workspace_grep(
    pattern: str,
    path: str = ".",
    include: str = "",
    ignore_case: str = "false",
) -> str:
    """Grep file contents in the workspace.

    An empty ``include`` is passed on as ``None``; ``ignore_case`` is a string
    flag that is enabled only by "true" (case-insensitive).
    """
    from code.tools.fs import grep_search
    result = grep_search(
        pattern=pattern,
        path=path,
        include=include or None,
        ignore_case=ignore_case.lower() == "true",
    )
    yield json.dumps(result, default=str)


@app.api(name="todo_read", concurrency_limit=4)
def handle_todo_read(session_id: str = "default") -> str:
    """Read the current todo list."""
    from code.tools.todos import todo_read
    result = todo_read(session_id=session_id)
    yield json.dumps(result, default=str)


@app.api(name="todo_write", concurrency_limit=1)
def handle_todo_write(todos_json: str, session_id: str = "default") -> str:
    """Replace the todo list.

    ``todos_json`` is a JSON string; an empty string means an empty list.
    """
    from code.tools.todos import todo_write
    todos = json.loads(todos_json) if todos_json else []
    result = todo_write(todos=todos, session_id=session_id)
    yield json.dumps(result, default=str)


@app.api(name="workspace_snapshot", concurrency_limit=2)
def handle_workspace_snapshot() -> str:
    """Return all workspace files for ZIP/deploy."""
    from code.tools.fs import snapshot_workspace
    files = snapshot_workspace()
    yield json.dumps({"success": True, "files": files, "count": len(files)})


@app.api(name="workspace_reset", concurrency_limit=1)
def handle_workspace_reset() -> str:
    """Clear the workspace."""
    from code.tools.fs import reset_workspace
    result = reset_workspace()
    yield json.dumps(result, default=str)


@app.api(name="create_hook", concurrency_limit=1)
def handle_create_hook(
    name: str,
    event: str,
    pattern: str,
    action: str = "warn",
    message: str = "",
    enabled: str = "true",
) -> str:
    """Create a new user hook.

    ``enabled`` is a string flag; only "true" (case-insensitive) enables the hook.
    """
    from code.hooks import create_hook
    result = create_hook(
        name=name,
        event=event,
        pattern=pattern,
        action=action,
        message=message,
        enabled=enabled.lower() == "true",
    )
    yield json.dumps(result, default=str)


@app.api(name="delete_hook", concurrency_limit=1)
def handle_delete_hook(name: str) -> str:
    """Delete a user hook by name."""
    from code.hooks import delete_hook
    result = delete_hook(name)
    yield json.dumps(result, default=str)


# ─── Custom Agent Endpoints ────────────────────────────────────────────

@app.api(name="list_agents", concurrency_limit=4)
def handle_list_agents() -> str:
    """List all available custom agents (builtins + user)."""
    from code.agents import list_agents, get_active_agent
    agents = list_agents()
    active = get_active_agent()
    yield json.dumps({
        "success": True,
        "agents": agents,
        "active_agent": active,
    }, default=str)


@app.api(name="get_agent", concurrency_limit=4)
def handle_get_agent(name: str) -> str:
    """Get the full definition of a single agent.

    Yields ``success: False`` with an error message when the agent is unknown.
    """
    from code.agents import get_agent
    agent = get_agent(name)
    if not agent:
        yield json.dumps({"success": False, "error": f"Agent not found: {name}"})
        return
    # Strip non-serializable path
    agent_serializable = {k: v for k, v in agent.items() if k != "path"}
    yield json.dumps({"success": True, "agent": agent_serializable}, default=str)


@app.api(name="save_agent", concurrency_limit=1)
def handle_save_agent(
    name: str,
    description: str,
    body: str,
    tools: str = "",
    skills: str = "",
    temperature: str = "",
    max_iterations: str = "",
    tags: str = "",
    author: str = "user",
) -> str:
    """Create or overwrite a custom agent definition (manual save, no AI).

    `tools`, `skills`, `tags` are comma-separated strings. `temperature` and
    `max_iterations` are strings that will be parsed if non-empty.

    An empty `tools` string selects every tool in `ALL_TOOLS`. An unparsable
    `temperature` or `max_iterations` yields ``success: False`` and nothing
    is saved.
    """
    from code.agents import save_agent, ALL_TOOLS

    def _split(s: str) -> list[str]:
        # Split on commas, trim whitespace, and drop empty entries.
        return [x.strip() for x in (s or "").split(",") if x.strip()]

    tools_list = _split(tools) or list(ALL_TOOLS)
    skills_list = _split(skills)
    tags_list = _split(tags)

    temp_val = None
    if temperature.strip():
        try:
            temp_val = float(temperature)
        except ValueError:
            yield json.dumps({"success": False, "error": f"Invalid temperature: {temperature}"})
            return

    iter_val = None
    if max_iterations.strip():
        try:
            iter_val = int(max_iterations)
        except ValueError:
            yield json.dumps({"success": False, "error": f"Invalid max_iterations: {max_iterations}"})
            return

    result = save_agent(
        name=name,
        description=description,
        body=body,
        tools=tools_list,
        skills=skills_list,
        temperature=temp_val,
        max_iterations=iter_val,
        tags=tags_list,
        author=author,
    )
    yield json.dumps(result, default=str)


@app.api(name="delete_agent", concurrency_limit=1)
def handle_delete_agent(name: str) -> str:
    """Delete a user-defined agent by name."""
    from code.agents import delete_agent
    result = delete_agent(name)
    yield json.dumps(result, default=str)


@app.api(name="set_active_agent", concurrency_limit=1)
def handle_set_active_agent(name: str = "") -> str:
    """Set the active agent for subsequent prompts. Empty string resets.

    On failure the raw result is yielded; on success the result is extended
    with the current agent list and active agent.
    """
    from code.agents import set_active_agent, list_agents, get_active_agent
    result = set_active_agent(name.strip() or None)
    if not result.get("success"):
        yield json.dumps(result, default=str)
        return
    # Return fresh list + active agent so frontend can re-render
    yield json.dumps({
        **result,
        "agents": list_agents(),
        "active_agent": get_active_agent(),
    }, default=str)


# ─── GitHub Import Endpoint ────────────────────────────────────────────

@app.api(name="import_github", concurrency_limit=1)
def handle_import_github(
    url: str,
    branch: str = "",
    subdir: str = "",
    target_subdir: str = "",
    depth: str = "1",
    timeout: str = "120",
) -> str:
    """Clone a GitHub repo into the sandboxed workspace.

    Parameters
    ----------
    url : str
        GitHub URL. Accepts:
          - https://github.com/<owner>/<repo>[.git]
          - https://github.com/<owner>/<repo>/tree/<branch>[/<subdir>]
          - git@github.com:<owner>/<repo>.git
    branch : str
        Optional branch/tag override. If empty, uses URL's branch or the
        repo's default branch.
    subdir : str
        Optional sub-directory inside the repo to import.
    target_subdir : str
        Where inside the workspace to place the import. Empty = root.
    depth : str
        Git clone depth (default "1" for shallow clone). Clamped to 1..50;
        unparsable values fall back to 1.
    timeout : str
        Git clone timeout in seconds (default "120"). Clamped to 10..600;
        unparsable values fall back to 120.

    Yields
    ------
    JSON dict with keys: success, message, url, owner, repo, branch,
    subdir, files_imported, dirs_skipped, workspace_path, tree_preview.
    """
    from code.tools.github import import_github_repo

    try:
        depth_int = int(depth) if str(depth).strip() else 1
        depth_int = max(1, min(50, depth_int))
    except (ValueError, TypeError):
        depth_int = 1

    try:
        timeout_int = int(timeout) if str(timeout).strip() else 120
        timeout_int = max(10, min(600, timeout_int))
    except (ValueError, TypeError):
        timeout_int = 120

    result = import_github_repo(
        url=url,
        branch=branch,
        subdir=subdir,
        target_subdir=target_subdir,
        depth=depth_int,
        timeout=timeout_int,
    )
    yield json.dumps(result, default=str)


@app.api(name="github_url_examples", concurrency_limit=4)
def handle_github_url_examples() -> str:
    """Return example GitHub URL formats accepted by import_github."""
    from code.tools.github import list_github_url_examples
    result = list_github_url_examples()
    yield json.dumps(result, default=str)


@app.api(name="push_github", concurrency_limit=1)
def handle_push_github(
    repo_name: str,
    github_token: str,
    username: str,
    branch: str = "main",
    commit_message: str = "",
    timeout: str = "120",
) -> str:
    """Push the current workspace to a GitHub repo.

    Requires only 3 user inputs (repo_name, github_token, username) plus
    optional branch / commit_message / timeout. The workspace is snapshotted
    (via `snapshot_workspace`), written into a fresh git repo in a temp dir,
    committed, and pushed to `https://github.com/<username>/<repo_name>.git`
    using HTTPS basic auth with the token.

    The push uses `--force-with-lease` so it replaces the remote tip with
    the SoniCoder workspace contents. If the remote doesn't exist yet (no
    refs to lease against), it retries with a plain push.

    NOTE(review): the snapshot/commit/push behavior described above lives in
    `push_to_github`, which is not visible here — confirm against that helper.
    This handler itself only clamps `timeout` to 10..600 seconds (falling back
    to 120 when unparsable) and defaults an empty `branch` to "main".

    Yields
    ------
    JSON dict with keys: success, message, repo_full_name, branch,
    commit_sha, commit_url, repo_url, files_pushed, error (on failure).
    """
    from code.tools.github import push_to_github

    try:
        timeout_int = int(timeout) if str(timeout).strip() else 120
        timeout_int = max(10, min(600, timeout_int))
    except (ValueError, TypeError):
        timeout_int = 120

    result = push_to_github(
        repo_name=repo_name,
        github_token=github_token,
        username=username,
        branch=branch or "main",
        commit_message=commit_message or "",
        timeout=timeout_int,
    )
    yield json.dumps(result, default=str)