Hoe
Deploying Backend API
b339b93
"""Three-tier validation suite for distinct legislators extraction.
Unlike CSV→Parquet converters which validate lossless conversion,
this validates correct aggregation/transformation:
- Tier 1: Completeness - every source bioguide_id appears exactly once
- Tier 2: Aggregation Integrity - MIN/MAX/LIST operations are correct
- Tier 3: Sample Verification - deep validation of random legislators
"""
from __future__ import annotations
import random
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import duckdb
from .exceptions import (
AggregationError,
CompletenessError,
SampleValidationError,
)
@dataclass
class ValidationResult:
"""Results from validation suite."""
completeness_valid: bool = False
source_distinct_count: int = 0
output_count: int = 0
aggregation_valid: bool = False
aggregation_checks_passed: int = 0
sample_valid: bool = False
sample_size: int = 0
@property
def all_valid(self) -> bool:
"""Check if all validation tiers passed."""
return self.completeness_valid and self.aggregation_valid and self.sample_valid
def validate_completeness(
source_url: str,
output_path: Path,
conn: duckdb.DuckDBPyConnection,
min_congress: int,
) -> ValidationResult:
"""
Tier 1: Verify all source bioguide_ids appear exactly once in output.
Checks:
- Output count matches distinct source count
- No missing bioguide_ids
- No extra bioguide_ids
- No duplicates in output
"""
result = ValidationResult()
# Count distinct legislators in source
source_count = conn.execute(f"""
SELECT COUNT(DISTINCT bioguide_id)
FROM read_parquet('{source_url}')
WHERE congress >= {min_congress}
AND bioguide_id IS NOT NULL
""").fetchone()[0]
result.source_distinct_count = source_count
# Count rows in output
output_count = conn.execute(f"""
SELECT COUNT(*) FROM read_parquet('{output_path}')
""").fetchone()[0]
result.output_count = output_count
# Check for duplicates in output
duplicate_count = conn.execute(f"""
SELECT COUNT(*) FROM (
SELECT bioguide_id, COUNT(*) as cnt
FROM read_parquet('{output_path}')
GROUP BY bioguide_id
HAVING cnt > 1
)
""").fetchone()[0]
if duplicate_count > 0:
raise CompletenessError(
message=f"Found {duplicate_count} duplicate bioguide_ids in output",
expected_count=source_count,
actual_count=output_count,
)
# Check counts match
if output_count != source_count:
# Find missing or extra IDs
missing = conn.execute(f"""
SELECT bioguide_id FROM (
SELECT DISTINCT bioguide_id
FROM read_parquet('{source_url}')
WHERE congress >= {min_congress}
AND bioguide_id IS NOT NULL
) source
WHERE bioguide_id NOT IN (
SELECT bioguide_id FROM read_parquet('{output_path}')
)
LIMIT 10
""").fetchall()
missing_ids = [r[0] for r in missing]
extra = conn.execute(f"""
SELECT bioguide_id FROM read_parquet('{output_path}')
WHERE bioguide_id NOT IN (
SELECT DISTINCT bioguide_id
FROM read_parquet('{source_url}')
WHERE congress >= {min_congress}
AND bioguide_id IS NOT NULL
)
LIMIT 10
""").fetchall()
extra_ids = [r[0] for r in extra]
raise CompletenessError(
message="Count mismatch between source and output",
expected_count=source_count,
actual_count=output_count,
missing_ids=missing_ids if missing_ids else None,
extra_ids=extra_ids if extra_ids else None,
)
result.completeness_valid = True
return result
def validate_aggregation(
source_url: str,
output_path: Path,
conn: duckdb.DuckDBPyConnection,
result: ValidationResult,
min_congress: int,
sample_size: int = 100,
) -> ValidationResult:
"""
Tier 2: Verify aggregation operations (MIN/MAX/LIST) are correct.
Randomly samples legislators and verifies:
- first_congress = MIN(congress) from source
- last_congress = MAX(congress) from source
- congresses_served array length matches source count
"""
# Get random sample of bioguide_ids
all_ids = conn.execute(f"""
SELECT bioguide_id FROM read_parquet('{output_path}')
""").fetchall()
all_ids = [r[0] for r in all_ids]
actual_sample_size = min(sample_size, len(all_ids))
sample_ids = random.sample(all_ids, actual_sample_size)
checks_passed = 0
for bioguide_id in sample_ids:
# Get source data for this legislator
source_data = conn.execute(f"""
SELECT
MIN(congress) as expected_first,
MAX(congress) as expected_last,
COUNT(*) as expected_count
FROM read_parquet('{source_url}')
WHERE bioguide_id = '{bioguide_id}'
AND congress >= {min_congress}
""").fetchone()
expected_first, expected_last, expected_count = source_data
# Get output data
output_data = conn.execute(f"""
SELECT
first_congress,
last_congress,
LENGTH(congresses_served) as actual_count
FROM read_parquet('{output_path}')
WHERE bioguide_id = '{bioguide_id}'
""").fetchone()
actual_first, actual_last, actual_count = output_data
# Validate first_congress
if actual_first != expected_first:
raise AggregationError(
message="first_congress mismatch",
bioguide_id=bioguide_id,
field_name="first_congress",
expected_value=str(expected_first),
actual_value=str(actual_first),
)
# Validate last_congress
if actual_last != expected_last:
raise AggregationError(
message="last_congress mismatch",
bioguide_id=bioguide_id,
field_name="last_congress",
expected_value=str(expected_last),
actual_value=str(actual_last),
)
# Validate congress count
if actual_count != expected_count:
raise AggregationError(
message="congresses_served count mismatch",
bioguide_id=bioguide_id,
field_name="congresses_served (length)",
expected_value=str(expected_count),
actual_value=str(actual_count),
)
checks_passed += 1
result.aggregation_checks_passed = checks_passed
result.aggregation_valid = True
return result
def validate_sample(
source_url: str,
output_path: Path,
conn: duckdb.DuckDBPyConnection,
result: ValidationResult,
min_congress: int,
sample_size: int = 50,
) -> ValidationResult:
"""
Tier 3: Deep validation of random legislators.
For each sampled legislator, verifies:
- congresses_served array contains exactly the right congress numbers
- bioname matches the most recent congress entry
- state_abbrev matches the most recent congress entry
"""
# Get random sample
all_ids = conn.execute(f"""
SELECT bioguide_id FROM read_parquet('{output_path}')
""").fetchall()
all_ids = [r[0] for r in all_ids]
actual_sample_size = min(sample_size, len(all_ids))
sample_ids = random.sample(all_ids, actual_sample_size)
result.sample_size = actual_sample_size
for i, bioguide_id in enumerate(sample_ids):
# Get expected congresses from source
expected_congresses = conn.execute(f"""
SELECT LIST(congress ORDER BY congress)
FROM read_parquet('{source_url}')
WHERE bioguide_id = '{bioguide_id}'
AND congress >= {min_congress}
""").fetchone()[0]
# Get actual congresses from output
actual_congresses = conn.execute(f"""
SELECT congresses_served
FROM read_parquet('{output_path}')
WHERE bioguide_id = '{bioguide_id}'
""").fetchone()[0]
# Compare congress arrays
if list(expected_congresses) != list(actual_congresses):
raise SampleValidationError(
message="congresses_served array mismatch",
bioguide_id=bioguide_id,
field_name="congresses_served",
expected_value=str(expected_congresses),
actual_value=str(actual_congresses),
sample_index=i,
)
# Verify most recent values (bioname, state_abbrev)
expected_latest = conn.execute(f"""
SELECT bioname, state_abbrev
FROM read_parquet('{source_url}')
WHERE bioguide_id = '{bioguide_id}'
AND congress >= {min_congress}
ORDER BY congress DESC
LIMIT 1
""").fetchone()
actual_latest = conn.execute(f"""
SELECT bioname, state_abbrev
FROM read_parquet('{output_path}')
WHERE bioguide_id = '{bioguide_id}'
""").fetchone()
if expected_latest[0] != actual_latest[0]:
raise SampleValidationError(
message="bioname mismatch (should be from most recent congress)",
bioguide_id=bioguide_id,
field_name="bioname",
expected_value=str(expected_latest[0]),
actual_value=str(actual_latest[0]),
sample_index=i,
)
if expected_latest[1] != actual_latest[1]:
raise SampleValidationError(
message="state_abbrev mismatch (should be from most recent congress)",
bioguide_id=bioguide_id,
field_name="state_abbrev",
expected_value=str(expected_latest[1]),
actual_value=str(actual_latest[1]),
sample_index=i,
)
result.sample_valid = True
return result