# Hoe
# Deploying Backend API
# b339b93
"""Command-line interface for Congress Legislators CSV to Parquet converter."""
import argparse
import sys
from datetime import datetime
from pathlib import Path
from .congress_utils import DEFAULT_MIN_CONGRESS
from .converter import convert_legislators_file
from .downloader import download_all, download_file
from .exceptions import CongressLegislatorsConversionError, DownloadError, SourceNotFoundError
from .extractor import extract_unified_legislators
from .schema import FileType
def detect_file_type(filename: str) -> FileType:
    """
    Auto-detect file type from filename.

    Matching is a case-insensitive substring test on ``filename``.

    Recognizes patterns like:
    - legislators-current.csv -> CURRENT
    - legislators-historical.csv -> HISTORICAL

    Args:
        filename: File name to inspect.

    Returns:
        ``FileType.CURRENT`` or ``FileType.HISTORICAL``.

    Raises:
        ValueError: If the name contains neither keyword, or contains both
            of them (ambiguous), so the caller can ask for an explicit type.
    """
    name_lower = filename.lower()
    is_current = "current" in name_lower
    is_historical = "historical" in name_lower

    # A name matching both keywords used to resolve silently to CURRENT.
    # Refuse to guess rather than risk picking the wrong file type.
    if is_current and is_historical:
        raise ValueError(
            f"Cannot auto-detect file type from: {filename} "
            "(matches both 'current' and 'historical')"
        )
    if is_current:
        return FileType.CURRENT
    if is_historical:
        return FileType.HISTORICAL
    raise ValueError(f"Cannot auto-detect file type from: {filename}")
def cmd_download(args: argparse.Namespace) -> int:
    """Run the ``download`` subcommand.

    Reads ``args.output_dir`` and ``args.file_type``. When a file type is
    given, only that file is downloaded; otherwise every file is fetched.

    Returns:
        0 when the download finished, 1 if a ``DownloadError`` was raised
        (the error is reported on stderr).
    """
    target_dir = Path(args.output_dir)
    print(f"[{datetime.now().isoformat()}] Downloading legislators data...")

    try:
        if not args.file_type:
            # No explicit type requested: fetch everything and list each result.
            downloaded = download_all(target_dir)
            for kind, saved_path in downloaded.items():
                print(f" Downloaded {kind.value}: {saved_path}")
        else:
            requested_kind = FileType(args.file_type)
            saved_path = download_file(requested_kind, target_dir)
            print(f" Downloaded: {saved_path}")
    except DownloadError as err:
        print(f"ERROR: {err}", file=sys.stderr)
        return 1

    print(f"[{datetime.now().isoformat()}] Download complete")
    return 0
def cmd_convert(args: argparse.Namespace) -> int:
    """Handle the convert subcommand.

    Reads ``args.source``, ``args.output``, ``args.file_type``,
    ``args.no_validate``, ``args.sample_size`` and ``args.batch_size``.
    When no file type is given it is auto-detected from the source file name.

    Returns:
        0 on success. 1 when the source is missing or is a directory, the
        file type cannot be detected, or the conversion raises
        ``CongressLegislatorsConversionError``; the reason is printed to
        stderr.
    """
    source_path = Path(args.source)
    output_path = Path(args.output)

    # Validate source exists
    if not source_path.exists():
        print(f"ERROR: Source file not found: {source_path}", file=sys.stderr)
        return 1
    # exists() is also true for directories; reject them up front with a
    # clear message instead of passing a directory on to the converter.
    if source_path.is_dir():
        print(
            f"ERROR: Source path is a directory, not a file: {source_path}",
            file=sys.stderr,
        )
        return 1

    # Determine file type
    if args.file_type:
        file_type = FileType(args.file_type)
    else:
        try:
            file_type = detect_file_type(source_path.name)
        except ValueError as e:
            print(f"ERROR: {e}", file=sys.stderr)
            print("Use -t/--file-type to specify explicitly", file=sys.stderr)
            return 1

    print(f"[{datetime.now().isoformat()}] Converting: {source_path.name}")
    print(f" File type: {file_type.value}")

    try:
        result = convert_legislators_file(
            source_path,
            output_path,
            file_type,
            validate=not args.no_validate,
            sample_size=args.sample_size,
            batch_size=args.batch_size,
        )
        print(f"[{datetime.now().isoformat()}] SUCCESS: {result.row_count:,} rows")
        print(f" Output: {result.output_path}")
        if args.no_validate:
            print(" Validation: SKIPPED")
        else:
            print(" Validation: ALL PASSED")
        return 0
    except CongressLegislatorsConversionError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 1
