ramanna's picture
Deploy: fix newsletter parsing for state list items and linked bills
98bf60c
#!/usr/bin/env python3
"""
build_calendar.py
-----------------
Reads bill action history from known_bills_visualize.json and builds a
lightweight calendar of recent legislative milestones (committee referrals,
floor votes, governor actions, etc.).
Output: data/bill_calendar.json
Can be run standalone or as part of the weekly pipeline.
"""
import json
import logging
import os
import re
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
# Add project root to path
sys.path.append(str(Path(__file__).parent.parent))
# ── Pipeline status tracking (no-op when running standalone) ──────────────
# Value of the PIPELINE_CURRENT_SCRIPT environment variable; when it is unset
# or empty no tracker is created and progress reporting does nothing.
_PIPELINE_SCRIPT = os.environ.get("PIPELINE_CURRENT_SCRIPT")
# Tracker instance, or None when tracking is off or could not be set up.
_pipeline = None
# time.time() value of the last status write; used to throttle updates.
_last_status_write = 0.0
if _PIPELINE_SCRIPT:
    try:
        from pipeline_status import PipelineStatus
        _pipeline = PipelineStatus()
    except Exception as exc:
        # Tracking is best-effort and must never abort the build, but it was
        # explicitly requested through the environment, so say why it is off.
        # Logging is not configured yet at this point; the record is emitted
        # through logging's last-resort stderr handler.
        logging.getLogger(__name__).warning(
            "Pipeline status tracking disabled: %s", exc
        )
def _update_pipeline_progress(current, total, unit="bills", message=""):
    """Report progress to the pipeline status tracker, throttled to one write per 3 s.

    Does nothing when no tracker is active (``_pipeline`` is falsy).

    Args:
        current: Number of items processed so far.
        total: Total number of items to process.
        unit: Label for the items being counted.
        message: Free-form status text passed through to the tracker.

    The update that reports completion (``current >= total``) is never
    throttled; otherwise a fast run could finish within the throttle window
    and leave the tracker showing a stale intermediate message.
    """
    global _last_status_write
    if not _pipeline:
        return
    min_interval = 3.0  # seconds between non-final status writes
    now = time.time()
    is_final = current >= total
    if not is_final and now - _last_status_write < min_interval:
        return
    _last_status_write = now
    try:
        _pipeline.update_progress(_PIPELINE_SCRIPT, current, total, unit, message)
    except Exception:
        # Status reporting is best-effort: a tracker failure must not break
        # the build, but leave a trace for debugging.
        logging.getLogger(__name__).debug(
            "Pipeline status update failed", exc_info=True
        )
# ── Paths ─────────────────────────────────────────────────────────────────
# All locations are relative paths, so they resolve against the current
# working directory.
DATA_DIR = Path("data")
BILLS_FILE = DATA_DIR.joinpath("known_bills_visualize.json")
CALENDAR_FILE = DATA_DIR.joinpath("bill_calendar.json")
# The log directory has to exist before the FileHandler below opens its file.
Path("data_updating_scripts/logs").mkdir(parents=True, exist_ok=True)
# ── Logging ───────────────────────────────────────────────────────────────
# Records at INFO and above go to both the console stream and the log file.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("data_updating_scripts/logs/build_calendar.log"),
    ],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# ── Milestone patterns ────────────────────────────────────────────────────
