deepagent-bot
ci: sync dashboard from ae0de3f
b07e602
Raw
History Blame Contribute Delete
15.8 kB
"""High-level orchestration: fix-issue, evolve-code, iterate-pr, review-pr.
All entrypoints converge here so that the CLI, the GitHub Actions glue script and
the webhook server share the same code path.
"""
from __future__ import annotations
import json
import subprocess
import textwrap
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from rich.console import Console
from .agent import build_agent
from .config import get_settings
from .github_client import GitHubOps, IssueRef, normalize_repo_full_name
# Shared Rich console used for all operator-facing output in this module
# (stream previews, section rules, "Cloned" notices, max-turns warnings).
console = Console()
@dataclass
class RunResult:
    """Outcome of one orchestration entrypoint (fix / evolve / iterate / review).

    Only ``ok`` and ``summary`` are mandatory; the optional fields are filled
    in by the entrypoints when they apply.
    """

    # Success flag as judged by the entrypoint that produced this result.
    ok: bool
    # The agent's final text, or the rendered review body for review_pr.
    summary: str
    # URL of the pull request that was opened/updated, when known.
    pr_url: Optional[str] = None
    # Working-tree diff; populated for dry runs.
    diff: Optional[str] = None
def _prompt_decisions(interrupt, input_fn=input) -> list[dict]:
"""Turn a HumanInTheLoop interrupt into a list of resume decisions.
The interrupt value looks like::
{"action_requests": [{"name", "args", "description"}, ...],
"review_configs": [{"action_name", "allowed_decisions"}, ...]}
We ask once per pending action and default to *approve* on empty input.
Returns one decision dict per action (order matters for the resume).
"""
value = interrupt[0].value if isinstance(interrupt, (list, tuple)) else getattr(interrupt, "value", interrupt)
actions = value.get("action_requests", []) if isinstance(value, dict) else []
if not actions:
return [{"type": "approve"}]
decisions: list[dict] = []
for a in actions:
name, args = a.get("name"), a.get("args")
try:
ans = input_fn(f"⚠️ Approve tool `{name}` args={args}? [Y/n] ").strip().lower()
except EOFError: # non-interactive stdin → approve
ans = ""
decisions.append({"type": "reject"} if ans in ("n", "no", "reject") else {"type": "approve"})
return decisions
def _stream(agent, initial_input: dict, max_turns: int, interactive: bool = False) -> str:
    """Stream the agent, log structured events, observe sub-agent calls.

    Also publishes log lines to the job queue when running inside a job
    (a ``job_id`` is present in the structlog context vars), so SSE
    subscribers see live progress.

    When `interactive` is True, destructive tool calls pause for console
    approval (the agent is built with `interrupt_on` + a checkpointer) and we
    resume the graph with the operator's decisions.

    Args:
        agent: Compiled graph exposing ``stream(input, config=...)``.
        initial_input: First input, e.g. ``{"messages": [...]}``.
        max_turns: Stop after this many non-interrupt chunks; also scales the
            graph ``recursion_limit`` (``max_turns * 4``).
        interactive: Enable the approve/reject loop for interrupts.

    Returns:
        The text of the last non-empty AI message seen (always a ``str``,
        even when the model returns list-of-blocks content).
    """
    import uuid

    from rich.markup import escape

    from .observability.logging_setup import get_logger
    from .observability.metrics import SUBAGENT_CALLS
    from .observability.tracing import span

    logger = get_logger("agent.stream")

    def _as_text(content) -> str:
        """Flatten message content (a str or a list of str/dict blocks) to text."""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts = []
            for block in content:
                if isinstance(block, str):
                    parts.append(block)
                elif isinstance(block, dict) and block.get("type") == "text":
                    parts.append(str(block.get("text", "")))
            return "".join(parts)
        return str(content) if content else ""

    final_text = ""
    turns = 0
    seen_subagents: set[str] = set()

    # If we're inside a job, attach a stream sink so SSE clients see live output.
    try:
        import structlog
        ctx = structlog.contextvars.get_contextvars()
    except Exception:
        ctx = {}
    job_id = ctx.get("job_id") if isinstance(ctx, dict) else None
    job_queue = None
    if job_id:
        try:
            from .queue import JobQueue
            job_queue = JobQueue()
        except Exception:
            # Live streaming is optional; record why it is unavailable.
            logger.warning("job_queue_unavailable", job_id=job_id, exc_info=True)
            job_queue = None

    config: dict = {"recursion_limit": max_turns * 4}
    if interactive:
        # A checkpointer needs a thread id to persist/resume the paused graph.
        config["configurable"] = {"thread_id": str(uuid.uuid4())}

    stream_input: object = initial_input
    with span("agent.stream", max_turns=max_turns):
        # Outer loop runs once normally; with HITL it re-enters after each
        # resume. Bounded by max_turns to avoid an unbounded approval loop.
        while True:
            pending_interrupt = None
            for chunk in agent.stream(stream_input, config=config):
                if "__interrupt__" in chunk:
                    pending_interrupt = chunk["__interrupt__"]
                    continue
                turns += 1
                for node, payload in chunk.items():
                    if node not in seen_subagents and node not in {"agent", "tools", "__start__", "__end__"}:
                        SUBAGENT_CALLS.labels(node).inc()
                        seen_subagents.add(node)
                    # Skip payloads without messages (an empty list would IndexError).
                    if not (isinstance(payload, dict) and payload.get("messages")):
                        continue
                    msg = payload["messages"][-1]
                    role = getattr(msg, "type", "?")
                    content = getattr(msg, "content", "") or ""
                    preview = str(content)[:400]
                    line = f"[{node}] {role}: {preview}"
                    # node/role/content are data, not Rich markup: unescaped,
                    # "[ai]" is swallowed as an unknown style tag and a stray
                    # "[/x]" in model output raises MarkupError mid-run.
                    console.print(f"[dim]{escape(str(node))}[/] {escape(f'[{role}]')} {escape(preview)}")
                    logger.info("agent_step", agent_node=node, role=role,
                                content_preview=str(content)[:200])
                    if job_queue and job_id:
                        try:
                            job_queue.append_log(job_id, line)
                        except Exception:
                            # Best-effort: never fail the run over a log line.
                            logger.debug("job_log_append_failed", job_id=job_id, exc_info=True)
                    if role == "ai":
                        # Content may be a list of blocks; keep the return a str.
                        text = _as_text(content)
                        if text:
                            final_text = text
                if turns >= max_turns:
                    console.print("[yellow]Hit max_turns, stopping.[/]")
                    logger.warning("max turns hit", turns=turns)
                    return final_text
            if interactive and pending_interrupt is not None:
                from langgraph.types import Command
                decisions = _prompt_decisions(pending_interrupt)
                stream_input = Command(resume={"decisions": decisions})
                continue
            break
    return final_text
