Spaces:
Sleeping
Sleeping
| from huggingface_hub import HfFileSystem | |
| import pandas as pd | |
| from utils import logger | |
| from datetime import datetime, timedelta | |
| import threading | |
| import traceback | |
| import json | |
| import re | |
| from typing import List, Tuple, Optional | |
# Module-wide Hugging Face filesystem handle, shared by every dataset listing in this file.
# NOTE: if caching is an issue, try adding `use_listings_cache=False`
fs = HfFileSystem()
# Models shown on the dashboard; reports are filtered down to these names (case-insensitively).
IMPORTANT_MODELS = [
    "auto",
    # Text
    "bert",  # encoder only, old but dominant
    "gpt2",  # decoder, old
    "t5",  # encoder-decoder, old
    "modernbert",  # encoder only
    # Vision
    "vit",  # old
    "clip",  # old but dominant
    "detr",  # object detection, segmentation
    "table-transformer",  # object detection - maybe just detr?
    "got_ocr2",  # ocr
    # Audio
    "whisper",  # old but dominant
    "wav2vec2",  # old
    # Recent LLMs
    "llama",  # new and dominant (meta)
    "gemma3",  # new (google)
    "qwen2",  # new (Alibaba)
    "mistral3",  # new (Mistral)
    # Vision / video language models
    "qwen2_5_vl",  # new (vision)
    "llava",  # many models from it (vision)
    "smolvlm",  # new (video)
    "internvl",  # new (video)
    # Omnimodal models
    "gemma3n",  # new
    "qwen2_5_omni",  # new
]
# Columns kept after joining the AMD and NVIDIA reports: one column per (metric, device) pair,
# with both devices listed side by side for each metric.
KEYS_TO_KEEP = [
    f"{metric}_{device}"
    for metric in ("success", "skipped", "failed_multi_no", "failed_single_no", "failures", "job_link")
    for device in ("amd", "nvidia")
]
def log_dataframe_link(link: str) -> str:
    """
    Log where a report is read from, log a clickable version of that link, and return the report date.

    The date is the first `YYYY-MM-DD` path component found after `transformers_daily_ci` in the link. If the link
    does not have that shape, an error is logged and the placeholder date "9999-99-99" is returned.
    """
    logger.info(f"Reading df located at {link}")
    # hf:// URIs are turned into their https equivalent
    web_link = link
    if web_link.startswith("hf://"):
        web_link = "https://huggingface.co/" + web_link.removeprefix("hf://")
    # Capture whatever sits between the dataset name and the date folder (YYYY-MM-DD), then the date itself
    found = re.search(r'transformers_daily_ci(.*?)/(\d{4}-\d{2}-\d{2})', web_link)
    if found is None:
        logger.error("Could not find transformers_daily_ci and.or date in the link")
        return "9999-99-99"
    in_between, report_date = found.groups()
    # The in-between part is swapped for blob/main to obtain the browsable URL
    web_link = web_link.replace("transformers_daily_ci" + in_between, "transformers_daily_ci/blob/main")
    logger.info(f"Link to data source: {web_link}")
    return report_date
def infer_latest_update_msg(date_df_amd: str, date_df_nvidia: str) -> str:
    """
    Build the "last updated" message from the AMD and NVIDIA report dates.

    Both dates are expected as `YYYY-MM-DD` strings; a date starting with "9999" is the placeholder used when no
    date could be found and is ignored. The most recent remaining date is formatted as `MM/DD/YYYY`. When no usable
    date is left, or the chosen date cannot be split into three parts, "could not find last update time" is
    returned.
    """
    # Placeholder dates must not take part in max(): "9999-..." would always win the comparison
    valid_dates = [date for date in (date_df_amd, date_df_nvidia) if not date.startswith("9999")]
    if not valid_dates:
        return "could not find last update time"
    # Warn if dates are not the same
    if date_df_amd != date_df_nvidia:
        logger.warning(f"Different dates found: {date_df_amd} (AMD) vs {date_df_nvidia} (NVIDIA)")
    # ISO dates sort chronologically as plain strings, so max() is the latest one
    try:
        yyyy, mm, dd = max(valid_dates).split("-")
    except ValueError as e:
        logger.error(f"When trying to infer latest date, got error {e}")
        return "could not find last update time"
    return f"last updated {mm}/{dd}/{yyyy}"
