"""
Auto-FineTune-Ops: The Boss Orchestrator
==========================================
One-click autonomous ML fine-tuning pipeline.

Usage:
    python main.py --data ./data.csv --goal "medical_assistant"
"""

import os
import sys
import yaml
import argparse
import traceback
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, Any

from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.markdown import Markdown

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))

from agents.data_architect import DataArchitectAgent, CleaningConfig
from agents.training_pilot import TrainingPilot
from agents.the_judge import TheJudge, JudgeModel

console = Console()

# Fallbacks used when neither the caller nor the config file supplies a value.
DEFAULT_BASE_MODEL = "unsloth/llama-3-8b-bnb-4bit"
DEFAULT_MAX_SEQ_LENGTH = 2048
DEFAULT_JUDGE_MODEL = "gpt-4o"

BANNER = """
╔═══════════════════════════════════════════════════════════════╗
║                                                               ║
║        🤖 AUTO-FINETUNE-OPS: AUTONOMOUS ML PIPELINE 🤖        ║
║                                                               ║
║    "One-Click Fine-Tuning That Replaces Senior Engineers"     ║
║                                                               ║
╚═══════════════════════════════════════════════════════════════╝
"""


def _to_serializable(value: Any) -> Any:
    """Recursively convert *value* into plain types that ``yaml.safe_dump`` accepts.

    Paths become strings, mappings and sequences are converted element-wise,
    subclasses of ``int``/``float`` are cast to the builtin type, objects
    exposing a scalar ``.item()`` method are unwrapped, and anything else
    falls back to ``str(value)``.
    """
    # Exact type check: safe_dump rejects *subclasses* of the builtin scalars.
    if value is None or type(value) in (str, bool, int, float):
        return value
    if isinstance(value, Path):
        return str(value)
    if isinstance(value, dict):
        return {str(key): _to_serializable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, set, frozenset)):
        return [_to_serializable(item) for item in value]
    if isinstance(value, int):
        return int(value)
    if isinstance(value, float):
        return float(value)
    unwrap = getattr(value, "item", None)
    if callable(unwrap):
        try:
            return _to_serializable(unwrap())
        except (TypeError, ValueError, RuntimeError):
            # Not a single-element scalar container; fall through to str().
            pass
    return str(value)


