Spaces:
Running on Zero
Running on Zero
| # tools.py | |
| # PBH Applied Systems — Deterministic tool functions for the ReAct agent loop. | |
| # All functions are pure Python with no secondary LLM calls. | |
| # Each returns a plain string — the agent's OBSERVATION after an ACTION. | |
| from eval_data import MODELS, DIMENSION_DESCRIPTIONS, FAMILY_DESCRIPTIONS, pair_is_feasible | |
def get_model_scores(model_key: str) -> str:
    """Return quant_eval v7.21 dimension scores for a model.

    Args:
        model_key: Key into ``MODELS``.

    Returns:
        A multi-line report string (the agent's OBSERVATION). If the key is
        unknown, a one-line message listing the available keys instead.
    """
    if model_key not in MODELS:
        return f"Unknown model key '{model_key}'. Available: {', '.join(MODELS.keys())}"

    m = MODELS[model_key]
    lines = [
        f"=== quant_eval v7.21 scores: {m['display_name']} ===",
        f"Run ID: {m['run_id']}",
        "",
    ]

    scores = m["scores"]
    if all(v is None for v in scores.values()):
        lines.append(
            "Aggregate dimension scores not available for this model.\n"
            "This model was evaluated with a single runner (Q4_K_M only) because\n"
            "the F16 GGUF exceeds RTX 4090 VRAM. Per-family pass rates are\n"
            "published on the model card."
        )
    else:
        dim_labels = {
            "task_completion": "Task Completion",
            "reasoning": "Reasoning",
            "coherence": "Coherence",
            "instruction_following": "Instruction Following",
        }
        for dim, label in dim_labels.items():
            val = scores.get(dim)
            # A partially scored model has None for some dimensions; applying
            # the float format spec to None would raise TypeError.
            rendered = "n/a" if val is None else f"{val:.4f}"
            lines.append(f" {label}: {rendered}")

    lines.append(f"\n Avg Inference: {m['avg_inference_sec']} sec/case")
    lines.append(f" Context Window: {m['context_window']:,} tokens")
    lines.append(f" VRAM: ~{m['vram_gb']} GB")

    # "thinking_mode" is read with .get(), so entries may omit it.
    if m.get("thinking_mode"):
        lines.append(
            "\n ⚠️ Thinking Mode: This model uses hybrid adaptive thinking. "
            "Strip <think> blocks before structured output extraction, "
            "or use /no_think in user message."
        )

    if m["known_issues"]:
        lines.append("\nKnown Issues:")
        lines.extend(f" ⚠️ {issue}" for issue in m["known_issues"])

    if m["series_notes"]:
        lines.append(f"\nSeries Notes: {m['series_notes']}")

    return "\n".join(lines)
