| """ | |
| Leaderboard processing module for the leaderboard parser. | |
| This module contains the main functions for processing leaderboards. | |
| """ | |
| import json | |
| import os | |
| import datetime | |
| import logging | |
| import time | |
| import argparse | |
| from typing import Dict, Any, List, Tuple, Optional | |
| # Import functions from other modules | |
| from src.file_utils import save_results, format_datetime, clean_output_files, update_leaderboard_result | |
| from src.file_utils import create_category_slug, split_combined_id, create_combined_id | |
| from src.file_utils import load_and_validate_results, validate_leaderboard_result | |
| from src.hub_utils import upload_to_hub, download_from_hub | |
| from src.leaderboard_processor import process_single_leaderboard | |
| from src.agents.parser_agent import get_default_model | |
| from src.agents.browser import cleanup_browser | |
| # Configure logger | |
| logger = logging.getLogger("leaderboard-parser") | |

# Update state variables in the server module
def update_server_status(status, error=None):
    """
    Updates the server status.

    Args:
        status: The new status ('idle', 'running', 'completed', 'failed')
        error: The error message in case of failure
    """
    try:
        # Importing src.server raises ImportError when running outside server mode
        from src.server import processing_status, processing_error, last_run_time

        # Mirror the status into this module's globals (informational only)
        globals()['processing_status'] = status
        globals()['processing_error'] = error

        # Update the server module's own variables so the API reflects the new state
        import src.server
        src.server.processing_status = status
        src.server.processing_error = error

        # Update the last run time when processing completes
        if status == "completed":
            now = datetime.datetime.now()
            src.server.last_run_time = now
            logger.info(f"Updated last run time to {now.isoformat()}")
    except ImportError:
        # In non-server mode, the server module is not available
        pass

