ramanna committed on
Commit
52eb9a6
·
verified ·
1 Parent(s): adebc27

Update data_updating_scripts/detect_changes.py - change detection engine

Browse files
data_updating_scripts/detect_changes.py ADDED
@@ -0,0 +1,280 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Script to detect changes in bills by comparing current data to previous snapshot.
4
+
5
+ This script compares the current known_bills_visualize.json to the most recent snapshot
6
+ to identify new bills and bills with status changes. Results are saved to a CSV file.
7
+ """
8
+
9
+ import json
10
+ import csv
11
+ import logging
12
+ import os
13
+ import sys
14
+ import time
15
+ from pathlib import Path
16
+ from datetime import datetime
17
+ from typing import Dict, List, Optional, Tuple
18
+
19
# Make the directory above this script's folder importable, so modules that
# live there can be imported when the file is run directly.
sys.path.append(str(Path(__file__).parent.parent))

# Optional pipeline status reporting.
# The tracker is only set up when PIPELINE_CURRENT_SCRIPT is present in the
# environment; otherwise `_pipeline` stays None and progress calls are no-ops.
_PIPELINE_SCRIPT = os.getenv("PIPELINE_CURRENT_SCRIPT")
_last_status_write = 0.0  # time.time() value of the last progress write
_pipeline = None
if _PIPELINE_SCRIPT:
    try:
        from pipeline_status import PipelineStatus
        _pipeline = PipelineStatus()
    except Exception:
        # Best-effort: a missing or broken tracker must not stop the script.
        pass
32
+
33
def _update_pipeline_progress(current, total, unit="changes", message=""):
    """Forward a progress update to the pipeline status tracker, if attached.

    Does nothing when ``_pipeline`` is unset. Intermediate updates are
    throttled to at most one write every 3 seconds. An update whose
    ``current`` has reached ``total`` is always written, so the final state
    cannot be dropped by the throttle.

    Args:
        current: Progress value passed through to ``_pipeline.update_progress``.
        total: Target value passed through to ``_pipeline.update_progress``.
        unit: Unit label passed through to the tracker.
        message: Status text passed through to the tracker.
    """
    global _last_status_write
    if not _pipeline:
        return

    # Only compare when both values are numeric; anything else is treated as
    # an ordinary (throttled) update instead of raising.
    is_final = (
        isinstance(current, (int, float))
        and isinstance(total, (int, float))
        and current >= total
    )

    now = time.time()
    if not is_final and now - _last_status_write < 3.0:
        return
    _last_status_write = now

    try:
        _pipeline.update_progress(_PIPELINE_SCRIPT, current, total, unit, message)
    except Exception:
        # Status reporting stays best-effort, but leave a trace for debugging.
        logging.getLogger(__name__).debug(
            "Pipeline progress update failed", exc_info=True
        )
45
+
46
# Filesystem layout. All paths are relative, so they resolve against the
# current working directory of the process.
DATA_DIR = Path("data")
SNAPSHOTS_DIR = DATA_DIR / "snapshots"
CHANGES_DIR = DATA_DIR / "weekly_changes"
CURRENT_BILLS_FILE = DATA_DIR / "known_bills_visualize.json"

# Make sure every output directory exists before anything writes into it.
for _required_dir in (SNAPSHOTS_DIR, CHANGES_DIR):
    _required_dir.mkdir(parents=True, exist_ok=True)
os.makedirs("data_updating_scripts/logs", exist_ok=True)

# Send log records to both the console and a log file.
_LOG_HANDLERS = [
    logging.StreamHandler(),
    logging.FileHandler("data_updating_scripts/logs/detect_changes.log"),
]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=_LOG_HANDLERS,
)
logger = logging.getLogger(__name__)
67
+
68
+
69
def bill_key(bill: Dict) -> str:
    """Return the lookup key ``"<state>_<bill_number>"`` for a bill record.

    Either part falls back to ``"Unknown"`` when its field is absent.
    """
    state = bill.get('state', 'Unknown')
    number = bill.get('bill_number', 'Unknown')
    return "{}_{}".format(state, number)
72
+
73
+
74
def load_json(file_path: Path) -> List[Dict]:
    """Read and parse the JSON document at *file_path*.

    Never raises: a missing file is logged as a warning, any other failure
    is logged as an error, and in both cases an empty list is returned.
    """
    records: List[Dict] = []
    try:
        with open(file_path, 'r', encoding='utf-8') as source:
            records = json.load(source)
    except FileNotFoundError:
        logger.warning(f"File not found: {file_path}")
    except Exception as exc:
        logger.error(f"Error loading {file_path}: {exc}")
    return records
85
+
86
+
87
def save_json(file_path: Path, data: List[Dict]):
    """Write *data* to *file_path* as 2-space-indented UTF-8 JSON.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is rather than escaped.
    """
    parent_dir = file_path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with file_path.open('w', encoding='utf-8') as sink:
        json.dump(data, sink, indent=2, ensure_ascii=False)
92
+
93
+
94
def save_csv(file_path: Path, changes: List[Dict]):
    """Write the detected changes to *file_path* as a CSV with a header row.

    When *changes* is empty nothing is written; the function only logs that
    fact. Parent directories are created as needed.
    """
    if changes:
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Column order of the output file
        columns = [
            'bill_number', 'state', 'change_type', 'status_change_detail',
            'old_status', 'new_status', 'title', 'session_year',
        ]

        with file_path.open('w', newline='', encoding='utf-8') as sink:
            csv_writer = csv.DictWriter(sink, fieldnames=columns)
            csv_writer.writeheader()
            for row in changes:
                csv_writer.writerow(row)

        logger.info(f"Saved {len(changes)} changes to {file_path}")
    else:
        logger.info("No changes to save to CSV")
112
+
113
+
114
def get_most_recent_snapshot() -> Optional[Path]:
    """Return the newest ``known_bills_*.json`` file in ``SNAPSHOTS_DIR``.

    "Newest" is decided by file modification time, not by the timestamp in
    the file name. Returns ``None`` when the directory does not exist or
    holds no matching file.
    """
    if not SNAPSHOTS_DIR.exists():
        return None

    candidates = SNAPSHOTS_DIR.glob("known_bills_*.json")
    return max(candidates, key=lambda path: path.stat().st_mtime, default=None)
127
+
128
+
129
# Numeric status code -> coarse, human-readable label.
# NOTE(review): the codes look like an upstream bill-status enumeration;
# confirm against the data source before changing any mapping.
_STATUS_LABELS_BY_CODE = {
    0: "Inactive", 1: "Active", 2: "Active", 3: "Active",
    4: "Signed Into Law", 5: "Vetoed", 6: "Inactive",
    7: "Signed Into Law", 8: "Signed Into Law", 9: "Active",
    10: "Active", 11: "Inactive", 12: "Active",
}

# The lookup table accepts each code both as an int and as its string form,
# because the code may arrive either way in the JSON data.
STATUS_LABELS = {
    **_STATUS_LABELS_BY_CODE,
    **{str(code): label for code, label in _STATUS_LABELS_BY_CODE.items()},
}


def resolve_status(raw_status):
    """Convert a raw status value into a human-readable label.

    Args:
        raw_status: A status code (int or numeric string), an existing label
            string, or ``None``.

    Returns:
        The mapped label for a known code; ``""`` for ``None`` so that a
        missing status stays falsy for callers that skip empty labels;
        otherwise the value itself as a string (this covers values that are
        already labels). Surrounding whitespace on strings is ignored.
    """
    if raw_status is None:
        return ""

    if isinstance(raw_status, str):
        raw_status = raw_status.strip()

    try:
        label = STATUS_LABELS.get(raw_status)
    except TypeError:
        # Unhashable value (e.g. list/dict) cannot be a known code.
        label = None

    return label if label is not None else str(raw_status)
149
+
150
+
151
def classify_change(old_label: str, new_label: str) -> str:
    """Name the kind of transition from *old_label* to *new_label*.

    Returns ``"signed_into_law"`` or ``"vetoed"`` when the new label is one
    of those outcomes (whatever the old label was), ``"went_inactive"`` for
    an Active -> Inactive move, and ``"status_change"`` for everything else.
    """
    if new_label == "Signed Into Law":
        return "signed_into_law"
    if new_label == "Vetoed":
        return "vetoed"

    deactivated = (old_label, new_label) == ("Active", "Inactive")
    return "went_inactive" if deactivated else "status_change"
161
+
162
+
163
def _status_label(bill: Dict) -> str:
    """Return the readable status label of *bill*, or '' if it has no status."""
    raw_status = bill.get('status')
    if raw_status is None:
        # A JSON null must not turn into the truthy string "None".
        return ''
    return resolve_status(raw_status)


def _change_row(bill: Dict, change_type: str, old_label: str,
                new_label: str, detail: str = '') -> Dict:
    """Build one output row describing a change to *bill*."""
    title = bill.get('title')
    return {
        'bill_number': bill.get('bill_number', 'Unknown'),
        'state': bill.get('state', 'Unknown'),
        'change_type': change_type,
        'status_change_detail': detail,
        'old_status': old_label,
        'new_status': new_label,
        # Titles may be null or non-string in the data; never fail on them.
        'title': '' if title is None else str(title)[:100],
        'session_year': bill.get('session_year', ''),
    }


def detect_changes(current_bills: List[Dict], previous_bills: List[Dict]) -> Tuple[List[Dict], int, int, int, int]:
    """
    Compare current bills to the previous snapshot and detect changes.

    Bills are matched by ``bill_key``. A bill present only in
    *current_bills* is reported as ``new_bill``. A bill present in both is
    reported when its resolved status label differs and both labels are
    non-empty. Bills that exist only in *previous_bills* are not reported.

    Args:
        current_bills: Bill records from the current data file.
        previous_bills: Bill records from the previous snapshot.

    Returns:
        (changes_list, new_count, status_change_count, signed_count, vetoed_count)
        where ``signed_count`` and ``vetoed_count`` are subsets of
        ``status_change_count``.
    """
    changes = []

    # Lookup tables keyed by bill_key; a duplicate key keeps the last record.
    current_dict = {bill_key(b): b for b in current_bills}
    previous_dict = {bill_key(b): b for b in previous_bills}

    new_count = 0
    status_change_count = 0
    signed_count = 0
    vetoed_count = 0

    for key, bill in current_dict.items():
        new_label = _status_label(bill)

        if key not in previous_dict:
            changes.append(_change_row(bill, 'new_bill', '', new_label))
            new_count += 1
            continue

        old_label = _status_label(previous_dict[key])

        # Skip unchanged bills and transitions with an unknown side.
        if old_label == new_label or not old_label or not new_label:
            continue

        change_type = classify_change(old_label, new_label)
        changes.append(_change_row(
            bill, change_type, old_label, new_label,
            detail=f"{old_label} → {new_label}",
        ))
        status_change_count += 1
        if change_type == "signed_into_law":
            signed_count += 1
        elif change_type == "vetoed":
            vetoed_count += 1

    return changes, new_count, status_change_count, signed_count, vetoed_count
223
+
224
+
225
def main():
    """Run one change-detection pass.

    Loads the current bills and stops early (after logging an error) when
    none are available. If a previous snapshot exists, the current bills are
    compared against it, the summary is logged and printed, and the changes
    are written to a dated CSV in ``CHANGES_DIR``. In every non-early-exit
    case the current bills are then saved as a new timestamped snapshot.
    """
    logger.info("Starting change detection...")

    # Load current bills
    current_bills = load_json(CURRENT_BILLS_FILE)
    if not current_bills:
        logger.error(f"No current bills found in {CURRENT_BILLS_FILE}")
        return

    logger.info(f"Loaded {len(current_bills)} current bills")

    # Find most recent snapshot
    snapshot_file = get_most_recent_snapshot()

    if snapshot_file:
        logger.info(f"Found previous snapshot: {snapshot_file}")
        previous_bills = load_json(snapshot_file)
        logger.info(f"Loaded {len(previous_bills)} bills from snapshot")

        # Detect changes
        changes, new_count, status_change_count, signed_count, vetoed_count = detect_changes(current_bills, previous_bills)

        logger.info(f"Detected {len(changes)} total changes:")
        logger.info(f" - New bills: {new_count}")
        logger.info(f" - Status changes: {status_change_count}")
        logger.info(f" - Signed into law: {signed_count}")
        logger.info(f" - Vetoed: {vetoed_count}")

        # Save changes to CSV (one file per calendar day)
        timestamp = datetime.now().strftime("%Y-%m-%d")
        csv_file = CHANGES_DIR / f"weekly_changes_{timestamp}.csv"
        save_csv(csv_file, changes)

        print("✅ Change detection complete!")
        print(f" New bills: {new_count}")
        print(f" Status changes: {status_change_count}")
        print(f" Signed into law: {signed_count}")
        print(f" Vetoed: {vetoed_count}")
        if changes:
            print(f" CSV saved: {csv_file}")
        else:
            # save_csv() writes nothing for an empty change list, so do not
            # claim that a file was saved.
            print(" No CSV written (no changes detected)")

        _update_pipeline_progress(1, 1, "changes", f"{len(changes)} changes detected")
    else:
        logger.info("No previous snapshot found - this is the first run")
        print("ℹ️ No previous snapshot found - establishing baseline")

    # Save current state as new snapshot
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    snapshot_file = SNAPSHOTS_DIR / f"known_bills_{timestamp}.json"
    save_json(snapshot_file, current_bills)
    logger.info(f"Saved snapshot: {snapshot_file}")
    print(f"✅ Snapshot saved: {snapshot_file}")


if __name__ == "__main__":
    main()