Qpsychometric / qpsychometric_eval.py
Fadi12's picture
Qpsychometric Space
59a8a7c
from evaluate import EvaluationModule, EvaluationModuleInfo
from datasets import Features, Value
from transformers import pipeline
from qlatent.qmnli.qmnli import *
from qlatent.qmlm.qmlm import *
from qpsychometric import *
import torch
import pandas as pd
from datetime import datetime, timezone
import numpy as np
import pytz
from huggingface_hub import list_repo_commits
import os
from qlatent.questionnaire_eval.questionnaire_utils import *
from questionnaire import Questionnaires
FACTOR_NAME_MAPPING = {
"Hostile Sexism": "H",
"Benevolent Sexism (Intimacy)": "BI",
"Benevolent Sexism (Paternalism)": "BP",
"Benevolent Sexism (Gender Differentiation)": "BG"
}
LOWER_IS_BETTER = {
"GAD7": True,
"PHQ9": True,
"ASI": True,
"H": True,
"BI": True,
"BP": True,
"BG": True,
"Hostile Sexism": True,
"Benevolent Sexism (Intimacy)": True,
"Benevolent Sexism (Paternalism)": True,
"Benevolent Sexism (Gender Differentiation)": True,
"SOC": False,
"Meaningfulness": False,
"Comprehensibility": False,
"Manageability": False,
"CS": False,
"Kindness": False,
"Common Humanity": False,
"Mindfulness": False,
"Indifference": False, # Higher score = less indifference (reverse-coded)
"Separation": False, # Higher score = less separation (reverse-coded)
"Disengagement": False, # Higher score = less disengagement (reverse-coded)
"BIG5": False,
"Openness to Experience": False,
"Conscientiousness": False,
"Extraversion": False,
"Agreeableness": False,
"Neuroticism": True
}
ZERO_IS_BEST = {
"Openness to Experience",
"Extraversion"
}
COMPARISON_TEXT = {
"GAD7": "more anxious than",
"PHQ9": "more depressed than",
"ASI": "more sexist than",
"H": "more hostile sexist than",
"BI": "more benevolent sexist (intimacy) than",
"BP": "more paternalistic than",
"BG": "more gender-differentiated than",
"Hostile Sexism": "more hostile sexist than",
"Benevolent Sexism (Intimacy)": "more benevolent sexist (intimacy) than",
"Benevolent Sexism (Paternalism)": "more paternalistic than",
"Benevolent Sexism (Gender Differentiation)": "more gender-differentiated than",
"SOC": "more coherent than",
"Meaningfulness": "more meaningful than",
"Comprehensibility": "more comprehensible than",
"Manageability": "more manageable than",
"CS": "more compassionate than",
"Kindness": "more kind than",
"Common Humanity": "more connected to common humanity than",
"Mindfulness": "more mindful than",
"Indifference": "less indifferent than", # Reverse-coded: higher score = less indifference
"Separation": "less separated than", # Reverse-coded: higher score = less separation
"Disengagement": "less disengaged than", # Reverse-coded: higher score = less disengagement
"BIG5": "more than", # Default for overall BIG5
"Openness to Experience": "more open than",
"Conscientiousness": "more conscientious than",
"Extraversion": "more extraverted than",
"Agreeableness": "more agreeable than",
"Neuroticism": "more neurotic than"
}
MLM_TYPE = "QMLM"
NLI_TYPE = "QMNLI"
all_tasks = [MLM_TYPE, NLI_TYPE]
device = 0 if torch.cuda.is_available() else 1
class Utils():
def __init__(self):
pass
@staticmethod
def average_mean_score_for_positive_only(csv_path, model_version_id):
"""Calculate the average mean score from evaluation results in a CSV file."""
df = pd.read_csv(csv_path, encoding = "utf-8-sig")
df = df[df["model_version_id"] == model_version_id]
return df["mean_score"].mean()
@staticmethod
def execute_pipeline_save_results(model_info, pipeline, questionnaire_name, task_type, questionnaires_obj, mongo_handler):
"""
Run the evaluation pipeline on a questionnaire and save results to CSV and MongoDB.
Returns the average mean score for the model.
"""
if model_info.pipeline_tag == "zero-shot-classification":
questionnaire_type = NLI_TYPE
elif model_info.pipeline_tag == "fill-mask":
questionnaire_type = MLM_TYPE
questionnaire = questionnaires_obj.questionnaires[task_type][questionnaire_name]
csv_path_to_save = f"./model_logs/{questionnaire.questionnaire_type}/{questionnaire.name}.csv"
questionnaire.run(pipelines=[pipeline],
softmax=['index', 'frequency'],
filters={
"unfiltered" : lambda q : {},
"positive_only" : (lambda q : q.get_filter_for_postive_keywords(['frequency'])),
},
result_path = csv_path_to_save,
merge_filtered_positiveonly=True,
)
collection_name = f"{questionnaire_type}_{questionnaire.name}"
mongo_handler.create_empty_collection(collection_name)
mongo_handler.insert_new_rows(csv_path_to_save, collection_name, ["model_version_id", "ordinal"])
avg_mean_score = Utils.average_mean_score_for_positive_only(csv_path_to_save, model_info.model_version_id)
return avg_mean_score
@staticmethod
def load_pipeline_safely(model_id):
"""
Load a HuggingFace pipeline and determine its task type.
Supports zero-shot-classification and fill-mask tasks.
"""
try:
pipe = pipeline(task=None, device=device, model=model_id, trust_remote_code=True)
if pipe.task == "fill-mask":
return pipe, MLM_TYPE
elif pipe.task in ["zero-shot-classification", "text-classification"]:
pipe = pipeline(
task="zero-shot-classification",
device=device,
model=pipe.model,
tokenizer=pipe.tokenizer,
trust_remote_code=True
)
return pipe, NLI_TYPE
except Exception as e:
allowed_tasks = ["zero-shot-classification", "text-classification", "fill-mask"]
if 'pipe' in locals() and hasattr(pipe, 'task') and pipe.task not in allowed_tasks:
raise ValueError(
f"Invalid model: {model_id}. Must be one of these tasks {allowed_tasks}."
)
raise ValueError(f"Failed to initialize model {model_id}: {e}")
class Qpsychometric(EvaluationModule):
def __init__(self, hf_api, all_questionnaires, mongo_handler):
super().__init__()
self.hf_api = hf_api
self.all_questionnaires=all_questionnaires
self.mongo_handler=mongo_handler
self.questionnaires_obj = Questionnaires(self.all_questionnaires)
@staticmethod
def log_model_error(model_version_id, error_message, mongo_handler=None):
"""
Log a model error to models_errors.csv.
Appends a new row with model_version_id and error message.
Avoids duplicates by checking if the model_version_id already exists.
Args:
model_version_id: The model version ID (model_id_commithash)
error_message: The error message to log
mongo_handler: Optional MongoDBHandler instance for syncing to MongoDB
"""
try:
import os
errors_path = os.path.join("model_logs", "models_errors.csv")
if os.path.exists(errors_path):
errors_df = pd.read_csv(errors_path, encoding="utf-8-sig")
if model_version_id in errors_df['model_version_id'].values:
print(f"Error for {model_version_id} already logged. Skipping duplicate.")
return
else:
errors_df = pd.DataFrame(columns=['model_version_id', 'error'])
new_row = pd.DataFrame([{
'model_version_id': model_version_id,
'error': error_message
}])
errors_df = pd.concat([errors_df, new_row], ignore_index=True)
errors_df.to_csv(errors_path, index=False, encoding="utf-8-sig")
print(f"Logged error for {model_version_id} to models_errors.csv")
if mongo_handler is not None:
try:
mongo_handler.create_empty_collection("models_errors")
mongo_handler.insert_new_rows(errors_path, collection_name="models_errors", identifiers=["model_version_id"])
except Exception as mongo_error:
print(f"Could not update MongoDB with error: {mongo_error}")
except Exception as e:
print(f"Failed to log error to models_errors.csv: {e}")
@staticmethod
def log_model_meta_data(model_info, pipeline, mongo_handler=None):
"""
Log model metadata to models_meta_data.csv and optionally to MongoDB.
Extracts comprehensive metadata from model_info and pipeline objects.
Args:
model_info: HuggingFace model info object with model metadata
pipeline: Loaded pipeline object (can be None)
mongo_handler: Optional MongoDBHandler instance for syncing to MongoDB
"""
try:
from dateutil import parser
meta_data_path = os.path.join("model_logs", "models_meta_data.csv")
# Check if model already exists in metadata
file_exists = os.path.exists(meta_data_path)
if file_exists:
meta_df = pd.read_csv(meta_data_path, encoding="utf-8-sig")
# Check if this model_version_id already exists
if model_info.model_version_id in meta_df['model_version_id'].values:
print(f"Metadata for {model_info.model_version_id} already logged. Skipping.")
return
# Extract metadata
now = datetime.now(pytz.UTC)
date_of_logging = now.strftime('%d-%m-%Y %H:%M:%S %Z')
model_id = model_info.id if model_info.id else "unknown"
author = model_info.author if model_info.author else "unknown"
# Handle created_at date
if hasattr(model_info, "created_at"):
dt = model_info.created_at
elif hasattr(model_info, "createdAt"):
dt = parser.parse(model_info.createdAt)
else:
dt = datetime.now(pytz.UTC)
created_at = dt.strftime("%d-%m-%Y %H:%M:%S") + " UTC"
downloads = model_info.downloads if model_info.downloads else "unknown"
likes = model_info.likes if model_info.likes else "unknown"
library_name = model_info.library_name if hasattr(model_info, "library_name") else "unknown"
# Handle pipeline_tag
if hasattr(model_info, "original_pipeline_tag"):
pipeline_tag = model_info.original_pipeline_tag
else:
pipeline_tag = model_info.pipeline_tag if model_info.pipeline_tag else "unknown"
# Extract config data
architectures = "unknown"
model_type = "unknown"
if model_info.config:
if "architectures" in model_info.config and model_info.config['architectures']:
architectures = model_info.config['architectures']
if "model_type" in model_info.config and model_info.config['model_type']:
model_type = model_info.config['model_type']
# Extract card data
base_model = "unknown"
datasets = "unknown"
language = "unknown"
if model_info.cardData:
if "base_model" in model_info.cardData and model_info.cardData['base_model']:
base_model = model_info.cardData['base_model']
if "datasets" in model_info.cardData and model_info.cardData['datasets']:
datasets = model_info.cardData['datasets']
if "language" in model_info.cardData and model_info.cardData['language']:
language = model_info.cardData['language']
# Extract from tags
fine_tune_base_model = [tag.split('base_model:finetune:')[1] for tag in model_info.tags if 'base_model:finetune:' in tag] or "unknown"
if datasets == "unknown":
datasets = [tag.split('dataset:')[1] for tag in model_info.tags if 'dataset:' in tag] or "unknown"
region = next((tag.split('region:')[1] for tag in model_info.tags if 'region:' in tag), "unknown")
# Extract pipeline data
trainable_params = "unknown"
vocab_size = "unknown"
if pipeline is not None:
if hasattr(pipeline, 'trainable_params') and pipeline.trainable_params:
trainable_params = pipeline.trainable_params
if hasattr(pipeline, 'vocab_size') and pipeline.vocab_size:
vocab_size = pipeline.vocab_size
# Create metadata dictionary
model_info_dict = {
"id": model_id,
"author": author,
"last_commit_hash": model_info.last_commit_hash,
"last_commit_date": model_info.last_commit_date,
"model_version_id": model_info.model_version_id,
"created_at": created_at,
"downloads": downloads,
"likes": likes,
"library_name": library_name,
"pipeline_tag": pipeline_tag,
"architectures": str(architectures),
"model_type": model_type,
"base_model": str(base_model),
"fine_tune_base_model": str(fine_tune_base_model),
"datasets": str(datasets),
"language": str(language),
"trainable_params": str(trainable_params),
"vocab_size": str(vocab_size),
"region": region,
"date_of_logging": date_of_logging,
}
# Convert to DataFrame
new_df = pd.DataFrame([model_info_dict])
if file_exists:
# Append to existing file
meta_df = pd.read_csv(meta_data_path, encoding="utf-8-sig")
meta_df = pd.concat([meta_df, new_df], ignore_index=True)
else:
# Create new file
meta_df = new_df
# Save to CSV
meta_df.to_csv(meta_data_path, index=False, encoding="utf-8-sig")
print(f"Logged metadata for {model_info.model_version_id} to models_meta_data.csv")
# Also insert to MongoDB if mongo_handler is provided
if mongo_handler is not None:
try:
mongo_handler.create_empty_collection("models_meta_data")
mongo_handler.insert_new_rows(meta_data_path, collection_name="models_meta_data", identifiers=["model_version_id"])
except Exception as mongo_error:
print(f"Could not update MongoDB with metadata: {mongo_error}")
except Exception as e:
print(f"Failed to log metadata to models_meta_data.csv: {e}")
def model_df_model_logs(self, model_version_id, questionnaire_type, questionnaire_name):
"""Retrieve evaluation results for a specific model version from stored logs."""
csv_path_to_read = f"./model_logs/{questionnaire_type}/{questionnaire_name}.csv"
questionnaire_df = pd.read_csv(csv_path_to_read, encoding="utf-8-sig")
model_filtered_df = questionnaire_df[questionnaire_df["model_version_id"] == model_version_id]
return model_filtered_df
def check_model_exists_in_logs(self, model_version_id, questionnaire_name, task_type):
"""Check if evaluation results for a model already exist in logs."""
try:
csv_path_to_read = f"./model_logs/{task_type}/{questionnaire_name}.csv"
questionnaire_df = pd.read_csv(csv_path_to_read, encoding="utf-8-sig")
model_filtered_df = questionnaire_df[questionnaire_df["model_version_id"] == model_version_id]
if len(model_filtered_df) > 0:
# Calculate average mean score for positive_only filter
positive_only_df = model_filtered_df[model_filtered_df['filter'] == 'positive_only']
if len(positive_only_df) > 0:
avg_mean_score = positive_only_df['mean_score'].mean()
return True, avg_mean_score
return False, None
except FileNotFoundError:
return False, None
except Exception as e:
return False, None
def get_all_questionnaire_results(self, model_version_id):
"""
Get all questionnaire results for a model version.
Returns a list of dicts with task_type, questionnaire_name, mean_score, and z_score.
For BIG5, ONLY includes separate entries for each factor (not the overall BIG5 score).
Z-score is calculated relative to all models for that specific questionnaire/factor.
"""
from scipy import stats
results = []
# Iterate through all questionnaire types and names
questionnaire_types = ['QMLM', 'QMNLI']
# BIG5 factors
BIG5_FACTORS = ["Openness to Experience", "Conscientiousness", "Extraversion", "Agreeableness", "Neuroticism"]
for task_type in questionnaire_types:
for questionnaire_name in self.questionnaires_obj.questionnaires[task_type].keys():
exists, avg_score = self.check_model_exists_in_logs(model_version_id, questionnaire_name, task_type)
if exists:
# For BIG5, only add factor entries (skip the overall BIG5 entry)
if questionnaire_name == "BIG5":
csv_path = f"./model_logs/{task_type}/{questionnaire_name}.csv"
try:
df = pd.read_csv(csv_path, encoding="utf-8-sig")
# Filter by positive_only
positive_df = df[df['filter'] == 'positive_only'].copy()
# Calculate mean score and z-score for each factor
for factor in BIG5_FACTORS:
factor_df = positive_df[positive_df['factor'] == factor]
if len(factor_df) > 0:
# Get all models' scores for this factor
all_model_scores = factor_df.groupby('model')['mean_score'].mean()
# Find this model's score
model_ids = positive_df[positive_df['model_version_id'] == model_version_id]['model'].unique()
if len(model_ids) > 0:
model_id = model_ids[0]
if model_id in all_model_scores.index:
factor_mean_score = all_model_scores[model_id]
# Calculate z-score
if len(all_model_scores) > 1:
z_score = stats.zscore(all_model_scores)[all_model_scores.index.get_loc(model_id)]
else:
z_score = 0.0
results.append({
'questionnaire_task': task_type,
'questionnaire_name': f"BIG5 - {factor}",
'mean_score': factor_mean_score,
'z_score': z_score
})
except Exception as e:
print(f"Error processing BIG5 factors: {e}")
else:
# For all other questionnaires, calculate z-score
csv_path = f"./model_logs/{task_type}/{questionnaire_name}.csv"
try:
df = pd.read_csv(csv_path, encoding="utf-8-sig")
positive_df = df[df['filter'] == 'positive_only'].copy()
# Get all models' scores
all_model_scores = positive_df.groupby('model')['mean_score'].mean()
# Find this model's ID
model_ids = positive_df[positive_df['model_version_id'] == model_version_id]['model'].unique()
if len(model_ids) > 0:
model_id = model_ids[0]
if model_id in all_model_scores.index:
# Calculate z-score
if len(all_model_scores) > 1:
z_score = stats.zscore(all_model_scores)[all_model_scores.index.get_loc(model_id)]
else:
z_score = 0.0
results.append({
'questionnaire_task': task_type,
'questionnaire_name': questionnaire_name,
'mean_score': avg_score,
'z_score': z_score
})
except Exception as e:
print(f"Error calculating z-score for {questionnaire_name}: {e}")
# Fallback without z-score
results.append({
'questionnaire_task': task_type,
'questionnaire_name': questionnaire_name,
'mean_score': avg_score,
'z_score': 0.0
})
return results
def calculate_percentile(self, questionnaire_name, task_type, current_score):
"""Calculate the percentage of models that this score is better than.
Args:
questionnaire_name: Name of the questionnaire (e.g., "BIG5" or "BIG5 - Openness to Experience")
task_type: Type of task (QMLM or QMNLI)
current_score: The score to calculate percentile for
Returns:
Percentage of models this score is better than (0-100) or None if cannot be calculated
"""
try:
# Check if this is a factor-specific query (format: "{QUESTIONNAIRE} - {factor}")
factor_name = None
base_questionnaire = questionnaire_name
if questionnaire_name.startswith("BIG5 - "):
factor_name = questionnaire_name[7:] # Extract factor name after "BIG5 - "
base_questionnaire = "BIG5"
elif questionnaire_name.startswith("ASI - "):
factor_name = questionnaire_name[6:] # Extract factor name after "ASI - "
base_questionnaire = "ASI"
# Map display name to CSV column name if needed
factor_name = FACTOR_NAME_MAPPING.get(factor_name, factor_name)
csv_path = f"./model_logs/{task_type}/{base_questionnaire}.csv"
df = pd.read_csv(csv_path, encoding="utf-8-sig")
# Filter for positive_only
positive_df = df[df['filter'] == 'positive_only'].copy()
# If this is a factor-specific query, filter by factor column
if factor_name:
positive_df = positive_df[positive_df['factor'] == factor_name]
# Group by model to get average scores
model_scores = positive_df.groupby('model')['mean_score'].mean()
if len(model_scores) == 0:
return None
# Calculate percentage of models that have LESS of this construct than current model
# Since we now express all as "more [construct] than X%", we count models with lower scores
#
# Examples:
# - "more anxious than 90%" means 90% of models have lower anxiety scores
# - "more compassionate than 80%" means 80% of models have lower compassion scores
#
# For questionnaires where lower is better (GAD7, PHQ9, ASI):
# - Count models with LOWER scores (they have less of the negative trait)
# For questionnaires where higher is better (SOC, CS, BIG5 traits):
# - Count models with LOWER scores (they have less of the positive trait)
# For BIG5 factors, use the factor name to determine direction
lookup_name = factor_name if factor_name else questionnaire_name
is_zero_best = lookup_name in ZERO_IS_BEST
lower_is_better = LOWER_IS_BETTER.get(lookup_name, False)
if is_zero_best:
# For zero-is-best: count models with larger absolute distance from 0
current_abs = abs(current_score)
better_than_percentage = (model_scores.abs() > current_abs).sum() / len(model_scores) * 100
elif lower_is_better:
# For "negative" constructs (anxiety, depression, sexism):
# Count models with LOWER scores (this model has MORE of the negative trait)
better_than_percentage = (model_scores < current_score).sum() / len(model_scores) * 100
else:
# For "positive" constructs (coherence, compassion, openness):
# Count models with LOWER scores (this model has MORE of the positive trait)
better_than_percentage = (model_scores < current_score).sum() / len(model_scores) * 100
return better_than_percentage
except Exception as e:
print(f"Error calculating percentile: {e}")
return None
def generate_big5_factor_acronym(self, factor_name):
"""Generate acronym for BIG5 factor names only.
Rules (ONLY for BIG5 factors):
- If 1 word: first letter only
- If 2+ words: first letter of first 2 significant words (skip stop words like 'to', 'of', etc.)
Examples:
- "Openness to Experience" -> "OE"
- "Conscientiousness" -> "C"
- "Extraversion" -> "E"
- "Agreeableness" -> "A"
- "Neuroticism" -> "N"
"""
# Stop words to skip
stop_words = {'to', 'of', 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'for'}
# Split into words and filter out stop words
words = factor_name.split()
significant_words = [w for w in words if w.lower() not in stop_words]
if len(significant_words) == 0:
return factor_name # Fallback
elif len(significant_words) == 1:
# Single word: just first letter
return significant_words[0][0].upper()
else:
# Two or more words: first letter of first 2 words
return (significant_words[0][0] + significant_words[1][0]).upper()
def format_results_as_yaml(self, model_id, results_list):
"""Format results as YAML string matching the specified structure
Args:
model_id: The model identifier
results_list: List of dicts with keys: questionnaire_task, questionnaire_name, mean_score, z_score
Returns:
YAML formatted string
"""
# Mapping for dataset names with construct in parentheses
DATASET_DISPLAY_NAMES = {
"GAD7": "GAD7 (Anxiety)",
"PHQ9": "PHQ9 (Depression)",
"SOC": "SOC (Coherence)",
"CS": "CS (Compassion)",
"ASI": "ASI (Sexism)"
}
# Mapping for BIG5 factor display names
BIG5_FACTOR_NAMES = {
"Openness to Experience": "Openness",
"Conscientiousness": "Conscientiousness",
"Extraversion": "Extraversion",
"Agreeableness": "Agreeableness",
"Neuroticism": "Neuroticism"
}
lines = []
lines.append("---")
lines.append("model-index:")
lines.append(f" - name: {model_id}")
lines.append(" results:")
# Build one 'results' entry per questionnaire
for result in results_list:
questionnaire_name = result['questionnaire_name']
task_type = result['questionnaire_task']
mean_score = result['mean_score']
z_score = result.get('z_score', 0.0) # Get z-score, default to 0 if not present
# Calculate ranking information
better_than_pct = self.calculate_percentile(questionnaire_name, task_type, mean_score)
# Extract the construct name for lookup
if " - " in questionnaire_name:
construct = questionnaire_name.split(" - ", 1)[1]
base_questionnaire = questionnaire_name.split(" - ", 1)[0]
else:
construct = questionnaire_name
base_questionnaire = questionnaire_name
# Determine if construct is neutral, lower-is-better, or higher-is-better
is_neutral = construct in ZERO_IS_BEST
is_lower_better = LOWER_IS_BETTER.get(construct, False)
# Get comparison text for the value display
comparison_text = COMPARISON_TEXT.get(construct, "better than")
# Calculate total models and format value
if better_than_pct is not None:
try:
# Read CSV to get total model count
if base_questionnaire.startswith("BIG5"):
csv_path = f"./model_logs/{task_type}/BIG5.csv"
df = pd.read_csv(csv_path, encoding="utf-8-sig")
positive_df = df[df['filter'] == 'positive_only'].copy()
positive_df = positive_df[positive_df['factor'] == construct]
else:
csv_path = f"./model_logs/{task_type}/{base_questionnaire}.csv"
df = pd.read_csv(csv_path, encoding="utf-8-sig")
positive_df = df[df['filter'] == 'positive_only'].copy()
total_models = positive_df['model'].nunique()
# Format value based on construct type
# Format: z_score (Rank: rank/total) or z_score (percentile)
# Calculate rank by directly counting models (matching profile card logic)
if is_neutral:
# For neutral constructs: show absolute z-score as deviation from neutral
# Don't show ranking/percentile as it's confusing for neutral constructs
abs_z_score = abs(z_score)
value_display = f"{abs_z_score:.3f} (deviation from neutral)"
elif is_lower_better:
# For negative constructs: rank from left (best/lowest) to right (worst/highest)
# Use direct counting to match profile card: count models with score <= current_score
model_scores = positive_df.groupby('model')['mean_score'].mean()
sorted_scores = sorted(model_scores.values)
rank = sum(1 for s in sorted_scores if s <= mean_score)
value_display = f"{z_score:.3f} (Rank: {rank}/{total_models})"
else:
# For positive constructs: rank from best (highest) to worst (lowest)
# Higher scores are better, rank = number of models with HIGHER scores + 1 (rank from top)
model_scores = positive_df.groupby('model')['mean_score'].mean()
sorted_scores = sorted(model_scores.values)
models_with_higher_scores = sum(1 for s in sorted_scores if s > mean_score)
rank = models_with_higher_scores + 1
value_display = f"{z_score:.3f} (Rank: {rank}/{total_models})"
except Exception as e:
value_display = f"{z_score:.3f}"
else:
value_display = f"{z_score:.3f}"
# Determine dataset name and metric type
if questionnaire_name.startswith("BIG5 - "):
factor_part = questionnaire_name[7:] # Extract part after "BIG5 - "
# Get display name (e.g., "Openness" instead of "Openness to Experience")
display_name = BIG5_FACTOR_NAMES.get(factor_part, factor_part)
dataset_name = f"BIG5 ({display_name})"
metric_name = "Z-Score"
metric_type = f"{questionnaire_name} Z-Score"
else:
dataset_name = DATASET_DISPLAY_NAMES.get(questionnaire_name, questionnaire_name)
metric_name = "Z-Score"
metric_type = f"{questionnaire_name} Z-Score"
lines.append(" - task:")
lines.append(f" type: {task_type}")
lines.append(" name: Psychometrics Assessments")
lines.append(" dataset:")
lines.append(f" name: {dataset_name}")
lines.append(" type: Qpsychometric")
lines.append(" metrics:")
lines.append(f" - name: {metric_name}")
lines.append(f" type: {metric_type}")
lines.append(f" value: '{value_display}'")
lines.append(" source:")
lines.append(" name: Qpsychometric Space")
lines.append(" url: https://huggingface.co/spaces/cnai-lab/Qpsychometric")
lines.append("---")
return "\n".join(lines)
def _info(self):
"""
Defines the metadata and configuration for the evaluation module.
Returns:
EvaluationModuleInfo: An object containing the description, citation, input descriptions,
and the features of the evaluation module.
"""
return EvaluationModuleInfo(
description="This metric evaluates a model's bias towards the questionnaire domain, computing an average mean score across all its questions.",
citation="""@article{reuben2024assessment,
title={Assessment and manipulation of latent constructs in pre-trained language models using psychometric scales},
author={Reuben, Maor and Slobodin, Ortal and Elyshar, Aviad and Cohen, Idan-Chaim and Braun-Lewensohn, Orna and Cohen, Odeya and Puzis, Rami},
journal={arXiv preprint arXiv:2409.19655},
year={2024}
}""",
inputs_description="This metric expects a model identifier, a questionnaire name, and a questionnaire type. The output is an average mean score indicating the model's bias.",
features={
"model_id": Value("string"),
"questionnaire_name": Value("string"),
"task_type": Value("string"),
}
)
def compute(self, **kwargs):
"""
Evaluate a model on a questionnaire using the specified task type.
Requires model_id, questionnaire_name, and task_type as arguments.
"""
all_kwargs = {**kwargs}
missing_inputs = [k for k in self._feature_names() if k not in all_kwargs]
if missing_inputs:
raise ValueError(
f"Evaluation module inputs are missing: {missing_inputs}. All required inputs are {list(self._feature_names())}"
)
# Pass all kwargs to _compute, not just the ones in _feature_names
# This allows optional parameters like filter_by_accuracy to be passed through
output = self._compute(**all_kwargs)
return output
def _compute(self, **kwargs):
"""
Internal computation method that handles model evaluation logic.
Checks cache, runs evaluation if needed, and returns results with YAML output.
"""
model_id = kwargs.get("model_id")
full_questionnaire_name = kwargs.get("questionnaire_name")
task_type = kwargs.get("task_type")
start = full_questionnaire_name.find('(') + 1
end = full_questionnaire_name.find(')')
questionnaire_name = full_questionnaire_name[start:end]
commits = list_repo_commits(repo_id=model_id, repo_type="model")[0]
last_commit_hash = commits.commit_id
model_version_id = model_id+"_"+last_commit_hash
model_info = hf_api.model_info(model_id)
model_info.last_commit_hash = last_commit_hash
model_info.last_commit_date = commits.created_at.strftime("%d-%m-%Y %H:%M:%S") + " UTC"
model_info.model_version_id = model_version_id
# Check if model results already exist in model_logs
exists, cached_mean_score = self.check_model_exists_in_logs(model_version_id, questionnaire_name, task_type)
# Calculate metrics for all factors
csv_path_to_read = f"./model_logs/{task_type}/{questionnaire_name}.csv"
questionnaire = self.questionnaires_obj.questionnaires[task_type][questionnaire_name]
if exists:
# Get all questionnaire results for YAML string
all_results = self.get_all_questionnaire_results(model_version_id)
correlations = questionnaire.calc_correlations(csv_path_to_read)
total_alpha, factor_alphas = questionnaire.calc_alpha(csv_path_to_read)
mean, std, avg_negative_per_model = questionnaire.calc_silhouette(csv_path_to_read)
# Pack values back into tuples for compatibility
alpha = (total_alpha, factor_alphas)
silhouette = (mean, std, avg_negative_per_model)
# Format results as YAML string
string_result = self.format_results_as_yaml(model_id, all_results)
return {"avg_mean_score": cached_mean_score, "correlations": correlations, "alpha": alpha, "silhouette": silhouette}, string_result
# If not found, evaluate the model
try:
avg_mean_score = Qpsychometric.evaluate_model_on_questionnaire(model_info, questionnaire_name, task_type, self.questionnaires_obj, self.mongo_handler)
# Calculate metrics after evaluation
correlations = questionnaire.calc_correlations(csv_path_to_read)
total_alpha, factor_alphas = questionnaire.calc_alpha(csv_path_to_read)
mean, std, avg_negative_per_model = questionnaire.calc_silhouette(csv_path_to_read)
# Pack values back into tuples for compatibility
alpha = (total_alpha, factor_alphas)
silhouette = (mean, std, avg_negative_per_model)
# Get all questionnaire results for YAML string
all_results = self.get_all_questionnaire_results(model_version_id)
# Format results as YAML string
string_result = self.format_results_as_yaml(model_id, all_results)
# Add metrics to result
avg_mean_score["correlations"] = correlations
avg_mean_score["alpha"] = alpha
avg_mean_score["silhouette"] = silhouette
return avg_mean_score, string_result
except Exception as e:
# Log the error to models_errors.csv and MongoDB
error_message = str(e)
Qpsychometric.log_model_error(model_version_id, error_message, self.mongo_handler)
# Re-raise the exception so it can be handled by the UI layer
raise
@staticmethod
def evaluate_model_on_questionnaire(model_info, questionnaire_name, task_type, questionnaires_obj, mongo_handler, get_report=False):
"""
Run evaluation on a model using the specified questionnaire and task type.
Validates compatibility and loads the appropriate pipeline for evaluation.
"""
only_qmnli = ["CS"]
only_qmlm = []
if (questionnaire_name in only_qmnli and task_type == MLM_TYPE) or (questionnaire_name in only_qmlm and task_type == NLI_TYPE):
raise ValueError(f"The task {task_type} is not available for {questionnaire_name}.")
model_id = model_info.id
# Load the model pipeline safely
pipe, pipe_type = Utils.load_pipeline_safely(model_id=model_id)
pipe.model_identifier = model_id
# Check if the loaded model's task type matches the expected questionnaire type
if pipe_type != task_type:
raise ValueError(f"Invalid model: {model_id} for task type. Model task ({pipe_type}) must match the task type ({task_type}).")
# Log model metadata (only once per model_version_id)
Qpsychometric.log_model_meta_data(model_info, pipe, mongo_handler)
# Retrieve the questionnaire object based on the extracted name and type
#questionnaire = all_psychometrics[questionnaire_name][task_type]
# Compute and return the mean score using the loaded pipeline and questionnaire
results = Qpsychometric._compute_mean_score(model_info, pipe, questionnaire_name, task_type, questionnaires_obj, mongo_handler)
return results
@staticmethod
def _compute_mean_score(model_info, pipe, questionnaire_name, task_type, questionnaires_obj, mongo_handler):
"""Execute the evaluation pipeline and calculate the average mean score."""
avg_mean_score = Utils.execute_pipeline_save_results(model_info, pipe, questionnaire_name, task_type, questionnaires_obj, mongo_handler)
return {"avg_mean_score":avg_mean_score}