def process_leaderboards(args_dict=None) -> Tuple[bool, str]:
    """
    Process leaderboards with the given arguments.

    Returns a tuple of (success, message).
    """
    # Update status
    update_server_status("running")

    # Set default arguments if none provided
    if args_dict is None:
        args_dict = {"local_only": False}

    # Create an argparse.Namespace object from the dictionary
    args = argparse.Namespace(**args_dict)

    try:
        # Ensure we're in the correct directory
        script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        os.chdir(script_dir)

        # Verify that the HF token is set
        if not os.environ.get("HUGGING_FACE_HUB_TOKEN") and not getattr(args, "local_only", False):
            raise ValueError("HUGGING_FACE_HUB_TOKEN environment variable is not set!")

        # Default paths for the category list, leaderboards and results files
        category_list_file = "data/best_model_for_category_list.json"
        leaderboards_file = "data/final_leaderboards.json"
        results_file = "data/best_model_for_results.json"

        # Clean output files if requested
        if getattr(args, "clean", False):
            clean_output_files(results_file)

        # Upload-only mode: push existing results and stop
        if getattr(args, "upload_only", False):
            upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
            update_server_status("completed")
            return True, "Upload completed successfully"

        # Download data from the Hub if not in local-only mode
        if not getattr(args, "local_only", False):
            download_from_hub()

        logger.info("Starting leaderboard processing")

        # Load the category list and leaderboards data
        try:
            with open(category_list_file, "r", encoding="utf-8") as f:
                category_list = json.load(f)
            with open(leaderboards_file, "r", encoding="utf-8") as f:
                leaderboards = json.load(f)

            # Create a UID -> HOST mapping for all leaderboards
            uid_to_host = {lb["uid"]: lb["host"] for lb in leaderboards if "uid" in lb and "host" in lb}
            logger.info(f"Loaded {len(uid_to_host)} UID -> HOST mappings from {leaderboards_file}")
        except FileNotFoundError as e:
            update_server_status("failed", str(e))
            return False, f"File not found: {e}"

        # Load existing results if any
        try:
            logger.info(f"Loading and validating results from {results_file}")
            results_data = load_and_validate_results(results_file)
            all_results = results_data
            logger.info(f"Loaded and validated {len(all_results)} existing results")
        except Exception as e:
            logger.warning(f"Error loading results: {str(e)}")
            results_data = []
            all_results = []

        # Map combined UIDs to their complete data (used to check the parsing date)
        processed_results_map = {}
        for result in results_data:
            if "uid" in result:
                processed_results_map[result["uid"]] = result

        # Reprocessing interval from an environment variable (in hours, default: 24)
        reprocess_interval_hours = int(os.getenv("LEADERBOARD_REPROCESS_INTERVAL_HOURS", "24"))
        # Maximum age without update (in seconds)
        max_age_seconds = reprocess_interval_hours * 60 * 60
        logger.info(f"Leaderboard reprocessing interval: {reprocess_interval_hours} hours")

        # Current date and time
        now = datetime.datetime.now()
        print(f"Current system date: {now.isoformat()} - Readable format: {format_datetime(now.isoformat())}")

        # Get the default model
        model = get_default_model()

        # Collect all leaderboards to process
        leaderboards_to_process = []
        force_retry_leaderboards = []

        # Debug logging
        logger.info(f"Available categories: {len(category_list)}")
        logger.info(f"Available leaderboards: {len(uid_to_host)}")
        logger.info(f"Sample of available UIDs: {list(uid_to_host.keys())[:5]}")

        # Check whether a specific category or UID is requested
        target_category = getattr(args, "force_retry_category", None)
        target_uid = getattr(args, "force_retry_uid", None)

        # Exclusive mode (only process the specified leaderboards)
        exclusive_mode = target_category is not None or target_uid is not None
        if target_category:
            logger.info(f"Force retry category mode enabled (exclusive): {target_category}")
        if target_uid:
            logger.info(f"Force retry UID mode enabled (exclusive): {target_uid}")
        # Process leaderboards
        for category in category_list:
            category_name = category["category"]
            normalized_category = create_category_slug(category_name)

            # In specific-category mode, skip categories that do not match the target
            if target_category and target_category != normalized_category:
                logger.info(f"Category {category_name} (normalized: {normalized_category}) ignored - does not match target category {target_category}")
                continue

            # ADDITIONAL SAFETY: reload the data from file before each new category
            # to avoid contamination between categories
            try:
                logger.info(f"Reloading data from file before processing category: {category_name}")
                all_results = load_and_validate_results(results_file)
                logger.info(f"Data reloaded successfully: {len(all_results)} results available")
            except Exception as e:
                logger.warning(f"Unable to reload data before category {category_name}: {str(e)}")
                # In case of error, keep the existing data if possible
                if not isinstance(all_results, list):
                    all_results = []

            # Check that the category has leaderboards
            if "leaderboards" not in category or not isinstance(category["leaderboards"], list):
                logger.warning(f"Category '{category_name}' has no leaderboards or an incorrect format.")
                continue

            # Process each leaderboard in the category
            for leaderboard in category["leaderboards"]:
                if "uid" not in leaderboard:
                    logger.warning(f"Leaderboard in category '{category_name}' has no UID.")
                    continue

                leaderboard_uid = leaderboard["uid"]

                # In specific-UID mode, ignore all other leaderboards
                if target_uid and target_uid != leaderboard_uid:
                    logger.info(f"Leaderboard {leaderboard_uid} ignored - does not match target UID {target_uid}")
                    continue

                # Get additional rules if available (the JSON key is spelled "additionnal")
                additional_rules = leaderboard.get("additionnal_agent_rules", None)

                # Check whether processing of this leaderboard should be forced,
                # using the two distinct options
                force_retry_uid = getattr(args, "force_retry_uid", None) == leaderboard_uid
                force_retry_category = getattr(args, "force_retry_category", None) == normalized_category

                # Support for the old option, kept for backward compatibility (to be removed later)
                legacy_force_retry = False
                if getattr(args, "force_retry", None) is not None:
                    legacy_force_retry = (
                        getattr(args, "force_retry", None) == leaderboard_uid or
                        getattr(args, "force_retry", None) == normalized_category
                    )
                    if legacy_force_retry:
                        logger.warning("The --force-retry option is obsolete. Use --force-retry-uid or --force-retry-category instead.")

                # Combine the different sources of force_retry
                force_retry = force_retry_uid or force_retry_category or legacy_force_retry

                # Log the reason for the forced retry explicitly
                if force_retry:
                    if force_retry_uid:
                        logger.info(f"Force retry enabled for leaderboard UID: {leaderboard_uid}")
                    elif force_retry_category:
                        logger.info(f"Force retry enabled for all leaderboards in category: {normalized_category}")
                    elif legacy_force_retry:
                        logger.info(f"Force retry enabled via the old --force-retry option for: {getattr(args, 'force_retry', None)}")

                # Look up the leaderboard URL in uid_to_host (direct dictionary lookup)
                host = uid_to_host.get(leaderboard_uid)
                if not host:
                    logger.warning(f"UID '{leaderboard_uid}' (category: {normalized_category}) not found in leaderboards.")
                    # Show more information for debugging
                    logger.debug(f"Total number of UIDs available: {len(uid_to_host)}")
                    continue

                # Create the combined identifier (category_uid);
                # the category is already normalized by create_category_slug
                combined_uid = create_combined_id(normalized_category, leaderboard_uid)

                # If force_retry is enabled, process the leaderboard without checking the time since the last run
                if force_retry:
                    logger.info(f"Force retry enabled for {combined_uid} - processing forced regardless of the last processing date.")
                    leaderboards_to_process.append({
                        "uid": leaderboard_uid,
                        "host": host,
                        "category": normalized_category,
                        "additional_rules": additional_rules,
                        "force_retry": force_retry
                    })
                    continue  # Skip directly to the next leaderboard

                # Check whether the leaderboard has already been processed recently
                needs_reprocessing = True
                if combined_uid in processed_results_map:
                    # Check whether the leaderboard was processed within the interval
                    result = processed_results_map[combined_uid]

                    # If the --ignore-cooldown option is active, force reprocessing regardless of status
                    if getattr(args, "ignore_cooldown", False):
                        logger.info(f"Leaderboard {combined_uid} forced reprocessing with --ignore-cooldown, ignoring cooldown period.")
                    elif "parsed_at" in result:
                        try:
                            # Convert the parsing date to a datetime object
                            parsed_at = datetime.datetime.fromisoformat(result["parsed_at"])
                            # Time elapsed since the last parsing
                            time_diff = now - parsed_at

                            # Debug logs for the date checks
                            logger.info(f"DEBUG: Current date: {now.isoformat()}")
                            logger.info(f"DEBUG: Last parsing date: {parsed_at.isoformat()}")
                            logger.info(f"DEBUG: Time difference in seconds: {time_diff.total_seconds()}")
                            logger.info(f"DEBUG: Reprocessing threshold (seconds): {max_age_seconds}")

                            # Check strictly whether the elapsed time exceeds the threshold
                            time_seconds = time_diff.total_seconds()

                            # If the elapsed time is greater than max_age_seconds, reparse
                            if time_seconds > max_age_seconds:
                                needs_reprocessing = True
                                print(f"\n\nLeaderboard {combined_uid} - {host} parsed more than {reprocess_interval_hours} hours ago ({format_datetime(result['parsed_at'])}), reprocessing necessary.")
                            else:
                                print(f"\n\nLeaderboard {combined_uid} - {host} already processed recently ({format_datetime(result['parsed_at'])}), moving to next. Age: {time_seconds} seconds (threshold: {max_age_seconds})")
                                continue
                        except (ValueError, TypeError):
                            # If the date is invalid, reprocess as a precaution
                            logger.info(f"Leaderboard {combined_uid} has an invalid processing date, reprocessing necessary.")
                    else:
                        # If the parsing date is missing, reprocess as a precaution
                        logger.info(f"Leaderboard {combined_uid} has no processing date, reprocessing necessary.")
                else:
                    # If the leaderboard has never been processed, process it
                    logger.info(f"New leaderboard {combined_uid} to process.")

                if needs_reprocessing or force_retry:
                    leaderboards_to_process.append({
                        "uid": leaderboard_uid,
                        "host": host,
                        "category": normalized_category,
                        "additional_rules": additional_rules,
                        "force_retry": force_retry
                    })
        # Number of leaderboards to process
        logger.info(f"Total number of leaderboards to process: {len(leaderboards_to_process)}")

        # Process each leaderboard
        for index, leaderboard_info in enumerate(leaderboards_to_process):
            leaderboard_uid = leaderboard_info["uid"]
            host = leaderboard_info["host"]
            category_name = leaderboard_info["category"]
            additional_rules = leaderboard_info["additional_rules"]
            force_retry = leaderboard_info["force_retry"]

            # Process this leaderboard
            logger.info(f"Processing leaderboard {index+1}/{len(leaderboards_to_process)}: {leaderboard_uid} (category: {category_name})")
            try:
                # Restart the browser every 2 leaderboards to avoid memory leaks
                if index > 0 and index % 2 == 0:
                    logger.info(f"Periodic browser cleanup after {index} leaderboards to avoid memory leaks")
                    cleanup_browser()
                    # Force garbage collection
                    import gc
                    gc.collect()
                    # Small pause to let the system clean up
                    time.sleep(3)

                # Process the leaderboard
                all_results = process_single_leaderboard(
                    leaderboard_uid,
                    host,
                    model,
                    index,
                    all_results,
                    additional_rules,
                    category_name
                )

                # Detailed logs for diagnosing problems
                logger.info(f"Results after processing: {len(all_results)} elements")
                # Log the results corresponding to the processed leaderboard
                for idx, res in enumerate(all_results):
                    if res.get("original_uid") == leaderboard_uid:
                        logger.info(f"Found result {idx}: uid={res.get('uid')}, original_uid={res.get('original_uid')}, category={res.get('category')}")

                # Clean up after each processing run
                cleanup_browser()

                # Check whether the leaderboard exists with the exact normalized category:
                # strict search by original_uid AND category
                normalized_category_name = create_category_slug(category_name)
                current_result = None
                for result in all_results:
                    # Always compare normalized categories to avoid format issues
                    result_category = result.get("category", "")
                    if result.get("original_uid") == leaderboard_uid and create_category_slug(result_category) == normalized_category_name:
                        current_result = result
                        logger.info(f"Found result for {leaderboard_uid}, category: {result.get('category')}")
                        break

                # No fallback search on original_uid alone: if the result is not found,
                # the processing most likely failed
                if not current_result:
                    logger.error(f"RESULT NOT FOUND for {leaderboard_uid}, normalized_category: {normalized_category_name}")
                    logger.error("Results matching this UID:")
                    for res in all_results:
                        if res.get("original_uid") == leaderboard_uid:
                            logger.error(f" - Result with category={res.get('category')}, uid={res.get('uid')}")
                    logger.error(f"Leaderboard {leaderboard_uid} (category: {category_name}) not updated because the result was not found")
                    continue

                # Update only this specific leaderboard in the results file
                logger.info(f"Updating leaderboard {leaderboard_uid} (category: {category_name}) in file")
                updated_results = update_leaderboard_result(current_result, results_file)

                # CRITICAL FIX: refresh all_results with the file data
                # to avoid desynchronization between the file and the in-memory list
                all_results = updated_results
                # Update the global results for the next leaderboard
                results_data = updated_results
                logger.info(f"Leaderboard {leaderboard_uid} (category: {category_name}) saved")

                # Upload to the HF Hub after each leaderboard if not in local-only mode
                if not getattr(args, "local_only", False):
                    logger.info(f"Uploading results to HF Hub after processing leaderboard {leaderboard_uid}")
                    try:
                        upload_to_hub(to_parse_file=category_list_file, results_file=results_file)
                        logger.info(f"Upload successful to HF Hub for leaderboard {leaderboard_uid}")
                    except Exception as upload_err:
                        logger.warning(f"Upload to HF Hub failed after processing leaderboard {leaderboard_uid}: {str(upload_err)}")
            except Exception as e:
                logger.error(f"Error processing leaderboard {leaderboard_uid} (category: {category_name}): {str(e)}")
                continue

        # No final save needed: every leaderboard has already been updated individually
        logger.info("Leaderboard processing completed")
        update_server_status("completed")
        return True, "Processing completed successfully"

    except Exception as e:
        update_server_status("failed", str(e))
        logger.exception("Error processing leaderboards")
        return False, f"Error processing leaderboards: {str(e)}"