class AutoFineTuneOps:
    """
    The Boss Orchestrator - Runs the complete end-to-end fine-tuning pipeline.

    Pipeline stages:
    1. Data Preparation (DataArchitectAgent)
    2. Fine-Tuning (TrainingPilot)
    3. Evaluation (TheJudge)
    4. Deployment Ready
    """

    def __init__(
        self,
        config_path: Optional[str] = None,
        output_dir: str = "./output"
    ):
        """
        Initialize the orchestrator.

        Creates ``output_dir`` and its ``processed_data``, ``models``,
        ``logs`` and ``reports`` subdirectories if they do not exist.

        Args:
            config_path: Path to configuration YAML
            output_dir: Base output directory
        """
        self.config = self._load_config(config_path)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories
        for subdir in ("processed_data", "models", "logs", "reports"):
            (self.output_dir / subdir).mkdir(exist_ok=True)

        # Initialize agents (created lazily by the stage that needs them)
        self.data_agent = None
        self.training_agent = None
        self.judge_agent = None

        # Pipeline state
        self.processed_data_path = None
        self.model_path = None
        self.evaluation_result = None

    def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
        """
        Load configuration from a YAML file.

        Resolution order: ``config_path`` if it exists, otherwise
        ``configs/default_config.yaml`` next to this file, otherwise an
        empty configuration.

        Args:
            config_path: Optional path to a user-supplied YAML file.

        Returns:
            The parsed configuration mapping; ``{}`` when no file is found,
            the file is empty, or its top level is not a mapping.
        """
        default_config_path = Path(__file__).parent / "configs" / "default_config.yaml"

        if config_path and Path(config_path).exists():
            chosen = Path(config_path)
        else:
            if config_path:
                # An explicit path was given but is missing: say so instead of
                # silently falling back.
                console.print(
                    f"[yellow]⚠️ Config file not found: {config_path}. Falling back to defaults.[/]"
                )
            if not default_config_path.exists():
                return {}
            chosen = default_config_path

        with open(chosen, "r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)

        # safe_load returns None for an empty document.
        if loaded is None:
            return {}
        if not isinstance(loaded, dict):
            console.print(
                f"[yellow]⚠️ Config file {chosen} does not contain a mapping. Ignoring it.[/]"
            )
            return {}
        return loaded

    def _section(self, name: str) -> Dict[str, Any]:
        """Return the config section *name*, or ``{}`` if it is missing or not a mapping."""
        section = self.config.get(name)
        return section if isinstance(section, dict) else {}

    def _print_header(self):
        """Print the main header."""
        console.print(Panel(BANNER, style="bold magenta"))

    def _print_stage(self, stage: int, name: str, description: str):
        """Print a stage header.

        Args:
            stage: Stage number shown in the header.
            name: Stage title.
            description: One-line description printed under the title.
        """
        console.print(f"\n[bold cyan]{'='*60}[/]")
        console.print(f"[bold cyan]STAGE {stage}: {name}[/]")
        console.print(f"[dim]{description}[/]")
        console.print(f"[bold cyan]{'='*60}[/]\n")

    def run(
        self,
        data_path: str,
        goal: str,
        base_model: Optional[str] = None,
        skip_training: bool = False,
        skip_evaluation: bool = False,
        judge_model: str = "gpt-4o",
        num_eval_samples: int = 50
    ) -> Dict[str, Any]:
        """
        Run the complete fine-tuning pipeline.

        Any exception raised by a stage is caught here, printed with its
        traceback, and recorded under the ``"error"`` key of the returned
        dict; the results file is only written when all stages finish.

        Args:
            data_path: Path to input dataset (CSV/JSON)
            goal: Training goal/purpose
            base_model: Override base model
            skip_training: Skip training stage. Evaluation then only runs
                if ``self.model_path`` was set before calling ``run``.
            skip_evaluation: Skip evaluation stage
            judge_model: LLM to use as judge
            num_eval_samples: Number of samples for evaluation

        Returns:
            Dict with pipeline results
        """
        self._print_header()

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        run_name = f"{goal}_{timestamp}"

        # Resolve the base model once so every stage uses the same value.
        resolved_base_model = (
            base_model
            or self._section("model").get("base_model")
            or DEFAULT_BASE_MODEL
        )

        console.print(f"[bold]Run Name:[/] {run_name}")
        console.print(f"[bold]Input Data:[/] {data_path}")
        console.print(f"[bold]Goal:[/] {goal}")
        console.print(f"[bold]Base Model:[/] {resolved_base_model}")

        results = {
            "run_name": run_name,
            "goal": goal,
            "stages": {}
        }
        stages = results["stages"]

        try:
            # ═══════════════════════════════════════════════════════════
            # STAGE 1: DATA PREPARATION
            # ═══════════════════════════════════════════════════════════
            self._print_stage(
                1, "DATA PREPARATION",
                "Analyzing, cleaning, and formatting dataset for training"
            )
            stages["data_preparation"] = self._run_data_preparation(
                data_path, goal, run_name
            )

            # ═══════════════════════════════════════════════════════════
            # STAGE 2: FINE-TUNING
            # ═══════════════════════════════════════════════════════════
            if not skip_training:
                self._print_stage(
                    2, "FINE-TUNING",
                    "Auto-configuring hyperparameters and training with Unsloth"
                )
                stages["training"] = self._run_training(resolved_base_model, run_name)
            else:
                console.print("[yellow]⏭️ Skipping training stage[/]")
                stages["training"] = {"status": "skipped"}

            # ═══════════════════════════════════════════════════════════
            # STAGE 3: EVALUATION
            # ═══════════════════════════════════════════════════════════
            if skip_evaluation:
                console.print("[yellow]⏭️ Skipping evaluation stage[/]")
                stages["evaluation"] = {"status": "skipped"}
            elif not self.model_path:
                # Previously this case left no trace in the output or results.
                console.print(
                    "[yellow]⚠️ No fine-tuned model available. Skipping evaluation.[/]"
                )
                stages["evaluation"] = {
                    "status": "skipped",
                    "reason": "No model available"
                }
            else:
                self._print_stage(
                    3, "EVALUATION",
                    "Running Model Arena with LLM-as-Judge"
                )
                stages["evaluation"] = self._run_evaluation(
                    run_name, resolved_base_model, judge_model, num_eval_samples
                )

            # ═══════════════════════════════════════════════════════════
            # STAGE 4: SUMMARY
            # ═══════════════════════════════════════════════════════════
            self._print_stage(
                4, "PIPELINE COMPLETE",
                "Summary of the autonomous fine-tuning run"
            )
            self._print_summary(results)

            # Save results
            results_path = self._save_results(results, run_name)
            console.print(f"\n[green]✓ Results saved to: {results_path}[/]")

            return results

        except Exception as e:
            # Top-level boundary: report the failure and hand the partial
            # results back to the caller instead of crashing.
            console.print(f"\n[bold red]❌ Pipeline failed: {str(e)}[/]")
            traceback.print_exc()
            results["error"] = str(e)
            return results

    def _run_data_preparation(
        self, data_path: str, goal: str, run_name: str
    ) -> Dict[str, Any]:
        """Run stage 1: clean the dataset and write the training JSONL.

        Sets ``self.data_agent`` and ``self.processed_data_path``.

        Returns:
            The stage entry for ``results["stages"]["data_preparation"]``.
        """
        # Initialize data agent with config
        data_config = self._section("data")
        cleaning_config = CleaningConfig(
            min_instruction_length=data_config.get('min_instruction_length', 10),
            max_instruction_length=data_config.get('max_instruction_length', 2048),
            min_response_length=data_config.get('min_response_length', 20),
            max_response_length=data_config.get('max_response_length', 4096),
            remove_duplicates=data_config.get('remove_duplicates', True),
            quality_threshold=data_config.get('quality_threshold', 0.7)
        )
        self.data_agent = DataArchitectAgent(config=cleaning_config)

        # Process data
        output_jsonl = self.output_dir / "processed_data" / f"{run_name}_training.jsonl"
        self.processed_data_path, data_analysis = self.data_agent.process(
            input_path=data_path,
            output_path=str(output_jsonl),
            goal=goal
        )

        return {
            "status": "success",
            "output_path": self.processed_data_path,
            "total_samples": data_analysis.valid_rows,
            "quality_score": data_analysis.quality_score
        }

    def _run_training(self, base_model: str, run_name: str) -> Dict[str, Any]:
        """Run stage 2: fine-tune ``base_model`` on the processed data.

        Sets ``self.training_agent`` and ``self.model_path``.

        Returns:
            The stage entry for ``results["stages"]["training"]``.
        """
        max_seq_length = self._section("model").get(
            'max_seq_length', DEFAULT_MAX_SEQ_LENGTH
        )

        self.training_agent = TrainingPilot(
            base_model=base_model,
            max_seq_length=max_seq_length,
            output_dir=str(self.output_dir / "models"),
            config_path=None
        )

        # Run training
        training_result = self.training_agent.run(
            data_path=self.processed_data_path,
            output_name=run_name
        )
        self.model_path = training_result.model_path

        return {
            "status": "success",
            "model_path": self.model_path,
            "training_time": training_result.training_time,
            "final_loss": training_result.final_loss,
            "hyperparams": training_result.hyperparams.to_dict()
        }

    def _run_evaluation(
        self,
        run_name: str,
        base_model: str,
        judge_model: Optional[str],
        num_eval_samples: int
    ) -> Dict[str, Any]:
        """Run stage 3: compare base and fine-tuned models with an LLM judge.

        Skips (rather than fails) when the judge's API key is not set or
        Unsloth cannot be imported. Sets ``self.judge_agent`` and
        ``self.evaluation_result`` when the evaluation runs.

        Returns:
            The stage entry for ``results["stages"]["evaluation"]``.
        """
        eval_config = self._section("evaluation")
        judge_model_str = (
            judge_model
            or eval_config.get('judge_model')
            or DEFAULT_JUDGE_MODEL
        )

        # Decide the provider once (case-insensitively) so the API-key check
        # and the enum selection can never disagree. Every non-Claude name
        # maps to the GPT-4o judge and therefore needs the OpenAI key.
        uses_claude = "claude" in judge_model_str.lower()
        required_key = "ANTHROPIC_API_KEY" if uses_claude else "OPENAI_API_KEY"
        if not os.getenv(required_key):
            console.print(f"[yellow]⚠️ {required_key} not set. Skipping evaluation.[/]")
            return {"status": "skipped", "reason": "No API key"}

        # Only the import is guarded: an ImportError raised later by the
        # judge should surface as a real failure, not as "Unsloth missing".
        try:
            from unsloth import FastLanguageModel
        except ImportError:
            console.print("[yellow]⚠️ Unsloth not available for evaluation. Skipping.[/]")
            return {"status": "skipped", "reason": "Unsloth not available"}

        judge_enum = JudgeModel.CLAUDE_35_SONNET if uses_claude else JudgeModel.GPT4O
        self.judge_agent = TheJudge(
            judge_model=judge_enum,
            temperature=eval_config.get('temperature', 0.2),
            max_tokens=eval_config.get('max_tokens', 1024)
        )

        # Load models for evaluation
        console.print("[blue]Loading models for evaluation...[/]")
        # Use the same sequence length the training stage is configured with.
        max_seq_length = self._section("model").get(
            'max_seq_length', DEFAULT_MAX_SEQ_LENGTH
        )

        # Load base model
        base_model_obj, base_tokenizer = FastLanguageModel.from_pretrained(
            model_name=base_model,
            max_seq_length=max_seq_length,
            load_in_4bit=True,
        )

        # Load fine-tuned model
        ft_model, ft_tokenizer = FastLanguageModel.from_pretrained(
            model_name=self.model_path,
            max_seq_length=max_seq_length,
            load_in_4bit=True,
        )

        # Run evaluation
        self.evaluation_result = self.judge_agent.run_with_test_data(
            base_model=base_model_obj,
            finetuned_model=ft_model,
            tokenizer=base_tokenizer,
            test_data_path=self.processed_data_path,
            num_samples=num_eval_samples,
            finetuned_tokenizer=ft_tokenizer
        )

        # Generate report
        report_path = self.output_dir / "reports" / f"{run_name}_evaluation.json"
        self.judge_agent.generate_report(
            self.evaluation_result,
            str(report_path)
        )

        return {
            "status": "success",
            "win_rate": self.evaluation_result.win_rate,
            "base_avg_score": self.evaluation_result.base_model_avg_score,
            "finetuned_avg_score": self.evaluation_result.finetuned_avg_score,
            "report_path": str(report_path)
        }

    def _save_results(self, results: Dict[str, Any], run_name: str) -> Path:
        """Write *results* as YAML under ``<output_dir>/logs`` and return the file path.

        Values are converted to plain Python types first so the file never
        contains Python-specific YAML tags.
        """
        results_path = self.output_dir / "logs" / f"{run_name}_results.yaml"
        with open(results_path, "w", encoding="utf-8") as f:
            yaml.safe_dump(_to_serializable(results), f, default_flow_style=False)
        return results_path

    def _print_summary(self, results: Dict[str, Any]):
        """Print pipeline summary.

        Renders one table row per stage that succeeded or was skipped, then
        the fine-tuned model path and a deploy hint when a model exists.

        Args:
            results: The results dict built by ``run``.
        """
        from rich.table import Table

        table = Table(title="Pipeline Summary", show_header=True)
        table.add_column("Stage", style="cyan")
        table.add_column("Status", style="green")
        table.add_column("Details", style="dim")

        stages = results["stages"]

        # Data preparation
        data_stage = stages.get("data_preparation", {})
        if data_stage.get("status") == "success":
            # `or 0` guards the format specs against metrics reported as None.
            total_samples = data_stage.get('total_samples') or 0
            quality_score = data_stage.get('quality_score') or 0
            table.add_row(
                "Data Preparation",
                "✅ Success",
                f"{total_samples:,} samples (Quality: {quality_score:.1%})"
            )

        # Training
        train_stage = stages.get("training", {})
        if train_stage.get("status") == "success":
            final_loss = train_stage.get('final_loss')
            loss_text = "Loss: n/a" if final_loss is None else f"Loss: {final_loss:.4f}"
            table.add_row("Fine-Tuning", "✅ Success", loss_text)
        elif train_stage.get("status") == "skipped":
            table.add_row("Fine-Tuning", "⏭️ Skipped", "")

        # Evaluation
        eval_stage = stages.get("evaluation", {})
        if eval_stage.get("status") == "success":
            win_rate = eval_stage.get('win_rate') or 0
            table.add_row(
                "Evaluation",
                "✅ Success",
                f"Win Rate: {win_rate:.1%}"
            )
        elif eval_stage.get("status") == "skipped":
            table.add_row("Evaluation", "⏭️ Skipped", eval_stage.get("reason", ""))

        console.print(table)

        # Print model path if available
        if self.model_path:
            console.print("\n[bold green]📦 Fine-tuned model saved to:[/]")
            console.print(f"   {self.model_path}")
            console.print("\n[bold]To deploy, run:[/]")
            console.print(f"   [cyan]python scripts/deploy.py --model {self.model_path}[/]")


def main() -> int:
    """CLI entry point.

    Returns:
        Process exit code: 0 when the pipeline completed, 1 when it
        recorded an error.
    """
    parser = argparse.ArgumentParser(
        description="Auto-FineTune-Ops: One-click autonomous ML fine-tuning pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python main.py --data ./data.csv --goal medical_assistant
  python main.py --data ./qa_pairs.json --goal customer_support --model unsloth/llama-3-8b-bnb-4bit
  python main.py --data ./dataset.jsonl --goal code_assistant --skip-eval
        """
    )

    parser.add_argument(
        "--data", required=True,
        help="Path to input dataset (CSV, JSON, or JSONL)"
    )
    parser.add_argument(
        "--goal", required=True,
        help="Training goal (e.g., medical_assistant, customer_support)"
    )
    parser.add_argument(
        "--model", default=None,
        help=f"Base model to fine-tune (default: {DEFAULT_BASE_MODEL})"
    )
    parser.add_argument(
        "--config", default=None,
        help="Path to configuration YAML file"
    )
    parser.add_argument(
        "--output", default="./output",
        help="Output directory (default: ./output)"
    )
    parser.add_argument(
        "--skip-training", action="store_true",
        help="Skip training stage"
    )
    parser.add_argument(
        "--skip-eval", action="store_true",
        help="Skip evaluation stage"
    )
    parser.add_argument(
        "--judge", choices=["gpt-4o", "claude-3-5-sonnet"], default="gpt-4o",
        help="Judge LLM for evaluation (default: gpt-4o)"
    )
    parser.add_argument(
        "--eval-samples", type=int, default=50,
        help="Number of samples for evaluation (default: 50)"
    )

    args = parser.parse_args()

    # Run pipeline
    orchestrator = AutoFineTuneOps(
        config_path=args.config,
        output_dir=args.output
    )

    results = orchestrator.run(
        data_path=args.data,
        goal=args.goal,
        base_model=args.model,
        skip_training=args.skip_training,
        skip_evaluation=args.skip_eval,
        judge_model=args.judge,
        num_eval_samples=args.eval_samples
    )

    # Surface pipeline failures to the shell / CI instead of always exiting 0.
    return 1 if "error" in results else 0


if __name__ == "__main__":
    sys.exit(main())