# Execcomp-AI-Dashboard/compute_stats.py
# Last commit by pierjoe: "Filter parsing errors from top earners + add change_in_pension" (1dc5ddb)
#!/usr/bin/env python3
"""
Pre-compute all stats for the HF Space dashboard.
Reads from the local dataset and tracker, outputs a single dashboard_data.json.
Run from the project root (MIIA/stuff):
source .stuff/bin/activate
python Execcomp-AI-Dashboard/compute_stats.py
"""
import json
import sys
from pathlib import Path
from collections import Counter
from datetime import datetime
import pandas as pd
import numpy as np
# Project root
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
import pyarrow.ipc as ipc
from src.tracking import Tracker
def load_arrow_dir(dir_path: Path) -> pd.DataFrame:
    """Load all ``*.arrow`` stream files from *dir_path* into one DataFrame.

    The ``table_image`` column (when present) is dropped from each table
    before concatenation to avoid slow image decoding.

    Args:
        dir_path: Directory containing Arrow IPC stream files.

    Returns:
        A pandas DataFrame with all tables concatenated, image column removed.

    Raises:
        FileNotFoundError: If *dir_path* contains no ``*.arrow`` files
            (pa.concat_tables would otherwise fail with an opaque error).
    """
    import pyarrow as pa

    tables = []
    for arrow_file in sorted(dir_path.glob("*.arrow")):
        reader = ipc.open_stream(str(arrow_file))
        table = reader.read_all()
        # Drop image column to avoid slow decoding
        if "table_image" in table.column_names:
            table = table.drop(["table_image"])
        tables.append(table)

    if not tables:
        raise FileNotFoundError(f"No .arrow files found in {dir_path}")

    return pa.concat_tables(tables).to_pandas()
# Compensation columns parsed from the SCT tables; "total" must be last so
# that the detail columns (which should sum to ~total) can be sliced off.
COMP_COLS = ["salary", "bonus", "stock_awards", "option_awards",
             "non_equity_incentive", "change_in_pension", "other_compensation", "total"]
COMP_DETAIL_COLS = COMP_COLS[:-1]


def _parse_executives(df: pd.DataFrame):
    """Flatten the per-filing ``executives`` JSON into one row per executive.

    Returns:
        (exec_df, meta_df, records_meta) — executive-level DataFrame (filtered
        to fiscal years 2005-2022 with comp columns coerced to numeric),
        per-table metadata DataFrame, and the raw metadata dicts (kept as a
        list for the probability stats, which iterate them directly).
    """
    all_execs = []
    records_meta = []
    for _, row in df.iterrows():
        execs = json.loads(row["executives"])
        for ex in execs:
            ex["cik"] = row["cik"]
            ex["company"] = row["company"]
            ex["filing_year"] = row["year"]
            ex["sic"] = row["sic"]
            ex["state_of_inc"] = row["state_of_inc"]
            ex["sct_probability"] = row["sct_probability"]
            # Fall back to the filing year when the table stated no fiscal year.
            if ex.get("fiscal_year") is None:
                ex["fiscal_year"] = row["year"]
            all_execs.append(ex)
        records_meta.append({
            "cik": row["cik"],
            "company": row["company"],
            "year": row["year"],
            "sic": row["sic"],
            "sct_probability": row["sct_probability"],
        })

    exec_df = pd.DataFrame(all_execs)
    meta_df = pd.DataFrame(records_meta)

    # Filter valid years and coerce comp columns to numeric.
    # .copy() avoids SettingWithCopyWarning on the assignments below.
    exec_df["fiscal_year"] = pd.to_numeric(exec_df["fiscal_year"], errors="coerce")
    exec_df = exec_df[(exec_df["fiscal_year"] >= 2005) & (exec_df["fiscal_year"] <= 2022)].copy()
    for col in COMP_COLS:
        if col in exec_df.columns:
            exec_df[col] = pd.to_numeric(exec_df[col], errors="coerce")
    return exec_df, meta_df, records_meta


def _pipeline_stats(tracker) -> dict:
    """Aggregate document-level pipeline counters from the tracker."""
    stats = tracker.stats()
    total_docs = stats["total"]
    funds = stats["by_status"].get("fund", 0)
    with_sct = stats["by_status"].get("complete", 0)

    total_tables = 0
    for doc_id in tracker.get_by_status("complete"):
        doc_info = tracker.get_document(doc_id)
        if doc_info:
            total_tables += len(doc_info.get("sct_tables", []))

    return {
        "total_docs": total_docs,
        "funds": funds,
        "with_sct": with_sct,
        "no_sct": stats["by_status"].get("no_sct", 0),
        "pending": stats["by_status"].get("pending", 0),
        "total_tables": total_tables,
        # Extra tables beyond one-per-complete-doc.
        "multi_table_docs": total_tables - with_sct,
        "non_funds": total_docs - funds,
    }


