Spaces:
Paused
Paused
| """Tiered model router — the "forge-router" pattern. | |
| The point of smolcode: don't burn a 32B model on a one-line helper, and don't | |
| fail a hard task on a 3B. The router picks a *starting* tier from a cheap | |
| complexity heuristic, runs the agent, then **escalates on failure**: if the | |
| produced code doesn't actually pass when re-run, it retries the whole task on the | |
| next-bigger model. The tier that ultimately solved it is surfaced for the UI badge. | |
| Each tier is an independent SmallCodeAgent (its own model + fresh workspace), so | |
| every model in the ladder uses LiteForge's native tool-calling loop — no parsing | |
| hacks. All tiers are <=32B to stay hackathon-eligible. | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import re | |
| from collections.abc import AsyncIterator | |
| from dataclasses import dataclass, field | |
| from . import browsercheck | |
| from .agent import SmallCodeAgent, Step | |
| from .config import Preset, SpecialistLadder, SpecialistPreset, Tier, load_preset | |
| from .judge import judge_correct, judge_enabled | |
| from .live_run import LiveFrame | |
| from .preview import find_entry, inline_app | |
| from .trace_collector import TraceEvent | |
| from .ui_trace import merge_step_metadata | |
| # Signals that a task is non-trivial and worth starting higher up the ladder. | |
| # Leading \b + trailing \w* so stems match their word family | |
| # (recursi -> recursive, optimi -> optimize, concurren -> concurrency). | |
| _HARD_HINTS = re.compile( | |
| r"\b(class|async|thread|concurren|regex|pars|algorithm|optimi|recursi|" | |
| r"benchmark|refactor|multiple files|api|server|database|sql|decorator|" | |
| r"generator|data ?structure|graph|tree|dynamic programming)\w*", | |
| re.I, | |
| ) | |
| def _route_classifier(): | |
| """The learned routing classifier singleton, or None if unavailable. | |
| Importing route_clf pulls in pydantic (and lazily onnxruntime); any failure | |
| here just means we route with the regex baseline below. | |
| """ | |
| try: | |
| from .route_clf import get_classifier | |
| return get_classifier() | |
| except Exception: | |
| return None | |
def classify_tier(task: str, n_tiers: int) -> int:
    """Choose the starting tier index (0 = smallest) with a cheap heuristic.

    One point each for: a task longer than 280 characters, at least one
    hard-task hint, and at least three hard-task hints. The total is clamped to
    the last valid tier index. A ladder with one tier (or fewer) always yields 0.
    """
    if n_tiers <= 1:
        return 0

    hint_count = len(_HARD_HINTS.findall(task))
    # Booleans sum as 0/1, so this is the number of signals that fired.
    points = sum((len(task) > 280, hint_count >= 1, hint_count >= 3))
    return min(points, n_tiers - 1)
| # --- specialty (language/function) classifier -------------------------------- | |
| # Picks the specialist *family* for a task; classify_tier then picks the size | |
| # within it. Same cheap, transparent, ordered-regex style as classify_tier. | |
| # Priority on ties (earlier wins); 'py' is last because it's the safe default. | |
| # `orchestrate` is first: explicit fan-out language is a strong, specific signal | |
| # that should win over an incidental language mention. | |
| _SPECIALTY_ORDER = ("orchestrate", "git", "terraform", "docker", "sql", "powershell", | |
| "bsd", "rust", "go", "cpp", "java", "dotnet", "csharp", "bash", | |
| "js", "py") | |
| _FENCE_LANG = re.compile(r"```([a-z0-9+#.]+)", re.I) | |
| _FENCE_TO_SPECIALTY = { | |
| "python": "py", "py": "py", "pytest": "py", | |
| "bash": "bash", "sh": "bash", "shell": "bash", "zsh": "bash", "console": "bash", | |
| "powershell": "powershell", "ps1": "powershell", "pwsh": "powershell", | |
| "sql": "sql", "psql": "sql", "sqlite": "sql", | |
| "javascript": "js", "js": "js", "ts": "js", "typescript": "js", | |
| "jsx": "js", "tsx": "js", "node": "js", | |
| "go": "go", "golang": "go", | |
| "rust": "rust", "rs": "rust", | |
| "cpp": "cpp", "c++": "cpp", "cc": "cpp", "c": "cpp", | |
| "java": "java", | |
| "csharp": "csharp", "cs": "csharp", | |
| "dockerfile": "docker", "docker": "docker", | |
| "hcl": "terraform", "terraform": "terraform", "tf": "terraform", | |
| } | |
| _EXT_RE = re.compile(r"\.(py|sh|bash|ps1|sql|js|mjs|cjs|ts|tsx|jsx|go|rs|cpp|cc|cxx|" | |
| r"hpp|java|cs|csproj|tf|dockerfile)\b", re.I) | |
| _EXT_TO_SPECIALTY = { | |
| "py": "py", "sh": "bash", "bash": "bash", "ps1": "powershell", "sql": "sql", | |
| "js": "js", "mjs": "js", "cjs": "js", "ts": "js", "tsx": "js", "jsx": "js", | |
| "go": "go", "rs": "rust", "cpp": "cpp", "cc": "cpp", "cxx": "cpp", "hpp": "cpp", | |
| "java": "java", "cs": "csharp", "csproj": "dotnet", "tf": "terraform", | |
| "dockerfile": "docker", | |
| } | |
| _SPECIALTY_HINTS = { | |
| # Fan-out / parallel delegation work -> the task_batch specialist. | |
| "orchestrate": re.compile(r"\b(in parallel|fan ?out|concurrently|task_batch|" | |
| r"orchestrat|several independent|multiple independent|" | |
| r"simultaneously|batch of (tasks|jobs))\w*", re.I), | |
| # NOTE: `staged` requires the trailing 'd' so it does NOT match "stage" inside | |
| # "multi-stage" (a docker term) — that false-positive misrouted Docker tasks. | |
| "git": re.compile(r"\b(git|commit|rebase|cherry-?pick|merge conflict|stash|" | |
| r"\bbranch\b|pull request|\bPR\b|revert|bisect|staged)\w*", re.I), | |
| "terraform": re.compile(r"\b(terraform|\bhcl\b|\.tf\b|provider|resource block|" | |
| r"infrastructure as code|\biac\b|tfstate)\w*", re.I), | |
| "docker": re.compile(r"\b(docker|dockerfile|docker-?compose|container image|" | |
| r"\bimage\b|\bbuild -t\b|entrypoint)\w*", re.I), | |
| "sql": re.compile(r"\b(sql|select |insert |update |delete |join|schema|" | |
| r"\btable\b|\bindex\b|migration|postgres|sqlite|mysql|query)\w*", re.I), | |
| "powershell": re.compile(r"\b(powershell|pwsh|\.ps1|cmdlet|get-|set-|write-output)\w*", re.I), | |
| "bsd": re.compile(r"\b(freebsd|openbsd|netbsd|\bbsd\b|pf\.conf|rc\.d|pkg_add)\w*", re.I), | |
| "rust": re.compile(r"\b(rust|cargo|crate|rustc|\.rs\b|borrow checker|tokio)\w*", re.I), | |
| "go": re.compile(r"\b(golang|\bgo\b|goroutine|go mod|go test|\.go\b)\w*", re.I), | |
| "cpp": re.compile(r"\b(c\+\+|cpp|g\+\+|clang|std::|cmake|\.cpp\b|template)\w*", re.I), | |
| "java": re.compile(r"\b(java|maven|gradle|\bjvm\b|junit|\.java\b)\w*", re.I), | |
| "dotnet": re.compile(r"\b(\.net|dotnet|nuget|asp\.net|\.csproj|msbuild)\w*", re.I), | |
| "csharp": re.compile(r"\b(c#|csharp|\blinq\b|\.cs\b|\bxunit\b)\w*", re.I), | |
| "bash": re.compile(r"\b(shell script|\bbash\b|\bzsh\b|chmod|grep|sed|awk|" | |
| r"\bpipe\b|cron|stdout|stderr|\$PATH)\w*", re.I), | |
| "js": re.compile(r"\b(javascript|typescript|node|npm|react|vue|jsx|tsx|" | |
| r"webpack|vite|eslint|package\.json)\w*", re.I), | |
| "py": re.compile(r"\b(python|pytest|pandas|numpy|django|flask|pip|venv|" | |
| r"def |async def|decorator)\w*", re.I), | |
| } | |
def classify_specialty(task: str, *, default: str = "py") -> str:
    """Pick the specialist family key for a task. Cheap, transparent, deterministic.

    Precedence (most explicit signal first):

    1. ``SMALLCODE_SPECIALTY`` environment override (trimmed, lower-cased). A
       blank or whitespace-only value is treated as "not set".
    2. The first code-fence language tag (```lang) that maps to a known family.
    3. Combined scoring: keyword-cue matches plus one point per recognised file
       extension mentioned. The highest score wins; ties go to the family that
       appears earlier in ``_SPECIALTY_ORDER``.
    4. ``default`` when nothing scored.

    Args:
        task: The raw task text.
        default: Family key returned when no signal is found.

    Returns:
        The chosen specialist family key.
    """
    # Strip BEFORE the truthiness test so a whitespace-only override does not
    # come back as the empty string.
    forced = os.environ.get("SMALLCODE_SPECIALTY", "").strip().lower()
    if forced:
        return forced

    # A fenced code block (```lang) is the single most explicit signal -> hard win.
    for lang in _FENCE_LANG.findall(task):
        specialty = _FENCE_TO_SPECIALTY.get(lang.lower())
        if specialty:
            return specialty

    # Otherwise SCORE keyword cues AND file-extension mentions together, so a
    # strong action signal (e.g. "rebase ... merge conflict") beats an incidental
    # ".py" filename.
    scores = {name: len(rx.findall(task)) for name, rx in _SPECIALTY_HINTS.items()}
    for found in _EXT_RE.findall(task):
        specialty = _EXT_TO_SPECIALTY.get(found.lower())
        if specialty:
            scores[specialty] = scores.get(specialty, 0) + 1

    # Higher score first; on a tie the smaller _SPECIALTY_ORDER index wins.
    best = max(scores, key=lambda name: (scores[name], -_SPECIALTY_ORDER.index(name)))
    return best if scores[best] > 0 else default
