Spaces:
Running
Running
| import gradio as gr | |
| import pandas as pd | |
| import geopandas as gpd | |
| import plotly.express as px | |
| import numpy as np | |
# 1. Load Data
# ---------------------------------------------------------
# Best-effort startup: if either parquet file cannot be read, the app still
# builds its UI on empty frames that carry the expected column names.
try:
    df = pd.read_parquet('Russia_regions_data.parquet')
    gdf = gpd.read_parquet('Russia_regions_geo_simplified.geoparquet')
except Exception as e:
    print(f"Error loading files: {e}")
    # Fallback dummy data
    df = pd.DataFrame(columns=['section', 'subsection', 'indicator_name', 'comment', 'year',
                               'indicator_value', 'rel_indicator_value', 'indicator_unit', 'positive',
                               'indicator_code', 'Region', 'object_oktmo', 'ISO'])
    gdf = gpd.GeoDataFrame(columns=['ISO', 'geometry'])

# Ensure geometries are in standard lat/lon (EPSG:4326).
# A frame without a CRS -- such as the empty fallback above -- cannot be
# reprojected (GeoPandas raises on "naive" geometries), which previously made
# the fallback path crash right here. Only reproject when a CRS is known.
# getattr() with a default also covers frames with no active geometry column,
# where reading .crs raises AttributeError.
if getattr(gdf, 'crs', None) is not None:
    gdf = gdf.to_crs(epsg=4326)
# 2. Prepare the Table Data (The "Menu")
# ---------------------------------------------------------
# Column that links a table row back to its rows in the full dataset;
# it is not part of the displayed columns.
hidden_link_key = 'indicator_code'

# Columns shown in the selection table ('positive' is deliberately left out).
display_columns = [
    'section',
    'subsection',
    'indicator_name',
    'comment',
    'year',
    'indicator_value',
    'rel_indicator_value',
    'indicator_unit',
]

# One row per indicator_code: the first occurrence is kept.
df_unique = df.drop_duplicates(subset=['indicator_code'], keep='first').copy()

# Frame backing the table widget, re-indexed from 0.
df_display = df_unique.loc[:, display_columns].reset_index(drop=True)
# Derive per-column pixel widths from the text length of each column
def calculate_column_widths(df, columns):
    """Return one pixel width per entry of ``columns``.

    For a non-empty frame the character budget of a column is the midpoint
    between the longest and the mean string length of its values, capped at
    50 characters and never below the header length. For an empty frame the
    header length alone is used. The budget is converted at 8 px per
    character and clamped to the range 100..400 px.
    """
    has_rows = len(df) > 0

    def _char_budget(col):
        # Number of characters the column should be able to show.
        header_chars = len(str(col))
        if not has_rows:
            return header_chars
        lengths = df[col].astype(str).str.len()
        blended = (lengths.max() + lengths.mean()) / 2
        return max(header_chars, min(blended, 50))

    return [max(100, min(int(_char_budget(col) * 8), 400)) for col in columns]
# Pixel widths for the table columns, in display order.
column_widths = calculate_column_widths(df=df_display, columns=display_columns)
# Per-cell CSS (small, wrapping font) for the table
def create_styling(df):
    """Return a rows x columns grid of identical CSS strings, one per cell of ``df``."""
    cell_css = "font-size: 10px; white-space: normal; word-wrap: break-word;"
    n_rows = len(df)
    n_cols = len(df.columns)
    # Each row is its own list, so rows can be modified independently later.
    return [[cell_css] * n_cols for _ in range(n_rows)]
# Styling grid matching the shape of the displayed table.
table_styling = create_styling(df=df_display)
# 3. Utility Functions
# ---------------------------------------------------------
def remove_outliers(series):
    """Replace extreme values of ``series`` with NaN.

    The fence is built from the 1st and 99th percentiles: a value is kept
    when it lies within two percentile spreads (p99 - p01) below the 1st
    percentile and above the 99th percentile, bounds included. Everything
    else becomes NaN; the index and length are unchanged.
    """
    p_low = series.quantile(0.01)
    p_high = series.quantile(0.99)
    margin = 2 * (p_high - p_low)
    floor = p_low - margin
    ceiling = p_high + margin
    within_fence = (series >= floor) & (series <= ceiling)
    return series.where(within_fence, np.nan)
def should_use_log_scale(values):
    """Decide whether a logarithmic colour scale suits ``values``.

    Returns True only when all of the following hold for the non-NaN values:
      1. there is at least one value and every value is strictly positive,
      2. max / min is greater than 100,
      3. the sample skewness is greater than 2.

    Args:
        values: pandas Series of numbers; NaN entries are ignored.

    Returns:
        A plain Python ``bool`` (never a NumPy boolean).
    """
    clean_values = values.dropna()
    if clean_values.empty:
        return False

    # A log scale is undefined for zero or negative values. This check also
    # guarantees min > 0, so the ratio below cannot divide by zero.
    if (clean_values <= 0).any():
        return False

    ratio = clean_values.max() / clean_values.min()
    if not ratio > 100:
        return False

    # skew() is NaN for fewer than three values; NaN > 2 is False.
    return bool(clean_values.skew() > 2)
