Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| from datetime import datetime,timedelta,timezone | |
| from typing import Dict | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import torch | |
| from transformers import AutoModelForSequenceClassification, AutoTokenizer | |
| import traceback | |
| from src.envs import API, OWNER, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, RESULTS_REPO, QUEUE_REPO,TOKEN | |
| from src.evaluator.tunisian_corpus_coverage import evaluate_tunisian_corpus_coverage | |
| from src.evaluator.tsac import evaluate_tsac_sentiment | |
class EvaluationStatus(Enum):
    """Values stored in the ``status`` field of an evaluation request file."""

    # Waiting in the queue; the worker picks these up.
    PENDING = "PENDING"
    # Claimed by the worker; evaluation is in progress.
    RUNNING = "RUNNING"
    # Evaluation ended and results were recorded.
    FINISHED = "FINISHED"
    # Evaluation ended with an error message recorded.
    FAILED = "FAILED"
@dataclass
class EvaluationResult:
    """Dataclass to hold the results of a single model evaluation.

    The ``@dataclass`` decorator generates the keyword-argument constructor
    that the callers in this module rely on.

    Attributes:
        model: Name of the evaluated model on the Hugging Face Hub.
        revision: Revision (commit hash or branch name) that was evaluated.
        precision: Precision string used when loading the model.
        weight_type: Weight type recorded in the request.
        results: Mapping from metric name to score; a score may be ``None``
            when the corresponding task did not produce a value.
        error: Error message when the evaluation failed, otherwise ``None``.
    """

    model: str
    revision: str
    precision: str
    weight_type: str
    results: Dict[str, float]
    error: str = None  # None means the evaluation did not report an error
def evaluate_model(model_name: str, revision: str, precision: str, weight_type: str) -> EvaluationResult:
    """
    Load one Hub model and score it on every benchmark task.

    Args:
        model_name (str): The name of the model on the Hugging Face Hub.
        revision (str): The specific revision (commit hash or branch name) to use.
        precision (str): Name of a ``torch`` dtype attribute (e.g. 'float16') used for loading.
        weight_type (str): The type of weights; only echoed back in the result.

    Returns:
        EvaluationResult: Carries the task scores, or an ``error`` message when
        loading failed or something unexpected went wrong. A failure inside a
        single task is logged and leaves that task's score as ``None``.
    """
    # Fields that identify the request; every returned result repeats them.
    request_fields = {
        "model": model_name,
        "revision": revision,
        "precision": precision,
        "weight_type": weight_type,
    }

    try:
        print(f"\nStarting evaluation for model: {model_name} (revision: {revision}, precision: {precision}, weight_type: {weight_type})")
        target = "cuda" if torch.cuda.is_available() else "cpu"
        device = torch.device(target)
        print(f"Using device: {device}")

        # --- Load model and tokenizer; a failure here aborts the evaluation ---
        try:
            print(f"\nLoading model and tokenizer for: {model_name}")
            # NOTE(review): trust_remote_code=True lets the model repo run its own
            # code during loading -- confirm this is acceptable for submitted models.
            loaded = AutoModelForSequenceClassification.from_pretrained(
                model_name,
                revision=revision,
                torch_dtype=getattr(torch, precision),
                trust_remote_code=True,
            )
            model = loaded.to(device)
            tokenizer = AutoTokenizer.from_pretrained(model_name, revision=revision)
            print("Successfully loaded model and tokenizer.")
        except Exception as load_exc:
            error_msg = f"Failed to load model or tokenizer: {str(load_exc)}"
            print(f"Error: {error_msg}")
            print(f"Full traceback: {traceback.format_exc()}")
            return EvaluationResult(**request_fields, results={}, error=error_msg)

        # Fallbacks used when a task raises; their lookups below then yield None.
        tsac_results = {"accuracy": None}
        tunisian_results = {"coverage": None}

        # --- Task 1: TSAC sentiment ---
        print("\nStarting TSAC sentiment evaluation...")
        try:
            tsac_results = evaluate_tsac_sentiment(model, tokenizer, device)
            print(f"TSAC results: {tsac_results}")
        except Exception as tsac_exc:
            print(f"Error in TSAC evaluation for {model_name}: {str(tsac_exc)}")
            print(f"Full traceback: {traceback.format_exc()}")

        # --- Task 2: Tunisian corpus coverage ---
        print("\nStarting Tunisian Corpus evaluation...")
        try:
            tunisian_results = evaluate_tunisian_corpus_coverage(model, tokenizer, device)
            print(f"Tunisian Corpus results: {tunisian_results}")
        except Exception as corpus_exc:
            print(f"Error in Tunisian Corpus evaluation for {model_name}: {str(corpus_exc)}")
            print(f"Full traceback: {traceback.format_exc()}")

        print("\nEvaluation completed successfully!")
        scores = {
            "accuracy": tsac_results.get("fbougares/tsac"),
            "coverage": tunisian_results.get("arbml/Tunisian_Dialect_Corpus"),
        }
        return EvaluationResult(**request_fields, results=scores)
    except Exception as unexpected_exc:
        error_msg = f"An unexpected error occurred during evaluation: {str(unexpected_exc)}"
        print(f"Error: {error_msg}")
        print(f"Full traceback: {traceback.format_exc()}")
        return EvaluationResult(**request_fields, results={}, error=error_msg)
