Spaces:
Running
Running
# nodes.py — All 13 nodes for Autonomous Python Coding Agent
| import os | |
| import ast | |
| import subprocess | |
| import re | |
| import hashlib | |
| import importlib.util | |
| from langchain_groq import ChatGroq | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| import chromadb | |
| from state import State | |
# -- LLM ---------------------------------------------------------------
# Shared chat model used by every node. The model name can be overridden
# with the GROQ_MODEL environment variable; the default is unchanged.
llm = ChatGroq(model=os.getenv("GROQ_MODEL", "llama-3.1-8b-instant"), temperature=0)

# -- CHROMADB ----------------------------------------------------------
# Collection of "BUG: ... FIX: ..." documents written by the debugger node
# and queried by the coder node.
# NOTE(review): chromadb.Client() is presumably an in-process, non-persistent
# client, so stored fixes would not survive a restart - confirm.
chroma_client = chromadb.Client()
memory_collection = chroma_client.get_or_create_collection("bug_fixes")
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 1 β PLANNER | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def planner(state: State):
    """Ask the LLM to break ``state['task']`` into a step-by-step plan.

    Returns a partial state update ``{"plan": <text of the LLM reply>}``.
    """
    print("\nπ Planner thinking...")
    system_msg = SystemMessage(content="You are a coding planner. Break tasks into clear steps.")
    user_msg = HumanMessage(content=f"""
Break this coding task into clear steps:
Task: {state['task']}
Reply with:
1. What the function should do
2. Input and output format
3. Edge cases to handle
4. Test cases to verify
""")
    plan_reply = llm.invoke([system_msg, user_msg])
    print("Plan ready")
    return {"plan": plan_reply.content}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 2 β CODER | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def coder(state: State):
    """Generate Python source for the task from the plan and prior feedback.

    Reads ``task``, ``plan``, ``error`` and (optionally) ``reflection_notes``
    from the state. When a previous error is present, the bug-fix memory
    collection is queried for up to two similar documents, which are added
    to the prompt.

    Returns a state update containing the new ``code`` and resetting
    ``error``, ``fixed_code`` and ``reflection_notes`` to empty strings.
    """
    print("\nπ» Coder writing code...")
    past_fixes = ""
    if state["error"]:
        # The memory lookup is best-effort: a failure must not block code
        # generation, but it is reported rather than silently discarded.
        try:
            results = memory_collection.query(query_texts=[state["error"]], n_results=2)
            if results["documents"][0]:
                past_fixes = "\n".join(results["documents"][0])
                print("π§ Found past fixes in memory!")
        except Exception as exc:
            print(f"Memory lookup failed, continuing without past fixes: {exc}")
    response = llm.invoke([
        SystemMessage(content="""You are an expert Python developer.
Write clean working Python code WITH type hints on every function.
Return ONLY the code β no explanation, no markdown, no backticks."""),
        HumanMessage(content=f"""
Task: {state['task']}
Plan to follow:
{state['plan']}
Previous error (fix this):
{state['error'] if state['error'] else 'No errors yet β write fresh code'}
Reflection notes:
{state.get('reflection_notes', '') or 'None'}
Past fixes from memory:
{past_fixes if past_fixes else 'No past fixes available'}
Rules:
- Type hints on ALL functions
- Docstring on every function
- Keep it simple and readable
- MUST include demo calls inside: if __name__ == '__main__': that print results
Write complete working Python code only:
""")
    ])
    # The model is told not to use markdown, but strip code fences anyway.
    code = response.content
    code = re.sub(r"```python", "", code)
    code = re.sub(r"```", "", code)
    code = code.strip()
    print(f"Code written ({len(code.splitlines())} lines)")
    return {"code": code, "error": "", "fixed_code": "", "reflection_notes": ""}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 3 β AST VALIDATOR | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
