Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """Gradio Space that showcases execution-grounded code generation.""" | |
| from __future__ import annotations | |
| import html | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import textwrap | |
| from collections.abc import Iterator | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any | |
| from gradio import Server | |
| from fastapi.responses import HTMLResponse, FileResponse | |
| from cohere import ClientV2 | |
| from cohere.core.api_error import ApiError | |
# --- Identity of the demo and the model it talks to ---
APP_TITLE = "North Mini Code 1.0 Demo"
CLIENT_NAME = "hf-space-north-mini-code"  # passed to the Cohere client as client_name
MODEL_ID = "north-mini-code-1-0"  # model requested from the Cohere chat endpoint
MODEL_URL = "https://huggingface.co/CohereLabs/North-Mini-Code-1.0"
OPENCODE_URL = "https://opencode.ai"
DEFAULT_TEMPERATURE = 0.2  # sampling temperature sent with every chat request
# --- Limits applied to generated Python that gets executed ---
PY_TIMEOUT_S = 12  # subprocess timeout and RLIMIT_CPU value, in seconds
PY_MEM_LIMIT_MB = 1024  # RLIMIT_AS cap for the child process, in MiB
MAX_STDIO_CHARS = 16_000  # captured stdout/stderr are truncated beyond this length
OUTPUT_PNG = "output.png"  # file name (inside the run directory) for the saved figure
# Matches one complete <think>...</think> block, case-insensitively and across lines.
THINKING_BLOCK_RE = re.compile(r"<\s*think\s*>.*?<\s*/\s*think\s*>", re.IGNORECASE | re.DOTALL)
# Matches a fenced code block: group 1 is the language tag, group 2 the code body.
CODE_BLOCK_RE = re.compile(r"```([a-zA-Z0-9_+.#-]*)\s*\n(.*?)```", re.DOTALL)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# System message placed first in every conversation sent to the model
# (see _chat_history_to_messages). The text is part of runtime behavior.
SYSTEM_PROMPT = """You are a coding model in a demo where generated code can be run.
Only respond to coding-related requests: code generation, debugging, code review,
software design, developer tooling, programming concepts, or reasoning about code.
If the user asks for something unrelated to coding, briefly say you can only help
with coding-related requests.
If the user asks a coding question that does not require runnable code, answer it
directly and do not force a code block.
If the user asks you to generate, modify, or fix runnable code, return exactly one
fenced code block and no extra prose. Use a correct language tag: ```python for
Python, or ```html for Web.
For Python, prefer standard library or common packages such as matplotlib.
For Python, do not use network calls, subprocesses, shell commands, or long-running loops.
For Web, return a single self-contained HTML document with any CSS and JavaScript inline.
For Web, make the page fully responsive so it fills the area it is given: set html and body
to margin:0 and 100% width/height, prefer relative sizes (100%, 100vw/100vh, flexbox) over
fixed pixel dimensions, and size any <canvas> to its container and re-fit it on window resize
so nothing is clipped or scrolled.
"""
# Curated starter prompts. Each entry is (chip label, prompt, target language).
# The target language is either "Python" or "Web"; the entries are exposed to the
# page as the "examples" list of the runtime config built in homepage().
EXAMPLE_PROMPTS: list[tuple[str, str, str]] = [
    (
        "🌀 Spiral plot",
        "Create a Python script that plots a colorful spiral with matplotlib, prints a short "
        "description of what it drew, and does not require any external files.",
        "Python",
    ),
    (
        "📊 Sine waves",
        "Plot three sine waves with different frequencies and amplitudes on one matplotlib "
        "figure with a legend and grid, and print the equation of each wave.",
        "Python",
    ),
    (
        "✨ Particles",
        "Create a self-contained HTML/CSS/JavaScript demo with an animated particle field "
        "that reacts to the mouse and includes a small title.",
        "Web",
    ),
    (
        "🖌️ Blackboard",
        "Create a self-contained HTML/CSS/JavaScript blackboard drawing app: a dark chalkboard "
        "canvas you can draw on with the mouse (and touch), a small palette of chalk colors, an "
        "adjustable brush size, and a button to clear the board.",
        "Web",
    ),
    (
        "🎲 Monte Carlo π",
        "Estimate π with a Monte Carlo simulation: sample random points in a unit square, count "
        "how many fall inside the quarter circle, and print the estimate, the true value, and the "
        "error. Plot the sampled points with matplotlib, colored by whether they land inside the "
        "circle.",
        "Python",
    ),
    (
        "✅ Todo app",
        "Create a self-contained HTML/CSS/JavaScript todo app: add tasks, mark them complete, "
        "delete them, filter by all/active/completed, and show a live count of remaining tasks, "
        "with a clean, modern, responsive UI.",
        "Web",
    ),
]
| class PythonExecutionResult: | |
| stdout: str | |
| stderr: str | |
| image_path: str | None | |
| returncode: int | None | |
| timed_out: bool = False | |
def _disable_parent_proc_inspection() -> None:
    """Best-effort hardening against same-UID reads of the parent process env.

    The subprocess receives a scrubbed env, but Linux /proc can sometimes expose a
    same-user process's environment. Marking the Gradio process non-dumpable helps
    prevent generated code from reading `/proc/<parent>/environ`.

    This never raises: on non-Linux platforms it is a no-op, and any failure
    (including a non-zero return from prctl) is only logged as a warning.
    """
    if sys.platform != "linux":
        return
    try:
        import ctypes

        pr_set_dumpable = 4  # PR_SET_DUMPABLE from <linux/prctl.h>
        # use_errno=True lets us report why prctl failed instead of ignoring it.
        libc = ctypes.CDLL(None, use_errno=True)
        if libc.prctl(pr_set_dumpable, 0, 0, 0, 0) != 0:
            errno_value = ctypes.get_errno()
            logger.warning(
                "Could not disable parent /proc inspection: prctl failed (errno=%d: %s)",
                errno_value,
                os.strerror(errno_value),
            )
    except Exception:
        # Deliberately broad: this is optional hardening and must not stop startup.
        logger.warning("Could not disable parent /proc inspection", exc_info=True)
