Spaces:
Runtime error
Runtime error
| """Command-line interface for Congress Legislators CSV to Parquet converter.""" | |
| import argparse | |
| import sys | |
| from datetime import datetime | |
| from pathlib import Path | |
| from .congress_utils import DEFAULT_MIN_CONGRESS | |
| from .converter import convert_legislators_file | |
| from .downloader import download_all, download_file | |
| from .exceptions import CongressLegislatorsConversionError, DownloadError, SourceNotFoundError | |
| from .extractor import extract_unified_legislators | |
| from .schema import FileType | |
def detect_file_type(filename: str) -> FileType:
    """
    Auto-detect file type from filename.

    Matching is a case-insensitive substring search on the name.
    Recognizes patterns like:
    - legislators-current.csv -> CURRENT
    - legislators-historical.csv -> HISTORICAL

    Args:
        filename: Name of the file to classify.

    Returns:
        FileType.CURRENT or FileType.HISTORICAL.

    Raises:
        ValueError: If the name contains neither keyword, or contains both
            (ambiguous), so the caller can ask for an explicit file type.
    """
    name_lower = filename.lower()
    looks_current = "current" in name_lower
    looks_historical = "historical" in name_lower

    if looks_current and looks_historical:
        # Refuse to guess: picking one silently could convert the file with
        # the wrong schema. Note "concurrent" also contains "current".
        raise ValueError(
            f"Cannot auto-detect file type from: {filename} "
            "(matches both 'current' and 'historical')"
        )
    if looks_current:
        return FileType.CURRENT
    if looks_historical:
        return FileType.HISTORICAL
    raise ValueError(f"Cannot auto-detect file type from: {filename}")
def cmd_download(args: argparse.Namespace) -> int:
    """Run the ``download`` subcommand.

    Fetches a single file when ``args.file_type`` is set, otherwise every
    available file, saving into ``args.output_dir``.

    Returns:
        0 when the download finished, 1 if a DownloadError was raised.
    """
    target_dir = Path(args.output_dir)
    print(f"[{datetime.now().isoformat()}] Downloading legislators data...")

    try:
        if not args.file_type:
            # No explicit type requested: fetch everything and report each file.
            for kind, saved in download_all(target_dir).items():
                print(f" Downloaded {kind.value}: {saved}")
        else:
            saved = download_file(FileType(args.file_type), target_dir)
            print(f" Downloaded: {saved}")
    except DownloadError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1

    print(f"[{datetime.now().isoformat()}] Download complete")
    return 0
def cmd_convert(args: argparse.Namespace) -> int:
    """Handle the convert subcommand.

    Converts the CSV at ``args.source`` into a Parquet file at
    ``args.output``. The file type comes from ``args.file_type`` when given,
    otherwise it is auto-detected from the source file name.

    Args:
        args: Parsed CLI arguments; reads ``source``, ``output``,
            ``file_type``, ``no_validate``, ``sample_size`` and ``batch_size``.

    Returns:
        0 on success; 1 if the source is missing or not a regular file, the
        file type cannot be determined, or the conversion raises
        CongressLegislatorsConversionError.
    """
    source_path = Path(args.source)
    output_path = Path(args.output)

    # Validate source exists
    if not source_path.exists():
        print(f"ERROR: Source file not found: {source_path}", file=sys.stderr)
        return 1
    # A directory (or other non-file) passes exists(); reject it here with a
    # clear message instead of letting the converter fail on it.
    if not source_path.is_file():
        print(f"ERROR: Source is not a file: {source_path}", file=sys.stderr)
        return 1

    # Determine file type
    if args.file_type:
        file_type = FileType(args.file_type)
    else:
        try:
            file_type = detect_file_type(source_path.name)
        except ValueError as e:
            print(f"ERROR: {e}", file=sys.stderr)
            print("Use -t/--file-type to specify explicitly", file=sys.stderr)
            return 1

    print(f"[{datetime.now().isoformat()}] Converting: {source_path.name}")
    print(f" File type: {file_type.value}")

    try:
        result = convert_legislators_file(
            source_path,
            output_path,
            file_type,
            validate=not args.no_validate,
            sample_size=args.sample_size,
            batch_size=args.batch_size,
        )
    except CongressLegislatorsConversionError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1

    print(f"[{datetime.now().isoformat()}] SUCCESS: {result.row_count:,} rows")
    print(f" Output: {result.output_path}")
    if args.no_validate:
        print(" Validation: SKIPPED")
    else:
        print(" Validation: ALL PASSED")
    return 0
