"""Dash components and data helpers for rendering top-N download leaderboards."""

import base64

import dash_mantine_components as dmc
import pandas as pd
from dash import dcc, html
from dash_iconify import DashIconify

# Inline style applied to the leaderboard toggle button.
button_style = {
    "display": "inline-block",
    "marginBottom": "10px",
    "marginRight": "15px",
    "marginTop": "30px",
    "padding": "6px 16px",
    "backgroundColor": "#082030",
    "color": "white",
    "borderRadius": "6px",
    "textDecoration": "none",
    "fontWeight": "bold",
    "fontSize": "14px",
}

# Display name -> emoji shown in front of a country chip.
country_icon_map = {
    "USA": "🇺🇸",
    "China": "🇨🇳",
    "Germany": "🇩🇪",
    "France": "🇫🇷",
    "India": "🇮🇳",
    "Italy": "🇮🇹",
    "Japan": "🇯🇵",
    "South Korea": "🇰🇷",
    "United Kingdom": "🇬🇧",
    "Canada": "🇨🇦",
    "Brazil": "🇧🇷",
    "Australia": "🇦🇺",
    "Unknown": "❓",
    "Finland": "🇫🇮",
    "Lebanon": "🇱🇧",
    "Iceland": "🇮🇸",
    "Singapore": "🇸🇬",
    "Israel": "🇮🇱",
    "Iran": "🇮🇷",
    "Hong Kong": "🇭🇰",
    "Netherlands": "🇳🇱",
    "Chile": "🇨🇱",
    "Vietnam": "🇻🇳",
    "Russia": "🇷🇺",
    "Qatar": "🇶🇦",
    "Switzerland": "🇨🇭",
    "User": "👤",
    "International/Online": "🌐",
    "Spain": "🇪🇸",
    "Sweden": "🇸🇪",
    "Norway": "🇳🇴",
    "Denmark": "🇩🇰",
    "Austria": "🇦🇹",
    "Belgium": "🇧🇪",
    "Poland": "🇵🇱",
    "Turkey": "🇹🇷",
    "Mexico": "🇲🇽",
    "Argentina": "🇦🇷",
    "Thailand": "🇹🇭",
    "Indonesia": "🇮🇩",
    "Malaysia": "🇲🇾",
    "Philippines": "🇵🇭",
    "Egypt": "🇪🇬",
    "South Africa": "🇿🇦",
    "New Zealand": "🇳🇿",
    "Ireland": "🇮🇪",
    "Portugal": "🇵🇹",
    "Greece": "🇬🇷",
    "Czech Republic": "🇨🇿",
    "Romania": "🇷🇴",
    "Ukraine": "🇺🇦",
    "United Arab Emirates": "🇦🇪",
    "Saudi Arabia": "🇸🇦",
    "Pakistan": "🇵🇰",
    "Bangladesh": "🇧🇩",
}

# Author name -> image path used instead of an emoji on the author chip.
company_icon_map = {
    "google": "../assets/icons/google.png",
    "distilbert": "../assets/icons/hugging-face.png",
    "sentence-transformers": "../assets/icons/hugging-face.png",
    "facebook": "../assets/icons/meta.png",
    "openai": "../assets/icons/openai.png",
}

# Grouping column -> metadata columns collected for each leaderboard entry.
meta_cols_map = {
    "org_country_single": ["org_country_single"],
    "author": ["org_country_single", "author", "merged_country_groups_single"],
    "model": [
        "org_country_single",
        "author",
        "merged_country_groups_single",
        "merged_modality",
        "total_downloads",
    ],
}

# File suffixes that mark a chip icon as an image path rather than an emoji.
_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".svg")

# Base style of every leaderboard header cell.
_HEADER_CELL_STYLE = {"backgroundColor": "#F0F0F0", "textAlign": "left"}

# board_type -> (grouping column, table title, filename / DOM id prefix).
_BOARD_CONFIG = {
    "countries": ("org_country_single", "Top Countries", "top_countries"),
    "developers": ("author", "Top Developers", "top_developers"),
    "models": ("model", "Top Models", "top_models"),
}


def _chip_style(bg_color):
    """Return the inline style shared by text chips and image chips."""
    return {
        "backgroundColor": bg_color,
        "padding": "4px 10px",
        "borderRadius": "12px",
        "margin": "2px",
        "display": "inline-flex",
        "alignItems": "center",
        "fontSize": "14px",
    }


