"""Evaluation worker for the leaderboard.

Reads pending evaluation requests (JSON files under ``EVAL_REQUESTS_PATH``),
evaluates the requested model on the TSAC sentiment task and on the Tunisian
Dialect Corpus tokenizer-coverage task, then stores the outcome under
``EVAL_RESULTS_PATH`` and uploads it to the results dataset repository.
"""

import json
import os
import traceback
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

import torch
from datasets import load_dataset
from transformers import AutoConfig, AutoModelForSequenceClassification, AutoTokenizer

from src.display.utils import Tasks
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, OWNER


class EvaluationStatus(Enum):
    """Lifecycle states stored in the ``status`` field of a request file."""

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"


@dataclass
class EvaluationResult:
    """Outcome of evaluating one model.

    ``results`` maps metric names to values; a metric is ``None`` when its
    task failed. ``error`` is set only when the evaluation as a whole failed
    (for example, the model could not be loaded).
    """

    model: str
    revision: str
    precision: str
    weight_type: str
    results: Dict[str, Optional[float]]
    error: Optional[str] = None


def _ensure_padding_token(model, tokenizer) -> None:
    """Make batched padding possible for tokenizers that ship without a pad token.

    Falls back to the EOS token and mirrors the pad id into the model config
    when the config has none. Both objects are modified in place.
    """
    if tokenizer.pad_token is None and tokenizer.eos_token is not None:
        tokenizer.pad_token = tokenizer.eos_token

    config = getattr(model, "config", None)
    if (
        config is not None
        and getattr(config, "pad_token_id", None) is None
        and tokenizer.pad_token_id is not None
    ):
        config.pad_token_id = tokenizer.pad_token_id


def _extract_logits(outputs):
    """Return 2-D classification logits from a model's forward output.

    Accepts mapping-style outputs (``logits`` or ``prediction_logits`` key),
    tuples (first element), or a bare tensor.

    Raises:
        ValueError: If a mapping-style output has neither expected key.
    """
    if isinstance(outputs, dict):
        if 'logits' in outputs:
            logits = outputs['logits']
        elif 'prediction_logits' in outputs:
            logits = outputs['prediction_logits']
        else:
            raise ValueError(f"Unknown output format. Available keys: {list(outputs.keys())}")
    elif isinstance(outputs, tuple):
        logits = outputs[0]
    else:
        logits = outputs

    # 3-D logits are treated as per-token predictions and the first position
    # is used. NOTE(review): position 0 is assumed to be a [CLS]-style token.
    if logits.dim() == 3:
        logits = logits[:, 0, :]
    return logits


