# NOTE: non-code page chrome from the scraped Hugging Face Spaces listing
# ("Spaces: Running on CPU Upgrade") removed; original file content follows.
| import os | |
| import sys | |
| import json | |
| import time | |
| import logging | |
| import base64 | |
| from datetime import datetime, timezone | |
| import requests | |
| from dotenv import load_dotenv | |
| from bs4 import BeautifulSoup | |
# Load environment variables from .env file so LEGISCAN_API_KEY is visible below.
load_dotenv()

# Pull API key from environment; the script cannot run without it.
API_KEY = os.getenv("LEGISCAN_API_KEY")  # Set your LegiScan API key in .env
if not API_KEY:
    print("Error: Please set LEGISCAN_API_KEY in your .env file.")
    sys.exit(1)

# Modes for testing
# Quick test: pulls only TEST_MAX_BILLS bills
TESTING_MODE = False
# Full test: pulls all bills for TEST_STATE and TEST_YEAR without bill count cap
FULL_TESTING_MODE = False
TEST_STATE = 'CA'
TEST_YEAR = 2023
TEST_MAX_BILLS = 3

# Output files
CACHE_FILE = "data/bill_cache.json"  # Stores bill_id -> change_hash
OUTPUT_FILE = "data/known_bills.json"  # Final bills data

# Query settings
QUERY = "artificial intelligence"
START_YEAR = 2023
# Current year, evaluated once at import time (timezone-aware UTC).
END_YEAR = datetime.now(timezone.utc).year

# Include all state legislatures plus U.S. Congress (both chambers)
STATES = [
    "AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA",
    "HI","ID","IL","IN","IA","KS","KY","LA","ME","MD",
    "MA","MI","MN","MS","MO","MT","NE","NV","NH","NJ",
    "NM","NY","NC","ND","OH","OK","OR","PA","RI","SC",
    "SD","TN","TX","UT","VT","VA","WA","WV","WI","WY",
    "US"  # U.S. Congress
]

# Rate limiting (seconds between requests)
RATE_LIMIT = 0.2

# Create logs directory if it doesn't exist
os.makedirs("data_updating_scripts/logs", exist_ok=True)

# Logging configuration: INFO-level messages go to both stdout and a log file.
LOG_FILE = "data_updating_scripts/logs/fetch_ai_bills.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(LOG_FILE)
    ]
)
logger = logging.getLogger(__name__)

# Apply testing overrides: either test mode restricts the scan to TEST_STATE.
if TESTING_MODE:
    logger.info(f"*** TESTING MODE: fetching only {TEST_MAX_BILLS} bills from {TEST_STATE} ({TEST_YEAR}) ***")
    STATES = [TEST_STATE]
if FULL_TESTING_MODE:
    logger.info(f"*** FULL TESTING MODE: fetching all bills from {TEST_STATE} ({TEST_YEAR}) ***")
    STATES = [TEST_STATE]
def load_json(path, default):
    """Load and return JSON data from *path*.

    Args:
        path: Filesystem path of the JSON file.
        default: Value returned when the file is missing or unparseable.

    Returns:
        The decoded JSON value, or *default* on FileNotFoundError /
        JSONDecodeError (a missing or corrupt cache is treated as a
        fresh start, not an error).
    """
    try:
        # Explicit UTF-8 so reads don't depend on the platform locale encoding.
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return default
def save_json(path, data):
    """Serialize *data* as pretty-printed JSON to *path*.

    Creates the parent directory if needed and logs the write.

    Args:
        path: Destination file path.
        data: Any JSON-serializable value.
    """
    # Create directory if it doesn't exist. Guard against a bare filename:
    # os.path.dirname("file.json") == "" and os.makedirs("") would raise.
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    # Explicit UTF-8 so writes don't depend on the platform locale encoding.
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)
    logger.info(f"Saved JSON to {path}")
