Spaces:
Runtime error
Runtime error
| """Core extraction logic for legislator-recipient crosswalk from DIME data.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import duckdb | |
| from .exceptions import InvalidSourceURLError, OutputWriteError, SourceReadError | |
| from .schema import ( | |
| ALLOWED_SOURCE_DOMAINS, | |
| DIME_RECIPIENTS_URL, | |
| EXTRACTION_QUERY, | |
| validate_source_url, | |
| ) | |
| from .validators import ( | |
| ValidationResult, | |
| validate_counts, | |
| validate_sample, | |
| validate_uniqueness, | |
| ) | |
| class ExtractionResult: | |
| """Result of a successful extraction.""" | |
| source_url: str | |
| output_path: Path | |
| source_rows: int | |
| output_count: int | |
| unique_icpsr_count: int | |
| unique_bonica_rid_count: int | |
| validation: ValidationResult | |
| def extract_crosswalk( | |
| output_path: Path | str, | |
| *, | |
| source_url: str = DIME_RECIPIENTS_URL, | |
| validate: bool = True, | |
| sample_size: int = 100, | |
| ) -> ExtractionResult: | |
| """ | |
| Extract legislator-recipient crosswalk from DIME Recipients data. | |
| Reads from HuggingFace, filters for records with ICPSR identifiers, | |
| and writes distinct mappings to a local Parquet file with ZSTD compression. | |
| Args: | |
| output_path: Path for output .parquet file | |
| source_url: URL to source parquet file (default: HF DIME Recipients) | |
| validate: Whether to run validation after extraction | |
| sample_size: Sample size for validation | |
| Returns: | |
| ExtractionResult with extraction details and validation results | |
| Raises: | |
| InvalidSourceURLError: If source URL is not from an allowed domain | |
| SourceReadError: If source data cannot be read | |
| ValidationError: If validation fails | |
| DuplicateKeyError: If duplicate key pairs are found | |
| OutputWriteError: If output cannot be written | |
| """ | |
| output_path = Path(output_path) | |
| # Validate source URL to prevent SQL injection | |
| if not validate_source_url(source_url): | |
| raise InvalidSourceURLError( | |
| message="Source URL must be from an allowed domain", | |
| source_url=source_url, | |
| allowed_domains=ALLOWED_SOURCE_DOMAINS, | |
| ) | |
| with duckdb.connect() as conn: | |
| # Step 1: Count source rows | |
| print(f"Reading from: {source_url}") | |
| try: | |
| source_rows = conn.execute(f""" | |
| SELECT COUNT(*) | |
| FROM read_parquet('{source_url}') | |
| WHERE "ICPSR" IS NOT NULL | |
| AND "ICPSR" != '' | |
| AND "bonica.rid" IS NOT NULL | |
| AND "bonica.rid" != '' | |
| """).fetchone()[0] | |
| except Exception as e: | |
| raise SourceReadError( | |
| message=str(e), | |
| source_url=source_url, | |
| ) from e | |
| print(f" Source rows with ICPSR: {source_rows:,}") | |
| # Step 2: Execute extraction query | |
| print("Extracting distinct crosswalk mappings...") | |
| query = EXTRACTION_QUERY.format(source_url=source_url) | |
| # Step 3: Write to parquet | |
| try: | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| conn.execute(f""" | |
| COPY ({query}) | |
| TO '{output_path}' (FORMAT PARQUET, COMPRESSION ZSTD, COMPRESSION_LEVEL 3) | |
| """) | |
| except Exception as e: | |
| raise OutputWriteError( | |
| message=str(e), | |
| output_path=output_path, | |
| ) from e | |
| # Step 4: Count output and get unique counts | |
| output_count = conn.execute(f""" | |
| SELECT COUNT(*) FROM read_parquet('{output_path}') | |
| """).fetchone()[0] | |
| print(f" Crosswalk rows: {output_count:,}") | |
| unique_icpsr = conn.execute(f""" | |
| SELECT COUNT(DISTINCT icpsr) FROM read_parquet('{output_path}') | |
| """).fetchone()[0] | |
| print(f" Unique ICPSR values: {unique_icpsr:,}") | |
| unique_bonica_rid = conn.execute(f""" | |
| SELECT COUNT(DISTINCT bonica_rid) FROM read_parquet('{output_path}') | |
| """).fetchone()[0] | |
| print(f" Unique bonica_rid values: {unique_bonica_rid:,}") | |
| # Step 5: Validate | |
| validation = ValidationResult() | |
| if validate: | |
| print("Validating...") | |
| # Tier 1: Counts | |
| validation = validate_counts(source_url, output_path, conn) | |
| print(f" Tier 1 (Counts): PASS ({validation.output_count:,} rows)") | |
| # Tier 2: Uniqueness | |
| validation = validate_uniqueness(output_path, conn, validation) | |
| print( | |
| f" Tier 2 (Uniqueness): PASS ({validation.unique_icpsr_count:,} ICPSR → " | |
| f"{validation.unique_bonica_rid_count:,} bonica_rid)" | |
| ) | |
| # Tier 3: Sample | |
| validation = validate_sample(source_url, output_path, conn, validation, sample_size) | |
| print(f" Tier 3 (Sample): PASS ({validation.sample_size} rows verified)") | |
| return ExtractionResult( | |
| source_url=source_url, | |
| output_path=output_path, | |
| source_rows=source_rows, | |
| output_count=output_count, | |
| unique_icpsr_count=unique_icpsr, | |
| unique_bonica_rid_count=unique_bonica_rid, | |
| validation=validation, | |
| ) | |