#!/usr/bin/env python3
"""
Script to detect changes in bills by comparing current data to previous snapshot.

This script compares the current known_bills_visualize.json to the most recent
snapshot to identify new bills and bills with status changes. Results are saved
to a CSV file, the current data is then saved as a new snapshot, and (when
Hugging Face credentials are configured) the latest snapshot and all weekly
CSVs are uploaded to a Hugging Face dataset.
"""

import json
import csv
import logging
import os
import sys
import time
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# Add the project root to the path so project-local modules (e.g.
# pipeline_status) are importable when this file is run as a script.
sys.path.append(str(Path(__file__).parent.parent))

# Pipeline status tracking (no-op when running standalone)
_PIPELINE_SCRIPT = os.environ.get("PIPELINE_CURRENT_SCRIPT")
_pipeline = None
_last_status_write = 0.0
if _PIPELINE_SCRIPT:
    try:
        from pipeline_status import PipelineStatus
        _pipeline = PipelineStatus()
    except Exception:
        # Status reporting is optional; it must never break change detection.
        pass


def _update_pipeline_progress(current, total, unit="changes", message=""):
    """Report progress to the pipeline status tracker, at most once every 3 s.

    Does nothing when no pipeline tracker is active. Errors from the tracker
    are ignored because status reporting is best-effort.
    """
    global _last_status_write
    if not _pipeline:
        return
    now = time.time()
    if now - _last_status_write < 3.0:
        return
    _last_status_write = now
    try:
        _pipeline.update_progress(_PIPELINE_SCRIPT, current, total, unit, message)
    except Exception:
        pass


# Paths (relative to the current working directory)
DATA_DIR = Path("data")
SNAPSHOTS_DIR = DATA_DIR / "snapshots"
CHANGES_DIR = DATA_DIR / "weekly_changes"
CURRENT_BILLS_FILE = DATA_DIR / "known_bills_visualize.json"

# Create directories
SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)
CHANGES_DIR.mkdir(parents=True, exist_ok=True)
os.makedirs("data_updating_scripts/logs", exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("data_updating_scripts/logs/detect_changes.log"),
    ],
)
logger = logging.getLogger(__name__)

# Columns written to the weekly changes CSV, in order.
CSV_FIELDNAMES = [
    'bill_number', 'state', 'change_type', 'status_change_detail',
    'old_status', 'new_status', 'title', 'session_year',
]

# Maximum number of title characters copied into a change record.
TITLE_MAX_CHARS = 100


def bill_key(bill: Dict) -> str:
    """Generate unique key for a bill ("<state>_<bill_number>")."""
    return f"{bill.get('state', 'Unknown')}_{bill.get('bill_number', 'Unknown')}"


