hamzabouajila's picture
update evaluator to start if pending models were found
29e54f1
raw
history blame
24.1 kB
import json
import os
import traceback
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig
from datasets import load_dataset

from src.envs import API, OWNER, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH
from src.display.utils import Tasks
class EvaluationStatus(Enum):
    """Lifecycle states written to the ``status`` field of an evaluation request file."""

    # Request is waiting in the queue; the queue processor only picks these up.
    PENDING = "PENDING"
    # Request has been claimed and its evaluation is in progress.
    RUNNING = "RUNNING"
    # Evaluation ended and its results were recorded.
    FINISHED = "FINISHED"
    # Evaluation raised or reported an error.
    FAILED = "FAILED"
@dataclass
class EvaluationResult:
    """Outcome of evaluating one model submission.

    The first four fields echo the request that was evaluated; ``results``
    carries the per-task metrics and ``error`` is set only when the
    evaluation could not be completed.
    """

    # Model identifier passed to ``from_pretrained``.
    model: str
    # Model revision that was evaluated.
    revision: str
    # Name of the torch dtype the model was loaded with.
    precision: str
    # Weight type copied from the evaluation request.
    weight_type: str
    # Metric name -> score; a score is None when that task failed.
    results: Dict[str, Optional[float]]
    # Human-readable failure description, or None on success.
    error: Optional[str] = None
def _extract_logits(outputs):
    """Return the logits tensor from a dict-like, tuple, or bare-tensor model output."""
    if isinstance(outputs, dict):
        if 'logits' in outputs:
            return outputs['logits']
        if 'prediction_logits' in outputs:
            return outputs['prediction_logits']
        raise ValueError(f"Unknown output format. Available keys: {list(outputs.keys())}")
    if isinstance(outputs, tuple):
        return outputs[0]
    return outputs


def evaluate_tsac_sentiment(model, tokenizer, device, batch_size=32):
    """Evaluate model on TSAC sentiment analysis task.

    The test split of ``fbougares/tsac`` is classified in mini-batches and the
    predicted class (argmax over the logits) is compared with the ``target``
    column.

    Args:
        model: Classification model; called as ``model(input_ids=..., attention_mask=...)``.
        tokenizer: Tokenizer matching ``model``; must support padding.
        device: Torch device the input tensors are moved to.
        batch_size: Number of sentences classified per forward pass.

    Returns:
        ``{"accuracy": float}`` -- fraction of correct predictions, 0.0 for an
        empty split.

    Raises:
        Exception: Any dataset, tokenizer or model error is logged and re-raised.
    """
    try:
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        print("\n=== Starting TSAC sentiment evaluation ===")
        print(f"Current device: {device}")

        print("\nLoading and preprocessing TSAC dataset...")
        dataset = load_dataset("fbougares/tsac", split="test", trust_remote_code=True)
        total_examples = len(dataset)
        print(f"Dataset size: {total_examples} examples")

        model.eval()
        print(f"\nModel class: {model.__class__.__name__}")
        print(f"Model device: {next(model.parameters()).device}")

        predictions = []
        targets = []
        with torch.no_grad():
            for batch_index, start in enumerate(range(0, total_examples, batch_size)):
                # Slicing a datasets.Dataset yields a dict of column -> list.
                rows = dataset[start:start + batch_size]

                # Tokenize per batch so every tensor in the batch shares one
                # padded length and carries a leading batch dimension.
                encoded = tokenizer(
                    rows['sentence'],
                    padding=True,
                    truncation=True,
                    max_length=512,
                    return_tensors='pt'
                )
                # Only these two inputs are forwarded to the model.
                inputs = {
                    key: tensor.to(device)
                    for key, tensor in encoded.items()
                    if key in ('input_ids', 'attention_mask')
                }

                outputs = model(**inputs)
                logits = _extract_logits(outputs)

                # Token-level logits [batch, seq, classes]: keep the first
                # ([CLS]) position as the sequence-level prediction.
                if logits.dim() == 3:
                    logits = logits[:, 0, :]

                batch_predictions = logits.argmax(dim=-1).cpu().tolist()
                # assumes 'target' holds integer class ids -- TODO confirm
                batch_targets = [int(t) for t in rows['target']]
                predictions.extend(batch_predictions)
                targets.extend(batch_targets)

                # Detailed diagnostics for the first batch only; logging
                # every batch floods the output.
                if batch_index == 0:
                    print("\nProcessing first batch...")
                    print(f"Output type: {type(outputs)}")
                    print(f"Input IDs shape: {inputs['input_ids'].shape}")
                    print(f"Final logits shape: {logits.shape}")
                    print("\nFirst batch predictions:")
                    print(f"Predictions: {batch_predictions[:5]}")
                    print(f"Targets: {batch_targets[:5]}")

        print(f"\nTotal predictions: {len(predictions)}")
        print(f"Total targets: {len(targets)}")

        # Calculate accuracy
        correct = sum(p == t for p, t in zip(predictions, targets))
        total = len(predictions)
        accuracy = correct / total if total > 0 else 0.0

        print("\nEvaluation results:")
        print(f"Correct predictions: {correct}")
        print(f"Total predictions: {total}")
        print(f"Accuracy: {accuracy:.4f}")

        return {"accuracy": accuracy}
    except Exception as e:
        print(f"\n=== Error in TSAC evaluation: {str(e)} ===")
        print(f"Full traceback: {traceback.format_exc()}")
        # Bare raise keeps the original traceback for the caller.
        raise