def _build_client(api_key: str) -> ClientV2 | None:
    """Create the Cohere client, or return None (with a warning) when no key is given."""
    if not api_key:
        logger.warning("COHERE_API_KEY is not set; inference is disabled until configured.")
        return None
    return ClientV2(api_key=api_key, client_name=CLIENT_NAME)
# Read the key once at import time; an unset or whitespace-only value counts as missing.
_raw_api_key = os.getenv("COHERE_API_KEY", "").strip()
API_KEY_CONFIGURED = bool(_raw_api_key)
CLIENT = _build_client(_raw_api_key)
# Do not keep the secret in the parent environment longer than needed. The Cohere
# client has already been constructed, and subprocesses are passed explicit envs.
if _raw_api_key:
    os.environ.pop("COHERE_API_KEY", None)
    _disable_parent_proc_inspection()
# Drop the module-level copy of the secret as well.
_raw_api_key = ""
| def _extract_content_parts(content: object) -> tuple[str, str]: | |
| """Extract visible text and reasoning text from Cohere content shapes.""" | |
| if content is None: | |
| return "", "" | |
| if isinstance(content, str): | |
| return content, "" | |
| if isinstance(content, list): | |
| parts = [_extract_content_parts(block) for block in content] | |
| return "".join(text for text, _ in parts), "".join(thinking for _, thinking in parts) | |
| if isinstance(content, dict): | |
| text = str(content.get("text") or "") | |
| thinking = str(content.get("thinking") or "") | |
| if not text and not thinking and "content" in content: | |
| return _extract_content_parts(content.get("content")) | |
| return text, thinking | |
| text = getattr(content, "text", None) | |
| thinking = getattr(content, "thinking", None) | |
| return (str(text) if text is not None else ""), (str(thinking) if thinking is not None else "") | |
def _strip_thinking_blocks(text: str) -> str:
    """Remove every complete <think>...</think> block and trim surrounding whitespace."""
    without_reasoning = THINKING_BLOCK_RE.sub("", text)
    return without_reasoning.strip()
