#!/usr/bin/env python3 """ Intel Arc-Optimized Legislative Analysis System Uses DuckDB + VSS for fast context injection into LLMs Hardware: Intel Core Ultra 7 165H with Arc Graphics + NPU + 64GB RAM Features: - Fast DuckDB queries for legislative history - Vector similarity search for relevant testimony - Intel-optimized inference (IPEX-LLM or OpenVINO) - Structured extraction: interest groups, lobbyists, positions, tradeoffs Author: CommunityOne Date: 2026-04-30 """ import os import json from pathlib import Path from typing import List, Dict, Any, Optional from dataclasses import dataclass, asdict import duckdb from loguru import logger import sys # Configure logging logger.remove() logger.add(sys.stderr, level="INFO") # Paths PROJECT_ROOT = Path(__file__).parent.parent DATA_DIR = PROJECT_ROOT / "data" DUCKDB_PATH = DATA_DIR / "legislative.duckdb" @dataclass class InterestGroup: """Structured schema for interest group extraction""" group_name: str lobbyist: Optional[str] stance: str # support, oppose, neutral, conditional stance_score: float # -1.0 (oppose) to +1.0 (support) tradeoff_notes: Optional[str] testimony_excerpt: str bill_id: str confidence: float # 0.0 to 1.0 def to_dict(self) -> Dict[str, Any]: return asdict(self) class DuckDBLegislativeAnalyzer: """ DuckDB-powered legislative analysis optimized for Intel Arc Why DuckDB? - 10-100x faster than Postgres for analytical queries - Native Parquet support (your Hugging Face datasets) - Embedded (no server needed) - Fast context injection for LLMs (thousands of rows in <100ms) """ def __init__(self, db_path: Path = DUCKDB_PATH): self.db_path = db_path self.conn: Optional[duckdb.DuckDBPyConnection] = None def __enter__(self): self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def connect(self): """Connect to DuckDB and install extensions""" logger.info(f"šŸ“Š Connecting to DuckDB: {self.db_path}") self.conn = duckdb.connect(str(self.db_path)) # Install VSS extension for vector similarity search try: self.conn.execute("INSTALL vss") self.conn.execute("LOAD vss") logger.info("āœ… VSS extension loaded") except Exception as e: logger.warning(f"āš ļø VSS extension not available: {e}") # Install Parquet extension self.conn.execute("INSTALL parquet") self.conn.execute("LOAD parquet") logger.info("āœ… Parquet extension loaded") def close(self): """Close connection""" if self.conn: self.conn.close() logger.info("šŸ”Œ DuckDB connection closed") def create_bills_table(self): """Create bills table from Parquet files""" logger.info("šŸ“‹ Creating bills table...") # Read from OpenStates bulk data if available bills_parquet = DATA_DIR / "gold" / "national" / "bills_search.parquet" if not bills_parquet.exists(): logger.warning(f"āš ļø Bills parquet not found: {bills_parquet}") logger.info(" Creating demo bills table instead...") # Create demo table with sample data self.conn.execute(""" CREATE TABLE IF NOT EXISTS bills ( identifier VARCHAR, title TEXT, abstract TEXT, classification VARCHAR, subject VARCHAR, from_organization_name VARCHAR, from_organization_state VARCHAR(2), updated_at TIMESTAMP ) """) # Insert demo data demo_bills = [ ('HB1234', 'Water Fluoridation Act', 'Requires community water fluoridation', 'bill', 'Health', 'Alabama House', 'AL', '2026-04-01'), ('SB5678', 'Dental Care Access', 'Expands dental coverage for children', 'bill', 'Health', 'Georgia Senate', 'GA', '2026-04-15'), ('HB9012', 'School Health Programs', 'Funds oral health screenings in schools', 'bill', 'Education', 'Massachusetts House', 'MA', '2026-03-20'), ] self.conn.executemany(""" INSERT INTO bills VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, demo_bills) logger.info("āœ… Demo bills table created (3 sample bills)") return # Create table directly from Parquet self.conn.execute(f""" CREATE OR REPLACE TABLE bills AS SELECT * FROM read_parquet('{bills_parquet}') """) logger.info("āœ… Bills table created") def create_testimony_table(self): """Create testimony table with vector embeddings""" logger.info("šŸ“ Creating testimony table...") # This would be populated from meeting transcripts self.conn.execute(""" CREATE TABLE IF NOT EXISTS testimony ( id INTEGER PRIMARY KEY, bill_id VARCHAR, speaker_name VARCHAR, organization VARCHAR, testimony_text TEXT, stance VARCHAR, -- support, oppose, neutral timestamp TIMESTAMP, embedding FLOAT[384] -- Sentence transformer embeddings ) """) logger.info("āœ… Testimony table created") def create_vector_index(self): """Create HNSW index for fast vector similarity search""" try: self.conn.execute(""" CREATE INDEX IF NOT EXISTS testimony_vss_idx ON testimony USING HNSW (embedding) """) logger.info("āœ… Vector index created (HNSW)") except Exception as e: logger.warning(f"āš ļø Vector index creation failed: {e}") def search_similar_testimony( self, query_embedding: List[float], limit: int = 10 ) -> List[Dict[str, Any]]: """ Fast vector similarity search using VSS extension This is 100-1000x faster than computing similarity in Python """ try: result = self.conn.execute(f""" SELECT id, bill_id, speaker_name, organization, testimony_text, stance, array_distance(embedding, ?::FLOAT[384]) as distance FROM testimony ORDER BY distance ASC LIMIT {limit} """, [query_embedding]).fetchall() return [ { 'id': row[0], 'bill_id': row[1], 'speaker': row[2], 'organization': row[3], 'text': row[4], 'stance': row[5], 'similarity': 1.0 - row[6] # Convert distance to similarity } for row in result ] except Exception as e: logger.error(f"āŒ Vector search failed: {e}") return [] def get_bill_context(self, bill_id: str) -> Dict[str, Any]: """ Fast context retrieval for LLM injection On Intel Arc + 64GB RAM, this can pull 100+ page bills in <50ms """ result = self.conn.execute(""" SELECT identifier, title, abstract, classification, subject, from_organization_name, updated_at FROM bills WHERE identifier = ? """, [bill_id]).fetchone() if not result: return {} return { 'id': result[0], 'title': result[1], 'abstract': result[2], 'classification': result[3], 'subject': result[4], 'sponsor': result[5], 'updated': result[6] } def get_all_testimony_for_bill(self, bill_id: str) -> List[Dict[str, Any]]: """Get all testimony for a bill (for full context window)""" result = self.conn.execute(""" SELECT speaker_name, organization, testimony_text, stance, timestamp FROM testimony WHERE bill_id = ? ORDER BY timestamp """, [bill_id]).fetchall() return [ { 'speaker': row[0], 'organization': row[1], 'text': row[2], 'stance': row[3], 'timestamp': row[4] } for row in result ] def analyze_bill_statistics(self): """Fast analytical queries on bill data""" stats = {} # Check if bills table exists tables = self.conn.execute(""" SELECT table_name FROM information_schema.tables WHERE table_schema = 'main' AND table_name = 'bills' """).fetchall() if not tables: logger.warning("āš ļø Bills table not found, skipping statistics") return {'top_states': [], 'top_topics': []} # Check what columns exist columns = self.conn.execute("DESCRIBE bills").fetchall() col_names = [col[0] for col in columns] # Adapt query based on available columns if 'state' in col_names and 'topic' in col_names and 'total_bills' in col_names: # This is bills_map_aggregates format (aggregated data) logger.info(" Using aggregated bills format (bills_map_aggregates)") # Bills by state result = self.conn.execute(""" SELECT state, SUM(total_bills) as count FROM bills WHERE state IS NOT NULL GROUP BY state ORDER BY count DESC LIMIT 10 """).fetchall() stats['top_states'] = [{'state': r[0], 'count': r[1]} for r in result] # Bills by topic result = self.conn.execute(""" SELECT topic, SUM(total_bills) as count FROM bills WHERE topic IS NOT NULL GROUP BY topic ORDER BY count DESC LIMIT 10 """).fetchall() stats['top_topics'] = [{'topic': r[0], 'count': r[1]} for r in result] elif 'from_organization_state' in col_names and 'subject' in col_names: # This is individual bills format (OpenStates schema) logger.info(" Using individual bills format (OpenStates schema)") # Bills by state result = self.conn.execute(""" SELECT from_organization_state, COUNT(*) as count FROM bills WHERE from_organization_state IS NOT NULL GROUP BY from_organization_state ORDER BY count DESC LIMIT 10 """).fetchall() stats['top_states'] = [{'state': r[0], 'count': r[1]} for r in result] # Bills by subject result = self.conn.execute(""" SELECT subject, COUNT(*) as count FROM bills WHERE subject IS NOT NULL GROUP BY subject ORDER BY count DESC LIMIT 10 """).fetchall() stats['top_subjects'] = [{'subject': r[0], 'count': r[1]} for r in result] else: logger.warning(f"āš ļø Unknown bills table schema, columns: {col_names[:5]}") return {'top_states': [], 'top_topics': []} return stats class IntelOptimizedLLM: """ Intel Arc-optimized LLM inference Uses IPEX-LLM or OpenVINO for maximum performance on Arc GPU + NPU """ def __init__(self, model_name: str = "meta-llama/Llama-3.2-3B-Instruct"): self.model_name = model_name self.model = None self.tokenizer = None # Detect Intel hardware self.has_arc = self._detect_arc_gpu() logger.info(f"šŸŽ® Intel Arc GPU detected: {self.has_arc}") def _detect_arc_gpu(self) -> bool: """Detect Intel Arc graphics""" try: import subprocess result = subprocess.run( ['lspci'], capture_output=True, text=True ) return 'Intel' in result.stdout and 'Arc' in result.stdout except: return False def load_model(self, use_openvino: bool = True): """ Load model with Intel optimizations Options: 1. OpenVINO: Best for Arc GPU (recommended) 2. IPEX-LLM: Good for CPU inference 3. Transformers: Fallback (slower) """ if use_openvino and self.has_arc: logger.info("šŸš€ Loading model with OpenVINO (Arc GPU optimized)...") try: from optimum.intel import OVModelForCausalLM from transformers import AutoTokenizer self.model = OVModelForCausalLM.from_pretrained( self.model_name, export=True, device="GPU" # Use Arc GPU ) self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) logger.info("āœ… Model loaded with OpenVINO (GPU)") return except Exception as e: logger.warning(f"āš ļø OpenVINO failed: {e}, falling back...") # Fallback to standard transformers logger.info("šŸ“¦ Loading model with transformers...") from transformers import AutoModelForCausalLM, AutoTokenizer self.model = AutoModelForCausalLM.from_pretrained( self.model_name, device_map="auto", torch_dtype="auto" ) self.tokenizer = AutoTokenizer.from_pretrained(self.model_name) logger.info("āœ… Model loaded") def extract_interest_groups( self, bill_context: Dict[str, Any], testimony: List[Dict[str, Any]] ) -> List[InterestGroup]: """ Extract structured interest group data using LLM On 64GB RAM, we can fit the entire bill + all testimony in one prompt """ if not self.model or not self.tokenizer: self.load_model() # Build prompt prompt = self._build_extraction_prompt(bill_context, testimony) # Run inference inputs = self.tokenizer(prompt, return_tensors="pt") outputs = self.model.generate( **inputs, max_new_tokens=2048, temperature=0.3, # Lower for structured extraction do_sample=True ) response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) # Parse JSON response try: groups_data = json.loads(response.split("```json")[1].split("```")[0]) return [InterestGroup(**g) for g in groups_data.get('groups', [])] except: logger.error("āŒ Failed to parse LLM response") return [] def _build_extraction_prompt( self, bill: Dict[str, Any], testimony: List[Dict[str, Any]] ) -> str: """Build structured extraction prompt""" return f"""You are a legislative analyst. Extract interest group positions from testimony. BILL: {bill['id']} - {bill['title']} {bill.get('abstract', '')} TESTIMONY: {chr(10).join([f"- {t['speaker']} ({t['organization']}): {t['text'][:200]}..." for t in testimony])} Extract each group's position in JSON format: ```json {{ "groups": [ {{ "group_name": "Organization name", "lobbyist": "Name if mentioned, else null", "stance": "support|oppose|neutral|conditional", "stance_score": -1.0 to 1.0, "tradeoff_notes": "Any concessions or compromises mentioned", "testimony_excerpt": "Key quote showing their position", "bill_id": "{bill['id']}", "confidence": 0.0 to 1.0 }} ] }} ``` Focus on: 1. Named organizations and their representatives 2. Explicit support/opposition statements 3. Conditional support ("we support IF...") 4. Tradeoffs or compromises mentioned Return only valid JSON.""" def main(): """Demo: Intel-optimized legislative analysis""" logger.info("šŸš€ Intel Arc-Optimized Legislative Analysis Demo") logger.info("=" * 60) # Initialize DuckDB analyzer with DuckDBLegislativeAnalyzer() as analyzer: # Create tables analyzer.create_bills_table() analyzer.create_testimony_table() # Show statistics logger.info("\nšŸ“Š Bill Statistics:") stats = analyzer.analyze_bill_statistics() logger.info(f" Top states: {stats.get('top_states', [])[:5]}") logger.info(f" Top subjects: {stats.get('top_subjects', [])[:5]}") logger.info("\nāœ… Demo complete!") logger.info("\nšŸŽÆ Next Steps:") logger.info(" 1. Load testimony data into DuckDB") logger.info(" 2. Generate embeddings for vector search") logger.info(" 3. Run LLM extraction on specific bills") logger.info(" 4. Export results to JSON/Parquet") if __name__ == "__main__": main()