def _comp_stats(exec_df: pd.DataFrame, meta_df: pd.DataFrame) -> dict:
    """Headline compensation stats; means winsorized at p99 for robustness."""
    total_cap = exec_df["total"].dropna().quantile(0.99)
    salary_cap = exec_df["salary"].dropna().quantile(0.99)
    comp_stats = {
        "total_exec_records": len(exec_df),
        "unique_companies": int(meta_df["cik"].nunique()),
        "year_range": [int(meta_df["year"].min()), int(meta_df["year"].max())],
        "mean_total": float(exec_df["total"].clip(upper=total_cap).mean()),
        "median_total": float(exec_df["total"].median()),
        "max_total": float(exec_df["total"].max()),
        "mean_salary": float(exec_df["salary"].clip(upper=salary_cap).mean()),
        "median_salary": float(exec_df["salary"].median()),
    }

    # Breakdown per component (winsorized means).
    comp_breakdown = {}
    for col in COMP_COLS:
        if col in exec_df.columns:
            vals = exec_df[col].dropna()
            cap = vals.quantile(0.99)
            comp_breakdown[col] = {
                "mean": float(vals.clip(upper=cap).mean()),
                "median": float(vals.median()),
                "max": float(vals.max()),
            }
    comp_stats["breakdown"] = comp_breakdown
    return comp_stats


def _top50(exec_df: pd.DataFrame) -> list:
    """Top 50 highest paid executives, deduplicated and sanity-checked."""
    top_pool = exec_df[
        ["name", "company", "fiscal_year", "total", "title"] + COMP_DETAIL_COLS
    ].copy()

    # Sanity check: drop records where components don't sum to ~total
    # (parsing errors); allow up to 20% rounding tolerance.
    top_pool["comp_sum"] = top_pool[COMP_DETAIL_COLS].fillna(0).sum(axis=1)
    top_pool["diff_pct"] = ((top_pool["total"] - top_pool["comp_sum"]).abs()
                            / top_pool["total"].clip(lower=1) * 100)
    top_pool = top_pool[top_pool["diff_pct"] <= 20]
    top_pool = top_pool.drop(columns=["comp_sum", "diff_pct"])

    # Same person/company/year with near-identical totals (to 0.1 $M) are
    # duplicate table extractions — keep one.
    top_candidates = top_pool.nlargest(200, "total")
    top_candidates["total_rounded"] = (top_candidates["total"] / 1e6).round(1)
    top_candidates = top_candidates.drop_duplicates(
        subset=["name", "company", "fiscal_year", "total_rounded"]
    ).head(50)

    top50 = []
    for _, row in top_candidates.iterrows():
        entry = {
            "name": row["name"],
            "company": row["company"],
            "title": row.get("title", ""),
            "fiscal_year": int(row["fiscal_year"]),
            "total": float(row["total"]),
        }
        for col in COMP_DETAIL_COLS:
            entry[col] = float(row.get(col, 0) or 0)
        top50.append(entry)
    return top50


def _trends(exec_df: pd.DataFrame) -> list:
    """Per-fiscal-year mean (winsorized at p99), median, and count of total comp."""
    total_cap = exec_df["total"].dropna().quantile(0.99)
    trend_df = exec_df.copy()
    trend_df["total_clipped"] = trend_df["total"].clip(upper=total_cap)
    yearly = trend_df.groupby("fiscal_year").agg(
        mean=("total_clipped", "mean"),
        median=("total", "median"),
        count=("total", "count"),
    ).reset_index()
    return [
        {
            "year": int(r["fiscal_year"]),
            "mean": float(r["mean"]),
            "median": float(r["median"]),
            "count": int(r["count"]),
        }
        for _, r in yearly.iterrows()
    ]


def _distribution(exec_df: pd.DataFrame) -> dict:
    """Histogram of positive total comp (in $M), clipped at p99 for readable bins."""
    total_comp = exec_df["total"].dropna()
    total_comp = total_comp[total_comp > 0]
    p99 = total_comp.quantile(0.99)
    total_comp_clipped = total_comp[total_comp <= p99]
    hist_values, hist_edges = np.histogram(total_comp_clipped / 1e6, bins=50)
    return {
        "values": hist_values.tolist(),
        "edges": hist_edges.tolist(),
        "median": float(total_comp.median()),
        "p99": float(p99),
        "n_outliers": int(len(total_comp) - len(total_comp_clipped)),
    }