def format_value(value):
    """Format a number for display in a tooltip.

    - missing values (None / NaN) -> "N/A"
    - integers and whole-number floats -> no decimals ("3")
    - other floats -> at most 3 decimals, trailing zeros removed ("2.5")
    - infinities -> "inf" / "-inf"

    Args:
        value: a scalar number (Python or NumPy) or a missing value.

    Returns:
        The formatted string.
    """
    if pd.isna(value):
        return "N/A"
    if isinstance(value, (int, np.integer)):
        return f"{int(value)}"
    # int(inf) raises OverflowError, so infinities must be handled before the
    # whole-number test below.
    if np.isinf(value):
        return "inf" if value > 0 else "-inf"
    if value == int(value):
        return f"{int(value)}"
    text = f"{value:.3f}".rstrip('0').rstrip('.')
    # Tiny negatives round to "-0.000", which strips down to "-0".
    return "0" if text == "-0" else text
# 4. Define App Logic
# ---------------------------------------------------------
def create_ranking_map():
    """Build the default choropleth coloured by the 'OVERALL_RANKING' indicator.

    Reads the module-level ``df`` and ``gdf``. When there are no ranking rows,
    or none of them matches a geometry by 'ISO', an empty titled
    ``px.choropleth`` figure is returned instead.
    """
    ranking_rows = df.loc[df['indicator_code'] == 'OVERALL_RANKING'].copy()
    if ranking_rows.empty:
        return px.choropleth(title="Overall Ranking not found")

    # Keep a single row per region, then colour by the relative value.
    ranking_rows = ranking_rows.drop_duplicates(subset=['ISO'], keep='first')
    ranking_rows['color_value'] = ranking_rows['rel_indicator_value']

    # Attach geometries; regions without ranking data drop out (inner join).
    merged = gdf.merge(ranking_rows, on='ISO', how='inner')
    if merged.empty:
        return px.choropleth(title="No ranking data available")

    map_options = dict(
        geojson=merged.geometry,
        locations=merged.index,
        color='color_value',
        # Reversed RdYlBu: low values blue, high values red.
        color_continuous_scale="RdYlBu_r",
        map_style="satellite-streets",
        zoom=2,
        center={"lat": 60, "lon": 90},
        opacity=0.6,
        labels={'color_value': 'Overall Ranking'},
    )
    fig = px.choropleth_map(merged, **map_options)

    # Tooltip: region name plus the formatted ranking value.
    merged['formatted_value'] = merged['rel_indicator_value'].apply(format_value)
    hover_template = "".join([
        "<b>Region:</b> %{customdata[0]}<br>",
        "<b>Overall Ranking:</b> %{customdata[1]}",
        "<extra></extra>",
    ])
    fig.update_traces(
        customdata=merged[['Region', 'formatted_value']],
        hovertemplate=hover_template,
    )

    fig.update_layout(margin=dict(r=0, t=0, l=0, b=0), height=800)
    return fig