def _extract_pr_url(text: str) -> Optional[str]:
import re
m = re.search(r"https://github\.com/[^/]+/[^/]+/pull/\d+", text or "")
return m.group(0) if m else None
def _dry_run_result(repo_path: Path, text: str) -> RunResult:
    """Build a dry-run result: the agent's text plus the working-tree diff.

    ``git diff HEAD`` only covers tracked files, so files the agent *created*
    would be missing. Untracked, non-ignored files are therefore appended as
    new-file diffs via ``git diff --no-index -- /dev/null <file>``, which
    leaves the clone's index untouched.

    Args:
        repo_path: Root of the git checkout the agent worked in.
        text: The agent's final message, used as the summary.

    Returns:
        ``RunResult(ok=True, summary=text, diff=<combined diff>)``. Git errors
        are not raised; a failing git command simply contributes no output.
    """
    def _git(*args: str) -> str:
        # Capture stdout only; `diff --no-index` exits 1 when inputs differ.
        return subprocess.run(
            ["git", *args], cwd=repo_path, capture_output=True, text=True
        ).stdout

    parts = [_git("diff", "HEAD")]
    # -z: NUL-separated paths, so odd filenames are not quoted/escaped.
    untracked = _git("ls-files", "--others", "--exclude-standard", "-z").split("\0")
    for rel_path in filter(None, untracked):
        parts.append(_git("diff", "--no-index", "--", "/dev/null", rel_path))
    return RunResult(ok=True, summary=text, diff="".join(parts))
