| import os |
| import re |
| import sys |
| import json |
| import time |
| import subprocess |
| import urllib.request |
| import urllib.error |
| import tempfile |
| import sqlite3 |
|
|
| class OllamaClient: |
| """Helper to query local Ollama instance.""" |
| def __init__(self, host="127.0.0.1", port=11434, model="qwen2.5-coder:3b"): |
| self.url = f"http://{host}:{port}/api/generate" |
| self.model = model |
|
|
| def check_connection(self): |
| try: |
| req = urllib.request.Request(self.url.replace("/api/generate", "/api/tags")) |
| with urllib.request.urlopen(req, timeout=3) as response: |
| data = json.loads(response.read().decode('utf-8')) |
| models = [m["name"] for m in data.get("models", [])] |
| if self.model in models or f"{self.model}:latest" in models or any(self.model in m for m in models): |
| return True, f"Connected to Ollama. Model '{self.model}' is available." |
| return False, f"Ollama is running, but model '{self.model}' was not found. Available models: {', '.join(models)}" |
| except Exception as e: |
| return False, f"Could not connect to Ollama server at {self.url}: {e}" |
|
|
| def generate(self, prompt, system_prompt=None, temperature=0.2): |
| payload = { |
| "model": self.model, |
| "prompt": prompt, |
| "stream": False, |
| "options": { |
| "temperature": temperature, |
| "top_p": 0.9, |
| } |
| } |
| if system_prompt: |
| payload["system"] = system_prompt |
|
|
| try: |
| data = json.dumps(payload).encode('utf-8') |
| req = urllib.request.Request( |
| self.url, |
| data=data, |
| headers={"Content-Type": "application/json"} |
| ) |
| with urllib.request.urlopen(req, timeout=45) as response: |
| res_data = json.loads(response.read().decode('utf-8')) |
| return res_data.get("response", "") |
| except Exception as e: |
| raise RuntimeError(f"Ollama generation failed: {e}") |
|
|
| class SandboxExecutor: |
| """Compiles and executes code in multiple languages in a safe sub-process sandbox.""" |
| |
| @staticmethod |
| def check_compiler(cmd): |
| try: |
| |
| subprocess.run([cmd, "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, timeout=2) |
| return True |
| except Exception: |
| try: |
| subprocess.run([cmd, "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, timeout=2) |
| return True |
| except Exception: |
| return False |
|
|
| @classmethod |
| def get_supported_runtimes(cls): |
| return { |
| "python": True, |
| "javascript": cls.check_compiler("node"), |
| "cpp": cls.check_compiler("g++") or cls.check_compiler("clang++"), |
| "go": cls.check_compiler("go"), |
| "java": cls.check_compiler("javac"), |
| "rust": cls.check_compiler("rustc"), |
| "sql": True, |
| "html": True |
| } |
|
|
| def execute(self, language, code): |
| """Runs the code and returns (exit_code, stdout, stderr, is_simulated).""" |
| language = language.lower().strip() |
| runtimes = self.get_supported_runtimes() |
|
|
| |
| if language == "sql": |
| return self._run_sql(code) |
|
|
| |
| if language in ("html", "html/css", "css"): |
| return self._run_html(code) |
|
|
| |
| compiler_needed = { |
| "javascript": "node", |
| "cpp": "g++", |
| "go": "go run", |
| "java": "javac", |
| "rust": "rustc" |
| } |
| |
| target_compiler = compiler_needed.get(language) |
| if target_compiler and not runtimes.get(language): |
| return self._run_simulated(language, code, f"Compiler/Runtime '{target_compiler}' is not installed or not in PATH.") |
|
|
| |
| with tempfile.TemporaryDirectory(dir="scratch") as tmpdir: |
| if language == "python": |
| filepath = os.path.join(tmpdir, "solution.py") |
| with open(filepath, "w", encoding="utf-8") as f: |
| f.write(code) |
| return self._run_process(["python", filepath]) |
|
|
| elif language == "javascript": |
| filepath = os.path.join(tmpdir, "solution.js") |
| with open(filepath, "w", encoding="utf-8") as f: |
| f.write(code) |
| return self._run_process(["node", filepath]) |
|
|
| elif language == "cpp": |
| source_path = os.path.join(tmpdir, "solution.cpp") |
| exec_path = os.path.join(tmpdir, "solution.exe") |
| with open(source_path, "w", encoding="utf-8") as f: |
| f.write(code) |
| |
| |
| comp_cmd = ["g++", "-std=c++17", source_path, "-o", exec_path] |
| c_res = subprocess.run(comp_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, timeout=8) |
| if c_res.returncode != 0: |
| return c_res.returncode, "", c_res.stderr.decode("utf-8", errors="ignore"), False |
| |
| |
| return self._run_process([exec_path]) |
|
|
| elif language == "go": |
| filepath = os.path.join(tmpdir, "solution.go") |
| with open(filepath, "w", encoding="utf-8") as f: |
| f.write(code) |
| return self._run_process(["go", "run", filepath]) |
|
|
| elif language == "java": |
| |
| class_match = re.search(r"public\s+class\s+(\w+)", code) |
| class_name = class_match.group(1) if class_match else "Main" |
| |
| source_path = os.path.join(tmpdir, f"{class_name}.java") |
| with open(source_path, "w", encoding="utf-8") as f: |
| f.write(code) |
| |
| |
| c_res = subprocess.run(["javac", source_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, timeout=8) |
| if c_res.returncode != 0: |
| return c_res.returncode, "", c_res.stderr.decode("utf-8", errors="ignore"), False |
| |
| |
| |
| return self._run_process(["java", "-cp", tmpdir, class_name]) |
|
|
| elif language == "rust": |
| source_path = os.path.join(tmpdir, "solution.rs") |
| exec_path = os.path.join(tmpdir, "solution.exe") |
| with open(source_path, "w", encoding="utf-8") as f: |
| f.write(code) |
| |
| |
| c_res = subprocess.run(["rustc", source_path, "-o", exec_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, timeout=10) |
| if c_res.returncode != 0: |
| return c_res.returncode, "", c_res.stderr.decode("utf-8", errors="ignore"), False |
| |
| |
| return self._run_process([exec_path]) |
|
|
| return -1, "", f"Unsupported language: {language}", False |
|
|
| def _run_process(self, cmd_args): |
| try: |
| res = subprocess.run( |
| cmd_args, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| timeout=5, |
| shell=True |
| ) |
| return ( |
| res.returncode, |
| res.stdout.decode("utf-8", errors="ignore"), |
| res.stderr.decode("utf-8", errors="ignore"), |
| False |
| ) |
| except subprocess.TimeoutExpired: |
| return -1, "", "Execution Timed Out (exceeded 5 seconds limit).", False |
| except Exception as e: |
| return -1, "", f"Failed to execute process: {e}", False |
|
|
| def _run_sql(self, code): |
| """Runs SQL statements on an in-memory SQLite database.""" |
| conn = sqlite3.connect(":memory:") |
| cursor = conn.cursor() |
| stdout = [] |
| stderr = [] |
| exit_code = 0 |
| |
| |
| statements = [s.strip() for s in code.split(";") if s.strip()] |
| for stmt in statements: |
| try: |
| cursor.execute(stmt) |
| rows = cursor.fetchall() |
| if rows: |
| cols = [d[0] for d in cursor.description] |
| stdout.append(f"Query: {stmt}\nResult:\n" + "\t".join(cols) + "\n" + "\n".join(["\t".join(map(str, r)) for r in rows]) + "\n") |
| else: |
| stdout.append(f"Statement executed successfully: {stmt}\nRows affected: {cursor.rowcount}\n") |
| except Exception as e: |
| stderr.append(f"SQL Error on statement '{stmt}': {e}") |
| exit_code = 1 |
| break |
| |
| conn.close() |
| return exit_code, "\n".join(stdout), "\n".join(stderr), False |
|
|
| def _run_html(self, code): |
| """Performs static structure validation on HTML/CSS contents.""" |
| errors = [] |
| |
| |
| required = ["<!DOCTYPE html>", "<html", "</html>", "<body", "</body>"] |
| for tag in required: |
| if tag.startswith("<!") or tag.endswith(">"): |
| if tag.lower() not in code.lower(): |
| errors.append(f"Missing tag: {tag}") |
| else: |
| if tag.lower() not in code.lower(): |
| errors.append(f"Missing open/close tag structure for: {tag}") |
| |
| |
| tags_to_check = ["div", "span", "p", "h1", "h2", "section", "header", "footer"] |
| for t in tags_to_check: |
| opens = len(re.findall(rf"<{t}\b", code, re.IGNORECASE)) |
| closes = len(re.findall(rf"</{t}>", code, re.IGNORECASE)) |
| if opens != closes: |
| errors.append(f"Mismatched tag count for <{t}>: found {opens} open, {closes} close.") |
|
|
| if errors: |
| return 1, "", "HTML/CSS Validation failed:\n" + "\n".join(errors), False |
| return 0, "HTML/CSS document structure validated successfully! No mismatched basic tags found.", "", False |
|
|
| def _run_simulated(self, language, code, reason): |
| """Simulates compilation and validates syntax of target code when compiler is missing.""" |
| |
| errors = [] |
| |
| if language == "javascript": |
| |
| opens = code.count("{") |
| closes = code.count("}") |
| if opens != closes: |
| errors.append(f"Mismatched curly braces: {opens} open vs {closes} closed.") |
| |
| elif language == "go": |
| if "package main" not in code: |
| errors.append("Go script missing 'package main' declaration.") |
| if "func main(" not in code: |
| errors.append("Go script missing 'func main()' entrypoint.") |
| |
| elif language == "cpp": |
| if "#include" not in code: |
| errors.append("C++ file missing preprocessor includes.") |
| if "int main(" not in code: |
| errors.append("C++ file missing 'int main()' function.") |
| |
| for i, line in enumerate(code.split("\n")): |
| line = line.strip() |
| if line and not line.endswith(";") and not line.endswith("{") and not line.endswith("}") and not line.startswith("#") and not line.startswith("//"): |
| if "using namespace" in line or "return " in line or "cout " in line or "cin " in line: |
| errors.append(f"Line {i+1}: Missing semicolon at end of statement: '{line}'") |
|
|
| if errors: |
| return 1, "", f"Simulated Check Fail ({reason}):\n" + "\n".join(errors), True |
| return 0, f"Simulated check passed ({reason}). Syntax looks structurally valid.", "", True |
|
|
| class AutonomousCodingAgent: |
| """Manages the autonomous coding loop connecting CAT V3 concept plan to Ollama self-correction.""" |
| def __init__(self, ollama_client: OllamaClient, sandbox: SandboxExecutor): |
| self.client = ollama_client |
| self.sandbox = sandbox |
|
|
| def extract_code(self, response_text): |
| """Extracts code block between ```language and ```.""" |
| match = re.search(r"```(?:\w+)?\n(.*?)\n```", response_text, re.DOTALL) |
| if match: |
| return match.group(1).strip() |
| |
| match2 = re.search(r"```\n(.*?)\n```", response_text, re.DOTALL) |
| if match2: |
| return match2.group(1).strip() |
| |
| return response_text.strip() |
|
|
| def run_agent_loop(self, task, language, concept_path, max_iterations=4, callback=None): |
| """Runs the self-correcting agent loop. |
| |
| Args: |
| task: User's coding prompt. |
| language: Target programming language. |
| concept_path: List of concepts from CAT V3. |
| max_iterations: Maximum debug retries. |
| callback: Function to stream iterations back to GUI. |
| """ |
| path_str = " -> ".join(concept_path) |
| system_prompt = ( |
| "You are a world-class autonomous software engineer. Your task is to write highly optimized, " |
| "clean, fully functional, and self-contained executable code that solves the user's problem. " |
| "Use ONLY the specified programming language. Follow clean coding standards: use descriptive " |
| "variable names, write modular functions, include comments explaining complexities (Big-O), " |
| "and use idiomatic structures (e.g., list comprehensions or collections.Counter in Python). " |
| "CRITICAL: Never write interactive input queries (e.g., input() in Python, prompt() in JS, cin in C++) " |
| "as the code runs inside a non-interactive subprocess sandbox. Always include a main/test block with " |
| "representative mock input data and print the execution output. " |
| "Always wrap your code in a single markdown block (e.g. ```python\n# code here\n```)." |
| ) |
|
|
| history = [] |
| code = "" |
| success = False |
| iteration = 0 |
|
|
| |
| os.makedirs("scratch", exist_ok=True) |
|
|
| while iteration < max_iterations and not success: |
| iteration += 1 |
| if callback: |
| callback({"type": "status", "iteration": iteration, "message": f"Starting iteration {iteration}..."}) |
|
|
| if iteration == 1: |
| prompt = ( |
| f"Write a complete, optimized program in {language.capitalize()} for the following task:\n" |
| f"Task: {task}\n\n" |
| f"To design your solution, you must strictly follow this logical reasoning path:\n" |
| f"Concept Plan: {path_str}\n\n" |
| f"Requirements:\n" |
| f"- Write clean, modular, and self-contained code.\n" |
| f"- Handle potential errors/exceptions gracefully.\n" |
| f"- Do NOT use any blocking interactive input functions (no input(), prompt(), cin, etc.).\n" |
| f"- Hardcode a main or test block with sample input data to run and display the results.\n" |
| f"Provide only the markdown code block. Make it compile and run-ready." |
| ) |
| else: |
| prompt = ( |
| f"The code you generated in the previous iteration failed to run or compile successfully.\n" |
| f"Task: {task}\n" |
| f"Language: {language.capitalize()}\n" |
| f"Concept Plan: {path_str}\n\n" |
| f"Here was the code you wrote:\n" |
| f"```\n{code}\n```\n\n" |
| f"It resulted in the following execution error/failure:\n" |
| f"```\n{err_msg}\n```\n\n" |
| f"Identify the bug, explain your correction plan, and output the fully corrected, compile-ready code in a new code block." |
| ) |
|
|
| if callback: |
| callback({"type": "thought", "iteration": iteration, "message": "Invoking Ollama model to generate code..."}) |
|
|
| try: |
| response = self.client.generate(prompt, system_prompt=system_prompt) |
| except Exception as e: |
| err_str = f"Ollama query failed: {e}" |
| if callback: |
| callback({"type": "error", "iteration": iteration, "message": err_str}) |
| return {"success": False, "iterations": iteration, "error": err_str} |
|
|
| code = self.extract_code(response) |
| |
| if callback: |
| callback({"type": "code", "iteration": iteration, "code": code, "explanation": response.replace(code, "").strip()}) |
|
|
| if callback: |
| callback({"type": "thought", "iteration": iteration, "message": "Executing code in sandbox..."}) |
|
|
| |
| exit_code, stdout, stderr, is_simulated = self.sandbox.execute(language, code) |
|
|
| if callback: |
| callback({ |
| "type": "execution", |
| "iteration": iteration, |
| "exit_code": exit_code, |
| "stdout": stdout, |
| "stderr": stderr, |
| "is_simulated": is_simulated |
| }) |
|
|
| if exit_code == 0: |
| success = True |
| if callback: |
| callback({"type": "status", "iteration": iteration, "message": "Code executed successfully! Loop terminated."}) |
| else: |
| err_msg = stderr if stderr.strip() else f"Process exited with non-zero code: {exit_code}\nOutput: {stdout}" |
| if callback: |
| callback({"type": "status", "iteration": iteration, "message": "Execution failed. Bug detected, retrying..."}) |
|
|
| return { |
| "success": success, |
| "iterations": iteration, |
| "code": code, |
| "exit_code": exit_code if 'exit_code' in locals() else -1, |
| "stdout": stdout if 'stdout' in locals() else "", |
| "stderr": stderr if 'stderr' in locals() else "No execution run.", |
| "is_simulated": is_simulated if 'is_simulated' in locals() else False |
| } |
|
|