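"""Evaluation queue worker.

Picks up pending evaluation requests from EVAL_REQUESTS_PATH, runs each
requested model on the TSAC sentiment task and the Tunisian Dialect Corpus
token-coverage task, and uploads the resulting JSON files to RESULTS_REPO
on Hugging Face.
"""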
import json
import os
from typing import Dict, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig
from datasets import load_dataset
import traceback
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, RESULTS_REPO
class EvaluationStatus(Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
FINISHED = "FINISHED"
FAILED = "FAILED"
@dataclass
class EvaluationResult:
model: str
revision: str
precision: str
weight_type: str
results: Dict[str, float]
    error: Optional[str] = None
def evaluate_tsac_sentiment(model, tokenizer, device):
"""Evaluate model on TSAC sentiment analysis task"""
try:
print("\n=== Starting TSAC sentiment evaluation ===")
print(f"Current device: {device}")
# Load and preprocess dataset
print("\nLoading and preprocessing TSAC dataset...")
dataset = load_dataset("fbougares/tsac", split="test", trust_remote_code=True)
print(f"Dataset size: {len(dataset)} examples")
        def preprocess(examples):
            print(f"\nProcessing batch of {len(examples['sentence'])} examples")
            # Use the 'sentence' field as per the dataset structure.
            # Pad to a fixed length so that examples tokenized in different
            # map batches can still be stacked together by the collate_fn.
            return tokenizer(
                examples['sentence'],
                padding='max_length',
                truncation=True,
                max_length=512
            )
dataset = dataset.map(preprocess, batched=True)
dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'target'])
# Check first example
first_example = dataset[0]
print("\nFirst example details:")
print(f"Input IDs shape: {first_example['input_ids'].shape}")
print(f"Attention mask shape: {first_example['attention_mask'].shape}")
print(f"Target: {first_example['target']}")
model.eval()
print(f"\nModel class: {model.__class__.__name__}")
print(f"Model device: {next(model.parameters()).device}")
with torch.no_grad():
predictions = []
targets = []
# Create DataLoader with batch size 16
from torch.utils.data import DataLoader
# Define a custom collate function
def collate_fn(batch):
# Stack tensors for input_ids and attention_mask
input_ids = torch.stack([sample['input_ids'] for sample in batch])
attention_mask = torch.stack([sample['attention_mask'] for sample in batch])
                # Stack targets (already tensors after dataset.set_format)
                targets = torch.stack([torch.as_tensor(sample['target']) for sample in batch])
return {
'input_ids': input_ids,
'attention_mask': attention_mask,
'target': targets
}
dataloader = DataLoader(
dataset,
batch_size=16,
shuffle=False,
collate_fn=collate_fn
)
for i, batch in enumerate(dataloader):
if i == 0:
print("\nProcessing first batch...")
print(f"Batch keys: {list(batch.keys())}")
print(f"Target shape: {batch['target'].shape}")
inputs = {k: v.to(device) for k, v in batch.items() if k != 'target'}
target = batch['target'].to(device)
outputs = model(**inputs)
print(f"\nBatch {i} output type: {type(outputs)}")
# Handle different model output formats
if isinstance(outputs, dict):
print(f"Output keys: {list(outputs.keys())}")
if 'logits' in outputs:
logits = outputs['logits']
elif 'prediction_logits' in outputs:
logits = outputs['prediction_logits']
else:
raise ValueError(f"Unknown output format. Available keys: {list(outputs.keys())}")
elif isinstance(outputs, tuple):
print(f"Output tuple length: {len(outputs)}")
logits = outputs[0]
else:
logits = outputs
print(f"Logits shape: {logits.shape}")
# For sequence classification, we typically use the [CLS] token's prediction
if len(logits.shape) == 3: # [batch_size, sequence_length, num_classes]
logits = logits[:, 0, :] # Take the [CLS] token prediction
print(f"Final logits shape: {logits.shape}")
batch_predictions = logits.argmax(dim=-1).cpu().tolist()
batch_targets = target.cpu().tolist()
predictions.extend(batch_predictions)
targets.extend(batch_targets)
if i == 0:
print("\nFirst batch predictions:")
print(f"Predictions: {batch_predictions[:5]}")
print(f"Targets: {batch_targets[:5]}")
print(f"\nTotal predictions: {len(predictions)}")
print(f"Total targets: {len(targets)}")
# Calculate accuracy
correct = sum(p == t for p, t in zip(predictions, targets))
total = len(predictions)
accuracy = correct / total if total > 0 else 0.0
print(f"\nEvaluation results:")
print(f"Correct predictions: {correct}")
print(f"Total predictions: {total}")
print(f"Accuracy: {accuracy:.4f}")
return {"accuracy": accuracy}
except Exception as e:
print(f"\n=== Error in TSAC evaluation: {str(e)} ===")
print(f"Full traceback: {traceback.format_exc()}")
        raise
def evaluate_tunisian_corpus_coverage(model, tokenizer, device):
"""Evaluate model's coverage on Tunisian Dialect Corpus"""
try:
dataset = load_dataset("arbml/Tunisian_Dialect_Corpus", split="train")
        def preprocess(examples):
            print(f"\nTokenizing batch of {len(examples['Tweet'])} examples")
            # Use the 'Tweet' field as per the dataset structure
            return tokenizer(
                examples['Tweet'],
                padding=False,     # No padding needed for token coverage
                truncation=False,  # Don't truncate long sequences
                max_length=None    # Let the tokenizer handle the length
            )
dataset = dataset.map(preprocess, batched=True)
# Calculate token coverage
total_tokens = 0
covered_tokens = 0
for example in dataset:
# Get the tokenized input IDs
input_ids = example['input_ids']
# Convert to tokens and count
tokens = tokenizer.convert_ids_to_tokens(input_ids)
total_tokens += len(tokens)
covered_tokens += len([t for t in tokens if t != tokenizer.unk_token])
coverage = covered_tokens / total_tokens if total_tokens > 0 else 0
print(f"Tunisian Corpus Coverage: {coverage:.2%}")
return {"coverage": coverage}
except Exception as e:
print(f"Error in Tunisian Corpus evaluation: {str(e)}")
print(f"Full traceback: {traceback.format_exc()}")
        raise
def evaluate_model(model_name: str, revision: str, precision: str, weight_type: str) -> EvaluationResult:
"""Evaluate a single model on all tasks"""
try:
print(f"\nStarting evaluation for model: {model_name} (revision: {revision}, precision: {precision}, weight_type: {weight_type})")
print(f"Current working directory: {os.getcwd()}")
print(f"Evaluation requests path: {EVAL_REQUESTS_PATH}")
print(f"Evaluation results path: {EVAL_RESULTS_PATH}")
# Initialize device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Load model and tokenizer with enhanced error handling
try:
print(f"\nLoading model: {model_name}")
print(f"Model path exists: {os.path.exists(model_name)}")
# First try to load the config to check model type
try:
config = AutoConfig.from_pretrained(model_name, revision=revision)
print(f"Model type from config: {config.model_type}")
except Exception as config_error:
print(f"Error loading config: {str(config_error)}")
# Try loading with trust_remote_code=True first
try:
print("\nAttempting to load with trust_remote_code=True...")
model = AutoModelForSequenceClassification.from_pretrained(
model_name,
revision=revision,
torch_dtype=getattr(torch, precision),
trust_remote_code=True
).to(device)
print(f"Successfully loaded model {model_name} with trust_remote_code=True")
print(f"Model class: {model.__class__.__name__}")
except Exception as e1:
print(f"Error loading with trust_remote_code=True: {str(e1)}")
print(f"Error type: {type(e1).__name__}")
# If it's a model type error, try with llama as model type
if "Unrecognized model" in str(e1) and "llama" in model_name.lower():
print("\nAttempting to load as llama model...")
try:
model = AutoModelForSequenceClassification.from_pretrained(
model_name,
revision=revision,
torch_dtype=getattr(torch, precision),
trust_remote_code=True,
model_type="llama"
).to(device)
print(f"Successfully loaded model {model_name} as llama model")
print(f"Model class: {model.__class__.__name__}")
except Exception as e2:
print(f"Error loading as llama model: {str(e2)}")
print(f"Error type: {type(e2).__name__}")
raise Exception(f"Failed to load model with both methods: {str(e1)}, {str(e2)}")
else:
raise e1
print(f"\nLoading tokenizer: {model_name}")
try:
tokenizer = AutoTokenizer.from_pretrained(model_name, revision=revision)
print(f"Successfully loaded tokenizer for {model_name}")
print(f"Tokenizer class: {tokenizer.__class__.__name__}")
except Exception as e:
print(f"Error loading tokenizer: {str(e)}")
print(f"Error type: {type(e).__name__}")
raise Exception(f"Failed to load tokenizer: {str(e)}")
# Run evaluations
print("\nStarting TSAC sentiment evaluation...")
try:
tsac_results = evaluate_tsac_sentiment(model, tokenizer, device)
print(f"TSAC results: {tsac_results}")
except Exception as e:
print(f"Error in TSAC evaluation for {model_name}: {str(e)}")
print(f"Error type: {type(e).__name__}")
tsac_results = {"accuracy": None}
print("\nStarting Tunisian Corpus evaluation...")
try:
tunisian_results = evaluate_tunisian_corpus_coverage(model, tokenizer, device)
print(f"Tunisian Corpus results: {tunisian_results}")
except Exception as e:
print(f"Error in Tunisian Corpus evaluation for {model_name}: {str(e)}")
print(f"Error type: {type(e).__name__}")
tunisian_results = {"coverage": None}
print("\nEvaluation completed successfully!")
print(f"Final results: {tsac_results} | {tunisian_results}")
return EvaluationResult(
model=model_name,
revision=revision,
precision=precision,
weight_type=weight_type,
results={
**tsac_results,
**tunisian_results
}
)
except Exception as e:
print(f"\nError loading model {model_name}: {str(e)}")
print(f"Error type: {type(e).__name__}")
print(f"Full traceback: {traceback.format_exc()}")
return EvaluationResult(
model=model_name,
revision=revision,
precision=precision,
weight_type=weight_type,
results={},
error=str(e)
)
except Exception as e:
print(f"\nError evaluating model {model_name}: {str(e)}")
print(f"Error type: {type(e).__name__}")
print(f"Full traceback: {traceback.format_exc()}")
return EvaluationResult(
model=model_name,
revision=revision,
precision=precision,
weight_type=weight_type,
results={},
error=str(e)
)
def process_evaluation_queue():
"""Process all pending evaluations in the queue"""
print(f"\n=== Starting evaluation queue processing ===")
print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")
# Get all pending evaluations
if not os.path.exists(EVAL_REQUESTS_PATH):
print(f"Evaluation requests path does not exist: {EVAL_REQUESTS_PATH}")
return
# Find all model directories (each model has its own directory)
    model_dirs = [d for d in os.listdir(EVAL_REQUESTS_PATH) if os.path.isdir(os.path.join(EVAL_REQUESTS_PATH, d))]
    print(f"Found {len(model_dirs)} model directories")
    total_files = 0
for model_dir in model_dirs:
model_dir_path = os.path.join(EVAL_REQUESTS_PATH, model_dir)
print(f"\nChecking model directory: {model_dir_path}")
# Find all JSON files in the model directory
        json_files = [f for f in os.listdir(model_dir_path) if f.endswith('.json')]
        total_files += len(json_files)
        print(f"Found {len(json_files)} pending evaluation requests")
for file in json_files:
file_path = os.path.join(model_dir_path, file)
print(f" - {file_path}")
try:
with open(file_path, 'r') as f:
eval_entry = json.load(f)
# Check if this is a pending or running evaluation
status = eval_entry.get('status', '')
if status == EvaluationStatus.PENDING.value:
print(f"\n=== Found pending evaluation ===")
print(f"Model: {eval_entry['model']}")
print(f"Revision: {eval_entry['revision']}")
print(f"Precision: {eval_entry['precision']}")
print(f"Weight type: {eval_entry['weight_type']}")
# Update status to RUNNING
eval_entry['status'] = EvaluationStatus.RUNNING.value
with open(file_path, 'w') as f:
json.dump(eval_entry, f, indent=2)
# Run evaluation
try:
print("\n=== Starting evaluation ===")
eval_result = evaluate_model(
model_name=eval_entry['model'],
revision=eval_entry['revision'],
precision=eval_entry['precision'],
weight_type=eval_entry['weight_type']
)
print("\n=== Evaluation completed ===")
print(f"Results: {eval_result.results}")
# Update status to FINISHED and add results
eval_entry['status'] = EvaluationStatus.FINISHED.value
eval_entry['results'] = eval_result.results
if eval_result.error:
eval_entry['error'] = eval_result.error
# Save updated entry
with open(file_path, 'w') as f:
json.dump(eval_entry, f, indent=2)
# Move file to results directory
                        os.makedirs(EVAL_RESULTS_PATH, exist_ok=True)
result_filename = os.path.basename(file_path)
result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
os.rename(file_path, result_path)
print(f"\nMoved evaluation result to: {result_path}")
# Upload to Hugging Face
try:
API.upload_file(
path_or_fileobj=result_path,
path_in_repo=result_filename,
repo_id=RESULTS_REPO,
repo_type="dataset",
commit_message=f"Add evaluation results for {eval_entry['model']}"
)
print("\nResults uploaded to Hugging Face")
except Exception as upload_error:
print(f"Error uploading results: {str(upload_error)}")
eval_entry['error'] = f"Evaluation completed but failed to upload results: {str(upload_error)}"
with open(file_path, 'w') as f:
json.dump(eval_entry, f, indent=2)
except Exception as eval_error:
print(f"\n=== Error during evaluation ===")
print(f"Error: {str(eval_error)}")
print(f"Full traceback: {traceback.format_exc()}")
# Update status to FAILED and add error
eval_entry['status'] = EvaluationStatus.FAILED.value
eval_entry['error'] = str(eval_error)
with open(file_path, 'w') as f:
json.dump(eval_entry, f, indent=2)
# Move failed evaluation to results directory
                        os.makedirs(EVAL_RESULTS_PATH, exist_ok=True)
result_filename = os.path.basename(file_path)
result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
os.rename(file_path, result_path)
print(f"\nMoved failed evaluation to: {result_path}")
# Upload error file
try:
API.upload_file(
path_or_fileobj=result_path,
path_in_repo=result_filename,
repo_id=RESULTS_REPO,
repo_type="dataset",
commit_message=f"Add evaluation error for {eval_entry['model']}"
)
print("\nError file uploaded to Hugging Face")
except Exception as upload_error:
print(f"Error uploading error file: {str(upload_error)}")
elif status == EvaluationStatus.RUNNING.value:
print(f"\n=== Found running evaluation ===")
print(f"Model: {eval_entry['model']}")
print(f"Revision: {eval_entry['revision']}")
print(f"Precision: {eval_entry['precision']}")
print(f"Weight type: {eval_entry['weight_type']}")
try:
# Check if we have results for this evaluation
result_filename = os.path.basename(file_path)
result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
if os.path.exists(result_path):
print(f"\nFound existing results file: {result_path}")
# Update status to FINISHED
eval_entry['status'] = EvaluationStatus.FINISHED.value
with open(file_path, 'w') as f:
json.dump(eval_entry, f, indent=2)
else:
print("\nNo results found. Restarting evaluation...")
# Restart the evaluation
eval_entry['status'] = EvaluationStatus.PENDING.value
with open(file_path, 'w') as f:
json.dump(eval_entry, f, indent=2)
except Exception as check_error:
print(f"\n=== Error checking running evaluation ===")
print(f"Error: {str(check_error)}")
print(f"Full traceback: {traceback.format_exc()}")
# If we can't check the status, restart the evaluation
eval_entry['status'] = EvaluationStatus.PENDING.value
with open(file_path, 'w') as f:
json.dump(eval_entry, f, indent=2)
except Exception as e:
print(f"Error processing file {file}: {str(e)}")
print(f"Full traceback: {traceback.format_exc()}")
continue
print(f"\n=== Evaluation queue summary ===")
print(f"Total directories checked: {len(model_dirs)}")
print(f"Total files processed: {len(json_files)}")
print(f"\nEvaluation queue processed. Sleeping for 5 minutes...")
return
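

if __name__ == "__main__":
    # Hypothetical entry point (a sketch, not confirmed by the original file):
    # runs the queue processor in a loop, sleeping 5 minutes between passes as
    # the final log message above suggests. Adjust or remove to match how the
    # surrounding Space actually schedules this worker.
    import time

    while True:
        process_evaluation_queue()
        time.sleep(300)  # 5 minutes between queue scans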