| def _format_response(output: str, thinking: str) -> str: | |
| thinking = thinking.strip() | |
| if not thinking: | |
| return output | |
| if not output: | |
| return f"<think>{thinking}</think>" | |
| return f"<think>{thinking}</think>\n\n{output}" | |
| def _no_output_note(finish_reason: str) -> str: | |
| if finish_reason == "MAX_TOKENS": | |
| return "_The model hit its output-token cap before producing visible code._" | |
| if finish_reason == "ERROR": | |
| return "_The model returned an error before producing code. Please try again._" | |
| return f"_The model finished without visible output (finish_reason={finish_reason})._" | |
def _format_api_error(exc: ApiError) -> str:
    """Turn a Cohere ApiError into a short user-facing message.

    Well-known status codes (404, 401/403, 429) get fixed messages; anything
    else falls back to the first 240 characters of the error body, or to
    "HTTP <status>" when the body is empty.
    """
    payload = exc.body
    if not isinstance(payload, dict):
        detail = str(payload or "").strip()
    else:
        summary = payload.get("message") or payload.get("error") or ""
        detail = str(summary) if summary else str(payload)

    status = exc.status_code
    auth_message = "The `COHERE_API_KEY` secret was rejected. Check the Space secret."
    fixed_messages = {
        404: f"Model `{MODEL_ID}` was not found on the configured Cohere endpoint.",
        401: auth_message,
        403: auth_message,
        429: "The Cohere API rate limit was reached. Please wait and try again.",
    }
    if status in fixed_messages:
        return fixed_messages[status]
    return detail[:240] or f"HTTP {exc.status_code}"
def call_model(messages: list[dict[str, Any]]) -> Iterator[str]:
    """Stream cumulative model text.

    All Cohere-specific details are intentionally isolated here: model name,
    client method, streaming event shape, and reasoning handling.

    Every yielded string is the whole response so far (not a delta), formatted
    by _format_response so that reasoning text appears in a leading
    <think>...</think> block. API errors and unexpected exceptions are logged
    and yielded as a final message instead of being raised.
    """
    if CLIENT is None:
        # No client was built at import time: explain why and stop.
        if not API_KEY_CONFIGURED:
            yield "This Space needs a `COHERE_API_KEY` secret before it can call Cohere."
        else:
            yield "Cohere client is not configured."
        return
    output = ""  # visible text accumulated so far
    thinking_output = ""  # reasoning text accumulated so far
    finish_reason: str | None = None
    event_counts: dict[str, int] = {}  # per-event-type tally, used only for logging
    try:
        stream = CLIENT.chat_stream(
            model=MODEL_ID,
            messages=messages,
            temperature=DEFAULT_TEMPERATURE,
            thinking={"type": "enabled"},
        )
        for event in stream:
            event_type = getattr(event, "type", None) or "unknown"
            event_counts[event_type] = event_counts.get(event_type, 0) + 1
            delta = getattr(event, "delta", None)
            if event_type in ("content-delta", "content-start"):
                msg = getattr(delta, "message", None) if delta is not None else None
                if msg is None:
                    continue
                text, thinking = _extract_content_parts(getattr(msg, "content", None))
                # Yield after each kind of update so the caller sees progress immediately.
                if thinking:
                    thinking_output += thinking
                    yield _format_response(output, thinking_output)
                if text:
                    output += text
                    yield _format_response(output, thinking_output)
            elif event_type == "message-end":
                # The delta may be an object or a plain dict; try both shapes.
                finish_reason = getattr(delta, "finish_reason", None)
                if finish_reason is None and isinstance(delta, dict):
                    finish_reason = delta.get("finish_reason")
        logger.info(
            "Cohere stream ended: finish_reason=%s, output_len=%d, thinking_len=%d, events=%s",
            finish_reason,
            len(output),
            len(thinking_output),
            event_counts,
        )
        if not output:
            # Nothing visible was produced: replace the empty answer with an explanatory note.
            yield _format_response(_no_output_note((finish_reason or "unknown").upper()), thinking_output)
    except ApiError as exc:
        logger.exception("Cohere API error (status=%s)", exc.status_code)
        yield _format_response(f"_Cohere API error_: {_format_api_error(exc)}", thinking_output)
    except Exception as exc:
        # Boundary handler: keep the stream alive and surface the failure as text.
        logger.exception("Unexpected error calling Cohere API")
        yield _format_response(f"_Unexpected error calling Cohere_: {exc}", thinking_output)
