import glob
import io
import json
import os
import shutil
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from uuid import uuid4

import lpips
import numpy as np
import pandas as pd
import torch
from huggingface_hub import HfApi, snapshot_download
from loguru import logger
from PIL import Image
from torchmetrics.image import StructuralSimilarityIndexMeasure
from torchvision.transforms.functional import to_tensor

from competitions.enums import SubmissionStatus
from competitions.info import CompetitionInfo
from competitions.utils import submission_api, user_token_api
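

# PSNR restricted to the unmasked region: pixels where the dynamic mask is
# True are excluded from the MSE, so moving objects do not affect the score.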
def _psnr_mask(img1, img2, mask):
    # Flatten the mask and both images to 1-D
    mask_flat = mask.reshape(-1)
    img1_flat = img1.reshape(-1)
    img2_flat = img2.reshape(-1)
    # Indices of pixels outside the mask (mask == False)
    nonzero_indices = torch.nonzero(~mask_flat).squeeze()
    # Keep only the unmasked pixels
    img1_nonzero = torch.index_select(img1_flat, 0, nonzero_indices)
    img2_nonzero = torch.index_select(img2_flat, 0, nonzero_indices)
    # MSE over the unmasked pixels
    mse = ((img1_nonzero - img2_nonzero) ** 2).mean()
    # PSNR for a data range of 1.0
    psnr_value = 20 * torch.log10(1.0 / torch.sqrt(mse))
    return psnr_value
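

# Example (illustrative shapes): for prediction/ground-truth tensors of shape
# [1, 3, H, W] with values in [0, 1] and a boolean mask of the same shape,
# _psnr_mask(pred, gt, mask) returns the PSNR over pixels where mask is False.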

@dataclass
class JobRunner:
    competition_id: str
    token: str
    output_path: str

    def __post_init__(self):
        self.competition_info = CompetitionInfo(competition_id=self.competition_id, autotrain_token=self.token)
        self.competition_id = self.competition_info.competition_id
        self.competition_type = self.competition_info.competition_type
        self.metric = self.competition_info.metric
        self.submission_id_col = self.competition_info.submission_id_col
        self.submission_cols = self.competition_info.submission_cols
        self.submission_rows = self.competition_info.submission_rows
        self.time_limit = self.competition_info.time_limit
        self.dataset = self.competition_info.dataset
        self.submission_filenames = self.competition_info.submission_filenames
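
    # Snapshot the competition repo's submission_info/*.json files and flatten
    # every team's submissions into one list of records.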
    def _get_all_submissions(self) -> List[Dict[str, Any]]:
        submission_jsons = snapshot_download(
            repo_id=self.competition_id,
            allow_patterns="submission_info/*.json",
            token=self.token,
            repo_type="dataset",
        )
        submission_jsons = glob.glob(os.path.join(submission_jsons, "submission_info/*.json"))
        all_submissions = []
        for _json_path in submission_jsons:
            with open(_json_path, "r", encoding="utf-8") as f:
                _json = json.load(f)
            team_id = _json["id"]
            for sub in _json["submissions"]:
                all_submissions.append(
                    {
                        "team_id": team_id,
                        "submission_id": sub["submission_id"],
                        "datetime": sub["datetime"],
                        "status": sub["status"],
                        "submission_repo": sub["submission_repo"],
                        "hardware": sub["hardware"],
                    }
                )
        return all_submissions
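
    # Keep only PENDING submissions and return them as a DataFrame sorted by
    # datetime (oldest first), or None when there is nothing to do.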
    def _get_pending_subs(self, submissions: List[Dict[str, Any]]) -> Optional[pd.DataFrame]:
        pending_submissions = []
        for sub in submissions:
            if sub["status"] == SubmissionStatus.PENDING.value:
                pending_submissions.append(sub)
        if len(pending_submissions) == 0:
            return None
        logger.info(f"Found {len(pending_submissions)} pending submissions.")
        pending_submissions = pd.DataFrame(pending_submissions)
        pending_submissions["datetime"] = pd.to_datetime(pending_submissions["datetime"])
        pending_submissions = pending_submissions.sort_values("datetime")
        pending_submissions = pending_submissions.reset_index(drop=True)
        return pending_submissions
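
    # Weighted average of per-image metric dicts; each entry carries the
    # weight assigned in _calculate_score.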
    def _avg_score(self, score_list: List[Dict[str, Any]]) -> Dict[str, Any]:
        total_weight = 0
        # Renamed from psnr/ssim/lpips: the original names shadowed the
        # imported lpips module.
        psnr_vals, ssim_vals, lpips_vals = [], [], []
        for score in score_list:
            total_weight += score["weight"]
            psnr_vals.append(score["psnr"] * score["weight"])
            ssim_vals.append(score["ssim"] * score["weight"])
            lpips_vals.append(score["lpips"] * score["weight"])
        return {
            "psnr": sum(psnr_vals) / total_weight,
            "ssim": sum(ssim_vals) / total_weight,
            "lpips": sum(lpips_vals) / total_weight,
        }
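
    # Images whose name contains "loc" get weight 0.5 (1.0 otherwise); scores
    # are aggregated overall and per difficulty level (level1/2/3).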
    def _calculate_score(self, results: Dict[str, Any]) -> Dict[str, Any]:
        new_results = {
            key: {**value, "weight": 1 if "loc" not in key else 0.5}
            for key, value in results.items()
        }
        all_scores, level1, level2, level3 = [], [], [], []
        for im_name, scores in new_results.items():
            all_scores.append(scores)
            if "level1" in im_name:
                level1.append(scores)
            if "level2" in im_name:
                level2.append(scores)
            if "level3" in im_name:
                level3.append(scores)
        return {
            "all": self._avg_score(all_scores),
            "level1": self._avg_score(level1),
            "level2": self._avg_score(level2),
            "level3": self._avg_score(level3),
        }
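
    # Download the user's repo at its latest commit with the team's own token,
    # evaluate it, upload the per-image results to the competition repo, and
    # store the composite score on the submission.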
    def _process_submission(self, submission: Dict[str, Any]):
        api = HfApi(token=self.token)
        user_repo = submission["submission_repo"]
        team_id = submission["team_id"]
        submission_id = submission["submission_id"]
        user_token = user_token_api.get(team_id)
        client_commits = api.list_repo_commits(user_repo, repo_type="dataset")
        client_code_local_dir = f"/tmp/data/client_repo/{uuid4().hex}"
        try:
            api.snapshot_download(
                repo_id=user_repo,
                repo_type="dataset",
                revision=client_commits[0].commit_id,
                token=user_token,
                local_dir=client_code_local_dir,
                allow_patterns=["*"],
            )
            eval_result = self._eval("./test_gt_datas", client_code_local_dir)
        finally:
            shutil.rmtree(client_code_local_dir, ignore_errors=True)
        eval_result_json_string = json.dumps(eval_result, indent=2)
        eval_result_json_bytes = eval_result_json_string.encode("utf-8")
        eval_result_json_buffer = io.BytesIO(eval_result_json_bytes)
        api.upload_file(
            path_or_fileobj=eval_result_json_buffer,
            path_in_repo=f"eval_results/{submission_id}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )
        final_score = self._calculate_score(eval_result)
        # Composite score: PSNR (rescaled toward [0, 1]) at 40%, SSIM at 30%,
        # inverted LPIPS at 30%.
        score = {
            "score": (
                final_score["all"]["psnr"] / 100 * 0.4
                + final_score["all"]["ssim"] * 0.3
                + (1 - final_score["all"]["lpips"]) * 0.3
            ),
            "psnr": final_score["all"]["psnr"],
            "ssim": final_score["all"]["ssim"],
            "lpips": final_score["all"]["lpips"],
        }
        for key in score.keys():
            # Cast to plain float so the payload stays JSON-serializable
            score[key] = float(np.round(score[key], 3))
        submission_api.update_submission_data(
            team_id=team_id,
            submission_id=submission_id,
            data={
                "status": SubmissionStatus.SUCCESS.value,
                "final_score": final_score,
                "score": score,
            },
        )
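
    # Evaluate a submission tree against the ground truth. Both are expected
    # to match the glob pattern */*/images/*, with per-image dynamic masks in
    # a sibling "masks" directory.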
    def _eval(self, gt_folder_path: str, test_folder_path: str) -> Dict[str, Any]:
        # List all files in both trees
        files1 = sorted(glob.glob(os.path.join(gt_folder_path, "*/*/images", "*")))
        files2 = sorted(glob.glob(os.path.join(test_folder_path, "*/*/images", "*")))
        # Filter by image extensions
        image_extensions = (".png", ".jpg", ".jpeg")
        images1 = [os.path.relpath(f, gt_folder_path) for f in files1 if f.lower().endswith(image_extensions)]
        images2 = [os.path.relpath(f, test_folder_path) for f in files2 if f.lower().endswith(image_extensions)]
        # The submission must contain exactly the same relative paths
        if set(images1) != set(images2):
            raise ValueError("Submission Format Error")
        # Metrics
        device = "cuda" if torch.cuda.is_available() else "cpu"
        ssim_metric = StructuralSimilarityIndexMeasure(data_range=1.0).to(device)
        lpips_metric = lpips.LPIPS(net="alex").to(device)
        results = {}
        for img_name in images1:
            path1 = os.path.join(gt_folder_path, img_name)
            path2 = os.path.join(test_folder_path, img_name)
            try:
                # Load images
                img1 = Image.open(path1).convert("RGB")
                img2 = Image.open(path2).convert("RGB")
                # The mask may share the image's extension or be a .png
                if os.path.exists(path1.replace("images", "masks")):
                    dynamic_mask = Image.open(path1.replace("images", "masks"))
                else:
                    dynamic_mask = Image.open(path1.replace("images", "masks").replace(".jpg", ".png"))
                # To tensors
                tensor1 = to_tensor(img1).unsqueeze(0)
                tensor2 = to_tensor(img2).unsqueeze(0)
                dynamic_mask = to_tensor(dynamic_mask).unsqueeze(0).bool()
                dynamic_mask = dynamic_mask.expand(-1, 3, -1, -1)
                # Zero out dynamic regions in both images
                tensor1[dynamic_mask] *= 0
                tensor2[dynamic_mask] *= 0
                # Move everything to the same device (the mask too, so
                # index_select in _psnr_mask does not mix devices)
                tensor1 = tensor1.to(device)
                tensor2 = tensor2.to(device)
                dynamic_mask = dynamic_mask.to(device)
                # Metrics (LPIPS expects inputs in [-1, 1])
                psnr_val = _psnr_mask(tensor1, tensor2, dynamic_mask).item()
                ssim_val = ssim_metric(tensor1, tensor2).item()
                lpips_val = lpips_metric(tensor1 * 2 - 1, tensor2 * 2 - 1).item()
                results[img_name] = {
                    "psnr": psnr_val,
                    "ssim": ssim_val,
                    "lpips": lpips_val,
                }
            except Exception as e:
                raise RuntimeError(f"Evaluation failed for {img_name}") from e
        return results
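
    # Poll every five seconds, mark the oldest pending submission as
    # PROCESSING, and evaluate it; failures are recorded on the submission and
    # the loop continues.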
    def run(self):
        while True:
            time.sleep(5)
            all_submissions = self._get_all_submissions()
            pending_submissions = self._get_pending_subs(all_submissions)
            if pending_submissions is None:
                continue
            first_pending_sub = pending_submissions.iloc[0]
            submission_api.update_submission_status(
                team_id=first_pending_sub["team_id"],
                submission_id=first_pending_sub["submission_id"],
                status=SubmissionStatus.PROCESSING.value,
            )
            try:
                self._process_submission(first_pending_sub)
            except Exception as e:
                logger.error(
                    f"Failed to process {first_pending_sub['submission_id']}: {e}"
                )
                submission_api.update_submission_data(
                    team_id=first_pending_sub["team_id"],
                    submission_id=first_pending_sub["submission_id"],
                    data={
                        "status": SubmissionStatus.FAILED.value,
                        "error_message": str(e),
                    },
                )
                # Keep the runner alive: the failure is already recorded on
                # the submission, so move on to the next one. (The original
                # re-raised here, which made this continue unreachable and
                # would have killed the loop on the first failed submission.)
                continue
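

# A minimal entrypoint sketch, assuming the competition id and write token
# arrive via environment variables; the variable names and output path below
# are illustrative, not part of the original runner.
if __name__ == "__main__":
    runner = JobRunner(
        competition_id=os.environ["COMPETITION_ID"],  # hypothetical env var
        token=os.environ["HF_TOKEN"],  # hypothetical env var
        output_path="/tmp/data/output",  # illustrative path
    )
    runner.run()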