OffGridSchedula / server /orchestrator.py
ParetoOptimal's picture
Initial Commit
0366d65
Raw
History Blame Contribute Delete
9.61 kB
"""MiniCPM-planned agent orchestrator over the Space's own MCP tools.
The Agent tab's engine: a small planner LLM (OpenBMB MiniCPM via a second
llama-server, OpenAI-compatible) drives smolagents' ToolCallingAgent against
the SAME tools this Space already exposes over MCP — consumed via the
localhost MCP endpoint, so the agent demonstrably works through the public
tool contract, not private imports. The planner is given only extract_events;
check_conflicts and make_ics are then run deterministically by the scripted
playbook. Everything stays local llama.cpp: no cloud AI APIs, every model
under the 32B cap (gemma-cal E4B ~4B + MiniCPM 8B or 1B).
Stub mode (USE_STUB_EXTRACTOR=1, used by the free preview and CI) — or any
planner failure — falls back to the scripted playbook (_scripted_steps): the
same tool sequence run deterministically, emitting the same kinds of step
events, so the tab always works and tests never need a model.
Steps are plain JSON-serialisable dicts:
{"kind": "plan"|"tool_call"|"tool_result"|"final"|"error", ...}
"""
from __future__ import annotations
import json
import os
from typing import Iterator, Optional
from server import events as bus
# Planner serving: an optional second llama-server with an OpenAI-compatible
# API, selected through the environment (the planner path stays off unless
# PLANNER_HF_REPO or PLANNER_BASE_URL is set). 8B MiniCPM is the default for
# planning quality; MiniCPM5-1B is the <=4B tiny variant.
PLANNER_BASE_URL = os.environ.get("PLANNER_BASE_URL", "http://127.0.0.1:8081/v1")
PLANNER_MODEL_ID = os.environ.get("PLANNER_MODEL_ID", "minicpm-planner")

# This Space's own MCP SSE endpoint, reached over localhost so no HF edge or
# auth layer sits between the agent and the tools it calls. The port follows
# $PORT (7860 when unset).
MCP_SSE_URL = os.environ.get(
    "MCP_SSE_URL",
    "http://127.0.0.1:" + os.environ.get("PORT", "7860") + "/gradio_api/mcp/sse",
)
# Task prompt handed to the planner agent. It is filled with str.format() in
# _smol_steps: {memory} receives an optional "What you know about this user"
# block (or ""), and {thread} receives the raw thread. Because of .format(),
# the literal JSON braces in the summary example are written doubled ({{ }}).
# The prompt restricts the planner to a single extract_events call; the
# remaining work is finished by the scripted playbook.
ORCH_TASK = """You are a scheduling agent for a busy parent. Read the thread below.
Call exactly ONE tool — extract_events on the thread — then STOP. It returns the
events (the fine-tuned calendar model does the real work), a reply draft, and any
clarification. After that one call, return a short JSON summary: {{"events": <int>}}.
Do NOT call any other tool: conflict-checking and the .ics file are handled for you.
{memory}
Thread:
{thread}
"""
def _planner_configured() -> bool:
return bool(os.environ.get("PLANNER_HF_REPO") or os.environ.get("PLANNER_BASE_URL"))
def _use_llm_planner() -> bool:
return os.environ.get("USE_STUB_EXTRACTOR") != "1" and _planner_configured()
def _short(obj, limit: int = 1200) -> str:
try:
s = obj if isinstance(obj, str) else json.dumps(obj, default=str)
except Exception: # noqa: BLE001
s = str(obj)
return s if len(s) <= limit else s[:limit] + " …"
# --------------------------------------------------------------------------- #
# ScriptedPlanner — deterministic fallback / stub-mode path
# --------------------------------------------------------------------------- #
def _scripted_steps(thread: str, ics_b64: Optional[str],
                    memory_block: Optional[str],
                    images: Optional[list[str]] = None) -> Iterator[dict]:
    """Run the fixed extract -> check conflicts -> make .ics playbook.

    This deterministic path serves stub mode, the planner fallback, and the
    finalisation of the planner path. It calls the ``server.mcp_tools``
    functions directly and yields a step dict before and after each call.

    Args:
        thread: Message thread to extract events from.
        ics_b64: Optional existing calendar, passed to ``check_conflicts``
            (labelled ``ics_base64``; presumably a base64-encoded .ics —
            confirm in mcp_tools). Conflicts are only checked when this is
            given and events were found.
        memory_block: Optional user-recall text forwarded to extract_events.
        images: Optional screenshot data URIs forwarded to extract_events.

    Yields:
        A "plan" step, "tool_call"/"tool_result" pairs, then one "final" step
        holding the plan dict, the make_ics output (None when there were no
        events) and an events/conflicts summary.
    """
    from server import mcp_tools

    yield {"kind": "plan",
           "text": "Playbook: extract events from the thread"
           + (f" + {len(images)} screenshot(s)" if images else "")
           + (", check conflicts against the provided calendar" if ics_b64 else "")
           + ", then render an .ics."}

    yield {"kind": "tool_call", "tool": "extract_events",
           "args": {"thread": _short(thread, 300),
                    **({"images": f"{len(images)} image(s)"} if images else {}),
                    **({"memory": "<user recall block>"} if memory_block else {})}}
    plan = mcp_tools.extract_events(thread, images or None, memory_block)
    # Treat a missing *or null* "events" entry as "no events" so len() below
    # never sees None; when present, this is the same list object as plan's.
    events = plan.get("events") or []
    yield {"kind": "tool_result", "tool": "extract_events",
           "result": {"events": len(events),
                      "reply_draft": _short(plan.get("reply_draft") or "", 200)}}

    # Start from any conflicts extract_events already reported; replaced by a
    # real check when the caller supplied a calendar.
    conflicts: list = list(plan.get("conflicts") or [])
    if ics_b64 and events:
        yield {"kind": "tool_call", "tool": "check_conflicts",
               "args": {"events": f"{len(events)} event(s)", "ics_base64": "<calendar>"}}
        conflicts = mcp_tools.check_conflicts(events, ics_b64) or []
        plan["conflicts"] = conflicts
        yield {"kind": "tool_result", "tool": "check_conflicts",
               "result": {"conflicts": len(conflicts)}}

    ics_out = None
    if events:
        yield {"kind": "tool_call", "tool": "make_ics",
               "args": {"events": f"{len(events)} event(s)"}}
        ics_out = mcp_tools.make_ics(events)
        yield {"kind": "tool_result", "tool": "make_ics",
               "result": {"ics_bytes": len(ics_out or "")}}

    yield {"kind": "final", "plan": plan, "ics_base64": ics_out,
           "summary": {"events": len(events), "conflicts": len(conflicts)}}