def read_one_dataframe(json_path: str, device_label: str) -> tuple[pd.DataFrame, str]:
    """
    Read one report into a dataframe indexed by model name and add per-device failure counts.

    The JSON at `json_path` is read with one row per top-level key. Two columns are added,
    `failed_multi_no_{device_label}` and `failed_single_no_{device_label}`, holding the number of entries under
    the "multi" and "single" keys of each row's "failures" value (0 when the key is absent).

    Returns the dataframe and the report date extracted from `json_path` by `log_dataframe_link`.
    """
    report_date = log_dataframe_link(json_path)
    report = pd.read_json(json_path, orient="index")
    report.index.name = "model_name"
    # Same counting rule for both setups, multi first so the column order is stable
    for setup in ("multi", "single"):
        report[f"failed_{setup}_no_{device_label}"] = report["failures"].apply(
            lambda failures, setup=setup: len(failures[setup]) if setup in failures else 0
        )
    return report, report_date
def get_available_dates() -> List[str]:
    """
    List the dates for which both the AMD and the NVIDIA datasets hold a report.

    Dates are `YYYY-MM-DD` strings, newest first, capped at 30 entries. Any error is logged and results in an
    empty list.
    """
    # AMD layout:    YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json
    amd_glob = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json"
    amd_date_re = re.compile(
        r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/runs/[^/]+/ci_results_run_models_gpu/model_results\.json'
    )
    # NVIDIA layout: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json
    nvidia_glob = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json"
    nvidia_date_re = re.compile(
        r'transformers_daily_ci/(\d{4}-\d{2}-\d{2})/ci_results_run_models_gpu/model_results\.json'
    )
    try:
        amd_paths = sorted(fs.glob(amd_glob, refresh=True), reverse=True)
        logger.info(f"Found {len(amd_paths)} AMD files")
        nvidia_paths = sorted(fs.glob(nvidia_glob, refresh=True), reverse=True)
        logger.info(f"Found {len(nvidia_paths)} NVIDIA files")

        # AMD: collect dates, and keep a trace of paths that do not follow the expected layout
        amd_dates = set()
        for path in amd_paths:
            hit = amd_date_re.search(path)
            if hit is None:
                logger.debug(f"AMD file path didn't match pattern: {path}")
                continue
            amd_dates.add(hit.group(1))
        if amd_paths:
            logger.info(f"Example AMD file paths: {amd_paths[:3]}")

        # NVIDIA: paths that do not match are silently ignored
        nvidia_hits = (nvidia_date_re.search(path) for path in nvidia_paths)
        nvidia_dates = {hit.group(1) for hit in nvidia_hits if hit}

        logger.info(f"AMD dates: {sorted(amd_dates, reverse=True)[:5]}...")  # Show first 5
        logger.info(f"NVIDIA dates: {sorted(nvidia_dates, reverse=True)[:5]}...")  # Show first 5

        # Only dates present on both sides are usable
        shared_dates = sorted(amd_dates & nvidia_dates, reverse=True)
        logger.info(f"Common dates: {len(shared_dates)} dates where both AMD and NVIDIA have data")
        return shared_dates[:30]  # Limit to last 30 days for performance
    except Exception as e:
        logger.error(f"Error getting available dates: {e}")
        # Nothing usable: report no dates
        return []
