"""Gradio dashboard that maps Russian regional indicators as choropleths.

The page shows a map and a table with one row per unique indicator code.
Clicking a table row redraws the map for that indicator; the initial map
shows the ``OVERALL_RANKING`` indicator.
"""
import gradio as gr
import pandas as pd
import geopandas as gpd
import plotly.express as px
import numpy as np

# 1. Load Data
# ---------------------------------------------------------
try:
    df = pd.read_parquet('Russia_regions_data.parquet')
    gdf = gpd.read_parquet('Russia_regions_geo_simplified.geoparquet')
except Exception as e:
    # Deliberate best-effort: report the problem and keep the app importable
    # with empty placeholder frames.
    print(f"Error loading files: {e}")
    # Fallback dummy data
    df = pd.DataFrame(columns=[
        'section', 'subsection', 'indicator_name', 'comment', 'year',
        'indicator_value', 'rel_indicator_value', 'indicator_unit',
        'positive', 'indicator_code', 'Region', 'object_oktmo', 'ISO'
    ])
    gdf = gpd.GeoDataFrame(columns=['ISO', 'geometry'])

# Ensure geometries are in standard lat/lon. A frame without a CRS (such as
# the empty fallback above) cannot be reprojected -- to_crs raises on naive
# geometries -- so it is left untouched instead of crashing at start-up.
if getattr(gdf, 'crs', None) is not None:
    gdf = gdf.to_crs(epsg=4326)

# 2. Prepare the Table Data (The "Menu")
# ---------------------------------------------------------
# Filter for unique indicator_code
df_unique = df.drop_duplicates(subset=['indicator_code']).copy()

# Define columns (added indicator_value and rel_indicator_value, hidden positive)
display_columns = [
    'section', 'subsection', 'indicator_name', 'comment', 'year',
    'indicator_value', 'rel_indicator_value', 'indicator_unit'
]
hidden_link_key = 'indicator_code'

# Create the display dataframe
df_display = df_unique[display_columns].reset_index(drop=True)


# Calculate optimal column widths based on content with min/max constraints
def calculate_column_widths(df, columns):
    """Estimate a pixel width for each column from its text length.

    Args:
        df: DataFrame holding the table content.
        columns: Column names to size, in display order.

    Returns:
        A list of integer widths (one per column), each within 100-400 px.
    """
    widths = []
    for col in columns:
        # Get max of header length and content lengths
        header_len = len(str(col))
        if len(df) > 0:
            content_lengths = df[col].astype(str).str.len()
            max_content_len = content_lengths.max()
            avg_content_len = content_lengths.mean()
            # Use average of max and mean, bounded by min/max
            optimal_len = (max_content_len + avg_content_len) / 2
            final_len = max(header_len, min(optimal_len, 50))  # Cap at 50 chars
        else:
            final_len = header_len
        # Scale to pixel width (roughly 8 pixels per character)
        # Minimum width of 100px, maximum of 400px
        width = max(100, min(int(final_len * 8), 400))
        widths.append(width)
    return widths


column_widths = calculate_column_widths(df_display, display_columns)


# Create styling for smaller font
def create_styling(df):
    """Return a rows x columns grid of CSS strings giving every cell a small font."""
    cell_style = "font-size: 10px; white-space: normal; word-wrap: break-word;"
    num_rows = len(df)
    num_cols = len(df.columns)
    return [[cell_style for _ in range(num_cols)] for _ in range(num_rows)]


table_styling = create_styling(df_display)


# 3. Utility Functions
# ---------------------------------------------------------
def remove_outliers(series):
    """Replace extreme values with NaN using a widened percentile fence.

    The fence is built from the 1st and 99th percentiles (not the quartiles
    of a classic IQR rule) and extended on each side by twice the distance
    between them, so only very extreme values are dropped.

    Args:
        series: Numeric pandas Series.

    Returns:
        A Series of the same index where values outside the fence (and
        values that were already NaN) are NaN.
    """
    low_q = series.quantile(0.01)
    high_q = series.quantile(0.99)
    spread = high_q - low_q
    lower_bound = low_q - 2 * spread
    upper_bound = high_q + 2 * spread
    return series.where(series.between(lower_bound, upper_bound), np.nan)


def should_use_log_scale(values):
    """Decide heuristically whether a logarithmic colour scale is appropriate.

    Returns True only if, after dropping NaN:
    1. All values are strictly positive
    2. The ratio of max to min is greater than 100
    3. The distribution is highly skewed (skewness > 2)
    """
    clean_values = values.dropna()
    if len(clean_values) == 0:
        return False
    # Logarithms need strictly positive input; this also guarantees that the
    # minimum used as divisor below is non-zero.
    if (clean_values <= 0).any():
        return False
    ratio = clean_values.max() / clean_values.min()
    # skew() is NaN for fewer than three values, which compares as False
    skewness = clean_values.skew()
    return ratio > 100 and skewness > 2