def legi_request(op, params):
    """Call a LegiScan API operation and return the decoded JSON response.

    Args:
        op: LegiScan operation name (e.g. "getSearch", "getBill").
        params: Operation-specific query parameters. Not mutated.

    Returns:
        The parsed response dict when HTTP succeeds and the API reports
        status "OK"; otherwise None (the failure is logged).
    """
    base = "https://api.legiscan.com/"
    # Build the query on a copy so the caller's dict is never mutated
    # with the injected "key"/"op" entries.
    query = {**params, "key": API_KEY, "op": op}
    try:
        resp = requests.get(base, params=query, timeout=10)
        resp.raise_for_status()
        data = resp.json()
        if data.get("status") != "OK":
            # API-level failure (bad key, bad params, throttling, ...).
            logger.error(f"API error {op}: {data.get('message', data)}")
            return None
        return data
    except requests.RequestException as e:
        # Network / HTTP errors (timeouts, 4xx/5xx via raise_for_status).
        logger.error(f"Request failed ({op}): {e}")
        return None
def extract_plain_text(html_content: str) -> str:
    """Strip markup from *html_content*, returning visible text joined by newlines."""
    parsed = BeautifulSoup(html_content, "html.parser")
    return parsed.get_text(separator="\n", strip=True)
def main():
    """Fetch AI-related bills from LegiScan for all configured states/years.

    Uses a change_hash cache to skip re-downloading text for unchanged bills,
    then writes the deduplicated bill list to OUTPUT_FILE and the updated
    hash cache to CACHE_FILE.
    """
    # bill_id -> change_hash cache and previously saved bill entries.
    cache = load_json(CACHE_FILE, {})
    existing = load_json(OUTPUT_FILE, [])
    existing_map = {b.get("bill_id"): b for b in existing}
    logger.info(f"Loaded cache entries: {len(cache)}, existing bills: {len(existing)}")
    collected = []
    total_fetched = 0
    # Test modes pin a single year; otherwise scan START_YEAR..END_YEAR inclusive.
    years = [TEST_YEAR] if (TESTING_MODE or FULL_TESTING_MODE) else list(range(START_YEAR, END_YEAR + 1))
    for state in STATES:
        for year in years:
            page = 1
            while True:  # paginate through getSearch results for this state/year
                if TESTING_MODE and total_fetched >= TEST_MAX_BILLS:
                    logger.info("Reached TEST_MAX_BILLS limit, stopping early.")
                    break
                params = {"state": state, "year": year, "query": QUERY, "page": page}
                logger.info(f"Searching {state} for {year}, page {page}")
                data = legi_request("getSearch", params)
                if not data:
                    break
                results = data.get("searchresult", {})
                summary = results.get("summary", {})
                # searchresult holds one "summary" key plus one entry per bill.
                bills = [v for k, v in results.items() if k != "summary"]
                if not bills:
                    logger.info(f"No bills on page {page} for {state} {year}")
                    break
                logger.info(f"Found {len(bills)} bills on {state} {year} page {page}")
                for bill in bills:
                    if TESTING_MODE and total_fetched >= TEST_MAX_BILLS:
                        break
                    bill_id = str(bill.get("bill_id"))
                    state_code = bill.get("state")
                    bill_num = bill.get("bill_number")
                    logger.info(f"Processing bill {state_code}_{bill_num} (ID: {bill_id})")
                    details_resp = legi_request("getBill", {"id": bill_id})
                    if not details_resp:
                        continue
                    details = details_resp.get("bill", {})
                    # Skip bills from sessions that started before the scan window.
                    sess_year = details.get("session", {}).get("year_start", 0)
                    if sess_year < START_YEAR:
                        continue
                    new_hash = details.get("change_hash")
                    old_hash = cache.get(bill_id)
                    now_iso = datetime.now(timezone.utc).isoformat()
                    # Extract all relevant dates; the max of these (ISO date
                    # strings compare chronologically) is the effective
                    # last_action_date.
                    explicit = details.get("last_action_date")
                    status_date = details.get("status_date")
                    last_vote_date = details.get("last_vote_date")
                    last_amendment_date = details.get("last_amendment_date")
                    actions = details.get("actions", [])
                    action_dates = [a.get("action_date") for a in actions if a.get("action_date")]
                    most_recent_action = max(action_dates) if action_dates else None
                    candidates = [d for d in [explicit, status_date, last_vote_date, last_amendment_date, most_recent_action] if d]
                    last_action_date = max(candidates) if candidates else None
                    bill_url = details.get("url")  # Bill detail page URL
                    if new_hash and new_hash == old_hash and bill_id in existing_map:
                        # Hash unchanged: reuse the saved entry (keeps previously
                        # fetched text) and refresh only the volatile fields.
                        entry = existing_map[bill_id]
                        entry.update({
                            "status": details.get("status"),
                            "session_year": f"{details.get('session', {}).get('year_start', '')}-{details.get('session', {}).get('year_end', '')}",
                            "last_action_date": last_action_date,
                            "status_date": status_date,
                            "last_vote_date": last_vote_date,
                            "last_amendment_date": last_amendment_date,
                            "actions": actions,
                            "bill_url": bill_url,
                            "lastUpdatedAt": now_iso
                        })
                        logger.info(f"Reused cache; updated status={entry['status']}, last_action_date={entry['last_action_date']}")
                    else:
                        # New or changed bill: fetch and decode its first text
                        # document (base64-encoded HTML) into plain text.
                        plain_text = None
                        texts = details.get("texts", [])
                        if texts:
                            doc_id = texts[0].get("doc_id")
                            text_resp = legi_request("getBillText", {"id": doc_id})
                            if text_resp and "text" in text_resp:
                                raw_b64 = text_resp["text"].get("doc", "")
                                try:
                                    decoded = base64.b64decode(raw_b64)
                                    html = decoded.decode("utf-8", errors="ignore")
                                    plain_text = extract_plain_text(html)
                                except Exception as e:
                                    # Best-effort: a bill with undecodable text
                                    # is still recorded, with text=None.
                                    logger.error(f"Failed decoding HTML for {bill_id}: {e}")
                        entry = {
                            "bill_id": bill_id,
                            "state": state_code,
                            "bill_number": bill_num,
                            "session_year": f"{details.get('session', {}).get('year_start', '')}-{details.get('session', {}).get('year_end', '')}",
                            "title": details.get("title"),
                            "description": details.get("description"),
                            "status": details.get("status"),
                            "sponsors": [s.get("name") for s in details.get("sponsors", [])],
                            "text": plain_text,
                            "last_action_date": last_action_date,
                            "status_date": status_date,
                            "last_vote_date": last_vote_date,
                            "last_amendment_date": last_amendment_date,
                            "actions": actions,
                            "bill_url": bill_url,
                            "change_hash": new_hash,
                            "lastUpdatedAt": now_iso
                        }
                        # Record the new hash so next run can skip this bill.
                        cache[bill_id] = new_hash
                    logger.info(
                        f"Entry data: title='{entry['title']}', sponsors={len(entry['sponsors'])}, "
                        f"status={entry['status']}, last_action_date={entry['last_action_date']}"
                    )
                    collected.append(entry)
                    total_fetched += 1
                    time.sleep(RATE_LIMIT)  # throttle per-bill API calls
                if page >= summary.get("page_total", 1):
                    break
                page += 1
                time.sleep(RATE_LIMIT)  # throttle per-page search calls
            if TESTING_MODE and total_fetched >= TEST_MAX_BILLS:
                break
        if TESTING_MODE and total_fetched >= TEST_MAX_BILLS:
            break
    # De-duplicate by bill_id (later entries win), then persist bills + cache.
    dedup = {e["bill_id"]: e for e in collected}
    all_bills = list(dedup.values())
    save_json(OUTPUT_FILE, all_bills)
    save_json(CACHE_FILE, cache)
    logger.info(f"Completed run, saved {len(all_bills)} bills to {OUTPUT_FILE}")
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()