workbench / HF_PRD_ext.md
GitHub Actions
Initial ZeroGPU deployment with spaces shim
7f9dfed
|
Raw
History Blame Contribute Delete
55.7 kB

A newer version of the Gradio SDK is available: 6.19.0

Upgrade

OpenBMB Workbench β€” Planned Extensions: Detailed Spec

2026-06-05 | Companion to PRD v2.0


Table of Contents

  1. vLLM Serving Tab
  2. Ollama Quick-Start
  3. Reward Model Eval
  4. Synthetic Data Gen
  5. Paper-to-Code Agent
  6. HF Spaces Deploy
  7. VINDEX Integration
  8. OCR Pipeline Hook
  9. MiniCPM Desk-Pet
  10. MiniCPM-o Audio Tab
  11. Cross-Extension Wiring

1. vLLM Serving Tab

What it is

vLLM is a production-grade inference engine built around PagedAttention β€” a KV-cache management algorithm that treats GPU memory like virtual memory pages. The result is dramatically higher throughput when multiple requests run concurrently, compared to naive Transformers inference.

In the workbench context, vLLM adds a fourth inference mode alongside llama.cpp, SGLang, and Ollama. You use it when you want OpenAI-compatible HTTP endpoints, continuous batching, or when benchmarking production serving latency.

Why it matters

Scenario Benefit
Benchmarking fine-tuned LoRA Compare throughput before/after fine-tune
Multi-user demo Queue and batch concurrent requests
Production deployment OpenAI-compatible API, drop-in for existing tooling
MiniCPM4.1-8B long context PagedAttention shines on 128K context β€” avoids OOM

Architecture

models/vllm_runner.py
  VLLMRunner
    .start(model_id, cfg)      β†’ subprocess: vllm serve ...
    .stop()                    β†’ terminate subprocess
    .chat(messages) β†’ str      β†’ POST /v1/chat/completions
    .batch(prompts) β†’ list[str]β†’ concurrent POST via asyncio
    .stats() β†’ dict            β†’ GET /metrics (Prometheus)
# models/vllm_runner.py
import subprocess, asyncio, requests
from openai import AsyncOpenAI

class VLLMRunner:
    def __init__(self, cfg: dict):
        self.model_id   = cfg["hf_id"]
        self.port       = cfg.get("port", 8000)
        self.gpu_memory = cfg.get("gpu_memory_utilization", 0.85)
        self.trust_rc   = cfg.get("trust_remote_code", False)
        self._proc      = None
        self._client    = AsyncOpenAI(
            base_url=f"http://localhost:{self.port}/v1",
            api_key="vllm-local"
        )

    def start(self):
        cmd = [
            "vllm", "serve", self.model_id,
            "--port",                    str(self.port),
            "--gpu-memory-utilization",  str(self.gpu_memory),
        ]
        if self.trust_rc:
            cmd.append("--trust-remote-code")
        self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        self._wait_ready()

    def _wait_ready(self, timeout: int = 60):
        import time
        for _ in range(timeout):
            try:
                r = requests.get(f"http://localhost:{self.port}/health")
                if r.status_code == 200:
                    return
            except Exception:
                pass
            time.sleep(1)
        raise RuntimeError("vLLM server did not become healthy")

    async def chat(self, messages: list[dict], **kwargs) -> str:
        resp = await self._client.chat.completions.create(
            model=self.model_id,
            messages=messages,
            max_tokens=kwargs.get("max_tokens", 512),
        )
        return resp.choices[0].message.content

    async def batch(self, prompts: list[str], system: str = "") -> list[str]:
        tasks = [
            self.chat([
                {"role": "system",  "content": system},
                {"role": "user",    "content": p},
            ])
            for p in prompts
        ]
        return await asyncio.gather(*tasks)

    def stats(self) -> dict:
        """Prometheus metrics parsed to dict. Returns throughput + latency."""
        r = requests.get(f"http://localhost:{self.port}/metrics")
        lines = r.text.splitlines()
        metrics = {}
        for line in lines:
            if line.startswith("vllm:") and not line.startswith("#"):
                key, val = line.rsplit(" ", 1)
                metrics[key] = float(val)
        return metrics

    def stop(self):
        if self._proc:
            self._proc.terminate()
            self._proc = None

UI tab (models_tab.py β†’ extend)

Add a "vLLM" section to the models tab:

with gr.Tab("⚑ vLLM"):
    model_dd      = gr.Dropdown(label="Model", choices=list_vllm_compatible())
    gpu_mem_sl    = gr.Slider(0.5, 0.95, value=0.85, label="GPU memory utilization")
    start_btn     = gr.Button("Start server")
    stop_btn      = gr.Button("Stop server")
    status_lbl    = gr.Label(label="Status")
    metrics_json  = gr.JSON(label="Live metrics")

    def start_vllm(model_id, gpu_mem):
        cfg = {**model_registry.get(model_id).cfg,
               "gpu_memory_utilization": gpu_mem}
        runner = VLLMRunner(cfg)
        runner.start()
        model_registry.register(f"{model_id}_vllm", runner)
        return "Running", runner.stats()

    start_btn.click(start_vllm, [model_dd, gpu_mem_sl], [status_lbl, metrics_json])

Trackio events fired

trackio.init(project="workbench", run_name="vllm_benchmark")
trackio.log({"throughput_tok_per_s": ..., "p50_latency_ms": ..., "gpu_mem_used": ...})
trackio.finish()

2. Ollama Quick-Start

What it is

Ollama is zero-configuration local model serving. One ollama pull downloads a quantized model and one ollama serve runs it. No CUDA setup, no Python environment issues. The REST API is OpenAI-compatible on port 11434.

MiniCPM-V-4.6 and MiniCPM5-1B are both in the Ollama registry:

ollama pull openbmb/minicpm-v4.6
ollama pull openbmb/minicpm5-1b

Why it matters

Ollama is the fastest path from "nothing" to "running model" β€” ideal for demos, non-GPU machines (Apple Silicon is well-optimized), and users who shouldn't need to understand quantization.

Architecture

models/ollama_runner.py
  OllamaRunner
    .pull(model_id)            β†’ subprocess: ollama pull ...
    .chat(messages) β†’ str      β†’ POST http://localhost:11434/api/chat
    .generate(prompt) β†’ str    β†’ POST http://localhost:11434/api/generate (streaming)
    .list() β†’ list[str]        β†’ GET /api/tags
# models/ollama_runner.py
import requests, subprocess, json
from typing import Generator