def format_value(value):
    """Format a value for the tooltip.

    Missing values become "N/A"; whole numbers are shown without decimals;
    other numbers are shown with at most 3 decimals (trailing zeros removed).
    Infinite values and non-numeric input are returned as plain text instead
    of raising.
    """
    if pd.isna(value):
        return "N/A"
    if isinstance(value, (int, np.integer)):
        return f"{int(value)}"
    try:
        number = float(value)
    except (TypeError, ValueError):
        return str(value)
    if np.isinf(number):
        # int(inf) raises OverflowError, so handle it before the whole-number test
        return str(number)
    if number == int(number):
        return f"{int(number)}"
    text = f"{number:.3f}".rstrip('0').rstrip('.')
    # Tiny negative values round to "-0"; show a plain zero instead
    return "0" if text == "-0" else text


# 4. Define App Logic
# ---------------------------------------------------------
def _build_choropleth(merged, color_scale, color_label, hover_columns, hover_template):
    """Render region geometries joined with values as a choropleth map figure.

    Args:
        merged: GeoDataFrame with a 'color_value' column and every column
            named in ``hover_columns``.
        color_scale: Plotly continuous colour scale name.
        color_label: Title shown on the colour bar.
        hover_columns: Columns exposed to the hover template as customdata.
        hover_template: Plotly hovertemplate string.
    """
    fig = px.choropleth_map(
        merged,
        geojson=merged.geometry,
        locations=merged.index,
        color='color_value',
        color_continuous_scale=color_scale,
        map_style="satellite-streets",
        zoom=2,
        center={"lat": 60, "lon": 90},
        opacity=0.6,
        labels={'color_value': color_label}
    )
    # Tooltip Configuration
    fig.update_traces(
        customdata=merged[hover_columns],
        hovertemplate=hover_template
    )
    fig.update_layout(
        margin={"r": 0, "t": 0, "l": 0, "b": 0},
        height=800
    )
    return fig


def _find_indicator(selected_row):
    """Return the ``df_unique`` row matching a clicked table row, or None.

    Name and section are matched first. Several indicator codes may share
    them, so the remaining displayed columns are used to narrow the
    candidates; a narrowing step that would discard every candidate is
    skipped, because values may not round-trip through the table widget
    unchanged (e.g. NaN or type changes).
    """
    candidates = df_unique[
        (df_unique['indicator_name'] == selected_row['indicator_name']) &
        (df_unique['section'] == selected_row['section'])
    ]
    for column in ('subsection', 'indicator_unit', 'year', 'comment'):
        if len(candidates) <= 1:
            break
        if column not in selected_row.index:
            continue
        narrowed = candidates[candidates[column] == selected_row[column]]
        if not narrowed.empty:
            candidates = narrowed
    if candidates.empty:
        return None
    return candidates.iloc[0]


def create_ranking_map():
    """Create the default map showing overall rankings.

    Returns an empty titled figure when the 'OVERALL_RANKING' indicator is
    absent or none of its rows can be joined to a region geometry.
    """
    # Filter for the overall ranking indicator
    df_ranking = df[df['indicator_code'] == 'OVERALL_RANKING'].copy()
    if df_ranking.empty:
        return px.choropleth(title="Overall Ranking not found")

    # Ensure one row per region
    df_ranking = df_ranking.drop_duplicates(subset=['ISO'], keep='first')
    # Colour by rel_indicator_value
    df_ranking['color_value'] = df_ranking['rel_indicator_value']

    # Merge with Geometry
    merged = gdf.merge(df_ranking, on='ISO', how='inner')
    if merged.empty:
        return px.choropleth(title="No ranking data available")

    # Format values for tooltip
    merged['formatted_value'] = merged['rel_indicator_value'].apply(format_value)

    # "RdYlBu_r" runs blue (low/better rank) to red (high/worse rank).
    # Plotly hover text needs <br> for line breaks; <extra></extra> hides the
    # secondary trace-name box.
    return _build_choropleth(
        merged,
        color_scale="RdYlBu_r",
        color_label='Overall Ranking',
        hover_columns=['Region', 'formatted_value'],
        hover_template=(
            "Region: %{customdata[0]}<br>"
            "Overall Ranking: %{customdata[1]}"
            "<extra></extra>"
        )
    )