def reset_stale_running_eval(eval_entry, root, file_path, filename, timeout_interval=10):
    """Reset a RUNNING request to PENDING once it is older than the timeout.

    The age is measured from the entry's ``submitted_time``. When the entry is
    stale, its status becomes PENDING, ``submitted_time`` is replaced with the
    current UTC time, the local file is rewritten and the file is uploaded to
    the queue repository.

    Args:
        eval_entry (dict): Parsed request; mutated in place when it is reset.
        root (str): Directory containing the request file; its base name is
            used as the folder inside the queue repository.
        file_path (str): Local path of the request JSON file.
        filename (str): File name of the request inside that folder.
        timeout_interval (int | float): Allowed age in seconds before a reset.

    Returns:
        None. If ``submitted_time`` is missing or cannot be parsed, the problem
        is logged and the entry is left untouched.
    """
    submission = eval_entry.get("submitted_time")

    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit UTC offset first.
    iso_text = submission
    if isinstance(submission, str) and submission.endswith("Z"):
        iso_text = submission[:-1] + "+00:00"

    try:
        started = datetime.fromisoformat(iso_text)
    except (TypeError, ValueError) as e:
        # Without a usable timestamp the age is unknown; do not guess.
        print("Invalid submitted_time format:", submission, e)
        return

    # A naive timestamp cannot be subtracted from an aware one; treat it as UTC.
    if started.tzinfo is None:
        started = started.replace(tzinfo=timezone.utc)

    now_utc = datetime.now(timezone.utc)
    if now_utc - started <= timedelta(seconds=timeout_interval):
        return

    model_name = eval_entry.get("model", "<unknown>")
    print(f"Timeout detected — resetting {model_name} to PENDING")
    eval_entry["status"] = EvaluationStatus.PENDING.value
    eval_entry["submitted_time"] = now_utc.isoformat()

    with open(file_path, "w") as f:
        json.dump(eval_entry, f, indent=2)

    # Repository paths use forward slashes regardless of the local OS.
    path_in_repo = f"{os.path.basename(root)}/{filename}"
    API.upload_file(
        path_or_fileobj=file_path,
        path_in_repo=path_in_repo,
        repo_id=QUEUE_REPO,
        repo_type="dataset",
        commit_message=f"Update status to PENDING for {model_name} (timeout)",
        token=TOKEN,
    )
    return
def _save_eval_entry(file_path, eval_entry):
    """Write the request entry back to its local JSON file."""
    with open(file_path, "w") as f:
        json.dump(eval_entry, f, indent=2)


def _upload_eval_file(file_path, path_in_repo, repo_id, commit_message):
    """Upload a local request file to a dataset repository on the Hub."""
    API.upload_file(
        path_or_fileobj=file_path,
        path_in_repo=path_in_repo,
        repo_id=repo_id,
        repo_type="dataset",
        commit_message=commit_message,
        token=TOKEN,  # same credentials as the timeout-reset upload
    )


