| import json |
| import logging |
| import os |
| import re |
| import time |
| from tempfile import TemporaryDirectory |
| from typing import List, Optional |
|
|
| import jsonlines |
| from huggingface_hub import CommitOperationAdd |
| from huggingface_hub import Discussion, HfApi, HfFileSystem |
| from tqdm import tqdm |
|
|
| from .evaluation import METRICS |
| from .formatting import styled_error, styled_message, styled_warning |
| from .tasks_content import TASKS_PRETTY_REVERSE |
| from .utils import MD_LINK_PATTERN |
|
|
|
|
class AlreadyExists(Exception):
    """Exception type signalling that something is already present.

    NOTE(review): nothing in the visible part of this module raises or catches
    it -- presumably it is used by callers elsewhere; confirm before removing.
    """
|
|
|
|
class SubmissionUploader:
    """Class for adding new files to a dataset on a Hub and opening a PR.

    Heavily influenced by these amazing spaces:
    * https://huggingface.co/spaces/safetensors/convert
    * https://huggingface.co/spaces/gaia-benchmark/leaderboard
    * https://huggingface.co/spaces/HuggingFaceH4/open_llm_leaderboard
    """

    def __init__(self, dataset_id: str, private_dataset_id: str):
        """Creates Hub clients authenticated with the ``HF_TOKEN`` environment variable.

        Args:
            dataset_id: Id of the dataset that receives predictions and results.
            private_dataset_id: Id of the dataset that receives request metadata.

        Raises:
            KeyError: If ``HF_TOKEN`` is not set.
        """
        token = os.environ["HF_TOKEN"]
        self._api = HfApi(token=token)
        self._fs = HfFileSystem(token=token)
        self._results_dataset_id = dataset_id
        self._requests_dataset_id = private_dataset_id

    @staticmethod
    def _require(condition: object, message: str) -> None:
        """Raises ``ValueError(message)`` when ``condition`` is falsy.

        Used instead of ``assert`` so that validation still runs under ``python -O``.
        """
        if not condition:
            raise ValueError(message)

    @staticmethod
    def _average_metrics(metrics_results: List[dict]) -> dict:
        """Averages per-file metrics key by key.

        The set of keys is taken from the first entry; every entry is expected to have them.

        Raises:
            ValueError: If ``metrics_results`` is empty.
        """
        if not metrics_results:
            raise ValueError("No metrics were computed. Please, attach at least one file with predictions.")
        num_entries = len(metrics_results)
        return {key: sum(entry[key] for entry in metrics_results) / num_entries for key in metrics_results[0]}

    def _sanitize_exception_message(self, message: str) -> str:
        """Replaces the private dataset id in ``message`` with the ``{private_dataset}`` placeholder.

        Both the ``PRIVATE_DATASET_ID`` environment variable (if set and non-empty) and the
        requests dataset id of this instance are redacted. Empty ids are skipped on purpose:
        ``str.replace("", ...)`` would insert the placeholder between every character.
        """
        candidates = {os.environ.get("PRIVATE_DATASET_ID"), getattr(self, "_requests_dataset_id", None)}
        # Longest first, so an id that contains another one is redacted as a whole.
        for private_id in sorted((c for c in candidates if c), key=len, reverse=True):
            message = message.replace(private_id, "{private_dataset}")
        return message

    def _get_previous_pr(self, pr_title: str) -> "Optional[Discussion]":
        """Searches among discussions of the results dataset for a PR with the given title.

        Returns:
            The first open pull request whose title equals ``pr_title``, or ``None`` if there
            is no such PR or if requesting the discussions failed (best effort).
        """
        try:
            discussions = self._api.get_repo_discussions(repo_id=self._results_dataset_id, repo_type="dataset")
        except Exception:
            # Deliberately best-effort: a failed lookup must not block the submission,
            # but it should at least be visible in the logs.
            logging.warning("Could not retrieve discussions of the results dataset.", exc_info=True)
            return None
        for discussion in discussions:
            if discussion.status == "open" and discussion.is_pull_request and discussion.title == pr_title:
                return discussion
        return None

    def _upload_request(
        self,
        task_id: str,
        model_folder: str,
        model_name_pretty: str,
        model_availability: str,
        model_url: Optional[str],
        urls: Optional[str],
        context_size: str,
        submitted_by: str,
        contact_information: str,
        comment: Optional[str],
        pr_url: str,
        temp_directory: str,
    ) -> "List[CommitOperationAdd]":
        """Adds a file with metadata about the current request to the requests dataset.

        The metadata is written to ``request_metadata.json`` inside ``temp_directory``; the
        returned commit operation uploads it as
        ``{task_id}/{number of existing entries}_{model_folder}.json``.
        """
        request_metadata = {
            "model_folder": model_folder,
            "model_name_pretty": model_name_pretty,
            "model_availability": model_availability,
            "model_url": model_url,
            "urls": urls,
            "context_size": context_size,
            "submitted_by": submitted_by,
            "contact_information": contact_information,
            "comment": comment,
            "timestamp": time.time(),
            "pr_url": pr_url,
        }

        metadata_path = os.path.join(temp_directory, "request_metadata.json")
        with open(metadata_path, "w", encoding="utf-8") as f:
            json.dump(request_metadata, f)

        # The prefix is the number of entries already listed for this task.
        # NOTE(review): two concurrent submissions may observe the same count -- confirm this is acceptable.
        task_requests_dir = f"datasets/{self._requests_dataset_id}/{task_id}/"
        num_requests_already_present = (
            len(self._fs.ls(task_requests_dir)) if self._fs.isdir(task_requests_dir) else 0
        )
        return [
            CommitOperationAdd(
                path_in_repo=f"{task_id}/{num_requests_already_present}_{model_folder}.json",
                path_or_fileobj=metadata_path,
            )
        ]

    def _upload_predictions(
        self,
        task_id: str,
        model_folder: str,
        filenames: List[str],
    ) -> "List[CommitOperationAdd]":
        """Adds all files with current model's predictions to the results dataset.

        Each file is placed under ``{task_id}/predictions/{model_folder}/`` with its base name.
        """
        return [
            CommitOperationAdd(
                path_in_repo=f"{task_id}/predictions/{model_folder}/{os.path.basename(filename)}",
                path_or_fileobj=filename,
            )
            for filename in filenames
        ]

    def _compute_metrics_for_predictions(self, task_id: str, filenames: List[str], temp_directory: str) -> None:
        """Computes metrics for each submitted file with the current model's predictions.

        Per-file metrics are written to ``metrics.jsonl`` and their key-wise average to
        ``final_metrics.json``, both inside ``temp_directory``. Every line of a predictions
        file must provide the ``prediction`` and ``reference`` keys.

        Raises:
            ValueError: If metrics are not supported for ``task_id`` or no file was processed.
        """
        metrics_module = METRICS[task_id]
        if metrics_module is None:
            raise ValueError(f"Computing metrics for {task_id} is not supported.")
        metrics_module.reset()

        per_file_metrics = []
        with jsonlines.open(os.path.join(temp_directory, "metrics.jsonl"), "w") as writer:
            for filename in filenames:
                with jsonlines.open(filename, "r") as reader:
                    for example in tqdm(reader, desc=f"Computing metrics for {os.path.basename(filename)}"):
                        metrics_module.add_batch(
                            predictions=[example["prediction"]],
                            references=[example["reference"]],
                        )
                computed_metrics = metrics_module.compute()
                # Reset between files so that every file is scored independently.
                metrics_module.reset()
                writer.write(computed_metrics)
                # Shallow copy guards against the module reusing its result object.
                per_file_metrics.append(dict(computed_metrics))

        final_metrics_results = self._average_metrics(per_file_metrics)
        with open(os.path.join(temp_directory, "final_metrics.json"), "w", encoding="utf-8") as f:
            json.dump(final_metrics_results, f)

    def _upload_results(
        self,
        task_id: str,
        model_folder: str,
        model_name_pretty: str,
        model_availability: str,
        model_url: Optional[str],
        urls: Optional[str],
        context_size: str,
        submitted_by: str,
        temp_directory: str,
    ) -> "List[CommitOperationAdd]":
        """Adds files with the current model's metrics values to the results dataset.

        Reads ``final_metrics.json`` from ``temp_directory``, merges it with the model
        metadata (metadata wins on key clashes), writes the result to ``final_results.jsonl``
        and returns a commit operation uploading it as ``{task_id}/results/{model_folder}.jsonl``.
        """
        with open(os.path.join(temp_directory, "final_metrics.json"), "r", encoding="utf-8") as f:
            metrics = json.load(f)

        final_results = {
            **metrics,
            "model_name": model_name_pretty,
            "model_availability": model_availability,
            "model_url": model_url,
            "urls": urls,
            "context_size": context_size,
            "submitted_by": submitted_by,
        }

        results_path = os.path.join(temp_directory, "final_results.jsonl")
        with jsonlines.open(results_path, "w") as writer:
            writer.write(final_results)

        return [
            CommitOperationAdd(
                path_in_repo=f"{task_id}/results/{model_folder}.jsonl",
                path_or_fileobj=results_path,
            )
        ]

    def _verify_arguments(
        self,
        task_pretty: str,
        model_folder: str,
        model_name_pretty: str,
        model_availability: str,
        model_url: Optional[str],
        urls: Optional[str],
        context_size: str,
        submitted_by: str,
        contact_information: str,
        comment: Optional[str],
        filenames: Optional[List[str]],
    ) -> None:
        """Verifies that all necessary arguments are not None (and also runs other sanity checks).

        ``model_url`` and ``comment`` are optional and not checked. ``urls`` is only checked
        against the markdown-link format when it contains a comma.

        Raises:
            ValueError: With a user-facing message describing the first failed check.
        """
        self._require(
            task_pretty and task_pretty in TASKS_PRETTY_REVERSE, "Please, select one of the supported tasks."
        )
        self._require(model_folder, "Please, specify non-empty name for a directory with a model's results.")
        self._require(model_name_pretty, "Please, specify non-empty name for a model.")
        self._require(model_availability, "Please, specify non-empty information about a model's availability.")
        self._require(context_size, "Please, specify non-empty information about a model's context size.")
        try:
            int(context_size)
        except (TypeError, ValueError, OverflowError) as e:
            raise ValueError("Please, specify a model's context size as an integer (e.g., 16000).") from e

        if urls is not None and "," in urls:
            self._require(
                all(re.match(rf"^{MD_LINK_PATTERN}$", url.strip()) for url in urls.split(",")),
                'Please, use the following format for URLs: "[text1](link1), [text2](link2)"',
            )

        self._require(submitted_by, "Please, specify non-empty information about a submission's author(s).")
        self._require(filenames, "Please, attach at least one file with predictions.")
        self._require(contact_information, "Please, fill in the field with contact information.")

    def upload_files(
        self,
        task_pretty: str,
        model_folder: str,
        model_name_pretty: str,
        model_availability: str,
        model_url: Optional[str],
        urls: Optional[str],
        context_size: str,
        submitted_by: str,
        contact_information: str,
        comment: Optional[str],
        filenames: Optional[List[str]],
        force: bool = False,
    ) -> str:
        """Validates a submission, computes its metrics and opens PRs with the resulting files.

        Predictions and results go to the results dataset, request metadata goes to the
        requests dataset. Unless ``force`` is set, the submission is rejected when
        ``model_folder`` already has predictions for the task or when an open PR with the
        same title exists.

        Returns:
            The output of ``styled_message`` on success, ``styled_warning`` for a duplicate
            submission, or ``styled_error`` if anything failed. Exceptions are not propagated.
        """
        try:
            self._verify_arguments(
                task_pretty=task_pretty,
                model_folder=model_folder,
                model_name_pretty=model_name_pretty,
                model_availability=model_availability,
                model_url=model_url,
                urls=urls,
                context_size=context_size,
                submitted_by=submitted_by,
                contact_information=contact_information,
                comment=comment,
                filenames=filenames,
            )
            # NOTE(review): the leading character looks like a mis-encoded emoji. It is kept verbatim
            # because the title is compared with titles of existing PRs -- confirm before changing.
            pr_title = f"π New submission to {task_pretty} task: {model_name_pretty} with {context_size} context size from {submitted_by}"

            logging.info(f"Start processing {pr_title}")

            task_id = TASKS_PRETTY_REVERSE[task_pretty]

            logging.info("Checking if this request has already been submitted...")
            if not force:
                if self._fs.isdir(f"datasets/{self._results_dataset_id}/{task_id}/predictions/{model_folder}"):
                    return styled_warning(
                        f"{model_folder} is already present in {self._results_dataset_id}, please, select another folder name."
                    )

                prev_pr = self._get_previous_pr(pr_title)
                if prev_pr is not None:
                    url = f"https://huggingface.co/datasets/{self._results_dataset_id}/discussions/{prev_pr.num}"
                    return styled_warning(
                        f"{self._results_dataset_id} already has an open PR for this submission: {url}."
                    )

            logging.info("Processing predictions...")
            predictions_commit_operations = self._upload_predictions(
                task_id=task_id,
                model_folder=model_folder,
                filenames=filenames,
            )

            results_description = (
                f"New submission to {task_pretty} task in ποΈ Long Code Arena benchmark!\n"
                f"* Model name: {model_name_pretty}\n"
                f"* Model availability: {model_availability}\n"
                f"* Context Size: {context_size}\n"
                f"* Relevant URLs: {urls}\n"
                f"* Submitted By: {submitted_by}"
            )

            # Both commits reference files inside the temporary directory, so they must be
            # created before the directory is removed.
            with TemporaryDirectory() as d:
                temp_directory = str(d)

                logging.info("Computing metrics...")
                self._compute_metrics_for_predictions(
                    task_id=task_id, filenames=filenames, temp_directory=temp_directory
                )

                logging.info("Processing results...")
                results_commit_operations = self._upload_results(
                    task_id=task_id,
                    model_folder=model_folder,
                    model_name_pretty=model_name_pretty,
                    model_availability=model_availability,
                    model_url=model_url,
                    urls=urls,
                    context_size=context_size,
                    submitted_by=submitted_by,
                    temp_directory=temp_directory,
                )

                logging.info("Creating commit to the results dataset...")
                new_pr = self._api.create_commit(
                    repo_id=self._results_dataset_id,
                    operations=predictions_commit_operations + results_commit_operations,
                    commit_message=pr_title,
                    commit_description=results_description,
                    create_pr=True,
                    repo_type="dataset",
                )

                logging.info("Creating commit to the requests dataset...")
                request_commit_operations = self._upload_request(
                    task_id=task_id,
                    model_folder=model_folder,
                    temp_directory=temp_directory,
                    model_name_pretty=model_name_pretty,
                    model_availability=model_availability,
                    model_url=model_url,
                    urls=urls,
                    context_size=context_size,
                    submitted_by=submitted_by,
                    contact_information=contact_information,
                    comment=comment,
                    pr_url=new_pr.pr_url,
                )
                self._api.create_commit(
                    repo_id=self._requests_dataset_id,
                    operations=request_commit_operations,
                    commit_message=pr_title,
                    commit_description=(
                        f"{results_description}\n"
                        f"* PR: {new_pr.pr_url}\n"
                        f"* Contact information: {contact_information}\n"
                        f"* Comment: {comment}"
                    ),
                    create_pr=True,
                    repo_type="dataset",
                )

                return styled_message(f"π PR created at {new_pr.pr_url}.")

        except Exception as e:
            # Keep the full traceback server-side; the user only gets a redacted message.
            logging.exception("Failed to process the submission.")
            exception_msg = self._sanitize_exception_message(str(e))
            if exception_msg:
                return styled_error(exception_msg)
            return styled_error("An exception occurred. Please, try again.")
|
|