File size: 10,672 Bytes
b339b93
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
"""Three-tier validation suite for distinct legislators extraction.



Unlike CSV→Parquet converters which validate lossless conversion,

this validates correct aggregation/transformation:



- Tier 1: Completeness - every source bioguide_id appears exactly once

- Tier 2: Aggregation Integrity - MIN/MAX/LIST operations are correct

- Tier 3: Sample Verification - deep validation of random legislators

"""

from __future__ import annotations

import random
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import duckdb

from .exceptions import (
    AggregationError,
    CompletenessError,
    SampleValidationError,
)


@dataclass
class ValidationResult:
    """Results from validation suite."""

    completeness_valid: bool = False
    source_distinct_count: int = 0
    output_count: int = 0

    aggregation_valid: bool = False
    aggregation_checks_passed: int = 0

    sample_valid: bool = False
    sample_size: int = 0

    @property
    def all_valid(self) -> bool:
        """Check if all validation tiers passed."""
        return self.completeness_valid and self.aggregation_valid and self.sample_valid


def validate_completeness(

    source_url: str,

    output_path: Path,

    conn: duckdb.DuckDBPyConnection,

    min_congress: int,

) -> ValidationResult:
    """

    Tier 1: Verify all source bioguide_ids appear exactly once in output.



    Checks:

    - Output count matches distinct source count

    - No missing bioguide_ids

    - No extra bioguide_ids

    - No duplicates in output

    """
    result = ValidationResult()

    # Count distinct legislators in source
    source_count = conn.execute(f"""

        SELECT COUNT(DISTINCT bioguide_id)

        FROM read_parquet('{source_url}')

        WHERE congress >= {min_congress}

          AND bioguide_id IS NOT NULL

    """).fetchone()[0]
    result.source_distinct_count = source_count

    # Count rows in output
    output_count = conn.execute(f"""

        SELECT COUNT(*) FROM read_parquet('{output_path}')

    """).fetchone()[0]
    result.output_count = output_count

    # Check for duplicates in output
    duplicate_count = conn.execute(f"""

        SELECT COUNT(*) FROM (

            SELECT bioguide_id, COUNT(*) as cnt

            FROM read_parquet('{output_path}')

            GROUP BY bioguide_id

            HAVING cnt > 1

        )

    """).fetchone()[0]

    if duplicate_count > 0:
        raise CompletenessError(
            message=f"Found {duplicate_count} duplicate bioguide_ids in output",
            expected_count=source_count,
            actual_count=output_count,
        )

    # Check counts match
    if output_count != source_count:
        # Find missing or extra IDs
        missing = conn.execute(f"""

            SELECT bioguide_id FROM (

                SELECT DISTINCT bioguide_id

                FROM read_parquet('{source_url}')

                WHERE congress >= {min_congress}

                  AND bioguide_id IS NOT NULL

            ) source

            WHERE bioguide_id NOT IN (

                SELECT bioguide_id FROM read_parquet('{output_path}')

            )

            LIMIT 10

        """).fetchall()
        missing_ids = [r[0] for r in missing]

        extra = conn.execute(f"""

            SELECT bioguide_id FROM read_parquet('{output_path}')

            WHERE bioguide_id NOT IN (

                SELECT DISTINCT bioguide_id

                FROM read_parquet('{source_url}')

                WHERE congress >= {min_congress}

                  AND bioguide_id IS NOT NULL

            )

            LIMIT 10

        """).fetchall()
        extra_ids = [r[0] for r in extra]

        raise CompletenessError(
            message="Count mismatch between source and output",
            expected_count=source_count,
            actual_count=output_count,
            missing_ids=missing_ids if missing_ids else None,
            extra_ids=extra_ids if extra_ids else None,
        )

    result.completeness_valid = True
    return result


