Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
"""
Migrate old parquet file names to the current events / event_* naming.

OLD NAMING -> NEW NAMING:
- meetings.parquet -> events.parquet
- meetings_calendar.parquet -> events.parquet (same target; not merged automatically)
- meetings_transcripts.parquet -> event_documents.parquet
- meetings_topics.parquet -> event_agenda_items.parquet
- meetings_demographics.parquet -> event_participants.parquet
- meetings_decisions.parquet -> event_bills.parquet
- contacts_meeting_attendance.parquet -> event_participants.parquet (same target; not merged automatically)
- events_events.parquet -> events.parquet
- events_event_<name>.parquet -> event_<name>.parquet (documents, participants, agenda_items, bills, media)

This script will:
1. Find all old-named files under data/gold/ (or the directory given with --directory)
2. Rename them to the new naming convention, skipping any file whose target already exists
3. Back up old files before renaming (unless --no-backup is given)
4. Print a migration summary
"""
| import shutil | |
| from pathlib import Path | |
| from datetime import datetime | |
| import logging | |
| import sys | |
# Logging: emit bare messages (no level or timestamp prefix) at INFO and above.
logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = logging.getLogger(__name__)

# First-generation names (meetings_* / contacts_*) and the names they become.
# Some sources share a target. Nothing is merged automatically: rename_file
# refuses to overwrite, so once a target exists any later source mapping to it
# is reported as skipped and must be merged by hand.
_LEGACY_RENAMES = {
    "meetings.parquet": "events.parquet",
    "meetings_calendar.parquet": "events.parquet",  # shares a target with meetings.parquet
    "meetings_transcripts.parquet": "event_documents.parquet",
    "meetings_topics.parquet": "event_agenda_items.parquet",
    "meetings_demographics.parquet": "event_participants.parquet",
    "meetings_decisions.parquet": "event_bills.parquet",
    "contacts_meeting_attendance.parquet": "event_participants.parquet",  # shares a target with meetings_demographics.parquet
}

# Second-generation names (events_event_*) and the shorter event_* names.
_PREFIXED_RENAMES = {
    "events_events.parquet": "events.parquet",
    "events_event_documents.parquet": "event_documents.parquet",
    "events_event_participants.parquet": "event_participants.parquet",
    "events_event_agenda_items.parquet": "event_agenda_items.parquet",
    "events_event_bills.parquet": "event_bills.parquet",
    "events_event_media.parquet": "event_media.parquet",
}

# Renaming map: old file name -> new file name. The two groups have disjoint
# keys, so the merged dict keeps first-generation entries ahead of
# second-generation ones; that is the order in which the map is iterated.
RENAME_MAP = {**_LEGACY_RENAMES, **_PREFIXED_RENAMES}
def backup_file(file_path: Path) -> Path:
    """Copy a file into a ``.migration_backup`` directory beside it.

    The copy is named ``<stem>_<YYYYMMDD_HHMMSS>.parquet`` (the ``.parquet``
    suffix is fixed, whatever the source suffix is) and is made with
    ``shutil.copy2``. The source file itself is left in place.

    Args:
        file_path: File to back up; resolved to an absolute path first.

    Returns:
        Absolute path of the backup copy.
    """
    source = file_path.resolve()
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # One backup folder per data directory, created on first use.
    vault = source.parent / ".migration_backup"
    vault.mkdir(exist_ok=True)

    backup_path = vault.joinpath(f"{source.stem}_{stamp}.parquet")
    shutil.copy2(source, backup_path)

    # Report relative to the working directory when the backup lives under it;
    # relative_to raises ValueError otherwise, so fall back to the full path.
    try:
        shown = backup_path.relative_to(Path.cwd())
    except ValueError:
        shown = backup_path
    print(f" π¦ Backed up to: {shown}")
    return backup_path
def cleanup_backups(directory: Path, dry_run: bool = False):
    """Find and delete every ``.migration_backup`` entry below ``directory``.

    All matches are listed first (with a ``*.parquet`` count for those that
    are directories). In a dry run nothing is removed. Otherwise the user must
    type ``yes`` at the prompt before anything is deleted.

    Args:
        directory: Root of the tree to search recursively.
        dry_run: When true, only list what would be deleted.

    Returns:
        Number of entries deleted; in a dry run, the number found. ``0`` when
        nothing is found or the user declines.
    """
    def _shown(path):
        # Prefer a path relative to the working directory; relative_to raises
        # ValueError when that is not possible, so keep the path as found.
        try:
            return path.relative_to(Path.cwd())
        except ValueError:
            return path

    found = list(directory.rglob(".migration_backup"))
    if not found:
        print("No backup directories found")
        return 0

    print(f"\nFound {len(found)} backup directories:")
    for entry in found:
        print(f" π¦ {_shown(entry)}")
        if entry.is_dir():
            parquet_total = sum(1 for _ in entry.glob("*.parquet"))
            print(f" ({parquet_total} files)")

    if dry_run:
        print("\n[DRY RUN] Would delete these backup directories")
        return len(found)

    print("\nβ οΈ This will permanently delete all backup files!")
    # Only the literal answer "yes" (any letter case) proceeds.
    if input("Are you sure you want to continue? (yes/no): ").lower() != "yes":
        print("Cancelled - no backups were deleted")
        return 0

    deleted = 0
    for entry in found:
        # Best effort: a failure on one entry is reported and the rest continue.
        try:
            shutil.rmtree(entry)
            print(f"β Deleted: {_shown(entry)}")
            deleted += 1
        except Exception as e:
            print(f"β Error deleting {entry}: {e}")
    return deleted
