#!/usr/bin/env python3
"""
Pre-compute all stats for the HF Space dashboard.

Reads from the local dataset and tracker, outputs a single dashboard_data.json.

Run from the project root (MIIA/stuff):
    source .stuff/bin/activate
    python Execcomp-AI-Dashboard/compute_stats.py
"""
import json
import sys
from pathlib import Path
from collections import Counter, defaultdict
from datetime import datetime

import pandas as pd
import numpy as np

# Project root
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

import pyarrow as pa
import pyarrow.ipc as ipc

from src.tracking import Tracker


def load_arrow_dir(dir_path: Path) -> pd.DataFrame:
    """Load all .arrow files from a directory into a pandas DataFrame, skipping image columns.

    Args:
        dir_path: Directory containing Arrow IPC stream files (``*.arrow``).

    Returns:
        A single DataFrame with all files concatenated.

    Raises:
        FileNotFoundError: If the directory contains no ``.arrow`` files
            (``pa.concat_tables([])`` would otherwise fail with an opaque error).
    """
    tables = []
    for f in sorted(dir_path.glob("*.arrow")):
        reader = ipc.open_stream(str(f))
        table = reader.read_all()
        # Drop image column to avoid slow decoding
        if "table_image" in table.column_names:
            table = table.drop(["table_image"])
        tables.append(table)
    if not tables:
        raise FileNotFoundError(f"No .arrow files found in {dir_path}")
    combined = pa.concat_tables(tables)
    return combined.to_pandas()