def update_map(select_data: gr.SelectData, current_table_data):
    """Rebuild the map for the indicator whose table row was clicked.

    Args:
        select_data: Gradio selection event; ``select_data.index`` is either
            a (row, col) list/tuple or a bare row index.
        current_table_data: current content of the table component. Only a
            pandas DataFrame is handled.

    Returns:
        A Plotly figure: the choropleth for the selected indicator, the
        default ranking map when the selection cannot be resolved to a table
        row, or an empty titled figure when no indicator or data is found.
    """
    if select_data is None:
        return create_ranking_map()

    # Extract the row index from either index format.
    if isinstance(select_data.index, (list, tuple)):
        row_index = select_data.index[0]
    else:
        row_index = select_data.index

    # Only DataFrame table values are supported; anything else falls back.
    if not isinstance(current_table_data, pd.DataFrame):
        return create_ranking_map()
    # A row index outside the table would make .iloc raise; fall back like
    # the other unresolvable selections.
    if not 0 <= row_index < len(current_table_data):
        return create_ranking_map()
    selected_row = current_table_data.iloc[row_index]

    # Resolve the clicked row to its indicator via name + section.
    match = df_unique[
        (df_unique['indicator_name'] == selected_row['indicator_name']) &
        (df_unique['section'] == selected_row['section'])
    ]
    if match.empty:
        return px.choropleth(title="Indicator not found")

    selected_code = match.iloc[0][hidden_link_key]
    selected_unit = match.iloc[0]['indicator_unit']
    selected_positive = match.iloc[0]['positive']

    # All rows of this indicator, one per region (first occurrence wins).
    df_filtered = df[df['indicator_code'] == selected_code].copy()
    df_filtered = df_filtered.drop_duplicates(subset=['ISO'], keep='first')

    # Outliers become NaN so they neither distort colours nor get ranked.
    df_filtered['value_clean'] = remove_outliers(df_filtered['rel_indicator_value'])

    # Rank regions on the cleaned value. 'P' means higher is better (rank 1 =
    # largest value); any other flag means lower is better.
    ranked = df_filtered.dropna(subset=['value_clean']).copy()
    ascending = (selected_positive != 'P')
    ranked = ranked.sort_values('value_clean', ascending=ascending).reset_index(drop=True)
    ranked['Ranking'] = range(1, len(ranked) + 1)

    # Merge rankings back; regions without a clean value get NaN rank.
    df_filtered = df_filtered.merge(ranked[['ISO', 'Ranking']], on='ISO', how='left')

    # Colour values, log10-transformed when the distribution calls for it.
    use_log = should_use_log_scale(df_filtered['value_clean'])
    # A missing unit must not reach Plotly as a NaN label.
    unit_text = "Value" if pd.isna(selected_unit) else str(selected_unit)
    if use_log:
        df_filtered['color_value'] = np.log10(df_filtered['value_clean'])
        color_label = f"{unit_text} (log scale)"
    else:
        df_filtered['color_value'] = df_filtered['value_clean']
        color_label = unit_text

    # Attach geometries; regions without data drop out (inner join).
    merged = gdf.merge(df_filtered, on='ISO', how='inner')
    if merged.empty:
        return px.choropleth(title="No data for this indicator")

    # RdYlBu runs red (low) -> blue (high); the "_r" variant is reversed.
    # Either way blue marks the "good" end of the indicator.
    if selected_positive == 'P':
        color_scale = "RdYlBu"    # low = red (bad), high = blue (good)
    else:
        color_scale = "RdYlBu_r"  # low = blue (good), high = red (bad)

    fig = px.choropleth_map(
        merged,
        geojson=merged.geometry,
        locations=merged.index,
        color='color_value',
        color_continuous_scale=color_scale,
        map_style="satellite-streets",
        zoom=2,
        center={"lat": 60, "lon": 90},
        opacity=0.6,
        # Label the colour bar with the indicator's unit; this was previously
        # hard-coded to 'Overall Ranking' for every indicator.
        labels={'color_value': color_label}
    )

    # Tooltip shows the unmodified relative value, not the colour value.
    merged['formatted_value'] = merged['rel_indicator_value'].apply(format_value)
    merged['formatted_ranking'] = merged['Ranking'].apply(
        lambda x: str(int(x)) if pd.notna(x) else "N/A"
    )

    fig.update_traces(
        customdata=merged[['formatted_ranking', 'Region', 'indicator_name', 'formatted_value']],
        hovertemplate=(
            "<b>Rank:</b> %{customdata[0]}<br>"
            "<b>Region:</b> %{customdata[1]}<br>"
            "<b>Indicator Name:</b> %{customdata[2]}<br>"
            "<b>Relative Value:</b> %{customdata[3]}"
            "<extra></extra>"
        )
    )

    fig.update_layout(
        margin={"r": 0, "t": 0, "l": 0, "b": 0},
        height=800
    )
    return fig
# 5. Build Gradio UI
# ---------------------------------------------------------
# Table payload: rows, headers and the per-cell styling metadata.
table_value = {
    "data": df_display.values.tolist(),
    "headers": display_columns,
    "metadata": {"styling": table_styling},
}

# Footer text listing data sources.
footer_markdown = """
---
### Data Sources & Attribution
**Statistical Data:** [Tochno.st Regional Datasets](https://tochno.st/datasets/regions_collection)
**Administrative Boundaries:** [geoBoundaries](https://www.geoboundaries.org/countryDownloads.html)
**Regional Codes:** [Codes of Subjects of the Russian Federation](https://en.everybodywiki.com/Codes_of_subjects_of_the_Russian_Federation)
**Translation Model:** [Tencent HY-MT1.5-7B](https://huggingface.co/tencent/HY-MT1.5-7B)
"""

with gr.Blocks(title="Russian Regions Analytics") as demo:
    gr.Markdown("## Russian Regional Indicators")

    # Map row, initialised with the overall ranking map.
    with gr.Row():
        map_plot = gr.Plot(
            label="Regional Distribution",
            value=create_ranking_map(),
        )

    # Indicator selection table.
    with gr.Row():
        table = gr.DataFrame(
            value=table_value,
            label="Select an Indicator",
            datatype=["str", "str", "str", "str", "number", "number", "number", "str"],
            interactive=True,
            max_height=700,
            column_widths=column_widths,
        )

    # Clicking a table cell redraws the map for that row's indicator.
    table.select(fn=update_map, inputs=[table], outputs=[map_plot])

    gr.Markdown(footer_markdown)

if __name__ == "__main__":
    demo.launch()