def _comp_components(exec_df: pd.DataFrame) -> dict:
    """Winsorized (p99) mean per comp component, keyed by display name."""
    comp_components = {}
    for col in COMP_DETAIL_COLS:
        if col in exec_df.columns:
            vals = exec_df[col].dropna()
            cap = vals.quantile(0.99)
            comp_components[col.replace("_", " ").title()] = float(vals.clip(upper=cap).mean())
    return comp_components


def _probability_stats(records_meta: list) -> dict:
    """SCT-probability confidence buckets plus multi-table disambiguation stats."""
    probs = [r["sct_probability"] for r in records_meta]
    high_conf = sum(1 for p in probs if p >= 0.7)
    medium_conf = sum(1 for p in probs if 0.3 <= p < 0.7)
    low_conf = sum(1 for p in probs if p < 0.3)

    counts = Counter((r["cik"], r["year"]) for r in records_meta)
    multi_table_docs = sum(1 for c in counts.values() if c > 1)

    # A multi-table doc "could disambiguate" when exactly one of its tables
    # is high-confidence (>= 0.7).
    could_disambiguate = 0
    for (cik, year), count in counts.items():
        if count > 1:
            doc_records = [r for r in records_meta if r["cik"] == cik and r["year"] == year]
            high_prob = [r for r in doc_records if r["sct_probability"] >= 0.7]
            if len(high_prob) == 1:
                could_disambiguate += 1

    prob_hist_values, prob_hist_edges = np.histogram(probs, bins=20)
    return {
        "total_tables": len(probs),
        "unique_docs": len(counts),
        "high_confidence": high_conf,
        "medium_confidence": medium_conf,
        "low_confidence": low_conf,
        "multi_table_docs": multi_table_docs,
        "could_disambiguate": could_disambiguate,
        "hist_values": prob_hist_values.tolist(),
        "hist_edges": prob_hist_edges.tolist(),
    }


def _comp_trends(exec_df: pd.DataFrame) -> dict:
    """Per-fiscal-year winsorized (p99) mean of each comp component.

    NOTE(review): change_in_pension is excluded here (unlike
    _comp_components) — confirm that is intentional for the trends chart.
    """
    comp_trends = {}
    for col in ["salary", "stock_awards", "option_awards", "bonus",
                "non_equity_incentive", "other_compensation"]:
        if col in exec_df.columns:
            # Clip at the 99th percentile of the ENTIRE column to remove outliers.
            cap = exec_df[col].dropna().quantile(0.99)
            clipped = exec_df[["fiscal_year", col]].copy()
            clipped[col] = clipped[col].clip(upper=cap)
            yearly_comp = clipped.groupby("fiscal_year")[col].mean().reset_index()
            comp_trends[col.replace("_", " ").title()] = {
                "years": yearly_comp["fiscal_year"].astype(int).tolist(),
                "values": yearly_comp[col].tolist(),
            }
    return comp_trends


def main():
    """Compute all dashboard stats and write dashboard_data.json next to this script."""
    print("Loading dataset (arrow)...")
    train_dir = PROJECT_ROOT / "hf" / "execcomp-ai-postprocessed" / "train"
    df = load_arrow_dir(train_dir)
    print(f"Train split: {len(df)} records")

    exec_df, meta_df, records_meta = _parse_executives(df)

    print("Loading tracker...")
    tracker = Tracker(PROJECT_ROOT)
    pipeline_stats = _pipeline_stats(tracker)

    tables_by_year = meta_df["year"].value_counts().sort_index()
    probability_stats = _probability_stats(records_meta)

    dashboard_data = {
        "generated_at": datetime.now().isoformat(),
        "pipeline": pipeline_stats,
        "compensation": _comp_stats(exec_df, meta_df),
        "top50": _top50(exec_df),
        "tables_by_year": {int(k): int(v) for k, v in tables_by_year.items()},
        "trends": _trends(exec_df),
        "distribution": _distribution(exec_df),
        "comp_components": _comp_components(exec_df),
        "probability": probability_stats,
        "comp_trends": _comp_trends(exec_df),
    }

    out_path = Path(__file__).parent / "dashboard_data.json"
    with open(out_path, "w") as f:
        json.dump(dashboard_data, f, indent=2)

    print(f"\nDashboard data written to {out_path}")
    print(f" Pipeline: {pipeline_stats['total_docs']:,} docs, {pipeline_stats['with_sct']:,} with SCT")
    print(f" Executives: {len(exec_df):,} records")
    print(f" Tables: {probability_stats['total_tables']:,} total, "
          f"{probability_stats['high_confidence']:,} high confidence")


if __name__ == "__main__":
    main()