"""tab_analytics.py — Analytics Dashboard with detailed analysis.""" import gradio as gr import pandas as pd import plotly.express as px import plotly.graph_objects as go import tempfile import os from data_loader import DataStore from db import (CHECKLIST_ITEMS, DIMENSIONS, get_all_annotations_df, get_stats, export_csv, _int_to_radio) def _empty_fig(msg="No annotation data yet"): fig = go.Figure() fig.add_annotation(text=msg, xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font_size=18) fig.update_layout(height=350, xaxis_visible=False, yaxis_visible=False) return fig def build_analytics_tab(store: DataStore): """Build the Analytics Dashboard tab.""" gr.Markdown("## Analytics Dashboard") refresh_btn = gr.Button("🔄 Refresh Analytics", variant="primary") # --- Summary Stats --- summary_md = gr.Markdown("*Click Refresh to load analytics*") # --- Row 1: Score distribution + Per-item check rates --- with gr.Row(): score_hist = gr.Plot(label="System-2 Score Distribution") item_rates_plot = gr.Plot(label="Per-Item Check Rate") # --- Row 2: Per-conference stats --- with gr.Row(): conf_count_plot = gr.Plot(label="Annotations by Conference") conf_score_plot = gr.Plot(label="Avg System-2 Score by Conference") # --- Row 3: Score change correlation + Per-dimension --- with gr.Row(): correlation_plot = gr.Plot(label="Review Score Change vs System-2 Score") dimension_plot = gr.Plot(label="Per-Dimension Check Rate") # --- Row 4: Annotation table + Export --- gr.Markdown("### All Annotations") ann_table = gr.Dataframe( label="Annotation Records", interactive=False, wrap=True, ) with gr.Row(): export_btn = gr.Button("📥 Export CSV", scale=1) export_file = gr.File(label="Download", visible=False, scale=2) # ========== Callbacks ========== def refresh_all(): df = get_all_annotations_df() stats = get_stats() # --- Summary --- if stats["total"] == 0: summary = ("No annotations yet. Go to the **Annotation** tab to start.\n\n" f"Dataset: {len(store.reviews_all):,} papers available for annotation.") empty = _empty_fig() return (summary, empty, empty, empty, empty, empty, empty, pd.DataFrame()) summary_lines = [ "| Metric | Value |", "|--------|-------|", f"| Total annotations | **{stats['total']}** |", f"| Unique papers annotated | **{stats['unique_papers']}** |", f"| Average System-2 Score | **{stats['avg_score']:.2f}** / 8 |", f"| Score range | {stats['min_score']} – {stats['max_score']} |", f"| Dataset coverage | {stats['unique_papers']}/{len(store.reviews_all):,} " f"({stats['unique_papers']/max(len(store.reviews_all),1)*100:.1f}%) |", ] summary = "\n".join(summary_lines) # --- Score Distribution Histogram --- score_data = [] for s in range(9): score_data.append({"score": s, "count": stats["score_dist"].get(s, 0)}) score_df = pd.DataFrame(score_data) fig_hist = px.bar( score_df, x="score", y="count", title="System-2 Score Distribution", labels={"score": "Score (0-8)", "count": "Count"}, color="count", color_continuous_scale="Blues", ) fig_hist.update_layout(height=380, xaxis=dict(dtick=1)) fig_hist.update_coloraxes(showscale=False) # --- Per-Item Check Rates --- item_data = [] total = max(stats["total"], 1) for item_id, text in CHECKLIST_ITEMS.items(): rate = stats.get(f"rate_{item_id}", 0) / total * 100 dim = item_id[0] item_data.append({ "item": item_id, "label": f"{item_id}: {text[:15]}...", "rate": round(rate, 1), "dimension": DIMENSIONS[dim], }) item_df = pd.DataFrame(item_data) fig_items = px.bar( item_df, x="item", y="rate", color="dimension", title="Per-Checklist-Item Check Rate (%)", labels={"item": "Item", "rate": "Check Rate (%)"}, hover_data=["label"], ) fig_items.update_layout(height=380, yaxis=dict(range=[0, 100])) # --- Per-Conference Count --- if stats["per_conference"]: conf_df = pd.DataFrame(stats["per_conference"]) # Parse conference name from full string conf_df["conf_short"] = conf_df["conference"].apply( lambda x: " ".join(str(x).split()[:2]) if pd.notna(x) else "Unknown" ) # Top 20 by count conf_df = conf_df.nlargest(20, "count") fig_conf_count = px.bar( conf_df, x="conf_short", y="count", title="Annotations by Conference (Top 20)", labels={"conf_short": "Conference", "count": "Annotations"}, color="count", color_continuous_scale="Viridis", ) fig_conf_count.update_layout(height=380, xaxis_tickangle=-45) fig_conf_count.update_coloraxes(showscale=False) fig_conf_score = px.bar( conf_df, x="conf_short", y="avg_score", title="Avg System-2 Score by Conference", labels={"conf_short": "Conference", "avg_score": "Avg Score"}, color="avg_score", color_continuous_scale="RdYlGn", range_color=[0, 8], ) fig_conf_score.update_layout(height=380, xaxis_tickangle=-45, yaxis=dict(range=[0, 8])) fig_conf_score.update_coloraxes(showscale=False) else: fig_conf_count = _empty_fig("No conference data") fig_conf_score = _empty_fig("No conference data") # --- Score Change Correlation --- fig_corr = _build_correlation_plot(df, store) # --- Per-Dimension Check Rate --- dim_data = [] for dim_key, dim_label in DIMENSIONS.items(): k1, k2 = f"{dim_key}1", f"{dim_key}2" r1 = stats.get(f"rate_{k1}", 0) / total * 100 r2 = stats.get(f"rate_{k2}", 0) / total * 100 avg_rate = (r1 + r2) / 2 dim_data.append({"dimension": dim_label, "avg_rate": round(avg_rate, 1)}) dim_df = pd.DataFrame(dim_data) fig_dim = px.bar( dim_df, x="dimension", y="avg_rate", title="Average Check Rate by Dimension (%)", labels={"dimension": "Dimension", "avg_rate": "Avg Check Rate (%)"}, color="avg_rate", color_continuous_scale="Sunset", range_color=[0, 100], ) fig_dim.update_layout(height=380, yaxis=dict(range=[0, 100])) fig_dim.update_coloraxes(showscale=False) # --- Annotation Table --- display_cols = ["paper_id", "reviewer_id", "conference", "A1", "A2", "B1", "B2", "C1", "C2", "D1", "D2", "score", "notes", "updated_at"] table_df = df[display_cols] if not df.empty else pd.DataFrame() # Convert integer codes to readable labels in table if not table_df.empty: for col in ["A1", "A2", "B1", "B2", "C1", "C2", "D1", "D2"]: table_df[col] = table_df[col].apply(_int_to_radio) return (summary, fig_hist, fig_items, fig_conf_count, fig_conf_score, fig_corr, fig_dim, table_df) def _build_correlation_plot(df, store): """Scatter plot: review score change vs System-2 annotation score.""" if df.empty: return _empty_fig("No data for correlation") points = [] for _, row in df.iterrows(): pid = row["paper_id"] rid = row["reviewer_id"] paper = store.review_by_paper_id.get(pid) if not paper: continue review_obj = None for r in paper["reviews"]: if r["reviewer_id"] == rid: review_obj = r break if not review_obj: continue try: init_r = int(str(review_obj.get("initial_score_unified", {}) .get("rating", "")).split()[0]) final_r = int(str(review_obj.get("final_score_unified", {}) .get("rating", "")).split()[0]) change = final_r - init_r except (ValueError, IndexError, AttributeError): continue points.append({ "score_change": change, "system2_score": row["score"], "paper_id": pid, "reviewer_id": rid, }) if not points: return _empty_fig("No matching review data") pts_df = pd.DataFrame(points) fig = px.scatter( pts_df, x="system2_score", y="score_change", title="Review Score Change vs System-2 Score", labels={"system2_score": "System-2 Score (0-8)", "score_change": "Review Score Change"}, hover_data=["paper_id", "reviewer_id"], opacity=0.6, ) # Add trend line if len(pts_df) > 2: fig.update_traces(marker=dict(size=8)) fig = px.scatter( pts_df, x="system2_score", y="score_change", title="Review Score Change vs System-2 Score", labels={"system2_score": "System-2 Score (0-8)", "score_change": "Review Score Change"}, hover_data=["paper_id", "reviewer_id"], opacity=0.6, trendline="ols", ) fig.update_layout(height=380) return fig def do_export(): csv_str = export_csv() if not csv_str: return gr.update(visible=False) tmp = tempfile.NamedTemporaryFile( mode="w", suffix=".csv", prefix="annotations_", delete=False, dir=tempfile.gettempdir(), ) tmp.write(csv_str) tmp.close() return gr.update(value=tmp.name, visible=True) # ========== Wire Events ========== refresh_btn.click( fn=refresh_all, outputs=[summary_md, score_hist, item_rates_plot, conf_count_plot, conf_score_plot, correlation_plot, dimension_plot, ann_table], ) export_btn.click(fn=do_export, outputs=[export_file])