def get_data_for_date(target_date: str) -> tuple[pd.DataFrame, str]:
    """
    Load the joined AMD/NVIDIA report of one day, restricted to the important models.

    Args:
        target_date: day to load, as the `YYYY-MM-DD` folder name used in both datasets.

    Returns:
        The joined dataframe (columns `KEYS_TO_KEEP`, index = model name without the `models_` prefix) and
        `target_date`. On any error the error is logged and an empty dataframe is returned instead.
    """
    try:
        # AMD structure: YYYY-MM-DD/runs/{run_id}/ci_results_run_models_gpu/model_results.json
        amd_src = f"hf://datasets/optimum-amd/transformers_daily_ci/{target_date}/runs/*/ci_results_run_models_gpu/model_results.json"
        amd_files = fs.glob(amd_src, refresh=True)
        if not amd_files:
            raise FileNotFoundError(f"No AMD data found for date {target_date}")
        # Several runs may exist for one day: take the last one in reverse-sorted order, which is the same
        # "latest file" rule get_distant_data applies to its listing
        amd_file = max(amd_files)
        # The listing is used without a protocol elsewhere in this file (get_distant_data prepends hf://);
        # without it pandas would look for a local file of that name
        if not amd_file.startswith("hf://"):
            amd_file = f"hf://{amd_file}"
        # NVIDIA structure: YYYY-MM-DD/ci_results_run_models_gpu/model_results.json
        nvidia_src = f"hf://datasets/hf-internal-testing/transformers_daily_ci/{target_date}/ci_results_run_models_gpu/model_results.json"
        # Read dataframes
        df_amd, _ = read_one_dataframe(amd_file, "amd")
        df_nvidia, _ = read_one_dataframe(nvidia_src, "nvidia")
        # Join both dataframes
        joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
        joined = joined[KEYS_TO_KEEP]
        joined.index = joined.index.str.replace("^models_", "", regex=True)
        # Filter out all but important models
        important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
        filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]
        return filtered_joined, target_date
    except Exception as e:
        logger.error(f"Error getting data for date {target_date}: {e}")
        # Return empty dataframe instead of sample data for historical functionality
        return pd.DataFrame(), target_date
def get_historical_data(start_date: str, end_date: str) -> pd.DataFrame:
    """
    Load and stack the daily reports of every day in `[start_date, end_date]` (both inclusive).

    Args:
        start_date: first day, formatted `YYYY-MM-DD`.
        end_date: last day, formatted `YYYY-MM-DD`.

    Returns:
        The concatenation of the per-day dataframes, each with an extra `date` column holding its day. Days
        without data are skipped with a warning. An empty dataframe is returned when no day has data, when the
        range is empty, or when anything fails (e.g. a badly formatted date); the failure is logged.
    """
    try:
        start_dt = datetime.strptime(start_date, "%Y-%m-%d")
        end_dt = datetime.strptime(end_date, "%Y-%m-%d")
        historical_data = []
        current_dt = start_dt
        while current_dt <= end_dt:
            date_str = current_dt.strftime("%Y-%m-%d")
            try:
                df, _ = get_data_for_date(date_str)
                # Only add non-empty dataframes
                if df.empty:
                    logger.warning(f"No data available for {date_str}")
                else:
                    # assign() builds a new frame: the frame handed back by get_data_for_date is a filtered
                    # slice, and writing a column into it in place can raise SettingWithCopyWarning
                    historical_data.append(df.assign(date=date_str))
                    logger.info(f"Loaded data for {date_str}")
            except Exception as e:
                # One bad day must not abort the whole range
                logger.warning(f"Could not load data for {date_str}: {e}")
            current_dt += timedelta(days=1)
        if not historical_data:
            logger.warning("No historical data found for the specified range")
            return pd.DataFrame()
        # Combine all dataframes, keeping the model names as index
        return pd.concat(historical_data, ignore_index=False)
    except Exception as e:
        logger.error(f"Error getting historical data: {e}")
        # Callers always get a dataframe back, empty on failure
        return pd.DataFrame()
