File size: 7,049 Bytes
7f9dfed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
from __future__ import annotations

import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any

from core.file_exports import copy_text_file_or_empty
from mcp_tools.tools import safe_calculator_tool, tool_registry

# Role description for the agent; AgentSession.as_dict() stores it under
# the "system_prompt" key of every serialized session.
AGENT_SYSTEM_PROMPT = (
    "You are a local workbench agent. "
    "Research the request, draft a small plan, name tools you would use, "
    "and require verification before marking work done."
)


@dataclass(frozen=True)
class AgentStep:
    """One deterministic agent trace step.

    Instances are frozen, so a step cannot be reassigned after creation.

    Attributes:
        phase: Label of the stage that produced the step; this module uses
            ``"research"``, ``"plan"``, ``"implement"``, ``"verify"`` and
            ``"tool:safe_calculator"``.
        content: Text recorded for that stage.
    """

    phase: str
    content: str


@dataclass(frozen=True)
class AgentSession:
    """Frozen record of one agent run: the task plus its ordered trace.

    Attributes:
        task: Request text the session was built for.
        steps: Trace entries, in the order they are reported.
        tools: Tool names listed for the session.
        limitations: Statements describing what the agent does not do.
        safety_gates: Safety rules attached to the session.
    """

    task: str
    steps: list[AgentStep]
    tools: list[str]
    limitations: list[str]
    safety_gates: list[str]

    def as_dict(self) -> dict[str, Any]:
        """Return a plain dict/list snapshot of the session.

        The module-level ``AGENT_SYSTEM_PROMPT`` is included under the
        ``"system_prompt"`` key. Every list in the result is a fresh copy, so
        editing the returned mapping cannot alter this session.
        """
        return {
            "task": self.task,
            "steps": [asdict(step) for step in self.steps],
            # frozen=True only prevents rebinding attributes; the lists
            # themselves stay mutable, so hand out copies instead of aliases.
            "tools": list(self.tools),
            "limitations": list(self.limitations),
            "safety_gates": list(self.safety_gates),
            "system_prompt": AGENT_SYSTEM_PROMPT,
        }

    def as_markdown(self) -> str:
        """Render the session as newline-joined text.

        Layout: a ``Task:`` line (``(none)`` when the task is empty), a blank
        line, one ``phase: content`` line per step, a blank line, then the
        ``Tools``, ``Limitations`` and ``Safety gates`` summary lines.
        """
        header = f"Task: {self.task or '(none)'}"
        trace_lines = [f"{step.phase}: {step.content}" for step in self.steps]
        summary_lines = [
            f"Tools: {', '.join(self.tools)}",
            f"Limitations: {'; '.join(self.limitations)}",
            f"Safety gates: {'; '.join(self.safety_gates)}",
        ]
        return "\n".join([header, "", *trace_lines, "", *summary_lines])


def run_agent_loop(task: str) -> AgentSession:
    """Build the research/plan/implement/verify trace for *task*.

    When ``_maybe_calculate`` returns a result, its ``payload`` is JSON-encoded
    and recorded as a ``tool:safe_calculator`` step directly after the
    research step.

    Args:
        task: Free-form request text.

    Returns:
        An ``AgentSession`` with the trace, the sorted contents of
        ``tool_registry()``, a fixed list of limitations, and the default
        safety gates.
    """
    available_tools = sorted(tool_registry())

    research_step = AgentStep("research", _research_summary(task))
    follow_up_steps = [
        AgentStep("plan", _plan_summary(task)),
        AgentStep("implement", _implementation_summary(task)),
        AgentStep("verify", "Run unit tests, smoke checks, quality gates, and update docs/tasks."),
    ]

    # An optional calculator entry sits between research and planning.
    tool_steps = []
    calculation = _maybe_calculate(task)
    if calculation is not None:
        payload_json = json.dumps(calculation.payload, ensure_ascii=False)
        tool_steps.append(AgentStep("tool:safe_calculator", payload_json))

    fixed_limitations = [
        "Does not execute shell commands.",
        "Does not commit, push, deploy, download models, or call external services.",
        "Requires Codex or a human to apply and verify implementation changes.",
    ]
    return AgentSession(
        task=task,
        steps=[research_step, *tool_steps, *follow_up_steps],
        tools=available_tools,
        limitations=fixed_limitations,
        safety_gates=default_safety_gates(),
    )


def run_paper_to_code_loop(
    paper_title: str,
    paper_notes: str,
    implementation_goal: str,
) -> AgentSession:
    """Build a paper-to-code trace from a title, notes, and a goal.

    Args:
        paper_title: Title of the paper; blank titles are reported as
            ``untitled paper``.
        paper_notes: Notes about the paper, passed to the research step.
        implementation_goal: What should be implemented, passed to the plan
            and implement steps.

    Returns:
        An ``AgentSession`` whose task is ``"Paper-to-code: <title>"`` and
        whose trace has research, plan, implement, and verify steps.
    """
    display_title = paper_title.strip() or "untitled paper"
    session_task = f"Paper-to-code: {display_title}"

    research = AgentStep("research", _paper_research_summary(paper_title, paper_notes))
    plan = AgentStep("plan", _paper_plan_summary(implementation_goal))
    implement = AgentStep("implement", _paper_implementation_trace(implementation_goal))
    verify = AgentStep("verify", "Map claims to tests, run quality gates, and document gaps.")

    available_tools = sorted(tool_registry())
    fixed_limitations = [
        "Does not read remote papers automatically.",
        "Does not execute code changes autonomously.",
        "Requires human/Codex review before implementation claims are marked done.",
    ]
    return AgentSession(
        task=session_task,
        steps=[research, plan, implement, verify],
        tools=available_tools,
        limitations=fixed_limitations,
        safety_gates=default_safety_gates(),
    )