def cmd_all(args: argparse.Namespace) -> int:
    """Run the ``all`` subcommand: download every CSV, then convert each one.

    Reads ``args.output_dir``, ``args.no_validate``, ``args.sample_size`` and
    ``args.batch_size``. Each Parquet file is written next to its CSV as
    ``legislators-<file type>.parquet``.

    Returns:
        0 when every file was converted, 1 if a ``DownloadError`` or
        ``CongressLegislatorsConversionError`` was raised (reported on stderr).
    """
    work_dir = Path(args.output_dir)
    print(f"[{datetime.now().isoformat()}] Processing all legislators data...")

    try:
        # Step 1: Download all files
        print("\n=== Downloading ===")
        downloaded = download_all(work_dir)

        # Step 2: Convert all files
        print("\n=== Converting ===")
        for kind, csv_file in downloaded.items():
            target = work_dir / f"legislators-{kind.value}.parquet"
            print(f"\n[{datetime.now().isoformat()}] Converting: {csv_file.name}")
            print(f" File type: {kind.value}")
            outcome = convert_legislators_file(
                csv_file,
                target,
                kind,
                validate=not args.no_validate,
                sample_size=args.sample_size,
                batch_size=args.batch_size,
            )
            print(f" SUCCESS: {outcome.row_count:,} rows -> {target.name}")
    except (DownloadError, CongressLegislatorsConversionError) as err:
        print(f"ERROR: {err}", file=sys.stderr)
        return 1

    print(f"\n[{datetime.now().isoformat()}] All conversions complete")
    return 0
def cmd_unified(args: argparse.Namespace) -> int:
    """Run the ``unified`` subcommand: merge current + historical into one file.

    Reads ``legislators-current.parquet`` and ``legislators-historical.parquet``
    from ``args.output_dir`` and writes ``legislators.parquet`` to the same
    directory. Also reads ``args.all_congresses``, ``args.min_congress``,
    ``args.no_validate`` and ``args.sample_size``.

    Returns:
        0 on success, 1 if ``SourceNotFoundError`` or another
        ``CongressLegislatorsConversionError`` was raised (reported on stderr).
    """
    base_dir = Path(args.output_dir)

    # A floor of None means "include every congress"; otherwise use the value.
    if args.all_congresses:
        congress_floor = None
    else:
        congress_floor = args.min_congress

    print(f"[{datetime.now().isoformat()}] Creating unified legislators file...")

    try:
        summary = extract_unified_legislators(
            current_path=base_dir / "legislators-current.parquet",
            historical_path=base_dir / "legislators-historical.parquet",
            output_path=base_dir / "legislators.parquet",
            validate=not args.no_validate,
            sample_size=args.sample_size,
            min_congress=congress_floor,
        )

        print(f"\n[{datetime.now().isoformat()}] SUCCESS")
        print(f" Output: {summary.output_path}")
        print(f" Total legislators: {summary.output_count:,}")

        # Filter details are only shown when a congress filter was applied.
        if summary.min_congress is not None:
            print(f" Congress filter: {summary.min_congress}+")
            if summary.filtered_out_count > 0:
                print(f" Filtered out: {summary.filtered_out_count:,}")

        print(f" Current: {summary.current_count:,}")
        historical_only = summary.output_count - summary.current_count
        print(f" Historical only: {historical_only:,}")
        print(f" With FEC IDs: {summary.fec_ids_populated_count:,}")
        print(f" With ICPSR: {summary.icpsr_populated_count:,}")
        return 0
    except SourceNotFoundError as err:
        print(f"ERROR: {err}", file=sys.stderr)
        print("Run 'all' command first to generate source parquet files", file=sys.stderr)
        return 1
    except CongressLegislatorsConversionError as err:
        print(f"ERROR: {err}", file=sys.stderr)
        return 1
