""" FAST Nonprofit Enrichment Strategy This document explains how to enrich 1.9M+ nonprofits MUCH faster than sequential API calls. Current Problem: - Sequential: 1.9M ร— 0.5sec = 11.3 days (Every.org) - Sequential: 1.9M ร— 1.0sec = 22.6 days (ProPublica) - Total: ~34 days ๐Ÿ˜ฑ Fast Solutions: 1. โœ… Skip Already Enriched (INSTANT) 2. ๐Ÿš€ Async Parallel Requests (50-100x faster) 3. ๐ŸŽฏ Smart Sampling (99% faster) 4. ๐Ÿ’พ Incremental Updates (only enrich new/changed) 5. ๐Ÿ”„ Batch Processing (process in chunks) """ # ============================================================================== # SOLUTION 1: Skip Already Enriched (INSTANT) โœ… # ============================================================================== """ Most nonprofits in IRS data are ALREADY in the enriched file! Check: import pandas as pd base = pd.read_parquet('data/gold/nonprofits_organizations.parquet') enriched = pd.read_parquet('data/gold/nonprofits_organizations_everyorg.parquet') print(f"Base: {len(base):,}") print(f"Enriched: {len(enriched):,}") print(f"Already done: {len(enriched) / len(base) * 100:.1f}%") # Find which ones need enrichment needs_enrichment = base[~base['ein'].isin(enriched['ein'])] print(f"Needs enrichment: {len(needs_enrichment):,}") Result: You probably only need to enrich a FEW THOUSAND, not 1.9M! """ # ============================================================================== # SOLUTION 2: Async Parallel Requests (50-100x FASTER) ๐Ÿš€ # ============================================================================== """ Use asyncio + aiohttp to make MANY requests concurrently. Every.org allows reasonable concurrent requests. Test with 50-100 concurrent workers. Example speedup: - Sequential: 1.9M ร— 0.5sec = 11.3 days - 50 workers: 1.9M ร— 0.5sec / 50 = 5.4 hours โšก - 100 workers: 1.9M ร— 0.5sec / 100 = 2.7 hours โšกโšก WARNING: Test first with small batch to avoid API bans! """ import asyncio import aiohttp from typing import List, Dict import pandas as pd async def fetch_nonprofit_async(session: aiohttp.ClientSession, ein: str, api_key: str) -> Dict: """Fetch single nonprofit asynchronously""" clean_ein = str(ein).replace('-', '').zfill(9) url = f"https://partners.every.org/v0.2/nonprofit/{clean_ein}" headers = {'Authorization': f'Bearer {api_key}', 'Accept': 'application/json'} try: async with session.get(url, headers=headers, timeout=10) as response: if response.status == 200: data = await response.json() return {'ein': ein, 'success': True, 'data': data} else: return {'ein': ein, 'success': False, 'error': response.status} except Exception as e: return {'ein': ein, 'success': False, 'error': str(e)} async def enrich_batch_async(eins: List[str], api_key: str, max_concurrent: int = 50) -> List[Dict]: """Enrich a batch of nonprofits with controlled concurrency""" # Use semaphore to limit concurrent requests semaphore = asyncio.Semaphore(max_concurrent) async def fetch_with_semaphore(session, ein): async with semaphore: return await fetch_nonprofit_async(session, ein, api_key) # Create session with connection pooling connector = aiohttp.TCPConnector(limit=100, limit_per_host=50) timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: tasks = [fetch_with_semaphore(session, ein) for ein in eins] results = await asyncio.gather(*tasks, return_exceptions=True) return results def enrich_nonprofits_fast( df: pd.DataFrame, api_key: str, batch_size: int = 1000, max_concurrent: int = 50, output_file: str = 'data/gold/nonprofits_enriched_fast.parquet' ): """ Enrich nonprofits using async parallel processing Args: df: DataFrame with 'ein' column api_key: Every.org API key batch_size: Process this many at once max_concurrent: Concurrent requests per batch output_file: Where to save results Example: df = pd.read_parquet('data/gold/nonprofits_organizations.parquet') # Test with small sample first! sample = df.head(1000) enrich_nonprofits_fast(sample, api_key, batch_size=100, max_concurrent=10) # Then scale up enrich_nonprofits_fast(df, api_key, batch_size=5000, max_concurrent=50) """ from tqdm import tqdm all_results = [] # Process in batches to avoid memory issues for i in tqdm(range(0, len(df), batch_size), desc="Processing batches"): batch_df = df.iloc[i:i+batch_size] eins = batch_df['ein'].tolist() # Run async batch results = asyncio.run(enrich_batch_async(eins, api_key, max_concurrent)) all_results.extend(results) # Save incrementally every 10 batches if (i // batch_size) % 10 == 0 and all_results: temp_df = pd.DataFrame(all_results) temp_df.to_parquet(f"{output_file}.tmp", index=False) # Convert results to DataFrame results_df = pd.DataFrame(all_results) results_df.to_parquet(output_file, index=False) success_rate = results_df['success'].sum() / len(results_df) * 100 print(f"\nโœ… Enriched {len(results_df):,} nonprofits") print(f" Success rate: {success_rate:.1f}%") print(f" Saved to: {output_file}") # ============================================================================== # SOLUTION 3: Smart Sampling (99% FASTER) ๐ŸŽฏ # ============================================================================== """ Do you REALLY need ALL 1.9M enriched? For most use cases, a representative sample is sufficient: - Dashboard/website: Sample 10,000-100,000 (0.5-5%) - Research: Stratified sample by state/category - Production: Only enrich what users request (on-demand) Example: # Sample by state to get representative coverage import pandas as pd df = pd.read_parquet('data/gold/nonprofits_organizations.parquet') # Get 1000 per state (ensures geographic coverage) sampled = df.groupby('state').sample(n=min(1000, len(df)), replace=False) # Result: ~50,000 nonprofits instead of 1.9M # Enrichment time: 50K ร— 0.5sec / 50 workers = 8 minutes โšกโšกโšก """ # ============================================================================== # SOLUTION 4: Incremental Updates (ONLY NEW/CHANGED) ๐Ÿ’พ # ============================================================================== """ Only enrich NEW nonprofits or re-enrich ones older than X days. Check the existing enrich script - it already supports this! Usage: python scripts/enrich_nonprofits_everyorg.py \\ --input data/gold/nonprofits_organizations.parquet \\ --output data/gold/nonprofits_organizations_everyorg.parquet \\ --incremental \\ --max-age-days 30 This will: 1. โœ… Skip nonprofits already enriched in last 30 days 2. โœ… Only enrich NEW nonprofits not in enriched file 3. โœ… Re-enrich old entries (>30 days) Result: Maybe only 10,000-50,000 need enrichment = 2-10 hours """ # ============================================================================== # SOLUTION 5: Batch Processing (CHUNKS) ๐Ÿ”„ # ============================================================================== """ Process in manageable chunks instead of all at once. Example workflow: 1. Split by state: 50 files ร— 40K nonprofits each 2. Process 1 state per day = 50 days (manageable) 3. Or run multiple states in parallel on different machines Usage: # Split by state df = pd.read_parquet('data/gold/nonprofits_organizations.parquet') for state in df['state'].unique(): state_df = df[df['state'] == state] state_df.to_parquet(f'data/chunks/nonprofits_{state}.parquet') # Then enrich each chunk for state in ['AL', 'AK', 'AZ', ...]: python scripts/enrich_nonprofits_everyorg.py \\ --input data/chunks/nonprofits_{state}.parquet \\ --output data/enriched/nonprofits_{state}_enriched.parquet """ # ============================================================================== # RECOMMENDED APPROACH ๐ŸŽฏ # ============================================================================== """ PHASE 1: Smart Sampling (TODAY) - Sample 50,000 representative nonprofits - Enrich with async (50 concurrent workers) - Time: ~15 minutes - Use for dashboard/website launch PHASE 2: Incremental Enrichment (ONGOING) - Enrich new nonprofits as they're added monthly - Re-enrich popular ones every 30 days - Time: 1-2 hours per month PHASE 3: On-Demand Enrichment (PRODUCTION) - When user searches/views a nonprofit, enrich it if not already done - Cache result for 30 days - No upfront cost! PHASE 4: Full Enrichment (OPTIONAL) - If you REALLY need all 1.9M enriched - Use async with 100 workers - Run overnight on dedicated server - Time: ~3-6 hours """ # ============================================================================== # COST ANALYSIS ๐Ÿ’ฐ # ============================================================================== """ Every.org API Pricing: - Free tier: 10,000 requests/month - Paid tier: $0.001 per request (1 million = $1,000) For 1.9M nonprofits: - Cost: 1,952,238 ร— $0.001 = $1,952.24 ProPublica API: - FREE (but slow rate limits) Recommendation: - Use FREE ProPublica data (already have it!) - Use Every.org for 50K sample or incremental updates (within free tier) """ # ============================================================================== # EXAMPLE: FAST ENRICHMENT SCRIPT # ============================================================================== if __name__ == "__main__": import argparse import os from dotenv import load_dotenv load_dotenv() parser = argparse.ArgumentParser(description="Fast nonprofit enrichment with async") parser.add_argument("--input", required=True, help="Input parquet file") parser.add_argument("--output", required=True, help="Output parquet file") parser.add_argument("--sample", type=int, help="Sample size (e.g., 50000)") parser.add_argument("--concurrent", type=int, default=50, help="Concurrent requests") parser.add_argument("--batch-size", type=int, default=1000, help="Batch size") args = parser.parse_args() api_key = os.getenv('EVERYORG_API_KEY') if not api_key: print("ERROR: EVERYORG_API_KEY not found in .env") exit(1) # Load data df = pd.read_parquet(args.input) print(f"Loaded {len(df):,} nonprofits") # Sample if requested if args.sample: df = df.sample(n=min(args.sample, len(df))) print(f"Sampling {len(df):,} nonprofits") # Enrich! enrich_nonprofits_fast( df, api_key, batch_size=args.batch_size, max_concurrent=args.concurrent, output_file=args.output )