# ── Hugging Face Space page metadata (kept for reference, as comments) ───────
# Tags: Text Generation · Transformers · Safetensors · llama · mergekit ·
#       Merge · text-generation-inference
# File: blackthinking / app.py
# Hardware: 42hgyn26hz-cpu
# Commit: f6ceb9b ("update")
import os
import torch
import gc
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
import psutil
import multiprocessing as mp
from datasets import load_dataset, Dataset, DatasetDict
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
TrainingArguments,
Trainer,
DataCollatorForLanguageModeling,
GPT2TokenizerFast
)
import shutil
from typing import Dict, Any, List
import warnings
import platform
import traceback
warnings.filterwarnings("ignore")
# ─── Configuration ───────────────────────────────────────────────────────────
# Hub id of the model to fine-tune (the tokenizer is loaded from it too).
MODEL_NAME = "zxc4wewewe/blackthinking"
# All outputs (checkpoints, logs, saved model) go to the working directory.
OUTPUT_DIR = "."
# Truncation length (in tokens) used by tokenize_function_safe.
MAX_LENGTH = 512
BATCH_SIZE = 1 # Very conservative
# Effective batch size = BATCH_SIZE * GRADIENT_ACCUMULATION.
GRADIENT_ACCUMULATION = 8
EPOCHS = 1 # For testing
LEARNING_RATE = 2e-5
# Checkpoint / evaluation / logging cadence in optimizer steps.
SAVE_STEPS = 50
EVAL_STEPS = 50
LOGGING_STEPS = 25
# Optimize for performance
NUM_WORKERS = 1 # Single thread for stability
# Batch size passed to datasets.map() during tokenization.
BATCH_SIZE_TOKENIZATION = 25
# ─── Utility Functions ───────────────────────────────────────────────────────
def safe_makedirs(path):
    """Create *path* (including parents) if it does not exist.

    Never raises: any failure is reported to stdout and signalled by the
    return value instead.

    Returns:
        True when the directory exists or was created, False on failure.
    """
    try:
        os.makedirs(path, exist_ok=True)
    except Exception as e:
        print(f"⚠️ Failed to create directory {path}: {e}")
        return False
    return True
