import json
import os
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig
from datasets import load_dataset
import traceback

from src.envs import API, OWNER, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, RESULTS_REPO
from src.display.utils import Tasks

class EvaluationStatus(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"

@dataclass
class EvaluationResult:
    model: str
    revision: str
    precision: str
    weight_type: str
    results: Dict[str, float]
    error: str = None

def evaluate_tsac_sentiment(model, tokenizer, device):
    """Evaluate model on TSAC sentiment analysis task"""
    try:
        print("\n=== Starting TSAC sentiment evaluation ===")
        print(f"Current device: {device}")

        # Load and preprocess dataset
        print("\nLoading and preprocessing TSAC dataset...")
        dataset = load_dataset("fbougares/tsac", split="test", trust_remote_code=True)
        print(f"Dataset size: {len(dataset)} examples")

        def preprocess(examples):
            print(f"\nProcessing batch of {len(examples['sentence'])} examples")
            # Use 'sentence' field as per dataset structure
            return tokenizer(
                examples['sentence'],
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors='pt'
            )

        dataset = dataset.map(preprocess, batched=True)
        dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'target'])

        # Check first example
        first_example = dataset[0]
        print("\nFirst example details:")
        print(f"Input IDs shape: {first_example['input_ids'].shape}")
        print(f"Attention mask shape: {first_example['attention_mask'].shape}")
        print(f"Target: {first_example['target']}")

        model.eval()
        print(f"\nModel class: {model.__class__.__name__}")
        print(f"Model device: {next(model.parameters()).device}")

        with torch.no_grad():
            predictions = []
            targets = []

            # Create DataLoader with batch size 16
            from torch.utils.data import DataLoader

            # Define a custom collate function
            def collate_fn(batch):
                # Stack tensors for input_ids and attention_mask
                input_ids = torch.stack([sample['input_ids'] for sample in batch])
                attention_mask = torch.stack([sample['attention_mask'] for sample in batch])
                # Stack targets
                targets = torch.stack([torch.tensor(sample['target']) for sample in batch])
                return {
                    'input_ids': input_ids,
                    'attention_mask': attention_mask,
                    'target': targets
                }

            dataloader = DataLoader(
                dataset,
                batch_size=16,
                shuffle=False,
                collate_fn=collate_fn
            )

            for i, batch in enumerate(dataloader):
                if i == 0:
                    print("\nProcessing first batch...")
                    print(f"Batch keys: {list(batch.keys())}")
                    print(f"Target shape: {batch['target'].shape}")

                inputs = {k: v.to(device) for k, v in batch.items() if k != 'target'}
                target = batch['target'].to(device)

                outputs = model(**inputs)
                print(f"\nBatch {i} output type: {type(outputs)}")

                # Handle different model output formats
                if isinstance(outputs, dict):
                    print(f"Output keys: {list(outputs.keys())}")
                    if 'logits' in outputs:
                        logits = outputs['logits']
                    elif 'prediction_logits' in outputs:
                        logits = outputs['prediction_logits']
                    else:
                        raise ValueError(f"Unknown output format. Available keys: {list(outputs.keys())}")
                elif isinstance(outputs, tuple):
                    print(f"Output tuple length: {len(outputs)}")
                    logits = outputs[0]
                else:
                    logits = outputs
                print(f"Logits shape: {logits.shape}")

                # For sequence classification, we typically use the [CLS] token's prediction
                if len(logits.shape) == 3:  # [batch_size, sequence_length, num_classes]
                    logits = logits[:, 0, :]  # Take the [CLS] token prediction
                    print(f"Final logits shape: {logits.shape}")

                batch_predictions = logits.argmax(dim=-1).cpu().tolist()
                batch_targets = target.cpu().tolist()
                predictions.extend(batch_predictions)
                targets.extend(batch_targets)

                if i == 0:
                    print("\nFirst batch predictions:")
                    print(f"Predictions: {batch_predictions[:5]}")
                    print(f"Targets: {batch_targets[:5]}")

        print(f"\nTotal predictions: {len(predictions)}")
        print(f"Total targets: {len(targets)}")

        # Calculate accuracy
        correct = sum(p == t for p, t in zip(predictions, targets))
        total = len(predictions)
        accuracy = correct / total if total > 0 else 0.0

        print("\nEvaluation results:")
        print(f"Correct predictions: {correct}")
        print(f"Total predictions: {total}")
        print(f"Accuracy: {accuracy:.4f}")

        return {"accuracy": accuracy}
    except Exception as e:
        print(f"\n=== Error in TSAC evaluation: {str(e)} ===")
        print(f"Full traceback: {traceback.format_exc()}")
        raise e

def evaluate_tunisian_corpus_coverage(model, tokenizer, device):
    """Evaluate model's coverage on Tunisian Dialect Corpus"""
    try:
        dataset = load_dataset("arbml/Tunisian_Dialect_Corpus", split="train")

        def preprocess(examples):
            print("Tunisian Corpus preprocess examples -------------", examples)
            # Use 'Tweet' field as per dataset structure
            return tokenizer(
                examples['Tweet'],
                padding=False,     # We don't need padding for token coverage
                truncation=False,  # Don't truncate long sequences
                max_length=None    # Let tokenizer handle the length
            )

        dataset = dataset.map(preprocess, batched=True)

        # Calculate token coverage
        total_tokens = 0
        covered_tokens = 0
        for example in dataset:
            # Get the tokenized input IDs
            input_ids = example['input_ids']
            # Convert to tokens and count
            tokens = tokenizer.convert_ids_to_tokens(input_ids)
            total_tokens += len(tokens)
            covered_tokens += len([t for t in tokens if t != tokenizer.unk_token])

        coverage = covered_tokens / total_tokens if total_tokens > 0 else 0
        print(f"Tunisian Corpus Coverage: {coverage:.2%}")
        return {"coverage": coverage}
    except Exception as e:
        print(f"Error in Tunisian Corpus evaluation: {str(e)}")
        print(f"Full traceback: {traceback.format_exc()}")
        raise e

def evaluate_model(model_name: str, revision: str, precision: str, weight_type: str) -> EvaluationResult:
    """Evaluate a single model on all tasks"""
    try:
        print(f"\nStarting evaluation for model: {model_name} (revision: {revision}, precision: {precision}, weight_type: {weight_type})")
        print(f"Current working directory: {os.getcwd()}")
        print(f"Evaluation requests path: {EVAL_REQUESTS_PATH}")
        print(f"Evaluation results path: {EVAL_RESULTS_PATH}")

        # Initialize device
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")

        # Load model and tokenizer with enhanced error handling
        try:
            print(f"\nLoading model: {model_name}")
            print(f"Model path exists: {os.path.exists(model_name)}")

            # First try to load the config to check model type
            try:
                config = AutoConfig.from_pretrained(model_name, revision=revision)
                print(f"Model type from config: {config.model_type}")
            except Exception as config_error:
                print(f"Error loading config: {str(config_error)}")

            # Try loading with trust_remote_code=True first
            try:
                print("\nAttempting to load with trust_remote_code=True...")
                model = AutoModelForSequenceClassification.from_pretrained(
                    model_name,
                    revision=revision,
                    torch_dtype=getattr(torch, precision),
                    trust_remote_code=True
                ).to(device)
                print(f"Successfully loaded model {model_name} with trust_remote_code=True")
                print(f"Model class: {model.__class__.__name__}")
            except Exception as e1:
                print(f"Error loading with trust_remote_code=True: {str(e1)}")
                print(f"Error type: {type(e1).__name__}")
                # If it's a model type error, try with llama as model type
                if "Unrecognized model" in str(e1) and "llama" in model_name.lower():
                    print("\nAttempting to load as llama model...")
                    try:
                        model = AutoModelForSequenceClassification.from_pretrained(
                            model_name,
                            revision=revision,
                            torch_dtype=getattr(torch, precision),
                            trust_remote_code=True,
                            model_type="llama"
                        ).to(device)
                        print(f"Successfully loaded model {model_name} as llama model")
                        print(f"Model class: {model.__class__.__name__}")
                    except Exception as e2:
                        print(f"Error loading as llama model: {str(e2)}")
                        print(f"Error type: {type(e2).__name__}")
                        raise Exception(f"Failed to load model with both methods: {str(e1)}, {str(e2)}")
                else:
                    raise e1

            print(f"\nLoading tokenizer: {model_name}")
            try:
                tokenizer = AutoTokenizer.from_pretrained(model_name, revision=revision)
                print(f"Successfully loaded tokenizer for {model_name}")
                print(f"Tokenizer class: {tokenizer.__class__.__name__}")
            except Exception as e:
                print(f"Error loading tokenizer: {str(e)}")
                print(f"Error type: {type(e).__name__}")
                raise Exception(f"Failed to load tokenizer: {str(e)}")

            # Run evaluations
            print("\nStarting TSAC sentiment evaluation...")
            try:
                tsac_results = evaluate_tsac_sentiment(model, tokenizer, device)
                print(f"TSAC results: {tsac_results}")
            except Exception as e:
                print(f"Error in TSAC evaluation for {model_name}: {str(e)}")
                print(f"Error type: {type(e).__name__}")
                tsac_results = {"accuracy": None}

            print("\nStarting Tunisian Corpus evaluation...")
            try:
                tunisian_results = evaluate_tunisian_corpus_coverage(model, tokenizer, device)
                print(f"Tunisian Corpus results: {tunisian_results}")
            except Exception as e:
                print(f"Error in Tunisian Corpus evaluation for {model_name}: {str(e)}")
                print(f"Error type: {type(e).__name__}")
                tunisian_results = {"coverage": None}

            print("\nEvaluation completed successfully!")
            print(f"Final results: {tsac_results} | {tunisian_results}")

            return EvaluationResult(
                model=model_name,
                revision=revision,
                precision=precision,
                weight_type=weight_type,
                results={
                    **tsac_results,
                    **tunisian_results
                }
            )
        except Exception as e:
            print(f"\nError loading model {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            print(f"Full traceback: {traceback.format_exc()}")
            return EvaluationResult(
                model=model_name,
                revision=revision,
                precision=precision,
                weight_type=weight_type,
                results={},
                error=str(e)
            )
    except Exception as e:
        print(f"\nError evaluating model {model_name}: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        print(f"Full traceback: {traceback.format_exc()}")
        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={},
            error=str(e)
        )

def process_evaluation_queue():
    """Process all pending evaluations in the queue"""
    print("\n=== Starting evaluation queue processing ===")
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")

    # Get all pending evaluations
    if not os.path.exists(EVAL_REQUESTS_PATH):
        print(f"Evaluation requests path does not exist: {EVAL_REQUESTS_PATH}")
        return
    # Find all model directories (each model has its own directory)
    model_dirs = [d for d in os.listdir(EVAL_REQUESTS_PATH) if os.path.isdir(os.path.join(EVAL_REQUESTS_PATH, d))]
    print(f"Found {len(model_dirs)} model directories")

    # Track how many request files are processed across all directories
    total_files = 0
    for model_dir in model_dirs:
        model_dir_path = os.path.join(EVAL_REQUESTS_PATH, model_dir)
        print(f"\nChecking model directory: {model_dir_path}")

        # Find all JSON files in the model directory
        json_files = [f for f in os.listdir(model_dir_path) if f.endswith('.json')]
        print(f"Found {len(json_files)} pending evaluation requests")
        total_files += len(json_files)
        for file in json_files:
            file_path = os.path.join(model_dir_path, file)
            print(f" - {file_path}")
            try:
                with open(file_path, 'r') as f:
                    eval_entry = json.load(f)

                # Check if this is a pending or running evaluation
                status = eval_entry.get('status', '')
                if status == EvaluationStatus.PENDING.value:
                    print("\n=== Found pending evaluation ===")
                    print(f"Model: {eval_entry['model']}")
                    print(f"Revision: {eval_entry['revision']}")
                    print(f"Precision: {eval_entry['precision']}")
                    print(f"Weight type: {eval_entry['weight_type']}")

                    # Update status to RUNNING
                    eval_entry['status'] = EvaluationStatus.RUNNING.value
                    with open(file_path, 'w') as f:
                        json.dump(eval_entry, f, indent=2)

                    # Run evaluation
                    try:
                        print("\n=== Starting evaluation ===")
                        eval_result = evaluate_model(
                            model_name=eval_entry['model'],
                            revision=eval_entry['revision'],
                            precision=eval_entry['precision'],
                            weight_type=eval_entry['weight_type']
                        )
                        print("\n=== Evaluation completed ===")
                        print(f"Results: {eval_result.results}")

                        # Update status to FINISHED and add results
                        eval_entry['status'] = EvaluationStatus.FINISHED.value
                        eval_entry['results'] = eval_result.results
                        if eval_result.error:
                            eval_entry['error'] = eval_result.error

                        # Save updated entry
                        with open(file_path, 'w') as f:
                            json.dump(eval_entry, f, indent=2)

                        # Move file to results directory
                        if not os.path.exists(EVAL_RESULTS_PATH):
                            os.makedirs(EVAL_RESULTS_PATH)
                        result_filename = os.path.basename(file_path)
                        result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
                        os.rename(file_path, result_path)
                        print(f"\nMoved evaluation result to: {result_path}")

                        # Upload to Hugging Face
                        try:
                            API.upload_file(
                                path_or_fileobj=result_path,
                                path_in_repo=result_filename,
                                repo_id=RESULTS_REPO,
                                repo_type="dataset",
                                commit_message=f"Add evaluation results for {eval_entry['model']}"
                            )
                            print("\nResults uploaded to Hugging Face")
                        except Exception as upload_error:
                            print(f"Error uploading results: {str(upload_error)}")
                            eval_entry['error'] = f"Evaluation completed but failed to upload results: {str(upload_error)}"
                            # The request file has already been moved, so record the
                            # upload error in the copy in the results directory
                            with open(result_path, 'w') as f:
                                json.dump(eval_entry, f, indent=2)
                    except Exception as eval_error:
                        print("\n=== Error during evaluation ===")
                        print(f"Error: {str(eval_error)}")
                        print(f"Full traceback: {traceback.format_exc()}")

                        # Update status to FAILED and add error
                        eval_entry['status'] = EvaluationStatus.FAILED.value
                        eval_entry['error'] = str(eval_error)
                        with open(file_path, 'w') as f:
                            json.dump(eval_entry, f, indent=2)

                        # Move failed evaluation to results directory
                        if not os.path.exists(EVAL_RESULTS_PATH):
                            os.makedirs(EVAL_RESULTS_PATH)
                        result_filename = os.path.basename(file_path)
                        result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
                        os.rename(file_path, result_path)
                        print(f"\nMoved failed evaluation to: {result_path}")

                        # Upload error file
                        try:
                            API.upload_file(
                                path_or_fileobj=result_path,
                                path_in_repo=result_filename,
                                repo_id=RESULTS_REPO,
                                repo_type="dataset",
                                commit_message=f"Add evaluation error for {eval_entry['model']}"
                            )
                            print("\nError file uploaded to Hugging Face")
                        except Exception as upload_error:
                            print(f"Error uploading error file: {str(upload_error)}")
                elif status == EvaluationStatus.RUNNING.value:
                    print("\n=== Found running evaluation ===")
                    print(f"Model: {eval_entry['model']}")
                    print(f"Revision: {eval_entry['revision']}")
                    print(f"Precision: {eval_entry['precision']}")
                    print(f"Weight type: {eval_entry['weight_type']}")
                    try:
                        # Check if we have results for this evaluation
                        result_filename = os.path.basename(file_path)
                        result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
                        if os.path.exists(result_path):
                            print(f"\nFound existing results file: {result_path}")
                            # Update status to FINISHED
                            eval_entry['status'] = EvaluationStatus.FINISHED.value
                            with open(file_path, 'w') as f:
                                json.dump(eval_entry, f, indent=2)
                        else:
                            print("\nNo results found. Restarting evaluation...")
                            # Restart the evaluation
                            eval_entry['status'] = EvaluationStatus.PENDING.value
                            with open(file_path, 'w') as f:
                                json.dump(eval_entry, f, indent=2)
                    except Exception as check_error:
                        print("\n=== Error checking running evaluation ===")
                        print(f"Error: {str(check_error)}")
                        print(f"Full traceback: {traceback.format_exc()}")
                        # If we can't check the status, restart the evaluation
                        eval_entry['status'] = EvaluationStatus.PENDING.value
                        with open(file_path, 'w') as f:
                            json.dump(eval_entry, f, indent=2)
            except Exception as e:
                print(f"Error processing file {file}: {str(e)}")
                print(f"Full traceback: {traceback.format_exc()}")
                continue
    print("\n=== Evaluation queue summary ===")
    print(f"Total directories checked: {len(model_dirs)}")
    print(f"Total files processed: {total_files}")
    print("\nEvaluation queue processed. Sleeping for 5 minutes...")
    return
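

# --- Hypothetical entry point (not shown in the original file) ---
# The final log line above mentions sleeping for 5 minutes, which suggests this
# module is driven by a simple polling loop. The sketch below is one minimal way
# such a runner could look; the actual Space may instead invoke
# process_evaluation_queue() from its app/UI code or a scheduler.
if __name__ == "__main__":
    import time

    while True:
        process_evaluation_queue()
        # Wait 5 minutes between queue scans, matching the log message above
        time.sleep(300)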