from evaluate import EvaluationModule, EvaluationModuleInfo
from datasets import Features, Value
from transformers import pipeline
from qlatent.qmnli.qmnli import *
from qlatent.qmlm.qmlm import *
from qpsychometric import *
import torch
import pandas as pd
from datetime import datetime, timezone
import numpy as np
import pytz
from huggingface_hub import list_repo_commits
import os
from qlatent.questionnaire_eval.questionnaire_utils import *
from questionnaire import Questionnaires

FACTOR_NAME_MAPPING = {
    "Hostile Sexism": "H",
    "Benevolent Sexism (Intimacy)": "BI",
    "Benevolent Sexism (Paternalism)": "BP",
    "Benevolent Sexism (Gender Differentiation)": "BG"
}
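# FACTOR_NAME_MAPPING translates the ASI factor display names above into the short
# codes stored in the "factor" column of the CSV logs (used by calculate_percentile).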

LOWER_IS_BETTER = {
    "GAD7": True,
    "PHQ9": True,
    "ASI": True,
    "H": True,
    "BI": True,
    "BP": True,
    "BG": True,
    "Hostile Sexism": True,
    "Benevolent Sexism (Intimacy)": True,
    "Benevolent Sexism (Paternalism)": True,
    "Benevolent Sexism (Gender Differentiation)": True,
    "SOC": False,
    "Meaningfulness": False,
    "Comprehensibility": False,
    "Manageability": False,
    "CS": False,
    "Kindness": False,
    "Common Humanity": False,
    "Mindfulness": False,
    "Indifference": False,  # Higher score = less indifference (reverse-coded)
    "Separation": False,  # Higher score = less separation (reverse-coded)
    "Disengagement": False,  # Higher score = less disengagement (reverse-coded)
    "BIG5": False,
    "Openness to Experience": False,
    "Conscientiousness": False,
    "Extraversion": False,
    "Agreeableness": False,
    "Neuroticism": True
}

ZERO_IS_BEST = {
    "Openness to Experience",
    "Extraversion"
}
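# ZERO_IS_BEST marks bipolar constructs whose neutral point is a score of 0; these are
# reported as an absolute deviation from neutral rather than ranked (see format_results_as_yaml).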

COMPARISON_TEXT = {
    "GAD7": "more anxious than",
    "PHQ9": "more depressed than",
    "ASI": "more sexist than",
    "H": "more hostile sexist than",
    "BI": "more benevolent sexist (intimacy) than",
    "BP": "more paternalistic than",
    "BG": "more gender-differentiated than",
    "Hostile Sexism": "more hostile sexist than",
    "Benevolent Sexism (Intimacy)": "more benevolent sexist (intimacy) than",
    "Benevolent Sexism (Paternalism)": "more paternalistic than",
    "Benevolent Sexism (Gender Differentiation)": "more gender-differentiated than",
    "SOC": "more coherent than",
    "Meaningfulness": "more meaningful than",
    "Comprehensibility": "more comprehensible than",
    "Manageability": "more manageable than",
    "CS": "more compassionate than",
    "Kindness": "more kind than",
    "Common Humanity": "more connected to common humanity than",
    "Mindfulness": "more mindful than",
    "Indifference": "less indifferent than",  # Reverse-coded: higher score = less indifference
    "Separation": "less separated than",  # Reverse-coded: higher score = less separation
    "Disengagement": "less disengaged than",  # Reverse-coded: higher score = less disengagement
    "BIG5": "more than",  # Default for overall BIG5
    "Openness to Experience": "more open than",
    "Conscientiousness": "more conscientious than",
    "Extraversion": "more extraverted than",
    "Agreeableness": "more agreeable than",
    "Neuroticism": "more neurotic than"
}

MLM_TYPE = "QMLM"
NLI_TYPE = "QMNLI"
all_tasks = [MLM_TYPE, NLI_TYPE]
# HuggingFace pipelines expect device=0 for the first GPU and device=-1 for CPU.
device = 0 if torch.cuda.is_available() else -1


class Utils():

    def __init__(self):
        pass

    @staticmethod
    def average_mean_score_for_positive_only(csv_path, model_version_id):
        """Calculate the average mean score from evaluation results in a CSV file."""
        df = pd.read_csv(csv_path, encoding="utf-8-sig")
        df = df[df["model_version_id"] == model_version_id]
        return df["mean_score"].mean()

    @staticmethod
    def execute_pipeline_save_results(model_info, pipeline, questionnaire_name, task_type, questionnaires_obj, mongo_handler):
        """
        Run the evaluation pipeline on a questionnaire and save results to CSV and MongoDB.
        Returns the average mean score for the model.
        """
        if model_info.pipeline_tag == "zero-shot-classification":
            questionnaire_type = NLI_TYPE
        elif model_info.pipeline_tag == "fill-mask":
            questionnaire_type = MLM_TYPE
        questionnaire = questionnaires_obj.questionnaires[task_type][questionnaire_name]
        csv_path_to_save = f"./model_logs/{questionnaire.questionnaire_type}/{questionnaire.name}.csv"
        questionnaire.run(pipelines=[pipeline],
                          softmax=['index', 'frequency'],
                          filters={
                              "unfiltered": lambda q: {},
                              "positive_only": (lambda q: q.get_filter_for_postive_keywords(['frequency'])),
                          },
                          result_path=csv_path_to_save,
                          merge_filtered_positiveonly=True,
                          )
        collection_name = f"{questionnaire_type}_{questionnaire.name}"
        mongo_handler.create_empty_collection(collection_name)
        mongo_handler.insert_new_rows(csv_path_to_save, collection_name, ["model_version_id", "ordinal"])
        avg_mean_score = Utils.average_mean_score_for_positive_only(csv_path_to_save, model_info.model_version_id)
        return avg_mean_score

    @staticmethod
    def load_pipeline_safely(model_id):
        """
        Load a HuggingFace pipeline and determine its task type.
        Supports zero-shot-classification and fill-mask tasks.
        """
        try:
            pipe = pipeline(task=None, device=device, model=model_id, trust_remote_code=True)
            if pipe.task == "fill-mask":
                return pipe, MLM_TYPE
            elif pipe.task in ["zero-shot-classification", "text-classification"]:
                pipe = pipeline(
                    task="zero-shot-classification",
                    device=device,
                    model=pipe.model,
                    tokenizer=pipe.tokenizer,
                    trust_remote_code=True
                )
                return pipe, NLI_TYPE
            else:
                # Unsupported task: raise so the except block below reports a descriptive
                # error instead of silently returning None.
                raise ValueError(f"Unsupported pipeline task: {pipe.task}")
        except Exception as e:
            allowed_tasks = ["zero-shot-classification", "text-classification", "fill-mask"]
            if 'pipe' in locals() and hasattr(pipe, 'task') and pipe.task not in allowed_tasks:
                raise ValueError(
                    f"Invalid model: {model_id}. Must be one of these tasks {allowed_tasks}."
                )
            raise ValueError(f"Failed to initialize model {model_id}: {e}")
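
    # Illustrative usage (the model IDs below are real Hub models, shown only as examples):
    #   Utils.load_pipeline_safely("bert-base-uncased")          # fill-mask model -> (pipe, MLM_TYPE)
    #   Utils.load_pipeline_safely("facebook/bart-large-mnli")   # NLI/zero-shot   -> (pipe, NLI_TYPE)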


class Qpsychometric(EvaluationModule):

    def __init__(self, hf_api, all_questionnaires, mongo_handler):
        super().__init__()
        self.hf_api = hf_api
        self.all_questionnaires = all_questionnaires
        self.mongo_handler = mongo_handler
        self.questionnaires_obj = Questionnaires(self.all_questionnaires)

    @staticmethod
    def log_model_error(model_version_id, error_message, mongo_handler=None):
        """
        Log a model error to models_errors.csv.
        Appends a new row with model_version_id and error message.
        Avoids duplicates by checking if the model_version_id already exists.

        Args:
            model_version_id: The model version ID (model_id_commithash)
            error_message: The error message to log
            mongo_handler: Optional MongoDBHandler instance for syncing to MongoDB
        """
        try:
            errors_path = os.path.join("model_logs", "models_errors.csv")
            if os.path.exists(errors_path):
                errors_df = pd.read_csv(errors_path, encoding="utf-8-sig")
                if model_version_id in errors_df['model_version_id'].values:
                    print(f"Error for {model_version_id} already logged. Skipping duplicate.")
                    return
            else:
                errors_df = pd.DataFrame(columns=['model_version_id', 'error'])
            new_row = pd.DataFrame([{
                'model_version_id': model_version_id,
                'error': error_message
            }])
            errors_df = pd.concat([errors_df, new_row], ignore_index=True)
            errors_df.to_csv(errors_path, index=False, encoding="utf-8-sig")
            print(f"Logged error for {model_version_id} to models_errors.csv")
            if mongo_handler is not None:
                try:
                    mongo_handler.create_empty_collection("models_errors")
                    mongo_handler.insert_new_rows(errors_path, collection_name="models_errors", identifiers=["model_version_id"])
                except Exception as mongo_error:
                    print(f"Could not update MongoDB with error: {mongo_error}")
        except Exception as e:
            print(f"Failed to log error to models_errors.csv: {e}")

    @staticmethod
    def log_model_meta_data(model_info, pipeline, mongo_handler=None):
        """
        Log model metadata to models_meta_data.csv and optionally to MongoDB.
        Extracts comprehensive metadata from model_info and pipeline objects.

        Args:
            model_info: HuggingFace model info object with model metadata
            pipeline: Loaded pipeline object (can be None)
            mongo_handler: Optional MongoDBHandler instance for syncing to MongoDB
        """
        try:
            from dateutil import parser

            meta_data_path = os.path.join("model_logs", "models_meta_data.csv")

            # Check if model already exists in metadata
            file_exists = os.path.exists(meta_data_path)
            if file_exists:
                meta_df = pd.read_csv(meta_data_path, encoding="utf-8-sig")
                # Check if this model_version_id already exists
                if model_info.model_version_id in meta_df['model_version_id'].values:
                    print(f"Metadata for {model_info.model_version_id} already logged. Skipping.")
                    return

            # Extract metadata
            now = datetime.now(pytz.UTC)
            date_of_logging = now.strftime('%d-%m-%Y %H:%M:%S %Z')
            model_id = model_info.id if model_info.id else "unknown"
            author = model_info.author if model_info.author else "unknown"

            # Handle created_at date
            if hasattr(model_info, "created_at"):
                dt = model_info.created_at
            elif hasattr(model_info, "createdAt"):
                dt = parser.parse(model_info.createdAt)
            else:
                dt = datetime.now(pytz.UTC)
            created_at = dt.strftime("%d-%m-%Y %H:%M:%S") + " UTC"

            downloads = model_info.downloads if model_info.downloads else "unknown"
            likes = model_info.likes if model_info.likes else "unknown"
            library_name = model_info.library_name if hasattr(model_info, "library_name") else "unknown"

            # Handle pipeline_tag
            if hasattr(model_info, "original_pipeline_tag"):
                pipeline_tag = model_info.original_pipeline_tag
            else:
                pipeline_tag = model_info.pipeline_tag if model_info.pipeline_tag else "unknown"

            # Extract config data
            architectures = "unknown"
            model_type = "unknown"
            if model_info.config:
                if "architectures" in model_info.config and model_info.config['architectures']:
                    architectures = model_info.config['architectures']
                if "model_type" in model_info.config and model_info.config['model_type']:
                    model_type = model_info.config['model_type']

            # Extract card data
            base_model = "unknown"
            datasets = "unknown"
            language = "unknown"
            if model_info.cardData:
                if "base_model" in model_info.cardData and model_info.cardData['base_model']:
                    base_model = model_info.cardData['base_model']
                if "datasets" in model_info.cardData and model_info.cardData['datasets']:
                    datasets = model_info.cardData['datasets']
                if "language" in model_info.cardData and model_info.cardData['language']:
                    language = model_info.cardData['language']

            # Extract from tags
            fine_tune_base_model = [tag.split('base_model:finetune:')[1] for tag in model_info.tags if 'base_model:finetune:' in tag] or "unknown"
            if datasets == "unknown":
                datasets = [tag.split('dataset:')[1] for tag in model_info.tags if 'dataset:' in tag] or "unknown"
            region = next((tag.split('region:')[1] for tag in model_info.tags if 'region:' in tag), "unknown")

            # Extract pipeline data
            trainable_params = "unknown"
            vocab_size = "unknown"
            if pipeline is not None:
                if hasattr(pipeline, 'trainable_params') and pipeline.trainable_params:
                    trainable_params = pipeline.trainable_params
                if hasattr(pipeline, 'vocab_size') and pipeline.vocab_size:
                    vocab_size = pipeline.vocab_size

            # Create metadata dictionary
            model_info_dict = {
                "id": model_id,
                "author": author,
                "last_commit_hash": model_info.last_commit_hash,
                "last_commit_date": model_info.last_commit_date,
                "model_version_id": model_info.model_version_id,
                "created_at": created_at,
                "downloads": downloads,
                "likes": likes,
                "library_name": library_name,
                "pipeline_tag": pipeline_tag,
                "architectures": str(architectures),
                "model_type": model_type,
                "base_model": str(base_model),
                "fine_tune_base_model": str(fine_tune_base_model),
                "datasets": str(datasets),
                "language": str(language),
                "trainable_params": str(trainable_params),
                "vocab_size": str(vocab_size),
                "region": region,
                "date_of_logging": date_of_logging,
            }

            # Convert to DataFrame
            new_df = pd.DataFrame([model_info_dict])
            if file_exists:
                # Append to existing file
                meta_df = pd.read_csv(meta_data_path, encoding="utf-8-sig")
                meta_df = pd.concat([meta_df, new_df], ignore_index=True)
            else:
                # Create new file
                meta_df = new_df

            # Save to CSV
            meta_df.to_csv(meta_data_path, index=False, encoding="utf-8-sig")
            print(f"Logged metadata for {model_info.model_version_id} to models_meta_data.csv")

            # Also insert to MongoDB if mongo_handler is provided
            if mongo_handler is not None:
                try:
                    mongo_handler.create_empty_collection("models_meta_data")
                    mongo_handler.insert_new_rows(meta_data_path, collection_name="models_meta_data", identifiers=["model_version_id"])
                except Exception as mongo_error:
                    print(f"Could not update MongoDB with metadata: {mongo_error}")
        except Exception as e:
            print(f"Failed to log metadata to models_meta_data.csv: {e}")

    def model_df_model_logs(self, model_version_id, questionnaire_type, questionnaire_name):
        """Retrieve evaluation results for a specific model version from stored logs."""
        csv_path_to_read = f"./model_logs/{questionnaire_type}/{questionnaire_name}.csv"
        questionnaire_df = pd.read_csv(csv_path_to_read, encoding="utf-8-sig")
        model_filtered_df = questionnaire_df[questionnaire_df["model_version_id"] == model_version_id]
        return model_filtered_df

    def check_model_exists_in_logs(self, model_version_id, questionnaire_name, task_type):
        """Check if evaluation results for a model already exist in logs."""
        try:
            csv_path_to_read = f"./model_logs/{task_type}/{questionnaire_name}.csv"
            questionnaire_df = pd.read_csv(csv_path_to_read, encoding="utf-8-sig")
            model_filtered_df = questionnaire_df[questionnaire_df["model_version_id"] == model_version_id]
            if len(model_filtered_df) > 0:
                # Calculate average mean score for positive_only filter
                positive_only_df = model_filtered_df[model_filtered_df['filter'] == 'positive_only']
                if len(positive_only_df) > 0:
                    avg_mean_score = positive_only_df['mean_score'].mean()
                    return True, avg_mean_score
            return False, None
        except FileNotFoundError:
            return False, None
        except Exception:
            return False, None

    def get_all_questionnaire_results(self, model_version_id):
        """
        Get all questionnaire results for a model version.
        Returns a list of dicts with task_type, questionnaire_name, mean_score, and z_score.
        For BIG5, ONLY includes separate entries for each factor (not the overall BIG5 score).
        Z-score is calculated relative to all models for that specific questionnaire/factor.
        """
        from scipy import stats

        results = []
        # Iterate through all questionnaire types and names
        questionnaire_types = ['QMLM', 'QMNLI']
        # BIG5 factors
        BIG5_FACTORS = ["Openness to Experience", "Conscientiousness", "Extraversion", "Agreeableness", "Neuroticism"]
        for task_type in questionnaire_types:
            for questionnaire_name in self.questionnaires_obj.questionnaires[task_type].keys():
                exists, avg_score = self.check_model_exists_in_logs(model_version_id, questionnaire_name, task_type)
                if exists:
                    # For BIG5, only add factor entries (skip the overall BIG5 entry)
                    if questionnaire_name == "BIG5":
                        csv_path = f"./model_logs/{task_type}/{questionnaire_name}.csv"
                        try:
                            df = pd.read_csv(csv_path, encoding="utf-8-sig")
                            # Filter by positive_only
                            positive_df = df[df['filter'] == 'positive_only'].copy()
                            # Calculate mean score and z-score for each factor
                            for factor in BIG5_FACTORS:
                                factor_df = positive_df[positive_df['factor'] == factor]
                                if len(factor_df) > 0:
                                    # Get all models' scores for this factor
                                    all_model_scores = factor_df.groupby('model')['mean_score'].mean()
                                    # Find this model's score
                                    model_ids = positive_df[positive_df['model_version_id'] == model_version_id]['model'].unique()
                                    if len(model_ids) > 0:
                                        model_id = model_ids[0]
                                        if model_id in all_model_scores.index:
                                            factor_mean_score = all_model_scores[model_id]
                                            # Calculate z-score
                                            if len(all_model_scores) > 1:
                                                z_score = stats.zscore(all_model_scores)[all_model_scores.index.get_loc(model_id)]
                                            else:
                                                z_score = 0.0
                                            results.append({
                                                'questionnaire_task': task_type,
                                                'questionnaire_name': f"BIG5 - {factor}",
                                                'mean_score': factor_mean_score,
                                                'z_score': z_score
                                            })
                        except Exception as e:
                            print(f"Error processing BIG5 factors: {e}")
                    else:
                        # For all other questionnaires, calculate z-score
                        csv_path = f"./model_logs/{task_type}/{questionnaire_name}.csv"
                        try:
                            df = pd.read_csv(csv_path, encoding="utf-8-sig")
                            positive_df = df[df['filter'] == 'positive_only'].copy()
                            # Get all models' scores
                            all_model_scores = positive_df.groupby('model')['mean_score'].mean()
                            # Find this model's ID
                            model_ids = positive_df[positive_df['model_version_id'] == model_version_id]['model'].unique()
                            if len(model_ids) > 0:
                                model_id = model_ids[0]
                                if model_id in all_model_scores.index:
                                    # Calculate z-score
                                    if len(all_model_scores) > 1:
                                        z_score = stats.zscore(all_model_scores)[all_model_scores.index.get_loc(model_id)]
                                    else:
                                        z_score = 0.0
                                    results.append({
                                        'questionnaire_task': task_type,
                                        'questionnaire_name': questionnaire_name,
                                        'mean_score': avg_score,
                                        'z_score': z_score
                                    })
                        except Exception as e:
                            print(f"Error calculating z-score for {questionnaire_name}: {e}")
                            # Fallback without z-score
                            results.append({
                                'questionnaire_task': task_type,
                                'questionnaire_name': questionnaire_name,
                                'mean_score': avg_score,
                                'z_score': 0.0
                            })
        return results

    def calculate_percentile(self, questionnaire_name, task_type, current_score):
        """Calculate the percentage of models that this score is better than.

        Args:
            questionnaire_name: Name of the questionnaire (e.g., "BIG5" or "BIG5 - Openness to Experience")
            task_type: Type of task (QMLM or QMNLI)
            current_score: The score to calculate percentile for

        Returns:
            Percentage of models this score is better than (0-100) or None if it cannot be calculated
        """
        try:
            # Check if this is a factor-specific query (format: "{QUESTIONNAIRE} - {factor}")
            factor_name = None
            base_questionnaire = questionnaire_name
            if questionnaire_name.startswith("BIG5 - "):
                factor_name = questionnaire_name[7:]  # Extract factor name after "BIG5 - "
                base_questionnaire = "BIG5"
            elif questionnaire_name.startswith("ASI - "):
                factor_name = questionnaire_name[6:]  # Extract factor name after "ASI - "
                base_questionnaire = "ASI"
            # Map display name to CSV column name if needed
            factor_name = FACTOR_NAME_MAPPING.get(factor_name, factor_name)
            csv_path = f"./model_logs/{task_type}/{base_questionnaire}.csv"
            df = pd.read_csv(csv_path, encoding="utf-8-sig")
            # Filter for positive_only
            positive_df = df[df['filter'] == 'positive_only'].copy()
            # If this is a factor-specific query, filter by factor column
            if factor_name:
                positive_df = positive_df[positive_df['factor'] == factor_name]
            # Group by model to get average scores
            model_scores = positive_df.groupby('model')['mean_score'].mean()
            if len(model_scores) == 0:
                return None
            # Calculate percentage of models that have LESS of this construct than the current model.
            # Since we express all results as "more [construct] than X%", we count models with lower scores.
            #
            # Examples:
            #   - "more anxious than 90%" means 90% of models have lower anxiety scores
            #   - "more compassionate than 80%" means 80% of models have lower compassion scores
            #
            # For questionnaires where lower is better (GAD7, PHQ9, ASI):
            #   - count models with LOWER scores (they have less of the negative trait)
            # For questionnaires where higher is better (SOC, CS, BIG5 traits):
            #   - count models with LOWER scores (they have less of the positive trait)
            # For BIG5 factors, use the factor name to determine direction
            lookup_name = factor_name if factor_name else questionnaire_name
            is_zero_best = lookup_name in ZERO_IS_BEST
            lower_is_better = LOWER_IS_BETTER.get(lookup_name, False)
            if is_zero_best:
                # For zero-is-best: count models with larger absolute distance from 0
                current_abs = abs(current_score)
                better_than_percentage = (model_scores.abs() > current_abs).sum() / len(model_scores) * 100
            elif lower_is_better:
                # For "negative" constructs (anxiety, depression, sexism):
                # count models with LOWER scores (this model has MORE of the negative trait)
                better_than_percentage = (model_scores < current_score).sum() / len(model_scores) * 100
            else:
                # For "positive" constructs (coherence, compassion, openness):
                # count models with LOWER scores (this model has MORE of the positive trait)
                better_than_percentage = (model_scores < current_score).sum() / len(model_scores) * 100
            return better_than_percentage
        except Exception as e:
            print(f"Error calculating percentile: {e}")
            return None
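
    # Worked example (illustrative numbers): with per-model scores [0.1, 0.2, 0.4, 0.8] and
    # current_score = 0.4, (model_scores < 0.4).sum() / 4 * 100 = 50.0,
    # i.e. the model is "more <construct> than 50%" of the evaluated models.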

    def generate_big5_factor_acronym(self, factor_name):
        """Generate acronym for BIG5 factor names only.

        Rules (ONLY for BIG5 factors):
          - If 1 word: first letter only
          - If 2+ words: first letter of first 2 significant words (skip stop words like 'to', 'of', etc.)

        Examples:
          - "Openness to Experience" -> "OE"
          - "Conscientiousness" -> "C"
          - "Extraversion" -> "E"
          - "Agreeableness" -> "A"
          - "Neuroticism" -> "N"
        """
        # Stop words to skip
        stop_words = {'to', 'of', 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'for'}
        # Split into words and filter out stop words
        words = factor_name.split()
        significant_words = [w for w in words if w.lower() not in stop_words]
        if len(significant_words) == 0:
            return factor_name  # Fallback
        elif len(significant_words) == 1:
            # Single word: just first letter
            return significant_words[0][0].upper()
        else:
            # Two or more words: first letter of first 2 words
            return (significant_words[0][0] + significant_words[1][0]).upper()

    def format_results_as_yaml(self, model_id, results_list):
        """Format results as a YAML string matching the specified structure.

        Args:
            model_id: The model identifier
            results_list: List of dicts with keys: questionnaire_task, questionnaire_name, mean_score, z_score

        Returns:
            YAML formatted string
        """
        # Mapping for dataset names with construct in parentheses
        DATASET_DISPLAY_NAMES = {
            "GAD7": "GAD7 (Anxiety)",
            "PHQ9": "PHQ9 (Depression)",
            "SOC": "SOC (Coherence)",
            "CS": "CS (Compassion)",
            "ASI": "ASI (Sexism)"
        }
        # Mapping for BIG5 factor display names
        BIG5_FACTOR_NAMES = {
            "Openness to Experience": "Openness",
            "Conscientiousness": "Conscientiousness",
            "Extraversion": "Extraversion",
            "Agreeableness": "Agreeableness",
            "Neuroticism": "Neuroticism"
        }
        lines = []
        lines.append("---")
        lines.append("model-index:")
        lines.append(f"  - name: {model_id}")
        lines.append("    results:")
        # Build one 'results' entry per questionnaire
        for result in results_list:
            questionnaire_name = result['questionnaire_name']
            task_type = result['questionnaire_task']
            mean_score = result['mean_score']
            z_score = result.get('z_score', 0.0)  # Get z-score, default to 0 if not present
            # Calculate ranking information
            better_than_pct = self.calculate_percentile(questionnaire_name, task_type, mean_score)
            # Extract the construct name for lookup
            if " - " in questionnaire_name:
                construct = questionnaire_name.split(" - ", 1)[1]
                base_questionnaire = questionnaire_name.split(" - ", 1)[0]
            else:
                construct = questionnaire_name
                base_questionnaire = questionnaire_name
            # Determine if construct is neutral, lower-is-better, or higher-is-better
            is_neutral = construct in ZERO_IS_BEST
            is_lower_better = LOWER_IS_BETTER.get(construct, False)
            # Get comparison text for the value display
            comparison_text = COMPARISON_TEXT.get(construct, "better than")
            # Calculate total models and format value
            if better_than_pct is not None:
                try:
                    # Read CSV to get total model count
                    if base_questionnaire.startswith("BIG5"):
                        csv_path = f"./model_logs/{task_type}/BIG5.csv"
                        df = pd.read_csv(csv_path, encoding="utf-8-sig")
                        positive_df = df[df['filter'] == 'positive_only'].copy()
                        positive_df = positive_df[positive_df['factor'] == construct]
                    else:
                        csv_path = f"./model_logs/{task_type}/{base_questionnaire}.csv"
                        df = pd.read_csv(csv_path, encoding="utf-8-sig")
                        positive_df = df[df['filter'] == 'positive_only'].copy()
                    total_models = positive_df['model'].nunique()
                    # Format value based on construct type
                    # Format: z_score (Rank: rank/total) or z_score (percentile)
                    # Calculate rank by directly counting models (matching profile card logic)
                    if is_neutral:
                        # For neutral constructs: show absolute z-score as deviation from neutral.
                        # Don't show ranking/percentile as it's confusing for neutral constructs.
                        abs_z_score = abs(z_score)
                        value_display = f"{abs_z_score:.3f} (deviation from neutral)"
                    elif is_lower_better:
                        # For negative constructs: rank from left (best/lowest) to right (worst/highest).
                        # Use direct counting to match profile card: count models with score <= current_score.
                        model_scores = positive_df.groupby('model')['mean_score'].mean()
                        sorted_scores = sorted(model_scores.values)
                        rank = sum(1 for s in sorted_scores if s <= mean_score)
                        value_display = f"{z_score:.3f} (Rank: {rank}/{total_models})"
                    else:
                        # For positive constructs: rank from best (highest) to worst (lowest).
                        # Higher scores are better, rank = number of models with HIGHER scores + 1 (rank from top).
                        model_scores = positive_df.groupby('model')['mean_score'].mean()
                        sorted_scores = sorted(model_scores.values)
                        models_with_higher_scores = sum(1 for s in sorted_scores if s > mean_score)
                        rank = models_with_higher_scores + 1
                        value_display = f"{z_score:.3f} (Rank: {rank}/{total_models})"
                except Exception as e:
                    value_display = f"{z_score:.3f}"
            else:
                value_display = f"{z_score:.3f}"
            # Determine dataset name and metric type
            if questionnaire_name.startswith("BIG5 - "):
                factor_part = questionnaire_name[7:]  # Extract part after "BIG5 - "
                # Get display name (e.g., "Openness" instead of "Openness to Experience")
                display_name = BIG5_FACTOR_NAMES.get(factor_part, factor_part)
                dataset_name = f"BIG5 ({display_name})"
                metric_name = "Z-Score"
                metric_type = f"{questionnaire_name} Z-Score"
            else:
                dataset_name = DATASET_DISPLAY_NAMES.get(questionnaire_name, questionnaire_name)
                metric_name = "Z-Score"
                metric_type = f"{questionnaire_name} Z-Score"
            lines.append("      - task:")
            lines.append(f"          type: {task_type}")
            lines.append("          name: Psychometrics Assessments")
            lines.append("        dataset:")
            lines.append(f"          name: {dataset_name}")
            lines.append("          type: Qpsychometric")
            lines.append("        metrics:")
            lines.append(f"          - name: {metric_name}")
            lines.append(f"            type: {metric_type}")
            lines.append(f"            value: '{value_display}'")
            lines.append("        source:")
            lines.append("          name: Qpsychometric Space")
            lines.append("          url: https://huggingface.co/spaces/cnai-lab/Qpsychometric")
        lines.append("---")
        return "\n".join(lines)
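
    # The emitted model-index block looks roughly like this (values illustrative):
    #
    #   ---
    #   model-index:
    #     - name: some-org/some-model
    #       results:
    #         - task:
    #             type: QMNLI
    #             name: Psychometrics Assessments
    #           dataset:
    #             name: GAD7 (Anxiety)
    #             type: Qpsychometric
    #           metrics:
    #             - name: Z-Score
    #               type: GAD7 Z-Score
    #               value: '-0.412 (Rank: 3/27)'
    #           source:
    #             name: Qpsychometric Space
    #             url: https://huggingface.co/spaces/cnai-lab/Qpsychometric
    #   ---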

    def _info(self):
        """
        Defines the metadata and configuration for the evaluation module.

        Returns:
            EvaluationModuleInfo: An object containing the description, citation, input descriptions,
            and the features of the evaluation module.
        """
        return EvaluationModuleInfo(
            description="This metric evaluates a model's bias towards the questionnaire domain, computing an average mean score across all its questions.",
            citation="""@article{reuben2024assessment,
                title={Assessment and manipulation of latent constructs in pre-trained language models using psychometric scales},
                author={Reuben, Maor and Slobodin, Ortal and Elyashar, Aviad and Cohen, Idan-Chaim and Braun-Lewensohn, Orna and Cohen, Odeya and Puzis, Rami},
                journal={arXiv preprint arXiv:2409.19655},
                year={2024}
            }""",
            inputs_description="This metric expects a model identifier, a questionnaire name, and a questionnaire type. The output is an average mean score indicating the model's bias.",
            features={
                "model_id": Value("string"),
                "questionnaire_name": Value("string"),
                "task_type": Value("string"),
            }
        )

    def compute(self, **kwargs):
        """
        Evaluate a model on a questionnaire using the specified task type.
        Requires model_id, questionnaire_name, and task_type as arguments.
        """
        all_kwargs = {**kwargs}
        missing_inputs = [k for k in self._feature_names() if k not in all_kwargs]
        if missing_inputs:
            raise ValueError(
                f"Evaluation module inputs are missing: {missing_inputs}. All required inputs are {list(self._feature_names())}"
            )
        # Pass all kwargs to _compute, not just the ones in _feature_names.
        # This allows optional parameters like filter_by_accuracy to be passed through.
        output = self._compute(**all_kwargs)
        return output

    def _compute(self, **kwargs):
        """
        Internal computation method that handles model evaluation logic.
        Checks cache, runs evaluation if needed, and returns results with YAML output.
        """
        model_id = kwargs.get("model_id")
        full_questionnaire_name = kwargs.get("questionnaire_name")
        task_type = kwargs.get("task_type")
        # The questionnaire key is the text inside the parentheses of the display name.
        start = full_questionnaire_name.find('(') + 1
        end = full_questionnaire_name.find(')')
        questionnaire_name = full_questionnaire_name[start:end]
        commits = list_repo_commits(repo_id=model_id, repo_type="model")[0]
        last_commit_hash = commits.commit_id
        model_version_id = model_id + "_" + last_commit_hash
        model_info = self.hf_api.model_info(model_id)
        model_info.last_commit_hash = last_commit_hash
        model_info.last_commit_date = commits.created_at.strftime("%d-%m-%Y %H:%M:%S") + " UTC"
        model_info.model_version_id = model_version_id
        # Check if model results already exist in model_logs
        exists, cached_mean_score = self.check_model_exists_in_logs(model_version_id, questionnaire_name, task_type)
        # Calculate metrics for all factors
        csv_path_to_read = f"./model_logs/{task_type}/{questionnaire_name}.csv"
        questionnaire = self.questionnaires_obj.questionnaires[task_type][questionnaire_name]
        if exists:
            # Get all questionnaire results for YAML string
            all_results = self.get_all_questionnaire_results(model_version_id)
            correlations = questionnaire.calc_correlations(csv_path_to_read)
            total_alpha, factor_alphas = questionnaire.calc_alpha(csv_path_to_read)
            mean, std, avg_negative_per_model = questionnaire.calc_silhouette(csv_path_to_read)
            # Pack values back into tuples for compatibility
            alpha = (total_alpha, factor_alphas)
            silhouette = (mean, std, avg_negative_per_model)
            # Format results as YAML string
            string_result = self.format_results_as_yaml(model_id, all_results)
            return {"avg_mean_score": cached_mean_score, "correlations": correlations, "alpha": alpha, "silhouette": silhouette}, string_result
        # If not found, evaluate the model
        try:
            avg_mean_score = Qpsychometric.evaluate_model_on_questionnaire(model_info, questionnaire_name, task_type, self.questionnaires_obj, self.mongo_handler)
            # Calculate metrics after evaluation
            correlations = questionnaire.calc_correlations(csv_path_to_read)
            total_alpha, factor_alphas = questionnaire.calc_alpha(csv_path_to_read)
            mean, std, avg_negative_per_model = questionnaire.calc_silhouette(csv_path_to_read)
            # Pack values back into tuples for compatibility
            alpha = (total_alpha, factor_alphas)
            silhouette = (mean, std, avg_negative_per_model)
            # Get all questionnaire results for YAML string
            all_results = self.get_all_questionnaire_results(model_version_id)
            # Format results as YAML string
            string_result = self.format_results_as_yaml(model_id, all_results)
            # Add metrics to result
            avg_mean_score["correlations"] = correlations
            avg_mean_score["alpha"] = alpha
            avg_mean_score["silhouette"] = silhouette
            return avg_mean_score, string_result
        except Exception as e:
            # Log the error to models_errors.csv and MongoDB
            error_message = str(e)
            Qpsychometric.log_model_error(model_version_id, error_message, self.mongo_handler)
            # Re-raise the exception so it can be handled by the UI layer
            raise

    @staticmethod
    def evaluate_model_on_questionnaire(model_info, questionnaire_name, task_type, questionnaires_obj, mongo_handler, get_report=False):
        """
        Run evaluation on a model using the specified questionnaire and task type.
        Validates compatibility and loads the appropriate pipeline for evaluation.
        """
        only_qmnli = ["CS"]
        only_qmlm = []
        if (questionnaire_name in only_qmnli and task_type == MLM_TYPE) or (questionnaire_name in only_qmlm and task_type == NLI_TYPE):
            raise ValueError(f"The task {task_type} is not available for {questionnaire_name}.")
        model_id = model_info.id
        # Load the model pipeline safely
        pipe, pipe_type = Utils.load_pipeline_safely(model_id=model_id)
        pipe.model_identifier = model_id
        # Check if the loaded model's task type matches the expected questionnaire type
        if pipe_type != task_type:
            raise ValueError(f"Invalid model: {model_id} for task type. Model task ({pipe_type}) must match the task type ({task_type}).")
        # Log model metadata (only once per model_version_id)
        Qpsychometric.log_model_meta_data(model_info, pipe, mongo_handler)
        # Retrieve the questionnaire object based on the extracted name and type
        # questionnaire = all_psychometrics[questionnaire_name][task_type]
        # Compute and return the mean score using the loaded pipeline and questionnaire
        results = Qpsychometric._compute_mean_score(model_info, pipe, questionnaire_name, task_type, questionnaires_obj, mongo_handler)
        return results

    @staticmethod
    def _compute_mean_score(model_info, pipe, questionnaire_name, task_type, questionnaires_obj, mongo_handler):
        """Execute the evaluation pipeline and calculate the average mean score."""
        avg_mean_score = Utils.execute_pipeline_save_results(model_info, pipe, questionnaire_name, task_type, questionnaires_obj, mongo_handler)
        return {"avg_mean_score": avg_mean_score}
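

# --- Usage sketch (illustrative only) ---
# How the surrounding Space is expected to wire this module together. `HfApi` is the real
# huggingface_hub client; `mongo_handler` (a MongoDBHandler-like object), the questionnaire
# spec passed to `Questionnaires`, and the "Construct (KEY)" display-name format are
# assumptions about the host application, not guarantees made by this module.
#
#   from huggingface_hub import HfApi
#
#   hf_api = HfApi()
#   metric = Qpsychometric(hf_api, all_questionnaires, mongo_handler)
#   scores, yaml_card_block = metric.compute(
#       model_id="facebook/bart-large-mnli",
#       questionnaire_name="Anxiety (GAD7)",   # the key inside the parentheses selects the questionnaire
#       task_type=NLI_TYPE,
#   )
#   # `scores` holds avg_mean_score plus correlations/alpha/silhouette;
#   # `yaml_card_block` is the model-index YAML produced by format_results_as_yaml.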