Spaces:
Paused
Paused
| # app_gradio.py | |
| import gradio as gr | |
| import re | |
| import uuid | |
| import time | |
| import contextlib | |
| from io import StringIO | |
| from threading import Thread | |
| from queue import Queue | |
| from pygmyclaw import PygmyClaw | |
| # -------------------------------------------------- | |
| # GLOBAL STATE | |
| # -------------------------------------------------- | |
# Shared in-process state, used by the UI callbacks and by the background
# worker thread alike.
job_store = dict()  # job_id -> job record, as built by create_job()
job_queue = Queue()  # ids of jobs waiting to be picked up by worker()
agent = PygmyClaw()  # one agent instance reused for every request
auto_refresh_enabled = True  # flipped by toggle_auto_refresh()
| # -------------------------------------------------- | |
| # HELPER FUNCTIONS | |
| # -------------------------------------------------- | |
def clean_code(code):
    """Remove markdown fences and normalize code indentation.

    Args:
        code: Source text, possibly wrapped in Markdown code fences.
            ``None`` and the empty string are accepted.

    Returns:
        The text with every fence marker removed (together with a
        ``python``, ``python3`` or ``py`` language tag in any letter case),
        the common leading indentation removed, and surrounding whitespace
        stripped. Returns ``""`` for falsy input.
    """
    import textwrap  # local import keeps this helper self-contained

    if not code:
        return ""
    # Drop fence markers together with an optional Python language tag. The
    # \b stops the tag from eating the start of an unrelated word (for
    # example "pyramid") that happens to follow a fence.
    code = re.sub(r"```(?:(?:python3?|py)\b)?", "", code, flags=re.I)
    # Dedent before stripping: strip() alone removes the indentation of the
    # first line only, which leaves the following lines over-indented and
    # the snippet no longer valid Python.
    return textwrap.dedent(code).strip()
def extract_explanation(full_result):
    """Return the prose of an AI response: everything outside fenced blocks.

    Each span running from one triple-backtick fence to the next is removed
    and the remaining text is returned with surrounding whitespace stripped.
    """
    fenced_block = re.compile(r"```.*?```", re.S)
    prose_only = fenced_block.sub("", full_result)
    return prose_only.strip()
def extract_code(full_result):
    """Extract Python code blocks from full AI response.

    Args:
        full_result: Raw response text. ``None`` and ``""`` are accepted.

    Returns:
        The contents of every fenced block, each stripped of surrounding
        whitespace and joined with one blank line. An opening fence may
        carry a ``python``, ``python3`` or ``py`` tag (any letter case); the
        tag is not part of the returned code. Returns ``""`` when there is
        no fenced block.
    """
    if not full_result:
        return ""
    # The optional tag group must end on a word boundary so that only a
    # complete language tag is consumed; otherwise "```python3" would leave
    # a stray "3" and "```py" a stray "py" at the top of the code.
    fence = r"```(?:(?:python3?|py)\b)?\s*(.*?)```"
    blocks = re.findall(fence, full_result, re.S | re.I)
    return "\n\n".join(block.strip() for block in blocks)
| # -------------------------------------------------- | |
| # BACKGROUND WORKER | |
| # -------------------------------------------------- | |
def worker():
    """Process queued jobs with AI agent, extract code and explanation.

    Loops forever, taking one job id at a time from ``job_queue``. For each
    job it sends the prompt to ``agent``, then stores the raw response, the
    extracted code and the extracted explanation on the job record and
    marks it ``"completed"``. Any exception raised while processing marks
    the job ``"failed"`` and stores the error text in ``"response"``.

    A job id that is no longer present in ``job_store`` (for example because
    it was deleted while still queued) is skipped, so the worker thread
    keeps running and ``task_done()`` is still called for it.
    """
    while True:
        job_id = job_queue.get()
        try:
            job = job_store.get(job_id)
            if job is None:
                # Deleted before we got to it; nothing to do. The finally
                # clause below still acknowledges the queue item.
                continue
            job["status"] = "running"
            try:
                prompt = job["prompt"]
                # System prompt (the job's "tool" field is not consulted)
                system_prompt = f"Write Python code ONLY and explain all lines clearly in comments. Task: {prompt}"
                # Get fresh AI response
                full_result = agent.generate_with_ssd(system_prompt, timeout=120)
                # Extract both parts before touching the record, so a failed
                # extraction never leaves the job half-updated.
                code = extract_code(full_result)
                explanation = extract_explanation(full_result)
                job["raw_result"] = full_result
                job["code"] = code
                job["response"] = explanation
                job["status"] = "completed"
            except Exception as e:
                job["status"] = "failed"
                job["response"] = str(e)
        finally:
            job_queue.task_done()