def load_tokenizer_robust(model_name):
    """Load a tokenizer for *model_name*, degrading through four strategies.

    Order of attempts:
      1. AutoTokenizer with trust_remote_code=True (accepted only if it
         exposes a vocab API),
      2. AutoTokenizer without remote code,
      3. a stock GPT-2 fast tokenizer patched with the special tokens the
         model presumably expects,
      4. a hand-built minimal BPE tokenizer over basic ASCII characters.

    Returns:
        A tokenizer instance, or None when every strategy fails.
    """
    print(f"πŸ”„ Attempting to load tokenizer for: {model_name}")
    # Strategy 1: Try the model's tokenizer with trust_remote_code
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            use_fast=True,
            trust_remote_code=True
        )
        # Sanity check: only accept the tokenizer if it exposes a vocab API.
        if hasattr(tokenizer, 'get_vocab') or hasattr(tokenizer, 'vocab'):
            print("βœ… Successfully loaded model tokenizer")
            return tokenizer
        else:
            print("⚠️ Model tokenizer loaded but missing vocab methods")
    except Exception as e:
        print(f"⚠️ Primary tokenizer load failed: {str(e)[:100]}...")
    # Strategy 2: Try without trust_remote_code
    try:
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            use_fast=True,
            trust_remote_code=False
        )
        print("βœ… Successfully loaded tokenizer (no remote code)")
        return tokenizer
    except Exception as e:
        print(f"⚠️ Secondary tokenizer load failed: {str(e)[:100]}...")
    # Strategy 3: Create a minimal tokenizer workaround
    print("πŸ”„ Creating minimal tokenizer workaround...")
    try:
        # Use GPT-2 tokenizer as base
        tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
        # Add special tokens that the model might expect
        # NOTE(review): these token strings are presumably what the llama
        # base model uses — confirm against the model's config.
        special_tokens = {
            "pad_token": "<|pad|>",
            "eos_token": "</s>",
            "bos_token": "<s>",
        }
        # Only add tokens that don't already exist
        existing_tokens = set(tokenizer.all_special_tokens)
        tokens_to_add = {k: v for k, v in special_tokens.items() if v not in existing_tokens}
        if tokens_to_add:
            tokenizer.add_special_tokens(tokens_to_add)
        print("βœ… Created minimal tokenizer workaround")
        return tokenizer
    except Exception as e:
        print(f"⚠️ Minimal tokenizer creation failed: {str(e)[:100]}...")
    # Strategy 4: Create absolute minimal tokenizer
    print("πŸ”„ Creating absolute minimal tokenizer...")
    try:
        from transformers import PreTrainedTokenizerFast
        import json
        # Create minimal vocab: ids 0-3 are reserved for special tokens.
        vocab = {
            "<|pad|>": 0,
            "</s>": 1,
            "<s>": 2,
            "<|unk|>": 3,
        }
        # Add basic ASCII characters (ids start at 4, after the specials).
        for i, char in enumerate("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 \n\t.,!?-", start=4):
            vocab[char] = i
        # Create tokenizer JSON structure (the `tokenizers` on-disk format).
        tokenizer_json = {
            "version": "1.0",
            "truncation": {"direction": "Right", "max_length": 512, "strategy": "LongestFirst"},
            "padding": {"direction": "Right", "pad_id": 0, "pad_token": "<|pad|>", "pad_type_id": 0},
            "model": {
                "type": "BPE",
                "dropout": None,
                "unk_token": "<|unk|>",
                "continuing_subword_prefix": "",
                "end_of_word_suffix": "",
                "fuse_unk": False,
                "vocab": vocab,
                "merges": []
            }
        }
        # Save to temporary file (delete=False so it survives the `with`
        # and can be read back by PreTrainedTokenizerFast).
        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(tokenizer_json, f)
            temp_path = f.name
        # Load the tokenizer
        tokenizer = PreTrainedTokenizerFast(tokenizer_file=temp_path)
        tokenizer.pad_token = "<|pad|>"
        tokenizer.eos_token = "</s>"
        tokenizer.bos_token = "<s>"
        # Clean up temp file
        os.unlink(temp_path)
        print("βœ… Created absolute minimal tokenizer")
        return tokenizer
    except Exception as e:
        print(f"⚠️ Absolute minimal tokenizer failed: {str(e)[:100]}...")
    # Final fallback: return None to signal failure
    print("❌ All tokenizer loading strategies failed")
    return None
def load_dataset_with_fallback():
    """Load a prompt/response dataset, falling back through several sources.

    Tries each Hub dataset in order and guarantees the returned DatasetDict
    contains a "train" split, building a 90/10 train/test split from the
    first available split when necessary.  If every remote source fails, a
    tiny in-memory dummy dataset is built so the rest of the pipeline can
    still be exercised end to end.

    Returns:
        A DatasetDict with at least a "train" split, or None when even the
        dummy dataset could not be constructed.
    """
    print("πŸ“₯ Loading dataset with fallbacks...")
    # Try multiple sources; the first one that loads usably wins.
    datasets_sources = [
        "huihui-ai/Guilherme34_uncensor-v2",
        "zxc4wewewe/offsec",
    ]
    for dataset_name in datasets_sources:
        try:
            print(f"πŸ”„ Trying to load: {dataset_name}")
            dataset = load_dataset(dataset_name, streaming=False)
            print(f"βœ… Successfully loaded: {dataset_name}")
            # Bug fix: the original guard was
            # `"train" not in dataset and "test" not in dataset`, so a
            # dataset exposing only e.g. a "test" or "validation" split was
            # returned without the "train" split that
            # setup_training_resilient requires.  Re-split whenever "train"
            # itself is missing.
            if "train" not in dataset:
                keys = list(dataset.keys())
                if not keys:
                    continue  # Empty DatasetDict — try next source
                main_split = dataset[keys[0]]
                dataset = main_split.train_test_split(test_size=0.1, seed=42)
            return dataset
        except Exception as e:
            print(f"⚠️ Failed to load {dataset_name}: {str(e)[:100]}...")
    # Create minimal dummy dataset
    print("πŸ”„ Creating minimal dummy dataset for emergency...")
    try:
        dummy_data = {
            "train": [
                {"prompt": "What is AI?", "response": "Artificial Intelligence is computer systems performing human tasks."},
                {"prompt": "How to code?", "response": "Start with basics like variables, loops, functions."},
                {"prompt": "What is ML?", "response": "Machine Learning enables computers to learn from data."},
            ] * 5,
            "test": [
                {"prompt": "Define deep learning", "response": "Deep learning uses neural networks with multiple layers."},
            ] * 3,
        }
        dataset = DatasetDict({
            split: Dataset.from_list(data)
            for split, data in dummy_data.items()
        })
        print("βœ… Created minimal dummy dataset")
        return dataset
    except Exception as e:
        print(f"❌ Failed to create dummy dataset: {e}")
        return None
