Hoe
Deploying Backend API
b339b93
"""Core extraction logic for distinct legislators from Voteview data."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import duckdb
from .exceptions import InvalidSourceURLError, OutputWriteError, SourceReadError
from .schema import (
AGGREGATION_QUERY,
ALLOWED_SOURCE_DOMAINS,
MIN_CONGRESS,
VOTEVIEW_MEMBERS_URL,
validate_source_url,
)
from .validators import (
ValidationResult,
validate_aggregation,
validate_completeness,
validate_sample,
)
@dataclass
class ExtractionResult:
    """Summary of one completed extraction run.

    Bundles where the data came from, where it was written, the row counts
    observed on either side, and the validation outcome.
    """

    # URL of the source parquet data that was read.
    source_url: str
    # Filesystem location of the parquet file that was written.
    output_path: Path
    # Number of source rows counted before aggregation.
    source_rows: int
    # Number of rows counted in the written output file.
    output_count: int
    # Outcome of the post-extraction validation.
    validation: ValidationResult
def _escape_sql_literal(text: str) -> str:
    """Double embedded single quotes so *text* is safe inside a '...' SQL literal."""
    return text.replace("'", "''")


def extract_distinct_legislators(
    output_path: Path | str,
    *,
    source_url: str = VOTEVIEW_MEMBERS_URL,
    min_congress: int = MIN_CONGRESS,
    validate: bool = True,
    aggregation_sample_size: int = 100,
    deep_sample_size: int = 50,
) -> ExtractionResult:
    """
    Extract distinct legislators from Voteview HSall_members data.

    Reads from HuggingFace, filters by congress, aggregates by bioguide_id,
    and writes to a local Parquet file with ZSTD compression.

    Both the source URL and the output path are embedded in SQL text as
    single-quoted string literals; embedded single quotes are escaped so
    that a path such as ``/data/o'brien/out.parquet`` cannot terminate the
    literal early.

    Args:
        output_path: Path for output .parquet file
        source_url: URL to source parquet file (default: HF Voteview members)
        min_congress: Minimum congress number to include (default: 96 = 1979)
        validate: Whether to run three-tier validation after extraction
        aggregation_sample_size: Sample size for Tier 2 validation
        deep_sample_size: Sample size for Tier 3 validation

    Returns:
        ExtractionResult with extraction details and validation results

    Raises:
        InvalidSourceURLError: If source URL is not from an allowed domain
        SourceReadError: If source data cannot be read
        CompletenessError: If Tier 1 validation fails
        AggregationError: If Tier 2 validation fails
        SampleValidationError: If Tier 3 validation fails
        OutputWriteError: If output cannot be written
    """
    output_path = Path(output_path)

    # Validate source URL to prevent SQL injection
    if not validate_source_url(source_url):
        raise InvalidSourceURLError(
            message="Source URL must be from an allowed domain",
            source_url=source_url,
            allowed_domains=ALLOWED_SOURCE_DOMAINS,
        )

    # Escaped forms are used ONLY when building SQL text below; the raw
    # values are what gets reported, returned and handed to the validators.
    source_literal = _escape_sql_literal(source_url)
    output_literal = _escape_sql_literal(str(output_path))

    conn = duckdb.connect()
    try:
        # Step 1: Count source rows
        print(f"Reading from: {source_url}")
        try:
            source_rows = conn.execute(f"""
                SELECT COUNT(*)
                FROM read_parquet('{source_literal}')
                WHERE congress >= {min_congress}
                AND bioguide_id IS NOT NULL
            """).fetchone()[0]
        except Exception as e:
            raise SourceReadError(
                message=str(e),
                source_url=source_url,
            ) from e
        print(f" Source rows (congress >= {min_congress}): {source_rows:,}")

        # Step 2: Execute aggregation query
        print("Aggregating by bioguide_id...")
        # NOTE(review): assumes AGGREGATION_QUERY places {source_url} inside a
        # single-quoted literal, as the count query above does - confirm
        # against the schema module. URLs without quotes are unaffected.
        query = AGGREGATION_QUERY.format(
            source_url=source_literal,
            min_congress=min_congress,
        )

        # Step 3: Write to parquet
        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            conn.execute(f"""
                COPY ({query})
                TO '{output_literal}' (FORMAT PARQUET, COMPRESSION ZSTD, COMPRESSION_LEVEL 3)
            """)
        except Exception as e:
            raise OutputWriteError(
                message=str(e),
                output_path=output_path,
            ) from e

        # Step 4: Count output
        output_count = conn.execute(f"""
            SELECT COUNT(*) FROM read_parquet('{output_literal}')
        """).fetchone()[0]
        print(f" Distinct legislators: {output_count:,}")

        # Step 5: Validate
        # A default (empty) result is returned when validation is skipped.
        validation = ValidationResult()
        if validate:
            print("Validating...")

            # Tier 1: Completeness
            validation = validate_completeness(source_url, output_path, conn, min_congress)
            print(f" Tier 1 (Completeness): PASS ({validation.output_count:,} legislators)")

            # Tier 2: Aggregation Integrity
            validation = validate_aggregation(
                source_url,
                output_path,
                conn,
                validation,
                min_congress,
                sample_size=aggregation_sample_size,
            )
            print(f" Tier 2 (Aggregation): PASS ({validation.aggregation_checks_passed} checks)")

            # Tier 3: Sample Verification
            validation = validate_sample(
                source_url,
                output_path,
                conn,
                validation,
                min_congress,
                sample_size=deep_sample_size,
            )
            print(f" Tier 3 (Sample): PASS ({validation.sample_size} legislators verified)")

        return ExtractionResult(
            source_url=source_url,
            output_path=output_path,
            source_rows=source_rows,
            output_count=output_count,
            validation=validation,
        )
    finally:
        # Always release the in-memory DuckDB connection, even on failure.
        conn.close()