def get_distant_data() -> tuple[pd.DataFrame, str]:
    """
    Load the latest AMD and NVIDIA reports from the Hub and join them.

    For each dataset the report files are listed and the first path in reverse-sorted order is used.

    Returns:
        The joined dataframe (columns `KEYS_TO_KEEP`, index = model name without the `models_` prefix, restricted
        to `IMPORTANT_MODELS`) and the "last updated" message built from both report dates.

    Raises:
        FileNotFoundError: if one of the two datasets has no report file. Errors from listing or reading the
            reports propagate as well.
    """
    # Retrieve AMD dataframe
    amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json"
    files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True)
    if not files_amd:
        raise FileNotFoundError(f"No AMD report found matching {amd_src}")
    df_amd, date_df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd")
    # Retrieve NVIDIA dataframe, which is read through its raw URL:
    # https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/YYYY-MM-DD/ci_results_run_models_gpu/model_results.json
    nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json"
    files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True)
    if not files_nvidia:
        raise FileNotFoundError(f"No NVIDIA report found matching {nvidia_src}")
    # removeprefix drops exactly the repo prefix; lstrip would strip any leading run of the prefix's
    # *characters*, which only works as long as the remaining path starts with a character outside that set
    nvidia_path = files_nvidia[0].removeprefix("datasets/hf-internal-testing/transformers_daily_ci/")
    nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path
    df_nvidia, date_df_nvidia = read_one_dataframe(nvidia_path, "nvidia")
    # Infer and format the latest df date
    latest_update_msg = infer_latest_update_msg(date_df_amd, date_df_nvidia)
    # Join both dataframes
    joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")
    joined = joined[KEYS_TO_KEEP]
    joined.index = joined.index.str.replace("^models_", "", regex=True)
    # Filter out all but important models
    important_models_lower = [model.lower() for model in IMPORTANT_MODELS]
    filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)]
    # Warn for each missing important model, comparing case-insensitively like the filter above
    present_models = set(filtered_joined.index.str.lower())
    for model in IMPORTANT_MODELS:
        if model.lower() not in present_models:
            logger.warning(f"Model {model} was missing from index.")
    return filtered_joined, latest_update_msg
def get_sample_data() -> tuple[pd.DataFrame, str]:
    """
    Build the joined dataframe from the local sample reports `sample_amd.json` and `sample_nvidia.json`.

    The result has the same columns and model filtering as the distant data, but every model name is prefixed
    with "sample_". The accompanying message is always "sample data was loaded".
    """
    # Only the dataframes matter here, the report dates are discarded
    sample_amd = read_one_dataframe("sample_amd.json", "amd")[0]
    sample_nvidia = read_one_dataframe("sample_nvidia.json", "nvidia")[0]
    # Outer join on model name, then keep the dashboard columns
    merged = sample_amd.join(sample_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer")[KEYS_TO_KEEP]
    merged.index = merged.index.str.replace("^models_", "", regex=True)
    # Keep the important models only (case-insensitive match)
    wanted_names = [name.lower() for name in IMPORTANT_MODELS]
    is_wanted = merged.index.str.lower().isin(wanted_names)
    kept = merged[is_wanted]
    # Make it obvious in the UI that these rows are not live data
    kept.index = "sample_" + kept.index
    return kept, "sample data was loaded"
def safe_extract(row: pd.Series, key: str) -> int:
    """
    Return `row[key]` as an int, or 0 when the key is absent or its value is missing (NaN/None).

    `row` only needs to provide a dict-like `.get(key, default)`, which is the case for a dataframe row.
    """
    # Look the value up once; the default covers an absent key, pd.notna covers NaN left by the outer join
    value = row.get(key, 0)
    return int(value) if pd.notna(value) else 0
def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]:
    """
    Turn one dataframe row into per-device test statistics.

    Returns, in order: the AMD stats dict, the NVIDIA stats dict, then the AMD multi, AMD single, NVIDIA multi and
    NVIDIA single failure counts. Each stats dict has the keys 'passed', 'failed' (multi + single failures),
    'skipped' and 'error' (always 0). Missing values count as 0.
    """

    def summarize(device: str) -> tuple[dict[str, int], int, int]:
        # Counts come straight from the dataframe columns of that device
        multi_failures = safe_extract(row, f"failed_multi_no_{device}")
        single_failures = safe_extract(row, f"failed_single_no_{device}")
        stats = {
            'passed': safe_extract(row, f"success_{device}"),
            'failed': multi_failures + single_failures,
            'skipped': safe_extract(row, f"skipped_{device}"),
            'error': 0,  # Not available in this dataset
        }
        return stats, multi_failures, single_failures

    amd_stats, failed_multi_amd, failed_single_amd = summarize("amd")
    nvidia_stats, failed_multi_nvidia, failed_single_nvidia = summarize("nvidia")
    return amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd, failed_multi_nvidia, failed_single_nvidia