def _display_path(path: Path) -> Path:
    """Return ``path`` relative to the working directory when possible, else unchanged."""
    try:
        return path.relative_to(Path.cwd())
    except ValueError:
        # Raised when path does not live under the working directory.
        return path


def rename_file(old_path: Path, new_name: str, dry_run: bool = False, skip_backup: bool = False) -> bool:
    """Rename a file to new naming convention.

    The file stays in its directory; only its name changes. An existing target
    is never overwritten.

    Args:
        old_path: File to rename; resolved to an absolute path first.
        new_name: New file name (no directory part).
        dry_run: When true, report what would happen without touching disk.
        skip_backup: When true, do not copy the file to ``.migration_backup``
            before renaming.

    Returns:
        True if the file was renamed (or would be, in a dry run). False if the
        target already exists or the backup/rename failed with an ``OSError``.
    """
    old_path = old_path.resolve()  # Convert to absolute path
    new_path = old_path.parent / new_name

    # The scanned directory may lie outside the working directory, in which
    # case a relative form does not exist; show the absolute path instead of
    # crashing.
    print(f"\nπ {_display_path(old_path)}")
    print(f" β {_display_path(new_path)}")

    if new_path.exists():
        print(f" β οΈ Target already exists: {new_path.name}")
        print(" Consider merging or manually resolving")
        return False

    if dry_run:
        print(" [DRY RUN] Would rename this file")
        return True

    try:
        # Create backup unless skipped
        if skip_backup:
            print(" βοΈ Skipping backup (--no-backup)")
        else:
            backup_file(old_path)
        old_path.rename(new_path)
    except OSError as exc:
        # Report and count as skipped so one bad file does not abort the run.
        print(f" Error: could not rename {old_path.name}: {exc}")
        return False

    print(" β Renamed successfully")
    return True
def scan_directory(directory: Path, dry_run: bool = False, skip_backup: bool = False):
    """Rename every old-named file found below ``directory``.

    Each entry of ``RENAME_MAP`` is searched for recursively, in map order.
    Paths containing ``.migration_backup`` are ignored.

    Args:
        directory: Root of the tree to search.
        dry_run: Passed through to ``rename_file``; when true nothing is renamed.
        skip_backup: Passed through to ``rename_file``; when true no backups are made.

    Returns:
        ``(renamed_count, skipped_count)``: how many ``rename_file`` calls
        returned a truthy versus a falsy result.
    """
    tally = {True: 0, False: 0}

    for legacy_name, target_name in RENAME_MAP.items():
        # Collect the matches up front so the directory walk is finished
        # before any file in it is renamed.
        candidates = [
            match
            for match in directory.rglob(legacy_name)
            if ".migration_backup" not in str(match)
        ]
        for candidate in candidates:
            outcome = rename_file(candidate, target_name, dry_run=dry_run, skip_backup=skip_backup)
            tally[bool(outcome)] += 1

    return tally[True], tally[False]
def main():
    """Parse the command line and run either the migration or the backup cleanup.

    Flags:
        --dry-run          report what would happen without changing anything
        --directory DIR    tree to scan (default: data/gold)
        --no-backup        rename without creating backups
        --cleanup-backups  delete .migration_backup directories instead of migrating

    Exits with status 1 when the directory does not exist.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Migrate to new events_ naming convention")
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be renamed without actually renaming",
    )
    cli.add_argument(
        "--directory",
        type=str,
        default="data/gold",
        help="Directory to scan for files (default: data/gold)",
    )
    cli.add_argument(
        "--no-backup",
        action="store_true",
        help="Skip creating backups (NOT RECOMMENDED unless you have external backups)",
    )
    cli.add_argument(
        "--cleanup-backups",
        action="store_true",
        help="Remove all .migration_backup directories",
    )
    args = cli.parse_args()

    gold_dir = Path(args.directory)
    if not gold_dir.exists():
        print(f"β Directory not found: {gold_dir}")
        sys.exit(1)

    rule = "=" * 80

    # Cleanup mode: remove backups and stop; no renaming happens in this mode.
    if args.cleanup_backups:
        for line in (rule, "ποΈ Cleanup Backup Directories", rule):
            print(line)
        print(f"Scanning: {gold_dir.absolute()}")
        deleted = cleanup_backups(gold_dir, dry_run=args.dry_run)
        print("")
        print(rule)
        if args.dry_run:
            print(f"Would delete {deleted} backup directories")
        else:
            print(f"β Deleted {deleted} backup directories")
        print(rule)
        return

    # Migration mode: banner, the map being applied, then the scan itself.
    heading = "π DRY RUN: File Naming Migration" if args.dry_run else "π File Naming Migration"
    for line in (rule, heading, rule):
        print(line)
    print(f"Scanning: {gold_dir.absolute()}")
    if args.no_backup:
        print("β οΈ Backups DISABLED - files will be renamed without backup!")
    print("")
    print("Renaming map:")
    for old, new in RENAME_MAP.items():
        print(f" {old} β {new}")
    print("")

    renamed, skipped = scan_directory(gold_dir, dry_run=args.dry_run, skip_backup=args.no_backup)

    print("")
    for line in (rule, "Migration Summary", rule):
        print(line)
    print(f"β Renamed: {renamed} files")
    print(f"β οΈ Skipped: {skipped} files (target exists or error)")
    print("")
    if args.dry_run:
        print("This was a DRY RUN. No files were actually renamed.")
        print("Run without --dry-run to perform the migration.")
    else:
        print("β Migration complete!")
        if not args.no_backup:
            print("Backups are stored in .migration_backup directories")
            print("Run with --cleanup-backups to remove them after verification")
    print("")
# Run the command-line entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()