def load_json(file_path: Path) -> List[Dict]:
    """Load a JSON list of bills from *file_path*.

    Returns an empty list (after logging) when the file is missing, cannot be
    parsed, or does not contain a JSON array at the top level.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        logger.warning(f"File not found: {file_path}")
        return []
    except Exception as e:
        logger.error(f"Error loading {file_path}: {e}")
        return []
    # Callers iterate the result as a list of bill dicts; a JSON object here
    # would make them iterate over its keys and crash in bill_key().
    if not isinstance(data, list):
        logger.error(f"Expected a JSON list in {file_path}, got {type(data).__name__}")
        return []
    return data


def save_json(file_path: Path, data: List[Dict]):
    """Save *data* to *file_path* as pretty-printed UTF-8 JSON."""
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def save_csv(file_path: Path, changes: List[Dict]) -> bool:
    """Save change records to *file_path* as CSV.

    Returns:
        True if a file was written; False if *changes* was empty, in which
        case no file is created.
    """
    if not changes:
        logger.info("No changes to save to CSV")
        return False
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=CSV_FIELDNAMES)
        writer.writeheader()
        writer.writerows(changes)
    logger.info(f"Saved {len(changes)} changes to {file_path}")
    return True


def get_most_recent_snapshot() -> Optional[Path]:
    """Return the snapshot file with the newest modification time, or None."""
    if not SNAPSHOTS_DIR.exists():
        return None
    snapshots = list(SNAPSHOTS_DIR.glob("known_bills_*.json"))
    if not snapshots:
        return None
    return max(snapshots, key=lambda p: p.stat().st_mtime)


# Raw status codes -> human-readable labels.
_NUMERIC_STATUS_LABELS = {
    0: "Inactive", 1: "Active", 2: "Active", 3: "Active",
    4: "Signed Into Law", 5: "Vetoed", 6: "Inactive",
    7: "Signed Into Law", 8: "Signed Into Law", 9: "Active",
    10: "Active", 11: "Inactive", 12: "Active",
}
# Status codes may arrive as ints or as their string form ("4"); both map
# to the same label. Generated from one table so the two halves cannot drift.
STATUS_LABELS = {
    **_NUMERIC_STATUS_LABELS,
    **{str(code): label for code, label in _NUMERIC_STATUS_LABELS.items()},
}
# Values that are already human-readable labels and pass through unchanged.
KNOWN_STATUS_LABELS = ("Active", "Inactive", "Signed Into Law", "Vetoed")


def resolve_status(raw_status) -> str:
    """Convert a raw status value to a human-readable label.

    Status codes (ints or their string form) map through STATUS_LABELS, and
    strings that are already labels are returned unchanged. A missing status
    (None) maps to "" so it is treated as unknown, not as a literal "None"
    status. Any other value is returned as ``str(raw_status)``.
    """
    if raw_status is None:
        return ""
    try:
        return STATUS_LABELS[raw_status]
    except (KeyError, TypeError):
        # KeyError: unmapped code. TypeError: unhashable value such as a list.
        pass
    if isinstance(raw_status, str) and raw_status in KNOWN_STATUS_LABELS:
        return raw_status
    return str(raw_status)


def classify_change(old_label: str, new_label: str) -> str:
    """Classify a status change into a specific change type."""
    if new_label == "Signed Into Law":
        return "signed_into_law"
    elif new_label == "Vetoed":
        return "vetoed"
    elif new_label == "Inactive" and old_label == "Active":
        return "went_inactive"
    else:
        return "status_change"


def _change_record(bill: Dict, change_type: str, old_label: str,
                   new_label: str, detail: str = '') -> Dict:
    """Build one CSV row (keyed by CSV_FIELDNAMES) describing a change to *bill*."""
    return {
        'bill_number': bill.get('bill_number', 'Unknown'),
        'state': bill.get('state', 'Unknown'),
        'change_type': change_type,
        'status_change_detail': detail,
        'old_status': old_label,
        'new_status': new_label,
        # `or ''` guards against an explicit null title, which would
        # otherwise raise TypeError when sliced.
        'title': str(bill.get('title') or '')[:TITLE_MAX_CHARS],
        'session_year': bill.get('session_year', ''),
    }


def detect_changes(current_bills: List[Dict],
                   previous_bills: List[Dict]) -> Tuple[List[Dict], int, int, int, int]:
    """
    Compare current bills to previous snapshot and detect changes.

    Bills are matched by bill_key(). A bill absent from the snapshot is a
    "new_bill". A matched bill whose resolved status label differs is
    classified with classify_change(); matches where either label is empty
    (unknown status) are skipped. Bills present only in the snapshot are not
    reported. If a key appears more than once, the last occurrence wins.

    Returns:
        (changes_list, new_count, status_change_count, signed_count, vetoed_count)
    """
    changes = []
    current_dict = {bill_key(b): b for b in current_bills}
    previous_dict = {bill_key(b): b for b in previous_bills}

    new_count = 0
    status_change_count = 0
    signed_count = 0
    vetoed_count = 0

    for key, bill in current_dict.items():
        new_label = resolve_status(bill.get('status', ''))

        if key not in previous_dict:
            changes.append(_change_record(bill, 'new_bill', '', new_label))
            new_count += 1
            continue

        old_label = resolve_status(previous_dict[key].get('status', ''))
        # Unchanged, or either side unknown: nothing to report.
        if not old_label or not new_label or old_label == new_label:
            continue

        change_type = classify_change(old_label, new_label)
        changes.append(_change_record(bill, change_type, old_label, new_label,
                                      f"{old_label} → {new_label}"))
        status_change_count += 1
        if change_type == "signed_into_law":
            signed_count += 1
        elif change_type == "vetoed":
            vetoed_count += 1

    return changes, new_count, status_change_count, signed_count, vetoed_count


def _report_changes(current_bills: List[Dict], previous_bills: List[Dict]) -> None:
    """Detect changes, log and print a summary, and write the dated CSV."""
    changes, new_count, status_change_count, signed_count, vetoed_count = \
        detect_changes(current_bills, previous_bills)

    logger.info(f"Detected {len(changes)} total changes:")
    logger.info(f"  - New bills: {new_count}")
    logger.info(f"  - Status changes: {status_change_count}")
    logger.info(f"  - Signed into law: {signed_count}")
    logger.info(f"  - Vetoed: {vetoed_count}")

    timestamp = datetime.now().strftime("%Y-%m-%d")
    csv_file = CHANGES_DIR / f"weekly_changes_{timestamp}.csv"
    csv_written = save_csv(csv_file, changes)

    print("✅ Change detection complete!")
    print(f"   New bills: {new_count}")
    print(f"   Status changes: {status_change_count}")
    print(f"   Signed into law: {signed_count}")
    print(f"   Vetoed: {vetoed_count}")
    # save_csv writes nothing when there are no changes; don't claim otherwise.
    if csv_written:
        print(f"   CSV saved: {csv_file}")
    else:
        print("   CSV: not written (no changes)")

    _update_pipeline_progress(1, 1, "changes", f"{len(changes)} changes detected")


def main():
    """Main execution function.

    Loads the current bills, diffs them against the most recent snapshot (if
    any), saves the current bills as a new timestamped snapshot, and uploads
    change data to Hugging Face when configured.
    """
    logger.info("Starting change detection...")

    current_bills = load_json(CURRENT_BILLS_FILE)
    if not current_bills:
        logger.error(f"No current bills found in {CURRENT_BILLS_FILE}")
        return
    logger.info(f"Loaded {len(current_bills)} current bills")

    snapshot_file = get_most_recent_snapshot()
    previous_bills: List[Dict] = []
    if snapshot_file:
        logger.info(f"Found previous snapshot: {snapshot_file}")
        previous_bills = load_json(snapshot_file)
        logger.info(f"Loaded {len(previous_bills)} bills from snapshot")

    if previous_bills:
        _report_changes(current_bills, previous_bills)
    elif snapshot_file:
        # This script only writes snapshots from a non-empty bill list (it
        # returns early above otherwise), so an empty result means the file is
        # unreadable or corrupt. Diffing against [] would report every bill as
        # "new_bill"; re-establish the baseline instead.
        logger.warning(f"Snapshot {snapshot_file} is empty or unreadable - "
                       f"re-establishing baseline instead of diffing")
        print("⚠️ Previous snapshot unreadable - establishing new baseline")
    else:
        logger.info("No previous snapshot found - this is the first run")
        print("ℹ️ No previous snapshot found - establishing baseline")

    # Save current state as new snapshot
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    snapshot_file = SNAPSHOTS_DIR / f"known_bills_{timestamp}.json"
    save_json(snapshot_file, current_bills)
    logger.info(f"Saved snapshot: {snapshot_file}")
    print(f"✅ Snapshot saved: {snapshot_file}")

    # Upload snapshots and weekly changes to HF dataset for persistence
    _upload_change_data_to_hf()


def _upload_change_data_to_hf():
    """Upload the latest snapshot and all weekly CSVs to the HF dataset.

    Reads HUGGINGFACE_HUB_TOKEN and HF_REPO_ID (after load_dotenv()); skips
    silently when either is missing. Any failure is logged as a warning and
    not raised, because the upload is best-effort persistence.
    """
    try:
        from huggingface_hub import HfApi
        from dotenv import load_dotenv

        load_dotenv()
        token = os.environ.get("HUGGINGFACE_HUB_TOKEN")
        repo_id = os.environ.get("HF_REPO_ID")
        if not token or not repo_id:
            logger.info("HF credentials not configured — skipping change data upload")
            return

        api = HfApi(token=token)

        # Upload the most recent snapshot only (to save space)
        snapshot = get_most_recent_snapshot()
        if snapshot:
            dest = f"snapshots/{snapshot.name}"
            logger.info(f"Uploading snapshot to HF: {dest}")
            api.upload_file(
                path_or_fileobj=str(snapshot),
                path_in_repo=dest,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Update snapshot: {snapshot.name}",
            )

        # Upload all weekly change CSVs (sorted for a deterministic order)
        for csv_file in sorted(CHANGES_DIR.glob("*.csv")):
            dest = f"weekly_changes/{csv_file.name}"
            logger.info(f"Uploading CSV to HF: {dest}")
            api.upload_file(
                path_or_fileobj=str(csv_file),
                path_in_repo=dest,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Update weekly changes: {csv_file.name}",
            )

        logger.info("Change data uploaded to HF dataset")
        print("✅ Change data synced to HuggingFace")
    except Exception as e:
        logger.warning(f"Failed to upload change data to HF: {e}")
        print(f"⚠️  HF sync skipped: {e}")


if __name__ == "__main__":
    main()