""" Legislative Tracking System Download and track state legislation across multiple social issues, categorizing bills by type (ban/restriction/protection) and status (introduced/enacted/failed). Creates choropleth maps showing legislative activity by state. Data Sources: - Open States API (state legislation) - Ballotpedia (ballot measures) - LegiScan (additional tracking) Usage: # Track fluoridation legislation python scripts/legislative_tracker.py --issue fluoridation --year 2024 # Track multiple issues python scripts/legislative_tracker.py --issue abortion,marijuana,voting --year 2024 # Generate map visualization python scripts/legislative_tracker.py --issue fluoridation --visualize """ import asyncio import os from typing import List, Dict, Optional from datetime import datetime import json from pathlib import Path import httpx import pandas as pd from loguru import logger from dotenv import load_dotenv # Visualization libraries try: import plotly.graph_objects as go import plotly.express as px PLOTLY_AVAILABLE = True except ImportError: PLOTLY_AVAILABLE = False logger.warning("Plotly not installed. Run: pip install plotly") try: import matplotlib.pyplot as plt import matplotlib.patches as mpatches MATPLOTLIB_AVAILABLE = True except ImportError: MATPLOTLIB_AVAILABLE = False logger.warning("Matplotlib not installed. Run: pip install matplotlib") load_dotenv() class LegislativeTracker: """ Track state legislation across multiple social issues. Categorizes bills by: - Type: Outright Ban, Restriction, Protection - Status: Introduced, Enacted, Failed Creates visualizations similar to legislative tracking maps. """ def __init__( self, openstates_api_key: Optional[str] = None, cache_dir: str = "data/cache/legislation" ): self.api_key = openstates_api_key or os.getenv("OPENSTATES_API_KEY") self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) self.base_url = "https://v3.openstates.org" # Issue-specific keywords for categorization self.issue_keywords = { "fluoridation": { "ban": ["prohibit fluoridation", "ban fluoride", "remove fluoride", "prohibit fluoride"], "restriction": ["limit fluoridation", "restrict fluoride", "opt-out fluoride", "fluoride disclosure"], "protection": ["require fluoridation", "mandate fluoride", "fluoride protection", "fluoride funding"] }, "abortion": { "ban": ["ban abortion", "prohibit abortion", "criminalize abortion", "abortion ban"], "restriction": ["abortion restriction", "parental consent", "waiting period", "gestational limit"], "protection": ["abortion access", "protect abortion", "abortion rights", "reproductive freedom"] }, "marijuana": { "ban": ["prohibit marijuana", "cannabis ban", "marijuana criminal"], "restriction": ["marijuana restriction", "cannabis regulation", "limited medical"], "protection": ["legalize marijuana", "cannabis legalization", "marijuana rights", "decriminalize"] }, "voting": { "ban": ["voter id requirement", "restrict voting", "purge voter rolls"], "restriction": ["voting restriction", "ballot access", "registration deadline"], "protection": ["expand voting", "voter protection", "automatic registration", "early voting"] }, "lgbtq": { "ban": ["ban transgender", "prohibit gender", "bathroom ban", "sports ban"], "restriction": ["transgender restriction", "gender therapy limit", "parental consent gender"], "protection": ["lgbtq protection", "transgender rights", "nondiscrimination", "gender identity protection"] }, "education": { "ban": ["ban critical race theory", "prohibit teaching", "book ban"], "restriction": ["curriculum restriction", "parental rights education", "opt-out"], "protection": ["education funding", "school protection", "teacher rights"] } } async def search_bills( self, issue: str, year: Optional[int] = None, state: Optional[str] = None ) -> List[Dict]: """ Search for bills related to specific issue. Args: issue: Issue keyword (e.g., 'fluoridation', 'abortion') year: Legislative session year (default: current year) state: State code (e.g., 'AL') or None for all states Returns: List of bill dictionaries """ if not self.api_key: raise ValueError("OPENSTATES_API_KEY required. Get one at https://openstates.org/accounts/signup/") year = year or datetime.now().year search_query = issue logger.info(f"Searching Open States API for '{issue}' bills in {year}") params = { "q": search_query, "page": 1, "per_page": 100 } if state: params["jurisdiction"] = state headers = { "X-API-Key": self.api_key } all_bills = [] async with httpx.AsyncClient(timeout=30.0) as client: while True: try: response = await client.get( f"{self.base_url}/bills", params=params, headers=headers ) response.raise_for_status() data = response.json() bills = data.get("results", []) all_bills.extend(bills) logger.info(f" Fetched page {params['page']}: {len(bills)} bills") # Check if there are more pages if not data.get("pagination", {}).get("next"): break params["page"] += 1 # Rate limiting await asyncio.sleep(0.5) except Exception as e: logger.error(f"Error fetching bills: {e}") break logger.info(f"āœ… Total bills found: {len(all_bills)}") return all_bills def categorize_bill(self, bill: Dict, issue: str) -> Dict: """ Categorize bill by type and status. Args: bill: Bill dictionary from Open States API issue: Issue keyword Returns: Dictionary with categorization """ title = bill.get("title", "").lower() summary = bill.get("abstracts", [{}])[0].get("abstract", "").lower() text = f"{title} {summary}" # Determine bill type bill_type = "unknown" keywords = self.issue_keywords.get(issue, {}) for keyword in keywords.get("ban", []): if keyword.lower() in text: bill_type = "ban" break if bill_type == "unknown": for keyword in keywords.get("restriction", []): if keyword.lower() in text: bill_type = "restriction" break if bill_type == "unknown": for keyword in keywords.get("protection", []): if keyword.lower() in text: bill_type = "protection" break # Determine status latest_action = bill.get("latest_action_description", "").lower() status = "introduced" if any(word in latest_action for word in ["signed", "enacted", "passed", "approved"]): status = "enacted" elif any(word in latest_action for word in ["failed", "defeated", "vetoed", "withdrawn"]): status = "failed" elif any(word in latest_action for word in ["introduced", "referred", "committee"]): status = "introduced" return { "bill_id": bill.get("identifier"), "state": bill.get("jurisdiction", {}).get("name"), "state_code": bill.get("jurisdiction", {}).get("id", "").replace("ocd-jurisdiction/country:us/state:", "").upper(), "title": bill.get("title"), "type": bill_type, "status": status, "url": bill.get("openstates_url"), "session": bill.get("session", {}).get("identifier"), "latest_action": bill.get("latest_action_description"), "latest_action_date": bill.get("latest_action_date"), "created_at": bill.get("created_at") } async def track_issue( self, issue: str, year: Optional[int] = None, states: Optional[List[str]] = None ) -> pd.DataFrame: """ Track legislation for specific issue across all states. Args: issue: Issue keyword year: Year to track states: List of state codes (None = all states) Returns: DataFrame with categorized bills """ logger.info(f"Tracking '{issue}' legislation for {year or 'current year'}") # Search all bills all_bills = await self.search_bills(issue, year) # Categorize each bill categorized = [] for bill in all_bills: cat = self.categorize_bill(bill, issue) categorized.append(cat) df = pd.DataFrame(categorized) # Filter by states if specified if states: df = df[df['state_code'].isin(states)] # Save to cache cache_file = self.cache_dir / f"{issue}_{year or 'current'}.csv" df.to_csv(cache_file, index=False) logger.info(f"āœ… Saved {len(df)} bills to {cache_file}") return df def generate_state_summary(self, df: pd.DataFrame) -> pd.DataFrame: """ Generate state-level summary of legislation. Args: df: DataFrame with categorized bills Returns: DataFrame with one row per state """ # Count bills by state, type, and status summary = df.groupby(['state_code', 'type', 'status']).size().reset_index(name='count') # Pivot to wide format state_summary = [] for state in summary['state_code'].unique(): state_data = summary[summary['state_code'] == state] # Determine dominant legislation type type_counts = state_data.groupby('type')['count'].sum() if len(type_counts) > 0: dominant_type = type_counts.idxmax() else: continue # Determine dominant status for that type type_status = state_data[state_data['type'] == dominant_type] status_counts = type_status.groupby('status')['count'].sum() dominant_status = status_counts.idxmax() if len(status_counts) > 0 else "introduced" state_summary.append({ 'state_code': state, 'dominant_type': dominant_type, 'dominant_status': dominant_status, 'total_bills': state_data['count'].sum(), 'ban_count': state_data[state_data['type'] == 'ban']['count'].sum() if 'ban' in state_data['type'].values else 0, 'restriction_count': state_data[state_data['type'] == 'restriction']['count'].sum() if 'restriction' in state_data['type'].values else 0, 'protection_count': state_data[state_data['type'] == 'protection']['count'].sum() if 'protection' in state_data['type'].values else 0, }) return pd.DataFrame(state_summary) def create_choropleth_map( self, df: pd.DataFrame, issue: str, output_file: Optional[str] = None ): """ Create choropleth map showing legislative activity by state. Similar to the fluoridation map visualization. Args: df: DataFrame with categorized bills issue: Issue name for title output_file: Path to save HTML file (default: data/visualizations/{issue}_map.html) """ if not PLOTLY_AVAILABLE: logger.error("Plotly not installed. Run: pip install plotly") return # Generate state summary state_summary = self.generate_state_summary(df) # Define color scheme color_map = { ('ban', 'enacted'): '#D2691E', # Brown (ban enacted - solid) ('ban', 'introduced'): '#FFA500', # Orange (ban introduced - lighter) ('ban', 'failed'): '#FFE4B5', # Moccasin (ban failed - lightest) ('restriction', 'enacted'): '#DAA520', # Goldenrod (restriction enacted) ('restriction', 'introduced'): '#FFD700', # Gold (restriction introduced) ('restriction', 'failed'): '#FFFFE0', # Light yellow (restriction failed) ('protection', 'enacted'): '#00008B', # Dark blue (protection enacted) ('protection', 'introduced'): '#4169E1', # Royal blue (protection introduced) ('protection', 'failed'): '#87CEEB', # Sky blue (protection failed) ('unknown', 'introduced'): '#D3D3D3', # Light gray (unknown) } # Map state codes to colors state_summary['color'] = state_summary.apply( lambda row: color_map.get((row['dominant_type'], row['dominant_status']), '#FFFFFF'), axis=1 ) # Create hover text state_summary['hover_text'] = state_summary.apply( lambda row: f"{row['state_code']}
" + f"Type: {row['dominant_type'].title()}
" + f"Status: {row['dominant_status'].title()}
" + f"Total Bills: {row['total_bills']}
" + f"Bans: {row['ban_count']}
" + f"Restrictions: {row['restriction_count']}
" + f"Protections: {row['protection_count']}", axis=1 ) # Create choropleth fig = go.Figure(data=go.Choropleth( locations=state_summary['state_code'], z=state_summary['total_bills'], # Color intensity by bill count locationmode='USA-states', colorscale='Blues', marker_line_color='white', marker_line_width=0.5, text=state_summary['hover_text'], hoverinfo='text', showscale=True )) fig.update_layout( title_text=f'{issue.title()} Legislation Tracker', geo_scope='usa', height=600, width=1000 ) # Save to file output_file = output_file or f"data/visualizations/{issue}_map.html" Path(output_file).parent.mkdir(parents=True, exist_ok=True) fig.write_html(output_file) logger.info(f"āœ… Map saved to {output_file}") # Also save legend as separate image self._create_legend(issue) return fig def _create_legend(self, issue: str): """Create a separate legend image showing bill types and statuses.""" if not MATPLOTLIB_AVAILABLE: return fig, ax = plt.subplots(figsize=(8, 6)) ax.axis('off') # Define patches for legend legend_elements = [ mpatches.Patch(color='#D2691E', label='Outright Ban (Enacted)'), mpatches.Patch(color='#FFA500', label='Outright Ban (Introduced)'), mpatches.Patch(color='#FFE4B5', label='Outright Ban (Failed)'), mpatches.Patch(color='#DAA520', label='Restriction (Enacted)'), mpatches.Patch(color='#FFD700', label='Restriction (Introduced)'), mpatches.Patch(color='#FFFFE0', label='Restriction (Failed)'), mpatches.Patch(color='#00008B', label='Protection (Enacted)'), mpatches.Patch(color='#4169E1', label='Protection (Introduced)'), mpatches.Patch(color='#87CEEB', label='Protection (Failed)'), ] ax.legend(handles=legend_elements, loc='center', fontsize=12, title=f'{issue.title()} Legislation Types') output_file = f"data/visualizations/{issue}_legend.png" plt.savefig(output_file, dpi=150, bbox_inches='tight') logger.info(f"āœ… Legend saved to {output_file}") plt.close() async def main(): """Main CLI entry point.""" import argparse parser = argparse.ArgumentParser(description="Track state legislation across social issues") parser.add_argument("--issue", required=True, help="Issue to track (e.g., 'fluoridation', 'abortion')") parser.add_argument("--year", type=int, help="Year to track (default: current year)") parser.add_argument("--visualize", action="store_true", help="Generate map visualization") parser.add_argument("--output", help="Output file path for visualization") args = parser.parse_args() tracker = LegislativeTracker() # Track legislation df = await tracker.track_issue(args.issue, args.year) logger.info(f"\nšŸ“Š Summary for {args.issue}:") logger.info(f" Total bills: {len(df)}") logger.info(f" Bans: {len(df[df['type'] == 'ban'])}") logger.info(f" Restrictions: {len(df[df['type'] == 'restriction'])}") logger.info(f" Protections: {len(df[df['type'] == 'protection'])}") logger.info(f" Enacted: {len(df[df['status'] == 'enacted'])}") logger.info(f" Failed: {len(df[df['status'] == 'failed'])}") # Generate visualization if args.visualize: tracker.create_choropleth_map(df, args.issue, args.output) if __name__ == "__main__": asyncio.run(main())