"""Validation suite for legislator crosswalk extraction. Validates: - Tier 1: Basic counts and non-null requirements - Tier 2: Uniqueness of (icpsr, bonica_rid) pairs - Tier 3: Sample verification against source """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: import duckdb from .exceptions import DuplicateKeyError, ValidationError from .schema import YEAR_SUFFIX_LENGTH @dataclass class ValidationResult: """Results from validation suite.""" counts_valid: bool = False source_count: int = 0 output_count: int = 0 uniqueness_valid: bool = False unique_icpsr_count: int = 0 unique_bonica_rid_count: int = 0 sample_valid: bool = False sample_size: int = 0 @property def all_valid(self) -> bool: """Check if all validation tiers passed.""" return self.counts_valid and self.uniqueness_valid and self.sample_valid def validate_counts( source_url: str, output_path: Path, conn: duckdb.DuckDBPyConnection, ) -> ValidationResult: """ Tier 1: Verify basic counts and non-null requirements. Checks: - Output has rows - All icpsr values are non-null - All bonica_rid values are non-null """ result = ValidationResult() # Count source rows matching our filter # Note: DIME stores ICPSR as "{icpsr}{year}", we extract just the ICPSR portion source_count = conn.execute(f""" SELECT COUNT(DISTINCT ( SUBSTRING(CAST("ICPSR" AS VARCHAR), 1, LENGTH(CAST("ICPSR" AS VARCHAR))-{YEAR_SUFFIX_LENGTH}), "bonica.rid" )) FROM read_parquet('{source_url}') WHERE "ICPSR" IS NOT NULL AND "ICPSR" != '' AND LENGTH(CAST("ICPSR" AS VARCHAR)) > {YEAR_SUFFIX_LENGTH} AND "bonica.rid" IS NOT NULL AND "bonica.rid" != '' """).fetchone()[0] result.source_count = source_count # Count output rows output_count = conn.execute(f""" SELECT COUNT(*) FROM read_parquet('{output_path}') """).fetchone()[0] result.output_count = output_count if output_count == 0: raise ValidationError( message="Output file has no rows", expected_count=source_count, actual_count=0, ) # Check for null icpsr values null_icpsr = conn.execute(f""" SELECT COUNT(*) FROM read_parquet('{output_path}') WHERE icpsr IS NULL OR icpsr = '' """).fetchone()[0] if null_icpsr > 0: raise ValidationError( message=f"Found {null_icpsr} rows with null/empty icpsr", expected_count=0, actual_count=null_icpsr, ) # Check for null bonica_rid values null_bonica_rid = conn.execute(f""" SELECT COUNT(*) FROM read_parquet('{output_path}') WHERE bonica_rid IS NULL OR bonica_rid = '' """).fetchone()[0] if null_bonica_rid > 0: raise ValidationError( message=f"Found {null_bonica_rid} rows with null/empty bonica_rid", expected_count=0, actual_count=null_bonica_rid, ) # Verify counts match if output_count != source_count: raise ValidationError( message="Row count mismatch between source and output", expected_count=source_count, actual_count=output_count, ) result.counts_valid = True return result def validate_uniqueness( output_path: Path, conn: duckdb.DuckDBPyConnection, result: ValidationResult, ) -> ValidationResult: """ Tier 2: Verify uniqueness of key pairs. Checks: - No duplicate (icpsr, bonica_rid) pairs exist """ # Check for duplicate key pairs duplicates = conn.execute(f""" SELECT icpsr, bonica_rid, COUNT(*) as cnt FROM read_parquet('{output_path}') GROUP BY icpsr, bonica_rid HAVING cnt > 1 LIMIT 10 """).fetchall() if duplicates: sample_dups = [(r[0], r[1]) for r in duplicates] raise DuplicateKeyError( message="Found duplicate (icpsr, bonica_rid) pairs", duplicate_count=len(duplicates), sample_duplicates=sample_dups, ) # Get unique counts for reporting result.unique_icpsr_count = conn.execute(f""" SELECT COUNT(DISTINCT icpsr) FROM read_parquet('{output_path}') """).fetchone()[0] result.unique_bonica_rid_count = conn.execute(f""" SELECT COUNT(DISTINCT bonica_rid) FROM read_parquet('{output_path}') """).fetchone()[0] result.uniqueness_valid = True return result def validate_sample( source_url: str, output_path: Path, conn: duckdb.DuckDBPyConnection, result: ValidationResult, sample_size: int = 100, ) -> ValidationResult: """ Tier 3: Sample verification against source. For randomly sampled rows, verifies: - The (icpsr, bonica_rid) mapping exists in source data Note: We don't compare metadata columns exactly because our extraction uses MAX() aggregation when multiple rows have the same (icpsr, bonica_rid). """ # Get random sample of output rows sample = conn.execute(f""" SELECT icpsr, bonica_rid FROM read_parquet('{output_path}') USING SAMPLE {sample_size} """).fetchall() actual_sample_size = len(sample) result.sample_size = actual_sample_size verified = 0 for icpsr, bonica_rid in sample: # Verify this mapping exists in source # Note: DIME stores ICPSR as "{icpsr}{year}", so we compare the extracted portion # Use parameterized query to prevent SQL injection from malicious parquet data source_exists = conn.execute( f""" SELECT 1 FROM read_parquet('{source_url}') WHERE SUBSTRING(CAST("ICPSR" AS VARCHAR), 1, LENGTH(CAST("ICPSR" AS VARCHAR))-{YEAR_SUFFIX_LENGTH}) = ? AND "bonica.rid" = ? LIMIT 1 """, [icpsr, bonica_rid], ).fetchone() if source_exists is None: raise ValidationError( message=f"Mapping (icpsr={icpsr}, bonica_rid={bonica_rid}) not found in source", expected_count=1, actual_count=0, ) verified += 1 result.sample_valid = True return result