sonicoder / code /server /routes.py
R-Kentaren's picture
fix: agent_run param mismatch (send agent_name) + add GitHub push-update (3 inputs: repo name, token, username; --force-with-lease)
b87f702 verified
Raw
History Blame Contribute Delete
42.3 kB
"""FastAPI / Gradio Server routes.
Defines all HTTP and API endpoints:
- GET / β†’ serves the index.html frontend
- GET /api/model-status β†’ model loading status
- GET /images/{f} β†’ serve generated plot images
- GET /download/{f} β†’ serve project ZIP downloads
- API web_search β†’ Google search scraping
- API chat β†’ streaming chat with code execution
- API push_hf β†’ push to HuggingFace Hub
- API switch_model β†’ switch between loaded models
- API upload_image β†’ upload image for VLM inference
- API hf_auth β†’ get HF OAuth profile & organizations
- API agent_run β†’ Claude Code-style agent loop with tools
- API list_skills β†’ list available skills
- API list_commands→ list available slash commands
- API list_hooks β†’ list configured hooks
- API workspace_tree→ list workspace files
- API workspace_read→ read a workspace file
- API workspace_write→ write a workspace file
- API workspace_bash→ run a bash command in workspace
- API todo_read β†’ read current todo list
- API todo_write β†’ update todo list
- API import_github β†’ clone a GitHub repo into the workspace
- API github_url_examples β†’ return accepted GitHub URL formats
- API push_github β†’ push the current workspace to a GitHub repo
"""
from __future__ import annotations
import base64
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import Any
from fastapi.responses import HTMLResponse, FileResponse
try:
from gradio import Server
except ImportError:
# Fallback for older/newer Gradio versions where Server may not be exposed
# at the top level. We provide a minimal shim so the module can still be
# imported for testing purposes.
class Server: # type: ignore
"""Minimal shim for Gradio Server when not available."""
def __init__(self, *args, **kwargs):
from fastapi import FastAPI
self._fastapi = FastAPI()
def get(self, path: str, **kwargs):
return self._fastapi.get(path, **kwargs)
def api(self, name: str = None, concurrency_limit: int = 1):
def decorator(fn):
# Store as attribute so it can be inspected
fn._api_name = name
fn._concurrency_limit = concurrency_limit
return fn
return decorator
from code.config.constants import (
APP_TITLE,
DEFAULT_MODEL_KEY,
EXAMPLE_PROMPTS,
LANGUAGE_OPTIONS,
MODEL_CONFIGS,
MODEL_URL,
PY_TIMEOUT_S,
)
from code.execution.code_extractor import (
build_iframe,
extract_code,
extract_multi_file,
is_gradio_code,
normalize_language,
strip_thinking_blocks,
)
from code.execution.gradio_runner import run_gradio_app, stop_gradio_app
from code.execution.python_runner import run_python
from code.huggingface.push import create_project_zip, push_to_huggingface
from code.model.loader import (
get_model_status,
is_model_loaded,
get_current_model_key,
get_current_model_type,
switch_model,
)
from code.model.inference import call_model
from code.server.chat_helpers import chat_history_to_messages, targeted_prompt
from code.websearch.google_scraper import web_search_google, format_search_results
logger = logging.getLogger(__name__)
# ─── Served Files Registry ──────────────────────────────────────────────
_served_files: dict[str, str] = {}
# ─── Uploaded Images Registry ───────────────────────────────────────────
_uploaded_images: dict[str, str] = {}
# ─── Server Instance ────────────────────────────────────────────────────
app = Server()
# ─── HTTP Routes ────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def homepage():
"""Serve the index.html frontend with runtime config injected."""
html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "index.html")
with open(html_path, "r", encoding="utf-8") as f:
content = f.read()
# Load skills, commands, hooks for the frontend
try:
from code.skills import list_skills
skills_list = list_skills()
except Exception:
skills_list = []
try:
from code.commands import list_commands
commands_list = list_commands()
except Exception:
commands_list = []
try:
from code.hooks import list_hooks
hooks_list = list_hooks()
except Exception:
hooks_list = []
try:
from code.agents import list_agents, get_active_agent
agents_list = list_agents()
active_agent = get_active_agent()
except Exception:
agents_list = []
active_agent = None
config = json.dumps({
"app_title": APP_TITLE,
"model_id": MODEL_CONFIGS[DEFAULT_MODEL_KEY]["id"],
"model_configs": {k: {"name": v["name"], "type": v["type"], "description": v["description"]} for k, v in MODEL_CONFIGS.items()},
"model_url": MODEL_URL,
"languages": LANGUAGE_OPTIONS,
"examples": [
{"label": label, "prompt": prompt, "language": lang, "framework": fw}
for label, prompt, lang, fw in EXAMPLE_PROMPTS
],
"default_model": "minicpm5-1b",
"skills": skills_list,
"commands": commands_list,
"hooks": hooks_list,
"agents": agents_list,
"active_agent": active_agent,
})
content = content.replace("__RUNTIME_CONFIG__", config)
return content
@app.get("/api/model-status")
async def model_status_endpoint():
"""Return the current model loading status."""
return get_model_status()
@app.get("/images/{filename}")
async def serve_image(filename: str):
"""Serve a generated plot image by filename."""
path = _served_files.get(f"img:{filename}")
if path and os.path.exists(path):
return FileResponse(path, media_type="image/png")
return HTMLResponse("Not found", status_code=404)
@app.get("/download/{filename}")
async def serve_download(filename: str):
"""Serve a project ZIP download by filename."""
path = _served_files.get(f"dl:{filename}")
if path and os.path.exists(path):
return FileResponse(path, filename=filename, media_type="application/octet-stream")
return HTMLResponse("Not found", status_code=404)
@app.get("/uploaded-images/{image_id}")
async def serve_uploaded_image(image_id: str):
"""Serve an uploaded image by its ID."""
path = _uploaded_images.get(image_id)
if path and os.path.exists(path):
return FileResponse(path, media_type="image/png")
return HTMLResponse("Not found", status_code=404)
# ─── Gradio API Endpoints ──────────────────────────────────────────────
@app.api(name="switch_model", concurrency_limit=1)
def handle_switch_model(model_key: str) -> str:
"""Switch to a different model."""
result = switch_model(model_key)
yield json.dumps(result)
@app.api(name="upload_image", concurrency_limit=4)
def handle_upload_image(image_data: str) -> str:
"""Upload a base64-encoded image for VLM inference.
Returns an image ID that can be referenced in chat.
"""
try:
if not image_data:
yield json.dumps({"success": False, "message": "No image data provided"})
return
# Handle data URI format: data:image/png;base64,...
if image_data.startswith("data:"):
# Extract the base64 part
parts = image_data.split(",", 1)
if len(parts) == 2:
image_data = parts[1]
# Decode base64
image_bytes = base64.b64decode(image_data)
# Save to temp file
img_dir = tempfile.mkdtemp(prefix="uploaded_img_")
image_id = f"img_{os.getpid()}_{int(os.urandom(4).hex(), 16)}"
img_path = os.path.join(img_dir, f"{image_id}.png")
Path(img_path).write_bytes(image_bytes)
# Register for serving
_uploaded_images[image_id] = img_path
# Create a URL for the image that the VLM can access
image_url = f"/uploaded-images/{image_id}"
# Also save as a file:// URL for local VLM access
file_url = f"file://{img_path}"
yield json.dumps({
"success": True,
"image_id": image_id,
"image_url": image_url,
"file_url": file_url,
"message": "Image uploaded successfully",
})
except Exception as exc:
logger.exception("Image upload failed")
yield json.dumps({
"success": False,
"message": f"Upload failed: {str(exc)}",
})
@app.api(name="web_search", concurrency_limit=4)
def handle_web_search(query: str) -> str:
"""Search the web using Google scraping. No API key needed."""
query = (query or "").strip()
if not query:
yield json.dumps({"success": False, "results": [], "message": "Empty search query"})
return
try:
results = web_search_google(query, num_results=8)
formatted = format_search_results(results)
yield json.dumps({
"success": True,
"results": results,
"formatted": formatted,
"message": f"Found {len(results)} results",
})
except Exception as exc:
logger.exception("Web search failed")
yield json.dumps({
"success": False,
"results": [],
"message": f"Search failed: {str(exc)}",
})
@app.api(name="chat", concurrency_limit=2)
def handle_chat(
prompt: str,
target_language: str,
target_framework: str,
history_json: str,
exec_context_json: str,
search_enabled: str = "false",
image_url: str = "",
) -> str:
"""Stream chat responses with code execution. Yields JSON strings."""
history = json.loads(history_json) if history_json else []
execution_context = json.loads(exec_context_json) if exec_context_json else {}
prompt = (prompt or "").strip()
if not prompt:
yield json.dumps({
"type": "error",
"status_text": "Enter a prompt to get started.",
"status_state": "info",
"history": history,
"execution": execution_context,
})
return
# Check model status
model_status = get_model_status()
if model_status["status"] == "loading":
yield json.dumps({
"type": "error",
"status_text": model_status["message"],
"status_state": "working",
"history": history,
"execution": execution_context,
})
return
if model_status["status"] != "ready":
yield json.dumps({
"type": "error",
"status_text": model_status["message"],
"status_state": "error",
"history": history,
"execution": execution_context,
})
return
# Add user message and placeholder assistant message
history = list(history) + [
{"role": "user", "content": prompt},
{"role": "assistant", "content": ""},
]
yield json.dumps({
"type": "status",
"status_text": "Thinking...",
"status_state": "working",
"history": history,
"execution": execution_context,
})
# Web search if enabled
search_context = ""
if search_enabled.lower() == "true":
yield json.dumps({
"type": "status",
"status_text": "Searching the web...",
"status_state": "working",
"history": history,
"execution": execution_context,
})
search_results = web_search_google(prompt, num_results=6)
if search_results:
search_context = format_search_results(search_results)
yield json.dumps({
"type": "search_results",
"status_text": f"Found {len(search_results)} results, generating code...",
"status_state": "working",
"history": history,
"execution": execution_context,
"search_results": search_results,
})
# Build messages for model
model_history = list(history[:-1])
model_history[-1] = {
"role": "user",
"content": targeted_prompt(
prompt, target_language, target_framework, execution_context, search_context
),
}
messages = chat_history_to_messages(model_history)
# Determine image URL for VLM
vlm_image_url = image_url.strip() if image_url else None
final_response = ""
for partial in call_model(messages, image_url=vlm_image_url):
final_response = partial
# Strip thinking blocks so chat only shows clean output
clean_partial = strip_thinking_blocks(partial)
history[-1]["content"] = clean_partial
yield json.dumps({
"type": "streaming",
"status_text": "Generating...",
"status_state": "working",
"history": history,
"execution": execution_context,
})
if not final_response:
history[-1]["content"] = "The model did not return a response."
yield json.dumps({
"type": "error",
"status_text": "No model response.",
"status_state": "error",
"history": history,
"execution": execution_context,
})
return
# Extract code from response (use cleaned version)
clean_response = strip_thinking_blocks(final_response)
code, fence_lang = extract_code(clean_response)
target = normalize_language(target_language, fence_lang)
# Also try multi-file extraction
multi_files = extract_multi_file(clean_response)
if not code and not multi_files:
yield json.dumps({
"type": "complete",
"status_text": "Answered without running code.",
"status_state": "info",
"history": history,
"execution": execution_context,
})
return
yield json.dumps({
"type": "status",
"status_text": "Running...",
"status_state": "working",
"history": history,
"execution": execution_context,
})
# Execute code
stdout, stderr, image_path, status_text, status_state = "", "", None, "Preview ready", "success"
is_gradio = False
gradio_url = None
if target == "python" and code:
if is_gradio_code(code) or target_framework == "Gradio":
is_gradio = True
gradio_result = run_gradio_app(code)
if gradio_result["success"]:
gradio_url = gradio_result["url"]
status_text = f"Gradio app running at {gradio_url}"
status_state = "success"
stderr = f"Gradio app launched successfully at {gradio_url}"
else:
status_text = "Gradio launch failed"
status_state = "error"
stderr = gradio_result.get("stderr", gradio_result.get("message", "Launch failed"))
else:
result = run_python(code)
if result.timed_out:
stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
status_text = f"Timed out after {PY_TIMEOUT_S}s"
status_state = "error"
elif result.returncode:
stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
status_text = "Finished with errors"
status_state = "error"
else:
stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
status_text = "Ran successfully"
status_state = "success"
# Register image for serving
image_url_out = None
if image_path:
filename = os.path.basename(image_path)
_served_files[f"img:{filename}"] = image_path
image_url_out = f"/images/{filename}"
# Register code for download
download_url = None
project_files = dict(multi_files) if multi_files else {}
# Rename main.py β†’ app.py for Python/Gradio projects (HF Spaces expects app.py)
if project_files and "main.py" in project_files and "app.py" not in project_files:
if target == "python" or is_gradio:
project_files["app.py"] = project_files.pop("main.py")
# If project_files is empty but we have single code, add it
if not project_files and code:
if target == "python":
fname = "app.py" if (is_gradio or is_gradio_code(code)) else "main.py"
elif target in {"web", "html", "javascript"}:
fname = "index.html"
else:
fname = f"main.{fence_lang or 'txt'}"
project_files = {fname: code}
if project_files:
project_name = "generated-project"
zip_path = create_project_zip(project_files, project_name)
zip_filename = f"{project_name}.zip"
_served_files[f"dl:{zip_filename}"] = zip_path
download_url = f"/download/{zip_filename}"
elif code:
ext = "py" if target == "python" else "html"
dl_filename = f"generated.{ext}"
dl_dir = tempfile.mkdtemp(prefix="fullstack_dl_")
dl_path = os.path.join(dl_dir, dl_filename)
Path(dl_path).write_text(code, encoding="utf-8")
_served_files[f"dl:{dl_filename}"] = dl_path
download_url = f"/download/{dl_filename}"
# Determine if this is web previewable
is_web = target in {"web", "javascript", "typescript", "html"} or (fence_lang or "") in {"html", "web"}
web_code = code if is_web else None
execution_context = {
"code": code,
"target": target,
"fence_lang": fence_lang or target,
"stdout": stdout,
"stderr": stderr,
"image_url": image_url_out,
"image_path": image_path,
"status": status_text,
"language": fence_lang or target,
"suggested_tab": "preview" if (image_path or is_web or is_gradio) else "console",
"download_url": download_url,
"project_files": project_files,
"is_web": is_web,
"web_code": web_code,
"is_gradio": is_gradio,
"gradio_url": gradio_url,
}
yield json.dumps({
"type": "complete",
"status_text": status_text,
"status_state": status_state,
"history": history,
"execution": execution_context,
})
@app.api(name="hf_auth", concurrency_limit=4)
def handle_hf_auth(
oauth_token: str = "",
) -> str:
"""Get HuggingFace OAuth profile and list of organizations.
If oauth_token is provided (from Gradio OAuth), uses it to fetch user info.
Otherwise, returns empty auth info.
"""
try:
import gradio as gr
from huggingface_hub import whoami
token = oauth_token.strip() if oauth_token else ""
if not token:
yield json.dumps({
"authenticated": False,
"username": "",
"name": "",
"picture": "",
"organizations": [],
"message": "Not signed in. Click Sign In to authenticate with HuggingFace.",
})
return
# Get user info using the OAuth token
user_info = whoami(token=token)
username = user_info.get("name", "")
fullname = user_info.get("fullname", username)
# Get avatar
avatar_url = ""
avatar_info = user_info.get("avatarUrl", "")
if avatar_info:
avatar_url = avatar_info
# Get organizations
orgs = []
for org in user_info.get("orgs", []):
orgs.append({
"name": org.get("name", ""),
"avatar": org.get("avatarUrl", ""),
})
# Also check orgRoles for role info
org_roles = user_info.get("orgRoles", [])
for role_info in org_roles:
org_name = role_info.get("org", "")
role = role_info.get("role", "member")
# Add role info to existing org if found
for org in orgs:
if org["name"] == org_name:
org["role"] = role
break
yield json.dumps({
"authenticated": True,
"username": username,
"name": fullname,
"picture": avatar_url,
"organizations": orgs,
"token": token,
"message": f"Signed in as {username}",
})
except Exception as exc:
logger.exception("HF auth check failed")
yield json.dumps({
"authenticated": False,
"username": "",
"name": "",
"picture": "",
"organizations": [],
"message": f"Auth check failed: {str(exc)}",
})
@app.api(name="push_hf", concurrency_limit=1)
def handle_push_hf(
exec_context_json: str,
repo_name: str,
hf_token: str,
space_sdk: str = "auto",
is_space: str = "true",
) -> str:
"""Push generated project to HuggingFace Hub."""
try:
execution_context = json.loads(exec_context_json) if exec_context_json else {}
project_files = dict(execution_context.get("project_files", {}) or {})
code = execution_context.get("code", "")
# If project_files is empty but we have code, build files from code
if not project_files and code:
lang = execution_context.get("language", "python")
is_gradio = execution_context.get("is_gradio", False)
# Map language to entry file β€” JS/TS single-files get wrapped for Docker
if lang in ("javascript", "js", "typescript", "ts"):
# For single-file JS/TS code that is HTML (vanilla), keep as index.html
if "<!doctype" in code.lower() or "<html" in code.lower():
filename = "index.html"
else:
filename = "index.js"
elif lang in ("html", "web"):
filename = "index.html"
else:
ext_map = {
"python": "app.py", "py": "app.py",
}
filename = ext_map.get(lang, "app.py")
project_files = {filename: code}
# Auto-detect SDK for Gradio apps
if is_gradio or is_gradio_code(code):
space_sdk = "gradio"
# If still no files, try extracting from the raw response
if not project_files and code:
project_files = extract_multi_file(code)
if not project_files:
yield json.dumps({
"success": False,
"message": "No code to push. Generate some code first.",
"url": "",
})
return
# "auto" SDK means let push_to_huggingface decide
if space_sdk == "auto":
space_sdk = "static" # push_to_huggingface will auto-detect from files
project_name = repo_name.split("/")[-1] if "/" in repo_name else repo_name
result = push_to_huggingface(
files=project_files,
project_name=project_name,
repo_name=repo_name,
hf_token=hf_token,
space_sdk=space_sdk,
is_space=is_space.lower() == "true",
)
yield json.dumps(result)
except Exception as exc:
logger.exception("Push to HuggingFace failed")
yield json.dumps({
"success": False,
"message": f"Push failed: {str(exc)}",
"url": "",
})
def get_app() -> Server:
"""Return the configured Gradio Server app instance."""
return app
# ─── Agent / Skills / Commands / Hooks / Workspace Endpoints ──────────
@app.api(name="agent_run", concurrency_limit=2)
def handle_agent_run(
prompt: str,
target_language: str = "",
target_framework: str = "",
history_json: str = "[]",
skills_json: str = "[]",
search_enabled: str = "false",
image_url: str = "",
agent_name: str = "",
) -> str:
"""Run the Claude Code-style agent loop with tools.
Yields JSON events: status, tool_call, tool_result, streaming, complete, error.
`agent_name` (optional) overrides the session-active agent for this run.
The `/agent use`, `/agent reset`, and `/agent delete` slash commands are
intercepted here and dispatched to the agents module before the model runs.
"""
from code.agent import run_agent
history = json.loads(history_json) if history_json else []
skills = json.loads(skills_json) if skills_json else []
prompt = (prompt or "").strip()
if not prompt:
yield json.dumps({
"type": "error",
"message": "Empty prompt",
})
return
# ── Intercept /agent use|reset|delete (session-state mutations) ────
# These need to happen server-side BEFORE the model runs so the very
# next prompt reflects the change.
stripped = prompt.lstrip()
if stripped.startswith("/agent ") or stripped == "/agent":
from code.agents import (
set_active_agent,
delete_agent as _delete_agent,
list_agents as _list_agents,
get_active_agent,
)
parts = stripped.split(None, 2) # ["/agent", <sub>, <rest>]
sub = parts[1] if len(parts) > 1 else ""
arg = parts[2].strip() if len(parts) > 2 else ""
if sub == "use" and arg:
result = set_active_agent(arg)
yield json.dumps({
"type": "complete",
"content": (
f"**Agent activated: `{result.get('active_agent')}`**\n\n"
+ (result.get("message", "") if not result.get("success") else "Subsequent prompts will use this agent's persona and tool whitelist.")
),
"agent": result.get("active_agent"),
"agent_op": "use",
})
return
if sub == "reset" or (sub == "" and arg == ""):
result = set_active_agent(None)
yield json.dumps({
"type": "complete",
"content": "**Active agent reset.** Subsequent prompts will use the default SoniCoder persona.",
"agent": None,
"agent_op": "reset",
})
return
if sub == "delete" and arg:
result = _delete_agent(arg)
if result.get("success"):
yield json.dumps({
"type": "complete",
"content": f"**Agent `{arg}` deleted.**",
"agent": None,
"agent_op": "delete",
})
else:
yield json.dumps({
"type": "error",
"message": result.get("error", f"Failed to delete agent '{arg}'"),
"agent_op": "delete",
})
return
if sub == "list":
agents_list = _list_agents()
if not agents_list:
content = "_No agents available._ Create one with `/agent create <description>`."
else:
lines = ["| Name | Description | Author | Tools |", "|------|-------------|--------|-------|"]
for a in agents_list:
tools = ", ".join(a.get("tools", [])) or "(all)"
active_marker = " **(active)**" if a.get("active") else ""
lines.append(f"| `{a['name']}`{active_marker} | {a.get('description', '')[:80]} | {a.get('author', '')} | {tools} |")
content = "\n".join(lines)
yield json.dumps({
"type": "complete",
"content": content,
"agents": agents_list,
"agent_op": "list",
})
return
# /agent create|show β†’ fall through to the model (handled by slash command expansion)
# Optional web search
search_context = ""
if search_enabled.lower() == "true":
try:
search_results = web_search_google(prompt, num_results=6)
if search_results:
search_context = format_search_results(search_results)
yield json.dumps({
"type": "search_results",
"results": search_results,
"status_text": f"Found {len(search_results)} results, running agent...",
})
except Exception as exc:
logger.warning("Web search failed: %s", exc)
try:
for event in run_agent(
user_input=prompt,
history=history,
target_language=target_language,
target_framework=target_framework,
skills=skills,
search_context=search_context,
image_url=image_url.strip() or None,
agent_name=agent_name.strip() or None,
):
yield json.dumps(event, default=str)
except Exception as exc:
logger.exception("Agent run failed")
yield json.dumps({
"type": "error",
"message": str(exc),
})
@app.api(name="list_skills", concurrency_limit=4)
def handle_list_skills() -> str:
"""List all available skills."""
from code.skills import list_skills
skills = list_skills()
yield json.dumps({"success": True, "skills": skills})
@app.api(name="list_commands", concurrency_limit=4)
def handle_list_commands() -> str:
"""List all available slash commands."""
from code.commands import list_commands
commands = list_commands()
yield json.dumps({"success": True, "commands": commands})
@app.api(name="list_hooks", concurrency_limit=4)
def handle_list_hooks() -> str:
"""List all configured hooks."""
from code.hooks import list_hooks
hooks = list_hooks()
yield json.dumps({"success": True, "hooks": hooks})
@app.api(name="workspace_tree", concurrency_limit=4)
def handle_workspace_tree() -> str:
"""Return the workspace file tree."""
from code.tools.fs import list_workspace_tree
result = list_workspace_tree()
yield json.dumps(result, default=str)
@app.api(name="workspace_read", concurrency_limit=4)
def handle_workspace_read(path: str, offset: int = 0, limit: int = 0) -> str:
"""Read a file from the workspace."""
from code.tools.fs import read_file
args = {"path": path}
if offset:
args["offset"] = offset
if limit:
args["limit"] = limit
result = read_file(**args)
yield json.dumps(result, default=str)
@app.api(name="workspace_write", concurrency_limit=1)
def handle_workspace_write(path: str, content: str) -> str:
"""Write a file to the workspace."""
from code.tools.fs import write_file
result = write_file(path=path, content=content)
yield json.dumps(result, default=str)
@app.api(name="workspace_bash", concurrency_limit=1)
def handle_workspace_bash(command: str, timeout: int = 30) -> str:
"""Run a bash command in the workspace."""
from code.tools.bash import run_bash
result = run_bash(command=command, timeout=timeout)
yield json.dumps(result, default=str)
@app.api(name="workspace_edit", concurrency_limit=1)
def handle_workspace_edit(
path: str,
old_str: str,
new_str: str,
replace_all: str = "false",
) -> str:
"""Edit a file in the workspace."""
from code.tools.fs import edit_file
result = edit_file(
path=path,
old_str=old_str,
new_str=new_str,
replace_all=replace_all.lower() == "true",
)
yield json.dumps(result, default=str)
@app.api(name="workspace_glob", concurrency_limit=4)
def handle_workspace_glob(pattern: str, path: str = ".") -> str:
"""Glob files in the workspace."""
from code.tools.fs import glob_paths
result = glob_paths(pattern=pattern, path=path)
yield json.dumps(result, default=str)
@app.api(name="workspace_grep", concurrency_limit=4)
def handle_workspace_grep(
pattern: str,
path: str = ".",
include: str = "",
ignore_case: str = "false",
) -> str:
"""Grep file contents in the workspace."""
from code.tools.fs import grep_search
result = grep_search(
pattern=pattern,
path=path,
include=include or None,
ignore_case=ignore_case.lower() == "true",
)
yield json.dumps(result, default=str)
@app.api(name="todo_read", concurrency_limit=4)
def handle_todo_read(session_id: str = "default") -> str:
"""Read the current todo list."""
from code.tools.todos import todo_read
result = todo_read(session_id=session_id)
yield json.dumps(result, default=str)
@app.api(name="todo_write", concurrency_limit=1)
def handle_todo_write(todos_json: str, session_id: str = "default") -> str:
"""Replace the todo list."""
from code.tools.todos import todo_write
todos = json.loads(todos_json) if todos_json else []
result = todo_write(todos=todos, session_id=session_id)
yield json.dumps(result, default=str)
@app.api(name="workspace_snapshot", concurrency_limit=2)
def handle_workspace_snapshot() -> str:
"""Return all workspace files for ZIP/deploy."""
from code.tools.fs import snapshot_workspace
files = snapshot_workspace()
yield json.dumps({"success": True, "files": files, "count": len(files)})
@app.api(name="workspace_reset", concurrency_limit=1)
def handle_workspace_reset() -> str:
"""Clear the workspace."""
from code.tools.fs import reset_workspace
result = reset_workspace()
yield json.dumps(result, default=str)
@app.api(name="create_hook", concurrency_limit=1)
def handle_create_hook(
name: str,
event: str,
pattern: str,
action: str = "warn",
message: str = "",
enabled: str = "true",
) -> str:
"""Create a new user hook."""
from code.hooks import create_hook
result = create_hook(
name=name,
event=event,
pattern=pattern,
action=action,
message=message,
enabled=enabled.lower() == "true",
)
yield json.dumps(result, default=str)
@app.api(name="delete_hook", concurrency_limit=1)
def handle_delete_hook(name: str) -> str:
"""Delete a user hook by name."""
from code.hooks import delete_hook
result = delete_hook(name)
yield json.dumps(result, default=str)
# ─── Custom Agent Endpoints ────────────────────────────────────────────
@app.api(name="list_agents", concurrency_limit=4)
def handle_list_agents() -> str:
"""List all available custom agents (builtins + user)."""
from code.agents import list_agents, get_active_agent
agents = list_agents()
active = get_active_agent()
yield json.dumps({
"success": True,
"agents": agents,
"active_agent": active,
}, default=str)
@app.api(name="get_agent", concurrency_limit=4)
def handle_get_agent(name: str) -> str:
"""Get the full definition of a single agent."""
from code.agents import get_agent
agent = get_agent(name)
if not agent:
yield json.dumps({"success": False, "error": f"Agent not found: {name}"})
return
# Strip non-serializable path
agent_serializable = {k: v for k, v in agent.items() if k != "path"}
yield json.dumps({"success": True, "agent": agent_serializable}, default=str)
@app.api(name="save_agent", concurrency_limit=1)
def handle_save_agent(
name: str,
description: str,
body: str,
tools: str = "",
skills: str = "",
temperature: str = "",
max_iterations: str = "",
tags: str = "",
author: str = "user",
) -> str:
"""Create or overwrite a custom agent definition (manual save, no AI).
`tools`, `skills`, `tags` are comma-separated strings. `temperature` and
`max_iterations` are strings that will be parsed if non-empty.
"""
from code.agents import save_agent, ALL_TOOLS
def _split(s: str) -> list[str]:
return [x.strip() for x in (s or "").split(",") if x.strip()]
tools_list = _split(tools) or list(ALL_TOOLS)
skills_list = _split(skills)
tags_list = _split(tags)
temp_val = None
if temperature.strip():
try:
temp_val = float(temperature)
except ValueError:
yield json.dumps({"success": False, "error": f"Invalid temperature: {temperature}"})
return
iter_val = None
if max_iterations.strip():
try:
iter_val = int(max_iterations)
except ValueError:
yield json.dumps({"success": False, "error": f"Invalid max_iterations: {max_iterations}"})
return
result = save_agent(
name=name,
description=description,
body=body,
tools=tools_list,
skills=skills_list,
temperature=temp_val,
max_iterations=iter_val,
tags=tags_list,
author=author,
)
yield json.dumps(result, default=str)
@app.api(name="delete_agent", concurrency_limit=1)
def handle_delete_agent(name: str) -> str:
"""Delete a user-defined agent by name."""
from code.agents import delete_agent
result = delete_agent(name)
yield json.dumps(result, default=str)
@app.api(name="set_active_agent", concurrency_limit=1)
def handle_set_active_agent(name: str = "") -> str:
"""Set the active agent for subsequent prompts. Empty string resets."""
from code.agents import set_active_agent, list_agents, get_active_agent
result = set_active_agent(name.strip() or None)
if not result.get("success"):
yield json.dumps(result, default=str)
return
# Return fresh list + active agent so frontend can re-render
yield json.dumps({
**result,
"agents": list_agents(),
"active_agent": get_active_agent(),
}, default=str)
# ─── GitHub Import Endpoint ────────────────────────────────────────────
@app.api(name="import_github", concurrency_limit=1)
def handle_import_github(
url: str,
branch: str = "",
subdir: str = "",
target_subdir: str = "",
depth: str = "1",
timeout: str = "120",
) -> str:
"""Clone a GitHub repo into the sandboxed workspace.
Parameters
----------
url : str
GitHub URL. Accepts:
- https://github.com/<owner>/<repo>[.git]
- https://github.com/<owner>/<repo>/tree/<branch>[/<subdir>]
- git@github.com:<owner>/<repo>.git
branch : str
Optional branch/tag override. If empty, uses URL's branch or the
repo's default branch.
subdir : str
Optional sub-directory inside the repo to import.
target_subdir : str
Where inside the workspace to place the import. Empty = root.
depth : str
Git clone depth (default "1" for shallow clone).
timeout : str
Git clone timeout in seconds (default "120").
Yields
------
JSON dict with keys: success, message, url, owner, repo, branch,
subdir, files_imported, dirs_skipped, workspace_path, tree_preview.
"""
from code.tools.github import import_github_repo
try:
depth_int = int(depth) if str(depth).strip() else 1
depth_int = max(1, min(50, depth_int))
except (ValueError, TypeError):
depth_int = 1
try:
timeout_int = int(timeout) if str(timeout).strip() else 120
timeout_int = max(10, min(600, timeout_int))
except (ValueError, TypeError):
timeout_int = 120
result = import_github_repo(
url=url,
branch=branch,
subdir=subdir,
target_subdir=target_subdir,
depth=depth_int,
timeout=timeout_int,
)
yield json.dumps(result, default=str)
@app.api(name="github_url_examples", concurrency_limit=4)
def handle_github_url_examples() -> str:
"""Return example GitHub URL formats accepted by import_github."""
from code.tools.github import list_github_url_examples
result = list_github_url_examples()
yield json.dumps(result, default=str)
@app.api(name="push_github", concurrency_limit=1)
def handle_push_github(
repo_name: str,
github_token: str,
username: str,
branch: str = "main",
commit_message: str = "",
timeout: str = "120",
) -> str:
"""Push the current workspace to a GitHub repo.
Requires only 3 user inputs (repo_name, github_token, username) plus
optional branch / commit_message / timeout. The workspace is snapshotted
(via `snapshot_workspace`), written into a fresh git repo in a temp dir,
committed, and pushed to `https://github.com/<username>/<repo_name>.git`
using HTTPS basic auth with the token.
The push uses `--force-with-lease` so it replaces the remote tip with the
SoniCoder workspace contents. If the remote doesn't exist yet (no refs
to lease against), it retries with a plain push.
Yields
------
JSON dict with keys: success, message, repo_full_name, branch,
commit_sha, commit_url, repo_url, files_pushed, error (on failure).
"""
from code.tools.github import push_to_github
try:
timeout_int = int(timeout) if str(timeout).strip() else 120
timeout_int = max(10, min(600, timeout_int))
except (ValueError, TypeError):
timeout_int = 120
result = push_to_github(
repo_name=repo_name,
github_token=github_token,
username=username,
branch=branch or "main",
commit_message=commit_message or "",
timeout=timeout_int,
)
yield json.dumps(result, default=str)