# Launch the background worker as a daemon thread, so it does not keep the
# process alive on shutdown.
_worker_thread = Thread(target=worker, daemon=True)
_worker_thread.start()
| # -------------------------------------------------- | |
| # JOB OPERATIONS | |
| # -------------------------------------------------- | |
def create_job(prompt, tool):
    """Register a new job and queue it for the background worker.

    Returns a ``(status message, job id)`` pair; the id is the first eight
    hex digits of a random UUID.
    """
    new_id = uuid.uuid4().hex[:8]
    record = dict(
        prompt=prompt,
        tool=tool,
        status="queued",
        response="",
        code="",
        raw_result="",
        created=time.strftime("%H:%M:%S"),
    )
    job_store[new_id] = record
    job_queue.put(new_id)
    return f"Job {new_id} queued", new_id
def dashboard():
    """Build the job-table rows, most recently created job first.

    Each row is ``[job id, status, created, prompt, code]``.
    """
    # Copy the items first so the table is built from a fixed snapshot.
    snapshot = list(job_store.items())
    snapshot.reverse()
    return [
        [job_id, job["status"], job["created"], job["prompt"], job.get("code", "")]
        for job_id, job in snapshot
    ]
def load_job(job_id):
    """Return ``(prompt, response, code, status)`` for a job.

    All four values are empty strings when the id is unknown; a field
    missing from the record also comes back as an empty string.
    """
    job = job_store.get(job_id)
    if job:
        shown_fields = ("prompt", "response", "code", "status")
        return tuple(job.get(field, "") for field in shown_fields)
    return "", "", "", ""
def run_code(code):
    """Execute the editor's code and return what it printed.

    SECURITY NOTE(review): this calls ``exec`` on whatever text is in the
    editor, inside the server process and with its privileges. Only expose
    it to trusted users, or move execution into a sandboxed subprocess.

    Args:
        code: Python source, optionally wrapped in Markdown fences.

    Returns:
        The captured standard output, or a success message when nothing was
        printed. If the code raises (including ``SystemExit``), the output
        printed so far followed by ``"<ExceptionType>: <message>"``.
    """
    output_buffer = StringIO()
    # Fresh namespace per run. "__name__" is set explicitly: with a bare
    # dict it would resolve to "builtins", and any
    # `if __name__ == "__main__":` entry point in the code would be skipped
    # silently while the run is still reported as successful.
    namespace = {"__name__": "__main__"}
    try:
        code = clean_code(code)
        # redirect_stdout replaces sys.stdout for the whole process while
        # the block runs, so prints from other threads land here as well.
        with contextlib.redirect_stdout(output_buffer):
            exec(code, namespace)
    except (Exception, SystemExit) as e:
        # SystemExit is caught too so that exit()/sys.exit() in the executed
        # code ends only that run. Keep what was printed before the failure
        # and name the exception type; str(e) alone can be empty or
        # ambiguous (a KeyError shows only the key).
        return f"{output_buffer.getvalue()}{type(e).__name__}: {e}"
    result = output_buffer.getvalue()
    return result if result else "✅ Code executed successfully."
def delete_job(job_id):
    """Drop a job from the store (unknown ids are ignored) and return the refreshed table rows."""
    job_store.pop(job_id, None)
    return dashboard()
def toggle_auto_refresh():
    """Flip the module-level auto-refresh flag and return a label describing the new state."""
    global auto_refresh_enabled
    auto_refresh_enabled = not auto_refresh_enabled
    if auto_refresh_enabled:
        return "✅ Auto-refresh ON"
    return "❌ Auto-refresh OFF"
def explain_code(code):
    """Ask the AI agent for a plain-language explanation of the given code.

    Returns the agent's answer with fenced code blocks removed, a warning
    when there is no code to explain, or an error message if the request
    raises.
    """
    snippet = clean_code(code)
    if not snippet.strip():
        return "⚠️ No code to explain."
    prompt = f"Explain the following Python code in simple terms:\n```python\n{snippet}\n```"
    try:
        answer = agent.generate_with_ssd(prompt, timeout=120)
        # Keep only the prose: drop any fenced blocks the agent echoed back
        prose = re.sub(r"```.*?```", "", answer, flags=re.S)
        return prose.strip()
    except Exception as err:
        return f"⚠️ Error generating explanation: {str(err)}"
