workbench / HF_PRD_ext.md
GitHub Actions
Initial ZeroGPU deployment with spaces shim
7f9dfed
|
Raw
History Blame Contribute Delete
55.7 kB
# OpenBMB Workbench β€” Planned Extensions: Detailed Spec
> 2026-06-05 | Companion to PRD v2.0
---
## Table of Contents
1. [vLLM Serving Tab](#1-vllm-serving-tab)
2. [Ollama Quick-Start](#2-ollama-quick-start)
3. [Reward Model Eval](#3-reward-model-eval)
4. [Synthetic Data Gen](#4-synthetic-data-gen)
5. [Paper-to-Code Agent](#5-paper-to-code-agent)
6. [HF Spaces Deploy](#6-hf-spaces-deploy)
7. [VINDEX Integration](#7-vindex-integration)
8. [OCR Pipeline Hook](#8-ocr-pipeline-hook)
9. [MiniCPM Desk-Pet](#9-minicpm-desk-pet)
10. [MiniCPM-o Audio Tab](#10-minicpm-o-audio-tab)
11. [Cross-Extension Wiring](#11-cross-extension-wiring)
---
## 1. vLLM Serving Tab
### What it is
vLLM is a production-grade inference engine built around *PagedAttention* β€” a KV-cache management
algorithm that treats GPU memory like virtual memory pages. The result is dramatically higher
throughput when multiple requests run concurrently, compared to naive Transformers inference.
In the workbench context, vLLM adds a fourth inference mode alongside llama.cpp, SGLang, and
Ollama. You use it when you want OpenAI-compatible HTTP endpoints, continuous batching, or when
benchmarking production serving latency.
### Why it matters
| Scenario | Benefit |
|----------|---------|
| Benchmarking fine-tuned LoRA | Compare throughput before/after fine-tune |
| Multi-user demo | Queue and batch concurrent requests |
| Production deployment | OpenAI-compatible API, drop-in for existing tooling |
| MiniCPM4.1-8B long context | PagedAttention shines on 128K context β€” avoids OOM |
### Architecture
```
models/vllm_runner.py
VLLMRunner
.start(model_id, cfg) β†’ subprocess: vllm serve ...
.stop() β†’ terminate subprocess
.chat(messages) β†’ str β†’ POST /v1/chat/completions
.batch(prompts) β†’ list[str]β†’ concurrent POST via asyncio
.stats() β†’ dict β†’ GET /metrics (Prometheus)
```
```python
# models/vllm_runner.py
import subprocess, asyncio, requests
from openai import AsyncOpenAI
class VLLMRunner:
def __init__(self, cfg: dict):
self.model_id = cfg["hf_id"]
self.port = cfg.get("port", 8000)
self.gpu_memory = cfg.get("gpu_memory_utilization", 0.85)
self.trust_rc = cfg.get("trust_remote_code", False)
self._proc = None
self._client = AsyncOpenAI(
base_url=f"http://localhost:{self.port}/v1",
api_key="vllm-local"
)
def start(self):
cmd = [
"vllm", "serve", self.model_id,
"--port", str(self.port),
"--gpu-memory-utilization", str(self.gpu_memory),
]
if self.trust_rc:
cmd.append("--trust-remote-code")
self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
self._wait_ready()
def _wait_ready(self, timeout: int = 60):
import time
for _ in range(timeout):
try:
r = requests.get(f"http://localhost:{self.port}/health")
if r.status_code == 200:
return
except Exception:
pass
time.sleep(1)
raise RuntimeError("vLLM server did not become healthy")
async def chat(self, messages: list[dict], **kwargs) -> str:
resp = await self._client.chat.completions.create(
model=self.model_id,
messages=messages,
max_tokens=kwargs.get("max_tokens", 512),
)
return resp.choices[0].message.content
async def batch(self, prompts: list[str], system: str = "") -> list[str]:
tasks = [
self.chat([
{"role": "system", "content": system},
{"role": "user", "content": p},
])
for p in prompts
]
return await asyncio.gather(*tasks)
def stats(self) -> dict:
"""Prometheus metrics parsed to dict. Returns throughput + latency."""
r = requests.get(f"http://localhost:{self.port}/metrics")
lines = r.text.splitlines()
metrics = {}
for line in lines:
if line.startswith("vllm:") and not line.startswith("#"):
key, val = line.rsplit(" ", 1)
metrics[key] = float(val)
return metrics
def stop(self):
if self._proc:
self._proc.terminate()
self._proc = None
```
### UI tab (models_tab.py β†’ extend)
Add a "vLLM" section to the models tab:
```python
with gr.Tab("⚑ vLLM"):
model_dd = gr.Dropdown(label="Model", choices=list_vllm_compatible())
gpu_mem_sl = gr.Slider(0.5, 0.95, value=0.85, label="GPU memory utilization")
start_btn = gr.Button("Start server")
stop_btn = gr.Button("Stop server")
status_lbl = gr.Label(label="Status")
metrics_json = gr.JSON(label="Live metrics")
def start_vllm(model_id, gpu_mem):
cfg = {**model_registry.get(model_id).cfg,
"gpu_memory_utilization": gpu_mem}
runner = VLLMRunner(cfg)
runner.start()
model_registry.register(f"{model_id}_vllm", runner)
return "Running", runner.stats()
start_btn.click(start_vllm, [model_dd, gpu_mem_sl], [status_lbl, metrics_json])
```
### Trackio events fired
```python
trackio.init(project="workbench", run_name="vllm_benchmark")
trackio.log({"throughput_tok_per_s": ..., "p50_latency_ms": ..., "gpu_mem_used": ...})
trackio.finish()
```
---
## 2. Ollama Quick-Start
### What it is
Ollama is zero-configuration local model serving. One `ollama pull` downloads a quantized model
and one `ollama serve` runs it. No CUDA setup, no Python environment issues. The REST API is
OpenAI-compatible on port 11434.
MiniCPM-V-4.6 and MiniCPM5-1B are both in the Ollama registry:
```bash
ollama pull openbmb/minicpm-v4.6
ollama pull openbmb/minicpm5-1b
```
### Why it matters
Ollama is the fastest path from "nothing" to "running model" β€” ideal for demos, non-GPU machines
(Apple Silicon is well-optimized), and users who shouldn't need to understand quantization.
### Architecture
```
models/ollama_runner.py
OllamaRunner
.pull(model_id) β†’ subprocess: ollama pull ...
.chat(messages) β†’ str β†’ POST http://localhost:11434/api/chat
.generate(prompt) β†’ str β†’ POST http://localhost:11434/api/generate (streaming)
.list() β†’ list[str] β†’ GET /api/tags
```
```python
# models/ollama_runner.py
import requests, subprocess, json
from typing import Generator
class OllamaRunner:
BASE = "http://localhost:11434"
def __init__(self, model_id: str):
# Ollama uses "openbmb/minicpm-v4.6" style IDs directly
self.model_id = model_id
@staticmethod
def pull(model_id: str):
subprocess.run(["ollama", "pull", model_id], check=True)
@staticmethod
def list_local() -> list[str]:
r = requests.get(f"{OllamaRunner.BASE}/api/tags")
return [m["name"] for m in r.json().get("models", [])]
def chat(self, messages: list[dict], stream: bool = False) -> str | Generator:
payload = {"model": self.model_id, "messages": messages, "stream": stream}
r = requests.post(f"{self.BASE}/api/chat", json=payload, stream=stream)
if not stream:
return r.json()["message"]["content"]
# Generator for Gradio streaming
def _stream():
for line in r.iter_lines():
if line:
chunk = json.loads(line)
yield chunk["message"]["content"]
if chunk.get("done"):
break
return _stream()
def vision_chat(self, image_b64: str, prompt: str) -> str:
"""Multimodal chat β€” Ollama passes images as base64 in the message."""
messages = [{
"role": "user",
"content": prompt,
"images": [image_b64],
}]
return self.chat(messages)
```
### UI integration
The models tab gets an "Ollama" subtab with a model browser, pull button, and instant chat
that requires zero setup β€” the simplest entry point to the whole workbench.
```python
with gr.Tab("πŸ¦™ Ollama"):
available = gr.Dropdown(label="Pull model",
choices=["openbmb/minicpm-v4.6", "openbmb/minicpm5-1b"],
allow_custom_value=True)
pull_btn = gr.Button("Pull")
local_list = gr.JSON(label="Locally available")
pull_status= gr.Textbox(label="Status")
pull_btn.click(
lambda m: (OllamaRunner.pull(m), OllamaRunner.list_local()),
[available],
[pull_status, local_list]
)
```
---
## 3. Reward Model Eval
### What it is
A reward model is a model trained to score (prompt, response) pairs β€” answering "how good is this
output?" It's the missing piece between fine-tuning and verified alignment improvement. Without it
you can train a LoRA and only know quantitatively that loss went down, not whether outputs actually
got better by human-relevant criteria.
### Why it matters
- Validates that LoRA fine-tuning improved quality (not just minimized loss)
- Enables best-of-N sampling: generate N responses, keep highest-scored
- Enables DPO data creation: generate response pairs, reward model labels preferences
- Closes the RLHF loop within the workbench itself
### Reward model options
| Model | Size | Focus |
|-------|------|-------|
| `OpenAssistant/reward-model-deberta-v3-large-v2` | 450M | General helpfulness |
| `Salesforce/SFR-Reward-FsfairX-LLaMA3-RM-v0.1` | 8B | Instruction following |
| MiniCPM5-1B itself (self-eval) | 1B | Domain-specific, via prompt |
For the workbench, using MiniCPM5-1B as a judge (LLM-as-judge pattern) is the lowest-friction
option since the model is already loaded.
### Architecture
```
training/reward_eval.py
RewardEvaluator
.score(prompt, response) β†’ float
.best_of_n(prompt, n, generator) β†’ str
.create_dpo_pairs(dataset, generator, n=4) β†’ Dataset
.eval_lora_vs_base(base_svc, lora_svc, eval_ds) β†’ dict
```
```python
# training/reward_eval.py
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer
import torch
class RewardEvaluator:
def __init__(self, reward_model_id: str = "OpenAssistant/reward-model-deberta-v3-large-v2"):
self.pipe = pipeline(
"text-classification",
model=reward_model_id,
device=0 if torch.cuda.is_available() else -1,
)
def score(self, prompt: str, response: str) -> float:
"""Returns a scalar reward score (higher = better)."""
text = f"Human: {prompt}\n\nAssistant: {response}"
result = self.pipe(text, truncation=True, max_length=512)
return result[0]["score"]
def best_of_n(
self,
prompt: str,
n: int,
generator_fn, # callable: prompt -> str
) -> tuple[str, float]:
"""Generate N responses, return the one with highest reward."""
candidates = [(generator_fn(prompt), ) for _ in range(n)]
scored = [(r[0], self.score(prompt, r[0])) for r in candidates]
return max(scored, key=lambda x: x[1])
def create_dpo_pairs(
self,
prompts: list[str],
generator_fn,
n: int = 4,
):
"""
For each prompt: generate n responses, rank by reward,
return (prompt, chosen, rejected) triplets for DPO training.
"""
from datasets import Dataset
rows = []
for prompt in prompts:
responses = [(generator_fn(prompt), ) for _ in range(n)]
scored = sorted(
[(r[0], self.score(prompt, r[0])) for r in responses],
key=lambda x: x[1], reverse=True
)
if len(scored) >= 2:
rows.append({
"prompt": prompt,
"chosen": scored[0][0],
"rejected": scored[-1][0],
"reward_gap": scored[0][1] - scored[-1][1],
})
return Dataset.from_list(rows)
def eval_lora_vs_base(
self,
base_svc,
lora_svc,
eval_prompts: list[str],
) -> dict:
"""
Compare base vs LoRA checkpoint by average reward score.
Returns win rate and per-prompt scores.
"""
base_scores = [self.score(p, base_svc.generate(p)) for p in eval_prompts]
lora_scores = [self.score(p, lora_svc.generate(p)) for p in eval_prompts]
wins = sum(l > b for l, b in zip(lora_scores, base_scores))
return {
"base_mean": sum(base_scores) / len(base_scores),
"lora_mean": sum(lora_scores) / len(lora_scores),
"lora_win_rate": wins / len(eval_prompts),
"per_prompt": list(zip(eval_prompts, base_scores, lora_scores)),
}
```
### Trackio logging
```python
results = evaluator.eval_lora_vs_base(base_svc, lora_svc, eval_prompts)
trackio.init(project="workbench", run_name="reward_eval")
trackio.log({
"base_reward_mean": results["base_mean"],
"lora_reward_mean": results["lora_mean"],
"lora_win_rate": results["lora_win_rate"],
})
trackio.finish()
```
---
## 4. Synthetic Data Gen
### What it is
The ml-intern finding: *when real data is insufficient, have an LLM generate training data*.
This module does exactly that β€” it uses a capable model (MiniCPM4.1-8B or a cloud model via
HF Router) to generate diverse, high-quality (prompt, response) pairs on demand.
### Why it matters
Real-world fine-tuning is often blocked not by compute but by data. You have 50 good examples
but need 5000. Synthetic gen + quality filtering bridges that gap, especially for specialized
domains (plant species, historical OCR corrections, industrial inspection defect labels).
### Architecture
```
datasets/synthetic.py
SyntheticGenerator
.generate(topic, n, schema) β†’ Dataset
.augment(existing_ds, n) β†’ Dataset
.filter_quality(ds, min_score) β†’ Dataset
.generate_dpo_pairs(topic, n) β†’ Dataset
```
```python
# datasets/synthetic.py
import json
from datasets import Dataset
GENERATION_PROMPT = """You are a training data generator. Generate {n} diverse, high-quality
training examples for the topic: {topic}.
Output ONLY a valid JSON array. Each item must have these fields: {schema}
No explanation, no markdown, no preamble. Raw JSON array only."""
class SyntheticGenerator:
def __init__(self, generator_svc):
"""generator_svc: any loaded ModelService with a .generate(prompt) method."""
self.gen = generator_svc
def generate(
self,
topic: str,
n: int = 100,
schema: dict | None = None,
) -> Dataset:
"""
Generate n training examples on a topic.
schema: dict of field_name β†’ description, e.g.
{"instruction": "task to perform", "response": "ideal answer"}
"""
schema = schema or {"instruction": "user task", "response": "ideal answer"}
schema_str = ", ".join(f'"{k}": "{v}"' for k, v in schema.items())
# Generate in batches of 20 to stay within context
rows = []
for batch_start in range(0, n, 20):
batch_n = min(20, n - batch_start)
prompt = GENERATION_PROMPT.format(
n=batch_n, topic=topic, schema="{" + schema_str + "}"
)
raw = self.gen.generate(prompt)
try:
# Strip any accidental markdown fences
clean = raw.strip().lstrip("```json").lstrip("```").rstrip("```")
batch = json.loads(clean)
rows.extend(batch)
except json.JSONDecodeError:
# Skip malformed batches; log the failure
continue
return Dataset.from_list(rows)
def augment(self, existing_ds: Dataset, n: int) -> Dataset:
"""
Use existing examples as few-shot demonstrations to generate n more.
Samples up to 5 examples from existing_ds as context.
"""
import random
samples = existing_ds.shuffle().select(range(min(5, len(existing_ds))))
examples_str = json.dumps(samples.to_list(), indent=2)
prompt = f"""Here are {len(samples)} example training items:
{examples_str}
Generate {n} MORE diverse examples in the exact same JSON format.
Output only the JSON array, no explanation."""
raw = self.gen.generate(prompt)
try:
clean = raw.strip().lstrip("```json").rstrip("```")
new_rows = json.loads(clean)
except json.JSONDecodeError:
return existing_ds
return Dataset.from_list(existing_ds.to_list() + new_rows)
def filter_quality(
self,
ds: Dataset,
reward_evaluator=None,
min_score: float = 0.6,
) -> Dataset:
"""
Filter with reward model if available, else heuristic filters.
Heuristics: min length, no repeated n-grams, valid JSON fields.
"""
if reward_evaluator:
def _score(row):
return reward_evaluator.score(
row.get("instruction", ""),
row.get("response", "")
) >= min_score
return ds.filter(_score)
else:
# Basic heuristics
def _heuristic(row):
resp = row.get("response", "")
return (
len(resp) >= 20 and # not too short
len(resp) <= 4096 and # not too long
resp.count(resp[:20]) < 3 # not repetitive
)
return ds.filter(_heuristic)
def generate_for_domain(
self,
domain: str,
output_path: str,
n: int = 500,
):
"""
Convenience method: generate, augment, filter, save to disk.
Use for plant ID: domain="Plant species identification from photo descriptions"
"""
ds = self.generate(topic=domain, n=n // 2)
ds = self.augment(ds, n=n // 2)
ds = self.filter_quality(ds)
ds.save_to_disk(output_path)
return ds
```
### Domain-specific example: plant ID
```python
gen = SyntheticGenerator(model_registry.get("minicpm41_8b"))
plant_ds = gen.generate_for_domain(
domain="Identifying plant species from visual descriptions. "
"Include common name, latin name, family, key visual features, and care tips.",
output_path="data/synthetic_plants",
n=2000,
)
# β†’ 2000 synthetic (description β†’ species JSON) training pairs
```
---
## 5. Paper-to-Code Agent
### What it is
An autonomous agent that takes an arXiv paper URL or title, reads the methodology section,
and implements the described technique within the workbench codebase. Directly inspired by
the ml-intern architecture (Research β†’ Plan β†’ Implement β†’ Trace).
### Why it matters
The gap between reading a paper and running an experiment is usually days of engineering. This
agent compresses that to minutes for techniques that fit the workbench's model family.
Practical use cases: implement a new PEFT variant, add a new evaluation metric, adapt a new
data augmentation from a recent VLM paper.
### Architecture
```
agent/paper_agent.py
PaperAgent
.run(paper_ref) β†’ AgentResult
β†’ Phase 1: Research (fetch + parse paper)
β†’ Phase 2: Plan (identify workbench integration points)
β†’ Phase 3: Implement (generate + write code)
β†’ Phase 4: Test (run + log to Trackio)
β†’ Phase 5: Trace (upload session to HF Dataset)
```
```python
# agent/paper_agent.py
import re
from dataclasses import dataclass, field
from huggingface_hub import HfApi
from smolagents import CodeAgent, HfApiModel
import trackio
@dataclass
class AgentResult:
paper_title: str = ""
summary: str = ""
files_modified: list[str] = field(default_factory=list)
test_results: dict = field(default_factory=dict)
trace_url: str = ""
class PaperAgent:
SYSTEM_PROMPT = """You are an ML engineer working inside the OpenBMB Workbench codebase.
Given a research paper, your job is to:
1. Understand the core algorithm or technique.
2. Identify which module in the workbench it extends (training/, models/, datasets/, tools/).
3. Implement it as a new class or function, following the existing patterns.
4. Write a simple test that runs within the workbench and logs results to Trackio.
The workbench uses: transformers, peft, trl, trackio, mcp, gradio.
All new code must: fire events via the EventBus, log to Trackio, register in the Registry."""
def __init__(self, orchestrator_model: str = "openbmb/MiniCPM4.1-8B"):
self.model = HfApiModel(orchestrator_model)
self.api = HfApi()
self._log = []
def run(self, paper_ref: str) -> AgentResult:
"""
paper_ref: arXiv URL like "https://arxiv.org/abs/2106.09685"
or paper title like "LoRA: Low-Rank Adaptation of Large Language Models"
"""
result = AgentResult()
# Phase 1: Research
paper_text = self._fetch_paper(paper_ref)
result.paper_title = self._extract_title(paper_text)
# Phase 2: Plan
plan = self._plan(paper_text)
# Phase 3: Implement
code_files = self._implement(plan, paper_text)
result.files_modified = list(code_files.keys())
for path, code in code_files.items():
self._write_file(path, code)
# Phase 4: Test
trackio.init(project="workbench", run_name=f"paper_agent_{result.paper_title[:30]}")
test_result = self._test(code_files)
result.test_results = test_result
trackio.log({"test_passed": test_result.get("passed", False), **test_result})
trackio.finish()
# Phase 5: Trace (ml-intern pattern)
result.trace_url = self._upload_trace(result)
return result
def _fetch_paper(self, paper_ref: str) -> str:
"""Fetch paper text via HF Papers API or arXiv."""
import requests
if "arxiv.org" in paper_ref:
arxiv_id = paper_ref.split("/abs/")[-1]
r = requests.get(f"https://export.arxiv.org/abs/{arxiv_id}")
return r.text
# Fall back to HF Papers search
from huggingface_hub import list_papers
results = list(list_papers(query=paper_ref, limit=1))
return str(results[0]) if results else ""
def _plan(self, paper_text: str) -> str:
"""Ask the LLM to analyze the paper and produce an integration plan."""
agent = CodeAgent(tools=[], model=self.model, max_steps=5)
return agent.run(
f"Read this paper excerpt and produce a 5-step integration plan "
f"for the OpenBMB Workbench:\n\n{paper_text[:8000]}"
)
def _implement(self, plan: str, paper_text: str) -> dict[str, str]:
"""Generate code files from the plan."""
agent = CodeAgent(tools=[], model=self.model, max_steps=15)
code = agent.run(
f"Implementation plan:\n{plan}\n\n"
f"Paper details:\n{paper_text[:4000]}\n\n"
f"Generate the Python file(s). Return a JSON dict: "
f"{{\"path/to/file.py\": \"file_content\", ...}}"
)
import json
try:
return json.loads(code)
except Exception:
return {}
def _write_file(self, path: str, content: str):
import os
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "w") as f:
f.write(content)
def _test(self, code_files: dict) -> dict:
"""Run a quick import + instantiation test on generated files."""
results = {}
for path in code_files:
try:
module_name = path.replace("/", ".").replace(".py", "")
import importlib.util, sys
spec = importlib.util.spec_from_file_location(module_name, path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
results[path] = "imported_ok"
except Exception as e:
results[path] = f"error: {e}"
results["passed"] = all("ok" in v for v in results.values())
return results
def _upload_trace(self, result: AgentResult) -> str:
"""Upload session trace to private HF Dataset (ml-intern pattern)."""
import json, datetime
session = {
"timestamp": datetime.datetime.utcnow().isoformat(),
"paper": result.paper_title,
"files_modified": result.files_modified,
"test_results": result.test_results,
}
user = self.api.whoami()["name"]
dataset_id = f"{user}/workbench-paper-sessions"
# Upload as JSONL
# ... (HF Dataset API)
return f"https://huggingface.co/datasets/{dataset_id}"
```
---
## 6. HF Spaces Deploy
### What it is
One-click packaging and deployment of the current workbench state as a Hugging Face Space.
The Space runs the same Gradio app on HF infrastructure, shareable via URL, with ZeroGPU
support for serverless GPU access.
### Why it matters
Sharing a workbench state with a collaborator currently requires: git push, environment setup,
model download, config sync. With one-click deploy: copy URL β†’ colleague sees the live app.
For hackathons this is especially valuable β€” deploy a domain-specific variant as a demo Space
in under 2 minutes.
### Architecture
```
deploy/spaces.py
SpacesDeployer
.prepare_repo() β†’ creates/updates HF Space repo
.upload_code() β†’ pushes app code (not model weights)
.set_hardware(tier) β†’ sets GPU tier in README
.configure_secrets(env_vars) β†’ sets HF Space secrets
.deploy() β†’ trigger Space rebuild
.get_url() β†’ returns live Space URL
```
```python
# deploy/spaces.py
import os, shutil, tempfile
from pathlib import Path
from huggingface_hub import HfApi, SpaceHardware
class SpacesDeployer:
HARDWARE_MAP = {
"cpu": SpaceHardware.CPU_BASIC,
"t4": SpaceHardware.T4_SMALL,
"t4_lg": SpaceHardware.T4_MEDIUM,
"a10": SpaceHardware.A10G_SMALL,
"a100": SpaceHardware.A100_LARGE,
"zero": SpaceHardware.CPU_BASIC, # ZeroGPU: uses CPU_BASIC + @spaces.GPU
}
EXCLUDE = {".git", "__pycache__", "exports", "data", "checkpoints",
"*.gguf", "*.bin", "*.safetensors", ".env"}
def __init__(self, space_id: str, hardware: str = "zero"):
self.api = HfApi()
self.space_id = space_id # "username/my-workbench"
self.hardware = hardware
def deploy(
self,
src_dir: str = ".",
env_vars: dict | None = None,
) -> str:
"""Full deploy pipeline. Returns live Space URL."""
self._create_or_update_repo()
self._upload_code(src_dir)
self._configure_secrets(env_vars or {})
self._patch_app_for_zerogpu()
return f"https://huggingface.co/spaces/{self.space_id}"
def _create_or_update_repo(self):
try:
self.api.create_repo(
repo_id=self.space_id,
repo_type="space",
space_sdk="gradio",
private=False,
exist_ok=True,
)
except Exception as e:
print(f"Repo create/update: {e}")
def _upload_code(self, src_dir: str):
with tempfile.TemporaryDirectory() as tmp:
src = Path(src_dir)
dest = Path(tmp)
# Copy only non-excluded files
for item in src.rglob("*"):
if any(item.match(pat) for pat in self.EXCLUDE):
continue
rel = item.relative_to(src)
target = dest / rel
target.parent.mkdir(parents=True, exist_ok=True)
if item.is_file():
shutil.copy2(item, target)
self.api.upload_folder(
folder_path=str(dest),
repo_id=self.space_id,
repo_type="space",
commit_message="workbench deploy",
)
def _configure_secrets(self, env_vars: dict):
for key, val in env_vars.items():
self.api.add_space_secret(
repo_id=self.space_id,
key=key,
value=val,
)
# Always set hardware
self.api.request_space_hardware(
repo_id=self.space_id,
hardware=self.HARDWARE_MAP[self.hardware],
)
def _patch_app_for_zerogpu(self):
"""
If hardware=zero, wrap inference functions with @spaces.GPU decorator.
Uploads a patched app.py.
"""
if self.hardware != "zero":
return
# Read existing app.py from Space, inject @spaces.GPU, re-upload
app_content = self.api.hf_hub_download(
repo_id=self.space_id, filename="app.py", repo_type="space"
)
with open(app_content) as f:
code = f.read()
# Simple injection: add import and decorator before inference functions
patched = "import spaces\n" + code.replace(
"def run_inference(",
"@spaces.GPU\ndef run_inference("
)
self.api.upload_file(
path_or_fileobj=patched.encode(),
path_in_repo="app.py",
repo_id=self.space_id,
repo_type="space",
)
```
### UI: deploy button in any tab header
```python
with gr.Row():
space_id_box = gr.Textbox(placeholder="username/my-workbench", label="Space ID")
hardware_dd = gr.Dropdown(["cpu","t4","a10","zero"], value="zero", label="Hardware")
deploy_btn = gr.Button("πŸš€ Deploy to HF Spaces")
deploy_url = gr.Textbox(label="Live URL", interactive=False)
def do_deploy(space_id, hw):
d = SpacesDeployer(space_id, hw)
url = d.deploy(env_vars={"HF_TOKEN": os.environ.get("HF_TOKEN","")})
return url
deploy_btn.click(do_deploy, [space_id_box, hardware_dd], deploy_url)
```
---
## 7. VINDEX Integration
### What it is
VINDEX is your own knowledge-editing engine (ki-fusion-labs.de). It exposes eight methods for
mechanistic interpretability and targeted weight editing on transformer models. Integrating it
into the workbench closes the loop between training a LoRA and verifying (or surgically fixing)
what the model actually knows.
VINDEX endpoints (from your PRD v1):
- `logit_lens` β€” per-layer prediction visualization
- `slot_neighbors` β€” embedding space neighbors
- `layer_contribution` β€” per-layer influence on final prediction
- `transition_spectrum` β€” attention head transition matrix
- `calibrated_edit` β€” targeted fact edit in weight space
- `derive_scale` β€” scaling factor derivation for safe edits
- `star_spread` β€” spread edit across semantically related slots
- `protect_relations` β€” guard against side effects
### Why it matters for the workbench
After LoRA fine-tuning a model on plant data:
1. `logit_lens` β€” does the model correctly predict "Rosa" at the right layer for a rose image?
2. `calibrated_edit` β€” if it consistently misidentifies Acer as Quercus, edit that slot
3. `star_spread` β€” propagate the Acer correction to closely related maple species
4. `protect_relations` β€” verify the edit didn't break "plant β†’ living thing β†’ organism"
This is not possible with LoRA alone β€” LoRA changes weight statistics globally. VINDEX does
surgical point edits, making it a *complement* to LoRA, not a replacement.
### Architecture
```
tools/vindex_tool.py
VINDEXClient
.logit_lens(model, tokenizer, text) β†’ dict[layer, prediction]
.calibrated_edit(model, subject, relation, old_obj, new_obj) β†’ model
.star_spread(model, anchor_subject, n=5) β†’ list[affected_slots]
.protect_relations(model, protected_triplets) β†’ model
.layer_contribution(model, text) β†’ dict[layer, score]
.slot_neighbors(model, token_id, n=10) β†’ list[str]
```
```python
# tools/vindex_tool.py
"""
VINDEX integration.
Assumes VINDEX FastAPI server is running locally on port 8765,
OR VINDEX modules are importable from your local install.
"""
import requests
from mcp.server.fastmcp import FastMCP
VINDEX_BASE = "http://localhost:8765" # your local VINDEX FastAPI
mcp = FastMCP("VINDEXTools")
@mcp.tool()
async def logit_lens(
model_id: str,
text: str,
layer_range: tuple[int, int] = (0, -1),
) -> dict:
"""
Run logit lens on a loaded model for the given text.
Returns per-layer top-5 token predictions and probabilities.
Useful for finding the 'phase layer' where the model commits to an answer.
"""
r = requests.post(f"{VINDEX_BASE}/logit_lens", json={
"model_id": model_id,
"text": text,
"layer_range": list(layer_range),
})
return r.json()
@mcp.tool()
async def calibrated_edit(
model_id: str,
subject: str,
relation: str,
old_obj: str,
new_obj: str,
causal_window: int = 3, # Β±3 layers around logit lens phase layer
) -> dict:
"""
Perform a targeted knowledge edit: change the model's belief about
(subject, relation) from old_obj to new_obj.
causal_window: restrict causal search to Β±N layers around phase layer.
Returns edit_success, layers_modified, side_effect_score.
"""
r = requests.post(f"{VINDEX_BASE}/calibrated_edit", json={
"model_id": model_id,
"subject": subject,
"relation": relation,
"old_obj": old_obj,
"new_obj": new_obj,
"causal_window": causal_window,
})
return r.json()
@mcp.tool()
async def star_spread(
model_id: str,
anchor_subject: str,
n_neighbors: int = 5,
) -> dict:
"""
Find semantically related slots and spread a recent edit across them.
Example: after editing "Acer palmatum β†’ maple", also update
"Acer japonicum", "Acer shirasawanum" etc.
Returns list of affected subjects and their edit scores.
"""
r = requests.post(f"{VINDEX_BASE}/star_spread", json={
"model_id": model_id,
"anchor_subject": anchor_subject,
"n_neighbors": n_neighbors,
})
return r.json()
@mcp.tool()
async def protect_relations(
model_id: str,
protected_triplets: list[dict], # [{"s": ..., "r": ..., "o": ...}]
) -> dict:
"""
After a knowledge edit, verify that listed subject-relation-object triplets
remain intact. Returns a pass/fail table and a side_effect_score.
"""
r = requests.post(f"{VINDEX_BASE}/protect_relations", json={
"model_id": model_id,
"protected_triplets": protected_triplets,
})
return r.json()
@mcp.tool()
async def layer_contribution(
model_id: str,
text: str,
) -> dict:
"""
Per-layer contribution score to the final prediction.
Use to find which layers drive the target behavior before editing.
"""
r = requests.post(f"{VINDEX_BASE}/layer_contribution", json={
"model_id": model_id,
"text": text,
})
return r.json()
```
### Known bugs to fix before integration (from VINDEX PRD v1)
1. **GPU memory leak** β€” after repeated edits, VRAM grows unbounded. Fix: explicitly call
`torch.cuda.empty_cache()` after each `calibrated_edit` call and detach gradient graphs.
2. **Dead-code blocks** β€” several helper functions in the weight-surgery path are unreachable
after a recent refactor. Before integrating: `grep -n "def " vindex/core.py | xargs` and
verify each function has at least one call site.
3. **Weight imbalance** β€” `star_spread` can over-edit related slots if `n_neighbors > 5`.
Hard-cap at 5 in the MCP tool until the scaling formula is validated.
4. **Forward optimization** β€” restrict causal search to Β±3 layers around the logit lens
phase layer (already implemented as `causal_window` param above).
### UI tab: "🧠 Knowledge Editor"
```
Inputs:
Model selector (loaded models)
Subject text (e.g. "Acer palmatum")
Relation (e.g. "is a type of")
Old object (e.g. "oak")
New object (e.g. "maple")
[Run Logit Lens] button β†’ displays per-layer heatmap via gr.Plot
[Apply Edit] button β†’ runs calibrated_edit
[Spread] button β†’ runs star_spread
[Verify] button β†’ runs protect_relations on a default triplet set
Outputs:
Per-layer prediction table
Edit success / layers modified
Side effect score (0 = safe, 1 = dangerous)
```
---
## 8. OCR Pipeline Hook
### What it is
Your self-improving multilingual OCR pipeline (Latin, Arabic, Cyrillic) already exists and
produces output files: image + predicted_text + confidence scores. This extension hooks those
outputs directly into the workbench Field Notes system, creating a tight correction loop:
```
OCR pipeline outputs (uncertain predictions)
↓
Auto-created Field Notes (image + OCR text + empty correction field)
↓
Human reviews in UI β†’ fills in correction
↓
Accepted corrections auto-tagged "use_for_training=True"
↓
LoRA training run on correction pairs
↓
Better OCR model β†’ fewer uncertain predictions
```
This is the active learning loop your OCR pipeline was designed for but didn't yet have
a clean UI for corrections and retraining.
### Architecture
```
datasets/ocr_loader.py
OCRPipelineLoader
.watch(output_dir, threshold) β†’ poll for new low-confidence outputs
.ingest(output_dir) β†’ batch import all outputs
.to_field_notes(threshold) β†’ FieldNote[] (uncertain ones only)
.to_training_dataset() β†’ Dataset (corrected ones only)
```
```python
# datasets/ocr_loader.py
import json, os
from pathlib import Path
from datasets.field_notes import FieldNote, FieldNoteStore
from core.events import bus, EventType, Event
class OCRPipelineLoader:
"""
Watches a directory written by the OCR pipeline.
Expected format per document:
<doc_id>.json β†’ {"image_path": ..., "predicted_text": ...,
"confidence": float, "script": "latin"|"arabic"|"cyrillic"}
"""
def __init__(
self,
output_dir: str,
store: FieldNoteStore,
confidence_threshold: float = 0.85,
):
self.output_dir = Path(output_dir)
self.store = store
self.threshold = confidence_threshold
def ingest(self, limit: int | None = None) -> int:
"""
Read all pipeline outputs. Create Field Notes for uncertain predictions
(confidence < threshold). Skip already-ingested docs.
Returns number of new Field Notes created.
"""
count = 0
json_files = sorted(self.output_dir.glob("*.json"))
if limit:
json_files = json_files[:limit]
for jf in json_files:
try:
data = json.loads(jf.read_text())
except json.JSONDecodeError:
continue
# Skip high-confidence outputs
if data.get("confidence", 1.0) >= self.threshold:
continue
note = FieldNote(
id=f"ocr_{jf.stem}",
model_id="ocr_pipeline",
modality="image",
image_path=data["image_path"],
prompt=(
f"Transcribe this {data.get('script','latin')} text accurately. "
f"OCR predicted: '{data['predicted_text']}'"
),
response=data["predicted_text"],
correction="", # human fills this in
tags=[
f"script:{data.get('script','unknown')}",
f"conf:{data.get('confidence',0.0):.2f}",
"source:ocr_pipeline",
],
)
self.store.save(note)
count += 1
# Fire event
import asyncio
asyncio.run(bus.emit(Event(
type=EventType.DATASET_LOADED,
payload={"source": "ocr_pipeline", "new_notes": count}
)))
return count
def watch(self, poll_interval: int = 30):
"""
Background thread: poll output_dir every N seconds, ingest new files.
Use in production when OCR pipeline runs continuously.
"""
import threading, time
seen = set()
def _poll():
while True:
for jf in self.output_dir.glob("*.json"):
if jf.stem not in seen:
seen.add(jf.stem)
self.ingest.__wrapped__([jf]) # single-file ingest
time.sleep(poll_interval)
t = threading.Thread(target=_poll, daemon=True)
t.start()
def to_training_dataset(self, script_filter: str | None = None):
"""
Export corrected field notes as a training dataset.
schema: {"image_path": ..., "instruction": ..., "response": ...}
Ready to pass to LoRATextTrainer or a vision LoRA config.
"""
from datasets import Dataset
query = "SELECT data FROM notes WHERE json_extract(data,'$.correction') != ''"
if script_filter:
query += f" AND json_extract(data,'$.tags') LIKE '%script:{script_filter}%'"
rows = [
json.loads(r[0])
for r in self.store.conn.execute(query)
]
training_rows = [
{
"image_path": r["image_path"],
"instruction": r["prompt"],
"response": r["correction"], # human-corrected
"script": next(
(t.split(":")[1] for t in r["tags"] if t.startswith("script:")),
"unknown"
),
}
for r in rows
]
return Dataset.from_list(training_rows)
```
### UI: OCR correction view (Field Notes tab, new subtab)
```
[OCR Pipeline Output dir: ____] [Confidence threshold: 0.85] [Ingest]
Table of uncertain predictions:
| Image | OCR text | Confidence | Your correction | Save |
| [img] | "Rechung 18. Ap" | 0.73 | [____________] | [βœ“] |
| [img] | "Beschlußprotoko" | 0.69 | [____________] | [βœ“] |
[Export corrections as training dataset] [Start LoRA retrain]
```
### Connection to the active learning loop
Your OCR pipeline already has:
- `abstention logic` (the "council abstains" on uncertain predictions)
- `acceptance-gated fine-tuning`
- `RAG-based post-correction`
The workbench hook provides the missing UI layer: human-in-the-loop corrections that feed
the acceptance gate. The `FieldNoteStore.to_hf_dataset()` output plugs directly into the
pipeline's `acceptance-gated fine-tuning` step.
---
## 9. MiniCPM Desk-Pet
### What it is
OpenBMB ships `MiniCPM-Desk-Pet`, a desktop companion app powered by MiniCPM5-1B, alongside
the model release (2026-05-19). Key features:
- Runs locally on Apple Silicon, NVIDIA GPU, or CPU
- LoRA persona switching β€” different personalities loaded as adapters
- Integrates with coding agents (Cursor, Claude Code, Codex)
- Tiny footprint (~2GB VRAM with Q4_K_M)
The workbench extension lets you train LoRA personas directly and export them to the Desk-Pet
format.
### What "persona" means here
A LoRA persona is a small adapter (rank 8–16) trained on ~100–500 conversation examples in a
specific voice or style. Examples:
- "Botanist assistant" β€” answers in scientific plant terminology
- "Friendly field guide" β€” casual, encouraging tone for beginners
- "Historical document expert" β€” formal, precise, citation-aware (connects to OCR pipeline)
Training data is small enough that synthetic gen (Extension 4) can produce it in minutes.
### Architecture
```
agent/desk_pet.py
DeskPetExporter
.train_persona(name, style_desc, n_examples) β†’ LoRA checkpoint
.export_to_deskpet(checkpoint_path) β†’ deskpet_compatible.gguf
.list_personas() β†’ [PersonaMeta]
.load_persona(name) β†’ activates adapter in current session
```
```python
# agent/desk_pet.py
from dataclasses import dataclass
from pathlib import Path
import json, shutil
@dataclass
class PersonaMeta:
name: str
description: str
checkpoint: str
gguf_path: str | None = None
n_examples: int = 0
class DeskPetExporter:
PERSONA_DIR = Path("data/personas")
def __init__(self, base_model_id: str = "openbmb/MiniCPM5-1B"):
self.base_model_id = base_model_id
self.PERSONA_DIR.mkdir(parents=True, exist_ok=True)
def train_persona(
self,
name: str,
style_desc: str,
n_examples: int = 200,
lora_rank: int = 8, # small rank: personas need only ~50-100 examples
) -> str:
"""
1. Use SyntheticGenerator to create conversation examples in the persona style.
2. Fine-tune MiniCPM5-1B LoRA.
3. Save checkpoint.
Returns checkpoint path.
"""
from datasets.synthetic import SyntheticGenerator
from training.lora import LoRATextTrainer
from models.minicpm_text import MiniCPMTextService
import torch
# Generate persona training data
from transformers import AutoModelForCausalLM, AutoTokenizer
base_model = AutoModelForCausalLM.from_pretrained(
self.base_model_id, torch_dtype=torch.bfloat16, device_map="auto"
)
base_tok = AutoTokenizer.from_pretrained(self.base_model_id)
gen_svc = MiniCPMTextService.__new__(MiniCPMTextService)
gen_svc.model, gen_svc.tokenizer = base_model, base_tok
synth = SyntheticGenerator(gen_svc)
ds = synth.generate(
topic=f"Conversation examples in the style of: {style_desc}. "
f"Each example: a user message and a response in that persona's voice.",
n=n_examples,
schema={"instruction": "user message", "response": "persona reply"},
)
# LoRA fine-tune
trainer = LoRATextTrainer(cfg={
"lora_rank": lora_rank,
"lora_alpha": lora_rank * 2,
"epochs": 2,
"batch_size": 8,
"grad_accum": 2,
})
output_dir = str(self.PERSONA_DIR / name / "checkpoint")
trainer.train(base_model, base_tok, ds, run_name=f"persona_{name}")
# Save metadata
meta = PersonaMeta(
name=name, description=style_desc,
checkpoint=output_dir, n_examples=n_examples
)
(self.PERSONA_DIR / name / "meta.json").write_text(
json.dumps(meta.__dict__, indent=2)
)
return output_dir
def export_to_deskpet(self, persona_name: str) -> str:
"""
Merge LoRA into base weights, then export as GGUF for Desk-Pet.
Returns path to merged GGUF.
"""
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
from training.export import GGUFExporter
meta_path = self.PERSONA_DIR / persona_name / "meta.json"
meta = PersonaMeta(**json.loads(meta_path.read_text()))
# Merge LoRA into base
base = AutoModelForCausalLM.from_pretrained(
self.base_model_id, torch_dtype=torch.bfloat16
)
peft_model = PeftModel.from_pretrained(base, meta.checkpoint)
merged = peft_model.merge_and_unload()
merged_path = str(self.PERSONA_DIR / persona_name / "merged")
merged.save_pretrained(merged_path)
AutoTokenizer.from_pretrained(self.base_model_id).save_pretrained(merged_path)
# Export GGUF
exporter = GGUFExporter()
gguf_path = exporter.export(
model_path=merged_path,
output_dir=str(self.PERSONA_DIR / persona_name / "gguf"),
quant="Q4_K_M",
model_type="text",
)[1] # [0] = F16, [1] = quantized
# Update metadata
meta.gguf_path = gguf_path
meta_path.write_text(json.dumps(meta.__dict__, indent=2))
return gguf_path
```
### Usage flow
```
1. UI: "New persona" β†’ enter name + style description
2. Synthetic gen: 200 examples of that voice β†’ fine-tune LoRA (rank 8, ~10 min on RTX)
3. Export β†’ Q4_K_M GGUF
4. Copy to Desk-Pet personas/ dir
5. Desk-Pet: switch persona β†’ instant personality change
```
---
## 10. MiniCPM-o Audio Tab
### What it is
MiniCPM-o-4.5 (released 2026-05-17) is a true omnimodal model β€” it sees, listens, and speaks
simultaneously in real-time. It supports proactive interactions (proactive reminding) and
real-time conversation with both visual and audio input.
This extension adds a new Gradio tab with a microphone + camera (or image) interface, streaming
audio output, and real-time MiniCPM-o inference.
### Architecture
```
ui/audio_tab.py
OmnimodalTab
.build() β†’ gr.Column with audio+image inputs and streaming output
models/minicpm_omni.py
MiniCPMOmniService
.stream_chat(audio_bytes, image=None, text=None) β†’ Generator[str]
.speak(text) β†’ bytes (TTS for audio output)
```
```python
# models/minicpm_omni.py
import torch
import numpy as np
from transformers import AutoProcessor, AutoModel
class MiniCPMOmniService:
"""
MiniCPM-o-4.5: omnimodal service.
Handles text + image + audio simultaneously.
"""
MODEL_ID = "openbmb/MiniCPM-o-4.5"
def __init__(self, cfg: dict):
self.processor = AutoProcessor.from_pretrained(
self.MODEL_ID, trust_remote_code=True
)
self.model = AutoModel.from_pretrained(
self.MODEL_ID,
trust_remote_code=True,
torch_dtype=torch.bfloat16,
device_map="auto",
)
self.thinking = cfg.get("thinking_mode", False)
def chat(
self,
text: str | None = None,
image=None, # PIL Image
audio: np.ndarray | None = None,
sample_rate: int = 16000,
) -> str:
"""
Full omnimodal chat: pass any combination of text, image, audio.
MiniCPM-o-4.5 handles them natively.
"""
content = []
if image is not None:
content.append({"type": "image", "image": image})
if audio is not None:
content.append({
"type": "audio",
"audio": audio,
"sample_rate": sample_rate,
})
if text:
content.append({"type": "text", "text": text})
messages = [{"role": "user", "content": content}]
inputs = self.processor.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_tensors="pt",
return_dict=True,
chat_template_kwargs={"enable_thinking": self.thinking},
).to(self.model.device)
with torch.no_grad():
outputs = self.model.generate(**inputs, max_new_tokens=512)
return self.processor.decode(
outputs[0][inputs["input_ids"].shape[1]:],
skip_special_tokens=True
)
```
```python
# ui/audio_tab.py
import gradio as gr
import numpy as np
def build_audio_tab(model_registry):
with gr.Column():
gr.Markdown("### πŸŽ™οΈ Omnimodal β€” MiniCPM-o-4.5")
gr.Markdown("Speak, show an image, or type β€” all at once.")
with gr.Row():
audio_in = gr.Audio(
sources=["microphone"],
type="numpy",
label="Microphone input",
streaming=True,
)
image_in = gr.Image(
type="pil",
label="Optional image",
)
text_in = gr.Textbox(label="Optional text", placeholder="Or type here...")
submit = gr.Button("Send")
output = gr.Textbox(label="Response", lines=8)
audio_out = gr.Audio(label="Spoken response (TTS)", autoplay=True)
def respond(audio_data, image, text):
svc = model_registry.get("minicpm_o45")
audio_arr = None
if audio_data is not None:
sr, arr = audio_data
audio_arr = arr.astype(np.float32) / 32768.0
response = svc.chat(text=text, image=image, audio=audio_arr)
# Optional: TTS for spoken output
# spoken = tts(response)
return response, None # None = no audio out yet
submit.click(respond, [audio_in, image_in, text_in], [output, audio_out])
```
### Real-time streaming version (advanced)
For true real-time conversation (proactive reminding, interrupt detection):
```python
# streaming audio inference using gradio's streaming audio + SSE
demo = gr.Interface(
fn=stream_omni_response,
inputs=[
gr.Audio(streaming=True, sources=["microphone"]),
gr.Image(type="pil"),
],
outputs=gr.Textbox(),
live=True,
)
```
### TTS for audio output
MiniCPM-o-4.5 has its own audio generation capability β€” check the model card for the
`generate_audio` method. Fallback: use `kokoro-82M` (Apache 2.0, 82M, runs locally) for TTS.
```python
# Kokoro TTS fallback (lightweight, local)
from kokoro import KPipeline
tts_pipe = KPipeline(lang_code="en-us")
audio, sr = tts_pipe(response_text, voice="af_heart")
```
---
## 11. Cross-Extension Wiring
Most extensions are independent, but several combinations unlock powerful compound workflows:
### OCR β†’ VINDEX
Low-confidence OCR outputs β†’ Field Notes β†’ human corrections β†’ LoRA retrain. But additionally:
if the OCR model consistently misreads a specific character class, use VINDEX's `logit_lens` to
identify which layer is responsible, then `calibrated_edit` to target that slot directly β€” a
faster fix than a full retraining cycle.
### Synthetic Gen β†’ Reward Model β†’ DPO
```
SyntheticGenerator.generate(topic, n=1000)
β†’ RewardEvaluator.create_dpo_pairs(prompts, generator, n=4)
β†’ DPO training via TRL DPOTrainer
β†’ Trackio logs win rate
β†’ VINDEX verify alignment not broken
```
### Paper Agent β†’ Desk-Pet Persona
```
PaperAgent reads: "Persona-based dialogue systems for domain experts"
β†’ Implements: persona training data format
β†’ DeskPetExporter.train_persona("expert_botanist", "...")
β†’ Export GGUF β†’ load in Desk-Pet
```
### HF Spaces + vLLM + Trackio
```
SpacesDeployer.deploy(hardware="a10") # production GPU
β†’ app.py runs VLLMRunner on A10G
β†’ All requests logged via Trackio with space_id=deployed_space
β†’ Dashboard visible to collaborators at trackio Space URL
```
### Full active-learning loop (all extensions combined)
```
MiniCPM-o Audio Tab: user speaks + shows image of plant
↓
OmnimodalService: identify species (low confidence)
↓
OCRPipelineLoader: auto-create Field Note (uncertain prediction)
↓
Human: corrects species name in UI
↓
SyntheticGenerator: augment with 50 similar examples
↓
RewardEvaluator: filter synthetic examples
↓
LoRATextTrainer + TRL + Trackio: fine-tune
↓
VINDEX: verify the target species slot was corrected
↓
DeskPetExporter: export updated persona
↓
SpacesDeployer: push updated app to HF Spaces
```
---
*Extensions spec v1.0 β€” Christof Kaller / ki-fusion-labs.de β€” 2026-06-05*