# smolcode/engine/router.py
# (Hugging Face Hub file view; uploaded with huggingface_hub, revision daea45b, 20.1 kB)
"""Tiered model router — the "forge-router" pattern.
The point of smolcode: don't burn a 32B model on a one-line helper, and don't
fail a hard task on a 3B. The router picks a *starting* tier from a cheap
complexity heuristic, runs the agent, then **escalates on failure**: if the
produced code doesn't actually pass when re-run, it retries the whole task on the
next-bigger model. The tier that ultimately solved it is surfaced for the UI badge.
Each tier is an independent SmallCodeAgent (its own model + fresh workspace), so
every model in the ladder uses LiteForge's native tool-calling loop — no parsing
hacks. All tiers are <=32B to stay hackathon-eligible.
"""
from __future__ import annotations
import os
import re
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from . import browsercheck
from .agent import SmallCodeAgent, Step
from .config import Preset, SpecialistLadder, SpecialistPreset, Tier, load_preset
from .judge import judge_correct, judge_enabled
from .live_run import LiveFrame
from .preview import find_entry, inline_app
from .trace_collector import TraceEvent
from .ui_trace import merge_step_metadata
# Signals that a task is non-trivial and worth starting higher up the ladder.
# Leading \b + trailing \w* so stems match their word family
# (recursi -> recursive, optimi -> optimize, concurren -> concurrency).
# Matching is case-insensitive (re.I); classify_tier counts the number of hits.
_HARD_HINTS = re.compile(
    r"\b(class|async|thread|concurren|regex|pars|algorithm|optimi|recursi|"
    r"benchmark|refactor|multiple files|api|server|database|sql|decorator|"
    r"generator|data ?structure|graph|tree|dynamic programming)\w*",
    re.I,
)
def _route_classifier():
"""The learned routing classifier singleton, or None if unavailable.
Importing route_clf pulls in pydantic (and lazily onnxruntime); any failure
here just means we route with the regex baseline below.
"""
try:
from .route_clf import get_classifier
return get_classifier()
except Exception:
return None
def classify_tier(task: str, n_tiers: int) -> int:
    """Return the starting tier index for *task* (0 = smallest tier).

    A cheap, transparent heuristic: one point for a long prompt (> 280 chars),
    one point for at least one hard-task hint, and one more for three or more
    hints. The total is clamped to the last valid tier index.
    """
    if n_tiers <= 1:
        return 0
    hint_hits = len(_HARD_HINTS.findall(task))
    signals = (
        len(task) > 280,   # long prompt
        hint_hits >= 1,    # mentions something non-trivial
        hint_hits >= 3,    # mentions several non-trivial things
    )
    return min(sum(signals), n_tiers - 1)
# --- specialty (language/function) classifier --------------------------------
# Picks the specialist *family* for a task; classify_tier then picks the size
# within it. Same cheap, transparent, ordered-regex style as classify_tier.
# Priority on ties (earlier wins); 'py' is last because it's the safe default.
# `orchestrate` is first: explicit fan-out language is a strong, specific signal
# that should win over an incidental language mention.
_SPECIALTY_ORDER = ("orchestrate", "git", "terraform", "docker", "sql", "powershell",
                    "bsd", "rust", "go", "cpp", "java", "dotnet", "csharp", "bash",
                    "js", "py")

# Captures the language tag that directly follows an opening ``` code fence.
_FENCE_LANG = re.compile(r"```([a-z0-9+#.]+)", re.I)

# Fence tag (looked up lower-cased) -> specialty key. Tags that are not listed
# here are ignored by classify_specialty.
_FENCE_TO_SPECIALTY = {
    "python": "py", "py": "py", "pytest": "py",
    "bash": "bash", "sh": "bash", "shell": "bash", "zsh": "bash", "console": "bash",
    "powershell": "powershell", "ps1": "powershell", "pwsh": "powershell",
    "sql": "sql", "psql": "sql", "sqlite": "sql",
    "javascript": "js", "js": "js", "ts": "js", "typescript": "js",
    "jsx": "js", "tsx": "js", "node": "js",
    "go": "go", "golang": "go",
    "rust": "rust", "rs": "rust",
    "cpp": "cpp", "c++": "cpp", "cc": "cpp", "c": "cpp",
    "java": "java",
    "csharp": "csharp", "cs": "csharp",
    "dockerfile": "docker", "docker": "docker",
    "hcl": "terraform", "terraform": "terraform", "tf": "terraform",
}