def normalize_example_safe(example):
    """Normalize one raw dataset record into a {"prompt", "response"} pair.

    Understands three input shapes: an explicit prompt/response pair, a
    chat-style "messages" list (last user message wins as prompt, last
    assistant message as response), and free-form "text"/"content" fields
    optionally containing "User:"/"Assistant:" markers.  Both output
    fields are always non-empty strings; any error yields placeholders.
    """
    try:
        if not example:
            return {"prompt": "default prompt", "response": "default response"}
        # Fast path: record already carries an explicit pair.
        if "prompt" in example and "response" in example:
            prompt_text = str(example.get("prompt", "") or "default prompt").strip()
            response_text = str(example.get("response", "") or "default response").strip()
            return {
                "prompt": prompt_text or "default prompt",
                "response": response_text or "default response",
            }
        # Chat transcript: scan the turns, keeping the last of each role.
        if "messages" in example and isinstance(example["messages"], list):
            prompt_text, response_text = "", ""
            for message in example["messages"]:
                if not isinstance(message, dict):
                    continue
                role = str(message.get("role", "")).lower()
                content = str(message.get("content", ""))
                if role in ("user", "human"):
                    prompt_text = content
                elif role in ("assistant", "bot"):
                    response_text = content
            return {
                "prompt": prompt_text or "default prompt",
                "response": response_text or "default response",
            }
        # Free-form text: split on an "Assistant:" marker when present,
        # otherwise use the head as prompt and the tail as response.
        text = str(example.get("text", example.get("content", "default text")))
        if "Assistant:" in text:
            user_part, assistant_part = text.split("Assistant:", 1)
            return {
                "prompt": user_part.replace("User:", "").strip() or "default prompt",
                "response": assistant_part.strip() or "default response",
            }
        tail = text[-200:] if len(text) > 200 else text
        return {
            "prompt": text[:200] or "default prompt",
            "response": tail or "default response",
        }
    except Exception:
        return {"prompt": "default prompt", "response": "default response"}
def tokenize_function_safe(examples, tokenizer):
    """Tokenize a batch of prompt/response pairs for causal-LM training.

    Each pair is rendered as "<prompt>\n\n<response><eos>" and tokenized
    with truncation to MAX_LENGTH.  Labels mirror input_ids except that
    pad-token positions are masked with -100 so they are ignored by the
    loss.  Any failure falls back to tiny dummy batches so dataset mapping
    never aborts.

    Args:
        examples: Batched dict with "prompt" and "response" string lists.
        tokenizer: A Hugging Face tokenizer (callable on a list of strings).

    Returns:
        Dict with "input_ids", "attention_mask" and "labels" lists.
    """
    try:
        # Bug fix: the original used `tokenizer.eos_token if hasattr(...)`,
        # which appends the literal string "None" when eos_token exists but
        # is None.  getattr + `or` covers both the missing and None cases.
        eos = getattr(tokenizer, "eos_token", None) or "</s>"
        # Format: Prompt\n\nResponse<eos>
        full_texts = [
            f"{prompt}\n\n{response}{eos}"
            for prompt, response in zip(examples["prompt"], examples["response"])
        ]
        # Safe tokenization
        result = tokenizer(
            full_texts,
            truncation=True,
            max_length=MAX_LENGTH,
            padding=False,
            return_tensors=None,
            verbose=False
        )
        # Labels for causal LM: mask padding positions (hoist the pad id
        # lookup out of the loops; if there is no pad id, nothing is masked).
        pad_id = getattr(tokenizer, "pad_token_id", None)
        result["labels"] = [
            [-100 if (pad_id is not None and token_id == pad_id) else token_id
             for token_id in labels]
            for labels in result["input_ids"]
        ]
        return result
    except Exception as e:
        print(f"⚠️ Tokenization failed, using dummy: {str(e)[:50]}...")
        # Return minimal valid result, one dummy row per input example.
        try:
            n = len(examples["prompt"])
            return {
                "input_ids": [[1, 2, 3]] * n,
                "attention_mask": [[1, 1, 1]] * n,
                "labels": [[1, 2, 3]] * n,
            }
        except Exception:  # was a bare `except:` — also swallowed SystemExit
            # Absolute fallback
            return {
                "input_ids": [[1]],
                "attention_mask": [[1]],
                "labels": [[1]],
            }
    