# Each key maps to a list of regex patterns that identify that milestone type.
# _classify_action walks this dict in order and returns the first key with a
# matching pattern, so when an action text matches several milestone types the
# one listed earlier here wins. Every pattern starts with the inline (?i)
# flag, i.e. it matches case-insensitively.
MILESTONE_PATTERNS = {
    # Hearing announcements
    "hearing_scheduled": [
        r"(?i)scheduled\s+for\s+hearing",
        r"(?i)hearing\s+set\s+for",
        r"(?i)notice\s+of\s+hearing",
        r"(?i)public\s+hearing",
    ],
    # Referral / assignment to a committee ("comm\." covers the abbreviation)
    "committee_referral": [
        r"(?i)referred\s+to\s+(the\s+)?(committee|subcommittee|comm\.)",
        r"(?i)assigned\s+to\s+(the\s+)?(committee|subcommittee|comm\.)",
        r"(?i)re-?referred\s+to",
    ],
    # Favorable committee outcome
    "committee_passed": [
        r"(?i)passed\s+(the\s+)?(committee|subcommittee|comm\.)",
        r"(?i)reported\s+(out\s+)?(favorably|without\s+amendment)",
        r"(?i)\bdo\s+pass\b",
        r"(?i)recommended\s+for\s+passage",
        r"(?i)reported\s+with\s+recommendation",
    ],
    # Chamber floor activity
    "floor_vote": [
        r"(?i)third\s+reading",
        r"(?i)3rd\s+reading",
        r"(?i)placed\s+on\s+(the\s+)?calendar",
        r"(?i)ordered\s+(to\s+)?(be\s+)?engrossed",
        r"(?i)passed\s+(the\s+)?(house|senate|assembly|chamber)",
        r"(?i)adopted\s+by\s+(the\s+)?(house|senate|assembly)",
    ],
    # Hand-off to the governor (any text containing the word "enrolled" also
    # lands here)
    "sent_to_governor": [
        r"(?i)sent\s+to\s+(the\s+)?governor",
        r"(?i)presented\s+to\s+(the\s+)?governor",
        r"(?i)\benrolled\b",
        r"(?i)transmitted\s+to\s+(the\s+)?governor",
    ],
}
# Human-readable labels for each milestone type.
# Keys mirror those of MILESTONE_PATTERNS. Lookups in this file use
# .get(key, key), so a type without an entry here falls back to its raw key.
MILESTONE_LABELS = {
    "hearing_scheduled": "Hearing Scheduled",
    "committee_referral": "Committee Referral",
    "committee_passed": "Passed Committee",
    "floor_vote": "Floor Vote",
    "sent_to_governor": "Sent to Governor",
}
def _classify_action(action_text: str) -> str | None:
"""Match action text against milestone patterns. Returns type or None."""
for milestone_type, patterns in MILESTONE_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, action_text):
return milestone_type
return None
def build_calendar(bills: list, lookback_days: int = 30) -> list:
    """
    Scan bill actions for legislative milestones within the lookback window.

    Args:
        bills: Bill records (dicts). Only bills with a non-empty "actions"
            list are scanned; entries that are not dicts are skipped with a
            warning.
        lookback_days: Size of the window in days, counted back from the
            current UTC date.

    Returns:
        A list of event dicts sorted by "event_date" descending.
    """
    # Action dates are compared to the cutoff as plain strings, which orders
    # correctly only for ISO-style "YYYY-MM-DD..." values.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=lookback_days)).strftime("%Y-%m-%d")
    total = len(bills)
    events = []
    for i, bill in enumerate(bills):
        if i % 500 == 0:
            _update_pipeline_progress(i, total, "bills",
                                      f"Scanning actions — {len(events)} milestones found")
        if not isinstance(bill, dict):
            logger.warning(f"Skipping malformed bill entry at index {i}")
            continue
        actions = bill.get("actions", [])
        if not actions or not isinstance(actions, list):
            continue
        bill_id = str(bill.get("bill_id", ""))
        state = bill.get("state", "")
        bill_number = bill.get("bill_number", "")
        # A present-but-null title must not break slicing: that used to raise
        # for every action and silently drop all of the bill's milestones.
        title = str(bill.get("title") or "")[:150]
        bill_url = bill.get("bill_url", "")
        bill_status = bill.get("status", "")
        for action in actions:
            try:
                action_text = action.get("action", "") or ""
                # LegiScan uses "date" or "action_date" depending on context
                action_date = action.get("date") or action.get("action_date") or ""
                if not action_text or not action_date:
                    continue
                # Skip actions older than the lookback window
                if action_date < cutoff:
                    continue
                milestone_type = _classify_action(action_text)
                if not milestone_type:
                    continue
                events.append({
                    "bill_id": bill_id,
                    "state": state,
                    "bill_number": bill_number,
                    "title": title,
                    "event_type": milestone_type,
                    "event_label": MILESTONE_LABELS.get(milestone_type, milestone_type),
                    "event_description": action_text.strip(),
                    "event_date": action_date,
                    "chamber": action.get("chamber", ""),
                    "bill_url": bill_url,
                    "bill_status": bill_status,
                })
            except Exception as e:
                # One malformed action must not abort the scan of the others.
                logger.warning(f"Error processing action for bill {bill_id}: {e}")
                continue
    # Sort by date descending (most recent first)
    events.sort(key=lambda e: e.get("event_date", ""), reverse=True)
    return events
def _write_calendar(events: list) -> bool:
    """Write *events* to CALENDAR_FILE as JSON; return True on success.

    The data goes to a sibling temporary file first and is then moved into
    place with os.replace, so a reader never sees a half-written calendar.
    Failures are logged and reported through the return value.
    """
    tmp_path = CALENDAR_FILE.with_name(CALENDAR_FILE.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(events, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, CALENDAR_FILE)
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"Failed to write calendar: {e}")
        try:
            tmp_path.unlink()
        except OSError:
            pass  # best-effort cleanup; the temp file may never have been created
        return False
    return True


def main():
    """Build CALENDAR_FILE from the bill action history stored in BILLS_FILE.

    Loads the bills, extracts milestone events from the last 30 days via
    build_calendar, logs a per-type breakdown and writes the result as JSON.
    Every failure is logged and ends the run early; nothing is raised.
    """
    from collections import Counter

    lookback_days = 30

    logger.info("=" * 60)
    logger.info("Building legislative calendar from bill actions")
    logger.info("=" * 60)
    # Load bills
    if not BILLS_FILE.exists():
        logger.error(f"Bills file not found: {BILLS_FILE}")
        return
    try:
        with open(BILLS_FILE, "r", encoding="utf-8") as f:
            bills = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        logger.error(f"Failed to load bills: {e}")
        return
    if not isinstance(bills, list):
        logger.error(
            f"Failed to load bills: expected a JSON list, got {type(bills).__name__}"
        )
        return
    logger.info(f"Loaded {len(bills)} bills")
    # Count bills with non-empty actions
    bills_with_actions = sum(
        1 for b in bills if isinstance(b, dict) and b.get("actions")
    )
    logger.info(f"Bills with action history: {bills_with_actions}")
    if bills_with_actions == 0:
        logger.warning("No bills have action history yet. Writing empty calendar.")
        if _write_calendar([]):
            logger.info(f"Wrote empty {CALENDAR_FILE}")
        return
    # Build calendar
    events = build_calendar(bills, lookback_days=lookback_days)
    logger.info(f"Found {len(events)} milestone events in the last {lookback_days} days")
    # Log breakdown by type, most frequent first
    type_counts = Counter(e["event_type"] for e in events)
    for t, c in type_counts.most_common():
        logger.info(f" {MILESTONE_LABELS.get(t, t)}: {c}")
    # Write output
    if not _write_calendar(events):
        return
    logger.info(f"Wrote {len(events)} events to {CALENDAR_FILE}")
    _update_pipeline_progress(len(bills), len(bills), "bills",
                              f"Done — {len(events)} milestones")
    logger.info("Calendar build complete.")


if __name__ == "__main__":
    main()