Hoe
Deploying Backend API
b339b93
"""Core extraction logic for legislator-recipient crosswalk from DIME data."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import duckdb
from .exceptions import InvalidSourceURLError, OutputWriteError, SourceReadError
from .schema import (
ALLOWED_SOURCE_DOMAINS,
DIME_RECIPIENTS_URL,
EXTRACTION_QUERY,
validate_source_url,
)
from .validators import (
ValidationResult,
validate_counts,
validate_sample,
validate_uniqueness,
)
@dataclass
class ExtractionResult:
"""Result of a successful extraction."""
source_url: str
output_path: Path
source_rows: int
output_count: int
unique_icpsr_count: int
unique_bonica_rid_count: int
validation: ValidationResult
def extract_crosswalk(
output_path: Path | str,
*,
source_url: str = DIME_RECIPIENTS_URL,
validate: bool = True,
sample_size: int = 100,
) -> ExtractionResult:
"""
Extract legislator-recipient crosswalk from DIME Recipients data.
Reads from HuggingFace, filters for records with ICPSR identifiers,
and writes distinct mappings to a local Parquet file with ZSTD compression.
Args:
output_path: Path for output .parquet file
source_url: URL to source parquet file (default: HF DIME Recipients)
validate: Whether to run validation after extraction
sample_size: Sample size for validation
Returns:
ExtractionResult with extraction details and validation results
Raises:
InvalidSourceURLError: If source URL is not from an allowed domain
SourceReadError: If source data cannot be read
ValidationError: If validation fails
DuplicateKeyError: If duplicate key pairs are found
OutputWriteError: If output cannot be written
"""
output_path = Path(output_path)
# Validate source URL to prevent SQL injection
if not validate_source_url(source_url):
raise InvalidSourceURLError(
message="Source URL must be from an allowed domain",
source_url=source_url,
allowed_domains=ALLOWED_SOURCE_DOMAINS,
)
with duckdb.connect() as conn:
# Step 1: Count source rows
print(f"Reading from: {source_url}")
try:
source_rows = conn.execute(f"""
SELECT COUNT(*)
FROM read_parquet('{source_url}')
WHERE "ICPSR" IS NOT NULL
AND "ICPSR" != ''
AND "bonica.rid" IS NOT NULL
AND "bonica.rid" != ''
""").fetchone()[0]
except Exception as e:
raise SourceReadError(
message=str(e),
source_url=source_url,
) from e
print(f" Source rows with ICPSR: {source_rows:,}")
# Step 2: Execute extraction query
print("Extracting distinct crosswalk mappings...")
query = EXTRACTION_QUERY.format(source_url=source_url)
# Step 3: Write to parquet
try:
output_path.parent.mkdir(parents=True, exist_ok=True)
conn.execute(f"""
COPY ({query})
TO '{output_path}' (FORMAT PARQUET, COMPRESSION ZSTD, COMPRESSION_LEVEL 3)
""")
except Exception as e:
raise OutputWriteError(
message=str(e),
output_path=output_path,
) from e
# Step 4: Count output and get unique counts
output_count = conn.execute(f"""
SELECT COUNT(*) FROM read_parquet('{output_path}')
""").fetchone()[0]
print(f" Crosswalk rows: {output_count:,}")
unique_icpsr = conn.execute(f"""
SELECT COUNT(DISTINCT icpsr) FROM read_parquet('{output_path}')
""").fetchone()[0]
print(f" Unique ICPSR values: {unique_icpsr:,}")
unique_bonica_rid = conn.execute(f"""
SELECT COUNT(DISTINCT bonica_rid) FROM read_parquet('{output_path}')
""").fetchone()[0]
print(f" Unique bonica_rid values: {unique_bonica_rid:,}")
# Step 5: Validate
validation = ValidationResult()
if validate:
print("Validating...")
# Tier 1: Counts
validation = validate_counts(source_url, output_path, conn)
print(f" Tier 1 (Counts): PASS ({validation.output_count:,} rows)")
# Tier 2: Uniqueness
validation = validate_uniqueness(output_path, conn, validation)
print(
f" Tier 2 (Uniqueness): PASS ({validation.unique_icpsr_count:,} ICPSR → "
f"{validation.unique_bonica_rid_count:,} bonica_rid)"
)
# Tier 3: Sample
validation = validate_sample(source_url, output_path, conn, validation, sample_size)
print(f" Tier 3 (Sample): PASS ({validation.sample_size} rows verified)")
return ExtractionResult(
source_url=source_url,
output_path=output_path,
source_rows=source_rows,
output_count=output_count,
unique_icpsr_count=unique_icpsr,
unique_bonica_rid_count=unique_bonica_rid,
validation=validation,
)