# Hugging Face Spaces page header (scrape artifact): "Spaces: Running on CPU Upgrade"
| #!/usr/bin/env python3 | |
| """ | |
| Script to detect changes in bills by comparing current data to previous snapshot. | |
| This script compares the current known_bills_visualize.json to the most recent snapshot | |
| to identify new bills and bills with status changes. Results are saved to a CSV file. | |
| """ | |
| import json | |
| import csv | |
| import logging | |
| import os | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Tuple | |
# Add the project root to the path so sibling modules resolve when this
# script is executed directly (rather than imported as a package member).
sys.path.append(str(Path(__file__).parent.parent))

# Pipeline status tracking (no-op when running standalone).
# PIPELINE_CURRENT_SCRIPT is set by the orchestrating pipeline; when it is
# absent, _pipeline stays None and _update_pipeline_progress does nothing.
_PIPELINE_SCRIPT = os.environ.get("PIPELINE_CURRENT_SCRIPT")
_pipeline = None
_last_status_write = 0.0  # wall-clock time (time.time()) of the last status write
if _PIPELINE_SCRIPT:
    try:
        from pipeline_status import PipelineStatus
        _pipeline = PipelineStatus()
    except Exception:
        # Best-effort: a missing/broken pipeline_status module must not
        # prevent standalone execution.
        pass
def _update_pipeline_progress(current, total, unit="changes", message=""):
    """Report progress to the pipeline status tracker, best-effort.

    Writes are throttled to at most one every 3 seconds, and any failure in
    the tracker is swallowed so progress reporting can never break the run.
    """
    global _last_status_write
    if not _pipeline:
        # Standalone run (no pipeline attached) — nothing to report to.
        return
    now = time.time()
    elapsed = now - _last_status_write
    if elapsed < 3.0:
        return  # throttled: too soon since the previous write
    _last_status_write = now
    try:
        _pipeline.update_progress(_PIPELINE_SCRIPT, current, total, unit, message)
    except Exception:
        pass
# Paths — all relative to the working directory (the repo root when run by
# the pipeline).
DATA_DIR = Path("data")
SNAPSHOTS_DIR = DATA_DIR / "snapshots"        # historical known_bills_*.json snapshots
CHANGES_DIR = DATA_DIR / "weekly_changes"     # weekly_changes_*.csv outputs
CURRENT_BILLS_FILE = DATA_DIR / "known_bills_visualize.json"

# Create directories up front so later writes cannot fail on a missing parent.
SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)
CHANGES_DIR.mkdir(parents=True, exist_ok=True)
os.makedirs("data_updating_scripts/logs", exist_ok=True)

# Configure logging to both the console and a persistent log file.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("data_updating_scripts/logs/detect_changes.log")
    ]
)
logger = logging.getLogger(__name__)
def bill_key(bill: Dict) -> str:
    """Build the unique lookup key for *bill*: ``"<state>_<bill_number>"``.

    Missing fields fall back to the literal string ``"Unknown"`` so the key
    is always well-formed.
    """
    state = bill.get('state', 'Unknown')
    number = bill.get('bill_number', 'Unknown')
    return f"{state}_{number}"
