Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
| """ | |
| Create contacts data organized by state. | |
| For each state with meetings_transcripts.parquet: | |
| - Extract contacts_local_officials.parquet | |
| - Extract contacts_meeting_attendance.parquet | |
| """ | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime | |
| import pandas as pd | |
| from loguru import logger | |
| # Add project root to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| # Import the contacts extraction logic | |
| from pipeline.create_contacts_gold_tables import ContactsGoldTableCreator | |
def _write_parquet(frame, output_path):
    """Write *frame* to *output_path* as snappy-compressed parquet.

    Returns the size of the written file in megabytes.
    """
    frame.to_parquet(output_path, index=False, compression='snappy')
    return output_path.stat().st_size / 1024 / 1024


def _most_common_title(titles):
    """Return the most frequent value in *titles*.

    Falls back to the first entry when ``Series.mode()`` is empty.
    """
    modes = titles.mode()
    return modes.iloc[0] if not modes.empty else titles.iloc[0]


def create_contacts_for_state(state: str, state_dir: Path):
    """Create contacts files for a single state.

    Reads ``meetings_transcripts.parquet`` from *state_dir*, runs
    ``ContactsGoldTableCreator.extract_officials_from_transcript`` on every
    row, and writes two parquet files into *state_dir*:

    - ``contacts_meeting_attendance.parquet``: one row per extracted official
      mention, keyed by ``meeting_id``.
    - ``contacts_local_officials.parquet``: one row per (name, jurisdiction)
      pair with the most common title and a ``meetings_count``.

    Args:
        state: State label stored in the ``state`` column of both outputs.
        state_dir: Directory holding the state's transcripts; also the
            output directory.

    Returns:
        ``None`` when the transcripts file does not exist. Otherwise a dict
        with keys ``state``, ``officials``, ``attendance`` and ``size_mb``.
        When nothing was extracted, the counts and size are zero and no
        files are written.
    """
    # NOTE(review): the non-ASCII prefixes in the log strings below look
    # mis-encoded (probably emoji); they are left untouched - confirm against
    # the original file.
    logger.info(f"\n{'='*60}")
    logger.info(f"Processing {state}")
    logger.info(f"{'='*60}")

    transcripts_file = state_dir / "meetings_transcripts.parquet"
    if not transcripts_file.exists():
        logger.warning(f" β οΈ No transcripts found: {transcripts_file}")
        return None

    # Load transcripts
    df = pd.read_parquet(transcripts_file)
    total_meetings = len(df)
    logger.info(f" π Loaded {total_meetings:,} meeting transcripts")

    # Create temporary contacts creator (will process in-memory)
    creator = ContactsGoldTableCreator(
        meetings_gold_dir=state_dir.parent.parent,  # Not used for state-level
        output_dir=state_dir,
    )

    # Extract officials from each meeting. Progress is tracked with a
    # positional counter: the DataFrame index label yielded by iterrows() is
    # not guaranteed to be a 0..n-1 integer range.
    all_officials = []
    for position, (_, row) in enumerate(df.iterrows()):
        if position > 0 and position % 1000 == 0:
            logger.info(f" Processing {position:,}/{total_meetings:,} meetings...")
        officials = creator.extract_officials_from_transcript(
            row.get('transcript_text', ''),
            row.get('jurisdiction', 'Unknown'),
        )
        for official in officials:
            official['meeting_id'] = row['meeting_id']
            # Add state info
            official['state'] = state
            all_officials.append(official)

    if not all_officials:
        logger.warning(f" β οΈ No officials extracted for {state}")
        # Same keys as the success return so callers can aggregate uniformly.
        return {'state': state, 'officials': 0, 'attendance': 0, 'size_mb': 0.0}

    officials_df = pd.DataFrame(all_officials)
    logger.info(f" β Extracted {len(officials_df):,} official mentions")

    # One timestamp shared by both output tables of this run.
    last_updated = datetime.now().isoformat()

    # 1. Create meeting attendance table (junction table)
    attendance_df = officials_df[[
        'meeting_id', 'name', 'title', 'jurisdiction', 'source', 'state'
    ]].copy()
    attendance_df['last_updated'] = last_updated
    attendance_output = state_dir / "contacts_meeting_attendance.parquet"
    size_attendance = _write_parquet(attendance_df, attendance_output)
    logger.success(
        f" πΎ {attendance_output.name}: {len(attendance_df):,} records ({size_attendance:.2f} MB)"
    )

    # 2. Create aggregated officials table
    # NOTE(review): groupby drops rows whose name or jurisdiction is null
    # (pandas default dropna=True) - confirm that is intended.
    grouped = officials_df.groupby(['name', 'jurisdiction']).agg({
        'title': _most_common_title,
        'meeting_id': 'count',
        'source': 'first',
        'state': 'first',
    }).reset_index()
    grouped.rename(columns={'meeting_id': 'meetings_count'}, inplace=True)
    grouped['last_updated'] = last_updated
    grouped['data_source'] = 'LocalView meeting transcripts'

    # Reorder columns
    officials_summary = grouped[[
        'name', 'title', 'jurisdiction', 'state',
        'meetings_count', 'source', 'data_source', 'last_updated'
    ]]
    officials_output = state_dir / "contacts_local_officials.parquet"
    size_officials = _write_parquet(officials_summary, officials_output)
    logger.success(
        f" πΎ {officials_output.name}: {len(officials_summary):,} unique officials ({size_officials:.2f} MB)"
    )

    # Show top officials
    logger.info("\n π Top 5 officials by meeting attendance:")
    top_officials = officials_summary.sort_values('meetings_count', ascending=False).head(5)
    for _, top in top_officials.iterrows():
        logger.info(
            f" β’ {top['name']} ({top['title']}) - {top['jurisdiction']} - "
            f"{top['meetings_count']} meetings"
        )

    return {
        'state': state,
        'officials': len(officials_summary),
        'attendance': len(attendance_df),
        'size_mb': size_officials + size_attendance,
    }
