""" Bills API Routes - Legislative bill data from OpenStates """ from fastapi import APIRouter, Query, HTTPException from fastapi.responses import JSONResponse from typing import Optional, List, Dict import duckdb import pandas as pd from pathlib import Path from loguru import logger import re import os import sys import traceback from api.errors import ErrorDetail, parse_error from api.routes.search import load_parquet_cached router = APIRouter(prefix="/bills", tags=["bills"]) GOLD_DIR = Path("data/gold") IS_HF_SPACES = os.getenv("HF_SPACES") == "1" HF_ORGANIZATION = "CommunityOne" def get_hf_dataset_url(dataset_name: str) -> str: """ Convert dataset name to HuggingFace parquet URL. HuggingFace Datasets library stores parquet files in the standard format: data/train-00000-of-00001.parquet Args: dataset_name: Dataset name (e.g., 'states-ma-bills-bills') Returns: Full URL to the parquet file """ return f"https://huggingface.co/datasets/{HF_ORGANIZATION}/{dataset_name}/resolve/main/data/train-00000-of-00001.parquet" def get_data_source(file_path: Path, use_remote: bool = False) -> str: """Get data source (local path or remote URL) based on environment.""" if not IS_HF_SPACES and not use_remote: return str(file_path) # Convert local path to HuggingFace dataset name parts = file_path.parts if 'states' in parts: state_idx = parts.index('states') state = parts[state_idx + 1].lower() filename = parts[-1].replace('.parquet', '').replace('_', '-') dataset_name = f"states-{state}-{filename}" return get_hf_dataset_url(dataset_name) # Fallback to local return str(file_path) def classify_bill_type(title: str, classification: list, topic: Optional[str] = None) -> str: """ Classify bill based on topic-specific categories. Different topics use different classification schemes: - Fluoridation: mandate, removal, funding, study - Dental/Oral Health: coverage_expansion, screening, provider_access, funding - Medicaid: expansion, coverage, reimbursement, eligibility - Health (general): protection, restriction, funding, reform - Education: requirement, funding, curriculum, reform - Default: support, oppose, regulate, other """ title_lower = title.lower() topic_lower = topic.lower() if topic else "" # EXCEPTION: Fluoride varnish/dental coverage bills (not water fluoridation) # Check this BEFORE water fluoridation classification if any(word in title_lower for word in ['varnish', 'sealant', 'dental', 'medicaid', 'medical assistance']) and 'fluoride' in title_lower: if any(word in title_lower for word in ['coverage', 'expand', 'expansion', 'benefit']): return 'coverage_expansion' elif any(word in title_lower for word in ['screening', 'examination', 'check']): return 'screening' # If it mentions dental/varnish but unclear type, it's dental "other" not fluoridation return 'other' # Fluoridation-specific classifications (WATER fluoridation only) if 'fluoride' in topic_lower or 'fluoride' in title_lower: # FIRST: Check for REMOVAL/BAN/PROHIBITION (negative sentiment) # CRITICAL: Must check these BEFORE "mandate"/"require" to avoid misclassification # e.g., "prohibit fluoride" should be "removal", not "mandate" if any(word in title_lower for word in [ 'prohibit', 'prohibition', 'prohibited', 'prohibiting', 'ban', 'banning', 'banned', 'discontinue', 'discontinuation', 'cease', 'ceasing', 'eliminate', 'elimination', 'removal', 'remove', 'removing', 'prevent', 'preventing', 'repeal', 'repealing', 'repealed', 'optional', 'opt-out', 'opt out', 'fluoride-free', 'fluoride free' ]): # But check if it's "prohibit removal" (double negative = pro-fluoride) if any(phrase in title_lower for phrase in ['prohibit removal', 'prevent removal', 'ban removal']): return 'mandate' # Prohibiting removal = mandate to keep return 'removal' # SECOND: Check for notification/monitoring (before "require" check) # Bills like "notification required" are about monitoring, not mandating fluoridation elif any(phrase in title_lower for phrase in [ 'notification', 'notify', 'notifying', 'report to', 'reporting', 'report when', 'monitor', 'monitoring' ]): return 'study' # THIRD: Check for MANDATE/REQUIRE (positive sentiment) # Be specific - just "require" alone isn't enough, need context elif any(phrase in title_lower for phrase in [ 'mandate', 'mandating', 'shall fluoridate', 'shall add fluoride', 'must fluoridate', 'must add fluoride', 'require fluoridation', 'require water system to fluoridate', 'require addition of fluoride' ]): return 'mandate' # FOURTH: Check for funding elif any(word in title_lower for word in ['fund', 'funding', 'appropriation', 'grant', 'reimburse', 'subsidy']): return 'funding' elif any(word in title_lower for word in ['study', 'research', 'analysis', 'assess', 'evaluate']): return 'study' else: return 'other' # Dental/Oral Health-specific classifications elif 'dental' in topic_lower or 'oral health' in topic_lower or 'dental' in title_lower: if any(word in title_lower for word in ['expand', 'increase coverage', 'extend coverage', 'add coverage']): return 'coverage_expansion' elif any(word in title_lower for word in ['screen', 'examination', 'checkup', 'assessment']): return 'screening' elif any(word in title_lower for word in ['provider', 'dentist', 'hygienist', 'workforce', 'professional']): return 'provider_access' elif any(word in title_lower for word in ['fund', 'appropriation', 'grant', 'budget', 'reimburse']): return 'funding' else: return 'other' # Medicaid-specific classifications elif 'medicaid' in topic_lower or 'medicaid' in title_lower: if any(word in title_lower for word in ['expand', 'expansion', 'extend', 'broaden']): return 'expansion' elif any(word in title_lower for word in ['coverage', 'benefit', 'service']): return 'coverage' elif any(word in title_lower for word in ['reimburse', 'payment', 'rate', 'compensation']): return 'reimbursement' elif any(word in title_lower for word in ['eligib', 'qualify', 'enroll']): return 'eligibility' else: return 'other' # Education-specific classifications elif 'education' in topic_lower or 'school' in topic_lower: if any(word in title_lower for word in ['require', 'mandate', 'shall provide', 'must offer']): return 'requirement' elif any(word in title_lower for word in ['fund', 'appropriation', 'grant', 'budget']): return 'funding' elif any(word in title_lower for word in ['curriculum', 'course', 'instruction', 'program']): return 'curriculum' elif any(word in title_lower for word in ['reform', 'restructure', 'modernize', 'improve']): return 'reform' else: return 'other' # General health classifications elif 'health' in topic_lower or 'health' in title_lower: if any(word in title_lower for word in ['protect', 'preserve', 'safeguard', 'ensure', 'guarantee', 'expand', 'increase', 'enhance', 'support']): return 'protection' elif any(word in title_lower for word in ['restrict', 'limit', 'regulate', 'control', 'impose', 'prohibit', 'ban']): return 'restriction' elif any(word in title_lower for word in ['fund', 'appropriation', 'grant', 'budget']): return 'funding' elif any(word in title_lower for word in ['reform', 'restructure', 'modernize', 'improve']): return 'reform' else: return 'other' # Default general classifications else: if any(word in title_lower for word in ['support', 'promote', 'encourage', 'expand', 'increase', 'enhance', 'fund']): return 'support' elif any(word in title_lower for word in ['oppose', 'prohibit', 'ban', 'restrict', 'limit', 'prevent']): return 'oppose' elif any(word in title_lower for word in ['regulate', 'oversee', 'control', 'require', 'mandate']): return 'regulate' else: return 'other' def get_legend_for_topic(topic: Optional[str]) -> dict: """ Get appropriate legend labels based on topic. """ topic_lower = topic.lower() if topic else "" if 'fluoride' in topic_lower: return { "mandate": "Mandate Fluoridation", "removal": "Remove Fluoridation", "funding": "Funding/Grants", "study": "Study/Research", "other": "Other" } elif 'dental' in topic_lower or 'oral health' in topic_lower: return { "coverage_expansion": "Coverage Expansion", "screening": "Screening Programs", "provider_access": "Provider Access", "funding": "Funding/Grants", "other": "Other" } elif 'medicaid' in topic_lower: return { "expansion": "Program Expansion", "coverage": "Coverage/Benefits", "reimbursement": "Reimbursement", "eligibility": "Eligibility", "other": "Other" } elif 'education' in topic_lower or 'school' in topic_lower: return { "requirement": "Requirements", "funding": "Funding", "curriculum": "Curriculum", "reform": "Reform", "other": "Other" } elif 'health' in topic_lower: return { "protection": "Protection/Expansion", "restriction": "Restriction", "funding": "Funding", "reform": "Reform", "other": "Other" } else: return { "support": "Support/Promote", "oppose": "Oppose/Restrict", "regulate": "Regulate", "other": "Other" } def determine_bill_status(latest_action: str, latest_date: str) -> str: """ Determine if bill was enacted, failed, or is pending. """ if not latest_action: return 'pending' action_lower = latest_action.lower() # Enacted/Passed if any(word in action_lower for word in ['signed', 'enacted', 'approved', 'passed both', 'became law']): return 'enacted' # Failed if any(word in action_lower for word in ['failed', 'defeated', 'rejected', 'died', 'withdrawn', 'vetoed']): return 'failed' # Pending (default) return 'pending' @router.get("") async def search_bills( q: Optional[str] = Query(None, description="Search query for bill title or number"), state: Optional[str] = Query("AL", description="State code (e.g., AL, GA, MA)"), session: Optional[str] = Query(None, description="Legislative session (e.g., 2024rs)"), limit: int = Query(20, ge=1, le=100, description="Maximum results"), offset: int = Query(0, ge=0, description="Number of results to skip") ): """ Search legislative bills from OpenStates data. **Examples:** - `/api/bills?state=AL&q=dental` - Search Alabama bills for "dental" - `/api/bills?state=AL&session=2024rs` - Get all 2024 regular session bills - `/api/bills?state=AL&limit=50` - Browse recent Alabama bills """ try: # Build file path bills_file = GOLD_DIR / "states" / state / "bills_bills.parquet" # Get data source (local or remote HuggingFace URL) data_source = get_data_source(bills_file, use_remote=IS_HF_SPACES) # Connect to DuckDB conn = duckdb.connect() # Build SQL query where_clauses = [] params = [] if q: where_clauses.append("(LOWER(title) LIKE LOWER(?) OR LOWER(bill_number) LIKE LOWER(?))") pattern = f'%{q}%' params.extend([pattern, pattern]) if session: where_clauses.append("session = ?") params.append(session) where_clause = " AND ".join(where_clauses) if where_clauses else "1=1" # Count total count_sql = f""" SELECT COUNT(*) as total FROM read_parquet(?) WHERE {where_clause} """ count_params = [data_source] + params total = conn.execute(count_sql, count_params).fetchone()[0] # Fetch bills sql = f""" SELECT bill_id, bill_number, title, classification, session, session_name, first_action_date, latest_action_date, latest_action_description, jurisdiction_name FROM read_parquet(?) WHERE {where_clause} ORDER BY latest_action_date DESC NULLS LAST, bill_number DESC LIMIT ? OFFSET ? """ query_params = [data_source] + params + [limit, offset] rows = conn.execute(sql, query_params).fetchall() bills = [] for row in rows: bills.append({ "bill_id": row[0], "bill_number": row[1], "title": row[2], "classification": row[3], "session": row[4], "session_name": row[5], "first_action_date": row[6], "latest_action_date": row[7], "latest_action": row[8], "jurisdiction": row[9] }) conn.close() return { "total": total, "bills": bills, "pagination": { "limit": limit, "offset": offset, "has_more": offset + len(bills) < total }, "filters": { "state": state, "query": q, "session": session } } except HTTPException: raise except Exception as e: logger.error(f"Bills search error for state={state}: {e}") # Parse error into structured response error_detail = parse_error(e, context={ "state": state, "data_type": "bills", "query": q, "session": session }) # Return structured error response return JSONResponse( status_code=500, content=error_detail.model_dump() ) @router.get("/sessions") async def get_sessions( state: str = Query("AL", description="State code") ): """Get available legislative sessions for a state.""" try: bills_file = GOLD_DIR / "states" / state / "bills_bills.parquet" if not bills_file.exists(): raise HTTPException( status_code=404, detail=f"No bills data found for state: {state}" ) conn = duckdb.connect() sql = """ SELECT DISTINCT session, session_name, MIN(first_action_date) as start_date, MAX(latest_action_date) as end_date, COUNT(*) as bill_count FROM read_parquet(?) GROUP BY session, session_name ORDER BY session DESC """ rows = conn.execute(sql, [str(bills_file)]).fetchall() sessions = [] for row in rows: sessions.append({ "session": row[0], "session_name": row[1], "start_date": row[2], "end_date": row[3], "bill_count": row[4] }) conn.close() return { "state": state, "sessions": sessions, "total_sessions": len(sessions) } except HTTPException: raise except Exception as e: logger.error(f"Sessions query error for state={state}: {e}") # Parse error into structured response error_detail = parse_error(e, context={ "state": state, "data_type": "sessions" }) return JSONResponse( status_code=500, content=error_detail.model_dump() ) @router.get("/map") async def get_bill_map_data( topic: Optional[str] = Query(None, description="Topic to filter (e.g., dental, health, education)"), session: Optional[str] = Query(None, description="Legislative session") ): """ Get aggregated bill data for choropleth map visualization. Uses pre-computed national aggregates for instant loading. Returns counts of bills by type and status for each state. **Examples:** - `/api/bills/map?topic=fluorid` - Map fluoridation legislation - `/api/bills/map?topic=dental` - Map dental legislation """ try: # Use pre-aggregated national dataset agg_file = GOLD_DIR / "national" / "bills_map_aggregates.parquet" # Fallback to on-demand aggregation if pre-computed file doesn't exist if not agg_file.exists(): logger.warning("Pre-aggregated bill data not found, using on-demand aggregation (slower)") return await get_bill_map_data_on_demand(topic, session) # Load from cached aggregates (fast!) df = load_parquet_cached(str(agg_file)) # Filter by topic if topic: df = df[df['topic'] == topic.lower()] # Convert to state_data dict state_data = {} for _, row in df.iterrows(): state_code = row['state'] # Reconstruct nested dicts (exclude type_status_counts which is already a dict) type_cols = [c for c in df.columns if c.startswith('type_') and c != 'type_status_counts'] status_cols = [c for c in df.columns if c.startswith('status_')] # Handle NaN values - convert to 0 type_counts = {c.replace('type_', ''): int(row[c]) if not pd.isna(row[c]) else 0 for c in type_cols} status_counts = {c.replace('status_', ''): int(row[c]) if not pd.isna(row[c]) else 0 for c in status_cols} # Extract sample_bills (stored as numpy array in parquet) sample_bills = [] if 'sample_bills' in row.index: bills_data = row['sample_bills'] # Pandas stores list columns as numpy arrays if hasattr(bills_data, '__iter__') and not isinstance(bills_data, str): try: # Convert numpy array or list to Python list sample_bills = [dict(bill) for bill in bills_data if bill] except: sample_bills = [] elif isinstance(bills_data, str): import json try: sample_bills = json.loads(bills_data) except: sample_bills = [] state_data[state_code] = { "state": state_code, "total_bills": int(row['total_bills']), "type_counts": type_counts, "status_counts": status_counts, "primary_type": row['primary_type'], "primary_status": row['primary_status'], "map_category": row['map_category'], "sample_bills": sample_bills, "last_updated": str(row['last_updated']) if 'last_updated' in row.index else '' } return { "topic": topic, "session": session, "states": state_data, "total_states": len(state_data), "legend": { "types": get_legend_for_topic(topic), "statuses": { "enacted": "Enacted", "failed": "Failed", "pending": "Pending" } }, "cached": True } except HTTPException: raise except Exception as e: logger.error(f"Map data error: {e}") error_detail = parse_error(e, context={ "data_type": "bill map", "topic": topic, "session": session }) return JSONResponse( status_code=500, content=error_detail.model_dump() ) async def get_bill_map_data_on_demand( topic: Optional[str] = None, session: Optional[str] = None ): """ LEGACY: On-demand aggregation (slow - loads 50 state files). Only used as fallback if pre-aggregated data doesn't exist. """ try: # List of all US state codes to check ALL_STATES = [ "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY" ] # In local environment, check available directories # In HF Spaces, try all states (will skip missing datasets) states_to_check = ALL_STATES if not IS_HF_SPACES: states_dir = GOLD_DIR / "states" if states_dir.exists(): states_to_check = [d.name for d in states_dir.iterdir() if d.is_dir()] state_data = {} # Iterate through states for state_code in states_to_check: try: bills_file = GOLD_DIR / "states" / state_code / "bills_bills.parquet" # Get data source (local or remote HuggingFace URL) data_source = get_data_source(bills_file, use_remote=IS_HF_SPACES) # Connect to DuckDB conn = duckdb.connect() # Build query where_clauses = ["1=1"] params = [data_source] if topic: where_clauses.append("LOWER(title) LIKE LOWER(?)") params.append(f'%{topic}%') if session: where_clauses.append("session = ?") params.append(session) where_clause = " AND ".join(where_clauses) sql = f""" SELECT title, classification, latest_action_description FROM read_parquet(?) WHERE {where_clause} """ rows = conn.execute(sql, params).fetchall() conn.close() if not rows: continue # Get topic-aware categories legend_categories = get_legend_for_topic(topic) # Initialize type_counts with all possible categories for this topic type_counts = {cat: 0 for cat in legend_categories.keys()} status_counts = {'enacted': 0, 'failed': 0, 'pending': 0} type_status_counts = {} for row in rows: title = row[0] classification = row[1] if row[1] else [] latest_action = row[2] if row[2] else '' bill_type = classify_bill_type(title, classification, topic) bill_status = determine_bill_status(latest_action, '') # Ensure bill_type exists in type_counts (fallback to 'other') if bill_type not in type_counts: bill_type = 'other' type_counts[bill_type] += 1 status_counts[bill_status] += 1 # Track type+status combinations key = f"{bill_type}_{bill_status}" type_status_counts[key] = type_status_counts.get(key, 0) + 1 # Determine primary legislation type and status for map visualization primary_type = max(type_counts, key=type_counts.get) primary_status = max(status_counts, key=status_counts.get) state_data[state_code] = { "state": state_code, "total_bills": len(rows), "type_counts": type_counts, "status_counts": status_counts, "type_status_counts": type_status_counts, "primary_type": primary_type, "primary_status": primary_status, # For map visualization "map_category": f"{primary_type}_{primary_status}" if type_counts[primary_type] > 0 else "none" } except Exception as e: # Skip states with missing or inaccessible data logger.debug(f"Skipping state {state_code}: {str(e)}") continue return { "topic": topic, "session": session, "states": state_data, "total_states": len(state_data), "legend": { "types": get_legend_for_topic(topic), "statuses": { "enacted": "Enacted", "failed": "Failed", "pending": "Pending" } } } except HTTPException: raise except Exception as e: logger.error(f"Map data error: {e}") # Parse error into structured response error_detail = parse_error(e, context={ "data_type": "bill map", "topic": topic, "session": session }) return JSONResponse( status_code=500, content=error_detail.model_dump() ) @router.get("/{bill_id}") async def get_bill_details(bill_id: str): """ Get detailed information about a specific bill from gold parquet files. Args: bill_id: Bill identifier in format {state}-{bill_number} (e.g., "LA-SB 4") Returns: Detailed bill information including actions, sponsors, sources """ try: # Parse bill_id to extract state and bill number if '-' not in bill_id: raise HTTPException(status_code=400, detail="Invalid bill ID format. Expected: STATE-BILLNUMBER") parts = bill_id.split('-', 1) state = parts[0].upper() bill_number = parts[1] # Build file paths for bills data from gold layer bills_file = GOLD_DIR / "states" / state / "bills_bills.parquet" actions_file = GOLD_DIR / "states" / state / "bills_bill_actions.parquet" sponsors_file = GOLD_DIR / "states" / state / "bills_bill_sponsorships.parquet" # Get data sources (local or remote HuggingFace URL) bills_source = get_data_source(bills_file, use_remote=IS_HF_SPACES) actions_source = get_data_source(actions_file, use_remote=IS_HF_SPACES) sponsors_source = get_data_source(sponsors_file, use_remote=IS_HF_SPACES) # Connect to DuckDB for querying parquet files conn = duckdb.connect() try: # Query for the specific bill bill_query = """ SELECT bill_id, bill_number, title, classification, latest_action_description, latest_action_date, first_action_date, session, session_name, jurisdiction_name FROM read_parquet(?) WHERE bill_number = ? LIMIT 1 """ result = conn.execute(bill_query, [bills_source, bill_number]).fetchone() if not result: conn.close() raise HTTPException(status_code=404, detail=f"Bill {bill_number} not found in {state}") # Parse bill data bill_data = { "bill_id": result[0] if result[0] else bill_id, "bill_number": result[1], "title": result[2], "classification": result[3] if result[3] else [], "latest_action": result[4], "latest_action_date": result[5], "first_action_date": result[6], "session": result[7], "session_name": result[8], "jurisdiction": result[9], "state": state, } # Get sponsors if available try: sponsor_query = """ SELECT name, primary_sponsor, classification FROM read_parquet(?) WHERE bill_id = ? ORDER BY primary_sponsor DESC """ sponsor_rows = conn.execute(sponsor_query, [sponsors_source, bill_data["bill_id"]]).fetchall() bill_data["sponsors"] = [ {"name": s[0], "primary": bool(s[1]), "classification": s[2]} for s in sponsor_rows ] except Exception as e: logger.warning(f"Could not load sponsors for {bill_id}: {e}") bill_data["sponsors"] = [] # Get actions if available try: actions_query = """ SELECT description, date, classification FROM read_parquet(?) WHERE bill_id = ? ORDER BY date DESC LIMIT 10 """ action_rows = conn.execute(actions_query, [actions_source, bill_data["bill_id"]]).fetchall() bill_data["actions"] = [ {"description": a[0], "date": a[1], "classification": a[2]} for a in action_rows ] except Exception as e: logger.warning(f"Could not load actions for {bill_id}: {e}") bill_data["actions"] = [] conn.close() return bill_data except Exception as e: conn.close() raise except HTTPException: raise except Exception as e: logger.error(f"Bill details error: {e}") logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e))