Spaces:
Runtime error
Runtime error
File size: 10,672 Bytes
"""Three-tier validation suite for distinct legislators extraction.
Unlike CSV→Parquet converters which validate lossless conversion,
this validates correct aggregation/transformation:
- Tier 1: Completeness - every source bioguide_id appears exactly once
- Tier 2: Aggregation Integrity - MIN/MAX/LIST operations are correct
- Tier 3: Sample Verification - deep validation of random legislators
"""
from __future__ import annotations
import random
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import duckdb
from .exceptions import (
AggregationError,
CompletenessError,
SampleValidationError,
)
@dataclass
class ValidationResult:
    """Outcome of the three validation tiers, accumulated as each tier runs."""

    # Tier 1 (completeness)
    completeness_valid: bool = False
    source_distinct_count: int = 0
    output_count: int = 0

    # Tier 2 (aggregation integrity)
    aggregation_valid: bool = False
    aggregation_checks_passed: int = 0

    # Tier 3 (sample verification)
    sample_valid: bool = False
    sample_size: int = 0

    @property
    def all_valid(self) -> bool:
        """Return True only when every tier's ``*_valid`` flag is set."""
        tier_flags = (
            self.completeness_valid,
            self.aggregation_valid,
            self.sample_valid,
        )
        return all(tier_flags)
def validate_completeness(
    source_url: str,
    output_path: Path,
    conn: duckdb.DuckDBPyConnection,
    min_congress: int,
) -> ValidationResult:
    """Tier 1: verify every source bioguide_id appears exactly once in output.

    Checks, in order:
      1. No bioguide_id occurs more than once in the output.
      2. The output row count equals the number of distinct, non-NULL source
         bioguide_ids with ``congress >= min_congress``.
      3. The two ID sets are identical (nothing missing, nothing extra).

    Check 3 is always performed: equal counts alone do not prove that the
    same IDs are present. The set differences use ``EXCEPT`` rather than
    ``NOT IN`` because ``NOT IN`` yields no rows at all when the subquery
    contains a NULL, which would hide missing IDs.

    Args:
        source_url: Location handed to ``read_parquet`` for the source data.
        output_path: Location handed to ``read_parquet`` for the output data.
        conn: DuckDB connection used to run every query.
        min_congress: Lowest congress number taken into account in the source.

    Returns:
        A fresh ValidationResult with ``source_distinct_count`` and
        ``output_count`` filled in and ``completeness_valid`` set to True.

    Raises:
        CompletenessError: If the output contains duplicate bioguide_ids, if
            the counts differ, or if any ID is missing from / extra in the
            output. Up to 10 missing and 10 extra IDs are attached.
    """
    result = ValidationResult()

    # NOTE: the locations and min_congress are interpolated into the SQL text
    # (as before); they are assumed to be trusted configuration values.
    source_ids_sql = f"""
        SELECT DISTINCT bioguide_id
        FROM read_parquet('{source_url}')
        WHERE congress >= {min_congress}
          AND bioguide_id IS NOT NULL
    """
    output_ids_sql = f"""
        SELECT bioguide_id
        FROM read_parquet('{output_path}')
    """

    # Count distinct legislators in source
    source_count = conn.execute(
        f"SELECT COUNT(*) FROM ({source_ids_sql}) AS source_ids"
    ).fetchone()[0]
    result.source_distinct_count = source_count

    # Count rows in output
    output_count = conn.execute(
        f"SELECT COUNT(*) FROM ({output_ids_sql}) AS output_ids"
    ).fetchone()[0]
    result.output_count = output_count

    # Check for duplicates in output
    duplicate_count = conn.execute(f"""
        SELECT COUNT(*) FROM (
            SELECT bioguide_id, COUNT(*) AS cnt
            FROM read_parquet('{output_path}')
            GROUP BY bioguide_id
            HAVING cnt > 1
        )
    """).fetchone()[0]
    if duplicate_count > 0:
        raise CompletenessError(
            message=f"Found {duplicate_count} duplicate bioguide_ids in output",
            expected_count=source_count,
            actual_count=output_count,
        )

    # IDs present in the source but absent from the output (NULL-safe).
    missing_rows = conn.execute(f"""
        SELECT bioguide_id FROM (
            {source_ids_sql}
            EXCEPT
            {output_ids_sql}
        ) AS missing
        ORDER BY bioguide_id
        LIMIT 10
    """).fetchall()
    missing_ids = [row[0] for row in missing_rows]

    # IDs present in the output but absent from the source (NULL-safe; a NULL
    # bioguide_id in the output is reported here as an extra ID).
    extra_rows = conn.execute(f"""
        SELECT bioguide_id FROM (
            {output_ids_sql}
            EXCEPT
            {source_ids_sql}
        ) AS extra
        ORDER BY bioguide_id
        LIMIT 10
    """).fetchall()
    extra_ids = [row[0] for row in extra_rows]

    counts_differ = output_count != source_count
    if counts_differ or missing_ids or extra_ids:
        if counts_differ:
            message = "Count mismatch between source and output"
        else:
            message = (
                "bioguide_id sets differ between source and output "
                "despite matching counts"
            )
        raise CompletenessError(
            message=message,
            expected_count=source_count,
            actual_count=output_count,
            missing_ids=missing_ids or None,
            extra_ids=extra_ids or None,
        )

    result.completeness_valid = True
    return result