def _chat_history_to_messages(history: list[dict[str, str]]) -> list[dict[str, Any]]:
    """Convert UI chat history into the message list sent to the model.

    The system prompt always comes first. Entries whose role is not "user" or
    "assistant", or whose content is blank, are dropped; assistant messages
    have their <think> blocks removed before being forwarded.
    """
    system_message: dict[str, Any] = {"role": "system", "content": SYSTEM_PROMPT}
    messages: list[dict[str, Any]] = [system_message]
    for entry in history:
        speaker = entry.get("role")
        body = str(entry.get("content") or "").strip()
        if speaker in {"user", "assistant"} and body:
            if speaker == "assistant":
                body = _strip_thinking_blocks(body)
            messages.append({"role": speaker, "content": body})
    return messages
| def _clip_context(text: str, limit: int = 6_000) -> str: | |
| if len(text) <= limit: | |
| return text | |
| return text[:limit] + f"\n... truncated {len(text) - limit} characters ..." | |
| def _iteration_context(execution_context: dict[str, Any] | None) -> str: | |
| if not execution_context or not execution_context.get("code"): | |
| return "" | |
| code = _clip_context(str(execution_context.get("code") or ""), 8_000) | |
| target = str(execution_context.get("target") or "code") | |
| fence_lang = str(execution_context.get("fence_lang") or target) | |
| status = str(execution_context.get("status") or "") | |
| stdout = _clip_context(str(execution_context.get("stdout") or ""), 2_000) | |
| stderr = _clip_context(str(execution_context.get("stderr") or ""), 2_000) | |
| parts = [ | |
| "Previous generated code and run result are available for iteration.", | |
| f"Previous target: {target}", | |
| f"Previous status: {status}", | |
| f"Previous code:\n```{fence_lang}\n{code}\n```", | |
| ] | |
| if stdout: | |
| parts.append(f"Previous stdout:\n{stdout}") | |
| if stderr: | |
| parts.append(f"Previous stderr / traceback:\n{stderr}") | |
| parts.append("If the user asks to revise, debug, extend, or explain the prior code, use this context.") | |
| return "\n\n".join(parts) | |
def _targeted_prompt(
    prompt: str,
    target_language: str,
    execution_context: dict[str, Any] | None = None,
) -> str:
    """Wrap the user's request with target-specific instructions.

    Anything other than "Python" is treated as the Web target. When a previous
    run is available, its description is inserted between the instructions and
    the user request.
    """
    previous_run = _iteration_context(execution_context)
    context_block = f"\n\n{previous_run}" if previous_run else ""
    # Shared ending: optional iteration context followed by the actual request.
    tail = (
        f"{context_block}\n\n"
        f"User request:\n{prompt}"
    )
    if target_language != "Python":
        web_instructions = (
            "Target: Web. Stay within coding-related requests only. "
            "If the user asks a coding question or wants reasoning that does not require running code, "
            "answer directly without a fenced block. If they ask to generate, revise, or fix runnable "
            "web code, return one ```html fenced block only. The code is rendered inside a sandboxed "
            "iframe that fills the preview panel, so design the page to fill that iframe responsively: "
            "html/body at margin:0 and 100% width/height, "
            "avoid fixed widths larger than the iframe, and resize any <canvas> to its container "
            "(including on window resize) so the whole app is visible without horizontal scrolling."
        )
        return web_instructions + tail
    python_instructions = (
        "Target: Python. Stay within coding-related requests only. "
        "If the user asks a coding question or wants reasoning that does not require running code, "
        "answer directly without a fenced block. If they ask to generate, revise, or fix runnable "
        "code, return one ```python fenced block only. The code will be executed in a short-lived "
        "subprocess."
    )
    return python_instructions + tail