def evaluate_tunisian_corpus_coverage(model, tokenizer, device):
    """Evaluate model's coverage on Tunisian Dialect Corpus.

    Coverage is the share of tokens produced by ``tokenizer.tokenize`` over
    the ``Tweet`` column of ``arbml/Tunisian_Dialect_Corpus`` (train split)
    that are not the tokenizer's unknown token.

    Args:
        model: Unused; kept so all task evaluators share one signature.
        tokenizer: Tokenizer whose vocabulary coverage is measured.
        device: Unused; kept so all task evaluators share one signature.

    Returns:
        ``{"coverage": float}`` in [0, 1]; 0.0 when no tokens were produced.

    Raises:
        Exception: Any dataset or tokenizer error is logged and re-raised.
    """
    try:
        dataset = load_dataset("arbml/Tunisian_Dialect_Corpus", split="train")

        unk_token = tokenizer.unk_token
        total_tokens = 0
        covered_tokens = 0
        for example in dataset:
            # Use 'Tweet' field as per dataset structure
            text = example['Tweet']
            if text is None:
                # NOTE(review): rows with a missing tweet are skipped rather
                # than aborting the whole run -- confirm this is desired.
                continue
            tokens = tokenizer.tokenize(text)
            total_tokens += len(tokens)
            covered_tokens += sum(1 for token in tokens if token != unk_token)

        coverage = covered_tokens / total_tokens if total_tokens > 0 else 0.0
        print(f"Tunisian Corpus Coverage: {coverage:.2%}")
        return {"coverage": coverage}
    except Exception as e:
        print(f"Error in Tunisian Corpus evaluation: {str(e)}")
        raise  # Propagate the error instead of returning 0.0
def _resolve_torch_dtype(precision):
    """Map a precision name such as "float16" to the matching ``torch.dtype``."""
    dtype = getattr(torch, str(precision), None)
    if not isinstance(dtype, torch.dtype):
        raise ValueError(f"Unsupported precision: {precision!r}")
    return dtype