class CIResults:
    """Holds the latest CI results in memory and can refresh them periodically in the background."""

    def __init__(self):
        # Joined AMD/NVIDIA dataframe of the latest reports, indexed by model name
        self.df = pd.DataFrame()
        # Model names present in `self.df`
        self.available_models = []
        # Human-readable freshness message shown next to the data
        self.latest_update_msg = ""
        # Dates for which both datasets have a report, newest first
        self.available_dates = []
        # Result of the last `load_historical_data` call
        self.historical_df = pd.DataFrame()

    def load_data(self) -> None:
        """
        Load data from the data source.

        The distant data is tried first; if that fails for any reason the failure is logged with its traceback
        and the local sample data is used instead (an error while loading the sample data propagates). The list
        of available dates is refreshed as well, falling back on an empty list.
        """
        # Try loading the distant data, and fall back on sample data for local tinkering
        try:
            logger.info("Loading distant data...")
            new_df, latest_update_msg = get_distant_data()
        except Exception:
            error_msg = [
                "Loading data failed:",
                "-" * 120,
                traceback.format_exc(),
                "-" * 120,
                "Falling back on sample data."
            ]
            logger.error("\n".join(error_msg))
            new_df, latest_update_msg = get_sample_data()
        self.latest_update_msg = latest_update_msg
        # Load available dates
        try:
            self.available_dates = get_available_dates()
            logger.info(f"Available dates: {len(self.available_dates)} dates")
            if self.available_dates:
                logger.info(f"Date range: {self.available_dates[-1]} to {self.available_dates[0]}")
            else:
                logger.warning("No available dates found")
        except Exception as e:
            logger.error(f"Error loading available dates: {e}")
            self.available_dates = []
        # Update attributes
        self.df = new_df
        self.available_models = new_df.index.tolist()
        # Log the load status
        logger.info(f"Data loaded successfully: {len(self.available_models)} models")
        logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}")
        logger.info(f"Latest update message: {self.latest_update_msg}")
        self._log_preview()

    def _log_preview(self) -> None:
        """Log the values of the first three models, with non-int values stringified and cut at 10 characters."""
        msg = {}
        for model in self.available_models[:3]:
            msg[model] = {}
            for col in self.df.columns:
                value = self.df.loc[model, col]
                # Plain ints are JSON-serializable as is; everything else is shown as a short string
                if not isinstance(value, int):
                    value = str(value)
                    if len(value) > 10:
                        value = value[:10] + "..."
                msg[model][col] = value
        logger.info(json.dumps(msg, indent=4))

    def load_historical_data(self, start_date: str, end_date: str) -> None:
        """Load historical data for the `YYYY-MM-DD` range into `self.historical_df` (empty on failure)."""
        try:
            logger.info(f"Loading historical data from {start_date} to {end_date}")
            self.historical_df = get_historical_data(start_date, end_date)
            logger.info(f"Historical data loaded: {len(self.historical_df)} records")
        except Exception as e:
            logger.error(f"Error loading historical data: {e}")
            self.historical_df = pd.DataFrame()

    def schedule_data_reload(self, interval_seconds: float = 900.0) -> None:
        """
        Schedule `load_data` to run again and again, every `interval_seconds` (15 minutes by default).

        The first reload happens one interval after this call. Timers are daemon threads, so they do not keep
        the process alive.
        """
        # Start the first reload timer
        self._start_reload_timer(interval_seconds)
        logger.info(f"Data auto-reload scheduled every {interval_seconds / 60:g} minutes")

    def _start_reload_timer(self, interval_seconds: float) -> None:
        """Start a daemon timer that triggers one reload (and its rescheduling) after `interval_seconds`."""
        timer = threading.Timer(interval_seconds, self._reload_and_reschedule, args=(interval_seconds,))
        timer.daemon = True  # Dies when main thread dies
        timer.start()

    def _reload_and_reschedule(self, interval_seconds: float) -> None:
        """Reload the data, then schedule the next reload whether or not this one succeeded."""
        try:
            self.load_data()
        except Exception:
            # An exception escaping here would end the timer thread before the next timer exists,
            # silently stopping every future reload
            logger.error("Scheduled data reload failed:\n" + traceback.format_exc())
        finally:
            self._start_reload_timer(interval_seconds)
            logger.info(f"Next data reload scheduled in {interval_seconds / 60:g} minutes")