Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| FAST Nonprofit Enrichment Strategy | |
| This document explains how to enrich 1.9M+ nonprofits MUCH faster than sequential API calls. | |
| Current Problem: | |
| - Sequential: 1.9M Γ 0.5sec = 11.3 days (Every.org) | |
| - Sequential: 1.9M Γ 1.0sec = 22.6 days (ProPublica) | |
| - Total: ~34 days π± | |
| Fast Solutions: | |
| 1. β Skip Already Enriched (INSTANT) | |
| 2. π Async Parallel Requests (50-100x faster) | |
| 3. π― Smart Sampling (99% faster) | |
| 4. πΎ Incremental Updates (only enrich new/changed) | |
| 5. π Batch Processing (process in chunks) | |
| """ | |
| # ============================================================================== | |
| # SOLUTION 1: Skip Already Enriched (INSTANT) β | |
| # ============================================================================== | |
| """ | |
| Most nonprofits in IRS data are ALREADY in the enriched file! | |
| Check: | |
| import pandas as pd | |
| base = pd.read_parquet('data/gold/nonprofits_organizations.parquet') | |
| enriched = pd.read_parquet('data/gold/nonprofits_organizations_everyorg.parquet') | |
| print(f"Base: {len(base):,}") | |
| print(f"Enriched: {len(enriched):,}") | |
| print(f"Already done: {len(enriched) / len(base) * 100:.1f}%") | |
| # Find which ones need enrichment | |
| needs_enrichment = base[~base['ein'].isin(enriched['ein'])] | |
| print(f"Needs enrichment: {len(needs_enrichment):,}") | |
| Result: You probably only need to enrich a FEW THOUSAND, not 1.9M! | |
| """ | |
| # ============================================================================== | |
| # SOLUTION 2: Async Parallel Requests (50-100x FASTER) π | |
| # ============================================================================== | |
| """ | |
| Use asyncio + aiohttp to make MANY requests concurrently. | |
| Every.org allows reasonable concurrent requests. Test with 50-100 concurrent workers. | |
| Example speedup: | |
| - Sequential: 1.9M Γ 0.5sec = 11.3 days | |
| - 50 workers: 1.9M Γ 0.5sec / 50 = 5.4 hours β‘ | |
| - 100 workers: 1.9M Γ 0.5sec / 100 = 2.7 hours β‘β‘ | |
| WARNING: Test first with small batch to avoid API bans! | |
| """ | |
| import asyncio | |
| import aiohttp | |
| from typing import List, Dict | |
| import pandas as pd | |
| async def fetch_nonprofit_async(session: aiohttp.ClientSession, ein: str, api_key: str) -> Dict: | |
| """Fetch single nonprofit asynchronously""" | |
| clean_ein = str(ein).replace('-', '').zfill(9) | |
| url = f"https://partners.every.org/v0.2/nonprofit/{clean_ein}" | |
| headers = {'Authorization': f'Bearer {api_key}', 'Accept': 'application/json'} | |
| try: | |
| async with session.get(url, headers=headers, timeout=10) as response: | |
| if response.status == 200: | |
| data = await response.json() | |
| return {'ein': ein, 'success': True, 'data': data} | |
| else: | |
| return {'ein': ein, 'success': False, 'error': response.status} | |
| except Exception as e: | |
| return {'ein': ein, 'success': False, 'error': str(e)} | |
| async def enrich_batch_async(eins: List[str], api_key: str, max_concurrent: int = 50) -> List[Dict]: | |
| """Enrich a batch of nonprofits with controlled concurrency""" | |
| # Use semaphore to limit concurrent requests | |
| semaphore = asyncio.Semaphore(max_concurrent) | |
| async def fetch_with_semaphore(session, ein): | |
| async with semaphore: | |
| return await fetch_nonprofit_async(session, ein, api_key) | |
| # Create session with connection pooling | |
| connector = aiohttp.TCPConnector(limit=100, limit_per_host=50) | |
| timeout = aiohttp.ClientTimeout(total=30) | |
| async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: | |
| tasks = [fetch_with_semaphore(session, ein) for ein in eins] | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| return results | |
| def enrich_nonprofits_fast( | |
| df: pd.DataFrame, | |
| api_key: str, | |
| batch_size: int = 1000, | |
| max_concurrent: int = 50, | |
| output_file: str = 'data/gold/nonprofits_enriched_fast.parquet' | |
| ): | |
| """ | |
| Enrich nonprofits using async parallel processing | |
| Args: | |
| df: DataFrame with 'ein' column | |
| api_key: Every.org API key | |
| batch_size: Process this many at once | |
| max_concurrent: Concurrent requests per batch | |
| output_file: Where to save results | |
| Example: | |
| df = pd.read_parquet('data/gold/nonprofits_organizations.parquet') | |
| # Test with small sample first! | |
| sample = df.head(1000) | |
| enrich_nonprofits_fast(sample, api_key, batch_size=100, max_concurrent=10) | |
| # Then scale up | |
| enrich_nonprofits_fast(df, api_key, batch_size=5000, max_concurrent=50) | |
| """ | |
| from tqdm import tqdm | |
| all_results = [] | |
| # Process in batches to avoid memory issues | |
| for i in tqdm(range(0, len(df), batch_size), desc="Processing batches"): | |
| batch_df = df.iloc[i:i+batch_size] | |
| eins = batch_df['ein'].tolist() | |
| # Run async batch | |
| results = asyncio.run(enrich_batch_async(eins, api_key, max_concurrent)) | |
| all_results.extend(results) | |
| # Save incrementally every 10 batches | |
| if (i // batch_size) % 10 == 0 and all_results: | |
| temp_df = pd.DataFrame(all_results) | |
| temp_df.to_parquet(f"{output_file}.tmp", index=False) | |
| # Convert results to DataFrame | |
| results_df = pd.DataFrame(all_results) | |
| results_df.to_parquet(output_file, index=False) | |
| success_rate = results_df['success'].sum() / len(results_df) * 100 | |
| print(f"\nβ Enriched {len(results_df):,} nonprofits") | |
| print(f" Success rate: {success_rate:.1f}%") | |
| print(f" Saved to: {output_file}") | |
| # ============================================================================== | |
| # SOLUTION 3: Smart Sampling (99% FASTER) π― | |
| # ============================================================================== | |
| """ | |
| Do you REALLY need ALL 1.9M enriched? | |
| For most use cases, a representative sample is sufficient: | |
| - Dashboard/website: Sample 10,000-100,000 (0.5-5%) | |
| - Research: Stratified sample by state/category | |
| - Production: Only enrich what users request (on-demand) | |
| Example: | |
| # Sample by state to get representative coverage | |
| import pandas as pd | |
| df = pd.read_parquet('data/gold/nonprofits_organizations.parquet') | |
| # Get 1000 per state (ensures geographic coverage) | |
| sampled = df.groupby('state').sample(n=min(1000, len(df)), replace=False) | |
| # Result: ~50,000 nonprofits instead of 1.9M | |
| # Enrichment time: 50K Γ 0.5sec / 50 workers = 8 minutes β‘β‘β‘ | |
| """ | |
| # ============================================================================== | |
| # SOLUTION 4: Incremental Updates (ONLY NEW/CHANGED) πΎ | |
| # ============================================================================== | |
| """ | |
| Only enrich NEW nonprofits or re-enrich ones older than X days. | |
| Check the existing enrich script - it already supports this! | |
| Usage: | |
| python scripts/enrich_nonprofits_everyorg.py \\ | |
| --input data/gold/nonprofits_organizations.parquet \\ | |
| --output data/gold/nonprofits_organizations_everyorg.parquet \\ | |
| --incremental \\ | |
| --max-age-days 30 | |
| This will: | |
| 1. β Skip nonprofits already enriched in last 30 days | |
| 2. β Only enrich NEW nonprofits not in enriched file | |
| 3. β Re-enrich old entries (>30 days) | |
| Result: Maybe only 10,000-50,000 need enrichment = 2-10 hours | |
| """ | |
| # ============================================================================== | |
| # SOLUTION 5: Batch Processing (CHUNKS) π | |
| # ============================================================================== | |
| """ | |
| Process in manageable chunks instead of all at once. | |
| Example workflow: | |
| 1. Split by state: 50 files Γ 40K nonprofits each | |
| 2. Process 1 state per day = 50 days (manageable) | |
| 3. Or run multiple states in parallel on different machines | |
| Usage: | |
| # Split by state | |
| df = pd.read_parquet('data/gold/nonprofits_organizations.parquet') | |
| for state in df['state'].unique(): | |
| state_df = df[df['state'] == state] | |
| state_df.to_parquet(f'data/chunks/nonprofits_{state}.parquet') | |
| # Then enrich each chunk | |
| for state in ['AL', 'AK', 'AZ', ...]: | |
| python scripts/enrich_nonprofits_everyorg.py \\ | |
| --input data/chunks/nonprofits_{state}.parquet \\ | |
| --output data/enriched/nonprofits_{state}_enriched.parquet | |
| """ | |
| # ============================================================================== | |
| # RECOMMENDED APPROACH π― | |
| # ============================================================================== | |
| """ | |
| PHASE 1: Smart Sampling (TODAY) | |
| - Sample 50,000 representative nonprofits | |
| - Enrich with async (50 concurrent workers) | |
| - Time: ~15 minutes | |
| - Use for dashboard/website launch | |
| PHASE 2: Incremental Enrichment (ONGOING) | |
| - Enrich new nonprofits as they're added monthly | |
| - Re-enrich popular ones every 30 days | |
| - Time: 1-2 hours per month | |
| PHASE 3: On-Demand Enrichment (PRODUCTION) | |
| - When user searches/views a nonprofit, enrich it if not already done | |
| - Cache result for 30 days | |
| - No upfront cost! | |
| PHASE 4: Full Enrichment (OPTIONAL) | |
| - If you REALLY need all 1.9M enriched | |
| - Use async with 100 workers | |
| - Run overnight on dedicated server | |
| - Time: ~3-6 hours | |
| """ | |
| # ============================================================================== | |
| # COST ANALYSIS π° | |
| # ============================================================================== | |
| """ | |
| Every.org API Pricing: | |
| - Free tier: 10,000 requests/month | |
| - Paid tier: $0.001 per request (1 million = $1,000) | |
| For 1.9M nonprofits: | |
| - Cost: 1,952,238 Γ $0.001 = $1,952.24 | |
| ProPublica API: | |
| - FREE (but slow rate limits) | |
| Recommendation: | |
| - Use FREE ProPublica data (already have it!) | |
| - Use Every.org for 50K sample or incremental updates (within free tier) | |
| """ | |
| # ============================================================================== | |
| # EXAMPLE: FAST ENRICHMENT SCRIPT | |
| # ============================================================================== | |
| if __name__ == "__main__": | |
| import argparse | |
| import os | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| parser = argparse.ArgumentParser(description="Fast nonprofit enrichment with async") | |
| parser.add_argument("--input", required=True, help="Input parquet file") | |
| parser.add_argument("--output", required=True, help="Output parquet file") | |
| parser.add_argument("--sample", type=int, help="Sample size (e.g., 50000)") | |
| parser.add_argument("--concurrent", type=int, default=50, help="Concurrent requests") | |
| parser.add_argument("--batch-size", type=int, default=1000, help="Batch size") | |
| args = parser.parse_args() | |
| api_key = os.getenv('EVERYORG_API_KEY') | |
| if not api_key: | |
| print("ERROR: EVERYORG_API_KEY not found in .env") | |
| exit(1) | |
| # Load data | |
| df = pd.read_parquet(args.input) | |
| print(f"Loaded {len(df):,} nonprofits") | |
| # Sample if requested | |
| if args.sample: | |
| df = df.sample(n=min(args.sample, len(df))) | |
| print(f"Sampling {len(df):,} nonprofits") | |
| # Enrich! | |
| enrich_nonprofits_fast( | |
| df, | |
| api_key, | |
| batch_size=args.batch_size, | |
| max_concurrent=args.concurrent, | |
| output_file=args.output | |
| ) | |