open-navigator / api /routes /bills.py
jcbowyer's picture
Deploy: Consolidated gold tables, fixed nginx docs routing
6dcf9fc verified
"""
Bills API Routes - Legislative bill data from OpenStates
"""
from fastapi import APIRouter, Query, HTTPException
from fastapi.responses import JSONResponse
from typing import Optional, List, Dict
import duckdb
import pandas as pd
from pathlib import Path
from loguru import logger
import re
import os
import sys
import traceback
from api.errors import ErrorDetail, parse_error
from api.routes.search import load_parquet_cached
router = APIRouter(prefix="/bills", tags=["bills"])
GOLD_DIR = Path("data/gold")
IS_HF_SPACES = os.getenv("HF_SPACES") == "1"
HF_ORGANIZATION = "CommunityOne"
def get_hf_dataset_url(dataset_name: str) -> str:
"""
Convert dataset name to HuggingFace parquet URL.
HuggingFace Datasets library stores parquet files in the standard format:
data/train-00000-of-00001.parquet
Args:
dataset_name: Dataset name (e.g., 'states-ma-bills-bills')
Returns:
Full URL to the parquet file
"""
return f"https://huggingface.co/datasets/{HF_ORGANIZATION}/{dataset_name}/resolve/main/data/train-00000-of-00001.parquet"
def get_data_source(file_path: Path, use_remote: bool = False) -> str:
"""Get data source (local path or remote URL) based on environment."""
if not IS_HF_SPACES and not use_remote:
return str(file_path)
# Convert local path to HuggingFace dataset name
parts = file_path.parts
if 'states' in parts:
state_idx = parts.index('states')
state = parts[state_idx + 1].lower()
filename = parts[-1].replace('.parquet', '').replace('_', '-')
dataset_name = f"states-{state}-{filename}"
return get_hf_dataset_url(dataset_name)
# Fallback to local
return str(file_path)
def classify_bill_type(title: str, classification: list, topic: Optional[str] = None) -> str:
"""
Classify bill based on topic-specific categories.
Different topics use different classification schemes:
- Fluoridation: mandate, removal, funding, study
- Dental/Oral Health: coverage_expansion, screening, provider_access, funding
- Medicaid: expansion, coverage, reimbursement, eligibility
- Health (general): protection, restriction, funding, reform
- Education: requirement, funding, curriculum, reform
- Default: support, oppose, regulate, other
"""
title_lower = title.lower()
topic_lower = topic.lower() if topic else ""
# EXCEPTION: Fluoride varnish/dental coverage bills (not water fluoridation)
# Check this BEFORE water fluoridation classification
if any(word in title_lower for word in ['varnish', 'sealant', 'dental', 'medicaid', 'medical assistance']) and 'fluoride' in title_lower:
if any(word in title_lower for word in ['coverage', 'expand', 'expansion', 'benefit']):
return 'coverage_expansion'
elif any(word in title_lower for word in ['screening', 'examination', 'check']):
return 'screening'
# If it mentions dental/varnish but unclear type, it's dental "other" not fluoridation
return 'other'
# Fluoridation-specific classifications (WATER fluoridation only)
if 'fluoride' in topic_lower or 'fluoride' in title_lower:
# FIRST: Check for REMOVAL/BAN/PROHIBITION (negative sentiment)
# CRITICAL: Must check these BEFORE "mandate"/"require" to avoid misclassification
# e.g., "prohibit fluoride" should be "removal", not "mandate"
if any(word in title_lower for word in [
'prohibit', 'prohibition', 'prohibited', 'prohibiting',
'ban', 'banning', 'banned',
'discontinue', 'discontinuation',
'cease', 'ceasing',
'eliminate', 'elimination',
'removal', 'remove', 'removing',
'prevent', 'preventing',
'repeal', 'repealing', 'repealed',
'optional', 'opt-out', 'opt out',
'fluoride-free', 'fluoride free'
]):
# But check if it's "prohibit removal" (double negative = pro-fluoride)
if any(phrase in title_lower for phrase in ['prohibit removal', 'prevent removal', 'ban removal']):
return 'mandate' # Prohibiting removal = mandate to keep
return 'removal'
# SECOND: Check for notification/monitoring (before "require" check)
# Bills like "notification required" are about monitoring, not mandating fluoridation
elif any(phrase in title_lower for phrase in [
'notification', 'notify', 'notifying',
'report to', 'reporting', 'report when',
'monitor', 'monitoring'
]):
return 'study'
# THIRD: Check for MANDATE/REQUIRE (positive sentiment)
# Be specific - just "require" alone isn't enough, need context
elif any(phrase in title_lower for phrase in [
'mandate', 'mandating', 'shall fluoridate', 'shall add fluoride',
'must fluoridate', 'must add fluoride',
'require fluoridation', 'require water system to fluoridate',
'require addition of fluoride'
]):
return 'mandate'
# FOURTH: Check for funding
elif any(word in title_lower for word in ['fund', 'funding', 'appropriation', 'grant', 'reimburse', 'subsidy']):
return 'funding'
elif any(word in title_lower for word in ['study', 'research', 'analysis', 'assess', 'evaluate']):
return 'study'
else:
return 'other'
# Dental/Oral Health-specific classifications
elif 'dental' in topic_lower or 'oral health' in topic_lower or 'dental' in title_lower:
if any(word in title_lower for word in ['expand', 'increase coverage', 'extend coverage', 'add coverage']):
return 'coverage_expansion'
elif any(word in title_lower for word in ['screen', 'examination', 'checkup', 'assessment']):
return 'screening'
elif any(word in title_lower for word in ['provider', 'dentist', 'hygienist', 'workforce', 'professional']):
return 'provider_access'
elif any(word in title_lower for word in ['fund', 'appropriation', 'grant', 'budget', 'reimburse']):
return 'funding'
else:
return 'other'
# Medicaid-specific classifications
elif 'medicaid' in topic_lower or 'medicaid' in title_lower:
if any(word in title_lower for word in ['expand', 'expansion', 'extend', 'broaden']):
return 'expansion'
elif any(word in title_lower for word in ['coverage', 'benefit', 'service']):
return 'coverage'
elif any(word in title_lower for word in ['reimburse', 'payment', 'rate', 'compensation']):
return 'reimbursement'
elif any(word in title_lower for word in ['eligib', 'qualify', 'enroll']):
return 'eligibility'
else:
return 'other'
# Education-specific classifications
elif 'education' in topic_lower or 'school' in topic_lower:
if any(word in title_lower for word in ['require', 'mandate', 'shall provide', 'must offer']):
return 'requirement'
elif any(word in title_lower for word in ['fund', 'appropriation', 'grant', 'budget']):
return 'funding'
elif any(word in title_lower for word in ['curriculum', 'course', 'instruction', 'program']):
return 'curriculum'
elif any(word in title_lower for word in ['reform', 'restructure', 'modernize', 'improve']):
return 'reform'
else:
return 'other'
# General health classifications
elif 'health' in topic_lower or 'health' in title_lower:
if any(word in title_lower for word in ['protect', 'preserve', 'safeguard', 'ensure', 'guarantee', 'expand', 'increase', 'enhance', 'support']):
return 'protection'
elif any(word in title_lower for word in ['restrict', 'limit', 'regulate', 'control', 'impose', 'prohibit', 'ban']):
return 'restriction'
elif any(word in title_lower for word in ['fund', 'appropriation', 'grant', 'budget']):
return 'funding'
elif any(word in title_lower for word in ['reform', 'restructure', 'modernize', 'improve']):
return 'reform'
else:
return 'other'
# Default general classifications
else:
if any(word in title_lower for word in ['support', 'promote', 'encourage', 'expand', 'increase', 'enhance', 'fund']):
return 'support'
elif any(word in title_lower for word in ['oppose', 'prohibit', 'ban', 'restrict', 'limit', 'prevent']):
return 'oppose'
elif any(word in title_lower for word in ['regulate', 'oversee', 'control', 'require', 'mandate']):
return 'regulate'
else:
return 'other'
def get_legend_for_topic(topic: Optional[str]) -> dict:
"""
Get appropriate legend labels based on topic.
"""
topic_lower = topic.lower() if topic else ""
if 'fluoride' in topic_lower:
return {
"mandate": "Mandate Fluoridation",
"removal": "Remove Fluoridation",
"funding": "Funding/Grants",
"study": "Study/Research",
"other": "Other"
}
elif 'dental' in topic_lower or 'oral health' in topic_lower:
return {
"coverage_expansion": "Coverage Expansion",
"screening": "Screening Programs",
"provider_access": "Provider Access",
"funding": "Funding/Grants",
"other": "Other"
}
elif 'medicaid' in topic_lower:
return {
"expansion": "Program Expansion",
"coverage": "Coverage/Benefits",
"reimbursement": "Reimbursement",
"eligibility": "Eligibility",
"other": "Other"
}
elif 'education' in topic_lower or 'school' in topic_lower:
return {
"requirement": "Requirements",
"funding": "Funding",
"curriculum": "Curriculum",
"reform": "Reform",
"other": "Other"
}
elif 'health' in topic_lower:
return {
"protection": "Protection/Expansion",
"restriction": "Restriction",
"funding": "Funding",
"reform": "Reform",
"other": "Other"
}
else:
return {
"support": "Support/Promote",
"oppose": "Oppose/Restrict",
"regulate": "Regulate",
"other": "Other"
}
def determine_bill_status(latest_action: str, latest_date: str) -> str:
"""
Determine if bill was enacted, failed, or is pending.
"""
if not latest_action:
return 'pending'
action_lower = latest_action.lower()
# Enacted/Passed
if any(word in action_lower for word in ['signed', 'enacted', 'approved', 'passed both', 'became law']):
return 'enacted'
# Failed
if any(word in action_lower for word in ['failed', 'defeated', 'rejected', 'died', 'withdrawn', 'vetoed']):
return 'failed'
# Pending (default)
return 'pending'
@router.get("")
async def search_bills(
q: Optional[str] = Query(None, description="Search query for bill title or number"),
state: Optional[str] = Query("AL", description="State code (e.g., AL, GA, MA)"),
session: Optional[str] = Query(None, description="Legislative session (e.g., 2024rs)"),
limit: int = Query(20, ge=1, le=100, description="Maximum results"),
offset: int = Query(0, ge=0, description="Number of results to skip")
):
"""
Search legislative bills from OpenStates data.
**Examples:**
- `/api/bills?state=AL&q=dental` - Search Alabama bills for "dental"
- `/api/bills?state=AL&session=2024rs` - Get all 2024 regular session bills
- `/api/bills?state=AL&limit=50` - Browse recent Alabama bills
"""
try:
# Build file path
bills_file = GOLD_DIR / "states" / state / "bills_bills.parquet"
# Get data source (local or remote HuggingFace URL)
data_source = get_data_source(bills_file, use_remote=IS_HF_SPACES)
# Connect to DuckDB
conn = duckdb.connect()
# Build SQL query
where_clauses = []
params = []
if q:
where_clauses.append("(LOWER(title) LIKE LOWER(?) OR LOWER(bill_number) LIKE LOWER(?))")
pattern = f'%{q}%'
params.extend([pattern, pattern])
if session:
where_clauses.append("session = ?")
params.append(session)
where_clause = " AND ".join(where_clauses) if where_clauses else "1=1"
# Count total
count_sql = f"""
SELECT COUNT(*) as total
FROM read_parquet(?)
WHERE {where_clause}
"""
count_params = [data_source] + params
total = conn.execute(count_sql, count_params).fetchone()[0]
# Fetch bills
sql = f"""
SELECT
bill_id,
bill_number,
title,
classification,
session,
session_name,
first_action_date,
latest_action_date,
latest_action_description,
jurisdiction_name
FROM read_parquet(?)
WHERE {where_clause}
ORDER BY latest_action_date DESC NULLS LAST, bill_number DESC
LIMIT ? OFFSET ?
"""
query_params = [data_source] + params + [limit, offset]
rows = conn.execute(sql, query_params).fetchall()
bills = []
for row in rows:
bills.append({
"bill_id": row[0],
"bill_number": row[1],
"title": row[2],
"classification": row[3],
"session": row[4],
"session_name": row[5],
"first_action_date": row[6],
"latest_action_date": row[7],
"latest_action": row[8],
"jurisdiction": row[9]
})
conn.close()
return {
"total": total,
"bills": bills,
"pagination": {
"limit": limit,
"offset": offset,
"has_more": offset + len(bills) < total
},
"filters": {
"state": state,
"query": q,
"session": session
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Bills search error for state={state}: {e}")
# Parse error into structured response
error_detail = parse_error(e, context={
"state": state,
"data_type": "bills",
"query": q,
"session": session
})
# Return structured error response
return JSONResponse(
status_code=500,
content=error_detail.model_dump()
)
@router.get("/sessions")
async def get_sessions(
state: str = Query("AL", description="State code")
):
"""Get available legislative sessions for a state."""
try:
bills_file = GOLD_DIR / "states" / state / "bills_bills.parquet"
if not bills_file.exists():
raise HTTPException(
status_code=404,
detail=f"No bills data found for state: {state}"
)
conn = duckdb.connect()
sql = """
SELECT DISTINCT
session,
session_name,
MIN(first_action_date) as start_date,
MAX(latest_action_date) as end_date,
COUNT(*) as bill_count
FROM read_parquet(?)
GROUP BY session, session_name
ORDER BY session DESC
"""
rows = conn.execute(sql, [str(bills_file)]).fetchall()
sessions = []
for row in rows:
sessions.append({
"session": row[0],
"session_name": row[1],
"start_date": row[2],
"end_date": row[3],
"bill_count": row[4]
})
conn.close()
return {
"state": state,
"sessions": sessions,
"total_sessions": len(sessions)
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Sessions query error for state={state}: {e}")
# Parse error into structured response
error_detail = parse_error(e, context={
"state": state,
"data_type": "sessions"
})
return JSONResponse(
status_code=500,
content=error_detail.model_dump()
)
@router.get("/map")
async def get_bill_map_data(
topic: Optional[str] = Query(None, description="Topic to filter (e.g., dental, health, education)"),
session: Optional[str] = Query(None, description="Legislative session")
):
"""
Get aggregated bill data for choropleth map visualization.
Uses pre-computed national aggregates for instant loading.
Returns counts of bills by type and status for each state.
**Examples:**
- `/api/bills/map?topic=fluorid` - Map fluoridation legislation
- `/api/bills/map?topic=dental` - Map dental legislation
"""
try:
# Use pre-aggregated national dataset
agg_file = GOLD_DIR / "national" / "bills_map_aggregates.parquet"
# Fallback to on-demand aggregation if pre-computed file doesn't exist
if not agg_file.exists():
logger.warning("Pre-aggregated bill data not found, using on-demand aggregation (slower)")
return await get_bill_map_data_on_demand(topic, session)
# Load from cached aggregates (fast!)
df = load_parquet_cached(str(agg_file))
# Filter by topic
if topic:
df = df[df['topic'] == topic.lower()]
# Convert to state_data dict
state_data = {}
for _, row in df.iterrows():
state_code = row['state']
# Reconstruct nested dicts (exclude type_status_counts which is already a dict)
type_cols = [c for c in df.columns if c.startswith('type_') and c != 'type_status_counts']
status_cols = [c for c in df.columns if c.startswith('status_')]
# Handle NaN values - convert to 0
type_counts = {c.replace('type_', ''): int(row[c]) if not pd.isna(row[c]) else 0 for c in type_cols}
status_counts = {c.replace('status_', ''): int(row[c]) if not pd.isna(row[c]) else 0 for c in status_cols}
# Extract sample_bills (stored as numpy array in parquet)
sample_bills = []
if 'sample_bills' in row.index:
bills_data = row['sample_bills']
# Pandas stores list columns as numpy arrays
if hasattr(bills_data, '__iter__') and not isinstance(bills_data, str):
try:
# Convert numpy array or list to Python list
sample_bills = [dict(bill) for bill in bills_data if bill]
except:
sample_bills = []
elif isinstance(bills_data, str):
import json
try:
sample_bills = json.loads(bills_data)
except:
sample_bills = []
state_data[state_code] = {
"state": state_code,
"total_bills": int(row['total_bills']),
"type_counts": type_counts,
"status_counts": status_counts,
"primary_type": row['primary_type'],
"primary_status": row['primary_status'],
"map_category": row['map_category'],
"sample_bills": sample_bills,
"last_updated": str(row['last_updated']) if 'last_updated' in row.index else ''
}
return {
"topic": topic,
"session": session,
"states": state_data,
"total_states": len(state_data),
"legend": {
"types": get_legend_for_topic(topic),
"statuses": {
"enacted": "Enacted",
"failed": "Failed",
"pending": "Pending"
}
},
"cached": True
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Map data error: {e}")
error_detail = parse_error(e, context={
"data_type": "bill map",
"topic": topic,
"session": session
})
return JSONResponse(
status_code=500,
content=error_detail.model_dump()
)
async def get_bill_map_data_on_demand(
topic: Optional[str] = None,
session: Optional[str] = None
):
"""
LEGACY: On-demand aggregation (slow - loads 50 state files).
Only used as fallback if pre-aggregated data doesn't exist.
"""
try:
# List of all US state codes to check
ALL_STATES = [
"AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA",
"HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD",
"MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
"NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC",
"SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"
]
# In local environment, check available directories
# In HF Spaces, try all states (will skip missing datasets)
states_to_check = ALL_STATES
if not IS_HF_SPACES:
states_dir = GOLD_DIR / "states"
if states_dir.exists():
states_to_check = [d.name for d in states_dir.iterdir() if d.is_dir()]
state_data = {}
# Iterate through states
for state_code in states_to_check:
try:
bills_file = GOLD_DIR / "states" / state_code / "bills_bills.parquet"
# Get data source (local or remote HuggingFace URL)
data_source = get_data_source(bills_file, use_remote=IS_HF_SPACES)
# Connect to DuckDB
conn = duckdb.connect()
# Build query
where_clauses = ["1=1"]
params = [data_source]
if topic:
where_clauses.append("LOWER(title) LIKE LOWER(?)")
params.append(f'%{topic}%')
if session:
where_clauses.append("session = ?")
params.append(session)
where_clause = " AND ".join(where_clauses)
sql = f"""
SELECT
title,
classification,
latest_action_description
FROM read_parquet(?)
WHERE {where_clause}
"""
rows = conn.execute(sql, params).fetchall()
conn.close()
if not rows:
continue
# Get topic-aware categories
legend_categories = get_legend_for_topic(topic)
# Initialize type_counts with all possible categories for this topic
type_counts = {cat: 0 for cat in legend_categories.keys()}
status_counts = {'enacted': 0, 'failed': 0, 'pending': 0}
type_status_counts = {}
for row in rows:
title = row[0]
classification = row[1] if row[1] else []
latest_action = row[2] if row[2] else ''
bill_type = classify_bill_type(title, classification, topic)
bill_status = determine_bill_status(latest_action, '')
# Ensure bill_type exists in type_counts (fallback to 'other')
if bill_type not in type_counts:
bill_type = 'other'
type_counts[bill_type] += 1
status_counts[bill_status] += 1
# Track type+status combinations
key = f"{bill_type}_{bill_status}"
type_status_counts[key] = type_status_counts.get(key, 0) + 1
# Determine primary legislation type and status for map visualization
primary_type = max(type_counts, key=type_counts.get)
primary_status = max(status_counts, key=status_counts.get)
state_data[state_code] = {
"state": state_code,
"total_bills": len(rows),
"type_counts": type_counts,
"status_counts": status_counts,
"type_status_counts": type_status_counts,
"primary_type": primary_type,
"primary_status": primary_status,
# For map visualization
"map_category": f"{primary_type}_{primary_status}" if type_counts[primary_type] > 0 else "none"
}
except Exception as e:
# Skip states with missing or inaccessible data
logger.debug(f"Skipping state {state_code}: {str(e)}")
continue
return {
"topic": topic,
"session": session,
"states": state_data,
"total_states": len(state_data),
"legend": {
"types": get_legend_for_topic(topic),
"statuses": {
"enacted": "Enacted",
"failed": "Failed",
"pending": "Pending"
}
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Map data error: {e}")
# Parse error into structured response
error_detail = parse_error(e, context={
"data_type": "bill map",
"topic": topic,
"session": session
})
return JSONResponse(
status_code=500,
content=error_detail.model_dump()
)
@router.get("/{bill_id}")
async def get_bill_details(bill_id: str):
"""
Get detailed information about a specific bill from gold parquet files.
Args:
bill_id: Bill identifier in format {state}-{bill_number} (e.g., "LA-SB 4")
Returns:
Detailed bill information including actions, sponsors, sources
"""
try:
# Parse bill_id to extract state and bill number
if '-' not in bill_id:
raise HTTPException(status_code=400, detail="Invalid bill ID format. Expected: STATE-BILLNUMBER")
parts = bill_id.split('-', 1)
state = parts[0].upper()
bill_number = parts[1]
# Build file paths for bills data from gold layer
bills_file = GOLD_DIR / "states" / state / "bills_bills.parquet"
actions_file = GOLD_DIR / "states" / state / "bills_bill_actions.parquet"
sponsors_file = GOLD_DIR / "states" / state / "bills_bill_sponsorships.parquet"
# Get data sources (local or remote HuggingFace URL)
bills_source = get_data_source(bills_file, use_remote=IS_HF_SPACES)
actions_source = get_data_source(actions_file, use_remote=IS_HF_SPACES)
sponsors_source = get_data_source(sponsors_file, use_remote=IS_HF_SPACES)
# Connect to DuckDB for querying parquet files
conn = duckdb.connect()
try:
# Query for the specific bill
bill_query = """
SELECT
bill_id,
bill_number,
title,
classification,
latest_action_description,
latest_action_date,
first_action_date,
session,
session_name,
jurisdiction_name
FROM read_parquet(?)
WHERE bill_number = ?
LIMIT 1
"""
result = conn.execute(bill_query, [bills_source, bill_number]).fetchone()
if not result:
conn.close()
raise HTTPException(status_code=404, detail=f"Bill {bill_number} not found in {state}")
# Parse bill data
bill_data = {
"bill_id": result[0] if result[0] else bill_id,
"bill_number": result[1],
"title": result[2],
"classification": result[3] if result[3] else [],
"latest_action": result[4],
"latest_action_date": result[5],
"first_action_date": result[6],
"session": result[7],
"session_name": result[8],
"jurisdiction": result[9],
"state": state,
}
# Get sponsors if available
try:
sponsor_query = """
SELECT name, primary_sponsor, classification
FROM read_parquet(?)
WHERE bill_id = ?
ORDER BY primary_sponsor DESC
"""
sponsor_rows = conn.execute(sponsor_query, [sponsors_source, bill_data["bill_id"]]).fetchall()
bill_data["sponsors"] = [
{"name": s[0], "primary": bool(s[1]), "classification": s[2]}
for s in sponsor_rows
]
except Exception as e:
logger.warning(f"Could not load sponsors for {bill_id}: {e}")
bill_data["sponsors"] = []
# Get actions if available
try:
actions_query = """
SELECT description, date, classification
FROM read_parquet(?)
WHERE bill_id = ?
ORDER BY date DESC
LIMIT 10
"""
action_rows = conn.execute(actions_query, [actions_source, bill_data["bill_id"]]).fetchall()
bill_data["actions"] = [
{"description": a[0], "date": a[1], "classification": a[2]}
for a in action_rows
]
except Exception as e:
logger.warning(f"Could not load actions for {bill_id}: {e}")
bill_data["actions"] = []
conn.close()
return bill_data
except Exception as e:
conn.close()
raise
except HTTPException:
raise
except Exception as e:
logger.error(f"Bill details error: {e}")
logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e))