def extract_code(response: str) -> tuple[str, str | None]:
    """Return the first fenced code block and its language tag.

    Reasoning blocks are removed before searching. The result is
    ("", None) when no fence is found; an empty tag is reported as None.
    """
    found = CODE_BLOCK_RE.search(_strip_thinking_blocks(response))
    if found is None:
        return "", None
    tag, body = found.group(1, 2)
    language = tag.strip().lower()
    return body.strip(), (language if language else None)
| def _normalize_language(target_language: str | None, fence_lang: str | None) -> str: | |
| if fence_lang in {"python", "py"}: | |
| return "python" | |
| if fence_lang in {"html", "web", "javascript", "js", "css"}: | |
| return "web" | |
| if target_language in {"Python", "Web"}: | |
| return target_language.lower() | |
| return "python" | |
def _truncate_output(text: str) -> str:
    """Limit captured output to MAX_STDIO_CHARS, noting how many characters were cut."""
    overflow = len(text) - MAX_STDIO_CHARS
    if overflow <= 0:
        return text
    head = text[:MAX_STDIO_CHARS]
    return f"{head}\n\n... truncated {overflow} characters ..."
| def _decode_timeout_output(value: str | bytes | None) -> str: | |
| if value is None: | |
| return "" | |
| if isinstance(value, bytes): | |
| return value.decode("utf-8", errors="replace") | |
| return value | |
def _apply_subprocess_limits() -> None:
    """Apply child-only CPU and memory caps before Python user code starts.

    Soft and hard limits are set to the same value: the address space to
    PY_MEM_LIMIT_MB mebibytes and CPU time to PY_TIMEOUT_S seconds.
    """
    import resource

    address_space_cap = PY_MEM_LIMIT_MB * 1024 * 1024
    caps = (
        (resource.RLIMIT_AS, address_space_cap),
        (resource.RLIMIT_CPU, PY_TIMEOUT_S),
    )
    for which, cap in caps:
        resource.setrlimit(which, (cap, cap))
