File size: 11,920 Bytes
896453f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
#!/usr/bin/env python3
"""
Create partitioned parquet datasets for efficient state-based queries.

This converts consolidated files into partitioned datasets where each partition
represents a state. This allows:
- Querying the full national dataset
- Automatically filtering to only read needed states (partition pruning)
- Efficient analytics with tools like Spark, DuckDB, Pandas

Example:
    # Read only Alabama data (only reads AL partition)
    df = pd.read_parquet('data/gold/nonprofits_organizations', 
                         filters=[('state', '=', 'AL')])
    
    # Read multiple states
    df = pd.read_parquet('data/gold/nonprofits_organizations',
                         filters=[('state', 'in', ['AL', 'GA', 'FL'])])
    
    # Read everything (reads all partitions)
    df = pd.read_parquet('data/gold/nonprofits_organizations')

Usage:
    # Create all partitioned datasets
    python scripts/create_partitioned_datasets.py --all
    
    # Create specific dataset
    python scripts/create_partitioned_datasets.py --file nonprofits_organizations.parquet
    
    # Dry run
    python scripts/create_partitioned_datasets.py --all --dry-run
"""

import argparse
from pathlib import Path
import pandas as pd
from loguru import logger
from typing import List, Dict


class PartitionedDatasetCreator:
    """Create partitioned parquet datasets by state."""
    
    # Files that have direct 'state' column
    STATE_COLUMN_FILES = {
        'nonprofits_organizations.parquet': 'state',
        'nonprofits_locations.parquet': 'state',
    }
    
    # Files that need state added via join with organizations (by EIN)
    EIN_JOIN_FILES = {
        'nonprofits_financials.parquet': 'ein',
        'nonprofits_programs.parquet': 'ein',
    }
    
    # Files that have 'State' column (capitalized)
    STATE_UPPER_FILES = {
        'domains_gsa_domains.parquet': 'State',
    }
    
    # Files that have 'USPS' column (state abbreviation)
    USPS_FILES = {
        'jurisdictions_cities.parquet': 'USPS',
        'jurisdictions_counties.parquet': 'USPS',
        'jurisdictions_school_districts.parquet': 'USPS',
        'jurisdictions_townships.parquet': 'USPS',
    }
    
    def __init__(self, gold_dir: str = "data/gold", output_dir: str = None):
        """
        Initialize creator.
        
        Args:
            gold_dir: Directory containing gold parquet files and output location
            output_dir: Directory for partitioned datasets (defaults to same as gold_dir)
        """
        self.gold_dir = Path(gold_dir)
        self.output_dir = Path(output_dir) if output_dir else self.gold_dir
        
        # Combined mapping
        self.all_files = {
            **self.STATE_COLUMN_FILES,
            **self.STATE_UPPER_FILES,
            **self.USPS_FILES,
            **self.EIN_JOIN_FILES,
        }
        
        # Cache for organizations EIN→state mapping (loaded once if needed)
        self._ein_state_map = None
    
    def get_state_column(self, filename: str) -> str:
        """Get the state column name for a file."""
        if filename in self.STATE_COLUMN_FILES:
            return self.STATE_COLUMN_FILES[filename]
        elif filename in self.STATE_UPPER_FILES:
            return self.STATE_UPPER_FILES[filename]
        elif filename in self.USPS_FILES:
            return self.USPS_FILES[filename]
        elif filename in self.EIN_JOIN_FILES:
            return 'state'  # Will be added via join
        else:
            raise ValueError(f"Unknown file: {filename}")
    
    def _load_ein_state_mapping(self):
        """Load EIN→state mapping from organizations (cached)."""
        if self._ein_state_map is not None:
            return self._ein_state_map
        
        import pyarrow.dataset as ds
        
        logger.info("  Loading EIN→state mapping from organizations...")
        org_path = self.output_dir / 'nonprofits_organizations'
        
        # Try partitioned dataset first
        if org_path.exists():
            dataset = ds.dataset(org_path, format='parquet', partitioning='hive')
            table = dataset.to_table(columns=['ein', 'state'])
            self._ein_state_map = table.to_pandas()
        else:
            # Fall back to consolidated file
            org_file = self.gold_dir / 'nonprofits_organizations.parquet'
            if org_file.exists():
                self._ein_state_map = pd.read_parquet(org_file, columns=['ein', 'state'])
            else:
                raise FileNotFoundError(
                    "Cannot find nonprofits_organizations dataset or file. "
                    "Create it first before processing EIN-based files."
                )
        
        logger.info(f"  Loaded {len(self._ein_state_map):,} EIN→state mappings")
        return self._ein_state_map
    
    def create_partitioned_dataset(self, filename: str, dry_run: bool = False) -> bool:
        """
        Create a partitioned dataset from a consolidated file.
        
        Args:
            filename: Name of file to partition (e.g., 'nonprofits_organizations.parquet')
            dry_run: If True, only report what would be done
            
        Returns:
            True if successful, False otherwise
        """
        input_path = self.gold_dir / filename
        
        if not input_path.exists():
            logger.warning(f"File not found: {input_path}")
            return False
        
        logger.info(f"📂 Processing: {filename}")
        
        # Read the file
        df = pd.read_parquet(input_path)
        logger.info(f"  Total records: {len(df):,}")
        
        # Get state column
        state_col = self.get_state_column(filename)
        
        # Check if we need to add state via join
        if filename in self.EIN_JOIN_FILES:
            ein_col = self.EIN_JOIN_FILES[filename]
            
            if ein_col not in df.columns:
                logger.error(f"  ❌ Column '{ein_col}' not found in {filename}")
                return False
            
            # Load EIN→state mapping
            ein_state_map = self._load_ein_state_mapping()
            
            # Join to add state
            logger.info(f"  Joining with organizations to add state column...")
            df = df.merge(ein_state_map[['ein', 'state']], on=ein_col, how='left')
            logger.info(f"  Records with state: {df['state'].notna().sum():,} / {len(df):,}")
            
        elif state_col not in df.columns:
            logger.error(f"  ❌ Column '{state_col}' not found in {filename}")
            return False
        
        # Normalize column name to 'state' for consistent partitioning
        if state_col != 'state':
            df['state'] = df[state_col]
            # Keep original column too for backward compatibility
        
        # Create output directory name (remove .parquet extension)
        dataset_name = filename.replace('.parquet', '')
        output_path = self.output_dir / dataset_name
        
        if dry_run:
            unique_states = df['state'].nunique()
            total_size = input_path.stat().st_size / 1024 / 1024
            logger.info(f"  [DRY RUN] Would create partitioned dataset:")
            logger.info(f"    Path: {output_path}")
            logger.info(f"    Partitions: {unique_states} states")
            logger.info(f"    Total size: {total_size:.2f} MB")
            return True
        
        # Write partitioned dataset
        logger.info(f"  Writing partitioned dataset to: {output_path}")
        df.to_parquet(
            output_path,
            engine='pyarrow',
            partition_cols=['state'],
            index=False
        )
        
        # Get statistics
        partitions = list((output_path).glob('state=*'))
        total_size = sum(
            sum(f.stat().st_size for f in partition.rglob('*.parquet'))
            for partition in partitions
        ) / 1024 / 1024
        
        logger.success(f"✅ Created partitioned dataset:")
        logger.success(f"   Path: {output_path}")
        logger.success(f"   Partitions: {len(partitions)} states")
        logger.success(f"   Total size: {total_size:.2f} MB")
        logger.success(f"   Query example: pd.read_parquet('{output_path}', filters=[('state', '=', 'AL')])")
        
        return True
    
    def create_all(self, dry_run: bool = False) -> Dict[str, bool]:
        """
        Create partitioned datasets for all configured files.
        
        Args:
            dry_run: If True, only report what would be done
            
        Returns:
            Dict mapping filename to success status
        """
        logger.info("🚀 Creating partitioned datasets...")
        logger.info(f"  Input directory: {self.gold_dir}")
        logger.info(f"  Output directory: {self.output_dir}")
        logger.info("")
        
        # Create output directory
        if not dry_run:
            self.output_dir.mkdir(parents=True, exist_ok=True)
        
        results = {}
        for filename in self.all_files.keys():
            try:
                success = self.create_partitioned_dataset(filename, dry_run=dry_run)
                results[filename] = success
                logger.info("")
            except Exception as e:
                logger.error(f"❌ Error processing {filename}: {e}")
                results[filename] = False
                logger.info("")
        
        # Summary
        successful = sum(1 for v in results.values() if v)
        logger.success("=" * 70)
        logger.success(f"✅ Created {successful}/{len(results)} partitioned datasets")
        logger.success(f"📂 Output directory: {self.output_dir}")
        logger.success("=" * 70)
        
        return results


