"""Tiered model router — the "forge-router" pattern. The point of smolcode: don't burn a 32B model on a one-line helper, and don't fail a hard task on a 3B. The router picks a *starting* tier from a cheap complexity heuristic, runs the agent, then **escalates on failure**: if the produced code doesn't actually pass when re-run, it retries the whole task on the next-bigger model. The tier that ultimately solved it is surfaced for the UI badge. Each tier is an independent SmallCodeAgent (its own model + fresh workspace), so every model in the ladder uses LiteForge's native tool-calling loop — no parsing hacks. All tiers are <=32B to stay hackathon-eligible. """ from __future__ import annotations import os import re from collections.abc import AsyncIterator from dataclasses import dataclass, field from . import browsercheck from .agent import SmallCodeAgent, Step from .config import Preset, SpecialistLadder, SpecialistPreset, Tier, load_preset from .judge import judge_correct, judge_enabled from .live_run import LiveFrame from .preview import find_entry, inline_app from .trace_collector import TraceEvent from .ui_trace import merge_step_metadata # Signals that a task is non-trivial and worth starting higher up the ladder. # Leading \b + trailing \w* so stems match their word family # (recursi -> recursive, optimi -> optimize, concurren -> concurrency). _HARD_HINTS = re.compile( r"\b(class|async|thread|concurren|regex|pars|algorithm|optimi|recursi|" r"benchmark|refactor|multiple files|api|server|database|sql|decorator|" r"generator|data ?structure|graph|tree|dynamic programming)\w*", re.I, ) def _route_classifier(): """The learned routing classifier singleton, or None if unavailable. Importing route_clf pulls in pydantic (and lazily onnxruntime); any failure here just means we route with the regex baseline below. 
""" try: from .route_clf import get_classifier return get_classifier() except Exception: return None def classify_tier(task: str, n_tiers: int) -> int: """Pick a starting tier index (0 = smallest). Cheap, transparent heuristic.""" if n_tiers <= 1: return 0 score = 0 if len(task) > 280: score += 1 if len(_HARD_HINTS.findall(task)) >= 1: score += 1 if len(_HARD_HINTS.findall(task)) >= 3: score += 1 return min(score, n_tiers - 1) # --- specialty (language/function) classifier -------------------------------- # Picks the specialist *family* for a task; classify_tier then picks the size # within it. Same cheap, transparent, ordered-regex style as classify_tier. # Priority on ties (earlier wins); 'py' is last because it's the safe default. # `orchestrate` is first: explicit fan-out language is a strong, specific signal # that should win over an incidental language mention. _SPECIALTY_ORDER = ("orchestrate", "git", "terraform", "docker", "sql", "powershell", "bsd", "rust", "go", "cpp", "java", "dotnet", "csharp", "bash", "js", "py") _FENCE_LANG = re.compile(r"```([a-z0-9+#.]+)", re.I) _FENCE_TO_SPECIALTY = { "python": "py", "py": "py", "pytest": "py", "bash": "bash", "sh": "bash", "shell": "bash", "zsh": "bash", "console": "bash", "powershell": "powershell", "ps1": "powershell", "pwsh": "powershell", "sql": "sql", "psql": "sql", "sqlite": "sql", "javascript": "js", "js": "js", "ts": "js", "typescript": "js", "jsx": "js", "tsx": "js", "node": "js", "go": "go", "golang": "go", "rust": "rust", "rs": "rust", "cpp": "cpp", "c++": "cpp", "cc": "cpp", "c": "cpp", "java": "java", "csharp": "csharp", "cs": "csharp", "dockerfile": "docker", "docker": "docker", "hcl": "terraform", "terraform": "terraform", "tf": "terraform", } _EXT_RE = re.compile(r"\.(py|sh|bash|ps1|sql|js|mjs|cjs|ts|tsx|jsx|go|rs|cpp|cc|cxx|" r"hpp|java|cs|csproj|tf|dockerfile)\b", re.I) _EXT_TO_SPECIALTY = { "py": "py", "sh": "bash", "bash": "bash", "ps1": "powershell", "sql": "sql", "js": "js", "mjs": "js", 
"cjs": "js", "ts": "js", "tsx": "js", "jsx": "js", "go": "go", "rs": "rust", "cpp": "cpp", "cc": "cpp", "cxx": "cpp", "hpp": "cpp", "java": "java", "cs": "csharp", "csproj": "dotnet", "tf": "terraform", "dockerfile": "docker", } _SPECIALTY_HINTS = { # Fan-out / parallel delegation work -> the task_batch specialist. "orchestrate": re.compile(r"\b(in parallel|fan ?out|concurrently|task_batch|" r"orchestrat|several independent|multiple independent|" r"simultaneously|batch of (tasks|jobs))\w*", re.I), # NOTE: `staged` requires the trailing 'd' so it does NOT match "stage" inside # "multi-stage" (a docker term) — that false-positive misrouted Docker tasks. "git": re.compile(r"\b(git|commit|rebase|cherry-?pick|merge conflict|stash|" r"\bbranch\b|pull request|\bPR\b|revert|bisect|staged)\w*", re.I), "terraform": re.compile(r"\b(terraform|\bhcl\b|\.tf\b|provider|resource block|" r"infrastructure as code|\biac\b|tfstate)\w*", re.I), "docker": re.compile(r"\b(docker|dockerfile|docker-?compose|container image|" r"\bimage\b|\bbuild -t\b|entrypoint)\w*", re.I), "sql": re.compile(r"\b(sql|select |insert |update |delete |join|schema|" r"\btable\b|\bindex\b|migration|postgres|sqlite|mysql|query)\w*", re.I), "powershell": re.compile(r"\b(powershell|pwsh|\.ps1|cmdlet|get-|set-|write-output)\w*", re.I), "bsd": re.compile(r"\b(freebsd|openbsd|netbsd|\bbsd\b|pf\.conf|rc\.d|pkg_add)\w*", re.I), "rust": re.compile(r"\b(rust|cargo|crate|rustc|\.rs\b|borrow checker|tokio)\w*", re.I), "go": re.compile(r"\b(golang|\bgo\b|goroutine|go mod|go test|\.go\b)\w*", re.I), "cpp": re.compile(r"\b(c\+\+|cpp|g\+\+|clang|std::|cmake|\.cpp\b|template)\w*", re.I), "java": re.compile(r"\b(java|maven|gradle|\bjvm\b|junit|\.java\b)\w*", re.I), "dotnet": re.compile(r"\b(\.net|dotnet|nuget|asp\.net|\.csproj|msbuild)\w*", re.I), "csharp": re.compile(r"\b(c#|csharp|\blinq\b|\.cs\b|\bxunit\b)\w*", re.I), "bash": re.compile(r"\b(shell script|\bbash\b|\bzsh\b|chmod|grep|sed|awk|" 
r"\bpipe\b|cron|stdout|stderr|\$PATH)\w*", re.I), "js": re.compile(r"\b(javascript|typescript|node|npm|react|vue|jsx|tsx|" r"webpack|vite|eslint|package\.json)\w*", re.I), "py": re.compile(r"\b(python|pytest|pandas|numpy|django|flask|pip|venv|" r"def |async def|decorator)\w*", re.I), } def classify_specialty(task: str, *, default: str = "py") -> str: """Pick the specialist family key for a task. Cheap, transparent, deterministic. Precedence (most explicit signal first): SMALLCODE_SPECIALTY env override -> code-fence language tag -> file extensions mentioned -> keyword-cue scoring -> default. Mirrors classify_tier's style; pairs with it for 2D routing. """ forced = os.environ.get("SMALLCODE_SPECIALTY") if forced: return forced.strip().lower() # A fenced code block (```lang) is the single most explicit signal -> hard win. for lang in _FENCE_LANG.findall(task): s = _FENCE_TO_SPECIALTY.get(lang.lower()) if s: return s # Otherwise SCORE keyword cues AND file-extension mentions together, so a strong # action signal (e.g. "rebase ... merge conflict") beats an incidental ".py" # filename. Ties broken by _SPECIALTY_ORDER (earlier = higher priority). scores = {s: len(rx.findall(task)) for s, rx in _SPECIALTY_HINTS.items()} for e in _EXT_RE.findall(task): s = _EXT_TO_SPECIALTY.get(e.lower()) if s: scores[s] = scores.get(s, 0) + 1 best = max(scores, key=lambda s: (scores[s], -_SPECIALTY_ORDER.index(s))) if scores[best] > 0: return best return default @dataclass class RouteResult: final: str steps: list[Step] tier_name: str tier_model: str start_tier: str escalations: int verified: bool specialty: str = "general" files: dict[str, str] = field(default_factory=dict) trace_events: list[TraceEvent] = field(default_factory=list) agent: SmallCodeAgent | None = None def _smoke_command(files: list[str]) -> str | None: """A best-effort 'does it build/run (and pass any tests)?' shell command for a NON-Python solution, or None if the language isn't recognized. 
Mirrors the per-specialty run commands (finetune/specialties.py) so the router can escalate on go/rust/js/sql/… exactly like it does on Python via run_python.""" def ext(e: str) -> list[str]: return [f for f in files if f.endswith(e)] if ext(".go"): if any(f.endswith("_test.go") for f in files): return "go test ./... 2>&1" return "go run . 2>&1 || go run *.go 2>&1" if "Cargo.toml" in files: return "cargo test -q 2>&1 || cargo build -q 2>&1" if ext(".rs"): return f"rustc {ext('.rs')[0]} -o /tmp/_smv 2>&1 && /tmp/_smv" js = ext(".js") + ext(".mjs") + ext(".cjs") + ext(".ts") if "package.json" in files: return "npm test --silent 2>&1 || node --test 2>&1" if js: if any(".test." in f or ".spec." in f for f in js): return "node --test 2>&1" entry = next((f for f in js if f in ("index.js", "main.js")), js[0]) return f"node {entry} 2>&1" if ext(".sql"): return f"sqlite3 :memory: < {ext('.sql')[0]} 2>&1" if ext(".cpp") or ext(".cc"): srcs = " ".join(ext(".cpp") + ext(".cc")) return f"g++ -std=c++17 {srcs} -o /tmp/_smv 2>&1 && /tmp/_smv" if ext(".java"): main = "Main" if "Main.java" in files else ext(".java")[0][:-5] return f"javac *.java 2>&1 && java {main} 2>&1" if ext(".sh"): return f"bash {ext('.sh')[0]} 2>&1" if ext(".tf"): return "terraform init -backend=false 2>&1 && terraform validate 2>&1" if "Program.cs" in files or ext(".cs"): return "dotnet run 2>&1" return None def _verify(agent: SmallCodeAgent) -> bool | None: """Independently check the agent's output actually works. Returns True/False if there's something runnable to check, else None (unverifiable — don't escalate purely on a missing signal). Python uses the pytest/run_python fast paths; other languages smoke-run via run_shell so the specialist router escalates on a broken go/rust/sql/… solution instead of silently accepting the smallest tier. 
""" ws = agent.workspace files = ws.list_files() pys = [f for f in files if f.endswith(".py")] if pys: if any("test" in f.lower() for f in pys): return ws.run_tests().ok entry = next((f for f in pys if f in ("main.py", "solution.py")), None) or pys[0] return ws.run_python(path=entry).ok # Web app (index.html + browser JS): render it in a real browser — must come # BEFORE the shell smoke-run so we don't `node` browser-side JS. Same signal # smolbuilder's WebBuilder uses (engine/builder._evaluate). web_files = agent.files() if find_entry(web_files) is not None: ok, _errors = browsercheck.check_html(inline_app(web_files)) return ok cmd = _smoke_command(files) if cmd is not None: return ws.run_shell(cmd, timeout=90).ok return None def _build_result(agent: SmallCodeAgent, final: str, steps: list[Step], tier: Tier, start_name: str, escalations: int, verified: bool, specialty: str = "general") -> RouteResult: events = merge_step_metadata(agent.trace_collector.snapshot(), agent.raw_history()) return RouteResult( final=final, steps=steps, tier_name=tier.name, tier_model=tier.model, start_tier=start_name, escalations=escalations, verified=verified, specialty=specialty, files=agent.files(), trace_events=events, agent=agent, ) # Difficulty buckets the tier head predicts (matches route_clf.TIER_BUCKETS). Kept as # a local constant so router.py imports even when route_clf's deps (pydantic) are # absent. The bucket drives BOTH the thinking level and the start-tier clamp, so it's # decoupled from the ladder length — think stays meaningful even for a pinned 1-tier # preset. 
_THINK_BUCKETS = 3


class Router:
    """Routes a task to a starting model tier, runs it, and escalates on failure.

    `run_live` streams frames while the ladder is climbed; `run` is the
    non-streaming wrapper that returns only the final RouteResult.
    """

    def __init__(
        self,
        preset: Preset | None = None,
        max_steps: int = 12,
        approval_handler=None,
        workspace_dir: str | None = None,
        think: str = "off",
        yolo: bool = False,
        agent: str = "build",
        size_floor: str | None = None,
    ) -> None:
        """Store the routing configuration.

        When `preset` is None (or falsy) the preset is loaded via load_preset().
        The remaining arguments are kept as-is and forwarded to each
        SmallCodeAgent that run_live creates.
        """
        self.preset = preset or load_preset()
        self.tiers: list[Tier] = self.preset.tiers
        self.max_steps = max_steps
        self.approval_handler = approval_handler
        self.workspace_dir = workspace_dir
        self.think = think
        self.yolo = yolo
        self.agent_name = agent
        # "Auto · SIZE" pins the START rung to this specialist size (e.g. "3b") while
        # the router still picks the specialty and escalation still climbs the ladder.
        self.size_floor = size_floor

    async def run(self, task: str) -> RouteResult:
        """Run `task` to completion and return the final RouteResult.

        Drains run_live and keeps the result carried by the last `done` frame.

        Raises:
            RuntimeError: if run_live finished without a final RouteResult frame
                (for example, the selected ladder has no tiers).
        """
        result: RouteResult | None = None
        async for frame in self.run_live(task):
            if frame.done and isinstance(frame.result, RouteResult):
                result = frame.result
        # An explicit raise (not `assert`) so the check survives `python -O` and the
        # method never returns None against its annotation.
        if result is None:
            raise RuntimeError(
                "router produced no result: empty tier ladder or no final frame"
            )
        return result

    def _ladder_for(self, task: str, specialty: str | None = None) -> SpecialistLadder:
        """The size ladder for this task's specialty (generic if not a matrix preset).

        `specialty` may be supplied by the learned classifier; falls back to the
        regex classify_specialty when not given.
        """
        if isinstance(self.preset, SpecialistPreset):
            if specialty is None:
                specialty = classify_specialty(task)
            return self.preset.ladder_for(specialty)
        return SpecialistLadder(specialty="general", tiers=self.preset.tiers)

    def _size_floor_index(self, tiers: list[Tier], size_floor: str) -> int:
        """Start-rung index for an 'Auto · SIZE' pin.

        Returns the index of the first ladder tier whose parsed size is >= the
        floor (closest available, then escalates). Returns 0 when the floor does
        not parse to a positive size, and the LAST index (largest tier; 0 for an
        empty ladder) when no tier is big enough.
        """
        from .config import parse_size_b

        # Accept both "3b" and a bare "3": append the unit suffix when missing.
        floor_text = size_floor if str(size_floor).lower().endswith("b") else f"{size_floor}b"
        target = parse_size_b(floor_text)
        if target <= 0:
            return 0
        for i, t in enumerate(tiers):
            if parse_size_b(t.model) >= target:
                return i
        return max(len(tiers) - 1, 0)

    def _route(self, task: str) -> tuple[SpecialistLadder, int, str]:
        """Pick (ladder, start-tier index, thinking level) for a task.

        Uses the learned RouteClassifier when it is available; otherwise the regex
        baseline. A difficulty bucket (decoupled from ladder length) drives both
        the start rung and the thinking level. `size_floor` (Auto · SIZE)
        overrides the start rung; an explicit user `/think` (anything but the
        default "off") wins.
        """
        clf = _route_classifier()
        has_clf = clf is not None and clf.available

        # 1. specialty -> size ladder
        if has_clf and isinstance(self.preset, SpecialistPreset):
            specialty = clf.pick_specialty(task, list(self.preset.ladders))[0]
            ladder = self._ladder_for(task, specialty=specialty)
        else:
            ladder = self._ladder_for(task)
        tiers = ladder.tiers

        # 2. difficulty bucket (0..TIER_BUCKETS-1) + escalation hint
        if has_clf:
            bucket = clf.pick_tier(task, _THINK_BUCKETS)[0]
            esc = clf.pick_escalate(task)[0]
        else:
            bucket = classify_tier(task, _THINK_BUCKETS)
            esc = False

        # 3. start rung: an explicit size floor wins; else the difficulty bucket
        if self.size_floor:
            start = self._size_floor_index(tiers, self.size_floor)
        else:
            start = min(bucket, max(len(tiers) - 1, 0))

        # 4. thinking level: explicit /think wins; else router-derived (clf only)
        if self.think != "off":
            think = self.think
        elif has_clf:
            think = clf.think_for(bucket, _THINK_BUCKETS, esc)
        else:
            think = "off"
        return ladder, start, think

    async def run_live(
        self,
        task: str,
        *,
        rust_session=None,
    ) -> AsyncIterator[LiveFrame]:
        """Yield live frames while routing; final frame carries RouteResult.

        Starting at the routed tier, each tier runs the task on its own
        SmallCodeAgent. A tier's outcome is accepted unless it positively failed
        (hit max steps, errored, failed verification, or was rejected by the
        judge); a failed tier escalates to the next one. If every tier fails, the
        last tier's result is still emitted as the final frame. Nothing is yielded
        when the ladder has no tier at or after the start index.
        """
        ladder, start, think = self._route(task)
        specialty = ladder.specialty
        tiers = ladder.tiers
        escalations = 0
        last: RouteResult | None = None
        prev_tier_name: str | None = None

        for idx in range(start, len(tiers)):
            tier = tiers[idx]
            if prev_tier_name is not None:
                yield LiveFrame(events=[
                    TraceEvent(kind="tier_escalation", name=tier.name,
                               detail=f"escalated from {prev_tier_name}"),
                ])

            # The start tier reuses the caller's session; make it run the ROUTED model
            # (not whatever the UI last pinned), so "Auto" honors the router's pick and
            # a concrete pin (single-tier ladder) runs exactly that model.
            if idx == start and rust_session is not None:
                try:
                    rust_session.set_model(tier.model)
                except Exception:
                    # Deliberate best-effort: a session that cannot switch models
                    # must not abort the run.
                    pass

            agent = SmallCodeAgent(
                preset=self.preset,
                model=tier.model,
                max_steps=self.max_steps,
                approval_handler=self.approval_handler,
                workspace_dir=self.workspace_dir,
                agent=self.agent_name,
                yolo=self.yolo,
                # Only the start tier shares the caller's session.
                rust_session=rust_session if idx == start else None,
            )

            async for frame in agent.run_live_turn(
                task,
                think=think,
                yolo=self.yolo,
            ):
                if not frame.done:
                    yield frame
                    continue

                final, steps = frame.result
                # ok is tri-state: True (verified), False (failed), None (nothing
                # runnable to check — treated as acceptable below).
                ok = False if (agent.hit_max_steps or agent.errored) else _verify(agent)

                # _verify only proves the code RAN, not that it's correct. If it ran
                # clean (ok is True) but a bigger tier exists, ask a judge whether the
                # solution actually satisfies the task; a concrete "no" -> escalate.
                if ok is True and idx < len(tiers) - 1 and judge_enabled():
                    correct = await judge_correct(
                        self.preset,
                        tiers[idx + 1].model,
                        task,
                        agent.files(),
                        final,
                    )
                    # NOTE(review): `not correct` also escalates on a None answer;
                    # confirm judge_correct never returns None for "could not judge".
                    if not correct:
                        ok = False

                last = _build_result(
                    agent, final, steps, tier, tiers[start].name, escalations, bool(ok),
                    specialty=specialty,
                )
                if ok is not False:
                    yield LiveFrame(
                        steps=steps,
                        events=last.trace_events,
                        files=last.files,
                        done=True,
                        result=last,
                    )
                    return

                # Failed: hand over to the next tier if there is one. The final
                # tier's agent is left intact so its result stays usable.
                if idx < len(tiers) - 1:
                    agent.trace_collector.record_escalation(tier.name, tiers[idx + 1].name)
                    agent.cleanup()
                    escalations += 1
                    prev_tier_name = tier.name
                break

        # Every tier failed: surface the last attempt rather than nothing.
        if last is not None:
            yield LiveFrame(
                steps=last.steps,
                events=last.trace_events,
                files=last.files,
                done=True,
                result=last,
            )