def main():
    """Process all states with meeting transcripts.

    Scans ``data/gold/states`` (relative to the current working directory)
    for sub-directories containing ``meetings_transcripts.parquet``, calls
    ``create_contacts_for_state`` on each in sorted order, and logs a
    per-state and overall summary. Returns ``None`` in every case; problems
    are reported through the logger.
    """
    # NOTE(review): the non-ASCII prefixes in the log strings below look
    # mis-encoded (probably emoji); they are left untouched - confirm against
    # the original file.
    logger.info("π Creating contacts data by state...\n")

    states_dir = Path("data/gold/states")
    if not states_dir.exists():
        logger.error(f"States directory not found: {states_dir}")
        return

    # Find all state directories with transcripts
    state_dirs = sorted(
        d for d in states_dir.iterdir()
        if d.is_dir() and (d / "meetings_transcripts.parquet").exists()
    )
    if not state_dirs:
        logger.error("No state directories with meetings_transcripts.parquet found")
        logger.info("Run: python scripts/split_meetings_by_state.py first")
        return

    logger.info(f"Found {len(state_dirs)} states with transcript data\n")

    results = []
    for state_dir in state_dirs:
        result = create_contacts_for_state(state_dir.name, state_dir)
        if result:
            results.append(result)

    # Summary
    logger.info(f"\n{'='*60}")
    logger.info("π SUMMARY")
    logger.info(f"{'='*60}\n")

    if not results:
        logger.warning("No contacts created")
        return

    # Totals are computed from the result dicts directly. 'size_mb' is read
    # with a default because a state with no extracted officials may report
    # only its counts; indexing that key unconditionally would raise KeyError
    # when no state produced any files.
    total_officials = sum(r['officials'] for r in results)
    total_attendance = sum(r['attendance'] for r in results)
    total_size = sum(r.get('size_mb', 0.0) for r in results)

    logger.success(f"Processed {len(results)} states:")
    for r in results:
        logger.info(
            f" {r['state']}: {r['officials']:,} officials, "
            f"{r['attendance']:,} attendance records"
        )
    logger.info("")
    logger.success(f"π¦ Total: {total_officials:,} unique officials")
    logger.success(f"π¦ Total: {total_attendance:,} attendance records")
    logger.success(f"π¦ Total size: {total_size:.1f} MB")
    logger.info("\nβ Files created in each state directory:")
    logger.info(" - contacts_local_officials.parquet")
    logger.info(" - contacts_meeting_attendance.parquet")


# Run the per-state contacts build only when executed as a script.
if __name__ == "__main__":
    main()