Spaces:
Runtime error
Runtime error
| """Core extraction logic for distinct legislators from Voteview data.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import duckdb | |
| from .exceptions import InvalidSourceURLError, OutputWriteError, SourceReadError | |
| from .schema import ( | |
| AGGREGATION_QUERY, | |
| ALLOWED_SOURCE_DOMAINS, | |
| MIN_CONGRESS, | |
| VOTEVIEW_MEMBERS_URL, | |
| validate_source_url, | |
| ) | |
| from .validators import ( | |
| ValidationResult, | |
| validate_aggregation, | |
| validate_completeness, | |
| validate_sample, | |
| ) | |
| class ExtractionResult: | |
| """Result of a successful extraction.""" | |
| source_url: str | |
| output_path: Path | |
| source_rows: int | |
| output_count: int | |
| validation: ValidationResult | |
| def extract_distinct_legislators( | |
| output_path: Path | str, | |
| *, | |
| source_url: str = VOTEVIEW_MEMBERS_URL, | |
| min_congress: int = MIN_CONGRESS, | |
| validate: bool = True, | |
| aggregation_sample_size: int = 100, | |
| deep_sample_size: int = 50, | |
| ) -> ExtractionResult: | |
| """ | |
| Extract distinct legislators from Voteview HSall_members data. | |
| Reads from HuggingFace, filters by congress, aggregates by bioguide_id, | |
| and writes to a local Parquet file with ZSTD compression. | |
| Args: | |
| output_path: Path for output .parquet file | |
| source_url: URL to source parquet file (default: HF Voteview members) | |
| min_congress: Minimum congress number to include (default: 96 = 1979) | |
| validate: Whether to run three-tier validation after extraction | |
| aggregation_sample_size: Sample size for Tier 2 validation | |
| deep_sample_size: Sample size for Tier 3 validation | |
| Returns: | |
| ExtractionResult with extraction details and validation results | |
| Raises: | |
| InvalidSourceURLError: If source URL is not from an allowed domain | |
| SourceReadError: If source data cannot be read | |
| CompletenessError: If Tier 1 validation fails | |
| AggregationError: If Tier 2 validation fails | |
| SampleValidationError: If Tier 3 validation fails | |
| OutputWriteError: If output cannot be written | |
| """ | |
| output_path = Path(output_path) | |
| # Validate source URL to prevent SQL injection | |
| if not validate_source_url(source_url): | |
| raise InvalidSourceURLError( | |
| message="Source URL must be from an allowed domain", | |
| source_url=source_url, | |
| allowed_domains=ALLOWED_SOURCE_DOMAINS, | |
| ) | |
| conn = duckdb.connect() | |
| try: | |
| # Step 1: Count source rows | |
| print(f"Reading from: {source_url}") | |
| try: | |
| source_rows = conn.execute(f""" | |
| SELECT COUNT(*) | |
| FROM read_parquet('{source_url}') | |
| WHERE congress >= {min_congress} | |
| AND bioguide_id IS NOT NULL | |
| """).fetchone()[0] | |
| except Exception as e: | |
| raise SourceReadError( | |
| message=str(e), | |
| source_url=source_url, | |
| ) from e | |
| print(f" Source rows (congress >= {min_congress}): {source_rows:,}") | |
| # Step 2: Execute aggregation query | |
| print("Aggregating by bioguide_id...") | |
| query = AGGREGATION_QUERY.format( | |
| source_url=source_url, | |
| min_congress=min_congress, | |
| ) | |
| # Step 3: Write to parquet | |
| try: | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| conn.execute(f""" | |
| COPY ({query}) | |
| TO '{output_path}' (FORMAT PARQUET, COMPRESSION ZSTD, COMPRESSION_LEVEL 3) | |
| """) | |
| except Exception as e: | |
| raise OutputWriteError( | |
| message=str(e), | |
| output_path=output_path, | |
| ) from e | |
| # Step 4: Count output | |
| output_count = conn.execute(f""" | |
| SELECT COUNT(*) FROM read_parquet('{output_path}') | |
| """).fetchone()[0] | |
| print(f" Distinct legislators: {output_count:,}") | |
| # Step 5: Validate | |
| validation = ValidationResult() | |
| if validate: | |
| print("Validating...") | |
| # Tier 1: Completeness | |
| validation = validate_completeness(source_url, output_path, conn, min_congress) | |
| print(f" Tier 1 (Completeness): PASS ({validation.output_count:,} legislators)") | |
| # Tier 2: Aggregation Integrity | |
| validation = validate_aggregation( | |
| source_url, | |
| output_path, | |
| conn, | |
| validation, | |
| min_congress, | |
| sample_size=aggregation_sample_size, | |
| ) | |
| print(f" Tier 2 (Aggregation): PASS ({validation.aggregation_checks_passed} checks)") | |
| # Tier 3: Sample Verification | |
| validation = validate_sample( | |
| source_url, | |
| output_path, | |
| conn, | |
| validation, | |
| min_congress, | |
| sample_size=deep_sample_size, | |
| ) | |
| print(f" Tier 3 (Sample): PASS ({validation.sample_size} legislators verified)") | |
| return ExtractionResult( | |
| source_url=source_url, | |
| output_path=output_path, | |
| source_rows=source_rows, | |
| output_count=output_count, | |
| validation=validation, | |
| ) | |
| finally: | |
| conn.close() | |