# --------------------------------------------------------------------------- #
# smolagents path — MiniCPM planner over the self MCP endpoint
# --------------------------------------------------------------------------- #
def _smol_steps(thread: str, ics_b64: Optional[str],
                memory_block: Optional[str], max_steps: int,
                images: Optional[list[str]] = None) -> Iterator[dict]:
    """Run the MiniCPM planner over the self MCP endpoint, then finalize.

    A smolagents ToolCallingAgent, backed by the OpenAI-compatible planner
    server, is given only the ``extract_events`` tool fetched from
    MCP_SSE_URL. Its streamed steps are translated into step dicts. The
    deterministic playbook then re-derives the structured plan and .ics.

    Args:
        thread: Message thread the planner (and playbook) work on.
        ics_b64: Optional existing calendar, forwarded to the playbook.
        memory_block: Optional user-recall text, added to the planner prompt
            and forwarded to the playbook.
        max_steps: Requested planner step budget; clamped to 1..3.
        images: Optional screenshot data URIs, forwarded to the playbook.

    Yields:
        Planner trace steps, then the scripted playbook's steps ending with
        its "final" step.

    Raises:
        ImportError: smolagents is not installed (imported lazily here).
        RuntimeError: the MCP endpoint exposes no ``extract_events`` tool.
        Errors from the planner server or MCP client propagate unchanged so
        ``run_orchestrator`` can fall back to the scripted playbook.
    """
    # Lazy imports: smolagents is only needed on the real path, keeping CI and
    # the stub preview dependency-free.
    from smolagents import OpenAIServerModel, ToolCallingAgent  # noqa: PLC0415
    from smolagents.mcp_client import MCPClient  # noqa: PLC0415

    model = OpenAIServerModel(
        model_id=PLANNER_MODEL_ID, api_base=PLANNER_BASE_URL,
        api_key=os.environ.get("PLANNER_API_KEY", "local"), temperature=0.0,
    )
    task = ORCH_TASK.format(
        memory=(f"What you know about this user:\n{memory_block}" if memory_block else ""),
        thread=thread,
    )
    yield {"kind": "plan", "text": f"MiniCPM planner ({PLANNER_MODEL_ID}) engaged — "
                                   f"tools via MCP at {MCP_SSE_URL}"}

    wanted = {"extract_events"}
    result = None
    with MCPClient({"url": MCP_SSE_URL, "transport": "sse"}) as served:
        # Minimal-footprint planner: expose ONLY extract_events and cap the loop
        # at a couple of steps. The fine-tuned E4B (inside extract_events) does
        # the real work; conflict-checking and the .ics are finalized
        # deterministically by _scripted_steps below. This keeps the planner to a
        # single tool call so it stays fast and never accumulates enough context
        # to overflow (multi-step runs hit ~207s and 'request exceeds context').
        # Restricting tools also avoids the File-input callbacks whose schemas
        # $ref #/$defs/FileData (which the planner's jinja rendering can't resolve).
        tools = [t for t in served if getattr(t, "name", "") in wanted]
        if not tools:
            # Without its one tool the planner trace would be meaningless;
            # fail loudly so the caller reports why and uses the playbook.
            raise RuntimeError(f"MCP endpoint {MCP_SSE_URL} does not expose extract_events")
        agent = ToolCallingAgent(tools=tools, model=model,
                                 max_steps=max(1, min(max_steps, 3)))
        for step in agent.run(task, stream=True):
            kind = type(step).__name__
            if kind == "ActionStep":
                for call in (getattr(step, "tool_calls", None) or []):
                    yield {"kind": "tool_call",
                           "tool": getattr(call, "name", "?"),
                           "args": _short(getattr(call, "arguments", ""))}
                obs = getattr(step, "observations", None)
                if obs:
                    yield {"kind": "tool_result", "tool": "(observation)",
                           "result": _short(obs)}
            elif kind == "FinalAnswerStep":
                result = getattr(step, "final_answer", None) or getattr(step, "output", None)

    # The MCP session is closed by now, so a disconnect error can only surface
    # before the playbook runs, never after its "final" step.
    yield {"kind": "plan", "text": f"Planner finished: {_short(result, 300)}"}
    # The planner's free-text answer isn't the product — re-derive the
    # structured plan through the deterministic path so the UI always gets a
    # valid ActionPlan + ics, with the planner trace above as the evidence.
    yield from _scripted_steps(thread, ics_b64, memory_block, images)