# File extensions mentioned in the task text (e.g. "main.go"). The trailing \b
# makes the longest listed extension win (".tsx" is captured as tsx, not ts).
_EXT_RE = re.compile(r"\.(py|sh|bash|ps1|sql|js|mjs|cjs|ts|tsx|jsx|go|rs|cpp|cc|cxx|"
                     r"hpp|java|cs|csproj|tf|dockerfile)\b", re.I)

# Captured extension (looked up lower-cased) -> specialty key; each mention adds
# one point to that specialty's score in classify_specialty.
_EXT_TO_SPECIALTY = {
    "py": "py", "sh": "bash", "bash": "bash", "ps1": "powershell", "sql": "sql",
    "js": "js", "mjs": "js", "cjs": "js", "ts": "js", "tsx": "js", "jsx": "js",
    "go": "go", "rs": "rust", "cpp": "cpp", "cc": "cpp", "cxx": "cpp", "hpp": "cpp",
    "java": "java", "cs": "csharp", "csproj": "dotnet", "tf": "terraform",
    "dockerfile": "docker",
}
_SPECIALTY_HINTS = {
# Fan-out / parallel delegation work -> the task_batch specialist.
"orchestrate": re.compile(r"\b(in parallel|fan ?out|concurrently|task_batch|"
r"orchestrat|several independent|multiple independent|"
r"simultaneously|batch of (tasks|jobs))\w*", re.I),
# NOTE: `staged` requires the trailing 'd' so it does NOT match "stage" inside
# "multi-stage" (a docker term) — that false-positive misrouted Docker tasks.
"git": re.compile(r"\b(git|commit|rebase|cherry-?pick|merge conflict|stash|"
r"\bbranch\b|pull request|\bPR\b|revert|bisect|staged)\w*", re.I),
"terraform": re.compile(r"\b(terraform|\bhcl\b|\.tf\b|provider|resource block|"
r"infrastructure as code|\biac\b|tfstate)\w*", re.I),
"docker": re.compile(r"\b(docker|dockerfile|docker-?compose|container image|"
r"\bimage\b|\bbuild -t\b|entrypoint)\w*", re.I),
"sql": re.compile(r"\b(sql|select |insert |update |delete |join|schema|"
r"\btable\b|\bindex\b|migration|postgres|sqlite|mysql|query)\w*", re.I),
"powershell": re.compile(r"\b(powershell|pwsh|\.ps1|cmdlet|get-|set-|write-output)\w*", re.I),
"bsd": re.compile(r"\b(freebsd|openbsd|netbsd|\bbsd\b|pf\.conf|rc\.d|pkg_add)\w*", re.I),
"rust": re.compile(r"\b(rust|cargo|crate|rustc|\.rs\b|borrow checker|tokio)\w*", re.I),
"go": re.compile(r"\b(golang|\bgo\b|goroutine|go mod|go test|\.go\b)\w*", re.I),
"cpp": re.compile(r"\b(c\+\+|cpp|g\+\+|clang|std::|cmake|\.cpp\b|template)\w*", re.I),
"java": re.compile(r"\b(java|maven|gradle|\bjvm\b|junit|\.java\b)\w*", re.I),
"dotnet": re.compile(r"\b(\.net|dotnet|nuget|asp\.net|\.csproj|msbuild)\w*", re.I),
"csharp": re.compile(r"\b(c#|csharp|\blinq\b|\.cs\b|\bxunit\b)\w*", re.I),
"bash": re.compile(r"\b(shell script|\bbash\b|\bzsh\b|chmod|grep|sed|awk|"
r"\bpipe\b|cron|stdout|stderr|\$PATH)\w*", re.I),
"js": re.compile(r"\b(javascript|typescript|node|npm|react|vue|jsx|tsx|"
r"webpack|vite|eslint|package\.json)\w*", re.I),
"py": re.compile(r"\b(python|pytest|pandas|numpy|django|flask|pip|venv|"
r"def |async def|decorator)\w*", re.I),
}
def classify_specialty(task: str, *, default: str = "py") -> str:
    """Pick the specialist family key for a task. Cheap, transparent, deterministic.

    Precedence (most explicit signal first):

    1. ``SMALLCODE_SPECIALTY`` env override (stripped, lower-cased; a blank value
       is treated as unset).
    2. The first recognised code-fence language tag (```lang).
    3. Combined score of keyword cues and file-extension mentions; ties are
       broken by ``_SPECIALTY_ORDER`` (earlier = higher priority).
    4. ``default`` when nothing scored.

    Mirrors classify_tier's style; pairs with it for 2D routing.

    Args:
        task: The raw task text.
        default: Specialty returned when no signal is found.

    Returns:
        The specialty key (e.g. ``"py"``, ``"rust"``, ``"git"``).
    """
    # Normalise before testing so a whitespace-only override cannot turn into
    # the empty string as a "specialty".
    forced = os.environ.get("SMALLCODE_SPECIALTY", "").strip().lower()
    if forced:
        return forced

    # A fenced code block (```lang) is the single most explicit signal -> hard win.
    for lang in _FENCE_LANG.findall(task):
        fenced = _FENCE_TO_SPECIALTY.get(lang.lower())
        if fenced:
            return fenced

    # Otherwise SCORE keyword cues AND file-extension mentions together, so a strong
    # action signal (e.g. "rebase ... merge conflict") beats an incidental ".py"
    # filename.
    scores = {name: len(rx.findall(task)) for name, rx in _SPECIALTY_HINTS.items()}
    for found_ext in _EXT_RE.findall(task):
        mapped = _EXT_TO_SPECIALTY.get(found_ext.lower())
        if mapped:
            scores[mapped] = scores.get(mapped, 0) + 1

    # Rank table built once; a specialty missing from _SPECIALTY_ORDER sorts last
    # instead of raising ValueError from tuple.index().
    rank = {name: i for i, name in enumerate(_SPECIALTY_ORDER)}
    unranked = len(rank)
    best = max(
        scores,
        key=lambda name: (scores[name], -rank.get(name, unranked)),
        default=None,
    )
    if best is not None and scores[best] > 0:
        return best
    return default