@dataclass
class RouteResult:
    """Outcome of routing one task through the tier ladder.

    Must be a dataclass: the router constructs it with keyword arguments and the
    mutable defaults below rely on ``field(default_factory=...)``.
    """

    # Final answer text produced by the agent that ran last.
    final: str
    # Steps reported by that agent's run.
    steps: list[Step]
    # Name and model id of the tier that produced this result.
    tier_name: str
    tier_model: str
    # Name of the tier the router started on.
    start_tier: str
    # Number of escalations that happened before this result was produced.
    escalations: int
    # True only when the independent verification positively passed.
    verified: bool
    # Specialist family of the ladder used ("general" for non-matrix presets).
    specialty: str = "general"
    # Snapshot of the agent's files (path -> content).
    files: dict[str, str] = field(default_factory=dict)
    # Trace events collected for the run.
    trace_events: list[TraceEvent] = field(default_factory=list)
    # The agent that produced the result, when available.
    agent: SmallCodeAgent | None = None
| def _smoke_command(files: list[str]) -> str | None: | |
| """A best-effort 'does it build/run (and pass any tests)?' shell command for a | |
| NON-Python solution, or None if the language isn't recognized. Mirrors the | |
| per-specialty run commands (finetune/specialties.py) so the router can escalate | |
| on go/rust/js/sql/… exactly like it does on Python via run_python.""" | |
| def ext(e: str) -> list[str]: | |
| return [f for f in files if f.endswith(e)] | |
| if ext(".go"): | |
| if any(f.endswith("_test.go") for f in files): | |
| return "go test ./... 2>&1" | |
| return "go run . 2>&1 || go run *.go 2>&1" | |
| if "Cargo.toml" in files: | |
| return "cargo test -q 2>&1 || cargo build -q 2>&1" | |
| if ext(".rs"): | |
| return f"rustc {ext('.rs')[0]} -o /tmp/_smv 2>&1 && /tmp/_smv" | |
| js = ext(".js") + ext(".mjs") + ext(".cjs") + ext(".ts") | |
| if "package.json" in files: | |
| return "npm test --silent 2>&1 || node --test 2>&1" | |
| if js: | |
| if any(".test." in f or ".spec." in f for f in js): | |
| return "node --test 2>&1" | |
| entry = next((f for f in js if f in ("index.js", "main.js")), js[0]) | |
| return f"node {entry} 2>&1" | |
| if ext(".sql"): | |
| return f"sqlite3 :memory: < {ext('.sql')[0]} 2>&1" | |
| if ext(".cpp") or ext(".cc"): | |
| srcs = " ".join(ext(".cpp") + ext(".cc")) | |
| return f"g++ -std=c++17 {srcs} -o /tmp/_smv 2>&1 && /tmp/_smv" | |
| if ext(".java"): | |
| main = "Main" if "Main.java" in files else ext(".java")[0][:-5] | |
| return f"javac *.java 2>&1 && java {main} 2>&1" | |
| if ext(".sh"): | |
| return f"bash {ext('.sh')[0]} 2>&1" | |
| if ext(".tf"): | |
| return "terraform init -backend=false 2>&1 && terraform validate 2>&1" | |
| if "Program.cs" in files or ext(".cs"): | |
| return "dotnet run 2>&1" | |
| return None | |
| def _verify(agent: SmallCodeAgent) -> bool | None: | |
| """Independently check the agent's output actually works. | |
| Returns True/False if there's something runnable to check, else None | |
| (unverifiable — don't escalate purely on a missing signal). Python uses the | |
| pytest/run_python fast paths; other languages smoke-run via run_shell so the | |
| specialist router escalates on a broken go/rust/sql/… solution instead of | |
| silently accepting the smallest tier. | |
| """ | |
| ws = agent.workspace | |
| files = ws.list_files() | |
| pys = [f for f in files if f.endswith(".py")] | |
| if pys: | |
| if any("test" in f.lower() for f in pys): | |
| return ws.run_tests().ok | |
| entry = next((f for f in pys if f in ("main.py", "solution.py")), None) or pys[0] | |
| return ws.run_python(path=entry).ok | |
| # Web app (index.html + browser JS): render it in a real browser — must come | |
| # BEFORE the shell smoke-run so we don't `node` browser-side JS. Same signal | |
| # smolbuilder's WebBuilder uses (engine/builder._evaluate). | |
| web_files = agent.files() | |
| if find_entry(web_files) is not None: | |
| ok, _errors = browsercheck.check_html(inline_app(web_files)) | |
| return ok | |
| cmd = _smoke_command(files) | |
| if cmd is not None: | |
| return ws.run_shell(cmd, timeout=90).ok | |
| return None | |
def _build_result(agent: SmallCodeAgent, final: str, steps: list[Step], tier: Tier,
                  start_name: str, escalations: int, verified: bool,
                  specialty: str = "general") -> RouteResult:
    """Package one tier's run into a RouteResult.

    The agent's collected trace events are merged with step metadata from its
    raw history, and the agent's current files are snapshotted into the result.
    """
    collected = agent.trace_collector.snapshot()
    merged_events = merge_step_metadata(collected, agent.raw_history())
    return RouteResult(
        final=final,
        steps=steps,
        tier_name=tier.name,
        tier_model=tier.model,
        start_tier=start_name,
        escalations=escalations,
        verified=verified,
        specialty=specialty,
        files=agent.files(),
        trace_events=merged_events,
        agent=agent,
    )
