Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
File size: 8,227 Bytes
"""
Create All Gold Tables - Main Orchestration Script

This script runs both pipelines:
1. Meeting data gold tables (from LocalView cache 2006-2023)
2. Nonprofit data gold tables (from ProPublica API + other sources)

Usage:
    # Run both pipelines
    python scripts/create_all_gold_tables.py

    # Run only meetings pipeline
    python scripts/create_all_gold_tables.py --meetings-only

    # Run only nonprofits pipeline
    python scripts/create_all_gold_tables.py --nonprofits-only

    # Specify states for nonprofit discovery
    python scripts/create_all_gold_tables.py --states AL MI NY CA

    # Skip nonprofit API discovery, use cached data
    python scripts/create_all_gold_tables.py --skip-discovery

Output:
    data/gold/national/meetings_*.parquet
    data/gold/nonprofits_*.parquet
"""
import sys
from pathlib import Path
import argparse
from loguru import logger
# Add the project root (the directory two levels above this file) to sys.path
# so the `pipeline` imports below resolve regardless of the working directory.
# The path is resolved to an absolute one first: if __file__ is a bare relative
# filename, Path(__file__).parent.parent collapses to "." and would point at
# the script's own directory instead of its parent.
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
from pipeline.create_meetings_gold_tables import MeetingGoldTableCreator
from pipeline.create_nonprofits_gold_tables import NonprofitGoldTableCreator
from pipeline.create_contacts_gold_tables import ContactsGoldTableCreator
def _build_parser():
    """Build the command-line parser for the gold table orchestration script.

    Returns:
        argparse.ArgumentParser: parser exposing the pipeline-selection,
        nonprofit-discovery and contact-extraction options.
    """
    parser = argparse.ArgumentParser(
        description="Create all gold tables from meeting and nonprofit data"
    )

    # The two "-only" switches contradict each other: given together they
    # would disable both pipelines, so argparse rejects the combination.
    selection = parser.add_mutually_exclusive_group()
    selection.add_argument(
        "--meetings-only",
        action="store_true",
        help="Only create meeting gold tables"
    )
    selection.add_argument(
        "--nonprofits-only",
        action="store_true",
        help="Only create nonprofit gold tables"
    )

    parser.add_argument(
        "--states",
        nargs="+",
        default=["AL", "MI"],
        help="State codes for nonprofit discovery (e.g., AL MI NY)"
    )
    parser.add_argument(
        "--ntee-codes",
        nargs="+",
        default=None,
        help="NTEE codes to search (e.g., E P K). Default: E P K L S W. Use 'ALL' to skip filtering."
    )
    parser.add_argument(
        "--skip-discovery",
        action="store_true",
        help="Skip nonprofit API discovery, use existing bronze data"
    )
    parser.add_argument(
        "--use-irs",
        action="store_true",
        help="Use IRS EO-BMF bulk data instead of ProPublica API (RECOMMENDED - gets ALL nonprofits!)"
    )
    parser.add_argument(
        "--download-all-irs",
        action="store_true",
        help="Download ALL 1.9M+ nonprofits from IRS (4 regional files). Requires --use-irs."
    )
    parser.add_argument(
        "--extract-contacts",
        action="store_true",
        help="Extract contacts (officials) from meeting transcripts after creating meeting tables"
    )
    return parser


def _log_table_group(heading, files, row_counts, gold_dir):
    """Log one category of gold tables: a heading, then one line per file.

    Nothing is logged when ``files`` is empty.

    Args:
        heading: Heading text logged before the file lines.
        files: Parquet file paths belonging to this category.
        row_counts: Mapping of file path to its number of records.
        gold_dir: Root gold directory; files are shown relative to it.
    """
    if not files:
        return
    logger.info(heading)
    for file in files:
        size_mb = file.stat().st_size / (1024 * 1024)
        shown = file.relative_to(gold_dir).as_posix()
        logger.info(f"  • {shown}: {row_counts[file]:,} records ({size_mb:.2f} MB)")


def _log_gold_summary(gold_dir):
    """Log record counts and sizes for every parquet file under ``gold_dir``.

    The search is recursive because the module docstring lists meeting tables
    under ``data/gold/national/``, which a flat ``*.parquet`` glob on
    ``data/gold`` would not reach.

    Args:
        gold_dir: Directory holding the gold parquet tables.
    """
    if not gold_dir.exists():
        logger.warning("Gold directory does not exist!")
        return

    # Imported lazily: pandas is only needed for this summary.
    import pandas as pd

    all_gold_files = sorted(gold_dir.rglob("*.parquet"))
    if not all_gold_files:
        logger.warning("No gold tables found!")
        return

    logger.info(f"\n📊 Created {len(all_gold_files)} gold tables:\n")

    # Read each table once; the counts feed both the per-file lines and the total.
    row_counts = {f: len(pd.read_parquet(f)) for f in all_gold_files}

    # Separate by category (a file name may match more than one category).
    meeting_files = [f for f in all_gold_files if 'meeting' in f.name]
    nonprofit_files = [f for f in all_gold_files if 'nonprofit' in f.name]
    contacts_files = [f for f in all_gold_files if 'contacts_' in f.name]
    categorized = set(meeting_files) | set(nonprofit_files) | set(contacts_files)
    other_files = [f for f in all_gold_files if f not in categorized]

    _log_table_group("📅 Meeting Tables:", meeting_files, row_counts, gold_dir)
    _log_table_group("\n👥 Contacts Tables:", contacts_files, row_counts, gold_dir)
    _log_table_group("\n🏛️ Nonprofit Tables:", nonprofit_files, row_counts, gold_dir)
    _log_table_group("\n📁 Other Tables:", other_files, row_counts, gold_dir)

    # Calculate totals
    total_records = sum(row_counts.values())
    total_size_mb = sum(f.stat().st_size for f in all_gold_files) / (1024 * 1024)
    logger.info("")
    logger.info(f"📊 Total: {total_records:,} records across {len(all_gold_files)} tables ({total_size_mb:.2f} MB)")


