"""Evaluation worker: runs queued model evaluations and publishes the results.

Each request is a JSON file stored under ``EVAL_REQUESTS_PATH/<model_dir>/``.
A request is evaluated on two tasks (TSAC sentiment accuracy and tokenizer
coverage on the Tunisian Dialect Corpus), then the updated JSON is moved to
``EVAL_RESULTS_PATH`` and uploaded to the ``RESULTS_REPO`` dataset repository.
"""

import json
import os
import traceback
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Optional

import torch
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig

from src.envs import API, OWNER, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, RESULTS_REPO
from src.display.utils import Tasks


class EvaluationStatus(Enum):
    """Lifecycle states stored in the ``status`` field of a request file."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"


@dataclass
class EvaluationResult:
    """Outcome of evaluating one model revision."""

    model: str
    revision: str
    precision: str
    weight_type: str
    # Metric name -> value; a value is None when that task failed.
    results: Dict[str, Optional[float]]
    # Human-readable failure description, or None when the evaluation succeeded.
    error: Optional[str] = None


def _extract_logits(outputs):
    """Return ``[batch, num_classes]`` logits from the supported output formats.

    Handles dict-like outputs (``logits`` or ``prediction_logits`` key), tuples
    (first element) and bare tensors.

    Raises:
        ValueError: if a dict-like output carries neither known logits key.
    """
    if isinstance(outputs, dict):
        if 'logits' in outputs:
            logits = outputs['logits']
        elif 'prediction_logits' in outputs:
            logits = outputs['prediction_logits']
        else:
            raise ValueError(f"Unknown output format. Available keys: {list(outputs.keys())}")
    elif isinstance(outputs, tuple):
        logits = outputs[0]
    else:
        logits = outputs

    # Per-token logits [batch, seq_len, num_classes]: keep the first position
    # (the [CLS] slot for BERT-style models).
    if len(logits.shape) == 3:
        logits = logits[:, 0, :]
    return logits


def evaluate_tsac_sentiment(model, tokenizer, device):
    """Evaluate model on TSAC sentiment analysis task.

    Loads the ``test`` split of ``fbougares/tsac``, tokenizes the ``sentence``
    column (truncated to 512 tokens) and compares the argmax of the model's
    logits with the ``target`` column.

    Args:
        model: Sequence-classification model, already placed on ``device``.
        tokenizer: Tokenizer matching ``model``.
        device: Torch device the input tensors are moved to.

    Returns:
        ``{"accuracy": float}``; the accuracy is 0.0 for an empty dataset.

    Raises:
        Exception: any loading, tokenization or inference error is logged and
            re-raised.
    """
    try:
        print("\n=== Starting TSAC sentiment evaluation ===")
        print(f"Current device: {device}")

        print("\nLoading and preprocessing TSAC dataset...")
        dataset = load_dataset("fbougares/tsac", split="test", trust_remote_code=True)
        print(f"Dataset size: {len(dataset)} examples")

        def preprocess(examples):
            print(f"\nProcessing batch of {len(examples['sentence'])} examples")
            # No padding here: padding inside a batched map() pads each map
            # batch to its own longest sequence, so rows coming from different
            # map batches could not be stacked later. Padding is done per
            # DataLoader batch in collate_fn instead.
            return tokenizer(
                examples['sentence'],
                truncation=True,
                max_length=512
            )

        dataset = dataset.map(preprocess, batched=True)
        dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'target'])

        first_example = dataset[0]
        print("\nFirst example details:")
        print(f"Input IDs shape: {first_example['input_ids'].shape}")
        print(f"Attention mask shape: {first_example['attention_mask'].shape}")
        print(f"Target: {first_example['target']}")

        model.eval()
        print(f"\nModel class: {model.__class__.__name__}")
        print(f"Model device: {next(model.parameters()).device}")

        def collate_fn(batch):
            # Dynamic padding: pad to the longest sequence of this batch only.
            features = [
                {
                    'input_ids': sample['input_ids'],
                    'attention_mask': sample['attention_mask'],
                }
                for sample in batch
            ]
            padded = tokenizer.pad(features, padding=True, return_tensors='pt')
            # as_tensor avoids the copy-construct warning when the target is
            # already a tensor (which it is after set_format(type='torch')).
            targets = torch.stack([torch.as_tensor(sample['target']) for sample in batch])
            return {
                'input_ids': padded['input_ids'],
                'attention_mask': padded['attention_mask'],
                'target': targets,
            }

        dataloader = DataLoader(
            dataset,
            batch_size=16,
            shuffle=False,
            collate_fn=collate_fn
        )

        predictions = []
        targets = []
        with torch.no_grad():
            for i, batch in enumerate(dataloader):
                is_first = i == 0
                if is_first:
                    print("\nProcessing first batch...")
                    print(f"Batch keys: {list(batch.keys())}")
                    print(f"Target shape: {batch['target'].shape}")

                inputs = {k: v.to(device) for k, v in batch.items() if k != 'target'}
                outputs = model(**inputs)
                logits = _extract_logits(outputs)

                batch_predictions = logits.argmax(dim=-1).cpu().tolist()
                batch_targets = batch['target'].tolist()
                predictions.extend(batch_predictions)
                targets.extend(batch_targets)

                # Detailed diagnostics only once, to keep the log readable.
                if is_first:
                    print(f"\nBatch {i} output type: {type(outputs)}")
                    print(f"Final logits shape: {logits.shape}")
                    print("\nFirst batch predictions:")
                    print(f"Predictions: {batch_predictions[:5]}")
                    print(f"Targets: {batch_targets[:5]}")

        print(f"\nTotal predictions: {len(predictions)}")
        print(f"Total targets: {len(targets)}")

        correct = sum(p == t for p, t in zip(predictions, targets))
        total = len(predictions)
        accuracy = correct / total if total > 0 else 0.0

        print("\nEvaluation results:")
        print(f"Correct predictions: {correct}")
        print(f"Total predictions: {total}")
        print(f"Accuracy: {accuracy:.4f}")

        return {"accuracy": accuracy}
    except Exception as e:
        print(f"\n=== Error in TSAC evaluation: {str(e)} ===")
        print(f"Full traceback: {traceback.format_exc()}")
        raise


def evaluate_tunisian_corpus_coverage(model, tokenizer, device):
    """Evaluate model's coverage on Tunisian Dialect Corpus.

    Tokenizes the ``Tweet`` column of the ``train`` split of
    ``arbml/Tunisian_Dialect_Corpus`` and computes the share of produced token
    ids that are not the tokenizer's unknown-token id. Special tokens added by
    the tokenizer are included in the count. ``model`` and ``device`` are
    accepted for signature parity with the other task but are not used.

    Returns:
        ``{"coverage": float}``; the coverage is 0 when no tokens are produced.

    Raises:
        Exception: any loading or tokenization error is logged and re-raised.
    """
    try:
        dataset = load_dataset("arbml/Tunisian_Dialect_Corpus", split="train")

        def preprocess(examples):
            # Log only the batch size; dumping the raw batch floods the output.
            print(f"Tunisian Corpus preprocess: batch of {len(examples['Tweet'])} examples")
            return tokenizer(
                examples['Tweet'],
                padding=False,     # padding tokens would distort the coverage
                truncation=False,  # count every token of long sequences
                max_length=None
            )

        dataset = dataset.map(preprocess, batched=True)

        # Comparing ids is equivalent to comparing token strings with
        # tokenizer.unk_token, without converting every id back to a string.
        # If the tokenizer has no unknown token, unk_token_id is None and
        # every token counts as covered.
        unk_id = tokenizer.unk_token_id
        total_tokens = 0
        covered_tokens = 0
        for example in dataset:
            input_ids = example['input_ids']
            total_tokens += len(input_ids)
            covered_tokens += sum(1 for token_id in input_ids if token_id != unk_id)

        coverage = covered_tokens / total_tokens if total_tokens > 0 else 0
        print(f"Tunisian Corpus Coverage: {coverage:.2%}")

        return {"coverage": coverage}
    except Exception as e:
        print(f"Error in Tunisian Corpus evaluation: {str(e)}")
        print(f"Full traceback: {traceback.format_exc()}")
        raise


def _resolve_torch_dtype(precision: str):
    """Map a precision name such as ``"float16"`` to the matching torch dtype.

    Raises:
        ValueError: if ``precision`` does not name a ``torch.dtype``.
    """
    dtype = getattr(torch, precision, None)
    if not isinstance(dtype, torch.dtype):
        raise ValueError(f"Unsupported precision for torch_dtype: {precision!r}")
    return dtype


def _load_model(model_name: str, revision: str, precision: str, device):
    """Load the classification model on ``device``, with a llama fallback.

    The fallback is attempted only when the first load fails with an
    "Unrecognized model" error and the model name contains "llama".
    """
    torch_dtype = _resolve_torch_dtype(precision)

    # SECURITY NOTE(review): trust_remote_code=True executes Python code shipped
    # in the model repository. Model names come from request files, so this
    # should only run in a sandboxed worker -- confirm the deployment.
    try:
        print("\nAttempting to load with trust_remote_code=True...")
        model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            revision=revision,
            torch_dtype=torch_dtype,
            trust_remote_code=True
        ).to(device)
        print(f"Successfully loaded model {model_name} with trust_remote_code=True")
        print(f"Model class: {model.__class__.__name__}")
        return model
    except Exception as e1:
        print(f"Error loading with trust_remote_code=True: {str(e1)}")
        print(f"Error type: {type(e1).__name__}")
        if not ("Unrecognized model" in str(e1) and "llama" in model_name.lower()):
            raise
        # The except-clause name is unbound once the clause ends; keep a copy.
        first_error = e1

    print("\nAttempting to load as llama model...")
    try:
        model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            revision=revision,
            torch_dtype=torch_dtype,
            trust_remote_code=True,
            model_type="llama"
        ).to(device)
    except Exception as e2:
        print(f"Error loading as llama model: {str(e2)}")
        print(f"Error type: {type(e2).__name__}")
        raise Exception(
            f"Failed to load model with both methods: {str(first_error)}, {str(e2)}"
        ) from e2
    print(f"Successfully loaded model {model_name} as llama model")
    print(f"Model class: {model.__class__.__name__}")
    return model


def _load_tokenizer(model_name: str, revision: str):
    """Load the tokenizer for ``model_name`` at ``revision``."""
    print(f"\nLoading tokenizer: {model_name}")
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name, revision=revision)
    except Exception as e:
        print(f"Error loading tokenizer: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        raise Exception(f"Failed to load tokenizer: {str(e)}") from e
    print(f"Successfully loaded tokenizer for {model_name}")
    print(f"Tokenizer class: {tokenizer.__class__.__name__}")
    return tokenizer


def evaluate_model(model_name: str, revision: str, precision: str, weight_type: str) -> EvaluationResult:
    """Evaluate a single model on all tasks.

    This function does not raise: any failure is captured in the returned
    ``EvaluationResult.error``.

    Args:
        model_name: Hub id or local path of the model.
        revision: Model revision to load.
        precision: Name of a torch dtype (e.g. ``"float16"``).
        weight_type: Copied into the result unchanged.

    Returns:
        An ``EvaluationResult``. A task that fails contributes ``None`` for its
        metric. ``error`` is set when loading fails (``results`` is then empty)
        or when every task failed.
    """
    try:
        print(f"\nStarting evaluation for model: {model_name} (revision: {revision}, precision: {precision}, weight_type: {weight_type})")
        print(f"Current working directory: {os.getcwd()}")
        print(f"Evaluation requests path: {EVAL_REQUESTS_PATH}")
        print(f"Evaluation results path: {EVAL_RESULTS_PATH}")

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")

        print(f"\nLoading model: {model_name}")
        print(f"Model path exists: {os.path.exists(model_name)}")

        # Diagnostic only: a config failure is logged, the model load below
        # decides whether the evaluation can proceed.
        try:
            config = AutoConfig.from_pretrained(model_name, revision=revision)
            print(f"Model type from config: {config.model_type}")
        except Exception as config_error:
            print(f"Error loading config: {str(config_error)}")

        model = _load_model(model_name, revision, precision, device)
        tokenizer = _load_tokenizer(model_name, revision)

        task_errors = []

        print("\nStarting TSAC sentiment evaluation...")
        try:
            tsac_results = evaluate_tsac_sentiment(model, tokenizer, device)
            print(f"TSAC results: {tsac_results}")
        except Exception as e:
            print(f"Error in TSAC evaluation for {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            tsac_results = {"accuracy": None}
            task_errors.append(f"TSAC: {e}")

        print("\nStarting Tunisian Corpus evaluation...")
        try:
            tunisian_results = evaluate_tunisian_corpus_coverage(model, tokenizer, device)
            print(f"Tunisian Corpus results: {tunisian_results}")
        except Exception as e:
            print(f"Error in Tunisian Corpus evaluation for {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            tunisian_results = {"coverage": None}
            task_errors.append(f"Tunisian Corpus: {e}")

        # A run in which every task failed must not be reported as a success.
        all_failed = len(task_errors) == 2
        if all_failed:
            print("\nEvaluation finished, but every task failed.")
        else:
            print("\nEvaluation completed successfully!")
        print(f"Final results: {tsac_results} | {tunisian_results}")

        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={**tsac_results, **tunisian_results},
            error="; ".join(task_errors) if all_failed else None
        )
    except Exception as e:
        print(f"\nError evaluating model {model_name}: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        print(f"Full traceback: {traceback.format_exc()}")
        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={},
            error=str(e)
        )


def _write_json(path: str, payload: Dict[str, Any]) -> None:
    """Serialize ``payload`` to ``path`` as indented JSON."""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(payload, f, indent=2)


def _print_request(eval_entry: Dict[str, Any]) -> None:
    """Log the identifying fields of a request (KeyError if one is missing)."""
    print(f"Model: {eval_entry['model']}")
    print(f"Revision: {eval_entry['revision']}")
    print(f"Precision: {eval_entry['precision']}")
    print(f"Weight type: {eval_entry['weight_type']}")


def _process_pending(file_path: str, eval_entry: Dict[str, Any]) -> None:
    """Run a pending request, then persist, move and upload its outcome."""
    print("\n=== Found pending evaluation ===")
    _print_request(eval_entry)

    eval_entry['status'] = EvaluationStatus.RUNNING.value
    _write_json(file_path, eval_entry)

    try:
        print("\n=== Starting evaluation ===")
        eval_result = evaluate_model(
            model_name=eval_entry['model'],
            revision=eval_entry['revision'],
            precision=eval_entry['precision'],
            weight_type=eval_entry['weight_type']
        )
        print("\n=== Evaluation completed ===")
        print(f"Results: {eval_result.results}")

        eval_entry['results'] = eval_result.results
        # evaluate_model reports failures through .error instead of raising,
        # so the status has to be derived from it.
        if eval_result.error:
            eval_entry['status'] = EvaluationStatus.FAILED.value
            eval_entry['error'] = eval_result.error
        else:
            eval_entry['status'] = EvaluationStatus.FINISHED.value
    except Exception as eval_error:
        print("\n=== Error during evaluation ===")
        print(f"Error: {str(eval_error)}")
        print(f"Full traceback: {traceback.format_exc()}")
        eval_entry['status'] = EvaluationStatus.FAILED.value
        eval_entry['error'] = str(eval_error)

    failed = eval_entry['status'] == EvaluationStatus.FAILED.value
    _write_json(file_path, eval_entry)

    # NOTE(review): results are stored under the bare file name, so requests
    # with the same file name in different model directories overwrite each
    # other -- confirm that request file names are globally unique.
    os.makedirs(EVAL_RESULTS_PATH, exist_ok=True)
    result_filename = os.path.basename(file_path)
    result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
    os.replace(file_path, result_path)
    if failed:
        print(f"\nMoved failed evaluation to: {result_path}")
    else:
        print(f"\nMoved evaluation result to: {result_path}")

    try:
        API.upload_file(
            path_or_fileobj=result_path,
            path_in_repo=result_filename,
            repo_id=RESULTS_REPO,
            repo_type="dataset",
            commit_message=(
                f"Add evaluation error for {eval_entry['model']}"
                if failed
                else f"Add evaluation results for {eval_entry['model']}"
            )
        )
        if failed:
            print("\nError file uploaded to Hugging Face")
        else:
            print("\nResults uploaded to Hugging Face")
    except Exception as upload_error:
        if failed:
            print(f"Error uploading error file: {str(upload_error)}")
        else:
            print(f"Error uploading results: {str(upload_error)}")
            eval_entry['error'] = f"Evaluation completed but failed to upload results: {str(upload_error)}"
            # The request file was already moved: update the result file.
            # Writing to file_path here would re-create a request in the queue.
            _write_json(result_path, eval_entry)


def _recover_running(file_path: str, eval_entry: Dict[str, Any]) -> None:
    """Resolve a request left in RUNNING state.

    It is marked FINISHED when a result file with the same name exists,
    otherwise it is reset to PENDING so a later queue pass re-runs it.
    """
    print("\n=== Found running evaluation ===")
    _print_request(eval_entry)

    try:
        result_path = os.path.join(EVAL_RESULTS_PATH, os.path.basename(file_path))
        if os.path.exists(result_path):
            print(f"\nFound existing results file: {result_path}")
            eval_entry['status'] = EvaluationStatus.FINISHED.value
        else:
            print("\nNo results found. Restarting evaluation...")
            eval_entry['status'] = EvaluationStatus.PENDING.value
        _write_json(file_path, eval_entry)
    except Exception as check_error:
        print("\n=== Error checking running evaluation ===")
        print(f"Error: {str(check_error)}")
        print(f"Full traceback: {traceback.format_exc()}")
        # If the state cannot be checked, fall back to re-running the request.
        eval_entry['status'] = EvaluationStatus.PENDING.value
        _write_json(file_path, eval_entry)


def process_evaluation_queue():
    """Process all pending evaluations in the queue.

    Scans every sub-directory of ``EVAL_REQUESTS_PATH`` for ``*.json`` request
    files. PENDING requests are evaluated, moved to ``EVAL_RESULTS_PATH`` and
    uploaded; RUNNING requests are either closed or reset to PENDING; requests
    in any other state are left untouched. An error while handling one file is
    logged and does not stop the remaining files. Returns ``None``.
    """
    print("\n=== Starting evaluation queue processing ===")
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")

    if not os.path.exists(EVAL_REQUESTS_PATH):
        print(f"Evaluation requests path does not exist: {EVAL_REQUESTS_PATH}")
        return

    # Each model has its own directory of request files.
    model_dirs = [
        d for d in os.listdir(EVAL_REQUESTS_PATH)
        if os.path.isdir(os.path.join(EVAL_REQUESTS_PATH, d))
    ]
    print(f"Found {len(model_dirs)} model directories")

    # Running total across all directories, so the summary is correct and
    # defined even when there are no model directories.
    total_files = 0

    for model_dir in model_dirs:
        model_dir_path = os.path.join(EVAL_REQUESTS_PATH, model_dir)
        print(f"\nChecking model directory: {model_dir_path}")

        json_files = [f for f in os.listdir(model_dir_path) if f.endswith('.json')]
        print(f"Found {len(json_files)} pending evaluation requests")
        total_files += len(json_files)

        for file in json_files:
            file_path = os.path.join(model_dir_path, file)
            print(f"  - {file_path}")

            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    eval_entry = json.load(f)

                status = eval_entry.get('status', '')
                if status == EvaluationStatus.PENDING.value:
                    _process_pending(file_path, eval_entry)
                elif status == EvaluationStatus.RUNNING.value:
                    _recover_running(file_path, eval_entry)
            except Exception as e:
                print(f"Error processing file {file}: {str(e)}")
                print(f"Full traceback: {traceback.format_exc()}")
                continue

    print("\n=== Evaluation queue summary ===")
    print(f"Total directories checked: {len(model_dirs)}")
    print(f"Total files processed: {total_files}")

    # This function does not sleep itself; the delay between passes is
    # presumably applied by the caller.
    print("\nEvaluation queue processed. Sleeping for 5 minutes...")
    return