Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from typing import Any | |
| from core.file_exports import copy_text_file_or_empty | |
| from mcp_tools.tools import safe_calculator_tool, tool_registry | |
# System prompt for the workbench agent; AgentSession.as_dict() emits it under
# the "system_prompt" key of every serialized session.
AGENT_SYSTEM_PROMPT = (
    "You are a local workbench agent. Research the request, draft a small plan, "
    + "name tools you would use, and require verification before marking work done."
)
@dataclass
class AgentStep:
    """One deterministic agent trace step.

    Declared as a dataclass because this module builds steps positionally
    (``AgentStep("research", ...)``) and serializes them with
    ``dataclasses.asdict``; both need the generated ``__init__`` and fields.

    Attributes:
        phase: Label of the step, e.g. ``"research"``, ``"plan"`` or
            ``"tool:safe_calculator"``.
        content: Text recorded for that phase.
    """

    phase: str
    content: str
@dataclass
class AgentSession:
    """Agent session trace.

    Declared as a dataclass because callers in this module construct it with
    keyword arguments (``AgentSession(task=..., steps=..., ...)``), which
    requires the generated ``__init__``.

    Attributes:
        task: The task text the session was built for (may be empty).
        steps: Ordered trace steps.
        tools: Names of the tools listed for the session.
        limitations: Statements describing what the agent does not do.
        safety_gates: Safety statements attached to the session.
    """

    task: str
    steps: list[AgentStep]
    tools: list[str]
    limitations: list[str]
    safety_gates: list[str]

    def as_dict(self) -> dict[str, Any]:
        """Return the session as a plain dictionary.

        Steps are expanded with ``dataclasses.asdict``. The ``tools``,
        ``limitations`` and ``safety_gates`` values are the session's own
        list objects (not copies). The module-level system prompt is added
        under ``"system_prompt"``.
        """
        return {
            "task": self.task,
            "steps": [asdict(step) for step in self.steps],
            "tools": self.tools,
            "limitations": self.limitations,
            "safety_gates": self.safety_gates,
            "system_prompt": AGENT_SYSTEM_PROMPT,
        }

    def as_markdown(self) -> str:
        """Render the session as newline-joined text.

        Layout: a ``Task:`` line (``(none)`` when the task is empty), a blank
        line, one ``phase: content`` line per step, a blank line, then the
        ``Tools``, ``Limitations`` and ``Safety gates`` summary lines.
        """
        lines = [f"Task: {self.task or '(none)'}", ""]
        lines.extend(f"{step.phase}: {step.content}" for step in self.steps)
        # A single blank line separates the step list from the summary footer.
        lines.append("")
        lines.append(f"Tools: {', '.join(self.tools)}")
        lines.append(f"Limitations: {'; '.join(self.limitations)}")
        lines.append(f"Safety gates: {'; '.join(self.safety_gates)}")
        return "\n".join(lines)
def run_agent_loop(task: str) -> AgentSession:
    """Build the research/plan/implement/verify trace session for *task*.

    The tool list is the sorted result of ``tool_registry()``. When
    ``_maybe_calculate`` returns a result, a ``tool:safe_calculator`` step
    holding its JSON-encoded ``payload`` is placed directly after the
    research step. Nothing is executed beyond that optional calculator call.

    Args:
        task: Free-form task text.

    Returns:
        The assembled ``AgentSession``.
    """
    available_tools = sorted(tool_registry())

    trace = [
        AgentStep("research", _research_summary(task)),
        AgentStep("plan", _plan_summary(task)),
        AgentStep("implement", _implementation_summary(task)),
        AgentStep("verify", "Run unit tests, smoke checks, quality gates, and update docs/tasks."),
    ]

    outcome = _maybe_calculate(task)
    if outcome is not None:
        payload_json = json.dumps(outcome.payload, ensure_ascii=False)
        # Slice assignment at index 1 puts the tool step right after "research".
        trace[1:1] = [AgentStep("tool:safe_calculator", payload_json)]

    session_limits = [
        "Does not execute shell commands.",
        "Does not commit, push, deploy, download models, or call external services.",
        "Requires Codex or a human to apply and verify implementation changes.",
    ]
    return AgentSession(
        task=task,
        steps=trace,
        tools=available_tools,
        limitations=session_limits,
        safety_gates=default_safety_gates(),
    )