def evaluate_tsac_sentiment(model, tokenizer, device, batch_size: int = 32):
    """Evaluate model on TSAC sentiment analysis task.

    Args:
        model: Sequence-classification model, already placed on ``device``.
        tokenizer: Tokenizer matching ``model``.
        device: Torch device the input tensors are moved to.
        batch_size: Number of sentences per forward pass.

    Returns:
        ``{"accuracy": <float>}``; accuracy is 0.0 for an empty dataset.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        Exception: Any dataset, tokenizer or model error is logged and re-raised.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    try:
        print("\n=== Starting TSAC sentiment evaluation ===")
        print(f"Current device: {device}")

        print("\nLoading and preprocessing TSAC dataset...")
        dataset = load_dataset("fbougares/tsac", split="test", trust_remote_code=True)
        print(f"Dataset size: {len(dataset)} examples")

        # Materialise both columns so they can be sliced into mini-batches.
        sentences = list(dataset['sentence'])
        targets = list(dataset['target'])

        _ensure_padding_token(model, tokenizer)
        model.eval()
        print(f"\nModel class: {model.__class__.__name__}")
        print(f"Model device: {next(model.parameters()).device}")

        predictions = []
        with torch.no_grad():
            for start in range(0, len(sentences), batch_size):
                # Tokenize per mini-batch so every tensor has a batch
                # dimension and padding is only as long as the batch needs.
                encoded = tokenizer(
                    sentences[start:start + batch_size],
                    padding=True,
                    truncation=True,
                    max_length=512,
                    return_tensors='pt'
                )
                inputs = {key: value.to(device) for key, value in encoded.items()}
                logits = _extract_logits(model(**inputs))
                batch_predictions = logits.argmax(dim=-1).cpu().tolist()
                predictions.extend(batch_predictions)

                if start == 0:
                    print("\nFirst batch predictions:")
                    print(f"Final logits shape: {logits.shape}")
                    print(f"Predictions: {batch_predictions[:5]}")
                    print(f"Targets: {targets[:5]}")

        print(f"\nTotal predictions: {len(predictions)}")
        print(f"Total targets: {len(targets)}")

        correct = sum(p == t for p, t in zip(predictions, targets))
        total = len(predictions)
        accuracy = correct / total if total > 0 else 0.0

        print(f"\nEvaluation results:")
        print(f"Correct predictions: {correct}")
        print(f"Total predictions: {total}")
        print(f"Accuracy: {accuracy:.4f}")

        return {"accuracy": accuracy}
    except Exception as e:
        print(f"\n=== Error in TSAC evaluation: {str(e)} ===")
        print(f"Full traceback: {traceback.format_exc()}")
        raise


def evaluate_tunisian_corpus_coverage(model, tokenizer, device):
    """Evaluate model's coverage on Tunisian Dialect Corpus.

    Coverage is the fraction of tokens produced by ``tokenizer.tokenize`` on
    the ``Tweet`` field that differ from the tokenizer's unknown token.

    Args:
        model: Unused; kept so all task functions share one signature.
        tokenizer: Tokenizer whose vocabulary coverage is measured.
        device: Unused; kept so all task functions share one signature.

    Returns:
        ``{"coverage": <float>}``; coverage is 0.0 when no tokens were seen.

    Raises:
        Exception: Any dataset or tokenizer error is logged and re-raised.
    """
    try:
        dataset = load_dataset("arbml/Tunisian_Dialect_Corpus", split="train")
        unk_token = tokenizer.unk_token

        total_tokens = 0
        covered_tokens = 0
        for example in dataset:
            text = example['Tweet']
            if not text:
                # Empty or missing tweets contribute no tokens.
                continue
            tokens = tokenizer.tokenize(text)
            total_tokens += len(tokens)
            covered_tokens += sum(1 for token in tokens if token != unk_token)

        coverage = covered_tokens / total_tokens if total_tokens > 0 else 0.0
        print(f"Tunisian Corpus Coverage: {coverage:.2%}")
        return {"coverage": coverage}
    except Exception as e:
        print(f"Error in Tunisian Corpus evaluation: {str(e)}")
        raise  # Raise the error instead of returning 0.0


def _resolve_torch_dtype(precision: str) -> torch.dtype:
    """Translate a precision name such as ``"float16"`` into a ``torch.dtype``.

    Raises:
        ValueError: If ``precision`` does not name a torch dtype.
    """
    dtype = getattr(torch, precision, None) if isinstance(precision, str) and precision else None
    if not isinstance(dtype, torch.dtype):
        raise ValueError(f"Unsupported precision: {precision!r}")
    return dtype


def _load_model(model_name: str, revision: str, precision: str, device):
    """Load the sequence-classification model and move it to ``device``.

    Raises:
        ValueError: If ``precision`` is not a torch dtype name.
        Exception: If the model cannot be loaded.
    """
    dtype = _resolve_torch_dtype(precision)

    print(f"\nLoading model: {model_name}")
    print(f"Model path exists: {os.path.exists(model_name)}")

    # Best-effort diagnostic only; a config failure does not abort loading.
    try:
        config = AutoConfig.from_pretrained(model_name, revision=revision)
        print(f"Model type from config: {config.model_type}")
    except Exception as config_error:
        print(f"Error loading config: {str(config_error)}")

    # SECURITY NOTE(review): trust_remote_code=True executes code shipped in
    # the model repository. Model ids come from submitted requests, so this
    # should run in a sandbox or be restricted to vetted repositories.
    try:
        print("\nAttempting to load with trust_remote_code=True...")
        model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            revision=revision,
            torch_dtype=dtype,
            trust_remote_code=True
        ).to(device)
        print(f"Successfully loaded model {model_name} with trust_remote_code=True")
        print(f"Model class: {model.__class__.__name__}")
        return model
    except Exception as e1:
        print(f"Error loading with trust_remote_code=True: {str(e1)}")
        print(f"Error type: {type(e1).__name__}")
        if not ("Unrecognized model" in str(e1) and "llama" in model_name.lower()):
            raise

        print("\nAttempting to load as llama model...")
        try:
            # NOTE(review): whether from_pretrained honours a model_type
            # keyword as an override is unverified; kept from the original.
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name,
                revision=revision,
                torch_dtype=dtype,
                trust_remote_code=True,
                model_type="llama"
            ).to(device)
        except Exception as e2:
            print(f"Error loading as llama model: {str(e2)}")
            print(f"Error type: {type(e2).__name__}")
            raise Exception(
                f"Failed to load model with both methods: {str(e1)}, {str(e2)}"
            ) from e2
        print(f"Successfully loaded model {model_name} as llama model")
        print(f"Model class: {model.__class__.__name__}")
        return model


def _load_tokenizer(model_name: str, revision: str):
    """Load the tokenizer for ``model_name`` at ``revision``.

    Raises:
        Exception: If the tokenizer cannot be loaded.
    """
    print(f"\nLoading tokenizer: {model_name}")
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name, revision=revision)
    except Exception as e:
        print(f"Error loading tokenizer: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        raise Exception(f"Failed to load tokenizer: {str(e)}") from e
    print(f"Successfully loaded tokenizer for {model_name}")
    print(f"Tokenizer class: {tokenizer.__class__.__name__}")
    return tokenizer


def evaluate_model(model_name: str, revision: str, precision: str, weight_type: str) -> EvaluationResult:
    """Evaluate a single model on all tasks.

    Never raises: loading failures and unexpected errors are reported through
    ``EvaluationResult.error`` with empty ``results``. A failure in a single
    task is recorded as ``None`` for that task's metric.

    Args:
        model_name: Hub id or local path of the model.
        revision: Model revision passed to ``from_pretrained``.
        precision: Name of a torch dtype, e.g. ``"float16"``.
        weight_type: Copied verbatim into the result.

    Returns:
        The populated ``EvaluationResult``.
    """

    def failed(error: Exception) -> EvaluationResult:
        # Shared shape for every failure outcome.
        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={},
            error=str(error)
        )

    try:
        print(f"\nStarting evaluation for model: {model_name} (revision: {revision}, precision: {precision}, weight_type: {weight_type})")
        print(f"Current working directory: {os.getcwd()}")
        print(f"Evaluation requests path: {EVAL_REQUESTS_PATH}")
        print(f"Evaluation results path: {EVAL_RESULTS_PATH}")

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")

        try:
            model = _load_model(model_name, revision, precision, device)
            tokenizer = _load_tokenizer(model_name, revision)
        except Exception as e:
            print(f"\nError loading model {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            print(f"Full traceback: {traceback.format_exc()}")
            return failed(e)

        print("\nStarting TSAC sentiment evaluation...")
        try:
            tsac_results = evaluate_tsac_sentiment(model, tokenizer, device)
            print(f"TSAC results: {tsac_results}")
        except Exception as e:
            print(f"Error in TSAC evaluation for {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            tsac_results = {"accuracy": None}

        print("\nStarting Tunisian Corpus evaluation...")
        try:
            tunisian_results = evaluate_tunisian_corpus_coverage(model, tokenizer, device)
            print(f"Tunisian Corpus results: {tunisian_results}")
        except Exception as e:
            print(f"Error in Tunisian Corpus evaluation for {model_name}: {str(e)}")
            print(f"Error type: {type(e).__name__}")
            tunisian_results = {"coverage": None}

        print("\nEvaluation completed successfully!")
        print(f"Final results: {tsac_results} | {tunisian_results}")

        return EvaluationResult(
            model=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type,
            results={**tsac_results, **tunisian_results}
        )
    except Exception as e:
        print(f"\nError evaluating model {model_name}: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        print(f"Full traceback: {traceback.format_exc()}")
        return failed(e)


def _write_json(path: str, payload: Dict[str, Any]) -> None:
    """Serialise ``payload`` to ``path`` as indented JSON."""
    with open(path, 'w', encoding='utf-8') as handle:
        json.dump(payload, handle, indent=2)


def _find_request_files(requests_root: str) -> List[str]:
    """Return the JSON files found one level below ``requests_root``.

    Each model has its own sub-directory; files directly in the root are
    ignored. Results are sorted so processing order is deterministic.
    """
    request_files: List[str] = []
    model_dirs = sorted(
        d for d in os.listdir(requests_root)
        if os.path.isdir(os.path.join(requests_root, d))
    )
    print(f"Found {len(model_dirs)} model directories")

    for model_dir in model_dirs:
        model_dir_path = os.path.join(requests_root, model_dir)
        print(f"\nChecking model directory: {model_dir_path}")
        json_files = sorted(f for f in os.listdir(model_dir_path) if f.endswith('.json'))
        print(f"Found {len(json_files)} JSON files in {model_dir}")
        request_files.extend(os.path.join(model_dir_path, f) for f in json_files)
    return request_files


def _process_request_file(file_path: str) -> bool:
    """Evaluate the request stored at ``file_path`` if it is pending.

    The file is marked RUNNING, evaluated, marked FINISHED or FAILED, moved
    to ``EVAL_RESULTS_PATH`` and uploaded to the results dataset repository.

    Returns:
        True when an evaluation was run, False when the file was skipped.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            eval_entry = json.load(handle)
    except (OSError, ValueError) as e:
        print(f"Error reading evaluation request: {str(e)}")
        return False

    if not isinstance(eval_entry, dict):
        return False
    if eval_entry.get('status') != EvaluationStatus.PENDING.value:
        return False

    model_name = eval_entry.get('model', '')
    if not model_name:
        # Fail the request in place so it is not retried on every run.
        print("Error: Missing model name in evaluation request")
        eval_entry['status'] = EvaluationStatus.FAILED.value
        eval_entry['error'] = "Missing model name in evaluation request"
        _write_json(file_path, eval_entry)
        return False

    revision = eval_entry.get('revision') or "main"
    precision = eval_entry.get('precision', '')
    weight_type = eval_entry.get('weight_type', '')

    print(f"\n=== Found pending evaluation ===")
    print(f"Model: {model_name}")
    print(f"Revision: {revision}")
    print(f"Precision: {precision}")
    print(f"Weight type: {weight_type}")

    eval_entry['status'] = EvaluationStatus.RUNNING.value
    _write_json(file_path, eval_entry)

    print("\n=== Starting evaluation ===")
    try:
        eval_result = evaluate_model(
            model_name=model_name,
            revision=revision,
            precision=precision,
            weight_type=weight_type
        )
        error = eval_result.error
        results = eval_result.results
    except Exception as eval_error:
        # evaluate_model reports failures itself; this is a last-resort guard
        # so the request never stays stuck in RUNNING.
        print(f"\n=== Error during evaluation ===")
        print(f"Error: {str(eval_error)}")
        print(f"Full traceback: {traceback.format_exc()}")
        error = str(eval_error)
        results = {}

    succeeded = not error
    if succeeded:
        print("\n=== Evaluation completed ===")
        print(f"Results: {results}")
        eval_entry['status'] = EvaluationStatus.FINISHED.value
        eval_entry['results'] = results
    else:
        print(f"\n=== Evaluation failed ===")
        print(f"Error: {error}")
        eval_entry['status'] = EvaluationStatus.FAILED.value
        eval_entry['error'] = error
    _write_json(file_path, eval_entry)

    # NOTE(review): results are stored flat, so equally named request files
    # from different model directories overwrite each other.
    os.makedirs(EVAL_RESULTS_PATH, exist_ok=True)
    result_filename = os.path.basename(file_path)
    result_path = os.path.join(EVAL_RESULTS_PATH, result_filename)
    os.replace(file_path, result_path)
    print(f"\nMoved evaluation result to: {result_path}")

    outcome = "results" if succeeded else "error"
    try:
        API.upload_file(
            path_or_fileobj=result_path,
            path_in_repo=result_filename,
            repo_id=f"{OWNER}/results",
            repo_type="dataset",
            commit_message=f"Add evaluation {outcome} for {model_name}"
        )
        print("\nResults uploaded to Hugging Face")
    except Exception as upload_error:
        print(f"Error uploading results: {str(upload_error)}")
        if succeeded:
            # The file now lives at result_path; writing to file_path would
            # re-create a stale request in the queue.
            eval_entry['error'] = f"Evaluation completed but failed to upload results: {str(upload_error)}"
            _write_json(result_path, eval_entry)
    return True


def process_evaluation_queue():
    """Process all pending evaluations in the queue.

    Scans every model directory under ``EVAL_REQUESTS_PATH`` and evaluates
    each request whose status is PENDING. An error in one request is logged
    and does not stop the remaining requests.
    """
    print(f"\n=== Starting evaluation queue processing ===")
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")

    if not os.path.exists(EVAL_REQUESTS_PATH):
        print(f"Evaluation requests path does not exist: {EVAL_REQUESTS_PATH}")
        return

    processed = 0
    for file_path in _find_request_files(EVAL_REQUESTS_PATH):
        try:
            if _process_request_file(file_path):
                processed += 1
        except Exception as e:
            print(f"Error processing file {os.path.basename(file_path)}: {str(e)}")
            print(f"Full traceback: {traceback.format_exc()}")

    if processed:
        print(f"\nProcessed {processed} pending evaluation requests")
    else:
        print("No pending evaluation requests found")