| def _python_runner_source() -> str: | |
| return textwrap.dedent( | |
| f""" | |
| import os | |
| import runpy | |
| import sys | |
| import traceback | |
| os.environ.setdefault("MPLBACKEND", "Agg") | |
| exit_code = 0 | |
| try: | |
| runpy.run_path(os.path.join(os.getcwd(), "user_code.py"), run_name="__main__") | |
| except SystemExit as exc: | |
| code = exc.code | |
| exit_code = code if isinstance(code, int) else 1 | |
| except Exception: | |
| traceback.print_exc() | |
| exit_code = 1 | |
| finally: | |
| try: | |
| import matplotlib | |
| matplotlib.use("Agg", force=True) | |
| import matplotlib.pyplot as plt | |
| if plt.get_fignums(): | |
| plt.savefig(os.environ["OUTPUT_PNG"], bbox_inches="tight") | |
| except ModuleNotFoundError as exc: | |
| if exc.name != "matplotlib": | |
| traceback.print_exc() | |
| except Exception: | |
| traceback.print_exc() | |
| raise SystemExit(exit_code) | |
| """ | |
| ).strip() | |
def run_python(code: str) -> PythonExecutionResult:
    """Execute generated Python in a subprocess with baseline containment.

    Security boundary for v1:
    - Scrubbed env: never pass os.environ, so COHERE_API_KEY is absent.
    - Hard timeout: kill code that hangs the Space.
    - Memory/CPU caps: reduce the blast radius of runaway code.

    Accepted limitation: a standard non-privileged Gradio Space cannot reliably
    block network egress or filesystem reads from this subprocess. Full isolation
    would require a privileged Docker Space with nsjail/gVisor, or an external
    executor such as E2B/Modal. Do not refactor these comments away; they define
    the risk boundary of this demo.

    Returns a PythonExecutionResult with truncated stdout/stderr, the path of a
    copied plot image (or None), and the return code; on timeout the return
    code is None and `timed_out` is True.
    """
    # Everything the run needs lives in a throwaway directory removed on exit.
    with tempfile.TemporaryDirectory(prefix="coding_model_run_") as tmp:
        workdir = Path(tmp)
        runner_path = workdir / "runner.py"
        user_path = workdir / "user_code.py"
        image_path = workdir / OUTPUT_PNG
        runner_path.write_text(_python_runner_source(), encoding="utf-8")
        user_path.write_text(code, encoding="utf-8")
        env = {
            # SECURITY: explicit scrubbed env. Never pass parent os.environ, which
            # may contain COHERE_API_KEY or other HF Space secrets.
            "PATH": "/usr/bin:/bin",
            "HOME": str(workdir),
            "TMPDIR": str(workdir),
            "MPLBACKEND": "Agg",
            "MPLCONFIGDIR": str(workdir / ".matplotlib"),
            "OUTPUT_PNG": str(image_path),
            "PYTHONIOENCODING": "utf-8",
            "PYTHONNOUSERSITE": "1",
            "PYTHONUNBUFFERED": "1",
            "LANG": "C.UTF-8",
            # Keep numeric libraries from spawning many workers inside the capped process.
            "OPENBLAS_NUM_THREADS": "1",
            "OMP_NUM_THREADS": "1",
            "MKL_NUM_THREADS": "1",
            "NUMEXPR_NUM_THREADS": "1",
        }
        try:
            completed = subprocess.run(
                # -I runs the interpreter in isolated mode.
                [sys.executable, "-I", str(runner_path)],
                cwd=workdir,
                env=env,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=PY_TIMEOUT_S,
                # Resource limits are only applied on Linux.
                preexec_fn=_apply_subprocess_limits if sys.platform == "linux" else None,
                check=False,
            )
            stdout = _truncate_output(completed.stdout)
            stderr = _truncate_output(completed.stderr)
            if completed.returncode and not stderr:
                # Make a silent failure visible to the user.
                stderr = f"Process exited with status {completed.returncode}."
            saved_image: str | None = None
            if image_path.exists() and image_path.stat().st_size > 0:
                # Copy the plot out of the run directory, which is deleted when
                # the `with` block exits; delete=False keeps the copy on disk.
                saved = tempfile.NamedTemporaryFile(
                    prefix="coding_model_plot_", suffix=".png", delete=False
                )
                saved.close()
                Path(saved.name).write_bytes(image_path.read_bytes())
                saved_image = saved.name
            return PythonExecutionResult(
                stdout=stdout,
                stderr=stderr,
                image_path=saved_image,
                returncode=completed.returncode,
            )
        except subprocess.TimeoutExpired as exc:
            # Partial output may arrive as bytes or None; normalise it before truncating.
            stdout = _truncate_output(_decode_timeout_output(exc.stdout))
            stderr = _truncate_output(_decode_timeout_output(exc.stderr))
            timeout_note = f"Timed out after {PY_TIMEOUT_S} seconds; the process was killed."
            stderr = f"{stderr}\n{timeout_note}".strip()
            return PythonExecutionResult(
                stdout=stdout,
                stderr=stderr,
                image_path=None,
                returncode=None,
                timed_out=True,
            )
| def _web_document(code: str, fence_lang: str | None) -> str: | |
| lang = (fence_lang or "").lower() | |
| if lang in {"javascript", "js"}: | |
| return f"<!doctype html><html><body><script>\n{code}\n</script></body></html>" | |
| if lang == "css": | |
| return f"<!doctype html><html><head><style>\n{code}\n</style></head><body></body></html>" | |
| if re.search(r"<!doctype|<html[\s>]", code, flags=re.IGNORECASE): | |
| return code | |
| return f"<!doctype html><html><head><meta charset='utf-8'></head><body>\n{code}\n</body></html>" | |
def build_iframe(code: str, fence_lang: str | None = None) -> str:
    """Render web code in a sandboxed iframe.

    SECURITY: sandbox allows scripts so demos can run, but deliberately omits
    allow-same-origin. Without same-origin, generated code cannot share the
    parent origin, cookies, or storage.

    The document is HTML-escaped (quotes included) and embedded via `srcdoc`.
    """
    escaped_document = html.escape(_web_document(code, fence_lang), quote=True)
    attributes = [
        'class="web-frame"',
        'sandbox="allow-scripts"',
        'allow="fullscreen"',
        "allowfullscreen",
        f'srcdoc="{escaped_document}"',
        'style="width:100%; min-height:680px; border:0; border-radius:14px; background:white;"',
    ]
    return "<iframe " + " ".join(attributes) + "></iframe>"
