open-navigator / scripts /data /migrate_to_events_naming.py
jcbowyer's picture
Deploy: Consolidated gold tables, fixed nginx docs routing
896453f verified
#!/usr/bin/env python3
"""
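Migrate old naming conventions to new events_ and contacts_ naming.

OLD NAMING → NEW NAMING (as defined in RENAME_MAP):
- meetings.parquet → events.parquet
- meetings_calendar.parquet → events.parquet (same target as meetings.parquet)
- meetings_transcripts.parquet → event_documents.parquet
- meetings_topics.parquet → event_agenda_items.parquet
- meetings_demographics.parquet → event_participants.parquet
- meetings_decisions.parquet → event_bills.parquet
- contacts_meeting_attendance.parquet → event_participants.parquet (same target as meetings_demographics.parquet)
- events_events.parquet → events.parquet
- events_event_<name>.parquet → event_<name>.parquet (documents, participants, agenda_items, bills, media)

Note: files are never merged automatically. When the target name already
exists, the source file is left in place and reported as skipped so it can be
merged or resolved manually.

This script will:
1. Find all old-named files in data/gold/ (or the directory given with --directory)
2. Rename them to the new naming convention
3. Back up old files before renaming (unless --no-backup is given)
4. Print a migration summary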
"""
import shutil
from pathlib import Path
from datetime import datetime
import logging
import sys
# Logging: emit bare messages (no level or timestamp prefix) at INFO and above.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)

# Old file name -> new file name.
# Several old names share one target; the script does not merge files, so the
# first source found claims the target and later ones are reported as skipped.
# Entries are processed in the order listed here.
RENAME_MAP = {
    # Legacy meetings_* / contacts_* names
    "meetings.parquet": "events.parquet",
    "meetings_calendar.parquet": "events.parquet",  # shares target with meetings.parquet
    "meetings_transcripts.parquet": "event_documents.parquet",
    "meetings_topics.parquet": "event_agenda_items.parquet",
    "meetings_demographics.parquet": "event_participants.parquet",
    "meetings_decisions.parquet": "event_bills.parquet",
    "contacts_meeting_attendance.parquet": "event_participants.parquet",  # shares target with demographics
    # Intermediate events_event_* names -> current event_* names
    "events_events.parquet": "events.parquet",
    "events_event_documents.parquet": "event_documents.parquet",
    "events_event_participants.parquet": "event_participants.parquet",
    "events_event_agenda_items.parquet": "event_agenda_items.parquet",
    "events_event_bills.parquet": "event_bills.parquet",
    "events_event_media.parquet": "event_media.parquet",
}
def backup_file(file_path: Path) -> Path:
    """Copy a file into a sibling ``.migration_backup`` directory.

    The copy is named ``<stem>_<YYYYmmdd_HHMMSS>.parquet`` and keeps the
    source's metadata (``shutil.copy2``). The source file itself is left
    untouched.

    Args:
        file_path: File to back up; resolved to an absolute path first.

    Returns:
        Absolute path of the backup copy.
    """
    source = file_path.resolve()
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Backups live next to the source so each data directory is self-contained.
    vault = source.parent / ".migration_backup"
    vault.mkdir(exist_ok=True)

    destination = vault / f"{source.stem}_{stamp}.parquet"
    shutil.copy2(source, destination)

    # Show a path relative to the working directory when possible; fall back
    # to the absolute path when the backup lives outside it.
    try:
        shown = destination.relative_to(Path.cwd())
    except ValueError:
        shown = destination
    print(f" πŸ“¦ Backed up to: {shown}")
    return destination
def cleanup_backups(directory: Path, dry_run: bool = False):
    """Delete every ``.migration_backup`` entry found under *directory*.

    The entries are listed first (with a ``*.parquet`` file count for each
    one that is a directory). In dry-run mode nothing is deleted. Otherwise
    the user must type ``yes`` at an interactive prompt before anything is
    removed.

    Args:
        directory: Root of the tree to search recursively.
        dry_run: When true, only list what would be deleted.

    Returns:
        Dry run: the number of entries found. Otherwise the number actually
        deleted; 0 when nothing was found or the user declined.
    """

    def shown(path):
        # Prefer a path relative to the working directory for display.
        try:
            return path.relative_to(Path.cwd())
        except ValueError:
            return path

    found = [*directory.rglob(".migration_backup")]
    if len(found) == 0:
        print("No backup directories found")
        return 0

    print(f"\nFound {len(found)} backup directories:")
    for entry in found:
        print(f" πŸ“¦ {shown(entry)}")
        if not entry.is_dir():
            continue
        parquet_total = sum(1 for _ in entry.glob("*.parquet"))
        print(f" ({parquet_total} files)")

    if dry_run:
        print("\n[DRY RUN] Would delete these backup directories")
        return len(found)

    # Destructive path: require an explicit "yes" (case-insensitive).
    print("\n⚠️ This will permanently delete all backup files!")
    answer = input("Are you sure you want to continue? (yes/no): ")
    if answer.lower() != "yes":
        print("Cancelled - no backups were deleted")
        return 0

    removed = 0
    for entry in found:
        # Best effort: report a failure and carry on with the remaining entries.
        try:
            shutil.rmtree(entry)
            print(f"βœ… Deleted: {shown(entry)}")
            removed += 1
        except Exception as e:
            print(f"❌ Error deleting {entry}: {e}")
    return removed
def _display_path(path: Path) -> Path:
    """Return *path* relative to the working directory, or unchanged if it lies outside it."""
    try:
        return path.relative_to(Path.cwd())
    except ValueError:
        return path


def rename_file(old_path: Path, new_name: str, dry_run: bool = False, skip_backup: bool = False) -> bool:
    """Rename a single file, within its own directory, to *new_name*.

    Args:
        old_path: File to rename; resolved to an absolute path first.
        new_name: New file name (no directory component); the file stays in
            the same directory.
        dry_run: When true, report what would happen without touching the
            filesystem.
        skip_backup: When true, do not call ``backup_file`` before renaming.

    Returns:
        True if the file was renamed (or would be, in dry-run mode); False if
        the target name already exists and the file was left in place.
    """
    old_path = old_path.resolve()  # Convert to absolute path
    new_path = old_path.parent / new_name

    # Paths are shown relative to the working directory when possible.
    # Files outside it (e.g. --directory pointing elsewhere) used to raise
    # ValueError here; they are now shown as absolute paths instead.
    print(f"\nπŸ”„ {_display_path(old_path)}")
    print(f" β†’ {_display_path(new_path)}")

    # Never overwrite: several old names map to the same new name, and the
    # script does not merge file contents.
    if new_path.exists():
        print(f" ⚠️ Target already exists: {new_path.name}")
        print(" Consider merging or manually resolving")
        return False

    if dry_run:
        print(" [DRY RUN] Would rename this file")
        return True

    # Create backup unless skipped
    if not skip_backup:
        backup_file(old_path)
    else:
        print(" ⏭️ Skipping backup (--no-backup)")

    old_path.rename(new_path)
    print(" βœ… Renamed successfully")
    return True
def scan_directory(directory: Path, dry_run: bool = False, skip_backup: bool = False):
    """Rename every old-named file found under *directory*.

    ``RENAME_MAP`` is walked in order; for each old name the tree is searched
    recursively and each match is passed to ``rename_file``. Anything inside
    a ``.migration_backup`` directory is ignored.

    Args:
        directory: Root of the tree to scan.
        dry_run: When true, nothing is renamed; the counts predict what a
            real run would do.
        skip_backup: Forwarded to ``rename_file``.

    Returns:
        A ``(renamed_count, skipped_count)`` tuple.
    """
    renamed_count = 0
    skipped_count = 0

    # Several old names map to the same new name. In a real run the first
    # rename creates the target and later ones are skipped because it exists.
    # A dry run creates nothing, so remember the targets already "claimed"
    # in order to report the same outcome the real run would produce.
    claimed_targets = set()

    for old_name, new_name in RENAME_MAP.items():
        # Materialize the matches first: the tree is modified while we loop.
        for old_file in list(directory.rglob(old_name)):
            # Skip backup directories (exact path-component match, so a
            # directory that merely contains the text in its name is not skipped).
            if ".migration_backup" in old_file.parts:
                continue

            target = old_file.resolve().parent / new_name
            if dry_run and target in claimed_targets:
                print(f"\n[DRY RUN] {old_file} -> {new_name}")
                print(f" Target would already exist after an earlier rename: {new_name}")
                print(" Consider merging or manually resolving")
                skipped_count += 1
                continue

            if rename_file(old_file, new_name, dry_run=dry_run, skip_backup=skip_backup):
                renamed_count += 1
                if dry_run:
                    claimed_targets.add(target)
            else:
                skipped_count += 1

    return renamed_count, skipped_count
def main():
    """Command-line entry point.

    Parses the options, checks that the target directory exists (exiting with
    status 1 if it does not), then either removes backup directories
    (``--cleanup-backups``) or runs the rename migration and prints a summary.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Migrate to new events_ naming convention")
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be renamed without actually renaming",
    )
    cli.add_argument(
        "--directory",
        type=str,
        default="data/gold",
        help="Directory to scan for files (default: data/gold)",
    )
    cli.add_argument(
        "--no-backup",
        action="store_true",
        help="Skip creating backups (NOT RECOMMENDED unless you have external backups)",
    )
    cli.add_argument(
        "--cleanup-backups",
        action="store_true",
        help="Remove all .migration_backup directories",
    )
    opts = cli.parse_args()

    root = Path(opts.directory)
    if not root.exists():
        print(f"❌ Directory not found: {root}")
        sys.exit(1)

    rule = "=" * 80

    # Cleanup mode: delete backups and stop; no renaming is attempted.
    if opts.cleanup_backups:
        print(rule)
        print("πŸ—‘οΈ Cleanup Backup Directories")
        print(rule)
        print(f"Scanning: {root.absolute()}")
        removed = cleanup_backups(root, dry_run=opts.dry_run)
        print("")
        print(rule)
        outcome = (
            f"Would delete {removed} backup directories"
            if opts.dry_run
            else f"βœ… Deleted {removed} backup directories"
        )
        print(outcome)
        print(rule)
        return

    # Migration mode: header, the rename table, then the scan itself.
    title = "πŸ” DRY RUN: File Naming Migration" if opts.dry_run else "πŸ”„ File Naming Migration"
    print(rule)
    print(title)
    print(rule)
    print(f"Scanning: {root.absolute()}")
    if opts.no_backup:
        print("⚠️ Backups DISABLED - files will be renamed without backup!")
    print("")
    print("Renaming map:")
    for old, new in RENAME_MAP.items():
        print(f" {old} β†’ {new}")
    print("")

    renamed, skipped = scan_directory(root, dry_run=opts.dry_run, skip_backup=opts.no_backup)

    print("")
    print(rule)
    print("Migration Summary")
    print(rule)
    print(f"βœ… Renamed: {renamed} files")
    print(f"⚠️ Skipped: {skipped} files (target exists or error)")
    print("")
    if opts.dry_run:
        print("This was a DRY RUN. No files were actually renamed.")
        print("Run without --dry-run to perform the migration.")
    else:
        print("βœ… Migration complete!")
        if not opts.no_backup:
            print("Backups are stored in .migration_backup directories")
            print("Run with --cleanup-backups to remove them after verification")
    print("")


if __name__ == "__main__":
    main()