| import ast | |
| import importlib.util | |
| from state import State | |
def ast_validator(state: State):
    """Statically validate ``state['code']`` before it is executed.

    Checks, in order:
      1. the source parses;
      2. every absolute import resolves to an installed top-level module;
      3. every function (sync or async) declares a return annotation.

    Returns a state update with ``ast_valid`` plus ``error`` / ``feedback``
    (identical text; empty strings when validation succeeds).
    """
    code = state["code"]
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError) as e:
        # ValueError covers sources ast.parse rejects without a SyntaxError
        # (e.g. embedded null bytes on some Python versions).
        return {
            "ast_valid": False,
            "error": f"SyntaxError: {e}",
            "feedback": f"Fix syntax error: {e}"
        }

    def _is_unresolvable(module_name):
        """Return True when the top-level package of *module_name* cannot be found."""
        base = module_name.split(".")[0]
        try:
            return importlib.util.find_spec(base) is None
        except (ImportError, ValueError):
            # find_spec raised instead of returning None; treat the module as
            # present rather than reporting a false "unknown import".
            return False

    unknown_imports = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            candidates = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            # level > 0 is a relative import; it cannot be resolved from
            # here and is not evidence of a hallucinated package.
            candidates = [node.module]
        else:
            continue
        unknown_imports.update(
            name.split(".")[0] for name in candidates if _is_unresolvable(name)
        )

    missing_hints = [
        fn.name
        for fn in ast.walk(tree)
        if isinstance(fn, (ast.FunctionDef, ast.AsyncFunctionDef))
        and fn.returns is None
    ]

    feedback = []
    if unknown_imports:
        # Sorted so the message is deterministic across runs.
        feedback.append(f"Unknown imports detected: {sorted(unknown_imports)}")
    if missing_hints:
        feedback.append(f"Missing return type hints: {missing_hints}")

    # Any finding fails validation.
    if feedback:
        message = "\n".join(feedback)
        return {"ast_valid": False, "error": message, "feedback": message}
    return {"ast_valid": True, "error": "", "feedback": ""}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 4 β TEST GENERATOR | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def test_generator(state: State):
    """Have the LLM write assert-based tests for the current code.

    Uses ``fixed_code`` when it is non-empty, otherwise ``code``. Markdown
    code fences are stripped from the reply.

    Returns ``{"generated_tests": <test source>}``.
    """
    print("\n𧬠Test Generator creating tests...")
    code = state["fixed_code"] or state["code"]
    system_msg = SystemMessage(content="""You are a Python testing expert.
Return ONLY runnable Python test code β no markdown, no backticks.
DO NOT use 'unittest', 'pytest', or 'sys'.""")
    user_msg = HumanMessage(content=f"""
Generate test cases for this code:
TASK: {state['task']}
CODE:
{code}
Rules:
- Copy ALL function definitions inline.
- Use ONLY simple 'assert' statements for validation.
- Do NOT use 'unittest' or 'sys'.
- If a test fails, let the script raise an AssertionError.
- Print "All tests passed!" at the end if successful.
- Wrap all test calls in a 'try...except' block to print the error before exiting.
Return ONLY runnable Python code:
""")
    reply = llm.invoke([system_msg, user_msg])
    # Drop the language-tagged fence first, then any remaining bare fences.
    without_tagged_fence = re.sub(r"```python", "", reply.content)
    tests = re.sub(r"```", "", without_tagged_fence).strip()
    print(f"Generated {tests.count('def test_')} test functions")
    return {"generated_tests": tests}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 5 β TESTER | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