class OllamaRunner:
    BASE = "http://localhost:11434"

    def __init__(self, model_id: str):
        # Ollama uses "openbmb/minicpm-v4.6" style IDs directly
        self.model_id = model_id

    @staticmethod
    def pull(model_id: str):
        subprocess.run(["ollama", "pull", model_id], check=True)

    @staticmethod
    def list_local() -> list[str]:
        r = requests.get(f"{OllamaRunner.BASE}/api/tags")
        return [m["name"] for m in r.json().get("models", [])]

    def chat(self, messages: list[dict], stream: bool = False) -> str | Generator:
        payload = {"model": self.model_id, "messages": messages, "stream": stream}
        r = requests.post(f"{self.BASE}/api/chat", json=payload, stream=stream)
        if not stream:
            return r.json()["message"]["content"]
        # Generator for Gradio streaming
        def _stream():
            for line in r.iter_lines():
                if line:
                    chunk = json.loads(line)
                    yield chunk["message"]["content"]
                    if chunk.get("done"):
                        break
        return _stream()

    def vision_chat(self, image_b64: str, prompt: str) -> str:
        """Multimodal chat β€” Ollama passes images as base64 in the message."""
        messages = [{
            "role":    "user",
            "content": prompt,
            "images":  [image_b64],
        }]
        return self.chat(messages)

UI integration

The models tab gets an "Ollama" subtab with a model browser, pull button, and instant chat that requires zero setup β€” the simplest entry point to the whole workbench.

with gr.Tab("πŸ¦™ Ollama"):
    available = gr.Dropdown(label="Pull model",
        choices=["openbmb/minicpm-v4.6", "openbmb/minicpm5-1b"],
        allow_custom_value=True)
    pull_btn   = gr.Button("Pull")
    local_list = gr.JSON(label="Locally available")
    pull_status= gr.Textbox(label="Status")

    pull_btn.click(
        lambda m: (OllamaRunner.pull(m), OllamaRunner.list_local()),
        [available],
        [pull_status, local_list]
    )

3. Reward Model Eval

What it is

A reward model is a model trained to score (prompt, response) pairs β€” answering "how good is this output?" It's the missing piece between fine-tuning and verified alignment improvement. Without it you can train a LoRA and only know quantitatively that loss went down, not whether outputs actually got better by human-relevant criteria.

Why it matters

  • Validates that LoRA fine-tuning improved quality (not just minimized loss)
  • Enables best-of-N sampling: generate N responses, keep highest-scored
  • Enables DPO data creation: generate response pairs, reward model labels preferences
  • Closes the RLHF loop within the workbench itself

Reward model options

Model Size Focus
OpenAssistant/reward-model-deberta-v3-large-v2 450M General helpfulness
Salesforce/SFR-Reward-FsfairX-LLaMA3-RM-v0.1 8B Instruction following
MiniCPM5-1B itself (self-eval) 1B Domain-specific, via prompt

For the workbench, using MiniCPM5-1B as a judge (LLM-as-judge pattern) is the lowest-friction option since the model is already loaded.

Architecture

training/reward_eval.py
  RewardEvaluator
    .score(prompt, response) β†’ float
    .best_of_n(prompt, n, generator) β†’ str
    .create_dpo_pairs(dataset, generator, n=4) β†’ Dataset
    .eval_lora_vs_base(base_svc, lora_svc, eval_ds) β†’ dict
# training/reward_eval.py
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
import torch

class RewardEvaluator:
    def __init__(self, reward_model_id: str = "OpenAssistant/reward-model-deberta-v3-large-v2"):
        self.pipe = pipeline(
            "text-classification",
            model=reward_model_id,
            device=0 if torch.cuda.is_available() else -1,
        )

    def score(self, prompt: str, response: str) -> float:
        """Returns a scalar reward score (higher = better)."""
        text = f"Human: {prompt}\n\nAssistant: {response}"
        result = self.pipe(text, truncation=True, max_length=512)
        return result[0]["score"]

    def best_of_n(
        self,
        prompt: str,
        n: int,
        generator_fn,    # callable: prompt -> str
    ) -> tuple[str, float]:
        """Generate N responses, return the one with highest reward."""
        candidates = [(generator_fn(prompt), ) for _ in range(n)]
        scored = [(r[0], self.score(prompt, r[0])) for r in candidates]
        return max(scored, key=lambda x: x[1])

    def create_dpo_pairs(
        self,
        prompts: list[str],
        generator_fn,
        n: int = 4,
    ):
        """
        For each prompt: generate n responses, rank by reward,
        return (prompt, chosen, rejected) triplets for DPO training.
        """
        from datasets import Dataset
        rows = []
        for prompt in prompts:
            responses = [(generator_fn(prompt), ) for _ in range(n)]
            scored = sorted(
                [(r[0], self.score(prompt, r[0])) for r in responses],
                key=lambda x: x[1], reverse=True
            )
            if len(scored) >= 2:
                rows.append({
                    "prompt":   prompt,
                    "chosen":   scored[0][0],
                    "rejected": scored[-1][0],
                    "reward_gap": scored[0][1] - scored[-1][1],
                })
        return Dataset.from_list(rows)

    def eval_lora_vs_base(
        self,
        base_svc,
        lora_svc,
        eval_prompts: list[str],
    ) -> dict:
        """
        Compare base vs LoRA checkpoint by average reward score.
        Returns win rate and per-prompt scores.
        """
        base_scores = [self.score(p, base_svc.generate(p)) for p in eval_prompts]
        lora_scores = [self.score(p, lora_svc.generate(p)) for p in eval_prompts]
        wins = sum(l > b for l, b in zip(lora_scores, base_scores))
        return {
            "base_mean":   sum(base_scores) / len(base_scores),
            "lora_mean":   sum(lora_scores) / len(lora_scores),
            "lora_win_rate": wins / len(eval_prompts),
            "per_prompt":  list(zip(eval_prompts, base_scores, lora_scores)),
        }

Trackio logging

results = evaluator.eval_lora_vs_base(base_svc, lora_svc, eval_prompts)
trackio.init(project="workbench", run_name="reward_eval")
trackio.log({
    "base_reward_mean":     results["base_mean"],
    "lora_reward_mean":     results["lora_mean"],
    "lora_win_rate":        results["lora_win_rate"],
})
trackio.finish()

4. Synthetic Data Gen

What it is

The ml-intern finding: when real data is insufficient, have an LLM generate training data. This module does exactly that β€” it uses a capable model (MiniCPM4.1-8B or a cloud model via HF Router) to generate diverse, high-quality (prompt, response) pairs on demand.

Why it matters

