Spaces:
Runtime error
Runtime error
import json
import os
import traceback
from typing import Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AutoConfig
from datasets import load_dataset

from src.envs import API, OWNER, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH
from src.display.utils import Tasks
class EvaluationStatus(Enum):
    """Lifecycle states recorded in the ``status`` field of a request JSON file."""

    # Each member's value is the literal string compared against, and written
    # to, the request file by the queue processor.
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"
@dataclass
class EvaluationResult:
    """Outcome of evaluating one model configuration.

    The class must be a dataclass: callers build it with keyword arguments
    (``EvaluationResult(model=..., results=...)``), which a plain class with
    only annotations would reject with ``TypeError``.
    """

    # Identification of the evaluated model, copied from the request.
    model: str
    revision: str
    precision: str
    weight_type: str
    # Metric name -> value; a value is None when that task failed.
    results: Dict[str, Optional[float]]
    # Human-readable failure description, or None when evaluation succeeded.
    error: Optional[str] = None
def _extract_logits(outputs):
    """Return the logits tensor from a dict-like, tuple, or bare-tensor model output."""
    if isinstance(outputs, dict):
        for key in ("logits", "prediction_logits"):
            if key in outputs:
                return outputs[key]
        raise ValueError(f"Unknown output format. Available keys: {list(outputs.keys())}")
    if isinstance(outputs, tuple):
        return outputs[0]
    return outputs


