# Source provenance (from the hosting page): uploaded by ramanna, commit
# 470541b (verified) — "Update data_updating_scripts/detect_changes.py -
# persist snapshots and CSVs to HF dataset".
#!/usr/bin/env python3
"""
Script to detect changes in bills by comparing current data to previous snapshot.
This script compares the current known_bills_visualize.json to the most recent snapshot
to identify new bills and bills with status changes. Results are saved to a CSV file.
"""
import json
import csv
import logging
import os
import sys
import time
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
# Add the project root to the path
sys.path.append(str(Path(__file__).parent.parent))
# Pipeline status tracking (no-op when running standalone)
# PIPELINE_CURRENT_SCRIPT is set by the orchestrating pipeline so progress
# updates can be attributed to this script; when unset, progress reporting
# is skipped entirely.
_PIPELINE_SCRIPT = os.environ.get("PIPELINE_CURRENT_SCRIPT")
_pipeline = None          # PipelineStatus instance, or None when standalone
_last_status_write = 0.0  # time.time() of the last progress write (throttling)
if _PIPELINE_SCRIPT:
    try:
        from pipeline_status import PipelineStatus
        _pipeline = PipelineStatus()
    except Exception:
        # Tracking is best-effort: fall back to standalone mode on any failure.
        pass
def _update_pipeline_progress(current, total, unit="changes", message=""):
    """Report progress to the pipeline tracker, best-effort.

    Does nothing when no tracker is attached, throttles writes to at most
    one every three seconds, and swallows any error raised by the tracker.
    """
    global _last_status_write
    if not _pipeline:
        return
    timestamp = time.time()
    if timestamp - _last_status_write < 3.0:
        # Too soon since the previous write — skip this update.
        return
    _last_status_write = timestamp
    try:
        _pipeline.update_progress(_PIPELINE_SCRIPT, current, total, unit, message)
    except Exception:
        # Progress reporting must never break the main workflow.
        pass
# Paths — all relative to the working directory the script is launched from.
DATA_DIR = Path("data")
SNAPSHOTS_DIR = DATA_DIR / "snapshots"        # historical known_bills_*.json snapshots
CHANGES_DIR = DATA_DIR / "weekly_changes"     # generated weekly_changes_*.csv reports
CURRENT_BILLS_FILE = DATA_DIR / "known_bills_visualize.json"
# Create directories
SNAPSHOTS_DIR.mkdir(parents=True, exist_ok=True)
CHANGES_DIR.mkdir(parents=True, exist_ok=True)
# Log directory used by the logging FileHandler configured in this module.
os.makedirs("data_updating_scripts/logs", exist_ok=True)
# Configure logging — messages go both to the console and to a per-script
# log file under data_updating_scripts/logs/.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("data_updating_scripts/logs/detect_changes.log")
    ]
)
logger = logging.getLogger(__name__)
def bill_key(bill: Dict) -> str:
    """Build the composite state+bill-number key used to match bills across runs."""
    state = bill.get('state', 'Unknown')
    number = bill.get('bill_number', 'Unknown')
    return f"{state}_{number}"
def load_json(file_path: Path) -> List[Dict]:
    """Read a JSON file; log and return an empty list on any failure."""
    try:
        with open(file_path, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except FileNotFoundError:
        # Missing file is expected on first runs — warn, don't error.
        logger.warning(f"File not found: {file_path}")
    except Exception as e:
        logger.error(f"Error loading {file_path}: {e}")
    return []
def save_json(file_path: Path, data: List[Dict]):
    """Write *data* as pretty-printed UTF-8 JSON, creating parent dirs as needed."""
    file_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2, ensure_ascii=False)
    with open(file_path, 'w', encoding='utf-8') as fh:
        fh.write(serialized)
def save_csv(file_path: Path, changes: List[Dict]):
    """Write change records to *file_path* as CSV; do nothing when empty."""
    if not changes:
        logger.info("No changes to save to CSV")
        return
    file_path.parent.mkdir(parents=True, exist_ok=True)
    # Fixed column order for the weekly-changes report.
    columns = ['bill_number', 'state', 'change_type', 'status_change_detail',
               'old_status', 'new_status', 'title', 'session_year']
    with open(file_path, 'w', newline='', encoding='utf-8') as fh:
        writer = csv.DictWriter(fh, fieldnames=columns)
        writer.writeheader()
        for record in changes:
            writer.writerow(record)
    logger.info(f"Saved {len(changes)} changes to {file_path}")