def update_map(select_data: gr.SelectData, current_table_data):
    """Redraw the map for the indicator whose table row was clicked.

    Args:
        select_data: Gradio selection event; ``index`` is ``[row, col]`` or a
            bare row index depending on the Gradio version.
        current_table_data: Current table content; expected to be a pandas
            DataFrame.

    Returns:
        A Plotly figure: the indicator choropleth, the default ranking map
        when the selection cannot be resolved to a table row, or an empty
        titled figure when the indicator or its data cannot be found.
    """
    if select_data is None:
        return create_ranking_map()

    # Handle index format (it often comes as [row, col] or just row index)
    index = select_data.index
    row_index = index[0] if isinstance(index, (list, tuple)) else index

    # Get the row data from the visible table
    if not isinstance(current_table_data, pd.DataFrame):
        return create_ranking_map()
    try:
        selected_row = current_table_data.iloc[row_index]
    except (IndexError, TypeError):
        # Stale or malformed selection: fall back to the default view
        return create_ranking_map()

    # Find the corresponding unique indicator code
    match = _find_indicator(selected_row)
    if match is None:
        return px.choropleth(title="Indicator not found")

    selected_code = match[hidden_link_key]
    selected_unit = match['indicator_unit']
    selected_positive = match['positive']

    # Filter main data for this indicator, one row per region
    df_filtered = (
        df[df['indicator_code'] == selected_code]
        .drop_duplicates(subset=['ISO'], keep='first')
        .copy()
    )

    # Remove outliers (replace with NaN) - use rel_indicator_value
    df_filtered['value_clean'] = remove_outliers(df_filtered['rel_indicator_value'])

    # Rank regions on the cleaned values. 'P' means higher is better (rank 1
    # is the largest value); anything else means lower is better. Rows whose
    # value was removed as an outlier keep a NaN rank. method='first' gives
    # every region a distinct rank, ties broken by row order.
    higher_is_better = (selected_positive == 'P')
    df_filtered['Ranking'] = df_filtered['value_clean'].rank(
        method='first', ascending=not higher_is_better
    )

    # Create color scale values (log if needed) and the colour bar title
    unit_text = 'Value' if pd.isna(selected_unit) else str(selected_unit)
    if should_use_log_scale(df_filtered['value_clean']):
        df_filtered['color_value'] = np.log10(df_filtered['value_clean'])
        color_label = f"{unit_text} (log scale)"
    else:
        df_filtered['color_value'] = df_filtered['value_clean']
        color_label = unit_text

    # Merge with Geometry
    merged = gdf.merge(df_filtered, on='ISO', how='inner')
    if merged.empty:
        return px.choropleth(title="No data for this indicator")

    # Blue always marks the "good" end of the scale:
    # If P: "RdYlBu" runs red (low/bad) to blue (high/good)
    # If N or other: "RdYlBu_r" runs blue (low/good) to red (high/bad)
    color_scale = "RdYlBu" if higher_is_better else "RdYlBu_r"

    # Format values for tooltip (the raw value is shown, not the cleaned one)
    merged['formatted_value'] = merged['rel_indicator_value'].apply(format_value)
    merged['formatted_ranking'] = merged['Ranking'].apply(
        lambda x: str(int(x)) if pd.notna(x) else "N/A"
    )

    # Plotly hover text needs <br> for line breaks; <extra></extra> hides the
    # secondary trace-name box.
    return _build_choropleth(
        merged,
        color_scale=color_scale,
        color_label=color_label,
        hover_columns=['formatted_ranking', 'Region', 'indicator_name', 'formatted_value'],
        hover_template=(
            "Rank: %{customdata[0]}<br>"
            "Region: %{customdata[1]}<br>"
            "Indicator Name: %{customdata[2]}<br>"
            "Relative Value: %{customdata[3]}"
            "<extra></extra>"
        )
    )


# 5. Build Gradio UI
# ---------------------------------------------------------
# Footer text is kept flush-left, with blank lines between entries, so the
# Markdown renders as a rule, a heading and separate paragraphs.
FOOTER_MARKDOWN = """
---
### Data Sources & Attribution

**Statistical Data:** [Tochno.st Regional Datasets](https://tochno.st/datasets/regions_collection)

**Administrative Boundaries:** [geoBoundaries](https://www.geoboundaries.org/countryDownloads.html)

**Regional Codes:** [Codes of Subjects of the Russian Federation](https://en.everybodywiki.com/Codes_of_subjects_of_the_Russian_Federation)

**Translation Model:** [Tencent HY-MT1.5-7B](https://huggingface.co/tencent/HY-MT1.5-7B)
"""

with gr.Blocks(title="Russian Regions Analytics") as demo:
    gr.Markdown("## Russian Regional Indicators")

    with gr.Row():
        # Initialize with ranking map
        map_plot = gr.Plot(label="Regional Distribution", value=create_ranking_map())

    with gr.Row():
        # Prepare table value with styling metadata
        table_value = {
            "data": df_display.values.tolist(),
            "headers": display_columns,
            "metadata": {
                "styling": table_styling
            }
        }
        table = gr.DataFrame(
            value=table_value,
            label="Select an Indicator",
            datatype=["str", "str", "str", "str", "number", "number", "number", "str"],
            interactive=True,
            max_height=700,
            column_widths=column_widths
        )

    # Wire the selection event
    table.select(
        fn=update_map,
        inputs=[table],
        outputs=[map_plot]
    )

    # Footer with data sources
    gr.Markdown(FOOTER_MARKDOWN)

if __name__ == "__main__":
    demo.launch()