# midicaps-logs / eval_runner.py
# dvilasuero's picture
# Upload eval_runner.py with huggingface_hub
# 316475b verified
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "inspect-ai",
# "datasets",
# "openai",
# "transformers",
# "accelerate",
# "huggingface_hub",
# ]
# ///
"""
Wrapper script that runs an arbitrary Inspect eval and uploads logs to a Space.
This script is meant to be run on HF Jobs.
"""
import os
import sys
import tempfile
from pathlib import Path
from huggingface_hub import HfApi
from inspect_ai import eval
from inspect_ai.log import bundle_log_dir
def bundle_and_upload_to_space(log_dir: str, hf_space_id: str, hf_token: str):
    """Bundle Inspect eval logs and upload them to a static HF Space.

    Args:
        log_dir: Directory containing Inspect eval logs.
        hf_space_id: Target Space repo id (e.g. "user/my-space").
        hf_token: HF API token with write access to the Space.

    Raises:
        ValueError: If ``log_dir`` does not exist.
    """
    log_path = Path(log_dir)
    if not log_path.exists():
        raise ValueError(f"Log directory '{log_dir}' does not exist")

    with tempfile.TemporaryDirectory() as temp_bundle_dir:
        bundle_output_dir = os.path.join(temp_bundle_dir, "bundle")
        print(f"Bundling logs from {log_dir}...")
        # bundle_log_dir produces a self-contained static log viewer.
        bundle_log_dir(log_dir=log_dir, output_dir=bundle_output_dir, overwrite=True)
        print("Bundle created successfully")

        api = HfApi(token=hf_token)

        # Best-effort: create the Space if it doesn't exist yet. Failure here
        # is non-fatal (the Space may already exist with different settings);
        # real permission/auth problems will surface in the upload below.
        try:
            api.create_repo(
                repo_id=hf_space_id,
                repo_type="space",
                exist_ok=True,
                space_sdk="static",
            )
            print(f"Space {hf_space_id} is ready")
        except Exception as e:
            print(f"Warning: Could not create/verify Space: {e}")

        # Upload the whole bundle as a single commit. The original looped over
        # files and called upload_file per file, which creates one commit per
        # file — slow and prone to commit rate limits on large bundles.
        print(f"Uploading bundle to Space {hf_space_id}...")
        api.upload_folder(
            folder_path=bundle_output_dir,
            repo_id=hf_space_id,
            repo_type="space",
        )
        uploaded_count = sum(len(files) for _, _, files in os.walk(bundle_output_dir))
        print(f"Successfully uploaded {uploaded_count} files")
        print(f"View at: https://huggingface.co/spaces/{hf_space_id}")
if __name__ == "__main__":
    # Usage: eval_runner.py <eval_script_url> <task_name> <model> <hf_space_id> [log_dir]
    if len(sys.argv) < 5:
        print("Usage: eval_runner.py <eval_script_url> <task_name> <model> <hf_space_id> [log_dir]")
        sys.exit(1)

    eval_script_url = sys.argv[1]
    task_name = sys.argv[2]
    model = sys.argv[3]
    hf_space_id = sys.argv[4]
    log_dir = sys.argv[5] if len(sys.argv) > 5 else "./logs"

    # Download the eval script.
    # NOTE(review): this executes downloaded code — only point it at trusted URLs.
    print(f"Downloading eval script from {eval_script_url}...")
    import urllib.request
    with urllib.request.urlopen(eval_script_url) as response:
        eval_code = response.read().decode('utf-8')

    # Write the eval code to a temp file and import it as a module.
    # (tempfile is already imported at the top of the file; the original
    # redundantly re-imported it here.)
    print("Loading eval...")
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(eval_code)
        temp_eval_file = f.name
    try:
        import importlib.util
        spec = importlib.util.spec_from_file_location("user_eval", temp_eval_file)
        user_eval_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(user_eval_module)
    finally:
        # Always remove the temp file, even if the import raised.
        os.unlink(temp_eval_file)

    # Resolve the task function.
    if task_name == "auto":
        # Auto-detect: collect public callables (heuristic — @task-decorated
        # functions are callables; record_to_* helpers are excluded).
        tasks = [name for name in dir(user_eval_module)
                 if not name.startswith('_') and callable(getattr(user_eval_module, name))]
        task_candidates = [t for t in tasks if not t.startswith('record_to_')]
        if len(task_candidates) == 0:
            # BUG FIX: the original referenced the undefined name `eval_script`
            # here, raising NameError instead of printing the error message.
            print(f"Error: No tasks found in {eval_script_url}")
            sys.exit(1)
        elif len(task_candidates) > 1:
            print(f"Error: Multiple tasks found: {task_candidates}")
            print("Please specify --task <task_name>")
            sys.exit(1)
        task_name = task_candidates[0]
        print(f"Auto-detected task: {task_name}")

    if not hasattr(user_eval_module, task_name):
        # BUG FIX: `eval_script` was also undefined on this path.
        print(f"Error: Task '{task_name}' not found in {eval_script_url}")
        available = [name for name in dir(user_eval_module) if not name.startswith('_')]
        print(f"Available: {available}")
        sys.exit(1)

    task_fn = getattr(user_eval_module, task_name)

    # Run the evaluation (inspect_ai's `eval`, imported at the top of the file).
    print(f"Running eval: {task_name} with model {model}")
    eval(task_fn(), model=model, max_tokens=4096, log_dir=log_dir)

    # Upload logs if a Space was specified and a token is available.
    if hf_space_id:
        print(f"Uploading logs to {hf_space_id}...")
        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            print("Warning: HF_TOKEN not set, skipping upload")
        else:
            bundle_and_upload_to_space(log_dir, hf_space_id, hf_token)
    else:
        print("No Space ID provided, logs remain in job")