def get_most_recent_snapshot() -> Optional[Path]:
    """Return the newest snapshot file by modification time, or None if none exist."""
    if not SNAPSHOTS_DIR.exists():
        return None
    candidates = list(SNAPSHOTS_DIR.glob("known_bills_*.json"))
    if not candidates:
        return None
    # Newest by mtime; ties resolve to the first match, same as a stable sort.
    return max(candidates, key=lambda p: p.stat().st_mtime)
# Map numeric bill-status codes to coarse display labels. String keys are
# included as well because upstream data sometimes carries the status as a
# string rather than an int; derive them from the int map to avoid drift.
_NUMERIC_STATUS_LABELS = {
    0: "Inactive", 1: "Active", 2: "Active", 3: "Active",
    4: "Signed Into Law", 5: "Vetoed", 6: "Inactive",
    7: "Signed Into Law", 8: "Signed Into Law", 9: "Active",
    10: "Active", 11: "Inactive", 12: "Active",
}
STATUS_LABELS = {
    **_NUMERIC_STATUS_LABELS,
    **{str(code): label for code, label in _NUMERIC_STATUS_LABELS.items()},
}
def resolve_status(raw_status):
    """Map a raw status value (int or string code) to a human-readable label.

    Values that are already one of the known labels pass through unchanged;
    anything unrecognized is stringified as-is.
    """
    if raw_status in STATUS_LABELS:
        return STATUS_LABELS[raw_status]
    known_labels = ("Active", "Inactive", "Signed Into Law", "Vetoed")
    if isinstance(raw_status, str) and raw_status in known_labels:
        return raw_status
    return str(raw_status)
def classify_change(old_label: str, new_label: str) -> str:
    """Bucket an old→new status transition into a change-type tag.

    Terminal outcomes (signed / vetoed) win regardless of the old label;
    Active→Inactive is called out specially; everything else is generic.
    """
    if new_label == "Signed Into Law":
        return "signed_into_law"
    if new_label == "Vetoed":
        return "vetoed"
    if old_label == "Active" and new_label == "Inactive":
        return "went_inactive"
    return "status_change"
def detect_changes(current_bills: List[Dict], previous_bills: List[Dict]) -> Tuple[List[Dict], int, int, int, int]:
    """
    Compare current bills to previous snapshot and detect changes.

    A bill is identified by its (state, bill_number) key. Bills absent from
    the snapshot are reported as ``new_bill``; bills whose resolved status
    label changed are reported with a classified change type. Bills present
    only in the snapshot (removed upstream) are not reported.

    Returns:
        (changes_list, new_count, status_change_count, signed_count, vetoed_count)
        where signed/vetoed counts are subsets of status_change_count.
    """
    changes: List[Dict] = []
    # Create lookup dictionaries keyed by state+bill_number.
    current_dict = {bill_key(b): b for b in current_bills}
    previous_dict = {bill_key(b): b for b in previous_bills}
    new_count = 0
    status_change_count = 0
    signed_count = 0
    vetoed_count = 0
    for key, bill in current_dict.items():
        new_label = resolve_status(bill.get('status', ''))
        if key not in previous_dict:
            # New bill — no previous status to compare against.
            changes.append({
                'bill_number': bill.get('bill_number', 'Unknown'),
                'state': bill.get('state', 'Unknown'),
                'change_type': 'new_bill',
                'status_change_detail': '',
                'old_status': '',
                'new_status': new_label,
                'title': bill.get('title', '')[:100],  # trim long titles for the CSV
                'session_year': bill.get('session_year', '')
            })
            new_count += 1
            continue
        # Existing bill: report only when both labels are non-empty and differ.
        old_label = resolve_status(previous_dict[key].get('status', ''))
        if old_label != new_label and old_label and new_label:
            change_type = classify_change(old_label, new_label)
            # FIX: include a separator so the detail reads "Old -> New"
            # instead of the labels run together (e.g. "ActiveVetoed").
            status_detail = f"{old_label} -> {new_label}"
            changes.append({
                'bill_number': bill.get('bill_number', 'Unknown'),
                'state': bill.get('state', 'Unknown'),
                'change_type': change_type,
                'status_change_detail': status_detail,
                'old_status': old_label,
                'new_status': new_label,
                'title': bill.get('title', '')[:100],
                'session_year': bill.get('session_year', '')
            })
            status_change_count += 1
            if change_type == "signed_into_law":
                signed_count += 1
            elif change_type == "vetoed":
                vetoed_count += 1
    return changes, new_count, status_change_count, signed_count, vetoed_count