def default_safety_gates() -> list[str]:
    """Return the standard safety rules as a new list on every call."""
    gates = (
        "No shell commands are executed by the agent trace.",
        "No model weights, datasets, or papers are downloaded automatically.",
        "Every implementation claim needs a matching test or documented blocker.",
        "External services require explicit user credentials and approval.",
    )
    # Hand back a fresh list so one caller's edits never reach another caller.
    return list(gates)


def save_agent_trace(
    session: AgentSession,
    path: str | Path = "data/agent_traces.jsonl",
) -> Path:
    """Append *session* as one JSON line to the trace file.

    Missing parent directories are created first. The line is the JSON form of
    ``session.as_dict()`` written with ``ensure_ascii=False``.

    Args:
        session: Session to record.
        path: Destination JSONL file; opened in append mode.

    Returns:
        The destination as a ``Path``.
    """
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("a", encoding="utf-8") as handle:
        record = json.dumps(session.as_dict(), ensure_ascii=False)
        handle.write(f"{record}\n")
    return destination


def export_agent_traces(
    source_path: str | Path = "data/agent_traces.jsonl",
    output_path: str | Path = "exports/agent_traces.jsonl",
) -> Path:
    """Export the trace file by delegating to ``copy_text_file_or_empty``.

    Both paths are forwarded unchanged and the helper's result is returned.
    NOTE(review): the helper presumably copies the source, or writes an empty
    file when the source is missing -- confirm in ``core.file_exports``.
    """
    exported = copy_text_file_or_empty(source_path, output_path)
    return exported


def export_agent_traces_hf_dataset(
    source_path: str | Path = "data/agent_traces.jsonl",
    output_dir: str | Path = "exports/agent_traces_dataset",
) -> Path:
    """Write a dataset-style folder holding the traces and a README.

    The folder is created if needed. ``data.jsonl`` receives the text of
    *source_path*, or is left empty when that path does not exist, and
    ``README.md`` receives a short fixed description.

    Args:
        source_path: Trace file to export.
        output_dir: Folder that receives ``data.jsonl`` and ``README.md``.

    Returns:
        The output folder as a ``Path``.
    """
    dataset_dir = Path(output_dir)
    dataset_dir.mkdir(parents=True, exist_ok=True)

    trace_file = Path(source_path)
    # A missing source still yields a (blank) data file.
    payload = trace_file.read_text(encoding="utf-8") if trace_file.exists() else ""
    (dataset_dir / "data.jsonl").write_text(payload, encoding="utf-8")

    readme_text = (
        "# Agent Traces Dataset\n\n"
        "Local Hugging Face Dataset-style export for OpenBMB Local AI Workbench traces.\n"
    )
    (dataset_dir / "README.md").write_text(readme_text, encoding="utf-8")
    return dataset_dir


def _research_summary(task: str) -> str:
    if not task.strip():
        return "No task provided. Ask for a concrete task before implementation."
    return "Inspect PRD/tasks/docs, identify affected modules, and check existing tests."


def _plan_summary(task: str) -> str:
    if any(word in task.casefold() for word in ["deploy", "push", "github", "huggingface"]):
        return "Prepare repo/deploy steps, verify auth/remotes, then push only after tests pass."
    return "Make a focused implementation slice, add or update tests, then update docs."


def _implementation_summary(task: str) -> str:
    if "model" in task.casefold():
        return "Use configured backend services and avoid startup downloads."
    return "Apply changes in the smallest relevant modules and keep unrelated files untouched."


def _paper_research_summary(paper_title: str, paper_notes: str) -> str:
    title = paper_title.strip() or "untitled paper"
    notes = paper_notes.strip()
    if not notes:
        return f"Summarize the claims, assumptions, and reproducibility risks for {title}."
    return f"Extract implementation claims from {title}: {notes[:240]}"


def _paper_plan_summary(implementation_goal: str) -> str:
    goal = implementation_goal.strip() or "create a minimal local reproduction plan"
    return f"Break the goal into local modules, tests, data assumptions, and blockers: {goal}."


def _paper_implementation_trace(implementation_goal: str) -> str:
    goal = implementation_goal.strip() or "minimal reproducible scaffold"
    return (
        "Draft a non-executing implementation trace for "
        f"{goal}; keep dependencies explicit and update docs before claiming completion."
    )


def _maybe_calculate(task: str):
    prefix = "calculate:"
    if task.casefold().strip().startswith(prefix):
        expression = task.split(":", 1)[1].strip()
        return safe_calculator_tool(expression)
    return None