Spaces:
Runtime error
Runtime error
File size: 5,482 Bytes
"""Core extraction logic for distinct legislators from Voteview data."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import duckdb
from .exceptions import InvalidSourceURLError, OutputWriteError, SourceReadError
from .schema import (
AGGREGATION_QUERY,
ALLOWED_SOURCE_DOMAINS,
MIN_CONGRESS,
VOTEVIEW_MEMBERS_URL,
validate_source_url,
)
from .validators import (
ValidationResult,
validate_aggregation,
validate_completeness,
validate_sample,
)
@dataclass
class ExtractionResult:
    """Summary of one completed extraction run.

    Instances are built by ``extract_distinct_legislators`` once the output
    file has been written (and, when requested, validated).
    """

    # URL the source parquet data was read from.
    source_url: str
    # Location of the parquet file that was written.
    output_path: Path
    # Number of source rows that matched the congress / bioguide_id filter.
    source_rows: int
    # Number of rows in the written output file.
    output_count: int
    # Validation outcome; a default ValidationResult when validation is skipped.
    validation: ValidationResult
def _sql_string_literal(value: object) -> str:
    """Render ``str(value)`` as a single-quoted SQL string literal.

    Embedded single quotes are doubled (the standard SQL escape), so a value
    such as ``o'brien`` cannot terminate the literal early.
    """
    return "'" + str(value).replace("'", "''") + "'"


def extract_distinct_legislators(
    output_path: Path | str,
    *,
    source_url: str = VOTEVIEW_MEMBERS_URL,
    min_congress: int = MIN_CONGRESS,
    validate: bool = True,
    aggregation_sample_size: int = 100,
    deep_sample_size: int = 50,
) -> ExtractionResult:
    """
    Extract distinct legislators from Voteview HSall_members data.

    Reads the source parquet file (by default the Voteview members file on
    HuggingFace), filters by congress, aggregates by bioguide_id using
    ``AGGREGATION_QUERY``, and writes the result to a local Parquet file with
    ZSTD compression.

    Args:
        output_path: Path for output .parquet file. Missing parent
            directories are created.
        source_url: URL to source parquet file (default:
            ``VOTEVIEW_MEMBERS_URL``).
        min_congress: Minimum congress number to include (default:
            ``MIN_CONGRESS``, documented as 96 = 1979).
        validate: Whether to run three-tier validation after extraction.
        aggregation_sample_size: Sample size for Tier 2 validation.
        deep_sample_size: Sample size for Tier 3 validation.

    Returns:
        ExtractionResult with extraction details and validation results.
        When ``validate`` is False the ``validation`` field is a
        default-constructed ``ValidationResult``.

    Raises:
        InvalidSourceURLError: If source URL is not from an allowed domain
        SourceReadError: If source data cannot be read
        CompletenessError: If Tier 1 validation fails
        AggregationError: If Tier 2 validation fails
        SampleValidationError: If Tier 3 validation fails
        OutputWriteError: If output cannot be written
    """
    output_path = Path(output_path)

    # Validate source URL to prevent SQL injection.
    # NOTE(review): source_url and min_congress are still interpolated
    # verbatim below and inside AGGREGATION_QUERY; their safety relies on
    # validate_source_url and on callers passing a real int.
    if not validate_source_url(source_url):
        raise InvalidSourceURLError(
            message="Source URL must be from an allowed domain",
            source_url=source_url,
            allowed_domains=ALLOWED_SOURCE_DOMAINS,
        )

    # The output path is caller-supplied and has no allow-list, so it must be
    # escaped before being embedded in SQL (e.g. "/home/o'brien/out.parquet").
    output_literal = _sql_string_literal(output_path)

    conn = duckdb.connect()
    try:
        # Step 1: Count source rows
        print(f"Reading from: {source_url}")
        try:
            source_rows = conn.execute(f"""
                SELECT COUNT(*)
                FROM read_parquet('{source_url}')
                WHERE congress >= {min_congress}
                AND bioguide_id IS NOT NULL
            """).fetchone()[0]
        except Exception as e:
            raise SourceReadError(
                message=str(e),
                source_url=source_url,
            ) from e
        print(f" Source rows (congress >= {min_congress}): {source_rows:,}")

        # Step 2: Build aggregation query
        print("Aggregating by bioguide_id...")
        query = AGGREGATION_QUERY.format(
            source_url=source_url,
            min_congress=min_congress,
        )

        # Step 3: Write to parquet
        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            conn.execute(f"""
                COPY ({query})
                TO {output_literal} (FORMAT PARQUET, COMPRESSION ZSTD, COMPRESSION_LEVEL 3)
            """)
        except Exception as e:
            raise OutputWriteError(
                message=str(e),
                output_path=output_path,
            ) from e

        # Step 4: Count output by reading back the file that was just written
        output_count = conn.execute(f"""
            SELECT COUNT(*) FROM read_parquet({output_literal})
        """).fetchone()[0]
        print(f" Distinct legislators: {output_count:,}")

        # Step 5: Validate. Each tier receives the previous tier's result and
        # returns the updated one; a failing tier is expected to raise.
        validation = ValidationResult()
        if validate:
            print("Validating...")

            # Tier 1: Completeness
            validation = validate_completeness(source_url, output_path, conn, min_congress)
            print(f" Tier 1 (Completeness): PASS ({validation.output_count:,} legislators)")

            # Tier 2: Aggregation Integrity
            validation = validate_aggregation(
                source_url,
                output_path,
                conn,
                validation,
                min_congress,
                sample_size=aggregation_sample_size,
            )
            print(f" Tier 2 (Aggregation): PASS ({validation.aggregation_checks_passed} checks)")

            # Tier 3: Sample Verification
            validation = validate_sample(
                source_url,
                output_path,
                conn,
                validation,
                min_congress,
                sample_size=deep_sample_size,
            )
            print(f" Tier 3 (Sample): PASS ({validation.sample_size} legislators verified)")

        return ExtractionResult(
            source_url=source_url,
            output_path=output_path,
            source_rows=source_rows,
            output_count=output_count,
            validation=validation,
        )
    finally:
        # Always release the in-memory DuckDB connection, even on failure.
        conn.close()
|