def validate_aggregation(
    source_url: str,
    output_path: Path,
    conn: duckdb.DuckDBPyConnection,
    result: ValidationResult,
    min_congress: int,
    sample_size: int = 100,
) -> ValidationResult:
    """Tier 2: verify aggregation operations (MIN/MAX/LIST) are correct.

    Draws a random sample of bioguide_ids from the output and, for each one,
    recomputes the aggregates from the source (rows with
    ``congress >= min_congress``) and compares them with the output row:
      - first_congress == MIN(congress)
      - last_congress == MAX(congress)
      - LENGTH(congresses_served) == COUNT(*) of source rows

    Each bioguide_id is passed to DuckDB as a bound parameter instead of being
    spliced into the SQL text, so an ID containing a quote character cannot
    break or alter the query.

    Args:
        source_url: Location handed to ``read_parquet`` for the source data.
        output_path: Location handed to ``read_parquet`` for the output data.
        conn: DuckDB connection used to run every query.
        result: ValidationResult to update in place.
        min_congress: Lowest congress number taken into account in the source.
        sample_size: Maximum number of legislators to check; capped at the
            number of output rows.

    Returns:
        The same ``result`` object with ``aggregation_checks_passed`` set to
        the number of legislators verified and ``aggregation_valid`` True.

    Raises:
        AggregationError: On the first field whose output value differs from
            the value recomputed from the source.
    """
    # Get random sample of bioguide_ids
    id_rows = conn.execute(f"""
        SELECT bioguide_id FROM read_parquet('{output_path}')
    """).fetchall()
    all_ids = [row[0] for row in id_rows]
    actual_sample_size = min(sample_size, len(all_ids))
    sample_ids = random.sample(all_ids, actual_sample_size)

    # Query text is loop-invariant; only the bound bioguide_id changes.
    source_sql = f"""
        SELECT
            MIN(congress) AS expected_first,
            MAX(congress) AS expected_last,
            COUNT(*) AS expected_count
        FROM read_parquet('{source_url}')
        WHERE bioguide_id = ?
          AND congress >= {min_congress}
    """
    output_sql = f"""
        SELECT
            first_congress,
            last_congress,
            LENGTH(congresses_served) AS actual_count
        FROM read_parquet('{output_path}')
        WHERE bioguide_id = ?
    """

    checks_passed = 0
    for bioguide_id in sample_ids:
        expected_first, expected_last, expected_count = conn.execute(
            source_sql, [bioguide_id]
        ).fetchone()
        actual_first, actual_last, actual_count = conn.execute(
            output_sql, [bioguide_id]
        ).fetchone()

        # (field_name, message, expected, actual) -- checked in this order so
        # the first failing field is the one reported.
        comparisons = (
            (
                "first_congress",
                "first_congress mismatch",
                expected_first,
                actual_first,
            ),
            (
                "last_congress",
                "last_congress mismatch",
                expected_last,
                actual_last,
            ),
            (
                "congresses_served (length)",
                "congresses_served count mismatch",
                expected_count,
                actual_count,
            ),
        )
        for field_name, message, expected, actual in comparisons:
            if actual != expected:
                raise AggregationError(
                    message=message,
                    bioguide_id=bioguide_id,
                    field_name=field_name,
                    expected_value=str(expected),
                    actual_value=str(actual),
                )

        checks_passed += 1

    result.aggregation_checks_passed = checks_passed
    result.aggregation_valid = True
    return result