Real-world fine-tuning is often blocked not by compute but by data. You have 50 good examples but need 5000. Synthetic gen + quality filtering bridges that gap, especially for specialized domains (plant species, historical OCR corrections, industrial inspection defect labels).

Architecture

datasets/synthetic.py
  SyntheticGenerator
    .generate(topic, n, schema) β†’ Dataset
    .augment(existing_ds, n) β†’ Dataset
    .filter_quality(ds, min_score) β†’ Dataset
    .generate_dpo_pairs(topic, n) β†’ Dataset
# datasets/synthetic.py
import json
from datasets import Dataset

GENERATION_PROMPT = """You are a training data generator. Generate {n} diverse, high-quality 
training examples for the topic: {topic}.

Output ONLY a valid JSON array. Each item must have these fields: {schema}
No explanation, no markdown, no preamble. Raw JSON array only."""

class SyntheticGenerator:
    def __init__(self, generator_svc):
        """generator_svc: any loaded ModelService with a .generate(prompt) method."""
        self.gen = generator_svc

    def generate(
        self,
        topic: str,
        n: int = 100,
        schema: dict | None = None,
    ) -> Dataset:
        """
        Generate n training examples on a topic.
        schema: dict of field_name β†’ description, e.g.
          {"instruction": "task to perform", "response": "ideal answer"}
        """
        schema = schema or {"instruction": "user task", "response": "ideal answer"}
        schema_str = ", ".join(f'"{k}": "{v}"' for k, v in schema.items())

        # Generate in batches of 20 to stay within context
        rows = []
        for batch_start in range(0, n, 20):
            batch_n = min(20, n - batch_start)
            prompt = GENERATION_PROMPT.format(
                n=batch_n, topic=topic, schema="{" + schema_str + "}"
            )
            raw = self.gen.generate(prompt)
            try:
                # Strip any accidental markdown fences
                clean = raw.strip().lstrip("```json").lstrip("```").rstrip("```")
                batch = json.loads(clean)
                rows.extend(batch)
            except json.JSONDecodeError:
                # Skip malformed batches; log the failure
                continue

        return Dataset.from_list(rows)

    def augment(self, existing_ds: Dataset, n: int) -> Dataset:
        """
        Use existing examples as few-shot demonstrations to generate n more.
        Samples up to 5 examples from existing_ds as context.
        """
        import random
        samples = existing_ds.shuffle().select(range(min(5, len(existing_ds))))
        examples_str = json.dumps(samples.to_list(), indent=2)

        prompt = f"""Here are {len(samples)} example training items:
{examples_str}

Generate {n} MORE diverse examples in the exact same JSON format.
Output only the JSON array, no explanation."""

        raw = self.gen.generate(prompt)
        try:
            clean = raw.strip().lstrip("```json").rstrip("```")
            new_rows = json.loads(clean)
        except json.JSONDecodeError:
            return existing_ds
        return Dataset.from_list(existing_ds.to_list() + new_rows)

    def filter_quality(
        self,
        ds: Dataset,
        reward_evaluator=None,
        min_score: float = 0.6,
    ) -> Dataset:
        """
        Filter with reward model if available, else heuristic filters.
        Heuristics: min length, no repeated n-grams, valid JSON fields.
        """
        if reward_evaluator:
            def _score(row):
                return reward_evaluator.score(
                    row.get("instruction", ""),
                    row.get("response", "")
                ) >= min_score
            return ds.filter(_score)
        else:
            # Basic heuristics
            def _heuristic(row):
                resp = row.get("response", "")
                return (
                    len(resp) >= 20 and       # not too short
                    len(resp) <= 4096 and     # not too long
                    resp.count(resp[:20]) < 3  # not repetitive
                )
            return ds.filter(_heuristic)

    def generate_for_domain(
        self,
        domain: str,
        output_path: str,
        n: int = 500,
    ):
        """
        Convenience method: generate, augment, filter, save to disk.
        Use for plant ID: domain="Plant species identification from photo descriptions"
        """
        ds = self.generate(topic=domain, n=n // 2)
        ds = self.augment(ds, n=n // 2)
        ds = self.filter_quality(ds)
        ds.save_to_disk(output_path)
        return ds

Domain-specific example: plant ID

gen = SyntheticGenerator(model_registry.get("minicpm41_8b"))
plant_ds = gen.generate_for_domain(
    domain="Identifying plant species from visual descriptions. "
           "Include common name, latin name, family, key visual features, and care tips.",
    output_path="data/synthetic_plants",
    n=2000,
)
# β†’ 2000 synthetic (description β†’ species JSON) training pairs

5. Paper-to-Code Agent

What it is

An autonomous agent that takes an arXiv paper URL or title, reads the methodology section, and implements the described technique within the workbench codebase. Directly inspired by the ml-intern architecture (Research β†’ Plan β†’ Implement β†’ Trace).

Why it matters

The gap between reading a paper and running an experiment is usually days of engineering. This agent compresses that to minutes for techniques that fit the workbench's model family. Practical use cases: implement a new PEFT variant, add a new evaluation metric, adapt a new data augmentation from a recent VLM paper.

Architecture

agent/paper_agent.py
  PaperAgent
    .run(paper_ref) β†’ AgentResult
      β†’ Phase 1: Research (fetch + parse paper)
      β†’ Phase 2: Plan (identify workbench integration points)
      β†’ Phase 3: Implement (generate + write code)
      β†’ Phase 4: Test (run + log to Trackio)
      β†’ Phase 5: Trace (upload session to HF Dataset)
# agent/paper_agent.py
import re
from dataclasses import dataclass, field
from huggingface_hub import HfApi
from smolagents import CodeAgent, HfApiModel
import trackio

@dataclass
class AgentResult:
    paper_title: str = ""
    summary:     str = ""
    files_modified: list[str] = field(default_factory=list)
    test_results:   dict      = field(default_factory=dict)
    trace_url:      str       = ""

class PaperAgent:
    SYSTEM_PROMPT = """You are an ML engineer working inside the OpenBMB Workbench codebase.
Given a research paper, your job is to:
1. Understand the core algorithm or technique.
2. Identify which module in the workbench it extends (training/, models/, datasets/, tools/).
3. Implement it as a new class or function, following the existing patterns.
4. Write a simple test that runs within the workbench and logs results to Trackio.

The workbench uses: transformers, peft, trl, trackio, mcp, gradio.
All new code must: fire events via the EventBus, log to Trackio, register in the Registry."""

    def __init__(self, orchestrator_model: str = "openbmb/MiniCPM4.1-8B"):
        self.model   = HfApiModel(orchestrator_model)
        self.api     = HfApi()
        self._log    = []

    def run(self, paper_ref: str) -> AgentResult:
        """
        paper_ref: arXiv URL like "https://arxiv.org/abs/2106.09685"
                   or paper title like "LoRA: Low-Rank Adaptation of Large Language Models"
        """
        result = AgentResult()

        # Phase 1: Research
        paper_text = self._fetch_paper(paper_ref)
        result.paper_title = self._extract_title(paper_text)

        # Phase 2: Plan
        plan = self._plan(paper_text)

        # Phase 3: Implement
        code_files = self._implement(plan, paper_text)
        result.files_modified = list(code_files.keys())
        for path, code in code_files.items():
            self._write_file(path, code)

        # Phase 4: Test
        trackio.init(project="workbench", run_name=f"paper_agent_{result.paper_title[:30]}")
        test_result = self._test(code_files)
        result.test_results = test_result
        trackio.log({"test_passed": test_result.get("passed", False), **test_result})
        trackio.finish()

        # Phase 5: Trace (ml-intern pattern)
        result.trace_url = self._upload_trace(result)

        return result

    def _fetch_paper(self, paper_ref: str) -> str:
        """Fetch paper text via HF Papers API or arXiv."""
        import requests
        if "arxiv.org" in paper_ref:
            arxiv_id = paper_ref.split("/abs/")[-1]
            r = requests.get(f"https://export.arxiv.org/abs/{arxiv_id}")
            return r.text
        # Fall back to HF Papers search
        from huggingface_hub import list_papers
        results = list(list_papers(query=paper_ref, limit=1))
        return str(results[0]) if results else ""

    def _plan(self, paper_text: str) -> str:
        """Ask the LLM to analyze the paper and produce an integration plan."""
        agent = CodeAgent(tools=[], model=self.model, max_steps=5)
        return agent.run(
            f"Read this paper excerpt and produce a 5-step integration plan "
            f"for the OpenBMB Workbench:\n\n{paper_text[:8000]}"
        )

    def _implement(self, plan: str, paper_text: str) -> dict[str, str]:
        """Generate code files from the plan."""
        agent = CodeAgent(tools=[], model=self.model, max_steps=15)
        code = agent.run(
            f"Implementation plan:\n{plan}\n\n"
            f"Paper details:\n{paper_text[:4000]}\n\n"
            f"Generate the Python file(s). Return a JSON dict: "
            f"{{\"path/to/file.py\": \"file_content\", ...}}"
        )
        import json
        try:
            return json.loads(code)
        except Exception:
            return {}

    def _write_file(self, path: str, content: str):
        import os
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            f.write(content)

    def _test(self, code_files: dict) -> dict:
        """Run a quick import + instantiation test on generated files."""
        results = {}
        for path in code_files:
            try:
                module_name = path.replace("/", ".").replace(".py", "")
                import importlib.util, sys
                spec = importlib.util.spec_from_file_location(module_name, path)
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
                results[path] = "imported_ok"
            except Exception as e:
                results[path] = f"error: {e}"
        results["passed"] = all("ok" in v for v in results.values())
        return results

    def _upload_trace(self, result: AgentResult) -> str:
        """Upload session trace to private HF Dataset (ml-intern pattern)."""
        import json, datetime
        session = {
            "timestamp":      datetime.datetime.utcnow().isoformat(),
            "paper":          result.paper_title,
            "files_modified": result.files_modified,
            "test_results":   result.test_results,
        }
        user = self.api.whoami()["name"]
        dataset_id = f"{user}/workbench-paper-sessions"
        # Upload as JSONL
        # ... (HF Dataset API)
        return f"https://huggingface.co/datasets/{dataset_id}"

6. HF Spaces Deploy

What it is

One-click packaging and deployment of the current workbench state as a Hugging Face Space. The Space runs the same Gradio app on HF infrastructure, shareable via URL, with ZeroGPU support for serverless GPU access.

Why it matters

Sharing a workbench state with a collaborator currently requires: git push, environment setup, model download, config sync. With one-click deploy: copy URL β†’ colleague sees the live app. For hackathons this is especially valuable β€” deploy a domain-specific variant as a demo Space in under 2 minutes.

Architecture

deploy/spaces.py
  SpacesDeployer
    .prepare_repo() β†’ creates/updates HF Space repo
    .upload_code()  β†’ pushes app code (not model weights)
    .set_hardware(tier) β†’ sets GPU tier in README
    .configure_secrets(env_vars) β†’ sets HF Space secrets
    .deploy() β†’ trigger Space rebuild
    .get_url() β†’ returns live Space URL
# deploy/spaces.py
import os, shutil, tempfile
from pathlib import Path
from huggingface_hub import HfApi, SpaceHardware

class SpacesDeployer:
    HARDWARE_MAP = {
        "cpu":    SpaceHardware.CPU_BASIC,
        "t4":     SpaceHardware.T4_SMALL,
        "t4_lg":  SpaceHardware.T4_MEDIUM,
        "a10":    SpaceHardware.A10G_SMALL,
        "a100":   SpaceHardware.A100_LARGE,
        "zero":   SpaceHardware.CPU_BASIC,   # ZeroGPU: uses CPU_BASIC + @spaces.GPU
    }
    EXCLUDE = {".git", "__pycache__", "exports", "data", "checkpoints",
               "*.gguf", "*.bin", "*.safetensors", ".env"}

    def __init__(self, space_id: str, hardware: str = "zero"):
        self.api      = HfApi()
        self.space_id = space_id   # "username/my-workbench"
        self.hardware = hardware

    def deploy(
        self,
        src_dir: str = ".",
        env_vars: dict | None = None,
    ) -> str:
        """Full deploy pipeline. Returns live Space URL."""
        self._create_or_update_repo()
        self._upload_code(src_dir)
        self._configure_secrets(env_vars or {})
        self._patch_app_for_zerogpu()
        return f"https://huggingface.co/spaces/{self.space_id}"

    def _create_or_update_repo(self):
        try:
            self.api.create_repo(
                repo_id=self.space_id,
                repo_type="space",
                space_sdk="gradio",
                private=False,
                exist_ok=True,
            )
        except Exception as e:
            print(f"Repo create/update: {e}")

    def _upload_code(self, src_dir: str):
        with tempfile.TemporaryDirectory() as tmp:
            src  = Path(src_dir)
            dest = Path(tmp)
            # Copy only non-excluded files
            for item in src.rglob("*"):
                if any(item.match(pat) for pat in self.EXCLUDE):
                    continue
                rel = item.relative_to(src)
                target = dest / rel
                target.parent.mkdir(parents=True, exist_ok=True)
                if item.is_file():
                    shutil.copy2(item, target)
            self.api.upload_folder(
                folder_path=str(dest),
                repo_id=self.space_id,
                repo_type="space",
                commit_message="workbench deploy",
            )

    def _configure_secrets(self, env_vars: dict):
        for key, val in env_vars.items():
            self.api.add_space_secret(
                repo_id=self.space_id,
                key=key,
                value=val,
            )
        # Always set hardware
        self.api.request_space_hardware(
            repo_id=self.space_id,
            hardware=self.HARDWARE_MAP[self.hardware],
        )

    def _patch_app_for_zerogpu(self):
        """
        If hardware=zero, wrap inference functions with @spaces.GPU decorator.
        Uploads a patched app.py.
        """
        if self.hardware != "zero":
            return
        # Read existing app.py from Space, inject @spaces.GPU, re-upload
        app_content = self.api.hf_hub_download(
            repo_id=self.space_id, filename="app.py", repo_type="space"
        )
        with open(app_content) as f:
            code = f.read()
        # Simple injection: add import and decorator before inference functions
        patched = "import spaces\n" + code.replace(
            "def run_inference(",
            "@spaces.GPU\ndef run_inference("
        )
        self.api.upload_file(
            path_or_fileobj=patched.encode(),
            path_in_repo="app.py",
            repo_id=self.space_id,
            repo_type="space",
        )

UI: deploy button in any tab header

with gr.Row():
    space_id_box = gr.Textbox(placeholder="username/my-workbench", label="Space ID")
    hardware_dd  = gr.Dropdown(["cpu","t4","a10","zero"], value="zero", label="Hardware")
    deploy_btn   = gr.Button("πŸš€ Deploy to HF Spaces")
    deploy_url   = gr.Textbox(label="Live URL", interactive=False)

    def do_deploy(space_id, hw):
        d = SpacesDeployer(space_id, hw)
        url = d.deploy(env_vars={"HF_TOKEN": os.environ.get("HF_TOKEN","")})
        return url

    deploy_btn.click(do_deploy, [space_id_box, hardware_dd], deploy_url)

7. VINDEX Integration

What it is

VINDEX is your own knowledge-editing engine (ki-fusion-labs.de). It exposes eight methods for mechanistic interpretability and targeted weight editing on transformer models. Integrating it into the workbench closes the loop between training a LoRA and verifying (or surgically fixing) what the model actually knows.

VINDEX endpoints (from your PRD v1):

  • logit_lens β€” per-layer prediction visualization
  • slot_neighbors β€” embedding space neighbors
  • layer_contribution β€” per-layer influence on final prediction
  • transition_spectrum β€” attention head transition matrix
  • calibrated_edit β€” targeted fact edit in weight space
  • derive_scale β€” scaling factor derivation for safe edits
  • star_spread β€” spread edit across semantically related slots
  • protect_relations β€” guard against side effects

Why it matters for the workbench

After LoRA fine-tuning a model on plant data:

  1. logit_lens β€” does the model correctly predict "Rosa" at the right layer for a rose image?
  2. calibrated_edit β€” if it consistently misidentifies Acer as Quercus, edit that slot
  3. star_spread β€” propagate the Acer correction to closely related maple species
  4. protect_relations β€” verify the edit didn't break "plant β†’ living thing β†’ organism"

This is not possible with LoRA alone β€” LoRA changes weight statistics globally. VINDEX does surgical point edits, making it a complement to LoRA, not a replacement.

Architecture

tools/vindex_tool.py
  VINDEXClient
    .logit_lens(model, tokenizer, text) β†’ dict[layer, prediction]
    .calibrated_edit(model, subject, relation, old_obj, new_obj) β†’ model
    .star_spread(model, anchor_subject, n=5) β†’ list[affected_slots]
    .protect_relations(model, protected_triplets) β†’ model
    .layer_contribution(model, text) β†’ dict[layer, score]
    .slot_neighbors(model, token_id, n=10) β†’ list[str]
# tools/vindex_tool.py
"""
VINDEX integration.
Assumes VINDEX FastAPI server is running locally on port 8765,
OR VINDEX modules are importable from your local install.
"""
import requests
from mcp.server.fastmcp import FastMCP

VINDEX_BASE = "http://localhost:8765"   # your local VINDEX FastAPI

mcp = FastMCP("VINDEXTools")

@mcp.tool()
async def logit_lens(
    model_id:  str,
    text:      str,
    layer_range: tuple[int, int] = (0, -1),
) -> dict:
    """
    Run logit lens on a loaded model for the given text.
    Returns per-layer top-5 token predictions and probabilities.
    Useful for finding the 'phase layer' where the model commits to an answer.
    """
    r = requests.post(f"{VINDEX_BASE}/logit_lens", json={
        "model_id":    model_id,
        "text":        text,
        "layer_range": list(layer_range),
    })
    return r.json()

@mcp.tool()
async def calibrated_edit(
    model_id:  str,
    subject:   str,
    relation:  str,
    old_obj:   str,
    new_obj:   str,
    causal_window: int = 3,   # Β±3 layers around logit lens phase layer
) -> dict:
    """
    Perform a targeted knowledge edit: change the model's belief about
    (subject, relation) from old_obj to new_obj.
    causal_window: restrict causal search to Β±N layers around phase layer.
    Returns edit_success, layers_modified, side_effect_score.
    """
    r = requests.post(f"{VINDEX_BASE}/calibrated_edit", json={
        "model_id":      model_id,
        "subject":       subject,
        "relation":      relation,
        "old_obj":       old_obj,
        "new_obj":       new_obj,
        "causal_window": causal_window,
    })
    return r.json()

@mcp.tool()
async def star_spread(
    model_id:       str,
    anchor_subject: str,
    n_neighbors:    int = 5,
) -> dict:
    """
    Find semantically related slots and spread a recent edit across them.
    Example: after editing "Acer palmatum β†’ maple", also update
    "Acer japonicum", "Acer shirasawanum" etc.
    Returns list of affected subjects and their edit scores.
    """
    r = requests.post(f"{VINDEX_BASE}/star_spread", json={
        "model_id":       model_id,
        "anchor_subject": anchor_subject,
        "n_neighbors":    n_neighbors,
    })
    return r.json()

@mcp.tool()
async def protect_relations(
    model_id:          str,
    protected_triplets: list[dict],   # [{"s": ..., "r": ..., "o": ...}]
) -> dict:
    """
    After a knowledge edit, verify that listed subject-relation-object triplets
    remain intact. Returns a pass/fail table and a side_effect_score.
    """
    r = requests.post(f"{VINDEX_BASE}/protect_relations", json={
        "model_id":          model_id,
        "protected_triplets": protected_triplets,
    })
    return r.json()

@mcp.tool()
async def layer_contribution(
    model_id: str,
    text:     str,
) -> dict:
    """
    Per-layer contribution score to the final prediction.
    Use to find which layers drive the target behavior before editing.
    """
    r = requests.post(f"{VINDEX_BASE}/layer_contribution", json={
        "model_id": model_id,
        "text":     text,
    })
    return r.json()

Known bugs to fix before integration (from VINDEX PRD v1)

  1. GPU memory leak β€” after repeated edits, VRAM grows unbounded. Fix: explicitly call torch.cuda.empty_cache() after each calibrated_edit call and detach gradient graphs.

  2. Dead-code blocks β€” several helper functions in the weight-surgery path are unreachable after a recent refactor. Before integrating: grep -n "def " vindex/core.py | xargs and verify each function has at least one call site.

  3. Weight imbalance β€” star_spread can over-edit related slots if n_neighbors > 5. Hard-cap at 5 in the MCP tool until the scaling formula is validated.

  4. Forward optimization β€” restrict causal search to Β±3 layers around the logit lens phase layer (already implemented as causal_window param above).

UI tab: "🧠 Knowledge Editor"

Inputs:
  Model selector (loaded models)
  Subject text (e.g. "Acer palmatum")
  Relation (e.g. "is a type of")
  Old object (e.g. "oak")
  New object (e.g. "maple")
  [Run Logit Lens] button β†’ displays per-layer heatmap via gr.Plot
  [Apply Edit] button β†’ runs calibrated_edit
  [Spread] button β†’ runs star_spread
  [Verify] button β†’ runs protect_relations on a default triplet set
  
Outputs:
  Per-layer prediction table
  Edit success / layers modified
  Side effect score (0 = safe, 1 = dangerous)

8. OCR Pipeline Hook

What it is

Your self-improving multilingual OCR pipeline (Latin, Arabic, Cyrillic) already exists and produces output files: image + predicted_text + confidence scores. This extension hooks those outputs directly into the workbench Field Notes system, creating a tight correction loop:

OCR pipeline outputs (uncertain predictions)
        ↓
Auto-created Field Notes (image + OCR text + empty correction field)
        ↓
Human reviews in UI β†’ fills in correction
        ↓
Accepted corrections auto-tagged "use_for_training=True"
        ↓
LoRA training run on correction pairs
        ↓
Better OCR model β†’ fewer uncertain predictions

This is the active learning loop your OCR pipeline was designed for but didn't yet have a clean UI for corrections and retraining.

Architecture

datasets/ocr_loader.py
  OCRPipelineLoader
    .watch(output_dir, threshold) β†’ poll for new low-confidence outputs
    .ingest(output_dir) β†’ batch import all outputs
    .to_field_notes(threshold) β†’ FieldNote[] (uncertain ones only)
    .to_training_dataset() β†’ Dataset (corrected ones only)
# datasets/ocr_loader.py
import json, os
from pathlib import Path
from datasets.field_notes import FieldNote, FieldNoteStore
from core.events import bus, EventType, Event

class OCRPipelineLoader:
    """
    Watches a directory written by the OCR pipeline.
    Expected format per document:
      <doc_id>.json β†’ {"image_path": ..., "predicted_text": ...,
                        "confidence": float, "script": "latin"|"arabic"|"cyrillic"}
    """
    def __init__(
        self,
        output_dir: str,
        store: FieldNoteStore,
        confidence_threshold: float = 0.85,
    ):
        self.output_dir  = Path(output_dir)
        self.store       = store
        self.threshold   = confidence_threshold

    def ingest(self, limit: int | None = None) -> int:
        """
        Read all pipeline outputs. Create Field Notes for uncertain predictions
        (confidence < threshold). Skip already-ingested docs.
        Returns number of new Field Notes created.
        """
        count = 0
        json_files = sorted(self.output_dir.glob("*.json"))
        if limit:
            json_files = json_files[:limit]

        for jf in json_files:
            try:
                data = json.loads(jf.read_text())
            except json.JSONDecodeError:
                continue

            # Skip high-confidence outputs
            if data.get("confidence", 1.0) >= self.threshold:
                continue

            note = FieldNote(
                id=f"ocr_{jf.stem}",
                model_id="ocr_pipeline",
                modality="image",
                image_path=data["image_path"],
                prompt=(
                    f"Transcribe this {data.get('script','latin')} text accurately. "
                    f"OCR predicted: '{data['predicted_text']}'"
                ),
                response=data["predicted_text"],
                correction="",      # human fills this in
                tags=[
                    f"script:{data.get('script','unknown')}",
                    f"conf:{data.get('confidence',0.0):.2f}",
                    "source:ocr_pipeline",
                ],
            )
            self.store.save(note)
            count += 1

        # Fire event
        import asyncio
        asyncio.run(bus.emit(Event(
            type=EventType.DATASET_LOADED,
            payload={"source": "ocr_pipeline", "new_notes": count}
        )))
        return count

    def watch(self, poll_interval: int = 30):
        """
        Background thread: poll output_dir every N seconds, ingest new files.
        Use in production when OCR pipeline runs continuously.
        """
        import threading, time
        seen = set()
        def _poll():
            while True:
                for jf in self.output_dir.glob("*.json"):
                    if jf.stem not in seen:
                        seen.add(jf.stem)
                        self.ingest.__wrapped__([jf])  # single-file ingest
                time.sleep(poll_interval)
        t = threading.Thread(target=_poll, daemon=True)
        t.start()

    def to_training_dataset(self, script_filter: str | None = None):
        """
        Export corrected field notes as a training dataset.
        schema: {"image_path": ..., "instruction": ..., "response": ...}
        Ready to pass to LoRATextTrainer or a vision LoRA config.
        """
        from datasets import Dataset
        query = "SELECT data FROM notes WHERE json_extract(data,'$.correction') != ''"
        if script_filter:
            query += f" AND json_extract(data,'$.tags') LIKE '%script:{script_filter}%'"

        rows = [
            json.loads(r[0])
            for r in self.store.conn.execute(query)
        ]
        training_rows = [
            {
                "image_path":  r["image_path"],
                "instruction": r["prompt"],
                "response":    r["correction"],  # human-corrected
                "script":      next(
                    (t.split(":")[1] for t in r["tags"] if t.startswith("script:")),
                    "unknown"
                ),
            }
            for r in rows
        ]
        return Dataset.from_list(training_rows)

UI: OCR correction view (Field Notes tab, new subtab)

[OCR Pipeline Output dir: ____]  [Confidence threshold: 0.85]  [Ingest]
  
Table of uncertain predictions:
  | Image | OCR text          | Confidence | Your correction | Save |
  | [img] | "Rechung 18. Ap"  | 0.73       | [____________]  | [βœ“]  |
  | [img] | "Beschlußprotoko" | 0.69       | [____________]  | [βœ“]  |

[Export corrections as training dataset]  [Start LoRA retrain]

Connection to the active learning loop

Your OCR pipeline already has:

  • abstention logic (the "council abstains" on uncertain predictions)
  • acceptance-gated fine-tuning
  • RAG-based post-correction

The workbench hook provides the missing UI layer: human-in-the-loop corrections that feed the acceptance gate. The FieldNoteStore.to_hf_dataset() output plugs directly into the pipeline's acceptance-gated fine-tuning step.


9. MiniCPM Desk-Pet

What it is

OpenBMB ships MiniCPM-Desk-Pet, a desktop companion app powered by MiniCPM5-1B, alongside the model release (2026-05-19). Key features:

  • Runs locally on Apple Silicon, NVIDIA GPU, or CPU
  • LoRA persona switching β€” different personalities loaded as adapters
  • Integrates with coding agents (Cursor, Claude Code, Codex)
  • Tiny footprint (~2GB VRAM with Q4_K_M)

The workbench extension lets you train LoRA personas directly and export them to the Desk-Pet format.

What "persona" means here

A LoRA persona is a small adapter (rank 8–16) trained on ~100–500 conversation examples in a specific voice or style. Examples:

  • "Botanist assistant" β€” answers in scientific plant terminology
  • "Friendly field guide" β€” casual, encouraging tone for beginners
  • "Historical document expert" β€” formal, precise, citation-aware (connects to OCR pipeline)

Training data is small enough that synthetic gen (Extension 4) can produce it in minutes.

Architecture

agent/desk_pet.py
  DeskPetExporter
    .train_persona(name, style_desc, n_examples) β†’ LoRA checkpoint
    .export_to_deskpet(checkpoint_path) β†’ deskpet_compatible.gguf
    .list_personas() β†’ [PersonaMeta]
    .load_persona(name) β†’ activates adapter in current session
# agent/desk_pet.py
from dataclasses import dataclass
from pathlib import Path
import json, shutil

@dataclass
class PersonaMeta:
    name:        str
    description: str
    checkpoint:  str
    gguf_path:   str | None = None
    n_examples:  int = 0

class DeskPetExporter:
    PERSONA_DIR = Path("data/personas")

    def __init__(self, base_model_id: str = "openbmb/MiniCPM5-1B"):
        self.base_model_id = base_model_id
        self.PERSONA_DIR.mkdir(parents=True, exist_ok=True)

    def train_persona(
        self,
        name:        str,
        style_desc:  str,
        n_examples:  int = 200,
        lora_rank:   int = 8,      # small rank: personas need only ~50-100 examples
    ) -> str:
        """
        1. Use SyntheticGenerator to create conversation examples in the persona style.
        2. Fine-tune MiniCPM5-1B LoRA.
        3. Save checkpoint.
        Returns checkpoint path.
        """
        from datasets.synthetic import SyntheticGenerator
        from training.lora import LoRATextTrainer
        from models.minicpm_text import MiniCPMTextService
        import torch

        # Generate persona training data
        from transformers import AutoModelForCausalLM, AutoTokenizer
        base_model = AutoModelForCausalLM.from_pretrained(
            self.base_model_id, torch_dtype=torch.bfloat16, device_map="auto"
        )
        base_tok = AutoTokenizer.from_pretrained(self.base_model_id)

        gen_svc = MiniCPMTextService.__new__(MiniCPMTextService)
        gen_svc.model, gen_svc.tokenizer = base_model, base_tok

        synth = SyntheticGenerator(gen_svc)
        ds = synth.generate(
            topic=f"Conversation examples in the style of: {style_desc}. "
                  f"Each example: a user message and a response in that persona's voice.",
            n=n_examples,
            schema={"instruction": "user message", "response": "persona reply"},
        )

        # LoRA fine-tune
        trainer = LoRATextTrainer(cfg={
            "lora_rank":      lora_rank,
            "lora_alpha":     lora_rank * 2,
            "epochs":         2,
            "batch_size":     8,
            "grad_accum":     2,
        })
        output_dir = str(self.PERSONA_DIR / name / "checkpoint")
        trainer.train(base_model, base_tok, ds, run_name=f"persona_{name}")

        # Save metadata
        meta = PersonaMeta(
            name=name, description=style_desc,
            checkpoint=output_dir, n_examples=n_examples
        )
        (self.PERSONA_DIR / name / "meta.json").write_text(
            json.dumps(meta.__dict__, indent=2)
        )
        return output_dir

    def export_to_deskpet(self, persona_name: str) -> str:
        """
        Merge LoRA into base weights, then export as GGUF for Desk-Pet.
        Returns path to merged GGUF.
        """
        from peft import PeftModel
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch
        from training.export import GGUFExporter

        meta_path = self.PERSONA_DIR / persona_name / "meta.json"
        meta      = PersonaMeta(**json.loads(meta_path.read_text()))

        # Merge LoRA into base
        base  = AutoModelForCausalLM.from_pretrained(
            self.base_model_id, torch_dtype=torch.bfloat16
        )
        peft_model = PeftModel.from_pretrained(base, meta.checkpoint)
        merged = peft_model.merge_and_unload()

        merged_path = str(self.PERSONA_DIR / persona_name / "merged")
        merged.save_pretrained(merged_path)
        AutoTokenizer.from_pretrained(self.base_model_id).save_pretrained(merged_path)

        # Export GGUF
        exporter  = GGUFExporter()
        gguf_path = exporter.export(
            model_path=merged_path,
            output_dir=str(self.PERSONA_DIR / persona_name / "gguf"),
            quant="Q4_K_M",
            model_type="text",
        )[1]   # [0] = F16, [1] = quantized

        # Update metadata
        meta.gguf_path = gguf_path
        meta_path.write_text(json.dumps(meta.__dict__, indent=2))
        return gguf_path

Usage flow

1. UI: "New persona" β†’ enter name + style description
2. Synthetic gen: 200 examples of that voice β†’ fine-tune LoRA (rank 8, ~10 min on RTX)
3. Export β†’ Q4_K_M GGUF
4. Copy to Desk-Pet personas/ dir
5. Desk-Pet: switch persona β†’ instant personality change

10. MiniCPM-o Audio Tab

What it is

MiniCPM-o-4.5 (released 2026-05-17) is a true omnimodal model β€” it sees, listens, and speaks simultaneously in real-time. It supports proactive interactions (proactive reminding) and real-time conversation with both visual and audio input.

This extension adds a new Gradio tab with a microphone + camera (or image) interface, streaming audio output, and real-time MiniCPM-o inference.

Architecture

ui/audio_tab.py
  OmnimodalTab
    .build() β†’ gr.Column with audio+image inputs and streaming output
    
models/minicpm_omni.py
  MiniCPMOmniService
    .stream_chat(audio_bytes, image=None, text=None) β†’ Generator[str]
    .speak(text) β†’ bytes  (TTS for audio output)
# models/minicpm_omni.py
import torch
import numpy as np
from transformers import AutoProcessor, AutoModel

class MiniCPMOmniService:
    """
    MiniCPM-o-4.5: omnimodal service.
    Handles text + image + audio simultaneously.
    """
    MODEL_ID = "openbmb/MiniCPM-o-4.5"

    def __init__(self, cfg: dict):
        self.processor = AutoProcessor.from_pretrained(
            self.MODEL_ID, trust_remote_code=True
        )
        self.model = AutoModel.from_pretrained(
            self.MODEL_ID,
            trust_remote_code=True,
            torch_dtype=torch.bfloat16,
            device_map="auto",
        )
        self.thinking = cfg.get("thinking_mode", False)

    def chat(
        self,
        text:        str | None = None,
        image=None,             # PIL Image
        audio:       np.ndarray | None = None,
        sample_rate: int = 16000,
    ) -> str:
        """
        Full omnimodal chat: pass any combination of text, image, audio.
        MiniCPM-o-4.5 handles them natively.
        """
        content = []
        if image is not None:
            content.append({"type": "image", "image": image})
        if audio is not None:
            content.append({
                "type":        "audio",
                "audio":       audio,
                "sample_rate": sample_rate,
            })
        if text:
            content.append({"type": "text", "text": text})

        messages = [{"role": "user", "content": content}]
        inputs   = self.processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_tensors="pt",
            return_dict=True,
            chat_template_kwargs={"enable_thinking": self.thinking},
        ).to(self.model.device)

        with torch.no_grad():
            outputs = self.model.generate(**inputs, max_new_tokens=512)
        return self.processor.decode(
            outputs[0][inputs["input_ids"].shape[1]:],
            skip_special_tokens=True
        )
# ui/audio_tab.py
import gradio as gr
import numpy as np

def build_audio_tab(model_registry):
    with gr.Column():
        gr.Markdown("### πŸŽ™οΈ Omnimodal β€” MiniCPM-o-4.5")
        gr.Markdown("Speak, show an image, or type β€” all at once.")

        with gr.Row():
            audio_in  = gr.Audio(
                sources=["microphone"],
                type="numpy",
                label="Microphone input",
                streaming=True,
            )
            image_in  = gr.Image(
                type="pil",
                label="Optional image",
            )

        text_in   = gr.Textbox(label="Optional text", placeholder="Or type here...")
        submit    = gr.Button("Send")
        output    = gr.Textbox(label="Response", lines=8)
        audio_out = gr.Audio(label="Spoken response (TTS)", autoplay=True)

        def respond(audio_data, image, text):
            svc = model_registry.get("minicpm_o45")
            audio_arr = None
            if audio_data is not None:
                sr, arr = audio_data
                audio_arr = arr.astype(np.float32) / 32768.0
            response = svc.chat(text=text, image=image, audio=audio_arr)
            # Optional: TTS for spoken output
            # spoken = tts(response)
            return response, None   # None = no audio out yet

        submit.click(respond, [audio_in, image_in, text_in], [output, audio_out])

Real-time streaming version (advanced)

For true real-time conversation (proactive reminding, interrupt detection):

# streaming audio inference using gradio's streaming audio + SSE
demo = gr.Interface(
    fn=stream_omni_response,
    inputs=[
        gr.Audio(streaming=True, sources=["microphone"]),
        gr.Image(type="pil"),
    ],
    outputs=gr.Textbox(),
    live=True,
)

TTS for audio output

MiniCPM-o-4.5 has its own audio generation capability β€” check the model card for the generate_audio method. Fallback: use kokoro-82M (Apache 2.0, 82M, runs locally) for TTS.

# Kokoro TTS fallback (lightweight, local)
from kokoro import KPipeline
tts_pipe = KPipeline(lang_code="en-us")
audio, sr = tts_pipe(response_text, voice="af_heart")

11. Cross-Extension Wiring

Most extensions are independent, but several combinations unlock powerful compound workflows:

OCR β†’ VINDEX

Low-confidence OCR outputs β†’ Field Notes β†’ human corrections β†’ LoRA retrain. But additionally: if the OCR model consistently misreads a specific character class, use VINDEX's logit_lens to identify which layer is responsible, then calibrated_edit to target that slot directly β€” a faster fix than a full retraining cycle.

Synthetic Gen β†’ Reward Model β†’ DPO

SyntheticGenerator.generate(topic, n=1000)
    β†’ RewardEvaluator.create_dpo_pairs(prompts, generator, n=4)
    β†’ DPO training via TRL DPOTrainer
    β†’ Trackio logs win rate
    β†’ VINDEX verify alignment not broken

Paper Agent β†’ Desk-Pet Persona

PaperAgent reads: "Persona-based dialogue systems for domain experts"
    β†’ Implements: persona training data format
    β†’ DeskPetExporter.train_persona("expert_botanist", "...")
    β†’ Export GGUF β†’ load in Desk-Pet

HF Spaces + vLLM + Trackio

SpacesDeployer.deploy(hardware="a10")   # production GPU
    β†’ app.py runs VLLMRunner on A10G
    β†’ All requests logged via Trackio with space_id=deployed_space
    β†’ Dashboard visible to collaborators at trackio Space URL

Full active-learning loop (all extensions combined)

MiniCPM-o Audio Tab: user speaks + shows image of plant
        ↓
OmnimodalService: identify species (low confidence)
        ↓
OCRPipelineLoader: auto-create Field Note (uncertain prediction)
        ↓
Human: corrects species name in UI
        ↓
SyntheticGenerator: augment with 50 similar examples
        ↓
RewardEvaluator: filter synthetic examples
        ↓
LoRATextTrainer + TRL + Trackio: fine-tune
        ↓
VINDEX: verify the target species slot was corrected
        ↓
DeskPetExporter: export updated persona
        ↓
SpacesDeployer: push updated app to HF Spaces

Extensions spec v1.0 β€” Christof Kaller / ki-fusion-labs.de β€” 2026-06-05