| """MiniCPM-planned agent orchestrator over the Space's own MCP tools. |
| |
| The Agent tab's engine: a small planner LLM (OpenBMB MiniCPM via a second |
| llama-server, OpenAI-compatible) drives smolagents' ToolCallingAgent against |
| the SAME tools this Space already exposes over MCP (extract_events / |
| check_conflicts / make_ics) — consumed via the localhost MCP endpoint, so the |
| agent demonstrably works through the public tool contract, not private |
| imports. Everything stays local llama.cpp: no cloud AI APIs, every model |
| under the 32B cap (gemma-cal E4B ~4B + MiniCPM 8B or 1B). |
| |
| Stub mode (USE_STUB_EXTRACTOR=1, used by the free preview and CI) — or any |
| planner failure — falls back to ScriptedPlanner: the same tool sequence run |
| deterministically, emitting identical step events, so the tab always works |
| and tests never need a model. |
| |
| Steps are plain JSON-serialisable dicts: |
| {"kind": "plan"|"tool_call"|"tool_result"|"final"|"error", ...} |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import os |
| from typing import Iterator, Optional |
|
|
| from server import events as bus |
|
|
| |
| |
# Planner LLM endpoint (OpenAI-compatible) and the model id sent with each
# request; both can be overridden through the environment.
PLANNER_BASE_URL = os.getenv("PLANNER_BASE_URL", "http://127.0.0.1:8081/v1")
PLANNER_MODEL_ID = os.getenv("PLANNER_MODEL_ID", "minicpm-planner")

# SSE endpoint the agent loads its MCP tools from. Unless overridden, it is
# the localhost Gradio MCP path on PORT (7860 when PORT is unset).
MCP_SSE_URL = os.getenv(
    "MCP_SSE_URL",
    "http://127.0.0.1:{}/gradio_api/mcp/sse".format(os.getenv("PORT", "7860")),
)
|
|
# Prompt template handed to the planner LLM. It is filled with str.format
# using the `memory` and `thread` fields; the braces of the JSON example are
# doubled so they survive formatting as literal braces.
ORCH_TASK = """You are a scheduling agent for a busy parent. Read the thread below.

Call exactly ONE tool — extract_events on the thread — then STOP. It returns the
events (the fine-tuned calendar model does the real work), a reply draft, and any
clarification. After that one call, return a short JSON summary: {{"events": <int>}}.
Do NOT call any other tool: conflict-checking and the .ics file are handled for you.

{memory}

