Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Pre-compute all stats for the HF Space dashboard. | |
| Reads from the local dataset and tracker, outputs a single dashboard_data.json. | |
| Run from the project root (MIIA/stuff): | |
| source .stuff/bin/activate | |
| python Execcomp-AI-Dashboard/compute_stats.py | |
| """ | |
| import json | |
| import sys | |
| from pathlib import Path | |
| from collections import Counter | |
| from datetime import datetime | |
| import pandas as pd | |
| import numpy as np | |
# Project root: this script lives one directory below it (Execcomp-AI-Dashboard/).
PROJECT_ROOT = Path(__file__).parent.parent
# Make the project-local ``src`` package importable before importing from it.
sys.path.insert(0, str(PROJECT_ROOT))
import pyarrow.ipc as ipc
from src.tracking import Tracker  # resolves only because PROJECT_ROOT is on sys.path
def load_arrow_dir(dir_path: Path) -> pd.DataFrame:
    """Load all ``*.arrow`` files from *dir_path* into one pandas DataFrame.

    Files are read in sorted (deterministic) order and concatenated. The
    ``table_image`` column, when present, is dropped *before* conversion to
    pandas to avoid decoding large image payloads.

    Raises:
        FileNotFoundError: if *dir_path* contains no ``.arrow`` files.
            (Previously an empty directory fell through to
            ``pa.concat_tables([])``, which fails with an opaque pyarrow
            error instead of a clear message.)
    """
    files = sorted(dir_path.glob("*.arrow"))
    if not files:
        raise FileNotFoundError(f"No .arrow files found in {dir_path}")
    import pyarrow as pa  # local import keeps module import-time light
    tables = []
    for f in files:
        reader = ipc.open_stream(str(f))
        table = reader.read_all()
        # Drop image column to avoid slow decoding
        if "table_image" in table.column_names:
            table = table.drop(["table_image"])
        tables.append(table)
    combined = pa.concat_tables(tables)
    return combined.to_pandas()
def main():
    """Compute all dashboard statistics and write ``dashboard_data.json``.

    Reads the post-processed arrow dataset and the pipeline tracker, then
    aggregates pipeline, compensation, trend, distribution and probability
    stats into a single JSON file next to this script.
    """
    print("Loading dataset (arrow)...")
    train_dir = PROJECT_ROOT / "hf" / "execcomp-ai-postprocessed" / "train"
    df = load_arrow_dir(train_dir)
    print(f"Train split: {len(df)} records")

    def _num(v) -> float:
        """Coerce a possibly-missing numeric to a plain float (NaN/None -> 0.0).

        BUGFIX: the previous ``float(row.get(col, 0) or 0)`` let NaN through
        because NaN is truthy; ``json.dump`` then emitted the non-standard
        token ``NaN``, which breaks strict JSON parsers (e.g. JSON.parse in
        the dashboard frontend).
        """
        return 0.0 if pd.isna(v) else float(v)

    # --- Parse all executives ---
    # Each dataset row is one SCT table; "executives" is a JSON-encoded list.
    all_execs = []
    records_meta = []
    for _, row in df.iterrows():
        execs = json.loads(row["executives"])
        for ex in execs:
            ex["cik"] = row["cik"]
            ex["company"] = row["company"]
            ex["filing_year"] = row["year"]
            ex["sic"] = row["sic"]
            ex["state_of_inc"] = row["state_of_inc"]
            ex["sct_probability"] = row["sct_probability"]
            # Fall back to the filing year when the table has no fiscal year.
            if ex.get("fiscal_year") is None:
                ex["fiscal_year"] = row["year"]
            all_execs.append(ex)
        records_meta.append({
            "cik": row["cik"],
            "company": row["company"],
            "year": row["year"],
            "sic": row["sic"],
            "sct_probability": row["sct_probability"],
        })
    exec_df = pd.DataFrame(all_execs)
    meta_df = pd.DataFrame(records_meta)

    # Filter valid years and coerce compensation columns to numeric.
    exec_df["fiscal_year"] = pd.to_numeric(exec_df["fiscal_year"], errors="coerce")
    exec_df = exec_df[(exec_df["fiscal_year"] >= 2005) & (exec_df["fiscal_year"] <= 2022)]
    comp_cols = ["salary", "bonus", "stock_awards", "option_awards",
                 "non_equity_incentive", "change_in_pension", "other_compensation", "total"]
    for col in comp_cols:
        if col in exec_df.columns:
            exec_df[col] = pd.to_numeric(exec_df[col], errors="coerce")

    # --- Pipeline Stats from tracker ---
    print("Loading tracker...")
    tracker = Tracker(PROJECT_ROOT)
    stats = tracker.stats()
    total_docs = stats["total"]
    funds = stats["by_status"].get("fund", 0)
    with_sct = stats["by_status"].get("complete", 0)
    no_sct = stats["by_status"].get("no_sct", 0)
    pending = stats["by_status"].get("pending", 0)
    total_tables = 0
    for doc_id in tracker.get_by_status("complete"):
        doc_info = tracker.get_document(doc_id)
        if doc_info:
            total_tables += len(doc_info.get("sct_tables", []))
    pipeline_stats = {
        "total_docs": total_docs,
        "funds": funds,
        "with_sct": with_sct,
        "no_sct": no_sct,
        "pending": pending,
        "total_tables": total_tables,
        # NOTE(review): this is the count of *extra* tables beyond one per doc,
        # not the number of multi-table docs — kept as-is for schema stability;
        # the per-dataset multi_table_docs below is the exact count.
        "multi_table_docs": total_tables - with_sct,
        "non_funds": total_docs - funds,
    }

    # --- Compensation stats ---
    # Winsorize total and salary at p99 so parsing-error outliers don't skew means.
    total_cap = exec_df["total"].dropna().quantile(0.99)
    salary_cap = exec_df["salary"].dropna().quantile(0.99)
    comp_stats = {
        "total_exec_records": len(exec_df),
        "unique_companies": int(meta_df["cik"].nunique()),
        "year_range": [int(meta_df["year"].min()), int(meta_df["year"].max())],
        "mean_total": float(exec_df["total"].clip(upper=total_cap).mean()),
        "median_total": float(exec_df["total"].median()),
        "max_total": float(exec_df["total"].max()),
        "mean_salary": float(exec_df["salary"].clip(upper=salary_cap).mean()),
        "median_salary": float(exec_df["salary"].median()),
    }
    # Breakdown per component (winsorized means).
    comp_breakdown = {}
    for col in comp_cols:
        if col in exec_df.columns:
            vals = exec_df[col].dropna()
            cap = vals.quantile(0.99)
            comp_breakdown[col] = {
                "mean": float(vals.clip(upper=cap).mean()),
                "median": float(vals.median()),
                "max": float(vals.max()),
            }
    comp_stats["breakdown"] = comp_breakdown

    # --- Top 50 highest paid (deduplicated, validated) ---
    comp_detail_cols = ["salary", "bonus", "stock_awards", "option_awards",
                        "non_equity_incentive", "change_in_pension", "other_compensation"]
    top_pool = exec_df[
        ["name", "company", "fiscal_year", "total", "title"] + comp_detail_cols
    ].copy()
    # Sanity check: drop records where components don't sum to ~total (parsing errors).
    top_pool["comp_sum"] = top_pool[comp_detail_cols].fillna(0).sum(axis=1)
    top_pool["diff_pct"] = ((top_pool["total"] - top_pool["comp_sum"]).abs()
                            / top_pool["total"].clip(lower=1) * 100)
    top_pool = top_pool[top_pool["diff_pct"] <= 20]  # allow up to 20% rounding tolerance
    top_pool = top_pool.drop(columns=["comp_sum", "diff_pct"])
    # Deduplicate near-identical records (same person/company/year, total equal
    # when rounded to $0.1M) before taking the top 50.
    top_candidates = top_pool.nlargest(200, "total")
    top_candidates["total_rounded"] = (top_candidates["total"] / 1e6).round(1)
    top_candidates = top_candidates.drop_duplicates(
        subset=["name", "company", "fiscal_year", "total_rounded"]
    ).head(50)
    top50 = []
    for _, row in top_candidates.iterrows():
        title = row.get("title", "")
        top50.append({
            "name": row["name"],
            "company": row["company"],
            # Titles can be missing (NaN); emit "" instead of invalid JSON NaN.
            "title": "" if pd.isna(title) else str(title),
            "fiscal_year": int(row["fiscal_year"]),
            "total": float(row["total"]),  # nlargest never selects NaN totals
            "salary": _num(row.get("salary")),
            "bonus": _num(row.get("bonus")),
            "stock_awards": _num(row.get("stock_awards")),
            "option_awards": _num(row.get("option_awards")),
            "non_equity_incentive": _num(row.get("non_equity_incentive")),
            "change_in_pension": _num(row.get("change_in_pension")),
            "other_compensation": _num(row.get("other_compensation")),
        })

    # --- Tables by year ---
    tables_by_year = meta_df["year"].value_counts().sort_index()
    tables_by_year_dict = {int(k): int(v) for k, v in tables_by_year.items()}

    # --- Compensation trends by fiscal year (winsorized at p99) ---
    # Reuses total_cap computed above (exec_df is unchanged since then).
    exec_df_trend = exec_df.copy()
    exec_df_trend["total_clipped"] = exec_df_trend["total"].clip(upper=total_cap)
    yearly = exec_df_trend.groupby("fiscal_year").agg(
        mean=("total_clipped", "mean"),
        median=("total", "median"),
        count=("total", "count"),
    ).reset_index()
    trends = []
    for _, r in yearly.iterrows():
        trends.append({
            "year": int(r["fiscal_year"]),
            "mean": float(r["mean"]),
            "median": float(r["median"]),
            "count": int(r["count"]),
        })

    # --- Compensation distribution (histogram data) ---
    total_comp = exec_df["total"].dropna()
    total_comp = total_comp[total_comp > 0]
    p99 = total_comp.quantile(0.99)
    total_comp_clipped = total_comp[total_comp <= p99]
    # Histogram in $M units for readable axis labels.
    hist_values, hist_edges = np.histogram(total_comp_clipped / 1e6, bins=50)
    distribution = {
        "values": hist_values.tolist(),
        "edges": hist_edges.tolist(),
        "median": float(total_comp.median()),
        "p99": float(p99),
        "n_outliers": int(len(total_comp) - len(total_comp_clipped)),
    }

    # --- Compensation components average (winsorized at p99 to remove parsing errors) ---
    comp_components = {}
    for col in ["salary", "bonus", "stock_awards", "option_awards",
                "non_equity_incentive", "change_in_pension", "other_compensation"]:
        if col in exec_df.columns:
            vals = exec_df[col].dropna()
            cap = vals.quantile(0.99)
            comp_components[col.replace("_", " ").title()] = float(vals.clip(upper=cap).mean())

    # --- SCT Probability stats ---
    probs = [r["sct_probability"] for r in records_meta]
    high_conf = sum(1 for p in probs if p >= 0.7)
    medium_conf = sum(1 for p in probs if 0.3 <= p < 0.7)
    low_conf = sum(1 for p in probs if p < 0.3)
    # A "document" is a (cik, filing year) pair; count docs with multiple tables
    # and how many of those a 0.7 threshold would uniquely disambiguate.
    keys = [(r["cik"], r["year"]) for r in records_meta]
    counts = Counter(keys)
    unique_docs = len(counts)
    multi_table_docs = sum(1 for c in counts.values() if c > 1)
    could_disambiguate = 0
    for (cik, year), count in counts.items():
        if count > 1:
            doc_records = [r for r in records_meta if r["cik"] == cik and r["year"] == year]
            high_prob = [r for r in doc_records if r["sct_probability"] >= 0.7]
            if len(high_prob) == 1:
                could_disambiguate += 1
    prob_hist_values, prob_hist_edges = np.histogram(probs, bins=20)
    probability_stats = {
        "total_tables": len(probs),
        "unique_docs": unique_docs,
        "high_confidence": high_conf,
        "medium_confidence": medium_conf,
        "low_confidence": low_conf,
        "multi_table_docs": multi_table_docs,
        "could_disambiguate": could_disambiguate,
        "hist_values": prob_hist_values.tolist(),
        "hist_edges": prob_hist_edges.tolist(),
    }

    # --- Salary trends per component (winsorized at p99 to remove parsing errors) ---
    comp_trends = {}
    for col in ["salary", "stock_awards", "option_awards", "bonus",
                "non_equity_incentive", "other_compensation"]:
        if col in exec_df.columns:
            # Clip at 99th percentile of the ENTIRE column to remove outliers.
            cap = exec_df[col].dropna().quantile(0.99)
            clipped = exec_df[["fiscal_year", col]].copy()
            clipped[col] = clipped[col].clip(upper=cap)
            yearly_comp = clipped.groupby("fiscal_year")[col].mean().reset_index()
            comp_trends[col.replace("_", " ").title()] = {
                "years": yearly_comp["fiscal_year"].astype(int).tolist(),
                "values": yearly_comp[col].tolist(),
            }

    # --- Assemble final data ---
    dashboard_data = {
        "generated_at": datetime.now().isoformat(),
        "pipeline": pipeline_stats,
        "compensation": comp_stats,
        "top50": top50,
        "tables_by_year": tables_by_year_dict,
        "trends": trends,
        "distribution": distribution,
        "comp_components": comp_components,
        "probability": probability_stats,
        "comp_trends": comp_trends,
    }
    out_path = Path(__file__).parent / "dashboard_data.json"
    with open(out_path, "w") as f:
        json.dump(dashboard_data, f, indent=2)
    print(f"\nDashboard data written to {out_path}")
    print(f"  Pipeline: {pipeline_stats['total_docs']:,} docs, {pipeline_stats['with_sct']:,} with SCT")
    print(f"  Executives: {len(exec_df):,} records")
    print(f"  Tables: {len(probs):,} total, {high_conf:,} high confidence")


if __name__ == "__main__":
    main()