# ---------- public entrypoints ----------
def fix_issue(
    issue_url: str,
    dry_run: bool = False,
    backend: Optional[str] = None,
    interactive: Optional[bool] = None,
) -> RunResult:
    """Resolve a GitHub issue and (unless dry_run) open a PR.

    Args:
        issue_url: URL of the issue to resolve (parsed by ``IssueRef.from_url``).
        dry_run: Tell the agent not to open a PR; the result carries the
            working-tree diff instead.
        backend: Backend kind forwarded to ``build_agent``.
        interactive: Pause for console approval of guarded tool calls;
            ``None`` falls back to ``settings.interactive``.

    Returns:
        A ``RunResult``. For non-dry runs ``ok`` is True only when a PR URL
        appears in the agent's final text.
    """
    from rich.markup import escape

    settings = get_settings()
    settings.assert_ready()
    interactive = settings.interactive if interactive is None else interactive
    ref = IssueRef.from_url(issue_url)
    gh = GitHubOps()
    ctx = gh.fetch_issue_context(ref)
    # The title is user-supplied: escape it so brackets aren't parsed as Rich markup.
    console.rule(f"[bold green]Issue #{ref.number} — {escape(str(ctx['title']))}")
    repo_path = gh.clone(ref.full_name, settings.workdir)
    console.print(f"[cyan]Cloned[/] {ref.full_name} → {escape(str(repo_path))}")
    agent, handle = build_agent(
        repo_path=repo_path, repo_full_name=ref.full_name, issue_ref=ref,
        backend_kind=backend, interactive=interactive,
    )
    try:
        final_step = (
            "DO NOT open a PR — print the diff and stop."
            if dry_run
            else f"call finalize_patch with branch name `deepagent/issue-{ref.number}`."
        )
        # Dedent the template *before* filling it in: multi-line issue text
        # has no leading indent, which would defeat dedent on the result.
        prompt = textwrap.dedent("""\
            Resolve GitHub issue #{number} in repo {repo}.
            Title: {title}
            Body:
            {body}
            Comments (chronological):
            {comments}
            Working directory contains a fresh clone of the repo. Investigate, plan, patch,
            test, then {final_step}
            """).format(
            number=ref.number,
            repo=ref.full_name,
            title=ctx["title"],
            body=ctx["body"],
            comments=json.dumps(ctx["comments"], indent=2, default=str),
            final_step=final_step,
        )
        text = _stream(agent, {"messages": [{"role": "user", "content": prompt}]},
                       settings.max_turns, interactive=interactive)
        if dry_run:
            return _dry_run_result(repo_path, text)
        pr_url = _extract_pr_url(text)
        return RunResult(ok=pr_url is not None, summary=text, pr_url=pr_url)
    finally:
        handle.cleanup()
def evolve_code(
    repo_full_name: str,
    instruction: str,
    dry_run: bool = False,
    backend: Optional[str] = None,
    interactive: Optional[bool] = None,
) -> RunResult:
    """Apply a free-form evolution request to the repo.

    Args:
        repo_full_name: Target repository (normalized via
            ``normalize_repo_full_name``).
        instruction: Free-form change request passed verbatim to the agent.
        dry_run: Tell the agent to print the diff instead of opening a PR.
        backend: Backend kind forwarded to ``build_agent``.
        interactive: Pause for console approval of guarded tool calls;
            ``None`` falls back to ``settings.interactive``.

    Returns:
        A ``RunResult``. For non-dry runs ``ok`` is True only when a PR URL
        appears in the agent's final text.
    """
    from rich.markup import escape

    repo_full_name = normalize_repo_full_name(repo_full_name)
    settings = get_settings()
    settings.assert_ready()
    interactive = settings.interactive if interactive is None else interactive
    gh = GitHubOps()
    console.rule(f"[bold green]Evolve {repo_full_name}")
    repo_path = gh.clone(repo_full_name, settings.workdir)
    console.print(f"[cyan]Cloned[/] {repo_full_name} → {escape(str(repo_path))}")
    agent, handle = build_agent(
        repo_path=repo_path, repo_full_name=repo_full_name, backend_kind=backend,
        interactive=interactive,
    )
    # Timestamp-based slug keeps branch names unique across runs.
    slug = f"evolve-{int(time.time())}"
    try:
        final_step = (
            "print the diff (no PR)."
            if dry_run
            else f"call finalize_patch with branch name `deepagent/{slug}`."
        )
        # Dedent the template before filling it in; a multi-line instruction
        # would otherwise defeat dedent.
        prompt = textwrap.dedent("""\
            Evolution request for repo {repo}:
            \"\"\"
            {instruction}
            \"\"\"
            Investigate the codebase, build a minimal plan, implement the change, run the
            tests, and {final_step}
            """).format(
            repo=repo_full_name,
            instruction=instruction,
            final_step=final_step,
        )
        text = _stream(agent, {"messages": [{"role": "user", "content": prompt}]},
                       settings.max_turns, interactive=interactive)
        if dry_run:
            return _dry_run_result(repo_path, text)
        pr_url = _extract_pr_url(text)
        return RunResult(ok=pr_url is not None, summary=text, pr_url=pr_url)
    finally:
        handle.cleanup()
