Spaces:
Build error
Build error
| """ | |
| Utilities for file management. | |
| """ | |
| import json | |
| import os | |
| import datetime | |
| import shutil | |
| import time | |
| import random | |
| import tempfile | |
| import logging | |
| from filelock import FileLock | |
# Module-wide logger; every helper in this file reports through the
# "leaderboard-parser" logger so the application can configure it in one place.
logger = logging.getLogger(name="leaderboard-parser")
def save_results(results, file_path):
    """
    Serialize results as pretty-printed JSON into a file.

    The target file is opened in write mode, so any previous content is
    replaced. The JSON is indented with two spaces.

    Args:
        results: The JSON-serializable results to save
        file_path: The path to the file to write
    """
    with open(file_path, mode="w") as output_file:
        json.dump(results, output_file, indent=2)
def create_category_slug(category_name):
    """
    Build the slug of a category name.

    The name is lowercased and every space or underscore becomes a hyphen,
    so the resulting slug never contains an underscore.

    Args:
        category_name: The category name
    Returns:
        The category slug, or an empty string when the name is empty or None
    """
    if category_name:
        # Single pass mapping both separators to "-" once the name is lowercased.
        separators_to_hyphen = str.maketrans(" _", "--")
        return category_name.lower().translate(separators_to_hyphen)
    return ""
def create_combined_id(category, uid):
    """
    Build the combined identifier of a leaderboard from its category and UID.

    The category is first turned into a slug with create_category_slug, then
    joined to the UID with a single underscore.

    Args:
        category: The category name
        uid: The UID of the leaderboard
    Returns:
        The combined identifier in the format category_slug_uid
    """
    return "{}_{}".format(create_category_slug(category), uid)
def validate_leaderboard_result(result):
    """
    Validate a leaderboard result and repair its identifiers when needed.

    The checks performed are:
    1. 'original_uid' is present
    2. 'category' is present; it is replaced by its slug when not normalized
    3. 'uid' equals the combination of the normalized category and
       original_uid; it is added when missing and overwritten when different

    The dictionary is corrected in place and the same object is returned.

    Args:
        result: The leaderboard result to validate (dict)
    Returns:
        The validated and corrected result, or None if validation is
        impossible (not a dict, missing required field, or a category that
        is neither empty nor a string)
    """
    if not isinstance(result, dict):
        logger.error("Validation error: the result is not a dictionary")
        return None

    # Check if required fields are present
    if "original_uid" not in result:
        logger.error("Validation error: original_uid missing from result")
        return None
    if "category" not in result:
        logger.error("Validation error: category missing from result")
        return None

    original_uid = result["original_uid"]
    category = result["category"]

    # A truthy non-string category cannot be slugified (no .lower()); report
    # it as a validation failure instead of letting AttributeError escape.
    # Falsy values (None, "") still normalize to the empty slug as before.
    if category and not isinstance(category, str):
        logger.error(
            f"Validation error: category must be a string, got {type(category).__name__}"
        )
        return None

    # Normalize the category if necessary
    normalized_category = create_category_slug(category)
    if normalized_category != category:
        logger.warning(f"Category not normalized: '{category}' -> '{normalized_category}'")
        result["category"] = normalized_category

    # Recalculate the correct combined uid
    correct_uid = create_combined_id(normalized_category, original_uid)

    # Check if existing uid is correct
    if "uid" not in result:
        logger.warning(f"uid missing, adding calculated uid: {correct_uid}")
        result["uid"] = correct_uid
    elif result["uid"] != correct_uid:
        logger.warning(f"uid inconsistent: '{result['uid']}' does not match '{correct_uid}', correction applied")
        result["uid"] = correct_uid

    return result
def load_and_validate_results(file_path):
    """
    Load results from a JSON file and return them as a sorted list.

    Despite the function name, no per-entry validation is performed. Two
    layouts are accepted:
    - a JSON array of result objects
    - a JSON object with a "leaderboards" key mapping uid -> result object;
      each result is copied and the mapping key is stored under "uid"

    The list is sorted by (category, original_uid). A missing or null value
    sorts as the empty string, so one entry with a null field no longer
    makes the whole file load as empty.

    Args:
        file_path: Path to the results file
    Returns:
        List of results, or empty list in case of error (missing file,
        invalid JSON, unexpected structure)
    """

    def sort_key(entry):
        # None cannot be ordered against str; treat it like a missing field.
        category = entry.get("category")
        original_uid = entry.get("original_uid")
        return (
            "" if category is None else category,
            "" if original_uid is None else original_uid,
        )

    try:
        # Load results from the file
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                results_data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            logger.warning(f"Unable to load file {file_path}: {str(e)}")
            return []

        # Convert from dict with "leaderboards" to array if necessary
        if isinstance(results_data, dict) and "leaderboards" in results_data:
            results_data = [
                {**item, "uid": uid}
                for uid, item in results_data["leaderboards"].items()
            ]

        # Ensure results_data is a list
        if not isinstance(results_data, list):
            logger.warning(f"Invalid data format in {file_path}, initializing empty list")
            return []

        # Sort results
        results_data.sort(key=sort_key)
        logger.info(f"Load successful: {len(results_data)} results")
        return results_data
    except Exception as e:
        # Best-effort loader: any unexpected structure is reported and
        # treated as "no results" rather than propagated to the caller.
        logger.error(f"Error loading results: {str(e)}")
        return []