def main():
    """Compute all dashboard statistics and write dashboard_data.json next to this script."""
    print("Loading dataset (arrow)...")
    train_dir = PROJECT_ROOT / "hf" / "execcomp-ai-postprocessed" / "train"
    df = load_arrow_dir(train_dir)
    print(f"Train split: {len(df)} records")

    # --- Parse all executives ---
    # Each dataset row holds a JSON-encoded list of executives; flatten them
    # while copying the per-filing metadata onto every executive record.
    all_execs = []
    records_meta = []
    for _, row in df.iterrows():
        execs = json.loads(row["executives"])
        for ex in execs:
            ex["cik"] = row["cik"]
            ex["company"] = row["company"]
            ex["filing_year"] = row["year"]
            ex["sic"] = row["sic"]
            ex["state_of_inc"] = row["state_of_inc"]
            ex["sct_probability"] = row["sct_probability"]
            # Fall back to the filing year when the table row had no fiscal year
            if ex.get("fiscal_year") is None:
                ex["fiscal_year"] = row["year"]
            all_execs.append(ex)
        records_meta.append({
            "cik": row["cik"],
            "company": row["company"],
            "year": row["year"],
            "sic": row["sic"],
            "sct_probability": row["sct_probability"],
        })

    exec_df = pd.DataFrame(all_execs)
    meta_df = pd.DataFrame(records_meta)

    # Filter valid years and numeric comp
    exec_df["fiscal_year"] = pd.to_numeric(exec_df["fiscal_year"], errors="coerce")
    exec_df = exec_df[(exec_df["fiscal_year"] >= 2005) & (exec_df["fiscal_year"] <= 2022)]
    comp_cols = ["salary", "bonus", "stock_awards", "option_awards",
                 "non_equity_incentive", "change_in_pension", "other_compensation", "total"]
    for col in comp_cols:
        if col in exec_df.columns:
            exec_df[col] = pd.to_numeric(exec_df[col], errors="coerce")

    # --- Pipeline Stats from tracker ---
    print("Loading tracker...")
    tracker = Tracker(PROJECT_ROOT)
    stats = tracker.stats()
    total_docs = stats["total"]
    funds = stats["by_status"].get("fund", 0)
    with_sct = stats["by_status"].get("complete", 0)
    no_sct = stats["by_status"].get("no_sct", 0)
    pending = stats["by_status"].get("pending", 0)
    total_tables = 0
    for doc_id in tracker.get_by_status("complete"):
        doc_info = tracker.get_document(doc_id)
        if doc_info:
            total_tables += len(doc_info.get("sct_tables", []))
    pipeline_stats = {
        "total_docs": total_docs,
        "funds": funds,
        "with_sct": with_sct,
        "no_sct": no_sct,
        "pending": pending,
        "total_tables": total_tables,
        # Extra tables beyond one-per-complete-doc imply multi-table filings
        "multi_table_docs": total_tables - with_sct,
        "non_funds": total_docs - funds,
    }

    # --- Compensation stats ---
    # Winsorize total and salary at p99 for robust mean
    total_cap = exec_df["total"].dropna().quantile(0.99)
    salary_cap = exec_df["salary"].dropna().quantile(0.99)
    comp_stats = {
        "total_exec_records": len(exec_df),
        "unique_companies": int(meta_df["cik"].nunique()),
        "year_range": [int(meta_df["year"].min()), int(meta_df["year"].max())],
        "mean_total": float(exec_df["total"].clip(upper=total_cap).mean()),
        "median_total": float(exec_df["total"].median()),
        "max_total": float(exec_df["total"].max()),
        "mean_salary": float(exec_df["salary"].clip(upper=salary_cap).mean()),
        "median_salary": float(exec_df["salary"].median()),
    }
    # Breakdown per component (winsorized means)
    comp_breakdown = {}
    for col in comp_cols:
        if col in exec_df.columns:
            vals = exec_df[col].dropna()
            cap = vals.quantile(0.99)
            comp_breakdown[col] = {
                "mean": float(vals.clip(upper=cap).mean()),
                "median": float(vals.median()),
                "max": float(vals.max()),
            }
    comp_stats["breakdown"] = comp_breakdown

    # --- Top 50 highest paid (deduplicated, validated) ---
    comp_detail_cols = ["salary", "bonus", "stock_awards", "option_awards",
                        "non_equity_incentive", "change_in_pension", "other_compensation"]
    top_pool = exec_df[
        ["name", "company", "fiscal_year", "total", "title"] + comp_detail_cols
    ].copy()
    # Sanity check: drop records where components don't sum to ~total (parsing errors)
    top_pool["comp_sum"] = top_pool[comp_detail_cols].fillna(0).sum(axis=1)
    top_pool["diff_pct"] = ((top_pool["total"] - top_pool["comp_sum"]).abs()
                            / top_pool["total"].clip(lower=1) * 100)
    top_pool = top_pool[top_pool["diff_pct"] <= 20]  # allow up to 20% rounding tolerance
    top_pool = top_pool.drop(columns=["comp_sum", "diff_pct"])
    # Over-sample to 200 so deduplication still leaves 50 distinct entries
    top_candidates = top_pool.nlargest(200, "total")
    top_candidates["total_rounded"] = (top_candidates["total"] / 1e6).round(1)
    top_candidates = top_candidates.drop_duplicates(
        subset=["name", "company", "fiscal_year", "total_rounded"]
    ).head(50)
    top50 = []
    for _, row in top_candidates.iterrows():
        top50.append({
            "name": row["name"],
            "company": row["company"],
            "title": row.get("title", ""),
            "fiscal_year": int(row["fiscal_year"]),
            "total": float(row["total"]),
            "salary": float(row.get("salary", 0) or 0),
            "bonus": float(row.get("bonus", 0) or 0),
            "stock_awards": float(row.get("stock_awards", 0) or 0),
            "option_awards": float(row.get("option_awards", 0) or 0),
            "non_equity_incentive": float(row.get("non_equity_incentive", 0) or 0),
            "change_in_pension": float(row.get("change_in_pension", 0) or 0),
            "other_compensation": float(row.get("other_compensation", 0) or 0),
        })

    # --- Tables by year ---
    tables_by_year = meta_df["year"].value_counts().sort_index()
    tables_by_year_dict = {int(k): int(v) for k, v in tables_by_year.items()}

    # --- Compensation trends by fiscal year (winsorized at p99) ---
    total_cap = exec_df["total"].dropna().quantile(0.99)
    exec_df_trend = exec_df.copy()
    exec_df_trend["total_clipped"] = exec_df_trend["total"].clip(upper=total_cap)
    yearly = exec_df_trend.groupby("fiscal_year").agg(
        mean=("total_clipped", "mean"),
        median=("total", "median"),
        count=("total", "count"),
    ).reset_index()
    trends = []
    for _, r in yearly.iterrows():
        trends.append({
            "year": int(r["fiscal_year"]),
            "mean": float(r["mean"]),
            "median": float(r["median"]),
            "count": int(r["count"]),
        })

    # --- Compensation distribution (histogram data) ---
    total_comp = exec_df["total"].dropna()
    total_comp = total_comp[total_comp > 0]
    p99 = total_comp.quantile(0.99)
    total_comp_clipped = total_comp[total_comp <= p99]
    # Histogram in $M so the dashboard axis is readable
    hist_values, hist_edges = np.histogram(total_comp_clipped / 1e6, bins=50)
    distribution = {
        "values": hist_values.tolist(),
        "edges": hist_edges.tolist(),
        "median": float(total_comp.median()),
        "p99": float(p99),
        "n_outliers": int(len(total_comp) - len(total_comp_clipped)),
    }

    # --- Compensation components average (winsorized at p99 to remove parsing errors) ---
    comp_components = {}
    for col in ["salary", "bonus", "stock_awards", "option_awards",
                "non_equity_incentive", "change_in_pension", "other_compensation"]:
        if col in exec_df.columns:
            vals = exec_df[col].dropna()
            cap = vals.quantile(0.99)
            comp_components[col.replace("_", " ").title()] = float(vals.clip(upper=cap).mean())

    # --- SCT Probability stats ---
    probs = [r["sct_probability"] for r in records_meta]
    high_conf = sum(1 for p in probs if p >= 0.7)
    medium_conf = sum(1 for p in probs if 0.3 <= p < 0.7)
    low_conf = sum(1 for p in probs if p < 0.3)
    keys = [(r["cik"], r["year"]) for r in records_meta]
    counts = Counter(keys)
    unique_docs = len(counts)
    multi_table_docs = sum(1 for c in counts.values() if c > 1)
    # Group probabilities per (cik, year) once instead of rescanning
    # records_meta for every multi-table document (was O(n*m)).
    probs_by_doc = defaultdict(list)
    for r in records_meta:
        probs_by_doc[(r["cik"], r["year"])].append(r["sct_probability"])
    # A multi-table doc "could be disambiguated" when exactly one of its
    # tables has a high (>= 0.7) SCT probability.
    could_disambiguate = 0
    for key, count in counts.items():
        if count > 1:
            high_prob = sum(1 for p in probs_by_doc[key] if p >= 0.7)
            if high_prob == 1:
                could_disambiguate += 1
    prob_hist_values, prob_hist_edges = np.histogram(probs, bins=20)
    probability_stats = {
        "total_tables": len(probs),
        "unique_docs": unique_docs,
        "high_confidence": high_conf,
        "medium_confidence": medium_conf,
        "low_confidence": low_conf,
        "multi_table_docs": multi_table_docs,
        "could_disambiguate": could_disambiguate,
        "hist_values": prob_hist_values.tolist(),
        "hist_edges": prob_hist_edges.tolist(),
    }

    # --- Salary trends per component (winsorized at p99 to remove parsing errors) ---
    comp_trends = {}
    for col in ["salary", "stock_awards", "option_awards", "bonus",
                "non_equity_incentive", "other_compensation"]:
        if col in exec_df.columns:
            # Clip at 99th percentile of the ENTIRE column to remove outliers
            cap = exec_df[col].dropna().quantile(0.99)
            clipped = exec_df[["fiscal_year", col]].copy()
            clipped[col] = clipped[col].clip(upper=cap)
            yearly_comp = clipped.groupby("fiscal_year")[col].mean().reset_index()
            comp_trends[col.replace("_", " ").title()] = {
                "years": yearly_comp["fiscal_year"].astype(int).tolist(),
                "values": yearly_comp[col].tolist(),
            }

    # --- Assemble final data ---
    dashboard_data = {
        "generated_at": datetime.now().isoformat(),
        "pipeline": pipeline_stats,
        "compensation": comp_stats,
        "top50": top50,
        "tables_by_year": tables_by_year_dict,
        "trends": trends,
        "distribution": distribution,
        "comp_components": comp_components,
        "probability": probability_stats,
        "comp_trends": comp_trends,
    }
    out_path = Path(__file__).parent / "dashboard_data.json"
    with open(out_path, "w") as f:
        json.dump(dashboard_data, f, indent=2)
    print(f"\nDashboard data written to {out_path}")
    print(f" Pipeline: {pipeline_stats['total_docs']:,} docs, {pipeline_stats['with_sct']:,} with SCT")
    print(f" Executives: {len(exec_df):,} records")
    print(f" Tables: {len(probs):,} total, {high_conf:,} high confidence")


if __name__ == "__main__":
    main()