def compare_models(model_key_a: str, model_key_b: str) -> str:
    """Compare two models across all quant_eval dimensions with delta analysis.

    Args:
        model_key_a: Key into ``MODELS`` for the left-hand model.
        model_key_b: Key into ``MODELS`` for the right-hand model.

    Returns:
        A multi-line comparison report. If either key is unknown, a one-line
        message naming the first unknown key and the available keys.
    """
    for key in (model_key_a, model_key_b):
        if key not in MODELS:
            return f"Unknown model key '{key}'. Available: {', '.join(MODELS.keys())}"

    a, b = MODELS[model_key_a], MODELS[model_key_b]
    lines = [
        f"=== Model Comparison: {a['short_name']} vs {b['short_name']} ===",
        "",
    ]

    # Insertion order of this mapping is the row order of the table.
    dim_labels = {
        "task_completion": "Task Completion",
        "reasoning": "Reasoning",
        "coherence": "Coherence",
        "instruction_following": "Instr. Following",
    }
    sa, sb = a["scores"], b["scores"]
    has_scores_a = any(v is not None for v in sa.values())
    has_scores_b = any(v is not None for v in sb.values())

    if has_scores_a and has_scores_b:
        lines.append(
            f" {'Dimension':<22} {'Left':>8} {'Right':>8} {'Delta':>8} {'Winner':>16}"
        )
        lines.append(" " + "-" * 64)
        for dim, label in dim_labels.items():
            va, vb = sa[dim], sb[dim]
            if va is None or vb is None:
                continue
            delta = vb - va  # positive means the right-hand model scores higher
            if abs(delta) < 0.005:
                winner = "Tie"
            elif delta > 0:
                winner = b["short_name"]
            else:
                winner = a["short_name"]
            lines.append(
                f" {label:<22} {va:>8.4f} {vb:>8.4f} {delta:>+8.4f} {winner:>16}"
            )
    else:
        if not has_scores_a:
            lines.append(
                f" {a['short_name']}: aggregate scores not available "
                f"(single-runner evaluation — see model card for per-family pass rates)."
            )
        if not has_scores_b:
            lines.append(
                f" {b['short_name']}: aggregate scores not available "
                f"(single-runner evaluation — see model card for per-family pass rates)."
            )

    lines.append("")
    ta, tb = a["avg_inference_sec"], b["avg_inference_sec"]
    if ta and tb:
        # Identical timings must not name either model as the faster one.
        if ta == tb:
            verdict = "equal speed"
        else:
            faster = a["short_name"] if ta < tb else b["short_name"]
            verdict = f"{faster} is faster"
        lines.append(f" Inference: {ta:.3f}s vs {tb:.3f}s — {verdict}")
    lines.append(f" Context: {a['context_window']:,} vs {b['context_window']:,} tokens")

    feasible, reason = pair_is_feasible(model_key_a, model_key_b)
    lines.append(f"\n Side-by-side pairing: {'✅ Feasible' if feasible else '❌ Not feasible'}")
    lines.append(f" {reason}")

    for m in (a, b):
        if m["known_issues"]:
            lines.append(f"\n {m['short_name']} known issues:")
            # Only the first two issues are shown, each capped at 100 characters.
            for issue in m["known_issues"][:2]:
                lines.append(f" ⚠️ {issue[:100]}{'...' if len(issue) > 100 else ''}")

    return "\n".join(lines)
def get_fixture_example(family: str) -> str:
    """Describe a quant_eval fixture family and, where recorded, its series results.

    The report is the family's entry from ``FAMILY_DESCRIPTIONS`` under a
    header line; three families additionally get a block of series notes.
    An unrecognised family yields a message listing the valid names.
    """
    if family not in FAMILY_DESCRIPTIONS:
        known = ", ".join(FAMILY_DESCRIPTIONS.keys())
        return f"Unknown family '{family}'. " + f"Available: {known}"

    # Series-level notes, kept alongside the generic family description.
    series_notes = {
        "json_multistep": (
            "Series pass rates (Q4_K_M):\n"
            " Qwen2.5-3B: 0.200 — checks_consistent_ok fails except ms_easy_01\n"
            " Qwen2.5-7B: 0.800 — ms_easy_02 fails only\n"
            " Qwen2.5-14B-1M:0.800 — ms_easy_02 fails only\n"
            " Qwen2.5-32B: 0.600 — ms_easy_02 + ms_hard_01 fail\n"
            " Qwen3.6-27B: 0.400 — easy cases pass; medium/hard fail due to think-block\n"
            " Ministral-14B: see model card\n"
            " Mistral-Nemo: ms_hard_01 fails all four signals\n\n"
            "This is the hardest fixture family. All four signals must pass simultaneously:\n"
            "schema_ok, checks_consistent_ok, stop_semantics_ok, oracle_equiv_ok."
        ),
        "toolcall_only": (
            "The strictest format test in the series. Model must emit bare JSON only.\n"
            "No prose, no wrapper text, no explanation.\n\n"
            "Schema progression across the Qwen family (Q4_K_M):\n"
            " Qwen2.5-3B: {\"tool\": \"add\", \"operands\": [5, 10]} ❌\n"
            " Qwen2.5-7B: {\"tool\": \"add\", \"numbers\": [5, 10]} ❌\n"
            " Qwen2.5-14B-1M:{\"tool\": \"add\", \"input\": {\"x\": 5, \"y\": 10}} ❌\n"
            " Qwen2.5-32B: {\"tool\": \"add\", \"params\": {\"a\": 5, \"b\": 10}} ❌ (closest)\n"
            " Qwen3.6-27B: {\"tool_name\": \"add\", \"arguments\": {\"a\":5,\"b\":10}} ❌ (nearest)\n"
            " Ministral-14B-Instruct: F16=1.000 → Q4_K_M=0.000 (complete degradation)"
        ),
        "stateful_followup": (
            "Two-turn state tracking. Turn 2 only evaluated given correct Turn 1.\n\n"
            "Every model in the evaluated series passes at 1.000 on this family.\n"
            "This is the most consistent family across the entire series."
        ),
    }

    header = f"=== quant_eval Fixture Family: {family} ==="
    report = [header, "", FAMILY_DESCRIPTIONS[family], ""]
    if family in series_notes:
        report.append(series_notes[family])
    return "\n".join(report)