# Chip renderer
def chip(text, bg_color="#F0F0F0"):
    """Render ``text`` as a rounded pill.

    Args:
        text: Content of the chip.
        bg_color: CSS background colour of the pill.

    Returns:
        An ``html.Span`` styled as a chip.
    """
    return html.Span(text, style=_chip_style(bg_color))


# Progress bar for % of total
def progress_bar(percent, bar_color="#082030"):
    """Render a horizontal bar filled to ``percent`` with a centred label.

    Args:
        percent: Numeric percentage; used for the fill width and formatted
            with one decimal place for the label.
        bar_color: CSS colour of the filled part.

    Returns:
        An ``html.Div`` holding the fill element and the overlaid label.
    """
    fill = html.Div(
        style={
            "backgroundColor": bar_color,
            "width": f"{percent}%",
            "height": "100%",
            "borderRadius": "8px",
            "transition": "width 0.5s",
        }
    )
    # Absolutely positioned so the label stays centred over the track,
    # independent of how wide the fill is.
    label = html.Div(
        f"{percent:.1f}%",
        style={
            "position": "absolute",
            "top": 0,
            "left": "50%",
            "transform": "translateX(-50%)",
            "color": "black",
            "fontWeight": "bold",
            "fontSize": "12px",
            "lineHeight": "20px",
            "textAlign": "center",
        },
    )
    return html.Div(
        style={
            "position": "relative",
            "backgroundColor": "#E0E0E0",
            "borderRadius": "8px",
            "height": "20px",
            "width": "100%",
            "overflow": "hidden",
        },
        children=[fill, label],
    )


# Helper to convert DataFrame to CSV and encode for download
def df_to_download_link(df, filename):
    """Build a download icon whose link embeds ``df`` as a CSV data URI.

    Args:
        df: DataFrame to serialise (written without the index).
        filename: Base name; used for the anchor id (``download-<filename>``)
            and the suggested file name (``<filename>.csv``).

    Returns:
        A right-aligned ``html.Div`` wrapping the download anchor.
    """
    csv_string = df.to_csv(index=False)
    # The CSV travels inside the href itself, base64-encoded.
    b64 = base64.b64encode(csv_string.encode()).decode()
    icon_button = dmc.ActionIcon(
        DashIconify(icon="mdi:download", width=24),
        size="lg",
        color="#082030",
    )
    return html.Div(
        html.A(
            children=icon_button,
            id=f"download-{filename}",
            download=f"{filename}.csv",
            href=f"data:text/csv;base64,{b64}",
            target="_blank",
            title="Download CSV",
            style={
                "padding": "6px 12px",
                "display": "inline-flex",
                "alignItems": "center",
                "justifyContent": "center",
            },
        ),
        style={"textAlign": "right"},
    )


# Render multiple chips in one row
def render_chips(metadata_list, chip_color):
    """Render ``(icon, name)`` pairs as a wrapping row of chips.

    An icon that is a string ending in an image suffix is rendered as an
    ``html.Img`` followed by the name; any other icon is treated as text and
    prefixed to the name.

    Args:
        metadata_list: Iterable of ``(icon, name)`` pairs.
        chip_color: CSS background colour for every chip.

    Returns:
        An ``html.Div`` flex container holding the chips.
    """
    chips = []
    for icon, name in metadata_list:
        if isinstance(icon, str) and icon.endswith(_IMAGE_SUFFIXES):
            # Image chips use the same style as text chips; the previous
            # "alignItems": "left" is not a valid align-items value.
            chips.append(
                html.Span(
                    [
                        html.Img(
                            src=icon,
                            style={"height": "18px", "marginRight": "6px"},
                        ),
                        name,
                    ],
                    style=_chip_style(chip_color),
                )
            )
        else:
            chips.append(chip(f"{icon} {name}", chip_color))
    return html.Div(
        chips,
        style={"display": "flex", "flexWrap": "wrap", "justifyContent": "left"},
    )