| def tester(state: State): | |
| print("\nπ§ͺ Tester running code...") | |
| code = state["fixed_code"] if state["fixed_code"] else state["code"] | |
| try: | |
| result = subprocess.run( | |
| ["python", "-c", code], | |
| capture_output=True, text=True, timeout=10 | |
| ) | |
| if result.returncode == 0: | |
| if not result.stdout.strip(): | |
| print("β No output produced") | |
| return { | |
| "test_result": "", | |
| "error": "Code ran but produced no output. Add print statements in if __name__ == '__main__'.", | |
| "passed": False | |
| } | |
| print("β Code passed!") | |
| test_output = "" | |
| if state.get("generated_tests"): | |
| try: | |
| test_run = subprocess.run( | |
| ["python", "-c", state["generated_tests"]], | |
| capture_output=True, text=True, timeout=15 | |
| ) | |
| if test_run.returncode == 0: | |
| test_output = "β All generated tests passed\n" + test_run.stdout | |
| else: | |
| test_output = f"β οΈ Some tests failed:\n{test_run.stderr[:200]}" | |
| except Exception as e: | |
| test_output = f"Test run error: {e}" | |
| return { | |
| "test_result": result.stdout + "\n" + test_output, | |
| "error": "", | |
| "passed": True, | |
| "fixed_code": "" | |
| } | |
| else: | |
| print(f"β Failed: {result.stderr[:80]}") | |
| return {"test_result": "", "error": result.stderr, "passed": False} | |
| except subprocess.TimeoutExpired: | |
| return {"test_result": "", "error": "Timed out after 10 seconds", "passed": False} | |
| except Exception as e: | |
| return {"test_result": "", "error": str(e), "passed": False} | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 6 β HYPOTHESIS TESTER | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def hypothesis_tester(state: State):
    """Run LLM-written Hypothesis property tests against the current code.

    The code under test is ``fixed_code`` when non-empty, else ``code``.
    This node is advisory: every outcome, including errors and timeouts, is
    reported as text in ``hypothesis_result`` and nothing is raised.

    Returns ``{"hypothesis_result": <summary string>}``; the value is
    ``"Skipped"`` when the ``hypothesis`` package is not installed.
    """
    # Local import: run the child with the interpreter executing this module
    # rather than whatever "python" resolves to on PATH.
    import sys

    print("\nπ² Hypothesis property-based testing...")
    code = state["fixed_code"] if state["fixed_code"] else state["code"]
    hypothesis_result = "Skipped"
    try:
        # Without the package every generated script would die on its import
        # line and be misreported as an edge case, so skip before the LLM call.
        if importlib.util.find_spec("hypothesis") is None:
            print("Hypothesis not installed - skipping property tests")
            return {"hypothesis_result": hypothesis_result}
        response = llm.invoke([
            SystemMessage(content="""You are a Hypothesis testing expert.
Return ONLY runnable Python code β no markdown, no backticks."""),
            HumanMessage(content=f"""
Write Hypothesis property tests for this code:
TASK: {state['task']}
CODE:
{code}
Rules:
- Copy function definitions inline
- Use: from hypothesis import given, settings, strategies as st
- DO NOT use unittest or sys anywhere
- Call test functions directly at the bottom
- Keep to 2 simple property tests only
Return ONLY complete runnable Python code:
""")
        ])
        hyp_code = response.content
        hyp_code = re.sub(r"```python", "", hyp_code)
        hyp_code = re.sub(r"```", "", hyp_code)
        hyp_code = hyp_code.strip()
        result = subprocess.run(
            [sys.executable, "-c", hyp_code],
            capture_output=True, text=True, timeout=30
        )
        if result.returncode == 0:
            print("β Hypothesis passed!")
            hypothesis_result = "β Property-based tests passed with random inputs"
        else:
            # The failing exception is at the end of the traceback, so keep
            # the tail rather than the head.
            err = result.stderr.strip()[-200:]
            print(f"β οΈ Hypothesis edge case: {err[-80:]}")
            hypothesis_result = f"β οΈ Edge case found: {err}"
    except subprocess.TimeoutExpired:
        hypothesis_result = "β οΈ Timed out β possible infinite loop on edge input"
    except Exception as e:
        hypothesis_result = f"β οΈ Error: {str(e)[:100]}"
    return {"hypothesis_result": hypothesis_result}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 7 β PERFORMANCE BENCHMARKER | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def performance_benchmarker(state: State):
    """Time the first public function of the current code in a subprocess.

    The code under test (``fixed_code`` when non-empty, else ``code``) is
    run with a small harness appended. The harness picks the first function
    whose name does not start with ``_``, tries a fixed list of sample calls
    until one succeeds, and prints the time for 1000 calls in milliseconds.

    Returns ``{"benchmark_ms": <float>}``; ``0.0`` when nothing could be
    measured. If the measurement exceeds 5000 ms the update also sets
    ``error`` and ``passed=False``.
    """
    # Local import: run the child with the interpreter executing this module
    # rather than whatever "python" resolves to on PATH.
    import sys

    print("\nβ‘ Benchmarking performance...")
    code = state["fixed_code"] if state["fixed_code"] else state["code"]
    harness = (
        "import timeit as _t, ast as _a\n"
        # repr() yields a valid literal for ANY source text; hand-building a
        # triple-quoted literal broke on quotes and backslash escapes.
        "_tree = _a.parse(" + repr(code) + ")\n"
        "_fns = [n.name for n in _a.walk(_tree) "
        "if isinstance(n, _a.FunctionDef) and not n.name.startswith('_')]\n"
        "if _fns:\n"
        "    _f = _fns[0]\n"
        "    _ran = False\n"
        "    for _call in [_f+'(100)', _f+'(\"hello\")', _f+'([1,2,3,4,5])', _f+'(\"racecar\")', _f+'(10)']:\n"
        "        try:\n"
        "            _ms = _t.timeit(_call, globals=globals(), number=1000)*1000\n"
        "            print('BENCHMARK:'+str(round(_ms,2))+'ms')\n"
        "            _ran = True\n"
        "            break\n"
        "        except Exception: continue\n"
        "    if not _ran: print('BENCHMARK:skipped')\n"
        "else: print('BENCHMARK:skipped')\n"
    )
    benchmark_code = code + "\n\n" + harness
    try:
        result = subprocess.run(
            [sys.executable, "-c", benchmark_code],
            capture_output=True, text=True, timeout=20
        )
        output = result.stdout + result.stderr
        match = re.search(r"BENCHMARK:([\d.]+)ms", output)
        if match:
            ms = float(match.group(1))
            print(f"β‘ {ms:.2f}ms per 1000 runs")
            if ms > 5000:
                return {
                    "benchmark_ms": ms,
                    "error": f"Too slow: {ms:.0f}ms. Optimize algorithm.",
                    "passed": False
                }
            return {"benchmark_ms": ms}
        return {"benchmark_ms": 0.0}
    except Exception as e:
        # Benchmarking is advisory; a timeout or launch failure is not fatal.
        print(f"β οΈ Benchmark error: {e}")
        return {"benchmark_ms": 0.0}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 8 β DEBUGGER | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def debugger(state: State):
    """Ask the LLM to repair the code that produced ``state['error']``.

    The code shown to the model is ``fixed_code`` when non-empty, else
    ``code`` - the same selection the executing nodes use, so the error
    and the source in the prompt belong together. The bug/fix pair is
    stored in the memory collection on a best-effort basis.

    Returns a state update with the new ``fixed_code`` and ``retries``
    incremented by one.
    """
    print(f"\nπ§ Debugger fixing (attempt {state['retries']+1})...")
    # After a first fix attempt the failing code is fixed_code, not code.
    failing_code = state.get("fixed_code") or state["code"]
    response = llm.invoke([
        SystemMessage(content="""You are a Python debugger.
Fix the exact error. Return ONLY fixed code β no markdown, no backticks."""),
        HumanMessage(content=f"""
CODE:
{failing_code}
ERROR:
{state['error']}
Return complete fixed Python code only:
""")
    ])
    fixed = response.content
    fixed = re.sub(r"```python", "", fixed)
    fixed = re.sub(r"```", "", fixed)
    fixed = fixed.strip()
    try:
        # md5 is used only to derive a short document id from the error text.
        stable_id = hashlib.md5(state["error"].encode()).hexdigest()[:8]
        memory_collection.add(
            documents=[f"BUG: {state['error']}\nFIX: {fixed}"],
            ids=[f"fix_{state['retries']}_{stable_id}"]
        )
        print("π§ Stored in memory!")
    except Exception as exc:
        # Memory is an optimisation; report the failure but keep the fix.
        print(f"Could not store fix in memory: {exc}")
    return {"fixed_code": fixed, "retries": state["retries"] + 1}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 9 β SECURITY AUDITOR | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def security_auditor(state: State):
    """Scan the final code for a fixed list of risky constructs.

    The code inspected is ``final_code`` when non-empty, else ``code``.
    Matching is case-insensitive. Call patterns for ``eval``, ``exec`` and
    ``__import__`` must not be preceded by an identifier character, so
    helpers such as ``ast.literal_eval(`` are not reported.

    Returns ``{"is_secure": True}`` when nothing is found; otherwise a state
    update with ``is_secure=False``, an ``error`` listing the reasons, and
    ``security_retries`` incremented by one.
    """
    print("\nπ Security check...")
    code = state.get("final_code") or state["code"]
    # (regex, reason) pairs; the lookbehind rejects a match that is only the
    # tail of a longer identifier.
    dangerous = [
        (r"(?<!\w)eval\(", "Code execution via eval"),
        (r"(?<!\w)exec\(", "Code execution via exec"),
        (r"os\.system\(", "Shell injection risk"),
        (r"(?<!\w)__import__\(", "Dynamic import risk"),
        (r"pickle\.loads\(", "Deserialization attack"),
        (r"password =", "Hardcoded credential"),
        (r"api_key =", "Hardcoded API key"),
    ]
    found = [
        reason
        for pattern, reason in dangerous
        if re.search(pattern, code, re.IGNORECASE)
    ]
    if found:
        print(f"β Security issues: {found}")
        return {
            "is_secure": False,
            "error": f"Security issues: {found}",
            "security_retries": state.get("security_retries", 0) + 1
        }
    print("β Security passed!")
    return {"is_secure": True}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 10 β COMPLEXITY JUDGE | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def complexity_judge(state: State):
    """Decide whether the final code is simple enough.

    Combines two structural checks (more than 60 lines; any line indented
    by more than 16 characters) with an LLM complexity score from 1 to 10.
    The code inspected is ``final_code`` when non-empty, else ``code``.

    Returns ``{"is_simple": True}`` on success; otherwise a state update
    with ``is_simple=False``, an ``error`` that names the score and any
    structural issues, and ``complexity_retries`` incremented by one.
    """
    print("\nπ Complexity check...")
    code = state["final_code"] if state["final_code"] else state["code"]
    lines = code.split("\n")
    issues = []
    if len(lines) > 60:
        issues.append(f"Too long: {len(lines)} lines")
    # Deepest leading whitespace over non-blank lines.
    max_indent = max(
        (len(l) - len(l.lstrip()) for l in lines if l.strip()), default=0
    )
    if max_indent > 16:
        issues.append("Too deeply nested")
    try:
        response = llm.invoke([
            HumanMessage(f"Rate complexity 1-10:\n{code}\nReply ONLY a number 1-10.")
        ])
        score = int(re.search(r'\d+', response.content.strip()).group())
    except Exception:
        # Best-effort: if the model call fails or returns no number, fall
        # back to a neutral score so only the structural checks decide.
        score = 5
    print(f"Complexity: {score}/10")
    if score > 7 or issues:
        print(f"β Too complex: {issues}")
        error = f"Too complex (score {score}/10). Simplify."
        if issues:
            # Surface the concrete findings so the next attempt knows what
            # to change; previously they were only printed.
            error += " Issues: " + "; ".join(issues)
        return {
            "is_simple": False,
            "error": error,
            "complexity_retries": state["complexity_retries"] + 1
        }
    print("β Complexity passed!")
    return {"is_simple": True}
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 11 β SELF REFLECTION | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def self_reflection(state: State):
    """Have the LLM review the final code and parse its structured verdict.

    The reply is expected as ``KEY: value`` lines for CONFIDENCE, APPROVED,
    ISSUES and NOTES. Keys are matched case-insensitively and surrounding
    markdown decoration (``**``, ``-``, ``#``) is ignored. A field that is
    missing or unparsable falls back to its default (confidence 7,
    approved, no issues).

    The review fails when the model does not approve, or when it lists
    issues with a confidence below 7.

    Returns a state update with ``reflection_ok``, ``reflection_notes`` and
    ``confidence_score`` (plus ``error`` on failure).
    """
    print("\nπͺ Self Reflection...")
    code = state["final_code"] if state["final_code"] else state["code"]
    response = llm.invoke([
        SystemMessage(content="""You are a senior Python engineer.
Reply in EXACTLY this format:
CONFIDENCE: <1-10>
APPROVED: <YES or NO>
ISSUES: <list or NONE>
NOTES: <one sentence>"""),
        HumanMessage(content=f"Review this code:\nTASK: {state['task']}\nCODE:\n{code}")
    ])
    reflection = response.content.strip()

    decoration = " \t*_#->"
    lines_map = {}
    for line in reflection.splitlines():
        if ":" in line:
            key, _, val = line.partition(":")
            # Strip decoration so "**CONFIDENCE:** 8" is still recognised
            # instead of silently falling back to the defaults.
            lines_map[key.strip(decoration).upper()] = val.strip(" \t*_")

    confidence_match = re.search(r'\d+', lines_map.get("CONFIDENCE", "7"))
    confidence = int(confidence_match.group()) if confidence_match else 7
    approved = "YES" in lines_map.get("APPROVED", "YES").upper()
    issues_text = lines_map.get("ISSUES", "NONE")
    notes = lines_map.get("NOTES", "Looks good")
    has_issues = issues_text.strip().upper() not in ("NONE", "")

    if not approved or (has_issues and confidence < 7):
        print(f"β Reflection: confidence {confidence}/10")
        return {
            "reflection_ok": False,
            "reflection_notes": f"Issues: {issues_text}. {notes}",
            "confidence_score": confidence,
            "error": f"Reflection failed ({confidence}/10): {issues_text}"
        }
    print(f"β Reflection approved ({confidence}/10)")
    return {
        "reflection_ok": True,
        "reflection_notes": notes,
        "confidence_score": confidence
    }
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 12 β REVIEWER | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def reviewer(state: State):
    """Ask the LLM to polish the current code and explain it.

    The input is ``fixed_code`` when non-empty, else ``code``. The reply is
    expected to contain a ``FINAL_CODE:`` section followed by an
    ``EXPLANATION:`` section. The polished code is accepted only if it is
    non-empty and parses as Python; otherwise the input code is kept, so
    ``final_code`` always holds usable source.

    Returns a state update with ``final_code``, ``explanation`` and
    ``review``.
    """
    print("\n⨠Reviewer polishing + explaining...")
    code = state["fixed_code"] if state["fixed_code"] else state["code"]
    response = llm.invoke([
        SystemMessage(content="""You are a senior Python developer and teacher.
Do TWO things and return in EXACTLY this format:
FINAL_CODE:
<complete polished code with docstrings and type hints>
EXPLANATION:
<simple explanation covering: what it does, how it works, time complexity, example usage>
"""),
        HumanMessage(content=f"Polish this code and explain it:\n{code}")
    ])
    content = response.content

    # Defaults for a reply that ignores the format: keep the input code and
    # treat the whole reply as the explanation.
    final_code = code
    explanation = content.strip()
    if "FINAL_CODE:" in content and "EXPLANATION:" in content:
        # partition() splits at the FIRST marker only, so a later
        # "EXPLANATION:" inside the explanation does not truncate it.
        code_part, _, explanation_part = content.partition("EXPLANATION:")
        code_part = code_part.replace("FINAL_CODE:", "").strip()
        code_part = re.sub(r"```python", "", code_part)
        code_part = re.sub(r"```", "", code_part)
        candidate = code_part.strip()
        try:
            ast.parse(candidate)
        except (SyntaxError, ValueError):
            # The polish pass broke the code; fall back to the input.
            candidate = ""
        if candidate:
            final_code = candidate
        explanation = explanation_part.strip()
    if not explanation:
        explanation = "Code completed successfully. See final code above."
    return {
        "final_code": final_code,
        "explanation": explanation,
        "review": "Polished and explained"
    }
| # βββββββββββββββββββββββββββββββββββββββββ | |
| # NODE 13 β EXPLAINER (passthrough) | |
| # βββββββββββββββββββββββββββββββββββββββββ | |
def explainer(state: State):
    """Pass the existing explanation through, substituting a default if empty.

    Returns ``{"explanation": ...}`` in every case; the source notes that
    LangGraph requires a state update, which re-writing the value satisfies.
    """
    fallback = "Code completed successfully. See final code above."
    return {"explanation": state.get("explanation") or fallback}