| # ---------- gradio.Server application ---------- | |
| def _run_extracted_code( | |
| code: str, | |
| target: str, | |
| ) -> tuple[str, str, str | None, str, str]: | |
| """Execute code and return (stdout, stderr, image_path, status_text, status_state).""" | |
| if target == "python": | |
| result = run_python(code) | |
| if result.timed_out: | |
| return result.stdout, result.stderr, result.image_path, f"Timed out after {PY_TIMEOUT_S}s", "error" | |
| if result.returncode: | |
| return result.stdout, result.stderr, result.image_path, "Finished with errors", "error" | |
| return result.stdout, result.stderr, result.image_path, "Ran successfully", "success" | |
| return "", "", None, "Preview ready", "success" | |
| def _updated_execution_context( | |
| *, | |
| code: str, | |
| target: str, | |
| fence_lang: str | None, | |
| stdout: str, | |
| stderr: str, | |
| image_path: str | None, | |
| status: str, | |
| download_path: str | None, | |
| ) -> dict[str, Any]: | |
| return { | |
| "code": code, | |
| "target": target, | |
| "fence_lang": fence_lang or target, | |
| "stdout": stdout, | |
| "stderr": stderr, | |
| "image_path": image_path, | |
| "status": status, | |
| "download_path": download_path, | |
| } | |
# In-memory registry of temp files to serve (images, downloads).
# Keys are "img:<filename>" or "dl:<filename>"; values are filesystem paths.
# NOTE(review): nothing in the visible code removes entries or the files behind them.
_served_files: dict[str, str] = {}
app = Server()
async def homepage():
    """Return index.html (located next to this module) with runtime config injected.

    The `__RUNTIME_CONFIG__` placeholder in the page is replaced by a JSON
    object describing the app, the model, and the example prompts.
    """
    # NOTE(review): no route decorator is visible on this handler — confirm how it is registered.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    with open(os.path.join(module_dir, "index.html"), "r", encoding="utf-8") as page_file:
        page = page_file.read()

    example_entries = []
    for label, prompt, target in EXAMPLE_PROMPTS:
        example_entries.append({"label": label, "prompt": prompt, "target": target})

    # Inject runtime config
    runtime_config = {
        "api_key_configured": API_KEY_CONFIGURED,
        "app_title": APP_TITLE,
        "model_id": MODEL_ID,
        "model_url": MODEL_URL,
        "opencode_url": OPENCODE_URL,
        "examples": example_entries,
    }
    return page.replace("__RUNTIME_CONFIG__", json.dumps(runtime_config))
async def serve_image(filename: str):
    """Serve a registered plot image as PNG, or a 404 when it is unknown or gone."""
    registered_path = _served_files.get(f"img:{filename}")
    if not registered_path or not os.path.exists(registered_path):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(registered_path, media_type="image/png")
async def serve_download(filename: str):
    """Serve a registered code file as a download, or a 404 when it is unknown or gone."""
    registered_path = _served_files.get(f"dl:{filename}")
    if not registered_path or not os.path.exists(registered_path):
        return HTMLResponse("Not found", status_code=404)
    return FileResponse(registered_path, filename=filename, media_type="application/octet-stream")