def cmd_all(args: argparse.Namespace) -> int:
    """Run the ``all`` subcommand: download every CSV, then convert each one.

    Each downloaded CSV is written next to it in ``args.output_dir`` as
    ``legislators-<type>.parquet``.

    Returns:
        0 when every file was converted, 1 on the first DownloadError or
        CongressLegislatorsConversionError.
    """
    workdir = Path(args.output_dir)
    print(f"[{datetime.now().isoformat()}] Processing all legislators data...")

    try:
        # Phase 1: fetch every source file.
        print("\n=== Downloading ===")
        downloaded = download_all(workdir)

        # Phase 2: convert each downloaded CSV in turn.
        print("\n=== Converting ===")
        for kind, csv_file in downloaded.items():
            target = workdir / f"legislators-{kind.value}.parquet"
            print(f"\n[{datetime.now().isoformat()}] Converting: {csv_file.name}")
            print(f" File type: {kind.value}")
            outcome = convert_legislators_file(
                csv_file,
                target,
                kind,
                validate=not args.no_validate,
                sample_size=args.sample_size,
                batch_size=args.batch_size,
            )
            print(f" SUCCESS: {outcome.row_count:,} rows -> {target.name}")
    except (DownloadError, CongressLegislatorsConversionError) as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1

    print(f"\n[{datetime.now().isoformat()}] All conversions complete")
    return 0
def cmd_unified(args: argparse.Namespace) -> int:
    """Run the ``unified`` subcommand.

    Merges ``legislators-current.parquet`` and
    ``legislators-historical.parquet`` from ``args.output_dir`` into
    ``legislators.parquet`` in the same directory, then prints a summary.

    Returns:
        0 on success, 1 if a source file is missing or the merge fails.
    """
    base_dir = Path(args.output_dir)

    # --all-congresses disables filtering; otherwise --min-congress applies.
    congress_floor = None if args.all_congresses else args.min_congress

    print(f"[{datetime.now().isoformat()}] Creating unified legislators file...")
    try:
        summary = extract_unified_legislators(
            current_path=base_dir / "legislators-current.parquet",
            historical_path=base_dir / "legislators-historical.parquet",
            output_path=base_dir / "legislators.parquet",
            validate=not args.no_validate,
            sample_size=args.sample_size,
            min_congress=congress_floor,
        )
    except (SourceNotFoundError, CongressLegislatorsConversionError) as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        if isinstance(exc, SourceNotFoundError):
            # Missing inputs get an extra hint on how to produce them.
            print("Run 'all' command first to generate source parquet files", file=sys.stderr)
        return 1

    print(f"\n[{datetime.now().isoformat()}] SUCCESS")
    report = [
        f" Output: {summary.output_path}",
        f" Total legislators: {summary.output_count:,}",
    ]
    if summary.min_congress is not None:
        report.append(f" Congress filter: {summary.min_congress}+")
        if summary.filtered_out_count > 0:
            report.append(f" Filtered out: {summary.filtered_out_count:,}")
    report.extend(
        [
            f" Current: {summary.current_count:,}",
            f" Historical only: {summary.output_count - summary.current_count:,}",
            f" With FEC IDs: {summary.fec_ids_populated_count:,}",
            f" With ICPSR: {summary.icpsr_populated_count:,}",
        ]
    )
    for line in report:
        print(line)
    return 0
def _add_output_dir_option(parser: argparse.ArgumentParser, help_text: str) -> None:
    """Register the shared ``--output-dir``/``-o`` option with subcommand-specific help."""
    parser.add_argument(
        "--output-dir",
        "-o",
        type=str,
        default=".",
        help=help_text,
    )