def iterate_pr(
    repo_full_name: str,
    pr_number: int,
    instruction: str,
    dry_run: bool = False,
    backend: Optional[str] = None,
    interactive: Optional[bool] = None,
) -> RunResult:
    """Push additional commits to the branch backing an existing PR.

    Triggered by `/deepagent <instruction>` on a PR comment, or via CLI.

    Args:
        repo_full_name: Repository owning the PR (normalized first).
        pr_number: Number of the pull request to amend.
        instruction: Reviewer instruction passed verbatim to the agent.
        dry_run: Tell the agent to print the diff and stop.
        backend: Backend kind forwarded to ``build_agent``.
        interactive: Pause for console approval of guarded tool calls;
            ``None`` falls back to ``settings.interactive``.

    Returns:
        A ``RunResult``. Non-dry runs always report ``ok=True`` with the
        existing PR's URL.

    Raises:
        subprocess.CalledProcessError: If fetching or checking out the PR
            branches fails.
    """
    repo_full_name = normalize_repo_full_name(repo_full_name)
    settings = get_settings()
    settings.assert_ready()
    interactive = settings.interactive if interactive is None else interactive
    gh = GitHubOps()
    repo = gh.get_repo(repo_full_name)
    pr = repo.get_pull(pr_number)
    head_branch = pr.head.ref
    base_branch = pr.base.ref
    console.rule(f"[bold green]Iterate on PR #{pr_number} — branch {head_branch}")
    repo_path = gh.clone(repo_full_name, settings.workdir)
    # Check out the PR branch so the agent sees the latest PR state. Explicit
    # refspecs guarantee origin/<head> and origin/<base> exist even when the
    # clone only tracks its default branch; `checkout -B` resets any stale
    # local branch of the same name to the remote tip.
    # NOTE(review): assumes the head branch lives on origin (not a fork).
    for branch in (head_branch, base_branch):
        subprocess.run(
            ["git", "fetch", "origin", f"+refs/heads/{branch}:refs/remotes/origin/{branch}"],
            cwd=repo_path, check=True,
        )
    subprocess.run(
        ["git", "checkout", "-B", head_branch, f"origin/{head_branch}"],
        cwd=repo_path, check=True,
    )
    agent, handle = build_agent(
        repo_path=repo_path,
        repo_full_name=repo_full_name,
        backend_kind=backend,
        base_branch=base_branch,
        existing_branch=head_branch,
        interactive=interactive,
    )
    try:
        # Give the agent the existing PR context so it understands what to amend.
        pr_diff = subprocess.run(
            ["git", "diff", f"origin/{base_branch}...HEAD"],
            cwd=repo_path, capture_output=True, text=True,
        ).stdout[:15000]
        final_step = (
            "print the diff and stop."
            if dry_run
            else "call finalize_patch (branch_name argument will be ignored — the existing branch is used)."
        )
        # Dedent the template before filling it in; the multi-line body, diff
        # and instruction would otherwise defeat dedent.
        prompt = textwrap.dedent("""\
            You are amending an EXISTING pull request.
            Repo: {repo} PR: #{number} Branch: {head} Base: {base}
            PR title: {title}
            PR body (truncated):
            {body}
            Current PR diff vs base:
            ```diff
            {diff}
            ```
            Reviewer instruction:
            \"\"\"
            {instruction}
            \"\"\"
            Make the requested changes ON TOP of the current branch. Run the tests.
            When done, {final_step}
            """).format(
            repo=repo_full_name,
            number=pr_number,
            head=head_branch,
            base=base_branch,
            title=pr.title,
            body=(pr.body or "")[:2000],
            diff=pr_diff,
            instruction=instruction,
            final_step=final_step,
        )
        text = _stream(agent, {"messages": [{"role": "user", "content": prompt}]},
                       settings.max_turns, interactive=interactive)
        if dry_run:
            return _dry_run_result(repo_path, text)
        # Even if the LLM didn't echo a URL, return the existing PR URL.
        return RunResult(ok=True, summary=text, pr_url=pr.html_url)
    finally:
        handle.cleanup()
