"""MiniCPM-planned agent orchestrator over the Space's own MCP tools. The Agent tab's engine: a small planner LLM (OpenBMB MiniCPM via a second llama-server, OpenAI-compatible) drives smolagents' ToolCallingAgent against the SAME tools this Space already exposes over MCP (extract_events / check_conflicts / make_ics) — consumed via the localhost MCP endpoint, so the agent demonstrably works through the public tool contract, not private imports. Everything stays local llama.cpp: no cloud AI APIs, every model under the 32B cap (gemma-cal E4B ~4B + MiniCPM 8B or 1B). Stub mode (USE_STUB_EXTRACTOR=1, used by the free preview and CI) — or any planner failure — falls back to ScriptedPlanner: the same tool sequence run deterministically, emitting identical step events, so the tab always works and tests never need a model. Steps are plain JSON-serialisable dicts: {"kind": "plan"|"tool_call"|"tool_result"|"final"|"error", ...} """ from __future__ import annotations import json import os from typing import Iterator, Optional from server import events as bus # Planner serving (second llama-server) — env-selected, OFF by default. # 8B default for planning quality; MiniCPM5-1B is the <=4B tiny variant. PLANNER_BASE_URL = os.environ.get("PLANNER_BASE_URL", "http://127.0.0.1:8081/v1") PLANNER_MODEL_ID = os.environ.get("PLANNER_MODEL_ID", "minicpm-planner") # Self MCP endpoint (localhost — no HF edge/auth between us and ourselves). MCP_SSE_URL = os.environ.get( "MCP_SSE_URL", f"http://127.0.0.1:{os.environ.get('PORT', '7860')}/gradio_api/mcp/sse" ) ORCH_TASK = """You are a scheduling agent for a busy parent. Read the thread below. Call exactly ONE tool — extract_events on the thread — then STOP. It returns the events (the fine-tuned calendar model does the real work), a reply draft, and any clarification. After that one call, return a short JSON summary: {{"events": }}. Do NOT call any other tool: conflict-checking and the .ics file are handled for you. {memory} Thread: {thread} """ def _planner_configured() -> bool: return bool(os.environ.get("PLANNER_HF_REPO") or os.environ.get("PLANNER_BASE_URL")) def _use_llm_planner() -> bool: return os.environ.get("USE_STUB_EXTRACTOR") != "1" and _planner_configured() def _short(obj, limit: int = 1200) -> str: try: s = obj if isinstance(obj, str) else json.dumps(obj, default=str) except Exception: # noqa: BLE001 s = str(obj) return s if len(s) <= limit else s[:limit] + " …" # --------------------------------------------------------------------------- # # ScriptedPlanner — deterministic fallback / stub-mode path # --------------------------------------------------------------------------- # def _scripted_steps(thread: str, ics_b64: Optional[str], memory_block: Optional[str], images: Optional[list[str]] = None) -> Iterator[dict]: from server import mcp_tools yield {"kind": "plan", "text": "Playbook: extract events from the thread" + (f" + {len(images)} screenshot(s)" if images else "") + (", check conflicts against the provided calendar" if ics_b64 else "") + ", then render an .ics."} yield {"kind": "tool_call", "tool": "extract_events", "args": {"thread": _short(thread, 300), **({"images": f"{len(images)} image(s)"} if images else {}), **({"memory": ""} if memory_block else {})}} plan = mcp_tools.extract_events(thread, images or None, memory_block) yield {"kind": "tool_result", "tool": "extract_events", "result": {"events": len(plan.get("events", [])), "reply_draft": _short(plan.get("reply_draft") or "", 200)}} conflicts: list = list(plan.get("conflicts") or []) if ics_b64 and plan.get("events"): yield {"kind": "tool_call", "tool": "check_conflicts", "args": {"events": f"{len(plan['events'])} event(s)", "ics_base64": ""}} conflicts = mcp_tools.check_conflicts(plan["events"], ics_b64) plan["conflicts"] = conflicts yield {"kind": "tool_result", "tool": "check_conflicts", "result": {"conflicts": len(conflicts)}} ics_out = None if plan.get("events"): yield {"kind": "tool_call", "tool": "make_ics", "args": {"events": f"{len(plan['events'])} event(s)"}} ics_out = mcp_tools.make_ics(plan["events"]) yield {"kind": "tool_result", "tool": "make_ics", "result": {"ics_bytes": len(ics_out or "")}} yield {"kind": "final", "plan": plan, "ics_base64": ics_out, "summary": {"events": len(plan.get("events", [])), "conflicts": len(conflicts)}} # --------------------------------------------------------------------------- # # smolagents path — MiniCPM planner over the self MCP endpoint # --------------------------------------------------------------------------- # def _smol_steps(thread: str, ics_b64: Optional[str], memory_block: Optional[str], max_steps: int, images: Optional[list[str]] = None) -> Iterator[dict]: # Lazy imports: smolagents is only needed on the real path, keeping CI and # the stub preview dependency-free. from smolagents import OpenAIServerModel, ToolCallingAgent # noqa: PLC0415 from smolagents.mcp_client import MCPClient # noqa: PLC0415 model = OpenAIServerModel( model_id=PLANNER_MODEL_ID, api_base=PLANNER_BASE_URL, api_key=os.environ.get("PLANNER_API_KEY", "local"), temperature=0.0, ) task = ORCH_TASK.format( memory=(f"What you know about this user:\n{memory_block}" if memory_block else ""), thread=thread, ) yield {"kind": "plan", "text": f"MiniCPM planner ({PLANNER_MODEL_ID}) engaged — " f"tools via MCP at {MCP_SSE_URL}"} with MCPClient({"url": MCP_SSE_URL, "transport": "sse"}) as tools: # Minimal-footprint planner: expose ONLY extract_events and cap the loop # at a couple of steps. The fine-tuned E4B (inside extract_events) does # the real work; conflict-checking and the .ics are finalized # deterministically by _scripted_steps below. This keeps the planner to a # single tool call so it stays fast and never accumulates enough context # to overflow (multi-step runs hit ~207s and 'request exceeds context'). # Restricting tools also avoids the File-input callbacks whose schemas # $ref #/$defs/FileData (which the planner's jinja rendering can't resolve). _WANTED = {"extract_events"} tools = [t for t in tools if getattr(t, "name", "") in _WANTED] agent = ToolCallingAgent(tools=tools, model=model, max_steps=min(max_steps, 3)) result = None for step in agent.run(task, stream=True): kind = type(step).__name__ if kind == "ActionStep": for call in (getattr(step, "tool_calls", None) or []): yield {"kind": "tool_call", "tool": getattr(call, "name", "?"), "args": _short(getattr(call, "arguments", ""))} obs = getattr(step, "observations", None) if obs: yield {"kind": "tool_result", "tool": "(observation)", "result": _short(obs)} elif kind == "FinalAnswerStep": result = getattr(step, "final_answer", None) or getattr(step, "output", None) yield {"kind": "plan", "text": f"Planner finished: {_short(result, 300)}"} # The planner's free-text answer isn't the product — re-derive the # structured plan through the deterministic path so the UI always gets a # valid ActionPlan + ics, with the planner trace above as the evidence. yield from _scripted_steps(thread, ics_b64, memory_block, images) # --------------------------------------------------------------------------- # # Entry point # --------------------------------------------------------------------------- # def run_orchestrator(thread: str, ics_b64: Optional[str] = None, memory_block: Optional[str] = None, max_steps: int = 6, images: Optional[list[str]] = None) -> Iterator[dict]: """Yield orchestration steps for a thread (+ optional screenshot data URIs); always ends with a 'final' step (or an 'error' followed by the scripted fallback's steps).""" with bus.run_scope("agent"): bus.emit("decision", "agent orchestrator run started") if _use_llm_planner(): try: yield from _smol_steps(thread, ics_b64, memory_block, max_steps, images) bus.emit("decision", "agent orchestrator run finished (MiniCPM planner)") return except Exception as e: # noqa: BLE001 planner down -> scripted fallback # Surface the actual message (e.g. which module is missing), not # just the type — a bare "ModuleNotFoundError" hides the cause. detail = f"{type(e).__name__}: {e}".strip().rstrip(":") yield {"kind": "error", "text": f"Planner unavailable ({_short(detail, 160)}) — " "falling back to the scripted playbook."} yield from _scripted_steps(thread, ics_b64, memory_block, images) bus.emit("decision", "agent orchestrator run finished (scripted)")