def _run_pending_evaluation(eval_entry, root, file_path, filename):
    """Mark one PENDING request RUNNING, evaluate it and publish the outcome."""
    model_name = eval_entry['model']
    print(f"Found pending evaluation for model: {model_name}")

    # --- Step 1: Update status to RUNNING locally and on Hub ---
    eval_entry['status'] = EvaluationStatus.RUNNING.value
    _save_eval_entry(file_path, eval_entry)

    user_name = os.path.basename(root)
    # Repository paths use forward slashes regardless of the local OS.
    path_in_repo = f"{user_name}/{filename}"
    _upload_eval_file(
        file_path,
        path_in_repo,
        QUEUE_REPO,
        f"Update status to RUNNING for {model_name}",
    )
    print(f"Updated status to RUNNING in queue: {path_in_repo}")

    # --- Step 2: Run the evaluation ---
    print("\n=== Starting evaluation ===")
    eval_result = evaluate_model(
        model_name=model_name,
        revision=eval_entry['revision'],
        precision=eval_entry['precision'],
        weight_type=eval_entry['weight_type'],
    )
    print("\n=== Evaluation completed ===")

    # --- Step 3: Update file with final status and results locally ---
    if eval_result.error:
        eval_entry['status'] = EvaluationStatus.FAILED.value
        eval_entry['error'] = eval_result.error
        print(f"Evaluation failed with error: {eval_result.error}")
    else:
        eval_entry['status'] = EvaluationStatus.FINISHED.value
        eval_entry['results'] = eval_result.results
        print(f"Evaluation finished successfully. Results: {eval_result.results}")
    _save_eval_entry(file_path, eval_entry)

    # --- Step 4: Upload the final file to the results repository ---
    # Best effort: a failed upload is logged and the queue update still runs.
    try:
        outcome = 'results' if not eval_result.error else 'error'
        _upload_eval_file(
            file_path,
            path_in_repo,
            RESULTS_REPO,
            f"Evaluation {outcome} for {model_name}",
        )
        print("\nResults uploaded to Hugging Face successfully.")
    except Exception as upload_error:
        print(f"Error uploading results: {str(upload_error)}")

    # --- Step 5: Record the final status in the queue repository ---
    # This keeps a record of all processed jobs in the queue repo.
    try:
        _upload_eval_file(
            file_path,
            path_in_repo,
            QUEUE_REPO,
            f"Final status update for {model_name}",
        )
        print(f"Final status for {model_name} updated in the queue repository.")
    except Exception as status_update_error:
        print(f"Error updating status in queue: {str(status_update_error)}")


def process_evaluation_queue():
    """
    Process every evaluation request found under ``EVAL_REQUESTS_PATH``.

    Walks the directory tree and, for each ``.json`` request file:

    * PENDING requests are marked RUNNING, evaluated, and the final
      FINISHED/FAILED entry is uploaded to the results and queue repositories.
    * RUNNING requests are handed to ``reset_stale_running_eval``.
    * Requests in any other status are skipped.

    An error while handling one file is logged with its traceback and does not
    stop the remaining files from being processed. Returns ``None``.
    """
    print("\n=== Starting evaluation queue processing ===")
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Looking for evaluation requests in: {EVAL_REQUESTS_PATH}")

    if not os.path.exists(EVAL_REQUESTS_PATH):
        print(f"Evaluation requests path does not exist: {EVAL_REQUESTS_PATH}")
        return

    for root, _, files in os.walk(EVAL_REQUESTS_PATH):
        for filename in files:
            if not filename.endswith('.json'):
                continue

            file_path = os.path.join(root, filename)
            print(f"\nProcessing file: {file_path}")
            try:
                with open(file_path, 'r') as f:
                    eval_entry = json.load(f)

                status = eval_entry.get('status', '')
                if status == EvaluationStatus.PENDING.value:
                    _run_pending_evaluation(eval_entry, root, file_path, filename)
                elif status == EvaluationStatus.RUNNING.value:
                    print("Found Running evaluation for model: ", eval_entry['model'])
                    reset_stale_running_eval(eval_entry, root, file_path, filename)
                else:
                    print(f"Skipping file with status: {status}")
            except Exception as e:
                # One bad request must not block the rest of the queue.
                print(f"Error processing file {file_path}: {str(e)}")
                print(f"Full traceback: {traceback.format_exc()}")
                continue

    print("\n=== Evaluation queue processed. ===")
    print("No more pending jobs found.")
    return