def _leaderboard_table(df, chip_color, bar_color, table_style):
    """Build the Rank / Name / Metadata / % of Total ``html.Table`` for ``df``."""
    header = html.Thead(
        html.Tr(
            [
                html.Th("Rank", style=dict(_HEADER_CELL_STYLE)),
                html.Th("Name", style=dict(_HEADER_CELL_STYLE)),
                html.Th(
                    "Metadata",
                    style={**_HEADER_CELL_STYLE, "marginRight": "10px"},
                ),
                html.Th("% of Total", style=dict(_HEADER_CELL_STYLE)),
            ]
        )
    )
    # Rank is the 1-based row position, so it stays correct even when the
    # frame's index is not a fresh 0..n-1 range.
    body = html.Tbody(
        [
            html.Tr(
                [
                    html.Td(rank, style={"textAlign": "center"}),
                    html.Td(row["Name"], style={"textAlign": "left"}),
                    html.Td(render_chips(row["Metadata"], chip_color)),
                    html.Td(
                        progress_bar(row["% of total"], bar_color),
                        style={"textAlign": "center"},
                    ),
                ]
            )
            for rank, (_, row) in enumerate(df.iterrows(), start=1)
        ]
    )
    return html.Table([header, body], style=table_style)


def render_table_content(
    df, download_df, chip_color, bar_color="#082030", filename="data"
):
    """Render only the leaderboard table, without title, download link or toggle.

    Args:
        df: Frame with ``Name``, ``Metadata`` and ``% of total`` columns.
        download_df: Unused here; kept for signature parity with
            ``render_table``.
        chip_color: Background colour for metadata chips.
        bar_color: Fill colour for the progress bars.
        filename: Unused here; kept for signature parity with
            ``render_table``.

    Returns:
        An ``html.Div`` wrapping the table.
    """
    return html.Div(
        [
            _leaderboard_table(
                df,
                chip_color,
                bar_color,
                {"borderCollapse": "collapse", "width": "100%"},
            )
        ]
    )


# Table renderer
def render_table(
    df, download_df, title, chip_color, bar_color="#AC482A", filename="data"
):
    """Render a full leaderboard section: title, download link, table, toggle.

    Element ids are derived from ``filename``: ``<filename>-div``,
    ``<filename>-table``, ``<filename>-toggle`` and
    ``loading-<filename>-toggle``.
    NOTE(review): these ids are presumably targeted by callbacks defined
    elsewhere - confirm before renaming.

    Args:
        df: Frame with ``Name``, ``Metadata`` and ``% of total`` columns.
        download_df: Frame serialised into the CSV download link.
        title: Heading shown above the table.
        chip_color: Background colour for metadata chips.
        bar_color: Fill colour for the progress bars.
        filename: Prefix for element ids and the CSV file name.

    Returns:
        An ``html.Div`` containing the whole section.
    """
    heading_row = html.Div(
        [
            html.H4(
                title,
                style={
                    "textAlign": "left",
                    "marginBottom": "10px",
                    "fontSize": "20px",
                    "display": "inline-block",
                },
            ),
            df_to_download_link(download_df, filename),
        ],
        style={
            "display": "flex",
            "alignItems": "center",
            "justifyContent": "space-between",
        },
    )
    table_container = html.Div(
        id=f"{filename}-table",
        children=[
            _leaderboard_table(
                df,
                chip_color,
                bar_color,
                {
                    "borderCollapse": "collapse",
                    "width": "100%",
                    "border": "none",
                },
            )
        ],
    )
    toggle = dcc.Loading(
        id=f"loading-{filename}-toggle",
        type="dot",
        color="#082030",
        children=html.Div(
            [
                html.Button(
                    "▼ Show Top 50",
                    id=f"{filename}-toggle",
                    n_clicks=0,
                    style={**button_style, "border": "none"},
                )
            ],
            style={"marginTop": "5px", "textAlign": "left"},
        ),
    )
    return html.Div(
        id=f"{filename}-div",
        children=[heading_row, table_container, toggle],
        style={"marginBottom": "20px"},
    )


def _format_parameter_count(count):
    """Format a parameter count with a B / M / K suffix (one decimal place)."""
    if count >= 1e9:
        return f"{count / 1e9:.1f}B"
    if count >= 1e6:
        return f"{count / 1e6:.1f}M"
    if count >= 1e3:
        return f"{count / 1e3:.1f}K"
    return str(int(count))