def _load_classification_model(model_name, revision, precision, device):
    """Load the sequence-classification model, retrying as llama when the architecture is unrecognized."""
    # NOTE(security): trust_remote_code=True executes Python code shipped in
    # the model repository. Model names come from submitted requests, so this
    # should only run in a sandboxed environment.
    try:
        print("\nAttempting to load with trust_remote_code=True...")
        model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            revision=revision,
            torch_dtype=_resolve_torch_dtype(precision),
            trust_remote_code=True
        ).to(device)
        print(f"Successfully loaded model {model_name} with trust_remote_code=True")
        print(f"Model class: {model.__class__.__name__}")
        return model
    except Exception as e1:
        print(f"Error loading with trust_remote_code=True: {str(e1)}")
        print(f"Error type: {type(e1).__name__}")

        # Only unrecognized architectures whose name mentions llama get a retry.
        if not ("Unrecognized model" in str(e1) and "llama" in model_name.lower()):
            raise

        print("\nAttempting to load as llama model...")
        try:
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name,
                revision=revision,
                torch_dtype=_resolve_torch_dtype(precision),
                trust_remote_code=True,
                model_type="llama"
            ).to(device)
            print(f"Successfully loaded model {model_name} as llama model")
            print(f"Model class: {model.__class__.__name__}")
            return model
        except Exception as e2:
            print(f"Error loading as llama model: {str(e2)}")
            print(f"Error type: {type(e2).__name__}")
            raise Exception(f"Failed to load model with both methods: {str(e1)}, {str(e2)}") from e2


def _failed_evaluation(model_name, revision, precision, weight_type, error):
    """Build the EvaluationResult returned when nothing could be evaluated."""
    return EvaluationResult(
        model=model_name,
        revision=revision,
        precision=precision,
        weight_type=weight_type,
        results={},
        error=str(error)
    )


