"""
Master Pipeline Orchestrator
==============================
Runs all 5 phases end-to-end, or a subset of them via the --skip-* flags.
Usage:
    # Full pipeline for the 'code' category (CATEGORY is a positional argument)
    python pipeline.py run code
    # Just discover + merge (serving is already off by default)
    python pipeline.py run reasoning --skip-eval --skip-ft
    # Iterative improvement loop
    python pipeline.py run medical --loop --max-iter 3
"""
from __future__ import annotations
from pathlib import Path
from typing import Optional
import typer
from rich.console import Console
from rich.panel import Panel
from rich.rule import Rule
from configs.settings import (
FT_BASE_MODEL, TOP_K_CANDIDATES, MERGES_DIR, ADAPTERS_DIR, HF_ORG
)
from utils.logger import logger
# Rich console shared by every command for styled terminal output.
console = Console()
# Typer application; the @app.command() functions register the CLI commands on it.
# NOTE(review): the "β" in the help text looks like a mis-encoded dash — confirm the file's encoding.
app = typer.Typer(help="LLM Pipeline β full orchestrator")
def _banner(text: str) -> None:
    """Print *text* as a bold cyan horizontal rule that separates pipeline phases."""
    styled_title = f"[bold cyan]{text}[/bold cyan]"
    console.print(Rule(styled_title))
# NOTE(review): the "β" characters inside the console/banner strings below look like
# mis-encoded punctuation (perhaps a dash, check mark or arrow). They are kept as-is
# because the intended characters cannot be recovered with certainty; confirm the
# source file's encoding and restore them.


def _discover_phase(category: str, top_k: int):
    """Phase 1: discover candidate models for *category*.

    ``top_k`` is passed straight through to ``discover``. Returns the
    non-empty candidate collection it produced.

    Raises:
        typer.Exit: with code 1 when discovery returns no candidates.
    """
    _banner("Phase 1 β Discovery")
    from phase1_discovery.discover import discover

    candidates = discover(category, top_k=top_k)
    if not candidates:
        logger.error("No candidates found. Exiting.")
        raise typer.Exit(1)
    console.print(f"[green]β Found {len(candidates)} candidates[/green]")
    return candidates


def _merge_phase(candidates, strategy: str, use_mergekit: bool) -> Optional[Path]:
    """Phase 2: merge the first three candidates with *strategy*; return the merged model path."""
    _banner("Phase 2 β Merging")
    from phase2_merging.merge import merge_models

    model_ids = [c.model_id for c in candidates[:3]]  # merge top-3
    merged_path = merge_models(
        strategy=strategy,
        models=model_ids,
        # NOTE(review): the first candidate doubles as the merge base; the
        # --base-model CLI option is not passed to the merge. Confirm intent.
        base_model=model_ids[0],
        use_mergekit=use_mergekit,
    )
    console.print(f"[green]β Merged β {merged_path}[/green]")
    return merged_path


def _evaluate_phase(eval_model: str, category: str, n_eval: int):
    """Phase 3: evaluate *eval_model* on *n_eval* samples from ``load_squad``.

    The judge is enabled (``run_judge=True``). Prints the headline scores and
    returns the result object produced by ``evaluate``.
    """
    _banner("Phase 3 β Evaluation")
    from phase3_evaluation.evaluate import evaluate, load_squad

    samples = load_squad(n_eval)
    eval_result = evaluate(eval_model, samples, category, run_judge=True)
    console.print(
        f"[green]β ROUGE-1: {eval_result.avg_rouge1:.3f} | "
        f"BERTScore: {eval_result.avg_bertscore:.3f} | "
        f"Judge: {eval_result.avg_judge_score:.1f}/10[/green]"
    )
    return eval_result


