| """ |
| Master Pipeline Orchestrator |
| ============================== |
| Runs all 5 phases end-to-end or individually. |
| |
| Usage: |
| # Full pipeline for 'code' category |
| python pipeline.py run --category code |
| |
| # Just discover + merge |
| python pipeline.py run --category reasoning --skip-ft --skip-serve |
| |
| # Iterative improvement loop |
| python pipeline.py run --category medical --loop --max-iter 3 |
| """ |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Optional |
|
|
| import typer |
| from rich.console import Console |
| from rich.panel import Panel |
| from rich.rule import Rule |
|
|
| from configs.settings import ( |
| FT_BASE_MODEL, TOP_K_CANDIDATES, MERGES_DIR, ADAPTERS_DIR, HF_ORG |
| ) |
| from utils.logger import logger |
|
|
| app = typer.Typer(help="LLM Pipeline β full orchestrator") |
| console = Console() |
|
|
|
|
| def _banner(text: str): |
| console.print(Rule(f"[bold cyan]{text}[/bold cyan]")) |
|
|
|
|
| @app.command() |
| def run( |
| category: str = typer.Argument("reasoning", help="Model category to target"), |
| top_k: int = typer.Option(TOP_K_CANDIDATES, help="Candidates per category"), |
| strategy: str = typer.Option("ties", help="Merge strategy: slerp|ties|dare_ties|task_arithmetic|breadcrumbs"), |
| base_model: str = typer.Option(FT_BASE_MODEL, help="Base model for fine-tuning"), |
| n_eval: int = typer.Option(100, help="Eval samples"), |
| n_syn: int = typer.Option(50, help="Synthetic samples per gap"), |
| ft_epochs: int = typer.Option(2, help="Fine-tuning epochs"), |
| loop: bool = typer.Option(False, "--loop", help="Enable iterative improvement loop"), |
| max_iter: int = typer.Option(3, help="Max loop iterations"), |
| skip_discover: bool = typer.Option(False, "--skip-discover"), |
| skip_merge: bool = typer.Option(False, "--skip-merge"), |
| skip_eval: bool = typer.Option(False, "--skip-eval"), |
| skip_ft: bool = typer.Option(False, "--skip-ft"), |
| skip_serve: bool = typer.Option(True, "--skip-serve/--serve"), |
| deploy: bool = typer.Option(False, "--deploy"), |
| hf_repo: str = typer.Option("", "--repo"), |
| use_wandb: bool = typer.Option(False, "--wandb"), |
| use_mergekit: bool = typer.Option(False, "--mergekit/--no-mergekit"), |
| ): |
| console.print(Panel( |
| f"[bold]LLM Pipeline[/bold]\n" |
| f"Category: [cyan]{category}[/cyan] | Strategy: [magenta]{strategy}[/magenta] | " |
| f"Base: [green]{base_model.split('/')[-1]}[/green]", |
| title="Starting", |
| )) |
|
|
| |
| |
| |
| candidates = [] |
| if not skip_discover: |
| _banner("Phase 1 β Discovery") |
| from phase1_discovery.discover import discover |
| candidates = discover(category, top_k=top_k) |
| if not candidates: |
| logger.error("No candidates found. Exiting.") |
| raise typer.Exit(1) |
| console.print(f"[green]β Found {len(candidates)} candidates[/green]") |
|
|
| |
| |
| |
| merged_path: Optional[Path] = None |
| if not skip_merge and candidates: |
| _banner("Phase 2 β Merging") |
| from phase2_merging.merge import merge_models |
| model_ids = [c.model_id for c in candidates[:3]] |
| merged_path = merge_models( |
| strategy = strategy, |
| models = model_ids, |
| base_model = model_ids[0], |
| use_mergekit = use_mergekit, |
| ) |
| console.print(f"[green]β Merged β {merged_path}[/green]") |
|
|
| eval_model = str(merged_path) if merged_path else base_model |
|
|
| |
| |
| |
| eval_result = None |
| if not skip_eval: |
| _banner("Phase 3 β Evaluation") |
| from phase3_evaluation.evaluate import evaluate, load_squad |
| samples = load_squad(n_eval) |
| eval_result = evaluate(eval_model, samples, category, run_judge=True) |
| console.print( |
| f"[green]β ROUGE-1: {eval_result.avg_rouge1:.3f} | " |
| f"BERTScore: {eval_result.avg_bertscore:.3f} | " |
| f"Judge: {eval_result.avg_judge_score:.1f}/10[/green]" |
| ) |
|
|
| |
| |
| |
| best_adapter: Optional[Path] = None |
| if not skip_ft: |
| _banner("Phase 4 β Fine-Tuning") |
| from phase4_finetuning.finetune import improvement_loop, fine_tune, generate_synthetic_data, format_as_hf_dataset |
| from phase3_evaluation.evaluate import load_squad |
|
|
| if loop: |
| |
| best_adapter = improvement_loop( |
| base_model_id = eval_model, |
| eval_samples_fn = lambda: load_squad(min(50, n_eval)), |
| max_iterations = max_iter, |
| n_syn_per_gap = n_syn, |
| use_wandb = use_wandb, |
| ) |
| elif eval_result and eval_result.gap_categories: |
| |
| from configs.settings import ADAPTERS_DIR |
| syn = generate_synthetic_data(eval_result.gap_categories, n_per_gap=n_syn) |
| dataset = format_as_hf_dataset(syn) |
| adapter_dir = ADAPTERS_DIR / f"{category}_{strategy}" |
| best_adapter = fine_tune( |
| base_model_id = eval_model, |
| dataset = dataset, |
| output_dir = adapter_dir, |
| run_name = f"{category}-{strategy}", |
| epochs = ft_epochs, |
| use_wandb = use_wandb, |
| ) |
| else: |
| logger.info("[FT] No gaps detected, skipping fine-tuning") |
|
|
| if best_adapter: |
| console.print(f"[green]β Adapter β {best_adapter}[/green]") |
|
|
| |
| |
| |
| _banner("Phase 5 β MLOps") |
| if eval_result: |
| from phase5_mlops.serve import ExperimentTracker, ExperimentMetrics |
| metrics = ExperimentMetrics( |
| run_name = f"{category}-{strategy}", |
| model_id = eval_model, |
| merge_strategy = strategy, |
| base_models = [c.model_id for c in candidates] if candidates else [], |
| avg_rouge1 = eval_result.avg_rouge1, |
| avg_rouge2 = eval_result.avg_rouge2, |
| avg_rougeL = eval_result.avg_rougeL, |
| avg_bertscore = eval_result.avg_bertscore, |
| avg_faithfulness = eval_result.avg_faithfulness, |
| hallucination_rate = eval_result.hallucination_rate, |
| avg_judge_score = eval_result.avg_judge_score, |
| gap_categories = eval_result.gap_categories, |
| ) |
| tracker = ExperimentTracker(use_wandb=use_wandb, use_mlflow=True) |
| tracker.log(metrics) |
| console.print("[green]β Experiment tracked[/green]") |
|
|
| |
| |
| |
| if deploy: |
| from phase5_mlops.serve import deploy_to_hub, ExperimentMetrics |
| repo = hf_repo or f"{HF_ORG}/{category}-{strategy}-7b" |
| deploy_to_hub( |
| model_path = str(merged_path or base_model), |
| repo_id = repo, |
| metrics = metrics if eval_result else None, |
| ) |
|
|
| |
| |
| |
| if not skip_serve: |
| _banner("Phase 5 β Inference Server") |
| final_model = str(best_adapter or merged_path or base_model) |
| from phase5_mlops.serve import VLLMServer |
| server = VLLMServer(final_model) |
| server.start_api_server() |
|
|
| |
| |
| |
| _banner("Pipeline Complete") |
| from phase5_mlops.serve import print_pipeline_summary |
| print_pipeline_summary() |
|
|
|
|
| @app.command("leaderboard") |
| def leaderboard(): |
| """Print the evaluation leaderboard from saved results.""" |
| from phase5_mlops.serve import print_pipeline_summary |
| print_pipeline_summary() |
|
|
|
|
| @app.command("introspect") |
| def introspect(model: str = typer.Argument(..., help="Model ID to introspect")): |
| """Print DOM-tree-style architecture of a model.""" |
| from phase2_merging.merge import introspect_architecture, print_architecture_tree |
| from transformers import AutoModelForCausalLM |
| from configs.settings import HF_TOKEN |
| logger.info(f"Loading {model}...") |
| m = AutoModelForCausalLM.from_pretrained(model, device_map="cpu", token=HF_TOKEN or None) |
| root = introspect_architecture(m, model) |
| print_architecture_tree(root) |
|
|
|
|
| if __name__ == "__main__": |
| app() |
|
|