def recommend_model(use_case: str) -> str:
    """Rules-based model recommendation using confirmed quant_eval scores.

    The use-case text is lower-cased and tested for keyword substrings. The
    first matching category wins, in this order: long-context, speed,
    reasoning, tool calling / structured output. With no match, models are
    ranked by the weighted sum of all four dimension scores.

    Args:
        use_case: Free-text description of the intended deployment scenario.

    Returns:
        A multi-line recommendation report. When a ranking branch finds no
        eligible model, a short message says so instead of raising.
    """
    use_case_lower = use_case.lower()

    def mentions(keywords) -> bool:
        # Plain substring match, so stems such as "analyz" cover inflections.
        return any(kw in use_case_lower for kw in keywords)

    # Long-context: direct recommendation, no scoring needed
    if mentions(["document", "long", "1m", "million", "large context", "extract", "summarize"]):
        return (
            "Use case requires long-context handling.\n\n"
            "Recommendation: Qwen2.5-14B-1M Q4_K_M\n"
            " 1,000,000-token context window — 30x larger than any other model in series.\n"
            " #1 reasoning (0.9907) and #1 instruction-following (0.9902) in the series.\n"
            " Zero quantization degradation — F16 and Q4_K_M produce identical pass rates.\n"
            " 8.99 GB, ~12 GB VRAM.\n\n"
            "For deployment: set n_ctx to your actual document token count.\n"
            "Full 1M context requires ~80 GB VRAM — pair with n_ctx=32768 for most use cases."
        )

    # Speed: rank by confirmed inference time
    if mentions(["fast", "speed", "latency", "real-time", "quick", "low latency"]):
        speed_ranked = sorted(
            [(k, m) for k, m in MODELS.items() if m["avg_inference_sec"] is not None],
            key=lambda x: x[1]["avg_inference_sec"],
        )
        # Without this guard, speed_ranked[0] below raises IndexError.
        if not speed_ranked:
            return "No models with confirmed inference timings are available."
        lines = ["Speed-ranked models (confirmed avg inference time, Q4_K_M):\n"]
        for _, m in speed_ranked:
            solo = " [solo only]" if m["solo_only"] else ""
            lines.append(f" {m['short_name']:<22} {m['avg_inference_sec']:.3f} sec/case{solo}")
        fastest = speed_ranked[0][1]
        lines.append(
            f"\nFastest: {fastest['short_name']} "
            f"at {fastest['avg_inference_sec']:.3f} sec/case"
        )
        return "\n".join(lines)

    # Reasoning: rank by reasoning score (scored models only)
    if mentions(["reason", "plan", "analyz", "think", "logic", "chain", "multi-step"]):
        scored = [
            (k, m) for k, m in MODELS.items()
            if m["scores"]["reasoning"] is not None and not m["solo_only"]
        ]
        # Without this guard, scored[0] below raises IndexError.
        if not scored:
            return "No models with aggregate reasoning scores are available."
        scored.sort(key=lambda x: x[1]["scores"]["reasoning"], reverse=True)
        lines = ["Ranked by Reasoning score (models with aggregate scores):\n"]
        for _, m in scored:
            lines.append(f" {m['short_name']:<22} {m['scores']['reasoning']:.4f}")
        top_model = scored[0][1]
        lines.append(f"\nTop recommendation: {top_model['display_name']}")
        if top_model["known_issues"]:
            lines.append(f"Note: {top_model['known_issues'][0][:120]}")
        return "\n".join(lines)

    # Tool calling / structured output
    if mentions(["tool", "api", "json", "schema", "struct", "dispatch", "function"]):
        scored = [
            (k, m) for k, m in MODELS.items()
            if m["scores"]["instruction_following"] is not None and not m["solo_only"]
        ]
        scored.sort(key=lambda x: x[1]["scores"]["instruction_following"], reverse=True)
        lines = ["Ranked by Instruction Following score for tool/structured output use cases:\n"]
        for _, m in scored:
            lines.append(
                f" {m['short_name']:<22} {m['scores']['instruction_following']:.4f}"
            )
        lines.append(
            "\nNote on toolcall_only: every model in the series fails args_ok without "
            "explicit key-name enforcement in the system prompt. Qwen3.6-27B produces "
            "the correct 'tool_name' outer key without enforcement — only 'args' vs "
            "'arguments' remains. Always enforce exact key names in production."
        )
        return "\n".join(lines)

    # General: weighted sum of all four dimensions (all weights currently 1.0)
    weights = {
        "task_completion": 1.0,
        "reasoning": 1.0,
        "coherence": 1.0,
        "instruction_following": 1.0,
    }
    scored = []
    for key, m in MODELS.items():
        # Skip models missing any dimension score, and solo-only models.
        if any(v is None for v in m["scores"].values()):
            continue
        if m["solo_only"]:
            continue
        ws = sum(m["scores"][d] * weights[d] for d in weights)
        scored.append((key, m, ws))
    scored.sort(key=lambda x: x[2], reverse=True)

    lines = [
        f"Recommendation for: '{use_case}'\n",
        f" {'Model':<22} {'TC':>7} {'Reason':>7} {'Coh':>7} {'IF':>7}",
        " " + "-" * 56,
    ]
    for _, m, _ in scored[:5]:
        s = m["scores"]
        lines.append(
            f" {m['short_name']:<22} "
            f"{s['task_completion']:>7.4f} {s['reasoning']:>7.4f} "
            f"{s['coherence']:>7.4f} {s['instruction_following']:>7.4f}"
        )
    if scored:
        lines.append(f"\nTop recommendation: {scored[0][1]['display_name']}")

    # Always note the two unscored models
    lines.append(
        "\nNote: Qwen2.5-32B and Qwen3.6-27B have per-family pass rates only "
        "(single-runner evaluations). See model cards for full data."
    )
    return "\n".join(lines)