def load_json(file_path: Path) -> List[Dict]:
    """Read a JSON file; on any failure, log it and return an empty list.

    A missing file is expected (e.g. first run) and only warned about;
    anything else (bad JSON, permissions) is logged as an error.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except FileNotFoundError:
        logger.warning(f"File not found: {file_path}")
    except Exception as e:
        logger.error(f"Error loading {file_path}: {e}")
    return []
def save_json(file_path: Path, data: List[Dict]):
    """Write *data* as pretty-printed UTF-8 JSON, creating parent dirs."""
    file_path.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, indent=2, ensure_ascii=False)
    with open(file_path, 'w', encoding='utf-8') as fh:
        fh.write(payload)
def save_csv(file_path: Path, changes: List[Dict]):
    """Write the change records to *file_path* as CSV.

    Does nothing (beyond a log line) when there are no changes, so an empty
    CSV is never created. Parent directories are created on demand.
    """
    if not changes:
        logger.info("No changes to save to CSV")
        return
    file_path.parent.mkdir(parents=True, exist_ok=True)
    # Fixed column order keeps the weekly CSVs diff-able across runs.
    header = ('bill_number', 'state', 'change_type', 'status_change_detail',
              'old_status', 'new_status', 'title', 'session_year')
    with open(file_path, 'w', newline='', encoding='utf-8') as fh:
        writer = csv.DictWriter(fh, fieldnames=list(header))
        writer.writeheader()
        writer.writerows(changes)
    logger.info(f"Saved {len(changes)} changes to {file_path}")
def get_most_recent_snapshot() -> Optional[Path]:
    """Return the newest ``known_bills_*.json`` snapshot, or None if absent.

    "Newest" is decided by filesystem modification time.
    """
    if not SNAPSHOTS_DIR.exists():
        return None
    candidates = SNAPSHOTS_DIR.glob("known_bills_*.json")
    # max() with a default avoids both the list build and the full sort;
    # returns None when the glob matches nothing.
    return max(candidates, key=lambda p: p.stat().st_mtime, default=None)
# Numeric bill-status codes → human-readable labels.
# NOTE(review): the meaning of each numeric code is assumed from this table
# only — confirm against the upstream status-code source.
_STATUS_BY_CODE = {
    0: "Inactive",
    1: "Active",
    2: "Active",
    3: "Active",
    4: "Signed Into Law",
    5: "Vetoed",
    6: "Inactive",
    7: "Signed Into Law",
    8: "Signed Into Law",
    9: "Active",
    10: "Active",
    11: "Inactive",
    12: "Active",
}
# Accept both int and string codes (JSON round-trips may stringify keys).
STATUS_LABELS = {
    **_STATUS_BY_CODE,
    **{str(code): label for code, label in _STATUS_BY_CODE.items()},
}


def resolve_status(raw_status):
    """Map a raw status code (int or str) to a human-readable label.

    Values that are already one of the known labels pass through unchanged;
    anything unrecognized is stringified so callers always receive a str.
    """
    label = STATUS_LABELS.get(raw_status)
    if label is not None:
        return label
    if raw_status in ("Active", "Inactive", "Signed Into Law", "Vetoed"):
        return raw_status
    return str(raw_status)
def classify_change(old_label: str, new_label: str) -> str:
    """Bucket a status transition into a change-type tag for the CSV.

    Terminal outcomes (signed / vetoed) win regardless of the old label;
    Active→Inactive is called out specially; everything else is a generic
    ``status_change``.
    """
    terminal = {"Signed Into Law": "signed_into_law", "Vetoed": "vetoed"}
    if new_label in terminal:
        return terminal[new_label]
    if (old_label, new_label) == ("Active", "Inactive"):
        return "went_inactive"
    return "status_change"
def detect_changes(current_bills: List[Dict], previous_bills: List[Dict]) -> Tuple[List[Dict], int, int, int, int]:
    """
    Compare current bills to the previous snapshot and detect changes.

    Detects two kinds of change: bills present now but not in the snapshot
    ("new_bill"), and bills whose resolved status label differs from the
    snapshot's. Bills that disappeared from the current data are NOT reported.

    Args:
        current_bills: bills from the current known_bills file.
        previous_bills: bills from the most recent snapshot.

    Returns:
        (changes_list, new_count, status_change_count, signed_count, vetoed_count)
        where each entry of changes_list matches the CSV schema used by save_csv().
    """
    def _record(bill: Dict, change_type: str, detail: str,
                old_label: str, new_label: str) -> Dict:
        """Build one CSV-schema change record for *bill*."""
        return {
            'bill_number': bill.get('bill_number', 'Unknown'),
            'state': bill.get('state', 'Unknown'),
            'change_type': change_type,
            'status_change_detail': detail,
            'old_status': old_label,
            'new_status': new_label,
            # Bug fix: title may be explicitly null in the source JSON, in
            # which case .get('title', '') returns None and slicing raised
            # TypeError. Coerce to '' first; truncate to keep the CSV tidy.
            'title': (bill.get('title') or '')[:100],
            'session_year': bill.get('session_year', '')
        }

    changes = []
    # Lookup dicts keyed by "<state>_<bill_number>"; duplicate keys collapse
    # to the last occurrence, matching how the snapshot was keyed.
    current_dict = {bill_key(b): b for b in current_bills}
    previous_dict = {bill_key(b): b for b in previous_bills}

    new_count = 0
    status_change_count = 0
    signed_count = 0
    vetoed_count = 0

    for key, bill in current_dict.items():
        new_label = resolve_status(bill.get('status', ''))
        old_bill = previous_dict.get(key)
        if old_bill is None:
            # Bill not present in the snapshot → brand new.
            changes.append(_record(bill, 'new_bill', '', '', new_label))
            new_count += 1
            continue
        old_label = resolve_status(old_bill.get('status', ''))
        # Only report transitions where both labels are non-empty and differ.
        if old_label != new_label and old_label and new_label:
            change_type = classify_change(old_label, new_label)
            detail = f"{old_label} → {new_label}"
            changes.append(_record(bill, change_type, detail, old_label, new_label))
            status_change_count += 1
            if change_type == "signed_into_law":
                signed_count += 1
            elif change_type == "vetoed":
                vetoed_count += 1

    return changes, new_count, status_change_count, signed_count, vetoed_count
def main():
    """Main execution function.

    Loads the current bill list, diffs it against the most recent snapshot,
    and writes detected changes to a dated CSV. On the very first run (no
    snapshot found) it instead saves the current data as the baseline
    snapshot. Either way, results are then synced to the HF dataset.

    NOTE(review): when a snapshot already exists, no NEW snapshot is saved
    here — presumably another pipeline step refreshes snapshots; otherwise
    every run keeps diffing against the original baseline. Confirm.
    """
    logger.info("Starting change detection...")

    # Load current bills; bail out early if the input file is missing/empty.
    current_bills = load_json(CURRENT_BILLS_FILE)
    if not current_bills:
        logger.error(f"No current bills found in {CURRENT_BILLS_FILE}")
        return
    logger.info(f"Loaded {len(current_bills)} current bills")

    # Find most recent snapshot
    snapshot_file = get_most_recent_snapshot()
    if snapshot_file:
        logger.info(f"Found previous snapshot: {snapshot_file}")
        previous_bills = load_json(snapshot_file)
        logger.info(f"Loaded {len(previous_bills)} bills from snapshot")

        # Detect changes
        changes, new_count, status_change_count, signed_count, vetoed_count = detect_changes(current_bills, previous_bills)
        logger.info(f"Detected {len(changes)} total changes:")
        logger.info(f" - New bills: {new_count}")
        logger.info(f" - Status changes: {status_change_count}")
        logger.info(f" - Signed into law: {signed_count}")
        logger.info(f" - Vetoed: {vetoed_count}")

        # Save changes to a date-stamped CSV (one file per day).
        timestamp = datetime.now().strftime("%Y-%m-%d")
        csv_file = CHANGES_DIR / f"weekly_changes_{timestamp}.csv"
        save_csv(csv_file, changes)

        print(f"✅ Change detection complete!")
        print(f" New bills: {new_count}")
        print(f" Status changes: {status_change_count}")
        print(f" Signed into law: {signed_count}")
        print(f" Vetoed: {vetoed_count}")
        print(f" CSV saved: {csv_file}")
        _update_pipeline_progress(1, 1, "changes", f"{len(changes)} changes detected")
    else:
        logger.info("No previous snapshot found - this is the first run")
        print("ℹ️ No previous snapshot found - establishing baseline")
        # Save current state as the baseline snapshot (second-resolution
        # timestamp so repeated first runs never collide).
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        snapshot_file = SNAPSHOTS_DIR / f"known_bills_{timestamp}.json"
        save_json(snapshot_file, current_bills)
        logger.info(f"Saved snapshot: {snapshot_file}")
        print(f"✅ Snapshot saved: {snapshot_file}")

    # Upload snapshots and weekly changes to HF dataset for persistence
    _upload_change_data_to_hf()
def _upload_change_data_to_hf():
    """Upload the latest snapshot and all weekly CSVs to HF dataset.

    Entirely best-effort: any failure (missing packages, missing credentials,
    network errors) is logged and reported, never raised, so a sync problem
    cannot break change detection itself.
    """
    try:
        # Imported lazily so the script runs even without these packages.
        from huggingface_hub import HfApi
        from dotenv import load_dotenv

        load_dotenv()
        token = os.environ.get("HUGGINGFACE_HUB_TOKEN")
        repo_id = os.environ.get("HF_REPO_ID")
        if not (token and repo_id):
            logger.info("HF credentials not configured — skipping change data upload")
            return

        api = HfApi(token=token)

        # Upload the most recent snapshot only (to save space).
        latest_snapshot = get_most_recent_snapshot()
        if latest_snapshot:
            snapshot_dest = f"snapshots/{latest_snapshot.name}"
            logger.info(f"Uploading snapshot to HF: {snapshot_dest}")
            api.upload_file(
                path_or_fileobj=str(latest_snapshot),
                path_in_repo=snapshot_dest,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Update snapshot: {latest_snapshot.name}",
            )

        # Upload every weekly change CSV, one commit per file.
        for csv_file in CHANGES_DIR.glob("*.csv"):
            csv_dest = f"weekly_changes/{csv_file.name}"
            logger.info(f"Uploading CSV to HF: {csv_dest}")
            api.upload_file(
                path_or_fileobj=str(csv_file),
                path_in_repo=csv_dest,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Update weekly changes: {csv_file.name}",
            )

        logger.info("Change data uploaded to HF dataset")
        print("✅ Change data synced to HuggingFace")
    except Exception as e:
        logger.warning(f"Failed to upload change data to HF: {e}")
        print(f"⚠️ HF sync skipped: {e}")
# Script entry point: only run change detection when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()