def process_dataset_resilient(dataset, tokenizer):
    """Normalize and tokenize every non-empty split of *dataset*.

    Each split is mapped through normalize_example_safe and then through
    tokenize_function_safe; on failure a split degrades to a tiny dummy
    split rather than aborting the pipeline.

    Returns:
        A DatasetDict of tokenized splits, or None when inputs are missing
        or no split could be processed at all.
    """
    if not dataset or not tokenizer:
        print("❌ Cannot process dataset - missing components")
        return None
    print("⚑ Processing dataset with resilience...")
    processed_splits = {}
    for split_name in dataset.keys():
        # Skip splits that are unsized or empty.
        if hasattr(dataset[split_name], '__len__') and len(dataset[split_name]) > 0:
            try:
                print(f"πŸ”„ Processing {split_name} split ({len(dataset[split_name])} samples)...")
                # Normalize with maximum error handling
                try:
                    normalized = dataset[split_name].map(
                        normalize_example_safe,
                        remove_columns=dataset[split_name].column_names if dataset[split_name].column_names else [],
                        num_proc=1,
                        desc=f"Normalizing {split_name}"
                    )
                except Exception as e:
                    print(f"⚠️ Normalization failed, using raw data: {str(e)[:50]}...")
                    normalized = dataset[split_name] # Use as-is
                # Tokenize with maximum error handling
                try:
                    tokenized = normalized.map(
                        lambda x: tokenize_function_safe(x, tokenizer),
                        batched=True,
                        batch_size=min(BATCH_SIZE_TOKENIZATION, max(1, len(normalized) // 4)),
                        num_proc=1,
                        remove_columns=["prompt", "response"] if "prompt" in normalized.column_names else [],
                        desc=f"Tokenizing {split_name}",
                        load_from_cache_file=False
                    )
                    if len(tokenized) > 0:
                        processed_splits[split_name] = tokenized
                        print(f"βœ… {split_name}: {len(tokenized)} samples processed")
                    else:
                        # Force the dummy-dataset fallback below.
                        raise ValueError("No samples processed")
                except Exception as e:
                    print(f"⚠️ Tokenization failed for {split_name}: {str(e)[:100]}...")
                    # Create minimal dataset from one tokenized dummy sample.
                    try:
                        dummy_tokens = tokenizer("test\n\ntest response", return_tensors=None)
                        dummy_tokens["labels"] = dummy_tokens["input_ids"].copy()
                        processed_splits[split_name] = Dataset.from_list([dummy_tokens] * min(5, len(dataset[split_name])))
                        print(f"βœ… Created minimal {split_name} dataset")
                    except:
                        # Absolute fallback
                        # NOTE(review): bare `except:` also swallows
                        # SystemExit/KeyboardInterrupt — consider narrowing.
                        processed_splits[split_name] = Dataset.from_list([
                            {"input_ids": [1, 2, 3], "attention_mask": [1, 1, 1], "labels": [1, 2, 3]}
                        ] * 3)
            except Exception as e:
                print(f"⚠️ Critical error processing {split_name}: {str(e)[:100]}...")
                # Absolute emergency fallback
                processed_splits[split_name] = Dataset.from_list([
                    {"input_ids": [1], "attention_mask": [1], "labels": [1]}
                ] * 2)
    return DatasetDict(processed_splits) if processed_splits else None
def load_model_resilient(model_name, tokenizer):
    """Load a causal-LM, degrading through progressively simpler configs.

    Strategy order: 8-bit quantized, then float16/float32 with device_map,
    then a plain low-memory CPU load; if all fail, a stock GPT-2 model is
    returned as a last resort.  When a tokenizer is supplied, the model's
    token embeddings are resized to its vocabulary size.

    Returns:
        The loaded model, or None when every strategy (including the GPT-2
        fallback) failed.
    """
    print("🧠 Loading model with maximum resilience...")
    # Try multiple loading strategies, most memory-efficient first.
    # NOTE(review): load_in_8bit presumably requires bitsandbytes + CUDA;
    # on CPU-only hosts strategy 1 is expected to fail and fall through.
    loading_strategies = [
        {
            "name": "Primary (8-bit)",
            "params": {
                "torch_dtype": torch.float16 if torch.cuda.is_available() else torch.float32,
                "device_map": "auto" if torch.cuda.is_available() else None,
                "trust_remote_code": True,
                "low_cpu_mem_usage": True,
                "load_in_8bit": True,
            }
        },
        {
            "name": "Secondary (float16)",
            "params": {
                "torch_dtype": torch.float16 if torch.cuda.is_available() else torch.float32,
                "device_map": "auto" if torch.cuda.is_available() else None,
                "trust_remote_code": True,
                "low_cpu_mem_usage": True,
            }
        },
        {
            "name": "Fallback (CPU)",
            "params": {
                "low_cpu_mem_usage": True,
            }
        }
    ]
    for strategy in loading_strategies:
        try:
            print(f"πŸ”„ Trying {strategy['name']} loading...")
            model = AutoModelForCausalLM.from_pretrained(model_name, **strategy["params"])
            # Resize embeddings if tokenizer is available (the tokenizer
            # fallbacks above may have added tokens or a different vocab).
            if tokenizer:
                try:
                    model.resize_token_embeddings(len(tokenizer))
                    print("βœ… Resized model embeddings to match tokenizer")
                except Exception as e:
                    print(f"⚠️ Could not resize embeddings: {str(e)[:50]}...")
            print(f"βœ… Model loaded successfully with {strategy['name']}")
            return model
        except Exception as e:
            print(f"⚠️ {strategy['name']} failed: {str(e)[:100]}...")
    # Emergency fallback - create a minimal model
    print("πŸ”„ Creating minimal model fallback...")
    try:
        from transformers import GPT2LMHeadModel
        model = GPT2LMHeadModel.from_pretrained("gpt2")
        if tokenizer:
            model.resize_token_embeddings(len(tokenizer))
        print("βœ… Created minimal model fallback")
        return model
    except Exception as e:
        print(f"❌ All model loading strategies failed: {str(e)[:100]}...")
        return None
def setup_training_resilient(model, tokenizer, tokenized_dataset):
    """Build a transformers Trainer over the tokenized dataset.

    Caps the train split at 20 samples (eval at 4) for a quick smoke run,
    then constructs conservative TrainingArguments and a causal-LM data
    collator.

    Returns:
        The configured Trainer, or None when a required component is
        missing, there is no training data, or construction fails.
    """
    if not model or not tokenizer or not tokenized_dataset:
        print("❌ Cannot setup training - missing components")
        return None
    print("βš™οΈ Setting up resilient training...")
    # Ensure we have data for training
    try:
        train_dataset = tokenized_dataset.get("train")
        # Fall back to the train split for evaluation if no test split exists.
        eval_dataset = tokenized_dataset.get("test") or tokenized_dataset.get("train")
        if not train_dataset or len(train_dataset) == 0:
            print("❌ No training data available")
            return None
        # Limit dataset size for testing
        max_samples = 20
        if len(train_dataset) > max_samples:
            train_dataset = train_dataset.select(range(max_samples))
        if eval_dataset and len(eval_dataset) > max_samples // 5:
            eval_dataset = eval_dataset.select(range(min(max_samples // 5, len(eval_dataset))))
    except Exception as e:
        print(f"⚠️ Dataset preparation error: {str(e)[:100]}...")
        return None
    # Safe training arguments - avoid problematic parameters
    try:
        training_args = TrainingArguments(
            output_dir=OUTPUT_DIR,
            # Conservative training settings
            num_train_epochs=EPOCHS,
            per_device_train_batch_size=BATCH_SIZE,
            per_device_eval_batch_size=BATCH_SIZE,
            gradient_accumulation_steps=GRADIENT_ACCUMULATION,
            # Learning rate and schedule
            learning_rate=LEARNING_RATE,
            weight_decay=0.01,
            warmup_ratio=0.1,
            lr_scheduler_type="linear",
            # Logging and saving
            logging_dir=f"{OUTPUT_DIR}/logs",
            logging_steps=LOGGING_STEPS,
            save_strategy="steps",
            save_steps=SAVE_STEPS,
            save_total_limit=2,
            # Evaluation - `eval_strategy` is the modern name of the old
            # `evaluation_strategy` kwarg.
            # NOTE(review): requires a recent transformers release — confirm
            # the installed version accepts it.
            eval_strategy="steps" if eval_dataset else "no",
            eval_steps=EVAL_STEPS if eval_dataset else None,
            # Performance settings - fp16 only on GPUs with tensor cores
            # (compute capability >= 7.0); bf16 disabled explicitly.
            fp16=torch.cuda.is_available() and torch.cuda.get_device_properties(0).major >= 7,
            bf16=False,
            dataloader_num_workers=1,
            dataloader_pin_memory=False,
            remove_unused_columns=False,
            # Memory optimization
            optim="adamw_torch",
            dataloader_drop_last=True,
            gradient_checkpointing=True,
            # Reporting
            report_to="none",
            run_name="resilient_training",
            # Disable TF32 completely to avoid errors
            tf32=False,
        )
        # Data collator (mlm=False -> causal LM; pad to multiples of 8 for
        # tensor-core friendliness).
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False,
            pad_to_multiple_of=8,
        )
        # Create trainer with error handling
        # NOTE(review): `processing_class` replaces the deprecated
        # `tokenizer` kwarg on recent transformers — confirm the installed
        # version supports it.
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset if eval_dataset else None,
            data_collator=data_collator,
            processing_class=tokenizer,
            callbacks=[] # No callbacks to avoid issues
        )
        print("βœ… Training setup completed successfully")
        return trainer
    except Exception as e:
        print(f"❌ Failed to create trainer: {str(e)[:200]}...")
        traceback.print_exc()
        return None
def safe_training_loop(trainer):
    """Run training with layered error handling; never raises.

    Executes trainer.train(), then performs best-effort saves of the
    model, trainer state and tokenizer.

    Args:
        trainer: A configured transformers.Trainer (falsy values are
            treated as a failure).

    Returns:
        True when training itself completed (even if a save afterwards
        failed), False on interruption or a training error.
    """
    if not trainer:
        print("❌ No trainer provided for training")
        return False
    print("πŸƒ Starting resilient training...")
    try:
        # Ensure output directory exists
        safe_makedirs(OUTPUT_DIR)
        # Start training with comprehensive error handling
        trainer.train()
        print("βœ… TRAINING COMPLETED SUCCESSFULLY!")
        # Save model and trainer state with error handling
        try:
            print("πŸ’Ύ Saving model...")
            trainer.save_model(OUTPUT_DIR)
            trainer.save_state()
            print("βœ… Model saved successfully!")
        except Exception as e:
            print(f"⚠️ Model save failed: {e}")
        try:
            print("πŸ’Ύ Saving tokenizer...")
            # Bug fix: the original called `Trainer._save(f".")` — an
            # unbound private method invoked on the class with a path
            # string as `self` — which always raised and was silently
            # swallowed, so the tokenizer was never saved.  Save the
            # tokenizer attached to the trainer instead
            # (`processing_class` on current transformers, `tokenizer`
            # on older releases).
            processor = getattr(trainer, "processing_class", None) or getattr(trainer, "tokenizer", None)
            if processor is not None:
                processor.save_pretrained(OUTPUT_DIR)
                print("βœ… Tokenizer saved successfully!")
            else:
                print("⚠️ Tokenizer save failed: no tokenizer attached to trainer")
        except Exception as e:
            print(f"⚠️ Tokenizer save failed: {e}")
        return True
    except KeyboardInterrupt:
        print("πŸ›‘ Training interrupted by user")
        try:
            # Try to save current progress
            trainer.save_model(OUTPUT_DIR)
            print("βœ… Interrupted model saved")
        except Exception:  # was a bare `except:`
            print("⚠️ Could not save interrupted model")
        return False
    except Exception as e:
        print(f"⚠️ Training failed with error: {str(e)[:300]}")
        traceback.print_exc()
        # Try emergency save
        try:
            print("πŸ’Ύ Attempting emergency save...")
            trainer.save_model(OUTPUT_DIR)
            print("βœ… Emergency save completed")
        except Exception as save_error:
            print(f"❌ Emergency save also failed: {save_error}")
        return False
def main():
    """Run the full fine-tuning pipeline end to end.

    Steps: load tokenizer -> load dataset -> tokenize -> load model ->
    build Trainer -> train.  Each step has its own internal fallbacks; a
    hard failure at any step aborts with None.

    Returns:
        The Trainer on a fully successful run, otherwise None.
    """
    print("πŸš€ STARTING RESILIENT TRAINING PIPELINE")
    print(f"πŸ”§ Batch Size: {BATCH_SIZE} | Workers: {NUM_WORKERS}")
    print(f"πŸ–₯️ System: {platform.system()} | CUDA: {torch.cuda.is_available()}")
    # Create output directory
    safe_makedirs(OUTPUT_DIR)
    # 1. Load tokenizer with comprehensive fallback
    print("\nπŸ”€ LOADING TOKENIZER WITH MAXIMUM RESILIENCE...")
    tokenizer = load_tokenizer_robust(MODEL_NAME)
    if tokenizer is None:
        print("❌ CRITICAL: Could not load any tokenizer. Exiting.")
        return None
    print(f"βœ… Tokenizer loaded successfully")
    # 2. Load dataset with fallbacks
    print("\nπŸ“₯ LOADING DATASET WITH FALLBACKS...")
    dataset = load_dataset_with_fallback()
    if dataset is None:
        print("❌ Could not load any dataset")
        return None
    # 3. Process dataset with maximum resilience
    print("\n⚑ PROCESSING DATASET WITH MAXIMUM RESILIENCE...")
    tokenized_dataset = process_dataset_resilient(dataset, tokenizer)
    if tokenized_dataset is None:
        print("❌ Dataset processing failed completely")
        return None
    # 4. Load model with maximum resilience
    print("\n🧠 LOADING MODEL WITH MAXIMUM RESILIENCE...")
    model = load_model_resilient(MODEL_NAME, tokenizer)
    if model is None:
        print("❌ Model loading failed completely")
        return None
    # 5. Setup training with maximum resilience
    print("\nβš™οΈ SETTING UP TRAINING WITH MAXIMUM RESILIENCE...")
    trainer = setup_training_resilient(model, tokenizer, tokenized_dataset)
    if trainer is None:
        print("❌ Training setup failed")
        return None
    # 6. Execute training with maximum resilience
    print("\nπŸƒ EXECUTING TRAINING WITH MAXIMUM RESILIENCE...")
    success = safe_training_loop(trainer)
    if success:
        print("\nπŸŽ‰ TRAINING PIPELINE COMPLETED SUCCESSFULLY!")
    else:
        print("\n⚠️ TRAINING PIPELINE COMPLETED WITH ISSUES BUT DID NOT STOP!")
    return trainer if success else None
# ─── Execute Everything ──────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: run the pipeline and report the outcome, never
    # letting an exception escape to the host process.
    print("🏁 STARTING EXECUTION WITH MAXIMUM RESILIENCE...")
    try:
        trainer = main()
        if trainer:
            print("🎊 SUCCESS: Training pipeline completed!")
        else:
            print("⚠️ Training pipeline completed with issues but did not crash!")
    except KeyboardInterrupt:
        print("\nπŸ›‘ EXECUTION STOPPED BY USER")
    except Exception as e:
        print(f"πŸ’₯ UNEXPECTED ERROR: {str(e)}")
        traceback.print_exc()
        print("⚠️ Even fatal errors won't stop the program completely!")