Spaces:
Sleeping
Sleeping
| import sys | |
| import subprocess | |
| import os | |
| import re | |
| import json | |
| import uuid | |
| import time | |
| import contextlib | |
| import textwrap | |
| import redis | |
| import gradio as gr | |
| from io import StringIO | |
| from threading import Thread | |
| from pathlib import Path | |
| # 🔥 import your real agent | |
| from pygmyclaw import PygmyClaw | |
| # ---------------- INIT ---------------- | |
| agent = PygmyClaw() | |
| REDIS_HOST = os.environ.get("REDIS_HOST", "localhost") | |
| REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379)) | |
| r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True) | |
| QUEUE = "pg:queue" | |
| JOB_LIST = "pg:jobs" | |
| JOB_KEY = "pg:job:" | |
| # ---------------- PROMPT ---------------- | |
| SYSTEM_PROMPT = """ | |
| You are a Python coding agent. | |
| Rules: | |
| - Always return COMPLETE runnable Python code. | |
| - NEVER use placeholders like YourModel, your_data, TODO, pass-only examples. | |
| - ALWAYS include every import required by the code. | |
| - If using ML/PyTorch, define model, dummy data, loss, optimizer, forward pass, backward pass, and update step. | |
| - Code must run as-is. | |
| - Use print() to show useful output. | |
| - Do not use unavailable files, internet URLs, APIs, secrets, or local paths. | |
| - Prefer small CPU-friendly examples. | |
| Return: | |
| 1. Short explanation | |
| 2. One Python code block using ```python | |
| """ | |
| # ---------------- HELPERS ---------------- | |
| def clean_code(code): | |
| if not code: | |
| return "" | |
| code = re.sub(r"```python", "", code, flags=re.I) | |
| code = re.sub(r"```", "", code) | |
| code = textwrap.dedent(code).strip() | |
| return code | |
| def extract_code(text): | |
| blocks = re.findall(r"```(?:python)?\s*(.*?)```", text, re.S | re.I) | |
| if blocks: | |
| return "\n\n".join(clean_code(b) for b in blocks) | |
| return "" | |
| def extract_explanation(text): | |
| return re.sub(r"```.*?```", "", text, flags=re.S).strip() | |
| def extract_imports(code): | |
| packages = set() | |
| import_lines = re.findall( | |
| r"^\s*(?:import\s+([a-zA-Z_][\w]*)|from\s+([a-zA-Z_][\w]*)\s+import)", | |
| code, | |
| flags=re.M, | |
| ) | |
| for imp1, imp2 in import_lines: | |
| module = imp1 or imp2 | |
| if module: | |
| packages.add(module) | |
| return packages | |
| def package_name(module): | |
| mapping = { | |
| "PIL": "pillow", | |
| "cv2": "opencv-python", | |
| "sklearn": "scikit-learn", | |
| "yaml": "pyyaml", | |
| "bs4": "beautifulsoup4", | |
| "np": "numpy", | |
| "pd": "pandas", | |
| } | |
| return mapping.get(module, module) | |
| def install_packages_for_code(code): | |
| stdlib = { | |
| "os", "sys", "re", "json", "math", "time", "datetime", "random", | |
| "pathlib", "collections", "itertools", "functools", "statistics", | |
| "typing", "subprocess", "contextlib", "io", "uuid", "textwrap", | |
| "queue", "threading", "asyncio" | |
| } | |
| installed = [] | |
| failed = [] | |
| for module in sorted(extract_imports(code)): | |
| root = module.split(".")[0] | |
| if root in stdlib: | |
| continue | |
| try: | |
| __import__(root) | |
| continue | |
| except ImportError: | |
| pass | |
| pkg = package_name(root) | |
| try: | |
| subprocess.check_call( | |
| [sys.executable, "-m", "pip", "install", pkg], | |
| stdout=subprocess.DEVNULL, | |
| stderr=subprocess.DEVNULL, | |
| ) | |
| installed.append(pkg) | |
| except Exception: | |
| failed.append(pkg) | |
| return installed, failed | |
| def auto_fix_code(code): | |
| fixes = [] | |
| if "datetime.now()" in code and "from datetime import datetime" not in code and "import datetime" not in code: | |
| code = "from datetime import datetime\n" + code | |
| fixes.append("datetime") | |
| if "np." in code and "import numpy as np" not in code: | |
| code = "import numpy as np\n" + code | |
| fixes.append("numpy") | |
| if "pd." in code and "import pandas as pd" not in code: | |
| code = "import pandas as pd\n" + code | |
| fixes.append("pandas") | |
| if "torch." in code and "import torch" not in code: | |
| code = "import torch\n" + code | |
| fixes.append("torch") | |
| return code, fixes | |
| def run_code(code): | |
| code = clean_code(code) | |
| code, fixes = auto_fix_code(code) | |
| if not code: | |
| return "⚠️ No code" | |
| installed, failed = install_packages_for_code(code) | |
| if failed: | |
| return ( | |
| "❌ Could not install required packages:\n" | |
| + "\n".join(f"- {p}" for p in failed) | |
| + "\n\n--- Code ---\n" | |
| + code | |
| ) | |
| buffer = StringIO() | |
| try: | |
| with contextlib.redirect_stdout(buffer): | |
| exec(code, {"__builtins__": __builtins__}) | |
| out = buffer.getvalue().strip() | |
| prefix = "" | |
| if fixes: | |
| prefix += "🔧 Auto-fixed imports: " + ", ".join(fixes) + "\n" | |
| if installed: | |
| prefix += "📦 Installed packages: " + ", ".join(installed) + "\n" | |
| return prefix + (out if out else "✅ Code executed") | |
| except Exception as e: | |
| return f"❌ Error:\n{e}\n\n--- Code ---\n{code}" | |
| # ---------------- REDIS OPS ---------------- | |
| def save_job(job_id, job): | |
| r.set(JOB_KEY + job_id, json.dumps(job)) | |
| def get_job(job_id): | |
| raw = r.get(JOB_KEY + job_id) | |
| return json.loads(raw) if raw else None | |
| def update_job(job_id, **kwargs): | |
| job = get_job(job_id) | |
| if not job: | |
| return | |
| job.update(kwargs) | |
| save_job(job_id, job) | |
| # ---------------- JOB CREATE ---------------- | |
| def create_job(prompt): | |
| job_id = str(uuid.uuid4())[:8] | |
| job = { | |
| "id": job_id, | |
| "prompt": prompt, | |
| "status": "queued", | |
| "response": "", | |
| "code": "", | |
| "output": "", | |
| "raw": "", | |
| "time": time.strftime("%H:%M:%S"), | |
| "created_ts": time.time() | |
| } | |
| save_job(job_id, job) | |
| r.lpush(JOB_LIST, job_id) # history | |
| r.rpush(QUEUE, job_id) # queue | |
| return f"✅ Job {job_id} queued", job_id, dashboard() | |
| # ---------------- WORKER ---------------- | |
| def worker(): | |
| while True: | |
| item = r.blpop(QUEUE, timeout=5) | |
| if not item: | |
| continue | |
| _, job_id = item | |
| job = get_job(job_id) | |
| if not job: | |
| continue | |
| update_job(job_id, status="running") | |
| try: | |
| prompt = job["prompt"] | |
| full = agent.generate_with_ssd( | |
| f"""{SYSTEM_PROMPT} | |
| Task: | |
| {prompt} | |
| Generate full runnable script. | |
| """, | |
| max_tokens=300 | |
| ) | |
| code = extract_code(full) | |
| explanation = extract_explanation(full) | |
| if not code: | |
| code = "print('No code generated')" | |
| output = run_code(code) | |
| update_job( | |
| job_id, | |
| status="completed", | |
| response=explanation, | |
| code=code, | |
| output=output, | |
| raw=full | |
| ) | |
| except Exception as e: | |
| update_job( | |
| job_id, | |
| status="failed", | |
| response=str(e), | |
| output=str(e) | |
| ) | |
| Thread(target=worker, daemon=True).start() | |
| # ---------------- DASHBOARD ---------------- | |
| def dashboard(): | |
| rows = [] | |
| job_ids = r.lrange(JOB_LIST, 0, 50) | |
| for jid in job_ids: | |
| job = get_job(jid) | |
| if not job: | |
| continue | |
| rows.append([ | |
| jid, | |
| job.get("status", ""), | |
| job.get("time", ""), | |
| job.get("prompt", "")[:80], | |
| job.get("output", "")[:80] | |
| ]) | |
| return rows | |
| # ---------------- LOAD ---------------- | |
| def load_job(job_id): | |
| job = get_job(job_id.strip()) | |
| if not job: | |
| return "", "", "", "", "", "" | |
| return ( | |
| job.get("status", ""), | |
| job.get("prompt", ""), | |
| job.get("response", ""), | |
| job.get("code", ""), | |
| job.get("output", ""), | |
| job.get("raw", "") | |
| ) | |
| # ---------------- ACTIONS ---------------- | |
| def rerun(code): | |
| return run_code(code) | |
| def delete_job(job_id): | |
| job_id = job_id.strip() | |
| r.delete(JOB_KEY + job_id) | |
| r.lrem(JOB_LIST, 0, job_id) | |
| return dashboard(), "", "", "", "", "", "" | |
| def explain_code(code): | |
| code = clean_code(code) | |
| if not code: | |
| return "No code" | |
| res = agent.generate_with_ssd( | |
| f"Explain this:\n```python\n{code}\n```", | |
| max_tokens=200 | |
| ) | |
| return re.sub(r"```.*?```", "", res, flags=re.S).strip() | |
| # ---------------- UI ---------------- | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# 🐍 PygmyClaw (Redis Queue + Agent)") | |
| prompt = gr.Textbox(label="Prompt", lines=4) | |
| create = gr.Button("Create Job") | |
| status = gr.Markdown() | |
| job_id_box = gr.Textbox(label="Job ID") | |
| table = gr.Dataframe( | |
| headers=["ID", "Status", "Time", "Prompt", "Output"], | |
| interactive=False | |
| ) | |
| create.click(create_job, inputs=[prompt], outputs=[status, job_id_box, table]) | |
| refresh = gr.Button("Refresh") | |
| refresh.click(dashboard, outputs=table) | |
| load_id = gr.Textbox(label="Job ID") | |
| load = gr.Button("Load Job") | |
| status_box = gr.Textbox(label="Status") | |
| prompt_box = gr.Textbox(label="Prompt") | |
| response_box = gr.Markdown(label="Explanation") | |
| code_box = gr.Code(language="python", interactive=True) | |
| output_box = gr.Textbox(label="Output") | |
| raw_box = gr.Textbox(label="Raw") | |
| load.click(load_job, inputs=[load_id], | |
| outputs=[status_box, prompt_box, response_box, code_box, output_box, raw_box]) | |
| run_btn = gr.Button("Run Code") | |
| explain_btn = gr.Button("Explain") | |
| delete_btn = gr.Button("Delete") | |
| explain_out = gr.Markdown() | |
| run_btn.click(rerun, inputs=[code_box], outputs=output_box) | |
| explain_btn.click(explain_code, inputs=[code_box], outputs=explain_out) | |
| delete_btn.click(delete_job, inputs=[load_id], | |
| outputs=[table, status_box, prompt_box, response_box, code_box, output_box, raw_box]) | |
| demo.queue().launch( | |
| server_name="0.0.0.0", | |
| server_port=int(os.environ.get("PORT", 7860)) | |
| ) |