open-navigator / scripts /data /manage_contacts.py
jcbowyer's picture
Deploy: Consolidated gold tables, fixed nginx docs routing
896453f verified
#!/usr/bin/env python
"""
Unified Contacts & Meetings Data Management
Manage contact extraction from meetings and relationships between
contacts and meetings.
Gold Tables:
- contacts_local_officials: Unique officials aggregated from meetings
- contacts_meeting_attendance: Junction table (meeting_id โ†” contact_id)
- meetings_transcripts: Source data with 153K meetings
Usage:
# Show statistics
python scripts/manage_contacts.py stats
# Extract contacts from meetings (incremental batches)
python scripts/manage_contacts.py extract --batch-size 10000 --limit 50000
# Build meeting attendance relationships
python scripts/manage_contacts.py build-attendance
# Full refresh (careful - takes time!)
python scripts/manage_contacts.py refresh-all --confirm
"""
import pandas as pd
import pyarrow.parquet as pq
from pathlib import Path
from typing import List, Dict, Optional, Set
from datetime import datetime
import re
from loguru import logger
import sys
import argparse
from tqdm import tqdm
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
# File paths
MEETINGS_TRANSCRIPTS = Path("data/gold/national/meetings_transcripts.parquet")
CONTACTS_OFFICIALS = Path("data/gold/contacts_local_officials.parquet")
MEETING_ATTENDANCE = Path("data/gold/contacts_meeting_attendance.parquet")
class ContactsManager:
"""Manage contacts extraction and relationships"""
def __init__(self):
self.data_dir = Path("data/gold")
self.data_dir.mkdir(parents=True, exist_ok=True)
def extract_officials_from_transcript(self, text: str, jurisdiction: str = "") -> List[Dict]:
"""
Extract official names and titles from meeting transcripts.
Patterns:
1. Roll call: "Jerry Schultz here, Ted Nelson here"
2. Title mentions: "Mayor Smith", "Councilmember Jones"
3. Speaker labels: "John Doe: Thank you Mr. Mayor"
"""
if not text or pd.isna(text):
return []
text_str = str(text)
officials = []
seen_names = set()
# Pattern 1: Roll call ("Name here")
roll_call_pattern = r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,2})\s+(?:here|present|aye)'
for match in re.finditer(roll_call_pattern, text_str, re.IGNORECASE):
name = match.group(1).strip()
if self._is_valid_name(name) and name not in seen_names:
seen_names.add(name)
officials.append({
'name': name,
'title': 'Council Member',
'jurisdiction': jurisdiction,
'source': 'roll_call'
})
# Pattern 2: Titles with names
title_patterns = [
(r'Mayor\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2})', 'Mayor'),
(r'(?:Councilmember|Council Member)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2})', 'Council Member'),
(r'Commissioner\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+){0,2})', 'Commissioner'),
]
for pattern, title in title_patterns:
for match in re.finditer(pattern, text_str, re.IGNORECASE):
name = match.group(1).strip()
if self._is_valid_name(name) and name not in seen_names:
seen_names.add(name)
officials.append({
'name': name,
'title': title,
'jurisdiction': jurisdiction,
'source': 'title_mention'
})
# Pattern 3: Speaker labels ("Name: text")
speaker_pattern = r'^([A-Z][a-z]+(?:\s+[A-Z][a-z]+){1,2}):\s+'
for match in re.finditer(speaker_pattern, text_str, re.MULTILINE):
name = match.group(1).strip()
if self._is_valid_name(name) and name not in seen_names:
seen_names.add(name)
officials.append({
'name': name,
'title': 'Speaker',
'jurisdiction': jurisdiction,
'source': 'speaker_label'
})
return officials
def _is_valid_name(self, name: str) -> bool:
"""Check if extracted string is likely a valid name"""
if not name or len(name) < 3:
return False
# Skip common false positives (exact match and contains)
false_positive_words = {
'thank', 'you', 'good', 'evening', 'morning', 'afternoon',
'right', 'sir', 'madam', 'chair', 'mayor', 'council',
'board', 'member', 'members', 'city', 'town', 'county',
'vice', 'commissioner', 'supervisor', 'alderman'
}
name_lower = name.lower()
for word in false_positive_words:
if word in name_lower:
return False
# Must have at least 2 words (first and last name)
parts = name.split()
if len(parts) < 2:
return False
# Each part should start with capital
if not all(part[0].isupper() for part in parts):
return False
# Each part should have at least 2 letters
if not all(len(part) >= 2 for part in parts):
return False
# Should not have more than 4 words (avoid sentences)
if len(parts) > 4:
return False
return True
def cmd_stats(self):
"""Show statistics about contacts and meetings"""
logger.info("=" * 70)
logger.info("CONTACTS & MEETINGS STATISTICS")
logger.info("=" * 70)
# Check meetings
if MEETINGS_TRANSCRIPTS.exists():
meetings_df = pd.read_parquet(MEETINGS_TRANSCRIPTS, columns=['meeting_id', 'jurisdiction'])
logger.info(f"\n๐Ÿ“… MEETINGS:")
logger.info(f" Total: {len(meetings_df):,}")
logger.info(f" Jurisdictions: {meetings_df['jurisdiction'].nunique():,}")
top_jurisdictions = meetings_df['jurisdiction'].value_counts().head(10)
logger.info(f"\n Top 10 Jurisdictions:")
for jurisdiction, count in top_jurisdictions.items():
logger.info(f" {jurisdiction}: {count:,}")
else:
logger.warning(f"\nโš ๏ธ No meetings file found: {MEETINGS_TRANSCRIPTS}")
# Check contacts
if CONTACTS_OFFICIALS.exists():
contacts_df = pd.read_parquet(CONTACTS_OFFICIALS)
logger.info(f"\n๐Ÿ‘ฅ CONTACTS (Local Officials):")
logger.info(f" Total: {len(contacts_df):,}")
if 'meetings_count' in contacts_df.columns:
logger.info(f" Avg meetings per official: {contacts_df['meetings_count'].mean():.1f}")
logger.info(f" Max meetings: {contacts_df['meetings_count'].max():,}")
if 'title' in contacts_df.columns:
logger.info(f"\n By Title:")
title_counts = contacts_df['title'].value_counts().head(10)
for title, count in title_counts.items():
logger.info(f" {title}: {count:,}")
size_mb = CONTACTS_OFFICIALS.stat().st_size / (1024 * 1024)
logger.info(f"\n File size: {size_mb:.2f} MB")
else:
logger.warning(f"\nโš ๏ธ No contacts file found: {CONTACTS_OFFICIALS}")
# Check attendance
if MEETING_ATTENDANCE.exists():
attendance_df = pd.read_parquet(MEETING_ATTENDANCE)
logger.info(f"\n๐Ÿ“‹ MEETING ATTENDANCE (Relationships):")
logger.info(f" Total records: {len(attendance_df):,}")
logger.info(f" Unique meetings: {attendance_df['meeting_id'].nunique():,}")
logger.info(f" Unique contacts: {attendance_df['name'].nunique():,}")
avg_per_meeting = len(attendance_df) / attendance_df['meeting_id'].nunique()
logger.info(f" Avg attendees per meeting: {avg_per_meeting:.1f}")
size_mb = MEETING_ATTENDANCE.stat().st_size / (1024 * 1024)
logger.info(f" File size: {size_mb:.2f} MB")
else:
logger.warning(f"\nโš ๏ธ No attendance file found: {MEETING_ATTENDANCE}")
logger.info("\n" + "=" * 70)
def cmd_extract(self, batch_size: int = 1000, limit: Optional[int] = None):
"""
Extract contacts from meetings in batches.
MEMORY OPTIMIZED: Saves after each batch, uses small batches.
Args:
batch_size: Number of meetings to process per batch (default: 1000)
limit: Maximum number of meetings to process (None = all)
"""
logger.info("=" * 70)
logger.info("EXTRACTING CONTACTS FROM MEETINGS")
logger.info("=" * 70)
if not MEETINGS_TRANSCRIPTS.exists():
logger.error(f"Meetings file not found: {MEETINGS_TRANSCRIPTS}")
return
# Load meetings metadata (not transcripts yet - too big!)
logger.info(f"\n๐Ÿ“‚ Loading meetings metadata...")
parquet_file = pq.ParquetFile(MEETINGS_TRANSCRIPTS)
total_meetings = parquet_file.metadata.num_rows
logger.info(f" Total meetings: {total_meetings:,}")
logger.info(f" Batch size: {batch_size:,}")
if limit:
total_to_process = min(limit, total_meetings)
logger.info(f" Limit: {limit:,} โ†’ Will process {total_to_process:,}")
else:
total_to_process = total_meetings
# Load existing contacts to avoid duplicates
existing_contacts = {}
if CONTACTS_OFFICIALS.exists():
contacts_df = pd.read_parquet(CONTACTS_OFFICIALS)
logger.info(f"\n๐Ÿ“‹ Loaded {len(contacts_df):,} existing contacts")
for _, row in contacts_df.iterrows():
key = (row['name'], row.get('jurisdiction', ''))
existing_contacts[key] = row.to_dict()
else:
logger.info(f"\n๐Ÿ“‹ No existing contacts, starting fresh")
# Track attendance separately (will merge at end)
existing_attendance = []
if MEETING_ATTENDANCE.exists():
existing_attendance_df = pd.read_parquet(MEETING_ATTENDANCE)
existing_attendance = existing_attendance_df.to_dict('records')
logger.info(f" Loaded {len(existing_attendance):,} existing attendance records")
logger.info(f"\n๐Ÿ”„ Processing meetings in batches...")
logger.info(f" ๐Ÿ’พ Saving after each batch to avoid memory issues")
for batch_start in tqdm(range(0, total_to_process, batch_size), desc="Batches"):
batch_end = min(batch_start + batch_size, total_to_process)
# Load batch (only needed columns)
batch_df = pd.read_parquet(
MEETINGS_TRANSCRIPTS,
columns=['meeting_id', 'jurisdiction', 'transcript_text']
)[batch_start:batch_end]
batch_attendance = []
# Extract from each meeting
for _, meeting in batch_df.iterrows():
officials = self.extract_officials_from_transcript(
meeting['transcript_text'],
meeting.get('jurisdiction', '')
)
# Add to attendance (many-to-many)
for official in officials:
batch_attendance.append({
'meeting_id': meeting['meeting_id'],
'name': official['name'],
'title': official['title'],
'jurisdiction': official['jurisdiction'],
'source': official['source']
})
# Aggregate officials (deduplicate by name + jurisdiction)
for official in officials:
key = (official['name'], official['jurisdiction'])
if key in existing_contacts:
# Update meetings count
existing_contacts[key]['meetings_count'] = \
existing_contacts[key].get('meetings_count', 0) + 1
else:
existing_contacts[key] = {
'name': official['name'],
'title': official['title'],
'jurisdiction': official['jurisdiction'],
'meetings_count': 1,
'first_seen': datetime.now().isoformat(),
'data_source': 'meeting_transcripts'
}
# Add batch attendance to total
existing_attendance.extend(batch_attendance)
# Free memory
del batch_df
del batch_attendance
# SAVE AFTER EACH BATCH (prevent data loss on crash)
if (batch_start // batch_size) % 5 == 0 or batch_end >= total_to_process:
# Save every 5 batches or at end
self._save_results(existing_contacts, existing_attendance)
# Final save
logger.info(f"\n๐Ÿ’พ Final save...")
contacts_df, attendance_df = self._save_results(existing_contacts, existing_attendance)
logger.info("\n" + "=" * 70)
logger.success("EXTRACTION COMPLETE!")
logger.info("=" * 70)
# Show summary
if contacts_df is not None and len(contacts_df) > 0:
logger.info(f"\n๐Ÿ“Š SUMMARY:")
logger.info(f" Unique contacts: {len(contacts_df):,}")
logger.info(f" Attendance records: {len(attendance_df):,}")
logger.info(f" Avg meetings per contact: {contacts_df['meetings_count'].mean():.1f}")
logger.info(f"\n Top 10 Most Active:")
top_10 = contacts_df.head(10)
for _, row in top_10.iterrows():
logger.info(f" {row['name']} ({row.get('title', 'Unknown')}): {row['meetings_count']} meetings")
def _save_results(self, existing_contacts: dict, existing_attendance: list):
"""Save contacts and attendance to disk"""
# Save contacts
contacts_df = pd.DataFrame(list(existing_contacts.values()))
if len(contacts_df) > 0:
# Add last_updated
contacts_df['last_updated'] = datetime.now().isoformat()
# Sort by meetings_count descending
contacts_df = contacts_df.sort_values('meetings_count', ascending=False)
contacts_df.to_parquet(CONTACTS_OFFICIALS, index=False)
# Save attendance
attendance_df = None
if existing_attendance:
attendance_df = pd.DataFrame(existing_attendance)
attendance_df['recorded_at'] = datetime.now().isoformat()
# Deduplicate by (meeting_id, name)
attendance_df = attendance_df.drop_duplicates(subset=['meeting_id', 'name'], keep='last')
attendance_df.to_parquet(MEETING_ATTENDANCE, index=False)
return contacts_df, attendance_df
def cmd_build_attendance(self):
"""Build meeting attendance from existing contacts"""
logger.info("=" * 70)
logger.info("BUILDING MEETING ATTENDANCE")
logger.info("=" * 70)
if not CONTACTS_OFFICIALS.exists():
logger.error("No contacts file found. Run 'extract' first.")
return
if not MEETINGS_TRANSCRIPTS.exists():
logger.error(f"Meetings file not found: {MEETINGS_TRANSCRIPTS}")
return
logger.info("\n๐Ÿ“‹ Loading contacts...")
contacts_df = pd.read_parquet(CONTACTS_OFFICIALS)
logger.info(f" Loaded {len(contacts_df):,} contacts")
logger.info("\n๐Ÿ“… Scanning meetings for contact appearances...")
# This is simpler - just re-extract from all meetings
# (In practice, this is what cmd_extract does)
logger.info(" ๐Ÿ’ก Use 'extract' command to rebuild attendance")
def cmd_refresh_all(self, confirm: bool = False, batch_size: int = 1000):
"""Full refresh - delete existing and re-extract everything"""
if not confirm:
logger.warning("โš ๏ธ This will DELETE existing contacts and re-extract from scratch!")
logger.warning(" Add --confirm flag to proceed")
return
logger.info("=" * 70)
logger.info("FULL REFRESH")
logger.info("=" * 70)
# Delete existing
for file_path in [CONTACTS_OFFICIALS, MEETING_ATTENDANCE]:
if file_path.exists():
logger.info(f" Deleting: {file_path}")
file_path.unlink()
# Re-extract all with specified batch size
logger.info(f"\n๐Ÿ”„ Starting fresh extraction (batch_size={batch_size})...")
self.cmd_extract(batch_size=batch_size, limit=None)
def main():
"""Main CLI"""
parser = argparse.ArgumentParser(
description="Manage contacts and meetings relationships",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__
)
subparsers = parser.add_subparsers(dest='command', help='Command to run')
# Stats command
subparsers.add_parser('stats', help='Show statistics')
# Extract command
extract_parser = subparsers.add_parser('extract', help='Extract contacts from meetings')
extract_parser.add_argument('--batch-size', type=int, default=1000,
help='Meetings per batch (default: 1000, lower = less memory)')
extract_parser.add_argument('--limit', type=int, default=None,
help='Max meetings to process (default: all)')
# Build attendance command
subparsers.add_parser('build-attendance', help='Build meeting attendance relationships')
# Refresh command
refresh_parser = subparsers.add_parser('refresh-all', help='Delete and re-extract everything')
refresh_parser.add_argument('--confirm', action='store_true',
help='Confirm destructive operation')
refresh_parser.add_argument('--batch-size', type=int, default=1000,
help='Meetings per batch (default: 1000, lower = less memory)')
args = parser.parse_args()
if not args.command:
parser.print_help()
return
manager = ContactsManager()
if args.command == 'stats':
manager.cmd_stats()
elif args.command == 'extract':
manager.cmd_extract(batch_size=args.batch_size, limit=args.limit)
elif args.command == 'build-attendance':
manager.cmd_build_attendance()
elif args.command == 'refresh-all':
manager.cmd_refresh_all(confirm=args.confirm, batch_size=args.batch_size)
if __name__ == "__main__":
main()