def _add_conversion_options(parser: argparse.ArgumentParser) -> None:
    """Register the validation/batching options shared by ``convert`` and ``all``."""
    parser.add_argument(
        "--no-validate",
        action="store_true",
        help="Skip validation (not recommended)",
    )
    parser.add_argument(
        "--sample-size",
        type=int,
        default=None,
        help="Number of rows to sample for validation",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=100_000,
        help="Rows per batch for streaming conversion (default: 100000)",
    )


def _build_parser() -> argparse.ArgumentParser:
    """Build the top-level parser with the download/convert/all/unified subcommands."""
    parser = argparse.ArgumentParser(
        description="Download and convert Congress Legislators data to Parquet",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Download subcommand
    download_parser = subparsers.add_parser(
        "download",
        help="Download CSV files from unitedstates.github.io",
    )
    download_parser.add_argument(
        "--output-dir",
        "-o",
        type=str,
        default=".",
        help="Directory to save downloaded files (default: current directory)",
    )
    download_parser.add_argument(
        "-t",
        "--file-type",
        choices=["current", "historical"],
        default=None,
        help="Specific file type to download (downloads all if not specified)",
    )
    download_parser.set_defaults(func=cmd_download)

    # Convert subcommand
    convert_parser = subparsers.add_parser(
        "convert",
        help="Convert a CSV file to Parquet format",
    )
    convert_parser.add_argument(
        "source",
        type=str,
        help="Source CSV file path",
    )
    convert_parser.add_argument(
        "output",
        type=str,
        help="Output Parquet file path",
    )
    convert_parser.add_argument(
        "-t",
        "--file-type",
        choices=["current", "historical"],
        default=None,
        help="Type of file (auto-detected from filename if not specified)",
    )
    _add_conversion_options(convert_parser)
    convert_parser.set_defaults(func=cmd_convert)

    # All subcommand (download + convert)
    all_parser = subparsers.add_parser(
        "all",
        help="Download and convert all files",
    )
    all_parser.add_argument(
        "--output-dir",
        "-o",
        type=str,
        default=".",
        help="Directory for downloads and conversions (default: current directory)",
    )
    _add_conversion_options(all_parser)
    all_parser.set_defaults(func=cmd_all)

    # Unified subcommand (merge current + historical)
    unified_parser = subparsers.add_parser(
        "unified",
        help="Merge current + historical into single legislators.parquet",
    )
    unified_parser.add_argument(
        "--output-dir",
        "-o",
        type=str,
        default=".",
        help="Directory containing source parquet files (default: current directory)",
    )
    unified_parser.add_argument(
        "--min-congress",
        type=int,
        default=DEFAULT_MIN_CONGRESS,
        help=f"Minimum congress to include (default: {DEFAULT_MIN_CONGRESS}, i.e. 1979+)",
    )
    unified_parser.add_argument(
        "--all-congresses",
        action="store_true",
        help="Include all legislators (no congress filtering)",
    )
    unified_parser.add_argument(
        "--no-validate",
        action="store_true",
        help="Skip validation (not recommended)",
    )
    # Unlike convert/all, unified defaults to sampling 100 rows.
    unified_parser.add_argument(
        "--sample-size",
        type=int,
        default=100,
        help="Number of rows to sample for validation (default: 100)",
    )
    unified_parser.set_defaults(func=cmd_unified)

    return parser


def main(argv: "list[str] | None" = None) -> int:
    """Main entry point for the CLI.

    Args:
        argv: Argument list to parse, without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``, which is the
            previous behavior; passing a list lets callers invoke the CLI
            programmatically.

    Returns:
        The integer exit code returned by the selected subcommand handler.
    """
    args = _build_parser().parse_args(argv)
    # Each subparser stored its handler in ``func`` via set_defaults().
    return args.func(args)
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)