File size: 5,482 Bytes
b339b93
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""Core extraction logic for distinct legislators from Voteview data."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

import duckdb

from .exceptions import InvalidSourceURLError, OutputWriteError, SourceReadError
from .schema import (
    AGGREGATION_QUERY,
    ALLOWED_SOURCE_DOMAINS,
    MIN_CONGRESS,
    VOTEVIEW_MEMBERS_URL,
    validate_source_url,
)
from .validators import (
    ValidationResult,
    validate_aggregation,
    validate_completeness,
    validate_sample,
)


@dataclass
class ExtractionResult:
    """Summary of a completed extraction run.

    Returned by ``extract_distinct_legislators`` after the output parquet
    file has been written and its rows counted.
    """

    # URL of the source parquet file that was read.
    source_url: str
    # Location of the parquet file that was written.
    output_path: Path
    # Count of source rows with congress >= min_congress and a non-NULL bioguide_id.
    source_rows: int
    # Count of rows read back from the written output file.
    output_count: int
    # Validation outcome; a default-constructed ValidationResult when validation was skipped.
    validation: ValidationResult


def _sql_string_literal(value: Path | str) -> str:
    """Render *value* as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def extract_distinct_legislators(
    output_path: Path | str,
    *,
    source_url: str = VOTEVIEW_MEMBERS_URL,
    min_congress: int = MIN_CONGRESS,
    validate: bool = True,
    aggregation_sample_size: int = 100,
    deep_sample_size: int = 50,
) -> ExtractionResult:
    """Extract distinct legislators from Voteview HSall_members data.

    Reads the source parquet file, keeps rows with ``congress >= min_congress``
    and a non-NULL ``bioguide_id``, runs ``AGGREGATION_QUERY`` over them, and
    writes the result to a local Parquet file with ZSTD compression.

    Args:
        output_path: Path for the output ``.parquet`` file. Missing parent
            directories are created.
        source_url: URL of the source parquet file (default:
            ``VOTEVIEW_MEMBERS_URL``). Must pass ``validate_source_url``.
        min_congress: Minimum congress number to include (default:
            ``MIN_CONGRESS``). Interpolated into SQL, so it must be an int.
        validate: Whether to run the three validation tiers after extraction.
        aggregation_sample_size: Sample size passed to the Tier 2 validator.
        deep_sample_size: Sample size passed to the Tier 3 validator.

    Returns:
        ExtractionResult with the source/output row counts and the validation
        result (a default ``ValidationResult`` when ``validate`` is False).

    Raises:
        InvalidSourceURLError: If ``validate_source_url`` rejects the URL.
        SourceReadError: If counting the source rows fails.
        OutputWriteError: If creating the directory or writing the file fails.
        CompletenessError: If Tier 1 validation fails.
        AggregationError: If Tier 2 validation fails.
        SampleValidationError: If Tier 3 validation fails.

    Note:
        The post-write row count is not wrapped, so an error raised by duckdb
        while reading the output back propagates unchanged.
    """
    output_path = Path(output_path)

    # Reject URLs outside the allow-list before they are spliced into SQL.
    if not validate_source_url(source_url):
        raise InvalidSourceURLError(
            message="Source URL must be from an allowed domain",
            source_url=source_url,
            allowed_domains=ALLOWED_SOURCE_DOMAINS,
        )

    # File names may legally contain single quotes; quote-doubling keeps such
    # paths from terminating the SQL string literal early.
    source_literal = _sql_string_literal(source_url)
    output_literal = _sql_string_literal(output_path)

    conn = duckdb.connect()

    try:
        # Step 1: Count source rows
        print(f"Reading from: {source_url}")
        try:
            # NOTE: min_congress is interpolated directly; it is trusted to be an int.
            source_rows = conn.execute(f"""
                SELECT COUNT(*)
                FROM read_parquet({source_literal})
                WHERE congress >= {min_congress}
                  AND bioguide_id IS NOT NULL
            """).fetchone()[0]
        except Exception as e:
            raise SourceReadError(
                message=str(e),
                source_url=source_url,
            ) from e

        print(f"  Source rows (congress >= {min_congress}): {source_rows:,}")

        # Step 2: Build the aggregation query from the shared template.
        # The template does its own quoting, so the raw URL is passed here.
        print("Aggregating by bioguide_id...")
        query = AGGREGATION_QUERY.format(
            source_url=source_url,
            min_congress=min_congress,
        )

        # Step 3: Write to parquet
        try:
            output_path.parent.mkdir(parents=True, exist_ok=True)
            conn.execute(f"""
                COPY ({query})
                TO {output_literal} (FORMAT PARQUET, COMPRESSION ZSTD, COMPRESSION_LEVEL 3)
            """)
        except Exception as e:
            raise OutputWriteError(
                message=str(e),
                output_path=output_path,
            ) from e

        # Step 4: Count output by reading the written file back
        output_count = conn.execute(f"""
            SELECT COUNT(*) FROM read_parquet({output_literal})
        """).fetchone()[0]
        print(f"  Distinct legislators: {output_count:,}")

        # Step 5: Validate
        # Each tier receives the previous tier's result and returns an updated one.
        validation = ValidationResult()
        if validate:
            print("Validating...")

            # Tier 1: Completeness
            validation = validate_completeness(source_url, output_path, conn, min_congress)
            print(f"  Tier 1 (Completeness): PASS ({validation.output_count:,} legislators)")

            # Tier 2: Aggregation Integrity
            validation = validate_aggregation(
                source_url,
                output_path,
                conn,
                validation,
                min_congress,
                sample_size=aggregation_sample_size,
            )
            print(f"  Tier 2 (Aggregation): PASS ({validation.aggregation_checks_passed} checks)")

            # Tier 3: Sample Verification
            validation = validate_sample(
                source_url,
                output_path,
                conn,
                validation,
                min_congress,
                sample_size=deep_sample_size,
            )
            print(f"  Tier 3 (Sample): PASS ({validation.sample_size} legislators verified)")

        return ExtractionResult(
            source_url=source_url,
            output_path=output_path,
            source_rows=source_rows,
            output_count=output_count,
            validation=validation,
        )

    finally:
        # Always release the in-memory DuckDB connection, even on failure.
        conn.close()