def _finetune_phase(
    eval_model: str,
    eval_result,
    *,
    category: str,
    strategy: str,
    n_eval: int,
    n_syn: int,
    ft_epochs: int,
    loop: bool,
    max_iter: int,
    use_wandb: bool,
) -> Optional[Path]:
    """Phase 4: fine-tune *eval_model* and return the resulting adapter path, if any.

    With *loop* set, delegates to ``improvement_loop``. Otherwise it trains one
    adapter (under ``ADAPTERS_DIR``) on synthetic data generated for the gap
    categories in *eval_result*. Returns ``None`` when the one-shot path has
    nothing to target.
    """
    _banner("Phase 4 β Fine-Tuning")
    from phase4_finetuning.finetune import improvement_loop, fine_tune, generate_synthetic_data, format_as_hf_dataset
    from phase3_evaluation.evaluate import load_squad

    best_adapter: Optional[Path] = None
    if loop:
        # Full iterative loop: it re-evaluates on its own sample set each iteration.
        best_adapter = improvement_loop(
            base_model_id=eval_model,
            eval_samples_fn=lambda: load_squad(min(50, n_eval)),
            max_iterations=max_iter,
            n_syn_per_gap=n_syn,
            use_wandb=use_wandb,
        )
    elif eval_result and eval_result.gap_categories:
        # One-shot: target the gaps detected by Phase 3.
        syn = generate_synthetic_data(eval_result.gap_categories, n_per_gap=n_syn)
        dataset = format_as_hf_dataset(syn)
        best_adapter = fine_tune(
            base_model_id=eval_model,
            dataset=dataset,
            output_dir=ADAPTERS_DIR / f"{category}_{strategy}",
            run_name=f"{category}-{strategy}",
            epochs=ft_epochs,
            use_wandb=use_wandb,
        )
    elif eval_result is None:
        # Without an evaluation there is no gap information, so say so explicitly
        # instead of claiming that no gaps were found.
        logger.info("[FT] Evaluation was skipped, so there are no gaps to target; skipping fine-tuning")
    else:
        logger.info("[FT] No gaps detected, skipping fine-tuning")

    if best_adapter:
        console.print(f"[green]β Adapter β {best_adapter}[/green]")
    return best_adapter


def _track_phase(eval_result, *, eval_model: str, category: str, strategy: str, candidates, use_wandb: bool):
    """Phase 5a: build ``ExperimentMetrics`` from *eval_result*, log them, and return them.

    Logging always goes through MLflow (``use_mlflow=True``) and additionally to
    W&B when *use_wandb* is set.
    """
    from phase5_mlops.serve import ExperimentTracker, ExperimentMetrics

    metrics = ExperimentMetrics(
        run_name=f"{category}-{strategy}",
        model_id=eval_model,
        merge_strategy=strategy,
        base_models=[c.model_id for c in candidates] if candidates else [],
        avg_rouge1=eval_result.avg_rouge1,
        avg_rouge2=eval_result.avg_rouge2,
        avg_rougeL=eval_result.avg_rougeL,
        avg_bertscore=eval_result.avg_bertscore,
        avg_faithfulness=eval_result.avg_faithfulness,
        hallucination_rate=eval_result.hallucination_rate,
        avg_judge_score=eval_result.avg_judge_score,
        gap_categories=eval_result.gap_categories,
    )
    tracker = ExperimentTracker(use_wandb=use_wandb, use_mlflow=True)
    tracker.log(metrics)
    console.print("[green]β Experiment tracked[/green]")
    return metrics