| # -------------------------------------------------- | |
| # GRADIO UI | |
| # -------------------------------------------------- | |
# NOTE(review): the original indentation of this section was lost; which
# components sit inside each gr.Row() below is a reconstruction - confirm
# against the intended layout.
with gr.Blocks(title="PygmyClaw AI Dev Dashboard") as demo:
    gr.Markdown("# 🧠 PygmyClaw AI Dev Dashboard")

    # ---------------- Create Job ----------------
    with gr.Row():
        prompt_input = gr.Textbox(label="Prompt", lines=5, value="write a python function to add two numbers")
        tool_input = gr.Dropdown(["AI Agent"], value="AI Agent", label="Tool")
    create_btn = gr.Button("Create Job")
    create_status = gr.Markdown()
    job_id_input = gr.Textbox(label="Job ID")  # auto-filled by create_job
    create_btn.click(create_job, inputs=[prompt_input, tool_input], outputs=[create_status, job_id_input])

    # ---------------- Job Dashboard ----------------
    gr.Markdown("## Job Dashboard")
    job_table = gr.Dataframe(headers=["Job ID", "Status", "Created", "Prompt", "Code"], interactive=False)
    refresh_btn = gr.Button("🔄 Refresh Dashboard")
    refresh_status = gr.Markdown()
    refresh_btn.click(dashboard, inputs=None, outputs=job_table)
    toggle_btn = gr.Button("⏯ Toggle Auto-Refresh")
    toggle_btn.click(toggle_auto_refresh, inputs=None, outputs=refresh_status)

    # Auto-refresh dashboard
    def auto_refresh_dashboard():
        """Return fresh table rows, or a no-op update while auto-refresh is off."""
        if auto_refresh_enabled:
            return dashboard()
        return gr.update()

    refresh_timer = gr.Timer(value=5)
    refresh_timer.tick(auto_refresh_dashboard, inputs=None, outputs=job_table)

    # ---------------- Selected Job ----------------
    gr.Markdown("## Selected Job Details")
    load_btn = gr.Button("Load Job")
    prompt_box = gr.Textbox(label="Prompt", lines=4)
    response_box = gr.Markdown(label="AI Explanation")
    code_editor = gr.Code(label="Edit Code", language="python", interactive=True)
    status_box = gr.Textbox(label="Status")
    load_btn.click(load_job, inputs=job_id_input, outputs=[prompt_box, response_box, code_editor, status_box])

    # Auto-refresh selected job
    def auto_refresh_job(job_id, shown_status):
        """Reload the selected job's details when its status has changed.

        The editor is one of the outputs, so reloading on every tick would
        overwrite whatever the user typed since the last tick. The details
        are therefore pushed only when the job's stored status differs from
        the one currently displayed; otherwise every output is left as is.
        "Load Job" still forces a full reload at any time.
        """
        untouched = gr.update(), gr.update(), gr.update(), gr.update()
        if not auto_refresh_enabled:
            return untouched
        details = load_job(job_id)
        if details[3] == shown_status:
            return untouched
        return details

    detail_timer = gr.Timer(value=40)
    detail_timer.tick(
        auto_refresh_job,
        inputs=[job_id_input, status_box],
        outputs=[prompt_box, response_box, code_editor, status_box],
    )

    # ---------------- Run / Delete / Explain ----------------
    with gr.Row():
        run_btn = gr.Button("▶ Run Code")
        delete_btn = gr.Button("Delete Job")
        explain_btn = gr.Button("💡 Explain Code")
    output_box = gr.Textbox(label="Execution Output", lines=6)
    explanation_box = gr.Markdown(label="Code Explanation")
    run_btn.click(run_code, inputs=code_editor, outputs=output_box)
    delete_btn.click(delete_job, inputs=job_id_input, outputs=job_table)
    explain_btn.click(explain_code, inputs=code_editor, outputs=explanation_box)
# ---------------- Launch ----------------
# Only start the server when this file is run as a script, so that importing
# the module (for tooling or reload mode) does not block on launch().
# SECURITY NOTE(review): share=True asks Gradio for a public URL, and the
# "Run Code" button executes arbitrary Python through exec(). Together they
# give anyone who has the link code execution on this host - confirm that
# this exposure is intended.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)