def main():
    parser = argparse.ArgumentParser(
        description="Create partitioned parquet datasets by state",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Create all partitioned datasets
  python scripts/create_partitioned_datasets.py --all
  
  # Create specific dataset
  python scripts/create_partitioned_datasets.py --file nonprofits_organizations.parquet
  
  # Dry run
  python scripts/create_partitioned_datasets.py --all --dry-run

Query Examples:
  # Read only Alabama data (efficient - only reads AL partition)
  import pandas as pd
  df = pd.read_parquet('data/gold/partitioned/nonprofits_organizations',
                       filters=[('state', '=', 'AL')])
  
  # Read multiple states
  df = pd.read_parquet('data/gold/partitioned/nonprofits_organizations',
                       filters=[('state', 'in', ['AL', 'GA', 'FL'])])
  
  # Read all states
  df = pd.read_parquet('data/gold/partitioned/nonprofits_organizations')
        """
    )
    
    parser.add_argument('--all', action='store_true',
                       help='Create all partitioned datasets')
    parser.add_argument('--file', type=str,
                       help='Create partitioned dataset for specific file')
    parser.add_argument('--dry-run', action='store_true',
                       help='Show what would be done without creating datasets')
    parser.add_argument('--gold-dir', type=str, default='data/gold',
                       help='Directory containing gold parquet files (default: data/gold)')
    parser.add_argument('--output-dir', type=str,
                       help='Output directory for partitioned datasets (default: same as gold-dir)')
    
    args = parser.parse_args()
    
    # Initialize creator
    creator = PartitionedDatasetCreator(
        gold_dir=args.gold_dir,
        output_dir=args.output_dir
    )
    
    # Handle commands
    if args.all:
        creator.create_all(dry_run=args.dry_run)
    elif args.file:
        creator.create_partitioned_dataset(args.file, dry_run=args.dry_run)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()