def main(argv=None):
    """Run the selected gold table pipelines and log a summary of the output.

    A failure in one pipeline is logged with its traceback and does not stop
    the remaining pipelines from running. Contact extraction only runs when it
    was requested and the meetings pipeline succeeded.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        int: 0 when every step that ran succeeded, 1 when at least one failed.
    """
    args = _build_parser().parse_args(argv)

    # Determine which pipelines to run
    run_meetings = not args.nonprofits_only
    run_nonprofits = not args.meetings_only

    logger.info("=" * 70)
    logger.info("GOLD TABLE CREATION PIPELINE")
    logger.info("=" * 70)
    logger.info(f"Run Meetings Pipeline: {run_meetings}")
    logger.info(f"Run Nonprofits Pipeline: {run_nonprofits}")
    if run_nonprofits:
        logger.info(f"States: {', '.join(args.states)}")
        logger.info(f"Skip Discovery: {args.skip_discovery}")
    logger.info("=" * 70)
    logger.info("")

    if args.extract_contacts and not run_meetings:
        # Contact extraction is only reached from the meetings branch below.
        logger.warning("--extract-contacts is ignored because the meetings pipeline is not running")

    failed_steps = []

    # Run meetings pipeline
    if run_meetings:
        logger.info("")
        logger.info("🏛️ STARTING MEETINGS PIPELINE")
        logger.info("-" * 70)
        meetings_ok = False
        try:
            meeting_creator = MeetingGoldTableCreator()
            meeting_creator.create_all_gold_tables()
        except Exception as e:
            # Top-level boundary: record the failure and keep going.
            logger.exception(f"❌ Meetings pipeline failed: {e}")
            failed_steps.append("meetings")
        else:
            meetings_ok = True
            logger.success("✅ Meetings pipeline completed successfully!")

        # Extract contacts if requested; reported separately so a failure here
        # is not misattributed to the meetings pipeline.
        if meetings_ok and args.extract_contacts:
            logger.info("")
            logger.info("👥 EXTRACTING CONTACTS FROM MEETINGS")
            logger.info("-" * 70)
            try:
                contacts_creator = ContactsGoldTableCreator()
                contacts_creator.create_all_contacts_tables()
            except Exception as e:
                logger.exception(f"❌ Contacts extraction failed: {e}")
                failed_steps.append("contacts")
            else:
                logger.success("✅ Contacts extraction completed successfully!")

    # Run nonprofits pipeline
    if run_nonprofits:
        logger.info("")
        logger.info("🏛️ STARTING NONPROFITS PIPELINE")
        logger.info("-" * 70)
        try:
            nonprofit_creator = NonprofitGoldTableCreator()

            # Handle NTEE codes argument
            ntee_codes = args.ntee_codes
            if ntee_codes and len(ntee_codes) == 1 and ntee_codes[0].upper() == 'ALL':
                ntee_codes = []  # Empty list means get all nonprofits

            nonprofit_creator.create_all_gold_tables(
                states=args.states,
                ntee_codes=ntee_codes,
                skip_discovery=args.skip_discovery,
                use_irs_data=args.use_irs,
                download_all_irs=args.download_all_irs
            )
        except Exception as e:
            logger.exception(f"❌ Nonprofits pipeline failed: {e}")
            failed_steps.append("nonprofits")
        else:
            logger.success("✅ Nonprofits pipeline completed successfully!")

    # Final summary
    logger.info("")
    logger.info("=" * 70)
    logger.info("PIPELINE SUMMARY")
    logger.info("=" * 70)
    _log_gold_summary(Path("data/gold"))

    logger.info("=" * 70)
    if failed_steps:
        logger.error(f"❌ PIPELINES FINISHED WITH FAILURES: {', '.join(failed_steps)}")
    else:
        logger.success("✅ ALL PIPELINES COMPLETED!")
    logger.info("=" * 70)

    return 1 if failed_steps else 0


if __name__ == "__main__":
    # Propagate main()'s status so shells and schedulers can detect failures.
    sys.exit(main())
|