| # Difficulty buckets the tier head predicts (matches route_clf.TIER_BUCKETS). Kept as | |
| # a local constant so router.py imports even when route_clf's deps (pydantic) are | |
| # absent. The bucket drives BOTH the thinking level and the start-tier clamp, so it's | |
| # decoupled from the ladder length — think stays meaningful even for a pinned 1-tier | |
| # preset. | |
| _THINK_BUCKETS = 3 | |
class Router:
    """Route a task to a starting model tier and escalate on failure.

    For each task the router picks a size ladder (per specialty when the preset
    is a SpecialistPreset), a starting rung and a thinking level, runs a fresh
    SmallCodeAgent on that rung, verifies the output, and retries on the next
    bigger tier when verification (or the optional judge) says it failed.
    """

    def __init__(
        self,
        preset: Preset | None = None,
        max_steps: int = 12,
        approval_handler=None,
        workspace_dir: str | None = None,
        think: str = "off",
        yolo: bool = False,
        agent: str = "build",
        size_floor: str | None = None,
    ) -> None:
        """Store the routing configuration.

        Args:
            preset: Tier preset; loaded via load_preset() when omitted.
            max_steps: Step budget handed to every SmallCodeAgent.
            approval_handler: Passed through to every SmallCodeAgent.
            workspace_dir: Passed through to every SmallCodeAgent.
            think: Explicit thinking level. The default "off" lets the router
                derive one when the learned classifier is available.
            yolo: Passed through to every SmallCodeAgent and each turn.
            agent: Agent name passed through to every SmallCodeAgent.
            size_floor: Optional "Auto · <size>" pin for the START rung.
        """
        self.preset = preset or load_preset()
        self.tiers: list[Tier] = self.preset.tiers
        self.max_steps = max_steps
        self.approval_handler = approval_handler
        self.workspace_dir = workspace_dir
        self.think = think
        self.yolo = yolo
        self.agent_name = agent
        # "Auto · <size>" pins the START rung to this specialist size (e.g. "3b") while
        # the router still picks the specialty and escalation still climbs the ladder.
        self.size_floor = size_floor

    async def run(self, task: str) -> RouteResult:
        """Run the task to completion and return the final RouteResult.

        Drains run_live() and keeps the result carried by the last `done` frame.

        Raises:
            RuntimeError: If run_live() finished without a final result (for
                example an empty tier ladder).
        """
        result: RouteResult | None = None
        async for frame in self.run_live(task):
            if frame.done and isinstance(frame.result, RouteResult):
                result = frame.result
        # Explicit raise instead of `assert`: asserts vanish under `python -O`,
        # which would let a None escape as the return value.
        if result is None:
            raise RuntimeError("router finished without producing a result")
        return result

    def _ladder_for(self, task: str, specialty: str | None = None) -> SpecialistLadder:
        """The size ladder for this task's specialty (generic if not a matrix preset).

        `specialty` may be supplied by the learned classifier; falls back to the
        regex classify_specialty when not given.
        """
        if isinstance(self.preset, SpecialistPreset):
            if specialty is None:
                specialty = classify_specialty(task)
            return self.preset.ladder_for(specialty)
        return SpecialistLadder(specialty="general", tiers=self.preset.tiers)

    def _size_floor_index(self, tiers: list[Tier], size_floor: str) -> int:
        """Start-rung index for an 'Auto · <size>' pin.

        Returns the index of the first ladder tier whose parsed size is >= the
        floor. If the floor does not parse to a positive size, returns 0. If no
        tier is big enough, returns the last (largest) index, or 0 for an empty
        ladder.
        """
        from .config import parse_size_b

        # Accept both "3b" and a bare "3".
        target = parse_size_b(size_floor if str(size_floor).lower().endswith("b")
                              else f"{size_floor}b")
        if target <= 0:
            return 0
        for i, t in enumerate(tiers):
            if parse_size_b(t.model) >= target:
                return i
        return max(len(tiers) - 1, 0)

    def _route(self, task: str) -> tuple[SpecialistLadder, int, str]:
        """Pick (ladder, start-tier index, thinking level) for a task.

        Uses the learned RouteClassifier when it is available; otherwise the regex
        baseline. A difficulty bucket (decoupled from ladder length) drives both the
        start rung and the thinking level. `size_floor` (Auto · <size>) overrides the
        start rung; an explicit user `/think` (anything but the default "off") wins.
        """
        clf = _route_classifier()
        has_clf = clf is not None and clf.available

        # 1. specialty -> size ladder
        if has_clf and isinstance(self.preset, SpecialistPreset):
            specialty = clf.pick_specialty(task, list(self.preset.ladders))[0]
            ladder = self._ladder_for(task, specialty=specialty)
        else:
            ladder = self._ladder_for(task)
        tiers = ladder.tiers

        # 2. difficulty bucket (0.._THINK_BUCKETS-1) + escalation hint
        if has_clf:
            bucket = clf.pick_tier(task, _THINK_BUCKETS)[0]
            esc = clf.pick_escalate(task)[0]
        else:
            bucket = classify_tier(task, _THINK_BUCKETS)
            esc = False

        # 3. start rung: an explicit size floor wins; else the difficulty bucket
        if self.size_floor:
            start = self._size_floor_index(tiers, self.size_floor)
        else:
            start = min(bucket, max(len(tiers) - 1, 0))

        # 4. thinking level: explicit /think wins; else router-derived (clf only)
        if self.think != "off":
            think = self.think
        elif has_clf:
            think = clf.think_for(bucket, _THINK_BUCKETS, esc)
        else:
            think = "off"
        return ladder, start, think

    async def run_live(
        self,
        task: str,
        *,
        rust_session=None,
    ) -> AsyncIterator[LiveFrame]:
        """Yield live frames while routing; final frame carries RouteResult.

        Intermediate agent frames are forwarded as they arrive. When a tier's
        result is accepted (verified, or unverifiable), a `done` frame with the
        RouteResult is yielded and the generator ends. When every tier fails, the
        last attempt is still yielded as the `done` frame, with verified=False.
        """
        ladder, start, think = self._route(task)
        specialty = ladder.specialty
        tiers = ladder.tiers
        escalations = 0
        last: RouteResult | None = None
        prev_tier_name: str | None = None
        for idx in range(start, len(tiers)):
            tier = tiers[idx]
            if prev_tier_name is not None:
                yield LiveFrame(events=[
                    TraceEvent(kind="tier_escalation", name=tier.name,
                               detail=f"escalated from {prev_tier_name}"),
                ])
            # The start tier reuses the caller's session; make it run the ROUTED model
            # (not whatever the UI last pinned), so "Auto" honors the router's pick and
            # a concrete pin (single-tier ladder) runs exactly that model.
            if idx == start and rust_session is not None:
                try:
                    rust_session.set_model(tier.model)
                except Exception:
                    # Deliberate best-effort: a session that cannot switch models
                    # must not abort the run.
                    pass
            agent = SmallCodeAgent(
                preset=self.preset,
                model=tier.model,
                max_steps=self.max_steps,
                approval_handler=self.approval_handler,
                workspace_dir=self.workspace_dir,
                agent=self.agent_name,
                yolo=self.yolo,
                rust_session=rust_session if idx == start else None,
            )
            async for frame in agent.run_live_turn(
                task, think=think, yolo=self.yolo,
            ):
                if not frame.done:
                    yield frame
                    continue
                final, steps = frame.result
                # ok is tri-state: True (passed), False (failed), None (unverifiable).
                ok = False if (agent.hit_max_steps or agent.errored) else _verify(agent)
                # _verify only proves the code RAN, not that it's correct. If it ran
                # clean (ok is True) but a bigger tier exists, ask a judge whether the
                # solution actually satisfies the task; a concrete "no" -> escalate.
                if ok is True and idx < len(tiers) - 1 and judge_enabled():
                    correct = await judge_correct(
                        self.preset, tiers[idx + 1].model, task, agent.files(), final,
                    )
                    if not correct:
                        ok = False
                last = _build_result(
                    agent, final, steps, tier, tiers[start].name,
                    escalations, bool(ok), specialty=specialty,
                )
                # Only an explicit failure escalates; None (unverifiable) is accepted.
                if ok is not False:
                    yield LiveFrame(
                        steps=steps,
                        events=last.trace_events,
                        files=last.files,
                        done=True,
                        result=last,
                    )
                    return
                if idx < len(tiers) - 1:
                    # A bigger tier exists: record the hop and release this agent.
                    # The last tier's agent is kept alive because it is returned
                    # inside the final RouteResult.
                    agent.trace_collector.record_escalation(tier.name, tiers[idx + 1].name)
                    agent.cleanup()
                    escalations += 1
                    prev_tier_name = tier.name
                break
        # Every tier failed: surface the last attempt as the final frame.
        if last is not None:
            yield LiveFrame(
                steps=last.steps,
                events=last.trace_events,
                files=last.files,
                done=True,
                result=last,
            )