def validate_aggregation(

    source_url: str,

    output_path: Path,

    conn: duckdb.DuckDBPyConnection,

    result: ValidationResult,

    min_congress: int,

    sample_size: int = 100,

) -> ValidationResult:
    """

    Tier 2: Verify aggregation operations (MIN/MAX/LIST) are correct.



    Randomly samples legislators and verifies:

    - first_congress = MIN(congress) from source

    - last_congress = MAX(congress) from source

    - congresses_served array length matches source count

    """
    # Get random sample of bioguide_ids
    all_ids = conn.execute(f"""

        SELECT bioguide_id FROM read_parquet('{output_path}')

    """).fetchall()
    all_ids = [r[0] for r in all_ids]

    actual_sample_size = min(sample_size, len(all_ids))
    sample_ids = random.sample(all_ids, actual_sample_size)

    checks_passed = 0

    for bioguide_id in sample_ids:
        # Get source data for this legislator
        source_data = conn.execute(f"""

            SELECT

                MIN(congress) as expected_first,

                MAX(congress) as expected_last,

                COUNT(*) as expected_count

            FROM read_parquet('{source_url}')

            WHERE bioguide_id = '{bioguide_id}'

              AND congress >= {min_congress}

        """).fetchone()

        expected_first, expected_last, expected_count = source_data

        # Get output data
        output_data = conn.execute(f"""

            SELECT

                first_congress,

                last_congress,

                LENGTH(congresses_served) as actual_count

            FROM read_parquet('{output_path}')

            WHERE bioguide_id = '{bioguide_id}'

        """).fetchone()

        actual_first, actual_last, actual_count = output_data

        # Validate first_congress
        if actual_first != expected_first:
            raise AggregationError(
                message="first_congress mismatch",
                bioguide_id=bioguide_id,
                field_name="first_congress",
                expected_value=str(expected_first),
                actual_value=str(actual_first),
            )

        # Validate last_congress
        if actual_last != expected_last:
            raise AggregationError(
                message="last_congress mismatch",
                bioguide_id=bioguide_id,
                field_name="last_congress",
                expected_value=str(expected_last),
                actual_value=str(actual_last),
            )

        # Validate congress count
        if actual_count != expected_count:
            raise AggregationError(
                message="congresses_served count mismatch",
                bioguide_id=bioguide_id,
                field_name="congresses_served (length)",
                expected_value=str(expected_count),
                actual_value=str(actual_count),
            )

        checks_passed += 1

    result.aggregation_checks_passed = checks_passed
    result.aggregation_valid = True
    return result


def validate_sample(

    source_url: str,

    output_path: Path,

    conn: duckdb.DuckDBPyConnection,

    result: ValidationResult,

    min_congress: int,

    sample_size: int = 50,

) -> ValidationResult:
    """

    Tier 3: Deep validation of random legislators.



    For each sampled legislator, verifies:

    - congresses_served array contains exactly the right congress numbers

    - bioname matches the most recent congress entry

    - state_abbrev matches the most recent congress entry

    """
    # Get random sample
    all_ids = conn.execute(f"""

        SELECT bioguide_id FROM read_parquet('{output_path}')

    """).fetchall()
    all_ids = [r[0] for r in all_ids]

    actual_sample_size = min(sample_size, len(all_ids))
    sample_ids = random.sample(all_ids, actual_sample_size)
    result.sample_size = actual_sample_size

    for i, bioguide_id in enumerate(sample_ids):
        # Get expected congresses from source
        expected_congresses = conn.execute(f"""

            SELECT LIST(congress ORDER BY congress)

            FROM read_parquet('{source_url}')

            WHERE bioguide_id = '{bioguide_id}'

              AND congress >= {min_congress}

        """).fetchone()[0]

        # Get actual congresses from output
        actual_congresses = conn.execute(f"""

            SELECT congresses_served

            FROM read_parquet('{output_path}')

            WHERE bioguide_id = '{bioguide_id}'

        """).fetchone()[0]

        # Compare congress arrays
        if list(expected_congresses) != list(actual_congresses):
            raise SampleValidationError(
                message="congresses_served array mismatch",
                bioguide_id=bioguide_id,
                field_name="congresses_served",
                expected_value=str(expected_congresses),
                actual_value=str(actual_congresses),
                sample_index=i,
            )

        # Verify most recent values (bioname, state_abbrev)
        expected_latest = conn.execute(f"""

            SELECT bioname, state_abbrev

            FROM read_parquet('{source_url}')

            WHERE bioguide_id = '{bioguide_id}'

              AND congress >= {min_congress}

            ORDER BY congress DESC

            LIMIT 1

        """).fetchone()

        actual_latest = conn.execute(f"""

            SELECT bioname, state_abbrev

            FROM read_parquet('{output_path}')

            WHERE bioguide_id = '{bioguide_id}'

        """).fetchone()

        if expected_latest[0] != actual_latest[0]:
            raise SampleValidationError(
                message="bioname mismatch (should be from most recent congress)",
                bioguide_id=bioguide_id,
                field_name="bioname",
                expected_value=str(expected_latest[0]),
                actual_value=str(actual_latest[0]),
                sample_index=i,
            )

        if expected_latest[1] != actual_latest[1]:
            raise SampleValidationError(
                message="state_abbrev mismatch (should be from most recent congress)",
                bioguide_id=bioguide_id,
                field_name="state_abbrev",
                expected_value=str(expected_latest[1]),
                actual_value=str(actual_latest[1]),
                sample_index=i,
            )

    result.sample_valid = True
    return result