def handle_chat(prompt: str, target_language: str, history_json: str, exec_context_json: str) -> str:
    """Stream chat responses with code execution. Yields JSON strings.

    Each yielded string is a JSON object with the keys "type", "status_text",
    "status_state", "history" and "execution". The sequence is: a "status"
    event, one "streaming" event per partial model response, and then either a
    "complete" event (with or without a code run) or an "error" event.

    Args:
        prompt: The user's request; blank input yields a single "error" event.
        target_language: "Python" or "Web", as selected in the UI.
        history_json: JSON-encoded list of prior {"role", "content"} messages.
        exec_context_json: JSON-encoded dict describing the previous run.

    Malformed JSON in either payload yields an "error" event instead of
    raising; a payload of the wrong shape is treated as empty, and history
    entries that are not objects are dropped.
    """
    # NOTE(review): the return hint is `str` although this is a generator; it is left
    # untouched because the server may derive its API schema from it — confirm.
    history: list = []
    execution_context: dict = {}

    def event(kind: str, status_text: str, status_state: str) -> str:
        # Serialises the *current* history and execution context at call time.
        return json.dumps(
            {
                "type": kind,
                "status_text": status_text,
                "status_state": status_state,
                "history": history,
                "execution": execution_context,
            }
        )

    try:
        parsed_history = json.loads(history_json) if history_json else []
        parsed_context = json.loads(exec_context_json) if exec_context_json else {}
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; both payloads come from the client.
        logger.warning("Rejected chat request with malformed JSON payload", exc_info=True)
        yield event("error", "Invalid request payload.", "error")
        return
    if isinstance(parsed_history, list):
        history = [item for item in parsed_history if isinstance(item, dict)]
    if isinstance(parsed_context, dict):
        execution_context = parsed_context

    prompt = (prompt or "").strip()
    if not prompt:
        yield event("error", "Enter a prompt to get started.", "info")
        return

    # Add user message and placeholder assistant message
    history = list(history) + [
        {"role": "user", "content": prompt},
        {"role": "assistant", "content": ""},
    ]
    yield event("status", "Thinking…", "working")

    # Build messages for Cohere — use targeted prompt
    cohere_history = list(history[:-1])  # everything except empty assistant
    # Replace the last user message with the targeted version
    cohere_history[-1] = {"role": "user", "content": _targeted_prompt(prompt, target_language, execution_context)}
    messages = _chat_history_to_messages(cohere_history)

    final_response = ""
    for partial in call_model(messages):
        final_response = partial
        history[-1]["content"] = partial
        yield event("streaming", "Generating…", "working")
    if not final_response:
        history[-1]["content"] = "The model did not return a response."
        yield event("error", "No model response.", "error")
        return

    code, fence_lang = extract_code(final_response)
    target = _normalize_language(target_language, fence_lang)
    if not code:
        yield event("complete", "Answered without running code.", "info")
        return

    yield event("status", "Running…", "working")
    stdout, stderr, image_path, status_text, status_state = _run_extracted_code(code, target)

    # Register image for serving
    image_url = None
    if image_path:
        filename = os.path.basename(image_path)
        _served_files[f"img:{filename}"] = image_path
        image_url = f"/images/{filename}"

    # Register code for download. The file name carries the random suffix of its
    # temp directory so concurrent requests do not overwrite each other's entry
    # in the shared registry.
    ext = "py" if target == "python" else "html"
    dl_prefix = "coding_model_dl_"
    dl_dir = tempfile.mkdtemp(prefix=dl_prefix)
    dl_token = os.path.basename(dl_dir)[len(dl_prefix):]
    dl_filename = f"generated_{dl_token}.{ext}"
    dl_path = os.path.join(dl_dir, dl_filename)
    Path(dl_path).write_text(code, encoding="utf-8")
    _served_files[f"dl:{dl_filename}"] = dl_path
    download_url = f"/download/{dl_filename}"

    execution_context = {
        "code": code,
        "target": target,
        "fence_lang": fence_lang or target,
        "stdout": stdout,
        "stderr": stderr,
        "image_url": image_url,
        "image_path": image_path,
        "status": status_text,
        "language": "python" if target == "python" else "html",
        "suggested_tab": "preview" if (image_path or target == "web") else "console",
        "download_url": download_url,
    }
    yield event("complete", status_text, status_state)
# Start the server when this module is executed; show_error=True is passed to launch().
app.launch(show_error=True)