@dataclass
class RouteResult:
    """Outcome of one routed task: the answer plus which tier produced it."""

    final: str  # final answer taken from the agent's last (done) frame
    steps: list[Step]  # steps taken from that same done frame
    tier_name: str  # name of the tier that produced this result
    tier_model: str  # model id of that tier
    start_tier: str  # name of the tier the router started on
    escalations: int  # how many escalations happened before this tier ran
    verified: bool  # True only if verification passed; "unverifiable" is stored as False
    specialty: str = "general"  # specialty of the ladder that was used
    files: dict[str, str] = field(default_factory=dict)  # agent.files() at build time
    trace_events: list[TraceEvent] = field(default_factory=list)  # merged trace events
    agent: SmallCodeAgent | None = None  # the agent that produced this result
def _smoke_command(files: list[str]) -> str | None:
"""A best-effort 'does it build/run (and pass any tests)?' shell command for a
NON-Python solution, or None if the language isn't recognized. Mirrors the
per-specialty run commands (finetune/specialties.py) so the router can escalate
on go/rust/js/sql/… exactly like it does on Python via run_python."""
def ext(e: str) -> list[str]:
return [f for f in files if f.endswith(e)]
if ext(".go"):
if any(f.endswith("_test.go") for f in files):
return "go test ./... 2>&1"
return "go run . 2>&1 || go run *.go 2>&1"
if "Cargo.toml" in files:
return "cargo test -q 2>&1 || cargo build -q 2>&1"
if ext(".rs"):
return f"rustc {ext('.rs')[0]} -o /tmp/_smv 2>&1 && /tmp/_smv"
js = ext(".js") + ext(".mjs") + ext(".cjs") + ext(".ts")
if "package.json" in files:
return "npm test --silent 2>&1 || node --test 2>&1"
if js:
if any(".test." in f or ".spec." in f for f in js):
return "node --test 2>&1"
entry = next((f for f in js if f in ("index.js", "main.js")), js[0])
return f"node {entry} 2>&1"
if ext(".sql"):
return f"sqlite3 :memory: < {ext('.sql')[0]} 2>&1"
if ext(".cpp") or ext(".cc"):
srcs = " ".join(ext(".cpp") + ext(".cc"))
return f"g++ -std=c++17 {srcs} -o /tmp/_smv 2>&1 && /tmp/_smv"
if ext(".java"):
main = "Main" if "Main.java" in files else ext(".java")[0][:-5]
return f"javac *.java 2>&1 && java {main} 2>&1"
if ext(".sh"):
return f"bash {ext('.sh')[0]} 2>&1"
if ext(".tf"):
return "terraform init -backend=false 2>&1 && terraform validate 2>&1"
if "Program.cs" in files or ext(".cs"):
return "dotnet run 2>&1"
return None
def _verify(agent: SmallCodeAgent) -> bool | None:
    """Independently re-run the agent's output to see whether it actually works.

    Returns True/False when there is something runnable to check, and None when
    the result is unverifiable (the caller must not escalate purely on a missing
    signal). Python goes through the pytest/run_python fast paths; a web app is
    rendered in a browser; every other language is smoke-run via run_shell, so
    the specialist router escalates on a broken go/rust/sql/… solution instead
    of silently accepting the smallest tier.
    """
    workspace = agent.workspace
    names = workspace.list_files()

    # --- Python: tests if any file looks like a test, else run an entry script.
    python_files = [name for name in names if name.endswith(".py")]
    if python_files:
        looks_tested = any("test" in name.lower() for name in python_files)
        if looks_tested:
            return workspace.run_tests().ok
        for candidate in python_files:
            if candidate in ("main.py", "solution.py"):
                entry = candidate
                break
        else:
            entry = python_files[0]
        return workspace.run_python(path=entry).ok

    # --- Web app (index.html + browser JS): render it in a real browser. This
    # must come BEFORE the shell smoke-run so we don't `node` browser-side JS.
    # Same signal smolbuilder's WebBuilder uses (engine/builder._evaluate).
    bundle = agent.files()
    if find_entry(bundle) is not None:
        passed, _errors = browsercheck.check_html(inline_app(bundle))
        return passed

    # --- Everything else: language-specific smoke command, if we know one.
    command = _smoke_command(names)
    if command is None:
        return None
    return workspace.run_shell(command, timeout=90).ok
