Spaces:
Sleeping
Sleeping
| """ | |
| Prebuilt Code Agent β writes, reviews, and optionally executes code. | |
| Graph: START β write_code β review_code β [fix | execute] β END | |
| """ | |
| from __future__ import annotations | |
| import os, sys, io, json, textwrap, traceback | |
| from typing import TypedDict | |
| # ββ State βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class CodeState(TypedDict): | |
| task: str | |
| language: str | |
| code: str | |
| review: str | |
| execution_out: str | |
| needs_fix: bool | |
| execute: bool | |
| attempts: int | |
| # ββ Nodes βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _write_node(state: CodeState, llm): | |
| from langchain_core.messages import HumanMessage | |
| lang = state["language"] | |
| prompt = ( | |
| f"You are an expert {lang} programmer. Write clean, well-commented, production-quality code " | |
| f"that solves the following task.\n\n" | |
| f"Task: {state['task']}\n\n" | |
| f"Respond with ONLY the code block, no explanations outside the code. " | |
| f"Use comments inside the code to explain logic." | |
| ) | |
| response = llm.invoke([HumanMessage(content=prompt)]) | |
| code = response.content.strip() | |
| # strip markdown fences if present | |
| if code.startswith("```"): | |
| lines = code.splitlines() | |
| code = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]) | |
| return {"code": code, "attempts": state.get("attempts", 0) + 1} | |
| def _review_node(state: CodeState, llm): | |
| from langchain_core.messages import HumanMessage | |
| prompt = ( | |
| f"Review this {state['language']} code for correctness, bugs, and best practices.\n\n" | |
| f"```{state['language'].lower()}\n{state['code']}\n```\n\n" | |
| f"Respond with JSON only:\n" | |
| f'{{ "verdict": "pass" or "fix", "issues": ["list of issues"], "suggestion": "brief fix advice" }}' | |
| ) | |
| response = llm.invoke([HumanMessage(content=prompt)]) | |
| try: | |
| raw = response.content.strip().lstrip("```json").rstrip("```").strip() | |
| review = json.loads(raw) | |
| except Exception: | |
| review = {"verdict": "pass", "issues": [], "suggestion": ""} | |
| needs_fix = review.get("verdict") == "fix" and state.get("attempts", 0) < 2 | |
| return { | |
| "review": json.dumps(review, indent=2), | |
| "needs_fix": needs_fix, | |
| } | |
| def _fix_node(state: CodeState, llm): | |
| from langchain_core.messages import HumanMessage | |
| review = json.loads(state["review"]) | |
| prompt = ( | |
| f"Fix the following {state['language']} code based on the review.\n\n" | |
| f"Original code:\n```\n{state['code']}\n```\n\n" | |
| f"Issues found: {review.get('issues', [])}\n" | |
| f"Suggestion: {review.get('suggestion', '')}\n\n" | |
| f"Respond with ONLY the corrected code." | |
| ) | |
| response = llm.invoke([HumanMessage(content=prompt)]) | |
| code = response.content.strip() | |
| if code.startswith("```"): | |
| lines = code.splitlines() | |
| code = "\n".join(lines[1:-1] if lines[-1].strip() == "```" else lines[1:]) | |
| return {"code": code, "attempts": state["attempts"] + 1} | |
| def _execute_node(state: CodeState): | |
| """Safely execute Python code in a restricted environment.""" | |
| if not state.get("execute") or state["language"] != "Python": | |
| return {"execution_out": "[Execution skipped β only Python execution is supported]"} | |
| code = state["code"] | |
| stdout = io.StringIO() | |
| stderr = io.StringIO() | |
| # Restrict builtins to safe subset | |
| safe_builtins = { | |
| "print": print, "len": len, "range": range, "enumerate": enumerate, | |
| "zip": zip, "map": map, "filter": filter, "sorted": sorted, | |
| "reversed": reversed, "list": list, "dict": dict, "set": set, | |
| "tuple": tuple, "str": str, "int": int, "float": float, "bool": bool, | |
| "abs": abs, "max": max, "min": min, "sum": sum, "round": round, | |
| "isinstance": isinstance, "type": type, "repr": repr, | |
| "__import__": __import__, # allow safe imports like math, json | |
| } | |
| try: | |
| import contextlib | |
| with contextlib.redirect_stdout(stdout), contextlib.redirect_stderr(stderr): | |
| exec(textwrap.dedent(code), {"__builtins__": safe_builtins}) # noqa: S102 | |
| out = stdout.getvalue() | |
| err = stderr.getvalue() | |
| result = out or "(No output)" | |
| if err: | |
| result += f"\n[stderr]: {err}" | |
| except Exception as exc: | |
| result = f"Execution error: {exc}\n{traceback.format_exc()}" | |
| return {"execution_out": result} | |
| def _route_after_review(state: CodeState): | |
| if state["needs_fix"]: | |
| return "fix" | |
| return "execute" | |
| # ββ Main runner βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def run_code_agent(api_key: str, task: str, language: str, execute: bool): | |
| if not api_key.strip(): | |
| return "# β οΈ Please enter your OpenAI API key.", "Missing API key." | |
| if not task.strip(): | |
| return "# β οΈ Please enter a coding task.", "Missing task." | |
| try: | |
| from langchain_openai import ChatOpenAI | |
| from langgraph.graph import StateGraph, END | |
| os.environ["OPENAI_API_KEY"] = api_key.strip() | |
| llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1) | |
| graph = StateGraph(CodeState) | |
| graph.add_node("write", lambda s: _write_node(s, llm)) | |
| graph.add_node("review", lambda s: _review_node(s, llm)) | |
| graph.add_node("fix", lambda s: _fix_node(s, llm)) | |
| graph.add_node("execute", _execute_node) | |
| graph.set_entry_point("write") | |
| graph.add_edge("write", "review") | |
| graph.add_conditional_edges( | |
| "review", | |
| _route_after_review, | |
| {"fix": "fix", "execute": "execute"} | |
| ) | |
| graph.add_edge("fix", "review") | |
| graph.add_edge("execute", END) | |
| app = graph.compile() | |
| initial: CodeState = { | |
| "task": task, | |
| "language": language, | |
| "code": "", | |
| "review": "", | |
| "execution_out": "", | |
| "needs_fix": False, | |
| "execute": execute, | |
| "attempts": 0, | |
| } | |
| final = app.invoke(initial) | |
| code = final.get("code", "# No code generated") | |
| exec_r = final.get("execution_out", "") | |
| review = final.get("review", "") | |
| result_text = "" | |
| if exec_r: | |
| result_text += f"=== Execution Output ===\n{exec_r}\n\n" | |
| if review: | |
| result_text += f"=== Code Review ===\n{review}" | |
| return code, result_text or "No execution output." | |
| except Exception as exc: | |
| return f"# β Error\n# {exc}", traceback.format_exc() | |