def run_paper_to_code_loop(
    paper_title: str,
    paper_notes: str,
    implementation_goal: str,
) -> AgentSession:
    """Build a paper-to-code trace session from a title, notes and a goal.

    The session task is ``"Paper-to-code: <title>"``, falling back to
    ``untitled paper`` when the title is blank. The four steps come from the
    paper-specific summary helpers plus a fixed verification line.

    Args:
        paper_title: Title of the paper; surrounding whitespace is ignored.
        paper_notes: Notes about the paper, passed to the research summary.
        implementation_goal: What should be implemented from the paper.

    Returns:
        The assembled ``AgentSession``.
    """
    task = f"Paper-to-code: {paper_title.strip() or 'untitled paper'}"

    research_step = AgentStep("research", _paper_research_summary(paper_title, paper_notes))
    plan_step = AgentStep("plan", _paper_plan_summary(implementation_goal))
    implement_step = AgentStep("implement", _paper_implementation_trace(implementation_goal))
    verify_step = AgentStep(
        "verify",
        "Map claims to tests, run quality gates, and document gaps.",
    )

    available_tools = sorted(tool_registry())
    session_limits = [
        "Does not read remote papers automatically.",
        "Does not execute code changes autonomously.",
        "Requires human/Codex review before implementation claims are marked done.",
    ]
    return AgentSession(
        task=task,
        steps=[research_step, plan_step, implement_step, verify_step],
        tools=available_tools,
        limitations=session_limits,
        safety_gates=default_safety_gates(),
    )
def default_safety_gates() -> list[str]:
    """Return a new list with the four standard safety-gate statements.

    A fresh list is built on every call, so callers may mutate the result.
    """
    gates = (
        "No shell commands are executed by the agent trace.",
        "No model weights, datasets, or papers are downloaded automatically.",
        "Every implementation claim needs a matching test or documented blocker.",
        "External services require explicit user credentials and approval.",
    )
    return list(gates)
def save_agent_trace(
    session: AgentSession,
    path: str | Path = "data/agent_traces.jsonl",
) -> Path:
    """Append *session* as one JSON line to the trace file.

    Missing parent directories are created. The file is opened in append
    mode with UTF-8 encoding, and non-ASCII characters are written as-is
    (``ensure_ascii=False``).

    Args:
        session: Object whose ``as_dict()`` result is serialized.
        path: Destination JSONL file.

    Returns:
        The destination as a ``Path``.
    """
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("a", encoding="utf-8") as handle:
        record = json.dumps(session.as_dict(), ensure_ascii=False)
        handle.write(record + "\n")
    return destination
def export_agent_traces(
    source_path: str | Path = "data/agent_traces.jsonl",
    output_path: str | Path = "exports/agent_traces.jsonl",
) -> Path:
    """Export the trace file by delegating to ``copy_text_file_or_empty``.

    Args:
        source_path: Trace file to export.
        output_path: Where the export is written.

    Returns:
        Whatever ``copy_text_file_or_empty`` returns.
        NOTE(review): presumably the output path, with an empty file written
        when the source is missing (going by the helper's name) -- confirm
        against ``core.file_exports``.
    """
    exported = copy_text_file_or_empty(source_path, output_path)
    return exported
