MergeMind / pipeline.py
Tejha's picture
Upload 10 files
66153d5 verified
"""
Master Pipeline Orchestrator
==============================
Runs all 5 phases end-to-end or individually.
Usage:
    # Full pipeline for the 'code' category (category is a positional argument)
    python pipeline.py run code
    # Discover, merge and evaluate only (no fine-tuning, no serving)
    python pipeline.py run reasoning --skip-ft --skip-serve
    # Iterative improvement loop
    python pipeline.py run medical --loop --max-iter 3
"""
from __future__ import annotations
from pathlib import Path
from typing import Optional
import typer
from rich.console import Console
from rich.panel import Panel
from rich.rule import Rule
from configs.settings import (
FT_BASE_MODEL, TOP_K_CANDIDATES, MERGES_DIR, ADAPTERS_DIR, HF_ORG
)
from utils.logger import logger
# Typer application object; the @app.command-decorated functions below register
# themselves on it as sub-commands. The help text previously contained a
# mis-encoded em dash ("β€”"); it is restored to the intended character.
app = typer.Typer(help="LLM Pipeline — full orchestrator")
# Shared Rich console used by the commands below for banners and status lines.
console = Console()
def _banner(text: str) -> None:
    """Print *text* as a bold-cyan titled rule on the shared console."""
    heading = Rule(f"[bold cyan]{text}[/bold cyan]")
    console.print(heading)