| # --------------------------------------------------------------------------- | |
| # Tool registry | |
| # --------------------------------------------------------------------------- | |
# Maps each tool name to its callable ("fn") plus the text shown to the agent
# ("description", "args", "example"). Entry order is the order the tools are
# listed in the prompt section built from this registry.
TOOL_REGISTRY = {
    "get_model_scores": dict(
        fn=get_model_scores,
        description="Get quant_eval v7.21 scores for a specific model.",
        args=f"model_key — one of: {', '.join(MODELS.keys())}",
        example="get_model_scores(qwen2.5-7b)",
    ),
    "compare_models": dict(
        fn=compare_models,
        description="Compare two models across all quant_eval dimensions.",
        args="model_key_a, model_key_b",
        example="compare_models(qwen2.5-7b, qwen2.5-14b-1m)",
    ),
    "get_fixture_example": dict(
        fn=get_fixture_example,
        description="Get a description of what a quant_eval fixture family tests.",
        args=f"family — one of: {', '.join(FAMILY_DESCRIPTIONS.keys())}",
        example="get_fixture_example(toolcall_only)",
    ),
    "recommend_model": dict(
        fn=recommend_model,
        description="Get a data-driven model recommendation for a use case.",
        args="use_case (str) — describe your intended deployment scenario",
        example="recommend_model(multi-step reasoning pipeline for structured data extraction)",
    ),
}
def _strip_matching_quotes(text: str) -> str:
    """Remove one pair of matching surrounding quotes from *text*, if present."""
    if len(text) >= 2 and text[0] == text[-1] and text[0] in "\"'":
        return text[1:-1].strip()
    return text