def _build_metadata_chips(meta):
    """Turn one entry's ``{column: unique values}`` dict into ``(icon, text)`` chips."""
    chips = []

    # Countries
    for country in meta.get("org_country_single", []):
        if country == "United States of America":
            country = "USA"
        if country == "user":
            country = "User"
        chips.append((country_icon_map.get(country, "🌍"), country))

    # Author: use a known company logo, otherwise a person or building emoji
    # depending on whether the first country group is "User". A missing or
    # empty group list counts as "User".
    country_groups = meta.get("merged_country_groups_single") or ["User"]
    fallback_icon = "👤" if country_groups[0] == "User" else "🏢"
    for author in meta.get("author", []):
        icon = company_icon_map.get(author, "")
        if icon == "":
            icon = fallback_icon
        chips.append((icon, author))

    # Downloads
    total_downloads = sum(
        d for d in meta.get("total_downloads", []) if pd.notna(d)
    )
    if total_downloads:
        chips.append(("⬇️", f"{int(total_downloads):,}"))

    # Modality
    for modality in meta.get("merged_modality", []):
        if pd.notna(modality):
            chips.append(("", modality))

    # Estimated Parameters
    # NOTE(review): no entry in meta_cols_map lists "estimated_parameters",
    # so this branch looks unreachable at present - confirm whether the
    # column should be added there.
    for params in meta.get("estimated_parameters", []):
        if pd.notna(params):
            chips.append(("⚙️", _format_parameter_count(params)))

    return chips


def _build_download_row(meta, meta_cols):
    """Flatten one entry's metadata into comma-joined strings for the CSV."""
    return {
        col: ", ".join(str(v) for v in meta[col] if pd.notna(v))
        for col in meta_cols
        if meta.get(col)
    }


# Function to get top N leaderboard (now accepts pandas DataFrame from DuckDB query)
def get_top_n_leaderboard(filtered_df, group_col, top_n=10):
    """Build the display and download frames for a top-N leaderboard.

    Args:
        filtered_df: DataFrame containing ``group_col``, ``total_downloads``
            and ``percent_of_total``, plus any metadata columns listed in
            ``meta_cols_map`` for ``group_col``.
        group_col: Column to group by.
        top_n: Number of top entries to return.

    Returns:
        tuple: ``(display_df, download_df)``. ``display_df`` has the columns
        ``Name``, ``Metadata`` (list of ``(icon, text)`` chips) and
        ``% of total``. ``download_df`` has ``Name``, ``Total Value``,
        ``% of total`` and one comma-joined string column per collected
        metadata column.
    """
    # Group by and get top N
    top = (
        filtered_df.groupby(group_col)[["total_downloads", "percent_of_total"]]
        .sum()
        .nlargest(top_n, columns="total_downloads")
        .reset_index()
        .rename(
            columns={
                group_col: "Name",
                "total_downloads": "Total Value",
                "percent_of_total": "% of total",
            }
        )
    )

    # Create a downloadable version of the leaderboard
    download_top = top.copy()
    download_top["Total Value"] = download_top["Total Value"].astype(int)
    download_top["% of total"] = download_top["% of total"].round(2)

    # All relevant metadata columns
    meta_cols = meta_cols_map.get(group_col, [])

    # Collect the unique metadata values of every top entry. The lookup uses
    # the names exactly as they appear in ``filtered_df``; any renaming for
    # display happens afterwards, otherwise a renamed entry matches no rows
    # and silently loses its metadata.
    meta_map = {}
    for name in top["Name"]:
        name_data = filtered_df[filtered_df[group_col] == name]
        meta_map[name] = {
            col: list(name_data[col].unique())
            for col in meta_cols
            if col in name_data.columns
        }

    # Apply metadata builder to top dataframe
    top["Metadata"] = (
        top["Name"]
        .astype(object)
        .apply(lambda nm: _build_metadata_chips(meta_map.get(nm, {})))
    )

    # Build download dataframe with metadata
    download_info_df = pd.DataFrame(
        [
            _build_download_row(meta_map.get(nm, {}), meta_cols)
            for nm in download_top["Name"]
        ]
    )
    download_top = pd.concat([download_top, download_info_df], axis=1)

    # Replace "User" in names (display only; the download keeps the raw name)
    top["Name"] = top["Name"].replace("User", "user")

    return top[["Name", "Metadata", "% of total"]], download_top


