"""Gradio front-end for PygmyClaw backed by a Redis job queue.

A prompt submitted through the UI is stored as a job in Redis and pushed onto
a queue.  A background worker thread pops jobs, asks the agent for a Python
script, executes the script in-process and stores the captured output back on
the job record.
"""

import sys
import subprocess
import os
import re
import json
import uuid
import time
import contextlib
import textwrap
import redis
import gradio as gr

from io import StringIO
from threading import Thread
from pathlib import Path

# 🔥 import your real agent
from pygmyclaw import PygmyClaw

# ---------------- INIT ----------------

agent = PygmyClaw()

REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))

r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)

QUEUE = "pg:queue"      # pending job ids, consumed by the worker
JOB_LIST = "pg:jobs"    # job id history, newest first
JOB_KEY = "pg:job:"     # prefix of the per-job JSON record

# Maximum number of history rows shown in the dashboard table.
DASHBOARD_LIMIT = 50

# ---------------- PROMPT ----------------

SYSTEM_PROMPT = """
You are a Python coding agent.

Rules:
- Always return COMPLETE runnable Python code.
- NEVER use placeholders like YourModel, your_data, TODO, pass-only examples.
- ALWAYS include every import required by the code.
- If using ML/PyTorch, define model, dummy data, loss, optimizer, forward pass, backward pass, and update step.
- Code must run as-is.
- Use print() to show useful output.
- Do not use unavailable files, internet URLs, APIs, secrets, or local paths.
- Prefer small CPU-friendly examples.

Return:
1. Short explanation
2. One Python code block using ```python
"""

# ---------------- HELPERS ----------------

# Matches "import a, b.c as d" (group 1 = everything after "import") and
# "from a.b import c" (group 2 = top-level package "a").
_IMPORT_LINE = re.compile(
    r"^[ \t]*(?:import[ \t]+([^\n#;]+)"
    r"|from[ \t]+([a-zA-Z_]\w*)[\w.]*[ \t]+import\b)",
    re.M,
)


def clean_code(code):
    """Strip markdown code fences from *code* and dedent it.

    Returns an empty string for empty/None input.
    """
    if not code:
        return ""
    code = re.sub(r"```python", "", code, flags=re.I)
    code = re.sub(r"```", "", code)
    code = textwrap.dedent(code).strip()
    return code


def extract_code(text):
    """Return all fenced code blocks in *text*, joined by blank lines.

    Returns an empty string when *text* contains no fenced block.
    """
    blocks = re.findall(r"```(?:python)?\s*(.*?)```", text, re.S | re.I)
    if blocks:
        return "\n\n".join(clean_code(b) for b in blocks)
    return ""


def extract_explanation(text):
    """Return *text* with every fenced code block removed."""
    return re.sub(r"```.*?```", "", text, flags=re.S).strip()


def extract_imports(code):
    """Return the set of top-level module names imported by *code*.

    Handles ``import a``, ``import a.b as c``, ``import a, b`` and
    ``from a.b import c``.  Relative imports (``from . import x``) are
    ignored.  Detection is regex based, so an import statement that appears
    inside a string literal is also picked up.
    """
    packages = set()
    for match in _IMPORT_LINE.finditer(code):
        plain, from_module = match.groups()
        if from_module:
            packages.add(from_module)
            continue
        # "import a, b.c as d" -> ["a", "b.c as d"]
        for part in plain.split(","):
            tokens = part.split()
            if not tokens:
                continue
            root = tokens[0].split(".")[0]
            if root.isidentifier():
                packages.add(root)
    return packages


def package_name(module):
    """Map an importable module name to the pip package that provides it."""
    mapping = {
        "PIL": "pillow",
        "cv2": "opencv-python",
        "sklearn": "scikit-learn",
        "yaml": "pyyaml",
        "bs4": "beautifulsoup4",
        "np": "numpy",
        "pd": "pandas",
    }
    return mapping.get(module, module)