# --------------------------------------------------------------------------- #
# Entry point
# --------------------------------------------------------------------------- #
def run_orchestrator(thread: str, ics_b64: Optional[str] = None,
                     memory_block: Optional[str] = None,
                     max_steps: int = 6,
                     images: Optional[list[str]] = None) -> Iterator[dict]:
    """Yield orchestration steps for a thread (+ optional screenshot data URIs).

    Uses the MiniCPM planner path when it is configured and stub mode is off.
    Otherwise, or when the planner fails before delivering its result, the
    scripted playbook runs instead, preceded by an 'error' step explaining
    the fallback. A normal run ends with a single 'final' step. Exceptions
    raised by the scripted playbook itself propagate to the caller.

    Args:
        thread: Message thread to schedule from.
        ics_b64: Optional existing calendar to check conflicts against.
        memory_block: Optional user-recall text for the extractor/planner.
        max_steps: Requested planner step budget.
        images: Optional screenshot data URIs.

    Yields:
        JSON-serialisable step dicts (see the module docstring).
    """
    with bus.run_scope("agent"):
        bus.emit("decision", "agent orchestrator run started")
        if _use_llm_planner():
            planner_steps = _smol_steps(thread, ics_b64, memory_block, max_steps, images)
            final_sent = False
            try:
                for step in planner_steps:
                    if step.get("kind") == "final":
                        final_sent = True
                    yield step
            except Exception as e:  # noqa: BLE001 planner down -> scripted fallback
                # Surface the actual message (e.g. which module is missing), not
                # just the type — a bare "ModuleNotFoundError" hides the cause.
                detail = f"{type(e).__name__}: {e}".strip().rstrip(":")
                if final_sent:
                    # The UI already has its result; re-running the playbook
                    # would emit a second 'final' and repeat extraction, so
                    # just record the late error.
                    bus.emit("decision",
                             "agent orchestrator run finished (MiniCPM planner) "
                             f"with a post-result error: {_short(detail, 160)}")
                    return
                yield {"kind": "error",
                       "text": f"Planner unavailable ({_short(detail, 160)}) — "
                               "falling back to the scripted playbook."}
            else:
                bus.emit("decision", "agent orchestrator run finished (MiniCPM planner)")
                return
            finally:
                # Mirror `yield from`: if our consumer stops early, shut the
                # planner generator (and its MCP session) down immediately.
                planner_steps.close()
        yield from _scripted_steps(thread, ics_b64, memory_block, images)
        bus.emit("decision", "agent orchestrator run finished (scripted)")