def export_agent_traces_hf_dataset(
    source_path: str | Path = "data/agent_traces.jsonl",
    output_dir: str | Path = "exports/agent_traces_dataset",
) -> Path:
    """Write a dataset-style folder containing ``data.jsonl`` and ``README.md``.

    ``data.jsonl`` receives the UTF-8 text of *source_path*, or is written
    empty when that path does not exist. The README is a fixed two-line
    description. The output directory is created if needed.

    Args:
        source_path: Trace file to copy from.
        output_dir: Directory that receives the export.

    Returns:
        The output directory as a ``Path``.
    """
    dataset_dir = Path(output_dir)
    dataset_dir.mkdir(parents=True, exist_ok=True)

    origin = Path(source_path)
    contents = origin.read_text(encoding="utf-8") if origin.exists() else ""
    (dataset_dir / "data.jsonl").write_text(contents, encoding="utf-8")

    readme_text = (
        "# Agent Traces Dataset\n\n"
        "Local Hugging Face Dataset-style export for OpenBMB Local AI Workbench traces.\n"
    )
    (dataset_dir / "README.md").write_text(readme_text, encoding="utf-8")
    return dataset_dir
def _research_summary(task: str) -> str:
    """Return the research-phase text; blank tasks get a "no task" prompt."""
    has_task = bool(task.strip())
    if has_task:
        return "Inspect PRD/tasks/docs, identify affected modules, and check existing tests."
    return "No task provided. Ask for a concrete task before implementation."
def _plan_summary(task: str) -> str:
    """Return the plan-phase text for *task*.

    Tasks whose case-folded text contains ``deploy``, ``push``, ``github`` or
    ``huggingface`` (as substrings) get the deploy-oriented plan; everything
    else gets the generic implementation plan.
    """
    lowered = task.casefold()
    for keyword in ("deploy", "push", "github", "huggingface"):
        if keyword in lowered:
            return "Prepare repo/deploy steps, verify auth/remotes, then push only after tests pass."
    return "Make a focused implementation slice, add or update tests, then update docs."
def _implementation_summary(task: str) -> str:
    """Return the implement-phase text.

    Tasks mentioning ``model`` (case-insensitive substring) get the
    backend-services guidance; all others get the minimal-change guidance.
    """
    mentions_model = task.casefold().find("model") != -1
    return (
        "Use configured backend services and avoid startup downloads."
        if mentions_model
        else "Apply changes in the smallest relevant modules and keep unrelated files untouched."
    )
def _paper_research_summary(paper_title: str, paper_notes: str) -> str:
    """Return the research-phase text for a paper.

    A blank title becomes ``untitled paper``. With notes present, the first
    240 characters of the stripped notes are quoted; otherwise a generic
    summarization prompt is returned.
    """
    title = paper_title.strip()
    if not title:
        title = "untitled paper"
    notes = paper_notes.strip()
    if notes:
        return f"Extract implementation claims from {title}: {notes[:240]}"
    return f"Summarize the claims, assumptions, and reproducibility risks for {title}."
def _paper_plan_summary(implementation_goal: str) -> str:
    """Return the plan-phase text, substituting a default for a blank goal."""
    goal = implementation_goal.strip()
    if not goal:
        goal = "create a minimal local reproduction plan"
    return f"Break the goal into local modules, tests, data assumptions, and blockers: {goal}."
def _paper_implementation_trace(implementation_goal: str) -> str:
    """Return the implement-phase text, substituting a default for a blank goal."""
    goal = implementation_goal.strip()
    if not goal:
        goal = "minimal reproducible scaffold"
    trace = (
        "Draft a non-executing implementation trace for "
        f"{goal}; keep dependencies explicit and update docs before claiming completion."
    )
    return trace
def _maybe_calculate(task: str):
    """Run the safe calculator when *task* begins with ``calculate:``.

    The prefix check is case-insensitive and ignores surrounding whitespace.
    The expression is everything after the first colon, stripped.

    Returns:
        The result of ``safe_calculator_tool(expression)``, or ``None`` when
        the task does not use the prefix.
    """
    prefix = "calculate:"
    normalized = task.casefold().strip()
    if not normalized.startswith(prefix):
        return None
    # The prefix guarantees a colon exists, so the tail is the expression.
    _, _, expression = task.partition(":")
    return safe_calculator_tool(expression.strip())