def review_pr(repo_full_name: str, pr_number: int, backend: Optional[str] = None) -> RunResult:
    """Post an automated code-review comment on a PR.

    Tries to render a structured ReviewReport from the reviewer sub-agent's
    response_format output; falls back to the agent's raw final message if the
    schema parse fails (older deepagents / unstructured run).

    Args:
        repo_full_name: Repository owning the PR (normalized first).
        pr_number: Number of the pull request to review.
        backend: Backend kind forwarded to ``build_agent``.

    Returns:
        ``RunResult(ok=True, summary=<posted comment body>, pr_url=<PR URL>)``.
    """
    import urllib.request

    max_diff_chars = 30000  # cap on diff text sent to the model
    repo_full_name = normalize_repo_full_name(repo_full_name)
    settings = get_settings()
    settings.assert_ready()
    gh = GitHubOps()
    repo = gh.get_repo(repo_full_name)
    pr = repo.get_pull(pr_number)
    # Close the HTTP response deterministically instead of leaking the socket.
    # NOTE(review): this fetch sends no credentials; private repos may fail here.
    with urllib.request.urlopen(pr.diff_url, timeout=30) as resp:
        diff_text = resp.read().decode("utf-8", errors="replace")
    repo_path = gh.clone(repo_full_name, settings.workdir)
    agent, handle = build_agent(
        repo_path=repo_path, repo_full_name=repo_full_name, backend_kind=backend
    )
    try:
        # Tell the model when it only sees part of the diff, so it does not
        # treat a cut-off hunk as the complete change.
        truncation_note = (
            f"\n\n(Diff truncated to the first {max_diff_chars} of {len(diff_text)} characters.)"
            if len(diff_text) > max_diff_chars
            else ""
        )
        prompt = (
            f"Review the following diff for PR #{pr_number} of {repo_full_name}.\n\n"
            "Delegate to the `reviewer` sub-agent. Return its structured "
            "ReviewReport (or the most precise markdown you can if structured "
            "output isn't available).\n\n"
            f"```diff\n{diff_text[:max_diff_chars]}\n```{truncation_note}"
        )
        result = _stream_with_structured(
            agent,
            {"messages": [{"role": "user", "content": prompt}]},
            settings.max_turns,
        )
        body = _render_review_body(result)
        pr.create_issue_comment(body)
        return RunResult(ok=True, summary=body, pr_url=pr.html_url)
    finally:
        handle.cleanup()
def _stream_with_structured(agent, initial_input: dict, max_turns: int):
    """Like _stream() but also captures `structured_response` from the final state.

    Streams the agent (no HITL / interrupt handling), remembering the last
    non-None ``structured_response`` found in any node update and the text
    of the last non-empty AI message. Streaming stops after ``max_turns``
    chunks.

    Returns:
        ``{"text": <last AI text, always a str>, "structured": <value or None>}``.
    """
    from .observability.logging_setup import get_logger

    logger = get_logger("agent.stream")

    def _as_text(content) -> str:
        """Flatten message content (a str or a list of str/dict blocks) to text."""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts = []
            for block in content:
                if isinstance(block, str):
                    parts.append(block)
                elif isinstance(block, dict) and block.get("type") == "text":
                    parts.append(str(block.get("text", "")))
            return "".join(parts)
        return str(content) if content else ""

    final_text = ""
    structured = None
    turns = 0
    for chunk in agent.stream(initial_input, config={"recursion_limit": max_turns * 4}):
        turns += 1
        for payload in chunk.values():
            if not isinstance(payload, dict):
                continue
            if payload.get("structured_response") is not None:
                structured = payload["structured_response"]
            messages = payload.get("messages")
            if messages:  # an empty list would IndexError on [-1]
                msg = messages[-1]
                if getattr(msg, "type", "?") == "ai":
                    text = _as_text(getattr(msg, "content", "") or "")
                    if text:
                        final_text = text
        # Checked per chunk, outside the node loop: the old `break` sat inside
        # the node loop, so it never stopped the stream.
        if turns >= max_turns:
            logger.warning("max turns hit", turns=turns)
            break
    return {"text": final_text, "structured": structured}
def _render_review_body(result: dict) -> str:
"""Choose the prettiest rendering we can for the GitHub comment."""
structured = result.get("structured")
if structured is not None:
try:
from .review_schema import ReviewReport, render_report_markdown
# `structured` might already be a ReviewReport (Pydantic), a dict,
# or another model instance — normalise.
if isinstance(structured, ReviewReport):
return render_report_markdown(structured)
if hasattr(structured, "model_dump"):
return render_report_markdown(ReviewReport.model_validate(structured.model_dump()))
if isinstance(structured, dict):
return render_report_markdown(ReviewReport.model_validate(structured))
except Exception:
pass # fall through to raw text
text = result.get("text") or "(no review produced)"
return f"### 🤖 gh-deepagent review\n\n{text}"