@app.command()
def run(
    category: str = typer.Argument("reasoning", help="Model category to target"),
    top_k: int = typer.Option(TOP_K_CANDIDATES, help="Candidates per category"),
    strategy: str = typer.Option("ties", help="Merge strategy: slerp|ties|dare_ties|task_arithmetic|breadcrumbs"),
    base_model: str = typer.Option(FT_BASE_MODEL, help="Base model for fine-tuning"),
    n_eval: int = typer.Option(100, help="Eval samples"),
    n_syn: int = typer.Option(50, help="Synthetic samples per gap"),
    ft_epochs: int = typer.Option(2, help="Fine-tuning epochs"),
    loop: bool = typer.Option(False, "--loop", help="Enable iterative improvement loop"),
    max_iter: int = typer.Option(3, help="Max loop iterations"),
    skip_discover: bool = typer.Option(False, "--skip-discover"),
    skip_merge: bool = typer.Option(False, "--skip-merge"),
    skip_eval: bool = typer.Option(False, "--skip-eval"),
    skip_ft: bool = typer.Option(False, "--skip-ft"),
    skip_serve: bool = typer.Option(True, "--skip-serve/--serve"),  # off by default
    deploy: bool = typer.Option(False, "--deploy"),
    hf_repo: str = typer.Option("", "--repo"),
    use_wandb: bool = typer.Option(False, "--wandb"),
    use_mergekit: bool = typer.Option(False, "--mergekit/--no-mergekit"),
):
    """Run the pipeline for CATEGORY: discover, merge, evaluate, fine-tune and track.

    Discovery, merging, evaluation and fine-tuning can each be skipped with a
    --skip-* flag. Hub deployment is opt-in via --deploy and the inference
    server via --serve.
    """
    console.print(Panel(
        f"[bold]LLM Pipeline[/bold]\n"
        f"Category: [cyan]{category}[/cyan] | Strategy: [magenta]{strategy}[/magenta] | "
        f"Base: [green]{base_model.split('/')[-1]}[/green]",
        title="Starting",
    ))

    # --- Phase 1: Discovery -------------------------------------------------
    candidates = [] if skip_discover else _discover_phase(category, top_k)

    # --- Phase 2: Merging (needs candidates, so it is skipped with discovery) --
    merged_path: Optional[Path] = None
    if not skip_merge and candidates:
        merged_path = _merge_phase(candidates, strategy, use_mergekit)
    # Later phases work on the merged model when there is one, else on --base-model.
    eval_model = str(merged_path) if merged_path else base_model

    # --- Phase 3: Evaluation ------------------------------------------------
    eval_result = None if skip_eval else _evaluate_phase(eval_model, category, n_eval)

    # --- Phase 4: Fine-tuning -----------------------------------------------
    best_adapter: Optional[Path] = None
    if not skip_ft:
        best_adapter = _finetune_phase(
            eval_model,
            eval_result,
            category=category,
            strategy=strategy,
            n_eval=n_eval,
            n_syn=n_syn,
            ft_epochs=ft_epochs,
            loop=loop,
            max_iter=max_iter,
            use_wandb=use_wandb,
        )

    # --- Phase 5a: MLOps tracking (only when there are results to log) -------
    _banner("Phase 5 β MLOps")
    metrics = None  # bound explicitly so the deploy phase never sees an unbound name
    if eval_result:
        metrics = _track_phase(
            eval_result,
            eval_model=eval_model,
            category=category,
            strategy=strategy,
            candidates=candidates,
            use_wandb=use_wandb,
        )

    # --- Phase 5b: HF Hub deploy --------------------------------------------
    if deploy:
        from phase5_mlops.serve import deploy_to_hub

        repo = hf_repo or f"{HF_ORG}/{category}-{strategy}-7b"
        deploy_to_hub(
            model_path=str(merged_path or base_model),
            repo_id=repo,
            metrics=metrics,
        )

    # --- Phase 5c: vLLM server (a fine-tuned adapter takes precedence) -------
    if not skip_serve:
        _banner("Phase 5 β Inference Server")
        final_model = str(best_adapter or merged_path or base_model)
        from phase5_mlops.serve import VLLMServer

        server = VLLMServer(final_model)
        server.start_api_server()

    # --- Summary ------------------------------------------------------------
    _banner("Pipeline Complete")
    from phase5_mlops.serve import print_pipeline_summary

    print_pipeline_summary()
@app.command("leaderboard")
def leaderboard():
    """Print the evaluation leaderboard from saved results."""
    # Deferred import: phase5_mlops.serve is only loaded when this command runs.
    from phase5_mlops.serve import print_pipeline_summary as show_summary

    show_summary()
@app.command("introspect")
def introspect(model: str = typer.Argument(..., help="Model ID to introspect")):
    """Print DOM-tree-style architecture of a model."""
    # Heavy dependencies are imported lazily so other commands don't pay for them.
    from phase2_merging.merge import introspect_architecture, print_architecture_tree
    from transformers import AutoModelForCausalLM
    from configs.settings import HF_TOKEN

    logger.info(f"Loading {model}...")
    auth_token = HF_TOKEN or None  # a falsy token setting is passed as None
    loaded_model = AutoModelForCausalLM.from_pretrained(
        model,
        device_map="cpu",
        token=auth_token,
    )
    print_architecture_tree(introspect_architecture(loaded_model, model))
if __name__ == "__main__":
    # Executed as a script: hand control to Typer, which dispatches to the chosen command.
    app()
|