def dispatch_tool(tool_name: str, args_str: str) -> str:
    """Parse a raw argument string and invoke the named tool.

    Args:
        tool_name: Name of a tool in ``TOOL_REGISTRY``.
        args_str: Comma-separated arguments as emitted in the ACTION step.
            ``None`` is treated as an empty string, and one matching pair of
            surrounding quotes is removed from each argument.

    Returns:
        The tool's output string, or an error message for an unknown tool,
        missing arguments, or an exception raised by the tool.
    """
    if tool_name not in TOOL_REGISTRY:
        available = ", ".join(TOOL_REGISTRY.keys())
        return (
            f"Tool '{tool_name}' does not exist. Available tools: {available}. "
            f"You must use one of these tools, or if the query is outside your domain "
            f"output: FINAL ANSWER: This agent is specialized for quant_eval model evaluation. "
            f"For general coding assistance visit pbhappliedsystems.com/assistant.html"
        )

    fn = TOOL_REGISTRY[tool_name]["fn"]
    # A None argument string would otherwise raise AttributeError on .split().
    args_str = "" if args_str is None else str(args_str)

    try:
        if tool_name == "recommend_model":
            # The use case is free text: unquote the whole string, then
            # normalise comma spacing. Per-piece unquoting would damage
            # quotes that belong to the text.
            text = _strip_matching_quotes(args_str.strip())
            pieces = [p.strip() for p in text.split(",") if p.strip()]
            use_case = ", ".join(pieces) if pieces else args_str
            return fn(use_case)

        # Quoted keys such as "qwen2.5-7b" would never match a registry key,
        # so unquote each argument and drop the ones left empty.
        unquoted = (_strip_matching_quotes(p.strip()) for p in args_str.split(","))
        raw_args = [arg for arg in unquoted if arg]

        if tool_name == "get_model_scores":
            return fn(raw_args[0]) if raw_args else "Error: model_key required."
        if tool_name == "compare_models":
            if len(raw_args) < 2:
                return "Error: compare_models requires two model keys."
            return fn(raw_args[0], raw_args[1])
        if tool_name == "get_fixture_example":
            return fn(raw_args[0]) if raw_args else "Error: family required."
        return f"Dispatch not implemented for '{tool_name}'."
    except Exception as e:
        # Deliberately broad: a tool failure is returned as text rather than
        # propagated to the caller.
        return f"Tool error: {type(e).__name__}: {e}"
def build_tool_prompt_section() -> str:
    """Render ``TOOL_REGISTRY`` as the tool listing for the agent prompt.

    The result is a header line followed, for each tool in registry order,
    by its name, description, argument summary, and example call.
    """
    sections = ["Available tools (call exactly one per ACTION step):"]
    for tool_name, spec in TOOL_REGISTRY.items():
        sections.extend(
            (
                f"\n Tool: {tool_name}",
                f" Description: {spec['description']}",
                f" Args: {spec['args']}",
                f" Example: {spec['example']}",
            )
        )
    return "\n".join(sections)