Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import uuid | |
| from datetime import datetime, timedelta, timezone | |
| from src.envs import API, SUBMISSIONS_PATH, SUBMISSIONS_REPO | |
| from src.evaluation.compute_metrics import compute_metrics | |
| from src.evaluation.load_labels import load_true_labels | |
| from src.submission.validate_csv import validate_csv | |
| from src.teams.auth import validate_token | |
| from src.teams.storage import get_team_by_name, hash_token, update_last_valid_submission | |
def get_team_best_scores(team_name: str) -> dict | None:
    """Load the stored best-score record for a team, if one exists.

    The record is read from ``<SUBMISSIONS_PATH>/results/<team_name>.json``.

    Args:
        team_name: Name of the team; used as the file stem of the record.

    Returns:
        The decoded JSON object, or ``None`` when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    results_file = os.path.join(SUBMISSIONS_PATH, "results", f"{team_name}.json")
    try:
        with open(results_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # No record yet is the normal case for a team's first submission.
        return None
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Stay best-effort (return None) but leave a trace instead of
        # silently discarding the problem.
        print(f"Warning: Could not read best scores for team {team_name}: {e}")
        return None

    if not isinstance(data, dict):
        # Callers use dict methods on the result; anything else is unusable.
        print(f"Warning: Best scores file for team {team_name} does not contain a JSON object.")
        return None
    return data
def save_team_best_scores(team_name: str, scores: dict):
    """Store a team's best scores on disk and push the file to the hub.

    The scores are written as JSON to
    ``<SUBMISSIONS_PATH>/results/<team_name>.json`` (the directory is created
    when missing). The same file is then uploaded to ``SUBMISSIONS_REPO``;
    an upload failure is reported with a printed warning and does not
    propagate.

    Args:
        team_name: Name of the team; used as the file stem.
        scores: JSON-serialisable mapping to persist.
    """
    local_dir = os.path.join(SUBMISSIONS_PATH, "results")
    os.makedirs(local_dir, exist_ok=True)

    local_path = os.path.join(local_dir, f"{team_name}.json")
    with open(local_path, "w") as out:
        json.dump(scores, out)

    # The local copy is already written; the hub upload is best-effort.
    try:
        upload_args = {
            "path_or_fileobj": local_path,
            "path_in_repo": f"results/{team_name}.json",
            "repo_id": SUBMISSIONS_REPO,
            "repo_type": "dataset",
            "commit_message": f"Update scores for team: {team_name}",
        }
        API.upload_file(**upload_args)
    except Exception as e:
        print(f"Warning: Could not upload results to hub: {e}")
def save_submission(team_name: str, token_hash: str, csv_content: str, scores: dict, status: str):
    """Persist one submission locally and mirror it to the submissions repo.

    A fresh UUID identifies the submission. Two files are written under
    ``SUBMISSIONS_PATH``: ``<id>.json`` with the metadata (team, token hash,
    UTC timestamp, scores, status) and ``<id>.csv`` with the submitted CSV
    text. Both are then uploaded to ``SUBMISSIONS_REPO``; an upload failure
    is reported with a printed warning and does not propagate.

    Args:
        team_name: Name of the submitting team.
        token_hash: Hash of the team token used for the submission.
        csv_content: Raw CSV text as submitted.
        scores: Metrics computed for this submission.
        status: Outcome label stored with the submission.
    """
    os.makedirs(SUBMISSIONS_PATH, exist_ok=True)
    submission_id = str(uuid.uuid4())
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    submission_data = {
        "submission_id": submission_id,
        "team_name": team_name,
        "token_hash": token_hash,
        "timestamp": timestamp,
        "scores": scores,
        "status": status,
    }

    submission_file = os.path.join(SUBMISSIONS_PATH, f"{submission_id}.json")
    with open(submission_file, "w", encoding="utf-8") as f:
        json.dump(submission_data, f)

    csv_file = os.path.join(SUBMISSIONS_PATH, f"{submission_id}.csv")
    # The CSV is user-supplied text: pin the encoding so non-ASCII content
    # cannot fail under a non-UTF-8 locale, and disable newline translation
    # so the stored file matches the submitted text exactly.
    with open(csv_file, "w", encoding="utf-8", newline="") as f:
        f.write(csv_content)

    # Local copies are already written; the hub upload is best-effort.
    try:
        API.upload_file(
            path_or_fileobj=submission_file,
            path_in_repo=f"{submission_id}.json",
            repo_id=SUBMISSIONS_REPO,
            repo_type="dataset",
            commit_message=f"Submission from {team_name}",
        )
        API.upload_file(
            path_or_fileobj=csv_file,
            path_in_repo=f"{submission_id}.csv",
            repo_id=SUBMISSIONS_REPO,
            repo_type="dataset",
            commit_message=f"CSV for submission {submission_id}",
        )
    except Exception as e:
        print(f"Warning: Could not upload submission to hub: {e}")
| def should_update_scores(new_scores: dict, best_scores: dict | None) -> bool: | |
| if best_scores is None: | |
| return True | |
| new_f1 = new_scores.get("f1", 0.0) | |
| best_f1 = best_scores.get("best_f1", 0.0) | |
| return new_f1 > best_f1 | |
def check_rate_limit(team_name: str, *, min_interval_seconds: float = 2.0) -> tuple[bool, str]:
    """Check whether a team may submit again yet.

    The team's ``last_valid_submission`` timestamp (a UTC string in
    ``%Y-%m-%dT%H:%M:%SZ`` format) is compared with the current UTC time.

    Args:
        team_name: Name of the team to look up via ``get_team_by_name``.
        min_interval_seconds: Minimum number of seconds required between
            two valid submissions. Defaults to 2 seconds.

    Returns:
        ``(True, "")`` when the submission is allowed, otherwise
        ``(False, message)`` where ``message`` tells the user how long to
        wait. The check fails open: an unknown team, a missing timestamp or
        an unparsable timestamp all allow the submission.
    """
    import math  # local import: only needed to round the wait time up

    team_data = get_team_by_name(team_name)
    if not team_data:
        return True, ""

    last_submission = team_data.get("last_valid_submission")
    if not last_submission:
        return True, ""

    try:
        last_time = datetime.strptime(last_submission, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    except (TypeError, ValueError) as e:
        # A malformed stored timestamp must not lock the team out, but it
        # should be visible in the logs rather than silently ignored.
        print(f"Warning: Could not parse last submission time for team {team_name}: {e}")
        return True, ""

    elapsed = datetime.now(timezone.utc) - last_time
    remaining = timedelta(seconds=min_interval_seconds) - elapsed
    if remaining > timedelta(0):
        # Round up so the message never says "wait 0 seconds" while the
        # submission is still being rejected.
        remaining_secs = math.ceil(remaining.total_seconds())
        return (
            False,
            f"Rate limit exceeded. Please wait {remaining_secs} seconds before submitting again.",
        )
    return True, ""
def submit_csv(token: str, csv_content: str) -> tuple[bool, str]:
    """Process a team's CSV submission end to end.

    Steps, in order: authenticate the token, load the true labels, validate
    the CSV against them, apply the rate limit, compute the metrics, record
    the time of this valid submission, update the team's best scores when
    the new result qualifies, and finally store the submission.

    Args:
        token: Team token supplied by the submitter.
        csv_content: Raw CSV text with the predictions.

    Returns:
        ``(False, reason)`` when the token, labels, CSV or rate limit check
        fails; otherwise ``(True, message)`` describing the scores obtained.
    """
    team = validate_token(token)
    if not team:
        return False, "Invalid token. Please check your team token."
    team_name = team["team_name"]
    token_hash = hash_token(token)

    true_labels = load_true_labels()
    if not true_labels:
        return False, "Error: True labels not available. Please contact administrators."

    csv_ok, csv_error, predictions_df = validate_csv(csv_content, true_labels)
    if not csv_ok:
        return False, f"CSV validation failed: {csv_error}"

    allowed, limit_message = check_rate_limit(team_name)
    if not allowed:
        return False, limit_message

    scores = compute_metrics(predictions_df, true_labels)
    update_last_valid_submission(team_name)

    previous_best = get_team_best_scores(team_name)
    if should_update_scores(scores, previous_best):
        recorded_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        save_team_best_scores(
            team_name,
            {
                "team_name": team_name,
                "best_accuracy": scores["accuracy"],
                "best_f1": scores["f1"],
                "best_precision": scores["precision"],
                "best_recall": scores["recall"],
                "best_tp": scores["tp"],
                "best_fp": scores["fp"],
                "best_fn": scores["fn"],
                "best_tn": scores["tn"],
                "best_submission_date": recorded_at,
            },
        )
        status = "ACCEPTED"
        message = (
            "Submission accepted! Your scores: "
            f"Accuracy={scores['accuracy']:.4f}, F1={scores['f1']:.4f}, "
            f"Precision={scores['precision']:.4f}, Recall={scores['recall']:.4f}, "
            f"TP={scores['tp']}, FP={scores['fp']}, FN={scores['fn']}, TN={scores['tn']}"
        )
    else:
        status = "ACCEPTED, BUT WORST"
        # Fall back to 0.0 when the stored record is empty or lacks a field.
        stored = previous_best if previous_best else {}
        best_acc = stored.get("best_accuracy", 0.0)
        best_f1 = stored.get("best_f1", 0.0)
        message = (
            "Submission accepted but did not improve your best score. "
            f"Your scores (Accuracy={scores['accuracy']:.4f}, F1={scores['f1']:.4f}) "
            f"vs. your best scores (Accuracy={best_acc:.4f}, F1={best_f1:.4f})."
        )

    save_submission(team_name, token_hash, csv_content, scores, status)
    return True, message