def get_top_n_from_duckdb(con, group_col, top_n=10, time_filter=None):
    """Query DuckDB for the top-N groups by downloads.

    The query reads from a relation named ``filtered_df`` that must be
    visible on ``con``.

    NOTE: ``group_col``, ``top_n`` and the time bounds are interpolated into
    the SQL text, so they must come from trusted code, never from raw user
    input.

    NOTE(review): ``base_data`` selects ``{group_col}`` in addition to the
    fixed column list, so a column may appear twice (for
    ``org_country_single``: once raw, once normalised by the CASE
    expression). Confirm which of the two DuckDB binds for
    ``b.{group_col}``.

    Args:
        con: DuckDB connection object.
        group_col: Column to group by.
        top_n: Number of top entries.
        time_filter: Optional tuple of (start_timestamp, end_timestamp),
            interpreted as seconds since the epoch.

    Returns:
        Pandas DataFrame with one row per top group; an empty DataFrame if
        the query raises.
    """
    # Build time filter clause
    time_clause = ""
    if time_filter:
        start = pd.to_datetime(time_filter[0], unit="s")
        end = pd.to_datetime(time_filter[1], unit="s")
        time_clause = f"WHERE time >= '{start}' AND time <= '{end}'"

    # Optimized query: first find top N, then get only those rows
    query = f"""
    WITH base_data AS (
        SELECT
            {group_col},
            CASE
                WHEN org_country_single = 'HF' THEN 'United States of America'
                WHEN org_country_single = 'International' THEN 'International/Online'
                WHEN org_country_single = 'Online' THEN 'International/Online'
                ELSE org_country_single
            END AS org_country_single,
            author,
            merged_country_groups_single,
            merged_modality,
            downloads,
            estimated_parameters,
            model
        FROM filtered_df
        {time_clause}
    ),

    -- Compute the total downloads for all rows in the time range
    total_downloads_cte AS (
        SELECT SUM(downloads) AS total_downloads_all
        FROM base_data
    ),

    -- Compute per-group totals and their percentage of all downloads
    top_items AS (
        SELECT
            b.{group_col} AS name,
            SUM(b.downloads) AS total_downloads,
            ROUND(SUM(b.downloads) * 100.0 / t.total_downloads_all, 2) AS percent_of_total,
            -- Pick first non-null metadata values for reference
            ANY_VALUE(b.org_country_single) AS org_country_single,
            ANY_VALUE(b.author) AS author,
            ANY_VALUE(b.merged_country_groups_single) AS merged_country_groups_single,
            ANY_VALUE(b.merged_modality) AS merged_modality,
            ANY_VALUE(b.model) AS model
        FROM base_data b
        CROSS JOIN total_downloads_cte t
        GROUP BY b.{group_col}, t.total_downloads_all
    )

    SELECT * FROM top_items
    ORDER BY total_downloads DESC
    LIMIT {top_n};
    """
    print("Executing DuckDB query:")
    print(query)  # Print the query for debugging
    try:
        return con.execute(query).fetchdf()
    except Exception as e:
        # Deliberate best effort: callers treat an empty frame as "no data".
        print(f"Error querying DuckDB: {e}")
        return pd.DataFrame()


def create_leaderboard(con, board_type, top_n=10, time_filter=None):
    """Create a leaderboard component from a DuckDB connection.

    Args:
        con: DuckDB connection object.
        board_type: Type of leaderboard ('countries', 'developers', 'models').
        top_n: Number of top entries to display.
        time_filter: Optional (start_timestamp, end_timestamp) tuple passed
            through to ``get_top_n_from_duckdb``; ``None`` applies no time
            restriction.

    Returns:
        Dash HTML component with the leaderboard table, or a plain
        ``html.Div`` message for an unknown board type or an empty result.
    """
    config = _BOARD_CONFIG.get(board_type)
    if not config:
        return html.Div(f"Unknown board type: {board_type}")
    group_col, title, filename = config

    # Get only the top N rows from DuckDB
    filtered_df = get_top_n_from_duckdb(con, group_col, top_n, time_filter)
    if filtered_df.empty:
        return html.Div("No data available")

    # Process the already-filtered data
    top_data, download_data = get_top_n_leaderboard(filtered_df, group_col, top_n)
    print(f"Creating leaderboard for {board_type} with top {top_n} entries.")
    print(top_data[0:5])  # Print first 5 rows for debugging

    return render_table(
        top_data,
        download_data,
        title,
        chip_color="#F0F9FF",
        bar_color="#082030",
        filename=filename,
    )