def main():
    """Main execution function.

    Loads the current bills file, diffs it against the most recent snapshot
    when one exists and writes the detected changes to a date-stamped CSV;
    otherwise the run establishes a baseline. A fresh snapshot is then saved
    and the change data is pushed to the HF dataset.
    """
    logger.info("Starting change detection...")
    # Load current bills
    current_bills = load_json(CURRENT_BILLS_FILE)
    if not current_bills:
        # Nothing to compare — load_json already logged the underlying cause.
        logger.error(f"No current bills found in {CURRENT_BILLS_FILE}")
        return
    logger.info(f"Loaded {len(current_bills)} current bills")
    # Find most recent snapshot
    snapshot_file = get_most_recent_snapshot()
    if snapshot_file:
        logger.info(f"Found previous snapshot: {snapshot_file}")
        previous_bills = load_json(snapshot_file)
        logger.info(f"Loaded {len(previous_bills)} bills from snapshot")
        # Detect changes
        changes, new_count, status_change_count, signed_count, vetoed_count = detect_changes(current_bills, previous_bills)
        logger.info(f"Detected {len(changes)} total changes:")
        logger.info(f" - New bills: {new_count}")
        logger.info(f" - Status changes: {status_change_count}")
        logger.info(f" - Signed into law: {signed_count}")
        logger.info(f" - Vetoed: {vetoed_count}")
        # Save changes to CSV (date-stamped so repeated runs don't collide)
        timestamp = datetime.now().strftime("%Y-%m-%d")
        csv_file = CHANGES_DIR / f"weekly_changes_{timestamp}.csv"
        save_csv(csv_file, changes)
        print(f"✅ Change detection complete!")
        print(f" New bills: {new_count}")
        print(f" Status changes: {status_change_count}")
        print(f" Signed into law: {signed_count}")
        print(f" Vetoed: {vetoed_count}")
        print(f" CSV saved: {csv_file}")
        _update_pipeline_progress(1, 1, "changes", f"{len(changes)} changes detected")
    else:
        logger.info("No previous snapshot found - this is the first run")
        print("ℹ️ No previous snapshot found - establishing baseline")
    # Save current state as new snapshot
    # NOTE(review): placed at function level so every run (not only the first)
    # becomes the baseline for the next diff — confirm against the original
    # indentation, which was lost in extraction.
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    snapshot_file = SNAPSHOTS_DIR / f"known_bills_{timestamp}.json"
    save_json(snapshot_file, current_bills)
    logger.info(f"Saved snapshot: {snapshot_file}")
    print(f"✅ Snapshot saved: {snapshot_file}")
    # Upload snapshots and weekly changes to HF dataset for persistence
    _upload_change_data_to_hf()
def _upload_change_data_to_hf():
    """Persist change-detection artifacts to the configured HF dataset.

    Uploads the most recent snapshot (one file only, to save space) plus
    every weekly-changes CSV. Skips quietly when credentials are missing
    and never raises — any failure is logged and reported as a warning.
    """
    try:
        from huggingface_hub import HfApi
        from dotenv import load_dotenv

        load_dotenv()
        token = os.environ.get("HUGGINGFACE_HUB_TOKEN")
        repo_id = os.environ.get("HF_REPO_ID")
        if not (token and repo_id):
            logger.info("HF credentials not configured — skipping change data upload")
            return
        api = HfApi(token=token)

        # Upload the most recent snapshot only (to save space)
        latest = get_most_recent_snapshot()
        if latest:
            target = f"snapshots/{latest.name}"
            logger.info(f"Uploading snapshot to HF: {target}")
            api.upload_file(
                path_or_fileobj=str(latest),
                path_in_repo=target,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Update snapshot: {latest.name}",
            )

        # Upload all weekly change CSVs
        for report in CHANGES_DIR.glob("*.csv"):
            target = f"weekly_changes/{report.name}"
            logger.info(f"Uploading CSV to HF: {target}")
            api.upload_file(
                path_or_fileobj=str(report),
                path_in_repo=target,
                repo_id=repo_id,
                repo_type="dataset",
                commit_message=f"Update weekly changes: {report.name}",
            )

        logger.info("Change data uploaded to HF dataset")
        print("✅ Change data synced to HuggingFace")
    except Exception as e:
        logger.warning(f"Failed to upload change data to HF: {e}")
        print(f"⚠️ HF sync skipped: {e}")
# Script entry point — keeps main() from running when this module is imported.
if __name__ == "__main__":
    main()