CAIA-evaluate / app.py
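"""
Background service for the CAIA-evaluate Space: polls the submission dataset
on the Hugging Face Hub, scores newly uploaded agent-output files against the
internal benchmark, and pushes the aggregated results to the public result
dataset.
"""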
import asyncio
import datetime
import json
import os
import tempfile
import threading
import time
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from env import (
REPO_ID, TOKEN, SUBMISSION_DATASET,
INTERNAL_DATASET, BENCHMARK_INTERNAL_EVALUATE_DATASET_FILE, EVALUATE_RESULT_DATASET,
llm_config, EVALUATE_RESULT_DATASET_FILE
)
from loguru import logger
from schemas import AgentOutputItem, EnsembleEvaluateScore
from score import init_evaluators, score_in_threadpool
from datasets import load_dataset, VerificationMode
from utils import parse_eval_dataset
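# Hub client used for uploading the updated result file.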
API = HfApi(token=TOKEN)
benchmark_internal_evaluate_dataset = load_dataset(
    INTERNAL_DATASET,
    data_files=BENCHMARK_INTERNAL_EVALUATE_DATASET_FILE,
    token=TOKEN,
    verification_mode=VerificationMode.NO_CHECKS,
    download_mode="force_redownload",
    trust_remote_code=True,
)
eval_results = load_dataset(
    EVALUATE_RESULT_DATASET,
    data_files=EVALUATE_RESULT_DATASET_FILE,
    token=TOKEN,
    verification_mode=VerificationMode.NO_CHECKS,
    download_mode="force_redownload",
    trust_remote_code=True,
)
# Keep an in-memory copy of the published results: new entries are appended
# here and the full list is re-uploaded on every update, so results from
# earlier submissions are not lost between polls.
eval_results_list = list(eval_results['train'])
benchmark_dataset = parse_eval_dataset(benchmark_internal_evaluate_dataset) # type: ignore
evaluator_list = init_evaluators(benchmark_dataset, llm_config)
def get_hf_dataset_files(dataset_name):
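    """Return the set of file paths currently present in a Hub dataset repo."""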
return set(list_repo_files(dataset_name, repo_type="dataset"))
def format_score_result(score_results: list[EnsembleEvaluateScore]) -> tuple[float, float, float, float]:
    if len(score_results) == 0:
        return 0.0, 0.0, 0.0, 0.0
    # Bucket the per-task scores by difficulty level.
    l1, l2, l3 = [], [], []
    for result in score_results:
        if result.level == 1:
            l1.append(result.total_score)
        elif result.level == 2:
            l2.append(result.total_score)
        elif result.level == 3:
            l3.append(result.total_score)
    l1_total_score = round(sum(l1) / len(l1), 2) if l1 else 0.0
    l2_total_score = round(sum(l2) / len(l2), 2) if l2 else 0.0
    l3_total_score = round(sum(l3) / len(l3), 2) if l3 else 0.0
    # Guard the denominator: a result with an unknown level lands in no bucket.
    scored = len(l1) + len(l2) + len(l3)
    total_score = round((sum(l1) + sum(l2) + sum(l3)) / scored, 2) if scored else 0.0
    return total_score, l1_total_score, l2_total_score, l3_total_score
def on_new_files(new_files):
logger.info(f"New Files Found {new_files}")
for file in new_files:
file_name = file.split('/')[-1]
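        # The file name is assumed to encode model and organisation as
        # "model>...<organisation>..." (this matches the split logic below).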
names = file_name.split('<')
model, organization = names[0].split('>')[0], names[1].split('>')[0]
json_data = read_json_file(file)
if not json_data:
continue
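        # Parse each record into the AgentOutputItem schema, then score the
        # whole submission in a thread pool against the benchmark data.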
agent_outputs = [AgentOutputItem(**item) for item in json_data]
score_results: list[EnsembleEvaluateScore] = asyncio.run(score_in_threadpool(
evaluator_list=evaluator_list,
agent_output_list=agent_outputs,
benchmark_data=benchmark_dataset
))
total_score, l1_total_score, l2_total_score, l3_total_score = format_score_result(score_results)
        # Append the aggregated scores to the public result set.
new_eval_result = {
"model": model,
"model_family": "",
"url": "",
"organisation": organization,
"score": total_score,
"score_level1": l1_total_score,
"score_level2": l2_total_score,
"score_level3": l3_total_score,
"date": datetime.datetime.now().strftime("%Y-%m-%d")
}
        logger.info(f"New evaluate result: {new_eval_result}")
        eval_results_list.append(new_eval_result)
        update_eval_results_json(eval_results_list)
def update_eval_results_json(eval_results_list):
    """
    Write the evaluate results to a JSON file and push it to the Hub.
    """
    # Dump eval_results_list to a temporary JSON file first.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") as temp_file:
        json.dump(eval_results_list, temp_file, ensure_ascii=False, indent=4, default=str)
        temp_file_path = temp_file.name
    try:
        # Upload to the Hub, overwriting the result file in the dataset repo.
        API.upload_file(
            path_or_fileobj=temp_file_path,
            path_in_repo=EVALUATE_RESULT_DATASET_FILE,  # target JSON file name in the repo
            repo_id=EVALUATE_RESULT_DATASET,
            repo_type='dataset',
            token=TOKEN,
            commit_message="update evaluate result json"
        )
    except Exception as e:
        logger.error(f"Failed to upload evaluate result JSON: {e}")
    finally:
        # Remove the temporary file.
        os.unlink(temp_file_path)
def read_json_file(file_path):
    """
    Download a JSON file from the submission dataset and return its contents.

    Args:
        file_path (str): Path to the JSON file inside the dataset repo

    Returns:
        dict/list: Contents of the JSON file, or None on failure
    """
    try:
        # Download the file from the Hugging Face Hub.
        local_path = hf_hub_download(
            repo_id=SUBMISSION_DATASET,
            filename=file_path,
            token=TOKEN,
            repo_type='dataset'
        )
        # Read the JSON file.
        with open(local_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        logger.info(f"Successfully read file: {file_path}")
        return data
    except Exception as e:
        logger.error(f"Error reading file {file_path}: {str(e)}")
        return None
def monitor_hf_dataset(dataset_name, interval=60):
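    """Poll a Hub dataset every `interval` seconds and score any newly added files."""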
    last_files = get_hf_dataset_files(dataset_name)
    logger.info(f"Initial files: {last_files}")
    while True:
        time.sleep(interval)
        current_files = get_hf_dataset_files(dataset_name)
        new_files = current_files - last_files
        if new_files:
            try:
                on_new_files(new_files)
            except Exception as e:
                # Keep the monitor alive even if processing a submission fails.
                logger.error(f"Failed to process new files {new_files}: {e}")
        last_files = current_files
def start_monitoring_delayed(delay_seconds=30):
    """Start the monitoring task after a delay so the Space finishes booting first."""
    def start_monitor():
        logger.info("Start monitoring HuggingFace dataset changes...")
        monitor_hf_dataset(SUBMISSION_DATASET, interval=60)
    # Launch the monitoring task in a background thread.
    monitor_thread = threading.Thread(target=start_monitor, daemon=True)
    threading.Timer(delay_seconds, monitor_thread.start).start()
    logger.info(f"Monitoring task will start in {delay_seconds} seconds")
if __name__ == "__main__":
    start_monitoring_delayed(30)
    # The monitor runs in a daemon thread; keep the main thread alive so the
    # process does not exit before (or while) the monitor runs.
    while True:
        time.sleep(3600)