@app.command()
def run(
    category: str = typer.Argument("reasoning", help="Model category to target"),
    top_k: int = typer.Option(TOP_K_CANDIDATES, help="Candidates per category"),
    strategy: str = typer.Option("ties", help="Merge strategy: slerp|ties|dare_ties|task_arithmetic|breadcrumbs"),
    base_model: str = typer.Option(FT_BASE_MODEL, help="Base model for fine-tuning"),
    n_eval: int = typer.Option(100, help="Eval samples"),
    n_syn: int = typer.Option(50, help="Synthetic samples per gap"),
    ft_epochs: int = typer.Option(2, help="Fine-tuning epochs"),
    loop: bool = typer.Option(False, "--loop", help="Enable iterative improvement loop"),
    max_iter: int = typer.Option(3, help="Max loop iterations"),
    skip_discover: bool = typer.Option(False, "--skip-discover", help="Skip Phase 1 (discovery)"),
    skip_merge: bool = typer.Option(False, "--skip-merge", help="Skip Phase 2 (merging)"),
    skip_eval: bool = typer.Option(False, "--skip-eval", help="Skip Phase 3 (evaluation)"),
    skip_ft: bool = typer.Option(False, "--skip-ft", help="Skip Phase 4 (fine-tuning)"),
    skip_serve: bool = typer.Option(True, "--skip-serve/--serve"),  # off by default
    deploy: bool = typer.Option(False, "--deploy"),
    hf_repo: str = typer.Option("", "--repo"),
    use_wandb: bool = typer.Option(False, "--wandb"),
    use_mergekit: bool = typer.Option(False, "--mergekit/--no-mergekit"),
):
    """Run the pipeline phases in order: discover, merge, evaluate, fine-tune, track, deploy, serve.

    Discovery, merging, evaluation and fine-tuning each have a --skip-* flag.
    Deployment needs --deploy and the inference server needs --serve.
    """
    # Phase modules are imported lazily inside the branch that uses them, so a
    # skipped phase never triggers the import of its module.
    console.print(Panel(
        f"[bold]LLM Pipeline[/bold]\n"
        f"Category: [cyan]{category}[/cyan] | Strategy: [magenta]{strategy}[/magenta] | "
        f"Base: [green]{base_model.split('/')[-1]}[/green]",
        title="Starting",
    ))

    # ──────────────────────────────
    # Phase 1: Discovery
    # ──────────────────────────────
    candidates = []
    if not skip_discover:
        _banner("Phase 1 — Discovery")
        from phase1_discovery.discover import discover
        candidates = discover(category, top_k=top_k)
        if not candidates:
            # Nothing to merge or evaluate: abort with a non-zero exit code.
            logger.error("No candidates found. Exiting.")
            raise typer.Exit(1)
        console.print(f"[green]✓ Found {len(candidates)} candidates[/green]")

    # ──────────────────────────────
    # Phase 2: Merging
    # ──────────────────────────────
    merged_path: Optional[Path] = None
    if not skip_merge and candidates:
        _banner("Phase 2 — Merging")
        from phase2_merging.merge import merge_models
        model_ids = [c.model_id for c in candidates[:3]]  # merge top-3
        merged_path = merge_models(
            strategy=strategy,
            models=model_ids,
            base_model=model_ids[0],  # first candidate doubles as the merge base
            use_mergekit=use_mergekit,
        )
        console.print(f"[green]✓ Merged → {merged_path}[/green]")

    # Later phases work on the merged model when one exists, else the base model.
    eval_model = str(merged_path) if merged_path else base_model

    # ──────────────────────────────
    # Phase 3: Evaluation
    # ──────────────────────────────
    eval_result = None
    if not skip_eval:
        _banner("Phase 3 — Evaluation")
        from phase3_evaluation.evaluate import evaluate, load_squad
        samples = load_squad(n_eval)
        eval_result = evaluate(eval_model, samples, category, run_judge=True)
        console.print(
            f"[green]✓ ROUGE-1: {eval_result.avg_rouge1:.3f} | "
            f"BERTScore: {eval_result.avg_bertscore:.3f} | "
            f"Judge: {eval_result.avg_judge_score:.1f}/10[/green]"
        )

    # ──────────────────────────────
    # Phase 4: Fine-tuning
    # ──────────────────────────────
    best_adapter: Optional[Path] = None
    if not skip_ft:
        _banner("Phase 4 — Fine-Tuning")
        from phase4_finetuning.finetune import (
            improvement_loop,
            fine_tune,
            generate_synthetic_data,
            format_as_hf_dataset,
        )
        # Imported again here because Phase 3 may have been skipped.
        from phase3_evaluation.evaluate import load_squad
        if loop:
            # Full iterative loop
            best_adapter = improvement_loop(
                base_model_id=eval_model,
                eval_samples_fn=lambda: load_squad(min(50, n_eval)),
                max_iterations=max_iter,
                n_syn_per_gap=n_syn,
                use_wandb=use_wandb,
            )
        elif eval_result and eval_result.gap_categories:
            # One-shot: target detected gaps (ADAPTERS_DIR comes from the
            # module-level configs.settings import).
            syn = generate_synthetic_data(eval_result.gap_categories, n_per_gap=n_syn)
            dataset = format_as_hf_dataset(syn)
            adapter_dir = ADAPTERS_DIR / f"{category}_{strategy}"
            best_adapter = fine_tune(
                base_model_id=eval_model,
                dataset=dataset,
                output_dir=adapter_dir,
                run_name=f"{category}-{strategy}",
                epochs=ft_epochs,
                use_wandb=use_wandb,
            )
        elif eval_result:
            logger.info("[FT] No gaps detected, skipping fine-tuning")
        else:
            # Without an evaluation result there is no gap list to target;
            # say so instead of claiming that no gaps were detected.
            logger.info("[FT] Evaluation was skipped, so no gaps are known; skipping fine-tuning")
        if best_adapter:
            console.print(f"[green]✓ Adapter → {best_adapter}[/green]")

    # ──────────────────────────────
    # Phase 5a: MLOps tracking
    # ──────────────────────────────
    _banner("Phase 5 — MLOps")
    # Bound unconditionally so the deploy branch can pass it along safely even
    # when evaluation (and therefore tracking) did not run.
    metrics = None
    if eval_result:
        from phase5_mlops.serve import ExperimentTracker, ExperimentMetrics
        metrics = ExperimentMetrics(
            run_name=f"{category}-{strategy}",
            model_id=eval_model,
            merge_strategy=strategy,
            base_models=[c.model_id for c in candidates] if candidates else [],
            avg_rouge1=eval_result.avg_rouge1,
            avg_rouge2=eval_result.avg_rouge2,
            avg_rougeL=eval_result.avg_rougeL,
            avg_bertscore=eval_result.avg_bertscore,
            avg_faithfulness=eval_result.avg_faithfulness,
            hallucination_rate=eval_result.hallucination_rate,
            avg_judge_score=eval_result.avg_judge_score,
            gap_categories=eval_result.gap_categories,
        )
        tracker = ExperimentTracker(use_wandb=use_wandb, use_mlflow=True)
        tracker.log(metrics)
        console.print("[green]✓ Experiment tracked[/green]")

    # ──────────────────────────────
    # Phase 5b: HF Hub deploy
    # ──────────────────────────────
    if deploy:
        from phase5_mlops.serve import deploy_to_hub
        # An explicit --repo wins; otherwise derive a repo name from the run.
        repo = hf_repo or f"{HF_ORG}/{category}-{strategy}-7b"
        deploy_to_hub(
            model_path=str(merged_path or base_model),
            repo_id=repo,
            metrics=metrics,
        )

    # ──────────────────────────────
    # Phase 5c: vLLM server
    # ──────────────────────────────
    if not skip_serve:
        _banner("Phase 5 — Inference Server")
        # Serve the most refined artifact available: adapter > merge > base.
        final_model = str(best_adapter or merged_path or base_model)
        from phase5_mlops.serve import VLLMServer
        server = VLLMServer(final_model)
        server.start_api_server()

    # ──────────────────────────────
    # Summary
    # ──────────────────────────────
    _banner("Pipeline Complete")
    from phase5_mlops.serve import print_pipeline_summary
    print_pipeline_summary()
@app.command("leaderboard")
def leaderboard():
    """Print the evaluation leaderboard from saved results."""
    # Imported inside the command, matching how the other commands in this
    # module pull in phase5_mlops.serve only when they need it.
    from phase5_mlops.serve import print_pipeline_summary as show_summary

    show_summary()
@app.command("introspect")
def introspect(model: str = typer.Argument(..., help="Model ID to introspect")):
    """Print DOM-tree-style architecture of a model."""
    # Dependencies are imported inside the command so they are loaded only
    # when this sub-command actually runs.
    from phase2_merging.merge import introspect_architecture, print_architecture_tree
    from transformers import AutoModelForCausalLM
    from configs.settings import HF_TOKEN

    logger.info(f"Loading {model}...")
    # A falsy HF_TOKEN (e.g. an empty string) is passed on as None.
    auth_token = HF_TOKEN or None
    loaded_model = AutoModelForCausalLM.from_pretrained(
        model,
        device_map="cpu",
        token=auth_token,
    )
    tree_root = introspect_architecture(loaded_model, model)
    print_architecture_tree(tree_root)
if __name__ == "__main__":
    # Executed as a script: hand control to the Typer application.
    app()