def update_leaderboard_result(leaderboard_result, file_path, max_wait_seconds=30):
    """
    Updates a leaderboard result in the specified file.

    If an entry with the same uid already exists, it is updated (its fields
    are merged with the new ones). Otherwise, a new entry is added. The whole
    read-modify-write cycle runs under a file lock ("<file_path>.lock"), and
    the new content is written to a temporary file in the same directory and
    then swapped in with os.replace so readers never see a partial file.

    Note: existing entries that have no "uid" key are not indexed and are
    therefore not written back.

    Args:
        leaderboard_result: The leaderboard result to update (must contain a uid)
        file_path: Path to the results file
        max_wait_seconds: Maximum wait time for file lock (in seconds)
    Returns:
        Updated results list or None in case of error
    """
    if not leaderboard_result or "uid" not in leaderboard_result:
        logger.error("Unable to update: invalid or missing leaderboard result or uid")
        return None

    # A bare filename has an empty dirname and os.makedirs("") raises, so
    # fall back to the current directory in that case.
    target_dir = os.path.dirname(file_path) or "."

    def sort_key(entry):
        # None cannot be ordered against str; treat it like a missing field.
        category = entry.get("category")
        original_uid = entry.get("original_uid")
        return (
            "" if category is None else category,
            "" if original_uid is None else original_uid,
        )

    try:
        # Create parent directory if necessary
        os.makedirs(target_dir, exist_ok=True)

        # Use a lock to avoid concurrent writes
        lock = FileLock(f"{file_path}.lock", timeout=max_wait_seconds)
        with lock:
            # Load existing results
            current_results = load_and_validate_results(file_path)

            # Index by uid for easy update
            results_by_uid = {r["uid"]: r for r in current_results if "uid" in r}

            # Update or add result
            uid = leaderboard_result["uid"]
            if uid in results_by_uid:
                results_by_uid[uid].update(leaderboard_result)
                logger.info(f"Result updated for uid: {uid}")
            else:
                results_by_uid[uid] = leaderboard_result
                logger.info(f"New result added for uid: {uid}")

            # Convert to a sorted list for writing
            updated_results = sorted(results_by_uid.values(), key=sort_key)

            # Write to temporary file then rename for atomicity. The temp
            # file lives next to the target so the rename stays on one
            # filesystem.
            fd, temp_path = tempfile.mkstemp(dir=target_dir)
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as f:
                    json.dump(updated_results, f, indent=2, ensure_ascii=False)
                # os.replace overwrites the destination atomically on every
                # platform and fails (instead of moving the file inside) if
                # file_path is a directory.
                os.replace(temp_path, file_path)
            except Exception:
                # Clean up in case of error
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
                raise

            logger.info(f"File updated successfully: {file_path}")
            return updated_results
    except Exception as e:
        # Includes lock timeouts and I/O failures: report and signal with None.
        logger.error(f"Error updating file {file_path}: {str(e)}")
        return None
def split_combined_id(combined_id):
    """
    Break a combined identifier (category_uid) into category and uid.

    Only the first underscore "_" acts as separator; any further underscore
    stays part of the uid.

    Args:
        combined_id: The combined identifier (category_uid)
    Returns:
        A tuple (category, uid); (None, combined_id) when there is no
        underscore; (None, None) when the identifier is empty or None
    """
    if not combined_id:
        return None, None

    category, separator, uid = combined_id.partition("_")
    if separator:
        return category, uid

    # No underscore at all: the whole value is a uid without category
    return None, combined_id
def format_datetime(dt_str):
    """
    Render a datetime (or datetime string) in a human readable format.

    A datetime object is formatted directly. A string is parsed with
    datetime.fromisoformat first, then with a list of common strptime
    patterns. The output pattern is "%d/%m/%Y à %H:%M:%S".

    Args:
        dt_str: The datetime string (or datetime object) to format
    Returns:
        A formatted datetime string, or the input unchanged when it cannot
        be parsed or formatted
    """
    # Patterns tried in order when fromisoformat rejects the string
    fallback_patterns = (
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
    )

    try:
        if isinstance(dt_str, datetime.datetime):
            # Already a datetime object, nothing to parse
            parsed = dt_str
        else:
            parsed = None
            try:
                parsed = datetime.datetime.fromisoformat(dt_str)
            except ValueError:
                for pattern in fallback_patterns:
                    try:
                        parsed = datetime.datetime.strptime(dt_str, pattern)
                    except ValueError:
                        continue
                    break
            if parsed is None:
                # No pattern matched: hand the input back untouched
                return dt_str

        return parsed.strftime("%d/%m/%Y à %H:%M:%S")
    except (ValueError, TypeError) as e:
        print(f"Error formatting date {dt_str}: {e}")
        return dt_str
def clean_output_files(results_file):
    """
    Reset the results file to an empty JSON list, keeping a backup.

    When the file already exists it is first copied (with metadata) to
    "<results_file>.backup". The results file is then rewritten so that it
    contains an empty list. Progress is reported on standard output.

    Args:
        results_file: The results file to clean
    """
    backup_file = f"{results_file}.backup"

    # Preserve the current content before wiping it
    if os.path.exists(results_file):
        shutil.copy2(results_file, backup_file)
        print(f"Backup of {results_file} created in {backup_file}")

    # Start over with an empty results list
    with open(results_file, "w") as output_file:
        json.dump([], output_file, indent=2)
    print(f"File {results_file} cleaned")