def _build_result(agent: SmallCodeAgent, final: str, steps: list[Step], tier: Tier,
                  start_name: str, escalations: int, verified: bool,
                  specialty: str = "general") -> RouteResult:
    """Package one tier's outcome into a RouteResult.

    The trace events are the agent's collected events merged with the step
    metadata from its raw history; the file snapshot is taken from the agent at
    call time.
    """
    collected = agent.trace_collector.snapshot()
    history = agent.raw_history()
    merged_events = merge_step_metadata(collected, history)
    return RouteResult(
        final=final,
        steps=steps,
        tier_name=tier.name,
        tier_model=tier.model,
        start_tier=start_name,
        escalations=escalations,
        verified=verified,
        specialty=specialty,
        files=agent.files(),
        trace_events=merged_events,
        agent=agent,
    )
# Difficulty buckets the tier head predicts (matches route_clf.TIER_BUCKETS). Kept as
# a local constant so router.py imports even when route_clf's deps (pydantic) are
# absent. The bucket drives BOTH the thinking level and the start-tier clamp, so it's
# decoupled from the ladder length — think stays meaningful even for a pinned 1-tier
# preset. Bucket indices run 0 .. _THINK_BUCKETS - 1.
_THINK_BUCKETS = 3
class Router:
    """Route a task to a model tier, run it, and escalate to bigger tiers on failure.

    For each task the router picks a ladder (by specialty) and a start rung (by
    difficulty or an explicit size floor), runs a fresh SmallCodeAgent on that
    tier, verifies the output, and retries on the next-bigger tier while the
    output is known to be broken.
    """

    def __init__(
        self,
        preset: Preset | None = None,
        max_steps: int = 12,
        approval_handler=None,
        workspace_dir: str | None = None,
        think: str = "off",
        yolo: bool = False,
        agent: str = "build",
        size_floor: str | None = None,
    ) -> None:
        """Store routing configuration.

        Args:
            preset: Model preset; loaded via load_preset() when omitted.
            max_steps: Step budget forwarded to every agent.
            approval_handler: Forwarded to every agent unchanged.
            workspace_dir: Forwarded to every agent unchanged.
            think: Explicit thinking level; anything but "off" overrides the
                router-derived level.
            yolo: Forwarded to every agent and every turn.
            agent: Agent name forwarded to every SmallCodeAgent.
            size_floor: Optional "Auto · <size>" pin (e.g. "3b") for the start rung.
        """
        self.preset = preset or load_preset()
        self.tiers: list[Tier] = self.preset.tiers
        self.max_steps = max_steps
        self.approval_handler = approval_handler
        self.workspace_dir = workspace_dir
        self.think = think
        self.yolo = yolo
        self.agent_name = agent
        # "Auto · <size>" pins the START rung to this specialist size (e.g. "3b") while
        # the router still picks the specialty and escalation still climbs the ladder.
        self.size_floor = size_floor

    async def run(self, task: str) -> RouteResult:
        """Run *task* to completion and return the final RouteResult.

        Drains run_live() and keeps the result carried by the last done frame.

        Raises:
            RuntimeError: If routing ended without a final frame (for example an
                empty tier ladder). This used to be an ``assert``, which is
                stripped under ``python -O`` and would have returned None.
        """
        result: RouteResult | None = None
        async for frame in self.run_live(task):
            if frame.done and isinstance(frame.result, RouteResult):
                result = frame.result
        if result is None:
            raise RuntimeError(
                "router finished without a result (empty tier ladder or no final frame)"
            )
        return result

    def _ladder_for(self, task: str, specialty: str | None = None) -> SpecialistLadder:
        """The size ladder for this task's specialty (generic if not a matrix preset).

        `specialty` may be supplied by the learned classifier; falls back to the
        regex classify_specialty when not given.
        """
        if isinstance(self.preset, SpecialistPreset):
            if specialty is None:
                specialty = classify_specialty(task)
            return self.preset.ladder_for(specialty)
        return SpecialistLadder(specialty="general", tiers=self.preset.tiers)

    def _size_floor_index(self, tiers: list[Tier], size_floor: str) -> int:
        """Start-rung index for an 'Auto · <size>' pin.

        Returns the index of the first ladder tier whose size is >= the floor.
        A floor that parses to <= 0 yields 0; a floor larger than every tier
        yields the last index (or 0 for an empty ladder).
        """
        from .config import parse_size_b

        # Accept both "3b" and a bare "3".
        target = parse_size_b(size_floor if str(size_floor).lower().endswith("b")
                              else f"{size_floor}b")
        if target <= 0:
            return 0
        for i, t in enumerate(tiers):
            if parse_size_b(t.model) >= target:
                return i
        return max(len(tiers) - 1, 0)

    def _route(self, task: str) -> tuple[SpecialistLadder, int, str]:
        """Pick (ladder, start-tier index, thinking level) for a task.

        Uses the learned RouteClassifier when it's available; otherwise the regex
        baseline. A difficulty bucket (decoupled from ladder length) drives both the
        start rung and the thinking level. `size_floor` (Auto · <size>) overrides the
        start rung; an explicit user `/think` (anything but the default "off") wins.
        """
        clf = _route_classifier()
        has_clf = clf is not None and clf.available

        # 1. specialty -> size ladder
        if has_clf and isinstance(self.preset, SpecialistPreset):
            specialty = clf.pick_specialty(task, list(self.preset.ladders))[0]
            ladder = self._ladder_for(task, specialty=specialty)
        else:
            ladder = self._ladder_for(task)
        tiers = ladder.tiers

        # 2. difficulty bucket (0..TIER_BUCKETS-1) + escalation hint
        if has_clf:
            bucket = clf.pick_tier(task, _THINK_BUCKETS)[0]
            esc = clf.pick_escalate(task)[0]
        else:
            bucket = classify_tier(task, _THINK_BUCKETS)
            esc = False

        # 3. start rung: an explicit size floor wins; else the difficulty bucket
        if self.size_floor:
            start = self._size_floor_index(tiers, self.size_floor)
        else:
            start = min(bucket, max(len(tiers) - 1, 0))

        # 4. thinking level: explicit /think wins; else router-derived (clf only)
        if self.think != "off":
            think = self.think
        elif has_clf:
            think = clf.think_for(bucket, _THINK_BUCKETS, esc)
        else:
            think = "off"
        return ladder, start, think

    @staticmethod
    def _final_frame(result: RouteResult) -> LiveFrame:
        """Build the terminal (done=True) frame that carries *result*."""
        return LiveFrame(
            steps=result.steps,
            events=result.trace_events,
            files=result.files,
            done=True,
            result=result,
        )

    async def run_live(
        self,
        task: str,
        *,
        rust_session=None,
    ) -> AsyncIterator[LiveFrame]:
        """Yield live frames while routing; final frame carries RouteResult.

        Intermediate frames from each tier's agent are passed through. A tier's
        outcome is accepted unless it is known to be broken (hit the step limit,
        errored, failed verification, or was rejected by the judge); in that case
        the next tier is tried. If every tier fails, the last attempt is returned
        with ``verified=False``. Nothing is yielded when the ladder is empty.
        """
        ladder, start, think = self._route(task)
        specialty = ladder.specialty
        tiers = ladder.tiers
        escalations = 0
        last: RouteResult | None = None
        prev_tier_name: str | None = None

        for idx in range(start, len(tiers)):
            tier = tiers[idx]
            if prev_tier_name is not None:
                yield LiveFrame(events=[
                    TraceEvent(kind="tier_escalation", name=tier.name,
                               detail=f"escalated from {prev_tier_name}"),
                ])

            # The start tier reuses the caller's session; make it run the ROUTED model
            # (not whatever the UI last pinned), so "Auto" honors the router's pick and
            # a concrete pin (single-tier ladder) runs exactly that model.
            if idx == start and rust_session is not None:
                try:
                    rust_session.set_model(tier.model)
                except Exception:
                    # Deliberately best-effort: a session that cannot switch
                    # models must not abort the run.
                    pass

            agent = SmallCodeAgent(
                preset=self.preset,
                model=tier.model,
                max_steps=self.max_steps,
                approval_handler=self.approval_handler,
                workspace_dir=self.workspace_dir,
                agent=self.agent_name,
                yolo=self.yolo,
                rust_session=rust_session if idx == start else None,
            )

            async for frame in agent.run_live_turn(
                task, think=think, yolo=self.yolo,
            ):
                if not frame.done:
                    yield frame
                    continue

                final, steps = frame.result
                # ok is tri-state: True (ran clean), False (known broken),
                # None (nothing runnable to check).
                ok = False if (agent.hit_max_steps or agent.errored) else _verify(agent)

                # _verify only proves the code RAN, not that it's correct. If it ran
                # clean (ok is True) but a bigger tier exists, ask a judge whether the
                # solution actually satisfies the task; a concrete "no" -> escalate.
                if ok is True and idx < len(tiers) - 1 and judge_enabled():
                    correct = await judge_correct(
                        self.preset, tiers[idx + 1].model, task, agent.files(), final,
                    )
                    if not correct:
                        ok = False

                last = _build_result(
                    agent, final, steps, tier, tiers[start].name,
                    escalations, bool(ok), specialty=specialty,
                )

                # Accept anything that is not KNOWN to be broken (True or None).
                if ok is not False:
                    yield self._final_frame(last)
                    return

                # Known broken. The failing agent is only torn down when a bigger
                # tier takes over; the last tier's agent stays attached to the
                # returned result.
                # NOTE(review): confirm cleanup() is not expected on the final tier.
                if idx < len(tiers) - 1:
                    agent.trace_collector.record_escalation(tier.name, tiers[idx + 1].name)
                    agent.cleanup()
                escalations += 1
                prev_tier_name = tier.name
                break

        # Every tier failed: surface the last attempt (already marked unverified).
        if last is not None:
            yield self._final_frame(last)