""" Helion-V1 Auto Training Handler Robust training script with comprehensive error handling for HuggingFace Handles HTTP errors, upload issues, authentication, and training failures """ import os import sys import time import json import logging import traceback from typing import Optional, Dict, List, Any from dataclasses import dataclass from pathlib import Path import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('training.log'), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) @dataclass class TrainingConfig: """Configuration for auto training.""" model_name: str = "DeepXR/Helion-V1" base_model: str = "meta-llama/Llama-2-7b-hf" dataset_name: str = "your-dataset-name" output_dir: str = "./helion-v1-output" hub_model_id: str = "DeepXR/Helion-V1" hf_token: Optional[str] = None # Training hyperparameters num_epochs: int = 3 batch_size: int = 4 gradient_accumulation: int = 8 learning_rate: float = 2e-5 warmup_steps: int = 100 max_seq_length: int = 4096 # LoRA config use_lora: bool = True lora_r: int = 64 lora_alpha: int = 128 lora_dropout: float = 0.05 # Retry settings max_retries: int = 5 retry_delay: int = 60 upload_chunk_size: int = 5 * 1024 * 1024 # 5MB chunks class HuggingFaceErrorHandler: """Handle various HuggingFace API and training errors.""" ERROR_CODES = { 400: "Bad Request - Check your input data format", 401: "Unauthorized - Invalid or missing HuggingFace token", 403: "Forbidden - Check repository permissions", 404: "Not Found - Model or dataset doesn't exist", 408: "Request Timeout - Server took too long to respond", 413: "Payload Too Large - File size exceeds limits", 422: "Unprocessable Entity - Validation error in request", 429: "Rate Limited - Too many requests, will retry", 500: "Internal Server Error - HuggingFace server issue", 502: "Bad Gateway - Service temporarily unavailable", 503: "Service Unavailable - Server overloaded", 504: "Gateway Timeout - Request took too long" } @staticmethod def handle_http_error(error: Exception, context: str = "") -> bool: """ Handle HTTP errors with appropriate recovery strategies. 

class HuggingFaceErrorHandler:
    """Handle various HuggingFace API and training errors."""

    ERROR_CODES = {
        400: "Bad Request - Check your input data format",
        401: "Unauthorized - Invalid or missing HuggingFace token",
        403: "Forbidden - Check repository permissions",
        404: "Not Found - Model or dataset doesn't exist",
        408: "Request Timeout - Server took too long to respond",
        413: "Payload Too Large - File size exceeds limits",
        422: "Unprocessable Entity - Validation error in request",
        429: "Rate Limited - Too many requests, will retry",
        500: "Internal Server Error - HuggingFace server issue",
        502: "Bad Gateway - Service temporarily unavailable",
        503: "Service Unavailable - Server overloaded",
        504: "Gateway Timeout - Request took too long"
    }

    @staticmethod
    def handle_http_error(error: Exception, context: str = "") -> bool:
        """
        Handle HTTP errors with appropriate recovery strategies.

        Args:
            error: The exception that occurred
            context: Additional context about what was being done

        Returns:
            True if the error is recoverable, False otherwise
        """
        if hasattr(error, 'response') and error.response is not None:
            status_code = error.response.status_code
            error_msg = HuggingFaceErrorHandler.ERROR_CODES.get(
                status_code, f"Unknown error (code {status_code})"
            )
            logger.error(f"{context} - HTTP {status_code}: {error_msg}")

            # Log response content for debugging
            try:
                response_text = error.response.text
                logger.debug(f"Response content: {response_text}")
            except Exception:
                pass

            # Determine if error is recoverable
            recoverable_codes = [408, 429, 500, 502, 503, 504]
            return status_code in recoverable_codes

        logger.error(f"{context} - {type(error).__name__}: {str(error)}")
        return False

    @staticmethod
    def handle_training_error(error: Exception) -> Dict[str, Any]:
        """Handle training-specific errors."""
        error_info = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "traceback": traceback.format_exc(),
            "recoverable": False,
            "suggestion": ""
        }

        error_str = str(error).lower()

        if "out of memory" in error_str or "oom" in error_str:
            error_info["recoverable"] = True
            error_info["suggestion"] = (
                "Reduce batch_size, enable gradient_checkpointing, "
                "or use a smaller model/sequence length"
            )
        elif "cuda" in error_str:
            error_info["suggestion"] = "Check CUDA installation and GPU availability"
        elif "token" in error_str and "invalid" in error_str:
            error_info["suggestion"] = "Check HuggingFace token validity"
        elif "permission" in error_str:
            error_info["suggestion"] = "Verify repository write permissions"
        elif "dataset" in error_str:
            error_info["suggestion"] = "Check dataset name and format"
        elif "disk" in error_str or "space" in error_str:
            error_info["suggestion"] = "Free up disk space"

        return error_info
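
# Minimal sketch of how the handler above classifies errors. The Response objects are
# constructed by hand purely for illustration; nothing else in this script calls this.
def _demo_error_classification() -> None:
    """Hypothetical demo: a 503 is reported as recoverable, a 401 is not."""
    resp = requests.Response()
    resp.status_code = 503
    err = requests.exceptions.HTTPError("service unavailable", response=resp)
    assert HuggingFaceErrorHandler.handle_http_error(err, "demo") is True

    resp = requests.Response()
    resp.status_code = 401
    err = requests.exceptions.HTTPError("bad token", response=resp)
    assert HuggingFaceErrorHandler.handle_http_error(err, "demo") is False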

class RobustHFUploader:
    """Robust uploader for HuggingFace Hub with retry logic."""

    def __init__(self, token: str, max_retries: int = 5):
        self.token = token
        self.max_retries = max_retries
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create session with retry strategy."""
        session = requests.Session()
        retry_strategy = Retry(
            total=self.max_retries,
            backoff_factor=2,
            status_forcelist=[408, 429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "PUT", "POST", "PATCH"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def upload_file_chunked(
        self,
        file_path: str,
        repo_id: str,
        path_in_repo: str,
        chunk_size: int = 5 * 1024 * 1024
    ) -> bool:
        """
        Upload a large file with retry and progress logging.

        Args:
            file_path: Local file path
            repo_id: HuggingFace repo ID
            path_in_repo: Path in repository
            chunk_size: Size of chunks in bytes (currently unused; huggingface_hub
                handles large-file transfer internally)

        Returns:
            True if successful, False otherwise
        """
        try:
            from huggingface_hub import HfApi
            api = HfApi(token=self.token)

            file_size = os.path.getsize(file_path)
            logger.info(f"Uploading {file_path} ({file_size / 1024 / 1024:.2f} MB)")

            for attempt in range(self.max_retries):
                try:
                    api.upload_file(
                        path_or_fileobj=file_path,
                        path_in_repo=path_in_repo,
                        repo_id=repo_id,
                        token=self.token
                    )
                    logger.info(f"✅ Successfully uploaded {path_in_repo}")
                    return True
                except Exception as e:
                    if HuggingFaceErrorHandler.handle_http_error(
                        e, f"Upload attempt {attempt + 1}/{self.max_retries}"
                    ):
                        wait_time = (2 ** attempt) * 30
                        logger.warning(f"Retrying in {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        logger.error(f"Non-recoverable error: {e}")
                        return False

            logger.error(f"Failed to upload after {self.max_retries} attempts")
            return False

        except Exception as e:
            logger.error(f"Upload error: {e}")
            return False
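
# Hedged usage sketch: the uploader can also be driven outside the trainer, e.g. to push a
# single artifact. The token, local file path, and repo id below are placeholder assumptions.
def _example_standalone_upload() -> bool:
    """Hypothetical one-off upload using the retry-aware uploader above."""
    uploader = RobustHFUploader(token=os.getenv("HF_TOKEN", ""), max_retries=3)
    return uploader.upload_file_chunked(
        file_path="./helion-v1-output/config.json",  # assumed local artifact
        repo_id="DeepXR/Helion-V1",
        path_in_repo="config.json"
    )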

class HelionAutoTrainer:
    """Auto trainer with comprehensive error handling."""

    def __init__(self, config: TrainingConfig):
        self.config = config
        self.error_handler = HuggingFaceErrorHandler()

        # Get HuggingFace token
        self.hf_token = config.hf_token or os.getenv("HF_TOKEN")
        if not self.hf_token:
            raise ValueError(
                "HuggingFace token not found. Set the HF_TOKEN environment variable "
                "or pass a token in the config"
            )

        self.uploader = RobustHFUploader(self.hf_token, config.max_retries)

        # Training state
        self.training_state = {
            "status": "initialized",
            "current_epoch": 0,
            "total_steps": 0,
            "errors": [],
            "checkpoints": []
        }

    def verify_setup(self) -> bool:
        """Verify all prerequisites before training."""
        logger.info("Verifying setup...")

        checks = {
            "HuggingFace Token": self._check_token(),
            "CUDA Available": self._check_cuda(),
            "Base Model Access": self._check_model_access(),
            "Dataset Access": self._check_dataset_access(),
            "Disk Space": self._check_disk_space(),
            "Repository Permissions": self._check_repo_permissions()
        }

        all_passed = True
        for check_name, result in checks.items():
            status = "✅" if result else "❌"
            logger.info(f"{status} {check_name}")
            if not result:
                all_passed = False

        return all_passed

    def _check_token(self) -> bool:
        """Verify the HuggingFace token is valid."""
        try:
            from huggingface_hub import HfApi
            api = HfApi(token=self.hf_token)
            api.whoami()
            return True
        except Exception as e:
            logger.error(f"Token validation failed: {e}")
            return False

    def _check_cuda(self) -> bool:
        """Check CUDA availability."""
        try:
            import torch
            available = torch.cuda.is_available()
            if available:
                logger.info(f"CUDA devices: {torch.cuda.device_count()}")
                for i in range(torch.cuda.device_count()):
                    logger.info(f"GPU {i}: {torch.cuda.get_device_name(i)}")
            return available
        except Exception:
            return False

    def _check_model_access(self) -> bool:
        """Check if the base model is accessible."""
        try:
            from huggingface_hub import HfApi
            api = HfApi(token=self.hf_token)
            api.model_info(self.config.base_model)
            return True
        except Exception as e:
            logger.error(f"Cannot access base model: {e}")
            return False

    def _check_dataset_access(self) -> bool:
        """Check if the dataset is accessible."""
        try:
            from huggingface_hub import HfApi
            api = HfApi(token=self.hf_token)
            api.dataset_info(self.config.dataset_name)
            return True
        except Exception as e:
            logger.warning(f"Cannot access dataset: {e}")
            return False

    def _check_disk_space(self, required_gb: int = 50) -> bool:
        """Check available disk space."""
        try:
            import shutil
            # Create the output directory first so disk_usage doesn't fail on a missing path
            os.makedirs(self.config.output_dir, exist_ok=True)
            stat = shutil.disk_usage(self.config.output_dir)
            available_gb = stat.free / (1024 ** 3)
            logger.info(f"Available disk space: {available_gb:.2f} GB")
            return available_gb >= required_gb
        except Exception:
            return False

    def _check_repo_permissions(self) -> bool:
        """Check if we can write to the repository."""
        try:
            from huggingface_hub import HfApi
            api = HfApi(token=self.hf_token)
            # Creating the repo (exist_ok=True) doubles as a write-permission check
            try:
                api.create_repo(
                    self.config.hub_model_id,
                    exist_ok=True,
                    private=False
                )
                return True
            except Exception as e:
                logger.error(f"Repository permission check failed: {e}")
                return False
        except Exception:
            return False

    def prepare_training(self):
        """Prepare for training with error handling."""
        logger.info("Preparing training environment...")

        try:
            # Import libraries
            import torch
            from transformers import (
                AutoTokenizer,
                AutoModelForCausalLM,
                TrainingArguments,
                Trainer,
                DataCollatorForLanguageModeling
            )
            from datasets import load_dataset
            from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

            # Load tokenizer
            logger.info("Loading tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config.base_model,
                token=self.hf_token
            )
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token

            # Load model with error handling
            logger.info("Loading base model...")
            for attempt in range(self.config.max_retries):
                try:
                    self.model = AutoModelForCausalLM.from_pretrained(
                        self.config.base_model,
                        torch_dtype=torch.bfloat16,
                        device_map="auto",
                        token=self.hf_token,
                        trust_remote_code=True
                    )
                    break
                except Exception as e:
                    if attempt < self.config.max_retries - 1:
                        logger.warning(f"Model load attempt {attempt + 1} failed: {e}")
                        time.sleep(self.config.retry_delay)
                    else:
                        raise

            # Apply LoRA if enabled
            if self.config.use_lora:
                logger.info("Applying LoRA configuration...")
                peft_config = LoraConfig(
                    r=self.config.lora_r,
                    lora_alpha=self.config.lora_alpha,
                    lora_dropout=self.config.lora_dropout,
                    bias="none",
                    task_type="CAUSAL_LM",
                    target_modules=[
                        "q_proj", "k_proj", "v_proj", "o_proj",
                        "gate_proj", "up_proj", "down_proj"
                    ]
                )
                self.model = prepare_model_for_kbit_training(self.model)
                self.model = get_peft_model(self.model, peft_config)
                self.model.print_trainable_parameters()

            # Load dataset
            logger.info("Loading dataset...")
            self.dataset = load_dataset(
                self.config.dataset_name,
                token=self.hf_token
            )

            # Preprocessing
            def preprocess_function(examples):
                return self.tokenizer(
                    examples["text"],
                    truncation=True,
                    max_length=self.config.max_seq_length,
                    padding="max_length"
                )

            logger.info("Preprocessing dataset...")
            self.tokenized_dataset = self.dataset.map(
                preprocess_function,
                batched=True,
                remove_columns=self.dataset["train"].column_names
            )

            # Data collator
            self.data_collator = DataCollatorForLanguageModeling(
                tokenizer=self.tokenizer,
                mlm=False
            )

            logger.info("✅ Training preparation complete")
            return True

        except Exception as e:
            error_info = self.error_handler.handle_training_error(e)
            logger.error(f"Preparation failed: {error_info}")
            self.training_state["errors"].append(error_info)
            return False
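
    # Note on the OOM fallback in train() below: halving per_device_train_batch_size while
    # doubling gradient_accumulation_steps keeps the effective batch size constant
    # (e.g. 4 * 8 = 32 -> 2 * 16 = 32 sequences per optimizer step), trading memory for
    # extra forward/backward passes.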
    def train(self) -> bool:
        """Run training with comprehensive error handling."""
        logger.info("Starting training...")
        self.training_state["status"] = "training"

        try:
            from transformers import TrainingArguments, Trainer

            has_validation = "validation" in self.tokenized_dataset

            # Training arguments
            training_args = TrainingArguments(
                output_dir=self.config.output_dir,
                num_train_epochs=self.config.num_epochs,
                per_device_train_batch_size=self.config.batch_size,
                gradient_accumulation_steps=self.config.gradient_accumulation,
                learning_rate=self.config.learning_rate,
                warmup_steps=self.config.warmup_steps,
                logging_steps=10,
                save_steps=500,
                save_total_limit=3,
                fp16=False,
                bf16=True,
                gradient_checkpointing=True,
                optim="adamw_torch",
                report_to=["tensorboard"],
                push_to_hub=False,  # We'll handle the upload manually
                hub_token=self.hf_token,
                # load_best_model_at_end requires evaluation, so only enable it
                # when a validation split is available
                load_best_model_at_end=has_validation,
                save_strategy="steps",
                evaluation_strategy="steps" if has_validation else "no",
                eval_steps=500 if has_validation else None
            )

            # Create trainer
            trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=self.tokenized_dataset["train"],
                eval_dataset=self.tokenized_dataset.get("validation"),
                data_collator=self.data_collator,
                tokenizer=self.tokenizer
            )

            # Train with error recovery
            for attempt in range(self.config.max_retries):
                try:
                    logger.info(f"Training attempt {attempt + 1}/{self.config.max_retries}")
                    trainer.train()
                    logger.info("✅ Training completed successfully")
                    self.training_state["status"] = "completed"
                    return True

                except RuntimeError as e:
                    error_info = self.error_handler.handle_training_error(e)
                    self.training_state["errors"].append(error_info)

                    if "out of memory" in str(e).lower():
                        logger.warning("OOM error - reducing batch size")
                        training_args.per_device_train_batch_size //= 2
                        training_args.gradient_accumulation_steps *= 2

                        if training_args.per_device_train_batch_size < 1:
                            logger.error("Cannot reduce batch size further")
                            return False

                        # Recreate trainer with new settings
                        trainer = Trainer(
                            model=self.model,
                            args=training_args,
                            train_dataset=self.tokenized_dataset["train"],
                            eval_dataset=self.tokenized_dataset.get("validation"),
                            data_collator=self.data_collator,
                            tokenizer=self.tokenizer
                        )
                    else:
                        logger.error(f"Non-recoverable error: {error_info}")
                        return False

                except Exception as e:
                    error_info = self.error_handler.handle_training_error(e)
                    logger.error(f"Unexpected error: {error_info}")
                    self.training_state["errors"].append(error_info)

                    if attempt < self.config.max_retries - 1:
                        wait_time = self.config.retry_delay * (attempt + 1)
                        logger.info(f"Retrying in {wait_time}s...")
                        time.sleep(wait_time)
                    else:
                        return False

            return False

        except Exception as e:
            error_info = self.error_handler.handle_training_error(e)
            logger.error(f"Training initialization failed: {error_info}")
            self.training_state["errors"].append(error_info)
            self.training_state["status"] = "failed"
            return False

    def upload_to_hub(self) -> bool:
        """Upload the trained model to HuggingFace Hub with retry logic."""
        logger.info("Uploading model to HuggingFace Hub...")
        self.training_state["status"] = "uploading"

        try:
            from huggingface_hub import HfApi
            api = HfApi(token=self.hf_token)

            # Create the repo if it doesn't exist
            logger.info(f"Creating/updating repository: {self.config.hub_model_id}")
            api.create_repo(
                self.config.hub_model_id,
                exist_ok=True,
                private=False
            )

            # Upload files with retry
            output_path = Path(self.config.output_dir)
            files_to_upload = (
                list(output_path.glob("*.json")) +
                list(output_path.glob("*.bin")) +
                list(output_path.glob("*.safetensors")) +
                list(output_path.glob("*.txt"))
            )

            upload_success = True
            for file_path in files_to_upload:
                logger.info(f"Uploading {file_path.name}...")
                success = self.uploader.upload_file_chunked(
                    str(file_path),
                    self.config.hub_model_id,
                    file_path.name
                )
                if not success:
                    logger.error(f"Failed to upload {file_path.name}")
                    upload_success = False

            if upload_success:
                logger.info("✅ Model uploaded successfully")
                self.training_state["status"] = "uploaded"
                return True
            else:
                logger.error("Some files failed to upload")
                return False

        except Exception as e:
            self.error_handler.handle_http_error(e, "Hub upload")
            self.training_state["status"] = "upload_failed"
            return False
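
    # The training_state dict serialized by save_training_state() below keeps the shape set
    # in __init__: {"status": ..., "current_epoch": ..., "total_steps": ..., "errors": [...],
    # "checkpoints": [...]}, with "status" stepping through
    # initialized -> training -> completed -> uploading -> uploaded
    # (or failed / upload_failed / interrupted on errors).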
self.training_state["status"] = "upload_failed" return False def save_training_state(self): """Save training state to file.""" state_file = Path(self.config.output_dir) / "training_state.json" state_file.parent.mkdir(parents=True, exist_ok=True) with open(state_file, 'w') as f: json.dump(self.training_state, f, indent=2, default=str) logger.info(f"Training state saved to {state_file}") def run_full_pipeline(self) -> bool: """Run complete training pipeline with error handling.""" logger.info("="*60) logger.info("Starting Helion-V1 Auto Training Pipeline") logger.info("="*60) try: # Step 1: Verify setup if not self.verify_setup(): logger.error("Setup verification failed") return False # Step 2: Prepare training if not self.prepare_training(): logger.error("Training preparation failed") return False # Step 3: Train if not self.train(): logger.error("Training failed") return False # Step 4: Upload to hub if not self.upload_to_hub(): logger.warning("Upload failed, but model is saved locally") # Step 5: Save state self.save_training_state() logger.info("="*60) logger.info("✅ Training pipeline completed successfully!") logger.info("="*60) return True except KeyboardInterrupt: logger.warning("Training interrupted by user") self.training_state["status"] = "interrupted" self.save_training_state() return False except Exception as e: logger.error(f"Pipeline failed: {e}") logger.error(traceback.format_exc()) self.training_state["status"] = "failed" self.training_state["errors"].append({ "error": str(e), "traceback": traceback.format_exc() }) self.save_training_state() return False def main(): """Main entry point for auto training.""" import argparse parser = argparse.ArgumentParser(description="Helion-V1 Auto Trainer") parser.add_argument("--base-model", default="meta-llama/Llama-2-7b-hf") parser.add_argument("--dataset", required=True, help="Dataset name on HuggingFace") parser.add_argument("--output-dir", default="./helion-v1-output") parser.add_argument("--hub-model-id", default="DeepXR/Helion-V1") parser.add_argument("--epochs", type=int, default=3) parser.add_argument("--batch-size", type=int, default=4) parser.add_argument("--learning-rate", type=float, default=2e-5) parser.add_argument("--max-seq-length", type=int, default=4096) parser.add_argument("--no-lora", action="store_true", help="Disable LoRA") parser.add_argument("--token", help="HuggingFace token (or use HF_TOKEN env var)") args = parser.parse_args() # Create config config = TrainingConfig( base_model=args.base_model, dataset_name=args.dataset, output_dir=args.output_dir, hub_model_id=args.hub_model_id, num_epochs=args.epochs, batch_size=args.batch_size, learning_rate=args.learning_rate, max_seq_length=args.max_seq_length, use_lora=not args.no_lora, hf_token=args.token ) # Run training trainer = HelionAutoTrainer(config) success = trainer.run_full_pipeline() sys.exit(0 if success else 1) if __name__ == "__main__": main()