def install_packages_for_code(code):
    """Pip-install every non-stdlib module that *code* imports and lacks.

    Returns ``(installed, failed)``: two lists of pip package names.

    SECURITY NOTE: the package names come from model-generated code, so this
    installs arbitrary packages chosen by the model (typo-squatting risk).
    Only run this service in a disposable/sandboxed environment.
    """
    import importlib

    stdlib = {
        "os", "sys", "re", "json", "math", "time", "datetime", "random",
        "pathlib", "collections", "itertools", "functools", "statistics",
        "typing", "subprocess", "contextlib", "io", "uuid", "textwrap",
        "queue", "threading", "asyncio"
    }
    # Python 3.10+ exposes the authoritative list; the literal set above is
    # the fallback for older interpreters.
    stdlib |= set(getattr(sys, "stdlib_module_names", ()))

    installed = []
    failed = []

    for module in sorted(extract_imports(code)):
        root = module.split(".")[0]

        if root in stdlib:
            continue

        try:
            __import__(root)
            continue
        except ImportError:
            pass

        pkg = package_name(root)

        try:
            subprocess.check_call(
                [sys.executable, "-m", "pip", "install", pkg],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            installed.append(pkg)
        except (subprocess.CalledProcessError, OSError):
            failed.append(pkg)

    if installed:
        # Make freshly installed packages visible to the running interpreter.
        importlib.invalidate_caches()

    return installed, failed


def auto_fix_code(code):
    """Prepend commonly forgotten imports to *code*.

    Returns ``(code, fixes)`` where *fixes* lists the imports that were added.
    Aliases are matched on word boundaries so that e.g. ``inp.read()`` does
    not trigger a numpy import.
    """
    fixes = []

    # "import datetime" is also a substring of "from datetime import datetime",
    # so this single test covers both import forms.
    if "datetime.now()" in code and "import datetime" not in code:
        code = "from datetime import datetime\n" + code
        fixes.append("datetime")

    if re.search(r"\bnp\.", code) and "import numpy as np" not in code:
        code = "import numpy as np\n" + code
        fixes.append("numpy")

    if re.search(r"\bpd\.", code) and "import pandas as pd" not in code:
        code = "import pandas as pd\n" + code
        fixes.append("pandas")

    if re.search(r"\btorch\.", code) and "import torch" not in code:
        code = "import torch\n" + code
        fixes.append("torch")

    return code, fixes


def run_code(code):
    """Execute *code* in-process and return its captured stdout as text.

    The returned string is prefixed with notes about auto-added imports and
    installed packages.  Failures are reported in the returned string rather
    than raised.

    SECURITY NOTE: this exec()s model-generated (and user-edited) code with
    full builtins inside the server process.  It is not a sandbox.

    NOTE: ``contextlib.redirect_stdout`` swaps ``sys.stdout`` process-wide, so
    output of concurrent runs (worker thread vs. UI "Run Code") can interleave.
    """
    code = clean_code(code)
    code, fixes = auto_fix_code(code)

    if not code:
        return "⚠️ No code"

    installed, failed = install_packages_for_code(code)

    if failed:
        return (
            "❌ Could not install required packages:\n"
            + "\n".join(f"- {p}" for p in failed)
            + "\n\n--- Code ---\n"
            + code
        )

    buffer = StringIO()

    # Without an explicit __name__ the lookup falls through to the builtins
    # namespace, so a generated `if __name__ == "__main__":` guard would
    # silently never run.
    namespace = {"__builtins__": __builtins__, "__name__": "__main__"}

    try:
        with contextlib.redirect_stdout(buffer):
            exec(code, namespace)
    except SystemExit as exc:
        # sys.exit()/exit() in generated code must not kill the calling
        # thread; SystemExit is not an Exception subclass.
        if exc.code not in (None, 0):
            return f"❌ Error:\nSystemExit: {exc.code}\n\n--- Code ---\n{code}"
    except Exception as e:
        return f"❌ Error:\n{e}\n\n--- Code ---\n{code}"

    out = buffer.getvalue().strip()

    prefix = ""
    if fixes:
        prefix += "🔧 Auto-fixed imports: " + ", ".join(fixes) + "\n"
    if installed:
        prefix += "📦 Installed packages: " + ", ".join(installed) + "\n"

    return prefix + (out if out else "✅ Code executed")


# ---------------- REDIS OPS ----------------

def save_job(job_id, job):
    """Store *job* (a JSON-serialisable dict) under its Redis key."""
    r.set(JOB_KEY + job_id, json.dumps(job))


def get_job(job_id):
    """Return the job dict for *job_id*, or None if no record exists."""
    raw = r.get(JOB_KEY + job_id)
    return json.loads(raw) if raw else None


def update_job(job_id, **kwargs):
    """Merge *kwargs* into the stored job; no-op if the job is missing.

    This is a read-modify-write and is not atomic.
    """
    job = get_job(job_id)
    if not job:
        return
    job.update(kwargs)
    save_job(job_id, job)


# ---------------- JOB CREATE ----------------

def create_job(prompt):
    """Create and enqueue a job for *prompt*.

    Returns ``(status_message, job_id, dashboard_rows)``.  A blank prompt is
    rejected with a warning message and an empty job id.
    """
    if not prompt or not prompt.strip():
        return "⚠️ Empty prompt", "", dashboard()

    job_id = str(uuid.uuid4())[:8]

    job = {
        "id": job_id,
        "prompt": prompt,
        "status": "queued",
        "response": "",
        "code": "",
        "output": "",
        "raw": "",
        "time": time.strftime("%H:%M:%S"),
        "created_ts": time.time()
    }

    save_job(job_id, job)

    r.lpush(JOB_LIST, job_id)  # history
    r.rpush(QUEUE, job_id)     # queue

    return f"✅ Job {job_id} queued", job_id, dashboard()


# ---------------- WORKER ----------------

def _process_job(job_id):
    """Generate, run and record the result for a single queued job."""
    job = get_job(job_id)
    if not job:
        # The job was deleted while it was still waiting in the queue.
        return

    update_job(job_id, status="running")

    try:
        prompt = job["prompt"]

        full = agent.generate_with_ssd(
            f"""{SYSTEM_PROMPT}

Task:
{prompt}

Generate full runnable script.
""",
            max_tokens=300
        )

        code = extract_code(full)
        explanation = extract_explanation(full)

        if not code:
            code = "print('No code generated')"

        output = run_code(code)

        update_job(
            job_id,
            status="completed",
            response=explanation,
            code=code,
            output=output,
            raw=full
        )

    except Exception as e:
        update_job(
            job_id,
            status="failed",
            response=str(e),
            output=str(e)
        )


def worker():
    """Consume job ids from the Redis queue forever.

    Redis errors are logged to stderr and retried after a short pause so that
    a transient outage does not permanently kill the worker thread.
    """
    while True:
        try:
            item = r.blpop(QUEUE, timeout=5)
            if not item:
                continue

            _, job_id = item
            _process_job(job_id)
        except redis.RedisError as exc:
            print(f"worker: redis error: {exc}", file=sys.stderr)
            time.sleep(1)


Thread(target=worker, daemon=True).start()


# ---------------- DASHBOARD ----------------

def dashboard():
    """Return table rows for the most recent jobs, newest first.

    Each row is ``[id, status, time, prompt[:80], output[:80]]``.
    """
    rows = []

    # LRANGE's end index is inclusive, hence the "- 1".
    job_ids = r.lrange(JOB_LIST, 0, DASHBOARD_LIMIT - 1)

    for jid in job_ids:
        job = get_job(jid)
        if not job:
            continue

        rows.append([
            jid,
            job.get("status", ""),
            job.get("time", ""),
            job.get("prompt", "")[:80],
            job.get("output", "")[:80]
        ])

    return rows


# ---------------- LOAD ----------------

def load_job(job_id):
    """Return ``(status, prompt, response, code, output, raw)`` for a job.

    Six empty strings are returned when the job does not exist.
    """
    job = get_job((job_id or "").strip())

    if not job:
        return "", "", "", "", "", ""

    return (
        job.get("status", ""),
        job.get("prompt", ""),
        job.get("response", ""),
        job.get("code", ""),
        job.get("output", ""),
        job.get("raw", "")
    )


# ---------------- ACTIONS ----------------

def rerun(code):
    """Re-execute (possibly edited) *code* and return its output text."""
    return run_code(code)


def delete_job(job_id):
    """Delete a job record and remove its id from history and queue.

    Returns the refreshed dashboard rows followed by six empty strings that
    clear the detail widgets.
    """
    job_id = (job_id or "").strip()

    r.delete(JOB_KEY + job_id)
    r.lrem(JOB_LIST, 0, job_id)
    # Also drop it from the pending queue so the worker does not pop a
    # dangling id.
    r.lrem(QUEUE, 0, job_id)

    return dashboard(), "", "", "", "", "", ""


def explain_code(code):
    """Ask the agent to explain *code*; fenced blocks in the reply are removed."""
    code = clean_code(code)
    if not code:
        return "No code"

    res = agent.generate_with_ssd(
        f"Explain this:\n```python\n{code}\n```",
        max_tokens=200
    )

    return re.sub(r"```.*?```", "", res, flags=re.S).strip()


# ---------------- UI ----------------

with gr.Blocks() as demo:
    gr.Markdown("# 🐍 PygmyClaw (Redis Queue + Agent)")

    prompt = gr.Textbox(label="Prompt", lines=4)
    create = gr.Button("Create Job")

    status = gr.Markdown()
    job_id_box = gr.Textbox(label="Job ID")

    table = gr.Dataframe(
        headers=["ID", "Status", "Time", "Prompt", "Output"],
        interactive=False
    )

    create.click(create_job, inputs=[prompt], outputs=[status, job_id_box, table])

    refresh = gr.Button("Refresh")
    refresh.click(dashboard, outputs=table)

    load_id = gr.Textbox(label="Job ID")
    load = gr.Button("Load Job")

    status_box = gr.Textbox(label="Status")
    prompt_box = gr.Textbox(label="Prompt")
    response_box = gr.Markdown(label="Explanation")
    code_box = gr.Code(language="python", interactive=True)
    output_box = gr.Textbox(label="Output")
    raw_box = gr.Textbox(label="Raw")

    load.click(
        load_job,
        inputs=[load_id],
        outputs=[status_box, prompt_box, response_box, code_box, output_box, raw_box]
    )

    run_btn = gr.Button("Run Code")
    explain_btn = gr.Button("Explain")
    delete_btn = gr.Button("Delete")

    explain_out = gr.Markdown()

    run_btn.click(rerun, inputs=[code_box], outputs=output_box)
    explain_btn.click(explain_code, inputs=[code_box], outputs=explain_out)
    delete_btn.click(
        delete_job,
        inputs=[load_id],
        outputs=[table, status_box, prompt_box, response_box, code_box, output_box, raw_box]
    )

demo.queue().launch(
    server_name="0.0.0.0",
    server_port=int(os.environ.get("PORT", 7860))
)