def evaluate_tsac_sentiment(model, tokenizer, device, batch_size: int = 32):
    """Evaluate model on TSAC sentiment analysis task.

    Loads the ``test`` split of ``fbougares/tsac``, classifies the ``sentence``
    column in mini-batches and compares the arg-max class with the ``target``
    column.

    Args:
        model: Classification model; called as ``model(**inputs)``.
        tokenizer: Tokenizer matching ``model``.
        device: Torch device the input tensors are moved to.
        batch_size: Number of sentences per forward pass (must be >= 1).

    Returns:
        ``{"accuracy": float}``; accuracy is 0.0 for an empty dataset.

    Raises:
        ValueError: If ``batch_size`` is < 1 or the model output has no logits.
        Exception: Any loading/tokenization/model error is logged and re-raised.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    try:
        print("\n=== Starting TSAC sentiment evaluation ===")
        print(f"Current device: {device}")

        print("\nLoading and preprocessing TSAC dataset...")
        dataset = load_dataset("fbougares/tsac", split="test", trust_remote_code=True)
        print(f"Dataset size: {len(dataset)} examples")

        # Materialize the two columns once. Tokenization happens per batch
        # below so that every batch is padded to its own longest sentence and
        # always carries a leading batch dimension.
        sentences = list(dataset["sentence"])
        labels = [int(label) for label in dataset["target"]]

        model.eval()
        print(f"\nModel class: {model.__class__.__name__}")
        print(f"Model device: {next(model.parameters()).device}")

        predictions = []
        targets = []
        with torch.no_grad():
            for batch_index, start in enumerate(range(0, len(sentences), batch_size)):
                batch_sentences = sentences[start:start + batch_size]
                batch_targets = labels[start:start + batch_size]

                encoded = tokenizer(
                    batch_sentences,
                    padding=True,
                    truncation=True,
                    max_length=512,
                    return_tensors="pt",
                )
                inputs = {key: value.to(device) for key, value in encoded.items()}
                outputs = model(**inputs)
                logits = _extract_logits(outputs)

                # A 3-D output is [batch, sequence, classes]; keep the first
                # ([CLS]) position as the sequence-level prediction.
                if logits.dim() == 3:
                    logits = logits[:, 0, :]

                batch_predictions = logits.argmax(dim=-1).cpu().tolist()
                predictions.extend(batch_predictions)
                targets.extend(batch_targets)

                # Diagnostics are limited to the first batch to keep logs readable.
                if batch_index == 0:
                    print("\nProcessing first batch...")
                    print(f"Batch keys: {list(inputs.keys())}")
                    print(f"Batch 0 output type: {type(outputs)}")
                    print(f"Final logits shape: {logits.shape}")
                    print("\nFirst batch predictions:")
                    print(f"Predictions: {batch_predictions[:5]}")
                    print(f"Targets: {batch_targets[:5]}")

        print(f"\nTotal predictions: {len(predictions)}")
        print(f"Total targets: {len(targets)}")

        correct = sum(p == t for p, t in zip(predictions, targets))
        total = len(predictions)
        accuracy = correct / total if total > 0 else 0.0

        print("\nEvaluation results:")
        print(f"Correct predictions: {correct}")
        print(f"Total predictions: {total}")
        print(f"Accuracy: {accuracy:.4f}")
        return {"accuracy": accuracy}
    except Exception as e:
        print(f"\n=== Error in TSAC evaluation: {str(e)} ===")
        print(f"Full traceback: {traceback.format_exc()}")
        raise
def evaluate_tunisian_corpus_coverage(model, tokenizer, device):
    """Evaluate model's coverage on Tunisian Dialect Corpus.

    Tokenizes the ``Tweet`` column of the ``train`` split of
    ``arbml/Tunisian_Dialect_Corpus`` and reports the fraction of produced
    tokens that differ from the tokenizer's unknown token.

    Args:
        model: Unused; kept so all task evaluators share one signature.
        tokenizer: Tokenizer whose vocabulary coverage is measured.
        device: Unused; kept so all task evaluators share one signature.

    Returns:
        ``{"coverage": float}``; coverage is 0.0 when no tokens were produced.

    Raises:
        Exception: Any dataset/tokenizer error is logged and re-raised.
    """
    try:
        dataset = load_dataset("arbml/Tunisian_Dialect_Corpus", split="train")

        # The previous dataset.map() tokenization pass was dropped: its output
        # was never read and it printed every raw batch to the log.
        unk_token = tokenizer.unk_token
        total_tokens = 0
        covered_tokens = 0
        for tweet in dataset["Tweet"]:
            # Missing/empty rows contribute no tokens; skipping them avoids
            # passing None to the tokenizer.
            if not tweet:
                continue
            tokens = tokenizer.tokenize(tweet)
            total_tokens += len(tokens)
            covered_tokens += sum(token != unk_token for token in tokens)

        coverage = covered_tokens / total_tokens if total_tokens > 0 else 0.0
        print(f"Tunisian Corpus Coverage: {coverage:.2%}")
        return {"coverage": coverage}
    except Exception as e:
        print(f"Error in Tunisian Corpus evaluation: {str(e)}")
        raise  # Propagate the error instead of returning 0.0
def _resolve_torch_dtype(precision: str):
    """Map a precision name such as ``"float16"`` to the matching ``torch.dtype``."""
    dtype = getattr(torch, precision, None) if precision else None
    # getattr alone would happily return any torch attribute (e.g. a module),
    # so insist on an actual dtype.
    if not isinstance(dtype, torch.dtype):
        raise ValueError(
            f"Unsupported precision {precision!r}: expected the name of a torch dtype such as 'float16'"
        )
    return dtype


def _load_classification_model(model_name: str, revision: str, dtype, device):
    """Load a sequence-classification model, retrying once for unrecognized llama models."""
    # NOTE(security): trust_remote_code=True allows code shipped in the model
    # repository to run during loading; model_name originates from a request.
    load_kwargs = {"revision": revision, "torch_dtype": dtype, "trust_remote_code": True}
    try:
        print("\nAttempting to load with trust_remote_code=True...")
        model = AutoModelForSequenceClassification.from_pretrained(model_name, **load_kwargs).to(device)
    except Exception as e1:
        print(f"Error loading with trust_remote_code=True: {str(e1)}")
        print(f"Error type: {type(e1).__name__}")
        if not ("Unrecognized model" in str(e1) and "llama" in model_name.lower()):
            raise
        print("\nAttempting to load as llama model...")
        try:
            # NOTE(review): relies on from_pretrained honouring a model_type
            # override -- confirm against the installed transformers version.
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name, model_type="llama", **load_kwargs
            ).to(device)
        except Exception as e2:
            print(f"Error loading as llama model: {str(e2)}")
            print(f"Error type: {type(e2).__name__}")
            raise Exception(f"Failed to load model with both methods: {str(e1)}, {str(e2)}") from e2
        print(f"Successfully loaded model {model_name} as llama model")
        print(f"Model class: {model.__class__.__name__}")
        return model
    print(f"Successfully loaded model {model_name} with trust_remote_code=True")
    print(f"Model class: {model.__class__.__name__}")
    return model


def evaluate_model(model_name: str, revision: str, precision: str, weight_type: str) -> EvaluationResult:
    """Evaluate a single model on all tasks.

    Loads the model and tokenizer, then runs the TSAC sentiment task and the
    Tunisian corpus coverage task. A failing task contributes ``None`` for its
    metric instead of aborting the run.

    Args:
        model_name: Hub id or local path of the model.
        revision: Model revision; an empty value falls back to ``"main"``.
        precision: Name of a torch dtype (e.g. ``"float16"``).
        weight_type: Copied into the result unchanged.

    Returns:
        An ``EvaluationResult``. Failures are reported through its ``error``
        field (load failure, or every task failing) rather than raised.
    """
    def failed(exc: Exception) -> EvaluationResult:
        # Shared shape of every failure result.
        print(f"Error type: {type(exc).__name__}")
        print(f"Full traceback: {traceback.format_exc()}")
        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={},
            error=str(exc),
        )

    try:
        print(f"\nStarting evaluation for model: {model_name} (revision: {revision}, precision: {precision}, weight_type: {weight_type})")
        print(f"Current working directory: {os.getcwd()}")
        print(f"Evaluation requests path: {EVAL_REQUESTS_PATH}")
        print(f"Evaluation results path: {EVAL_RESULTS_PATH}")

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")

        # Requests may carry an empty revision; treat that as the default branch.
        effective_revision = revision or "main"

        try:
            print(f"\nLoading model: {model_name}")
            print(f"Model path exists: {os.path.exists(model_name)}")
            dtype = _resolve_torch_dtype(precision)

            # The config is only inspected for logging; a failure here is not fatal.
            try:
                config = AutoConfig.from_pretrained(model_name, revision=effective_revision)
                print(f"Model type from config: {config.model_type}")
            except Exception as config_error:
                print(f"Error loading config: {str(config_error)}")

            model = _load_classification_model(model_name, effective_revision, dtype, device)

            print(f"\nLoading tokenizer: {model_name}")
            try:
                tokenizer = AutoTokenizer.from_pretrained(model_name, revision=effective_revision)
            except Exception as e:
                print(f"Error loading tokenizer: {str(e)}")
                print(f"Error type: {type(e).__name__}")
                raise Exception(f"Failed to load tokenizer: {str(e)}") from e
            print(f"Successfully loaded tokenizer for {model_name}")
            print(f"Tokenizer class: {tokenizer.__class__.__name__}")
        except Exception as e:
            print(f"\nError loading model {model_name}: {str(e)}")
            return failed(e)

        task_errors = {}

        print("\nStarting TSAC sentiment evaluation...")
        try:
            tsac_results = evaluate_tsac_sentiment(model, tokenizer, device)
            print(f"TSAC results: {tsac_results}")
        except Exception as e:
            print(f"Error in TSAC evaluation for {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            tsac_results = {"accuracy": None}
            task_errors["TSAC"] = str(e)

        print("\nStarting Tunisian Corpus evaluation...")
        try:
            tunisian_results = evaluate_tunisian_corpus_coverage(model, tokenizer, device)
            print(f"Tunisian Corpus results: {tunisian_results}")
        except Exception as e:
            print(f"Error in Tunisian Corpus evaluation for {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            tunisian_results = {"coverage": None}
            task_errors["Tunisian Corpus"] = str(e)

        # A run that produced no metric at all must not be reported as a success.
        error = None
        if len(task_errors) == 2:
            details = "; ".join(f"{task}: {message}" for task, message in task_errors.items())
            error = f"All evaluation tasks failed: {details}"
            print(f"\n{error}")
        else:
            print("\nEvaluation completed successfully!")
        print(f"Final results: {tsac_results} | {tunisian_results}")

        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={**tsac_results, **tunisian_results},
            error=error,
        )
    except Exception as e:
        print(f"\nError evaluating model {model_name}: {str(e)}")
        return failed(e)
def _write_json(path, payload):
    """Write *payload* as JSON to *path* through a temp file and ``os.replace``."""
    # The temp name does not end in ".json", so a leftover from a crash is
    # never picked up as a request by the queue scan.
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w") as handle:
        json.dump(payload, handle, indent=2)
    os.replace(tmp_path, path)


def _run_pending_request(file_path, eval_entry):
    """Evaluate one pending request, then persist, archive and upload its outcome."""
    model_name = eval_entry.get("model", "")
    revision = eval_entry.get("revision", "")
    precision = eval_entry.get("precision", "")
    weight_type = eval_entry.get("weight_type", "")

    print("\n=== Found pending evaluation ===")
    print(f"Model: {model_name}")
    print(f"Revision: {revision}")
    print(f"Precision: {precision}")
    print(f"Weight type: {weight_type}")

    results = None
    if not model_name:
        error = "Missing model name in evaluation request"
        print(f"Error: {error}")
    else:
        eval_entry["status"] = EvaluationStatus.RUNNING.value
        _write_json(file_path, eval_entry)
        try:
            print("\n=== Starting evaluation ===")
            eval_result = evaluate_model(
                model_name=model_name,
                revision=revision,
                precision=precision,
                weight_type=weight_type,
            )
            results, error = eval_result.results, eval_result.error
            print("\n=== Evaluation completed ===")
            print(f"Results: {results}")
        except Exception as eval_error:
            print("\n=== Error during evaluation ===")
            print(f"Error: {str(eval_error)}")
            print(f"Full traceback: {traceback.format_exc()}")
            error = str(eval_error)

    # An evaluation that reported an error is FAILED, even if it returned normally.
    succeeded = not error
    eval_entry["status"] = (
        EvaluationStatus.FINISHED.value if succeeded else EvaluationStatus.FAILED.value
    )
    if results is not None:
        eval_entry["results"] = results
    if error:
        eval_entry["error"] = error
    _write_json(file_path, eval_entry)

    # Archive the request next to the other results.
    # NOTE(review): results are stored flat by basename, so equally named
    # request files from different model directories overwrite each other.
    os.makedirs(EVAL_RESULTS_PATH, exist_ok=True)
    result_filename = os.path.basename(file_path)
    result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
    os.replace(file_path, result_path)
    print(f"\nMoved evaluation result to: {result_path}" if succeeded
          else f"\nMoved failed evaluation to: {result_path}")

    commit_message = (
        f"Add evaluation results for {model_name}" if succeeded
        else f"Add evaluation error for {model_name}"
    )
    try:
        API.upload_file(
            path_or_fileobj=result_path,
            path_in_repo=result_filename,
            repo_id=f"{OWNER}/results",
            repo_type="dataset",
            commit_message=commit_message,
        )
        print("\nResults uploaded to Hugging Face" if succeeded
              else "\nError file uploaded to Hugging Face")
    except Exception as upload_error:
        if succeeded:
            print(f"Error uploading results: {str(upload_error)}")
            eval_entry["error"] = f"Evaluation completed but failed to upload results: {str(upload_error)}"
            # The request file has already been moved, so update the archived copy.
            _write_json(result_path, eval_entry)
        else:
            print(f"Error uploading error file: {str(upload_error)}")


def process_evaluation_queue():
    """Process all pending evaluations in the queue.

    Scans every sub-directory of ``EVAL_REQUESTS_PATH`` for ``*.json`` request
    files. Each request whose ``status`` is ``PENDING`` is marked ``RUNNING``,
    evaluated, marked ``FINISHED`` or ``FAILED``, moved to
    ``EVAL_RESULTS_PATH`` and uploaded to the ``{OWNER}/results`` dataset repo.
    A problem with one file is logged and does not stop the remaining files.

    Returns:
        None.
    """
    print("\n=== Starting evaluation queue processing ===")
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")

    if not os.path.exists(EVAL_REQUESTS_PATH):
        print(f"Evaluation requests path does not exist: {EVAL_REQUESTS_PATH}")
        return

    # Each model owner has its own directory; sorted for a reproducible order.
    model_dirs = sorted(
        entry for entry in os.listdir(EVAL_REQUESTS_PATH)
        if os.path.isdir(os.path.join(EVAL_REQUESTS_PATH, entry))
    )
    print(f"Found {len(model_dirs)} model directories")

    processed = 0
    for model_dir in model_dirs:
        model_dir_path = os.path.join(EVAL_REQUESTS_PATH, model_dir)
        print(f"\nChecking model directory: {model_dir_path}")

        json_files = sorted(name for name in os.listdir(model_dir_path) if name.endswith(".json"))
        print(f"Found {len(json_files)} JSON files in {model_dir}")

        for file in json_files:
            file_path = os.path.join(model_dir_path, file)
            try:
                with open(file_path, "r") as f:
                    eval_entry = json.load(f)
                if eval_entry.get("status") != EvaluationStatus.PENDING.value:
                    continue
                processed += 1
                _run_pending_request(file_path, eval_entry)
            except Exception as e:
                print(f"Error processing file {file}: {str(e)}")
                print(f"Full traceback: {traceback.format_exc()}")

    if processed == 0:
        print("No pending evaluation requests found")
    else:
        print(f"Processed {processed} pending evaluation requests")