Thread:
{thread}
"""
|
|
|
|
| def _planner_configured() -> bool: |
| return bool(os.environ.get("PLANNER_HF_REPO") or os.environ.get("PLANNER_BASE_URL")) |
|
|
|
|
def _use_llm_planner() -> bool:
    """Decide whether the LLM planner should drive this run.

    Stub mode (USE_STUB_EXTRACTOR set to "1") always disables the planner;
    otherwise the answer is whatever _planner_configured() reports.
    """
    if os.environ.get("USE_STUB_EXTRACTOR") == "1":
        return False
    return _planner_configured()
|
|
|
|
| def _short(obj, limit: int = 1200) -> str: |
| try: |
| s = obj if isinstance(obj, str) else json.dumps(obj, default=str) |
| except Exception: |
| s = str(obj) |
| return s if len(s) <= limit else s[:limit] + " …" |
|
|
|
|
| |
| |
| |
def _scripted_steps(thread: str, ics_b64: Optional[str],
                    memory_block: Optional[str],
                    images: Optional[list[str]] = None) -> Iterator[dict]:
    """Run the fixed extract -> check-conflicts -> make-ics playbook.

    Calls the tool functions in ``server.mcp_tools`` directly, in a fixed
    order, and yields a step dict before and after each call.

    Args:
        thread: The message thread to extract events from.
        ics_b64: Optional calendar payload; when given (and events were
            found) the events are checked against it for conflicts.
        memory_block: Optional user-recall text forwarded to extraction.
        images: Optional screenshots forwarded to extraction.

    Yields:
        One "plan" step, then "tool_call"/"tool_result" pairs for each tool
        that runs, and finally a "final" step carrying the plan, the
        rendered .ics (None when there are no events) and a count summary.
    """
    # Imported at call time rather than at module import time.
    from server import mcp_tools

    yield {"kind": "plan",
           "text": "Playbook: extract events from the thread"
                   + (f" + {len(images)} screenshot(s)" if images else "")
                   + (", check conflicts against the provided calendar" if ics_b64 else "")
                   + ", then render an .ics."}

    # Step args are summaries, not the raw payloads: the thread is truncated
    # and images/memory are replaced by placeholders.
    yield {"kind": "tool_call", "tool": "extract_events",
           "args": {"thread": _short(thread, 300),
                    **({"images": f"{len(images)} image(s)"} if images else {}),
                    **({"memory": "<user recall block>"} if memory_block else {})}}
    plan = mcp_tools.extract_events(thread, images or None, memory_block)
    # Normalise once: a missing key and an explicit None both count as
    # "no events", so the len() calls below cannot fail on None.
    events = plan.get("events") or []
    yield {"kind": "tool_result", "tool": "extract_events",
           "result": {"events": len(events),
                      "reply_draft": _short(plan.get("reply_draft") or "", 200)}}

    # Start from whatever conflicts extraction reported; an explicit check
    # against the supplied calendar replaces them.
    conflicts: list = list(plan.get("conflicts") or [])
    if ics_b64 and events:
        yield {"kind": "tool_call", "tool": "check_conflicts",
               "args": {"events": f"{len(events)} event(s)", "ics_base64": "<calendar>"}}
        conflicts = mcp_tools.check_conflicts(events, ics_b64)
        plan["conflicts"] = conflicts
        yield {"kind": "tool_result", "tool": "check_conflicts",
               "result": {"conflicts": len(conflicts)}}

    ics_out = None
    if events:
        yield {"kind": "tool_call", "tool": "make_ics",
               "args": {"events": f"{len(events)} event(s)"}}
        ics_out = mcp_tools.make_ics(events)
        yield {"kind": "tool_result", "tool": "make_ics",
               "result": {"ics_bytes": len(ics_out or "")}}

    yield {"kind": "final", "plan": plan, "ics_base64": ics_out,
           "summary": {"events": len(events), "conflicts": len(conflicts)}}
|
|
|
|
| |
| |
| |
def _smol_steps(thread: str, ics_b64: Optional[str],
                memory_block: Optional[str], max_steps: int,
                images: Optional[list[str]] = None) -> Iterator[dict]:
    """Drive the planner LLM over the MCP tools, then run the scripted playbook.

    Builds an OpenAI-compatible model client for the planner, connects to
    the MCP endpoint over SSE, and runs a ToolCallingAgent restricted to the
    ``extract_events`` tool. Each agent action is translated into
    "tool_call" / "tool_result" step dicts. The planner's final answer is
    only reported in a "plan" step; the plan and .ics returned to the caller
    come from ``_scripted_steps``, which runs afterwards.

    Args:
        thread: The message thread given to the planner and the playbook.
        ics_b64: Optional calendar payload, passed to the playbook.
        memory_block: Optional user-recall text added to the planner prompt
            and passed to the playbook.
        max_steps: Upper bound on agent steps; capped at 3 here.
        images: Optional screenshots, passed to the playbook only.

    Yields:
        Planner step dicts followed by every step of ``_scripted_steps``.
    """
    from smolagents import OpenAIServerModel, ToolCallingAgent
    from smolagents.mcp_client import MCPClient

    planner = OpenAIServerModel(
        model_id=PLANNER_MODEL_ID,
        api_base=PLANNER_BASE_URL,
        api_key=os.environ.get("PLANNER_API_KEY", "local"),
        temperature=0.0,
    )
    memory_text = ""
    if memory_block:
        memory_text = f"What you know about this user:\n{memory_block}"
    prompt = ORCH_TASK.format(memory=memory_text, thread=thread)

    yield {"kind": "plan", "text": f"MiniCPM planner ({PLANNER_MODEL_ID}) engaged — "
                                   f"tools via MCP at {MCP_SSE_URL}"}
    with MCPClient({"url": MCP_SSE_URL, "transport": "sse"}) as served_tools:
        # Expose only the tool the prompt asks for.
        wanted = {"extract_events"}
        exposed = [tool for tool in served_tools
                   if getattr(tool, "name", "") in wanted]
        agent = ToolCallingAgent(tools=exposed, model=planner,
                                 max_steps=min(max_steps, 3))
        answer = None
        for step in agent.run(prompt, stream=True):
            # Dispatch on the class name, so no step classes are imported.
            step_type = type(step).__name__
            if step_type == "ActionStep":
                for call in getattr(step, "tool_calls", None) or []:
                    yield {"kind": "tool_call",
                           "tool": getattr(call, "name", "?"),
                           "args": _short(getattr(call, "arguments", ""))}
                observations = getattr(step, "observations", None)
                if observations:
                    yield {"kind": "tool_result", "tool": "(observation)",
                           "result": _short(observations)}
            elif step_type == "FinalAnswerStep":
                answer = (getattr(step, "final_answer", None)
                          or getattr(step, "output", None))
        yield {"kind": "plan", "text": f"Planner finished: {_short(answer, 300)}"}

    # The deterministic playbook produces the plan, conflicts and .ics.
    yield from _scripted_steps(thread, ics_b64, memory_block, images)
|
|
|
|
| |
| |
| |
def run_orchestrator(thread: str, ics_b64: Optional[str] = None,
                     memory_block: Optional[str] = None,
                     max_steps: int = 6,
                     images: Optional[list[str]] = None) -> Iterator[dict]:
    """Yield the orchestration steps for one thread.

    When the LLM planner is enabled, its steps are streamed first. If that
    path raises, an "error" step is emitted and the scripted playbook runs
    instead; with the planner disabled the scripted playbook runs directly.
    Start and finish are reported on the event bus inside an "agent" run
    scope.

    Args:
        thread: The message thread to process.
        ics_b64: Optional calendar payload used for conflict checking.
        memory_block: Optional user-recall text.
        max_steps: Step budget handed to the planner path.
        images: Optional screenshot data URIs.

    Yields:
        Step dicts; a completed run ends with a "final" step.
    """
    with bus.run_scope("agent"):
        bus.emit("decision", "agent orchestrator run started")
        if _use_llm_planner():
            try:
                yield from _smol_steps(thread, ics_b64, memory_block, max_steps, images)
                bus.emit("decision", "agent orchestrator run finished (MiniCPM planner)")
                return
            except Exception as exc:
                # Any planner-path failure degrades to the scripted playbook.
                # Exceptions with an empty message would leave a dangling
                # colon, so it is trimmed.
                reason = f"{type(exc).__name__}: {exc}".strip().rstrip(":")
                notice = (f"Planner unavailable ({_short(reason, 160)}) — "
                          "falling back to the scripted playbook.")
                yield {"kind": "error", "text": notice}
        yield from _scripted_steps(thread, ics_b64, memory_block, images)
        bus.emit("decision", "agent orchestrator run finished (scripted)")
|
|