def _add_file_type_option(parser: argparse.ArgumentParser, help_text: str) -> None:
    """Register the shared ``-t``/``--file-type`` option with subcommand-specific help."""
    parser.add_argument(
        "-t",
        "--file-type",
        choices=["current", "historical"],
        default=None,
        help=help_text,
    )


def _add_validation_options(parser: argparse.ArgumentParser, sample_size_default=None) -> None:
    """Register ``--no-validate`` and ``--sample-size`` (with an optional default)."""
    parser.add_argument(
        "--no-validate",
        action="store_true",
        help="Skip validation (not recommended)",
    )
    sample_help = "Number of rows to sample for validation"
    if sample_size_default is not None:
        sample_help += f" (default: {sample_size_default})"
    parser.add_argument(
        "--sample-size",
        type=int,
        default=sample_size_default,
        help=sample_help,
    )


def _add_batch_size_option(parser: argparse.ArgumentParser) -> None:
    """Register the shared ``--batch-size`` option."""
    parser.add_argument(
        "--batch-size",
        type=int,
        default=100_000,
        help="Rows per batch for streaming conversion (default: 100000)",
    )


def main(argv: "list[str] | None" = None) -> int:
    """Main entry point for the CLI.

    Builds the parser with the ``download``, ``convert``, ``all`` and
    ``unified`` subcommands and dispatches to the matching handler.

    Args:
        argv: Argument list to parse. When None (the default), argparse
            reads the process command line, so existing ``main()`` callers
            behave as before.

    Returns:
        The exit code returned by the selected subcommand handler.
    """
    parser = argparse.ArgumentParser(
        description="Download and convert Congress Legislators data to Parquet",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Download subcommand
    download_parser = subparsers.add_parser(
        "download",
        help="Download CSV files from unitedstates.github.io",
    )
    _add_output_dir_option(
        download_parser,
        "Directory to save downloaded files (default: current directory)",
    )
    _add_file_type_option(
        download_parser,
        "Specific file type to download (downloads all if not specified)",
    )
    download_parser.set_defaults(func=cmd_download)

    # Convert subcommand
    convert_parser = subparsers.add_parser(
        "convert",
        help="Convert a CSV file to Parquet format",
    )
    convert_parser.add_argument(
        "source",
        type=str,
        help="Source CSV file path",
    )
    convert_parser.add_argument(
        "output",
        type=str,
        help="Output Parquet file path",
    )
    _add_file_type_option(
        convert_parser,
        "Type of file (auto-detected from filename if not specified)",
    )
    _add_validation_options(convert_parser)
    _add_batch_size_option(convert_parser)
    convert_parser.set_defaults(func=cmd_convert)

    # All subcommand (download + convert)
    all_parser = subparsers.add_parser(
        "all",
        help="Download and convert all files",
    )
    _add_output_dir_option(
        all_parser,
        "Directory for downloads and conversions (default: current directory)",
    )
    _add_validation_options(all_parser)
    _add_batch_size_option(all_parser)
    all_parser.set_defaults(func=cmd_all)

    # Unified subcommand (merge current + historical)
    unified_parser = subparsers.add_parser(
        "unified",
        help="Merge current + historical into single legislators.parquet",
    )
    _add_output_dir_option(
        unified_parser,
        "Directory containing source parquet files (default: current directory)",
    )
    unified_parser.add_argument(
        "--min-congress",
        type=int,
        default=DEFAULT_MIN_CONGRESS,
        help=f"Minimum congress to include (default: {DEFAULT_MIN_CONGRESS}, i.e. 1979+)",
    )
    unified_parser.add_argument(
        "--all-congresses",
        action="store_true",
        help="Include all legislators (no congress filtering)",
    )
    _add_validation_options(unified_parser, sample_size_default=100)
    unified_parser.set_defaults(func=cmd_unified)

    # parse_args(None) falls back to sys.argv[1:].
    args = parser.parse_args(argv)
    return args.func(args)
# Script entry point: main()'s return value becomes the process exit status.
# The module uses package-relative imports, so run it with `python -m`.
if __name__ == "__main__":
    raise SystemExit(main())