def validate_sample(
    source_url: str,
    output_path: Path,
    conn: duckdb.DuckDBPyConnection,
    result: ValidationResult,
    min_congress: int,
    sample_size: int = 50,
) -> ValidationResult:
    """Tier 3: deep validation of randomly sampled legislators.

    For each sampled bioguide_id, verifies against the source (rows with
    ``congress >= min_congress``):
      - congresses_served equals the source congress numbers in ascending
        order
      - bioname matches the source row with the highest congress
      - state_abbrev matches the source row with the highest congress

    Each bioguide_id is passed to DuckDB as a bound parameter instead of being
    spliced into the SQL text, and the output row is read once per legislator
    rather than once per group of fields.

    Args:
        source_url: Location handed to ``read_parquet`` for the source data.
        output_path: Location handed to ``read_parquet`` for the output data.
        conn: DuckDB connection used to run every query.
        result: ValidationResult to update in place.
        min_congress: Lowest congress number taken into account in the source.
        sample_size: Maximum number of legislators to check; capped at the
            number of output rows.

    Returns:
        The same ``result`` object with ``sample_size`` set to the number of
        legislators sampled and ``sample_valid`` True.

    Raises:
        SampleValidationError: On the first mismatch found; carries the
            zero-based index of the offending legislator within the sample.
    """
    # Get random sample
    id_rows = conn.execute(f"""
        SELECT bioguide_id FROM read_parquet('{output_path}')
    """).fetchall()
    all_ids = [row[0] for row in id_rows]
    actual_sample_size = min(sample_size, len(all_ids))
    sample_ids = random.sample(all_ids, actual_sample_size)
    result.sample_size = actual_sample_size

    # Query text is loop-invariant; only the bound bioguide_id changes.
    expected_congresses_sql = f"""
        SELECT LIST(congress ORDER BY congress)
        FROM read_parquet('{source_url}')
        WHERE bioguide_id = ?
          AND congress >= {min_congress}
    """
    expected_latest_sql = f"""
        SELECT bioname, state_abbrev
        FROM read_parquet('{source_url}')
        WHERE bioguide_id = ?
          AND congress >= {min_congress}
        ORDER BY congress DESC
        LIMIT 1
    """
    output_sql = f"""
        SELECT congresses_served, bioname, state_abbrev
        FROM read_parquet('{output_path}')
        WHERE bioguide_id = ?
    """

    for i, bioguide_id in enumerate(sample_ids):
        expected_congresses = conn.execute(
            expected_congresses_sql, [bioguide_id]
        ).fetchone()[0]
        actual_congresses, actual_bioname, actual_state = conn.execute(
            output_sql, [bioguide_id]
        ).fetchone()

        # Compare congress arrays
        if list(expected_congresses) != list(actual_congresses):
            raise SampleValidationError(
                message="congresses_served array mismatch",
                bioguide_id=bioguide_id,
                field_name="congresses_served",
                expected_value=str(expected_congresses),
                actual_value=str(actual_congresses),
                sample_index=i,
            )

        # Verify most recent values (bioname, state_abbrev)
        expected_bioname, expected_state = conn.execute(
            expected_latest_sql, [bioguide_id]
        ).fetchone()

        latest_checks = (
            (
                "bioname",
                "bioname mismatch (should be from most recent congress)",
                expected_bioname,
                actual_bioname,
            ),
            (
                "state_abbrev",
                "state_abbrev mismatch (should be from most recent congress)",
                expected_state,
                actual_state,
            ),
        )
        for field_name, message, expected, actual in latest_checks:
            if expected != actual:
                raise SampleValidationError(
                    message=message,
                    bioguide_id=bioguide_id,
                    field_name=field_name,
                    expected_value=str(expected),
                    actual_value=str(actual),
                    sample_index=i,
                )

    result.sample_valid = True
    return result
|