def evaluate_model(model_name: str, revision: str, precision: str, weight_type: str) -> EvaluationResult:
    """Evaluate a single model on all tasks.

    Loads the model and tokenizer, then runs the TSAC sentiment task and the
    Tunisian corpus coverage task. Each task is isolated: a failing task
    contributes ``None`` for its metric instead of aborting the other one.

    Args:
        model_name: Model identifier or local path passed to ``from_pretrained``.
        revision: Model revision to load.
        precision: Name of a ``torch`` dtype (e.g. ``"float16"``).
        weight_type: Copied verbatim into the result.

    Returns:
        An ``EvaluationResult``. ``error`` is set when loading failed or when
        every task failed; this function does not raise.
    """
    try:
        print(f"\nStarting evaluation for model: {model_name} (revision: {revision}, precision: {precision}, weight_type: {weight_type})")
        print(f"Current working directory: {os.getcwd()}")
        print(f"Evaluation requests path: {EVAL_REQUESTS_PATH}")
        print(f"Evaluation results path: {EVAL_RESULTS_PATH}")

        # Initialize device
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")

        # ---- Load model and tokenizer ----
        try:
            print(f"\nLoading model: {model_name}")
            print(f"Model path exists: {os.path.exists(model_name)}")

            # The config is loaded only to log the model type; failure here
            # is not fatal because the model load below reports its own error.
            try:
                config = AutoConfig.from_pretrained(model_name, revision=revision)
                print(f"Model type from config: {config.model_type}")
            except Exception as config_error:
                print(f"Error loading config: {str(config_error)}")

            model = _load_classification_model(model_name, revision, precision, device)

            print(f"\nLoading tokenizer: {model_name}")
            try:
                tokenizer = AutoTokenizer.from_pretrained(model_name, revision=revision)
                print(f"Successfully loaded tokenizer for {model_name}")
                print(f"Tokenizer class: {tokenizer.__class__.__name__}")
            except Exception as e:
                print(f"Error loading tokenizer: {str(e)}")
                print(f"Error type: {type(e).__name__}")
                raise Exception(f"Failed to load tokenizer: {str(e)}") from e
        except Exception as e:
            print(f"\nError loading model {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            print(f"Full traceback: {traceback.format_exc()}")
            return _failed_evaluation(model_name, revision, precision, weight_type, e)

        # ---- Run evaluations ----
        task_errors = {}

        print("\nStarting TSAC sentiment evaluation...")
        try:
            tsac_results = evaluate_tsac_sentiment(model, tokenizer, device)
            print(f"TSAC results: {tsac_results}")
        except Exception as e:
            print(f"Error in TSAC evaluation for {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            tsac_results = {"accuracy": None}
            task_errors["TSAC"] = str(e)

        print("\nStarting Tunisian Corpus evaluation...")
        try:
            tunisian_results = evaluate_tunisian_corpus_coverage(model, tokenizer, device)
            print(f"Tunisian Corpus results: {tunisian_results}")
        except Exception as e:
            print(f"Error in Tunisian Corpus evaluation for {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            tunisian_results = {"coverage": None}
            task_errors["Tunisian Corpus"] = str(e)

        # A run in which no task produced a score is a failure, not a success
        # with empty metrics; partial failures still count as a result.
        error = None
        if len(task_errors) == 2:
            error = "; ".join(f"{task}: {message}" for task, message in task_errors.items())
            print(f"\nAll evaluation tasks failed: {error}")
        else:
            print("\nEvaluation completed successfully!")
        print(f"Final results: {tsac_results} | {tunisian_results}")

        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={
                **tsac_results,
                **tunisian_results
            },
            error=error
        )
    except Exception as e:
        print(f"\nError evaluating model {model_name}: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        print(f"Full traceback: {traceback.format_exc()}")
        return _failed_evaluation(model_name, revision, precision, weight_type, e)
# Keys every evaluation request must carry before it can be processed.
_REQUIRED_REQUEST_KEYS = ('model', 'revision', 'precision', 'weight_type')


def _write_request(path, entry):
    """Persist an evaluation request/result entry to *path* as indented JSON."""
    with open(path, 'w') as f:
        json.dump(entry, f, indent=2)


def _collect_pending_requests():
    """Return the paths of all request files under EVAL_REQUESTS_PATH whose status is PENDING."""
    # Each model has its own directory below the requests root.
    model_dirs = [
        d for d in os.listdir(EVAL_REQUESTS_PATH)
        if os.path.isdir(os.path.join(EVAL_REQUESTS_PATH, d))
    ]
    print(f"Found {len(model_dirs)} model directories")

    pending_files = []
    for model_dir in model_dirs:
        model_dir_path = os.path.join(EVAL_REQUESTS_PATH, model_dir)
        print(f"\nChecking model directory: {model_dir_path}")

        json_files = [f for f in os.listdir(model_dir_path) if f.endswith('.json')]
        print(f"Found {len(json_files)} JSON files in {model_dir}")

        for file in json_files:
            file_path = os.path.join(model_dir_path, file)
            try:
                with open(file_path, 'r') as f:
                    eval_entry = json.load(f)
            except (OSError, ValueError) as e:
                # Unreadable or malformed JSON: report it and keep scanning.
                print(f"Error processing file {file}: {str(e)}")
                continue

            if isinstance(eval_entry, dict) and eval_entry.get('status') == EvaluationStatus.PENDING.value:
                pending_files.append(file_path)
    return pending_files


def _publish_result(file_path, eval_entry):
    """Move a processed request file into EVAL_RESULTS_PATH and upload it to the results dataset."""
    failed = eval_entry['status'] == EvaluationStatus.FAILED.value

    os.makedirs(EVAL_RESULTS_PATH, exist_ok=True)
    result_filename = os.path.basename(file_path)
    result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
    os.rename(file_path, result_path)
    if failed:
        print(f"\nMoved failed evaluation to: {result_path}")
    else:
        print(f"\nMoved evaluation result to: {result_path}")

    # Upload to Hugging Face
    try:
        API.upload_file(
            path_or_fileobj=result_path,
            path_in_repo=result_filename,
            repo_id=f"{OWNER}/results",
            repo_type="dataset",
            commit_message=(
                f"Add evaluation error for {eval_entry['model']}"
                if failed
                else f"Add evaluation results for {eval_entry['model']}"
            )
        )
        if failed:
            print("\nError file uploaded to Hugging Face")
        else:
            print("\nResults uploaded to Hugging Face")
    except Exception as upload_error:
        if failed:
            # Keep the evaluation error already stored in the entry.
            print(f"Error uploading error file: {str(upload_error)}")
            return
        print(f"Error uploading results: {str(upload_error)}")
        eval_entry['error'] = f"Evaluation completed but failed to upload results: {str(upload_error)}"
        # The file now lives at result_path; writing to file_path would
        # recreate the request that was just moved away.
        _write_request(result_path, eval_entry)


def _process_request(file_path):
    """Evaluate one pending request file and publish its outcome."""
    with open(file_path, 'r') as f:
        eval_entry = json.load(f)

    # Re-check the status: it may have changed since the scan.
    status = eval_entry.get('status', 'UNKNOWN')
    if status != EvaluationStatus.PENDING.value:
        print(f"Skipping non-pending evaluation (status: {status})")
        return

    missing = [key for key in _REQUIRED_REQUEST_KEYS if not eval_entry.get(key)]
    if missing:
        # Left untouched (still PENDING) so it can be fixed and retried.
        print(f"Error: Missing {', '.join(missing)} in evaluation request")
        return

    print("\n=== Found pending evaluation ===")
    print(f"Model: {eval_entry['model']}")
    print(f"Revision: {eval_entry['revision']}")
    print(f"Precision: {eval_entry['precision']}")
    print(f"Weight type: {eval_entry['weight_type']}")

    # Claim the request before the (long) evaluation starts.
    eval_entry['status'] = EvaluationStatus.RUNNING.value
    _write_request(file_path, eval_entry)

    try:
        print("\n=== Starting evaluation ===")
        eval_result = evaluate_model(
            model_name=eval_entry['model'],
            revision=eval_entry['revision'],
            precision=eval_entry['precision'],
            weight_type=eval_entry['weight_type']
        )
        eval_entry['results'] = eval_result.results
        if eval_result.error:
            print("\n=== Evaluation failed ===")
            print(f"Error: {eval_result.error}")
            eval_entry['status'] = EvaluationStatus.FAILED.value
            eval_entry['error'] = eval_result.error
        else:
            print("\n=== Evaluation completed ===")
            print(f"Results: {eval_result.results}")
            eval_entry['status'] = EvaluationStatus.FINISHED.value
    except Exception as eval_error:
        print("\n=== Error during evaluation ===")
        print(f"Error: {str(eval_error)}")
        print(f"Full traceback: {traceback.format_exc()}")
        eval_entry['status'] = EvaluationStatus.FAILED.value
        eval_entry['error'] = str(eval_error)

    # Save the final state, then archive and upload it.
    _write_request(file_path, eval_entry)
    _publish_result(file_path, eval_entry)


def process_evaluation_queue():
    """Process all pending evaluations in the queue.

    Scans every model directory under ``EVAL_REQUESTS_PATH`` for JSON request
    files with status PENDING. Each one is marked RUNNING, evaluated with
    ``evaluate_model``, marked FINISHED or FAILED, moved to
    ``EVAL_RESULTS_PATH`` and uploaded to the ``{OWNER}/results`` dataset.

    Errors on one request are logged and do not stop the remaining requests.
    Returns ``None``.
    """
    print("\n=== Starting evaluation queue processing ===")
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")

    if not os.path.exists(EVAL_REQUESTS_PATH):
        print(f"Evaluation requests path does not exist: {EVAL_REQUESTS_PATH}")
        return

    pending_files = _collect_pending_requests()
    print(f"Found {len(pending_files)} pending evaluation requests")
    for file_path in pending_files:
        print(f" - {file_path}")

    if not pending_files:
        print("No pending evaluation requests found")
        return

    for file_path in pending_files:
        try:
            print(f"\n=== Processing evaluation request: {file_path} ===")
            _process_request(file_path)
        except Exception as e:
            # Boundary handler: one broken request must not stop the queue.
            print(f"\n=== Error processing evaluation: {str(e)} ===")
            print(f"Full traceback: {traceback.format_exc()}")