#!/usr/bin/env python3
"""
Migrate old naming conventions to new events_ and contacts_ naming.
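OLD NAMING → NEW NAMING:
- meetings.parquet → events_events.parquet
- meetings_calendar.parquet → events_events.parquet (merged)
- meetings_transcripts.parquet → events_event_documents.parquet
- meetings_topics.parquet → events_event_agenda_items.parquet
- meetings_demographics.parquet → events_event_participants.parquet
- meetings_decisions.parquet → events_event_bills.parquet
- contacts_meeting_attendance.parquet → events_event_participants.parquet (merged)
NOTE: RENAME_MAP below targets the unprefixed names (events.parquet,
event_*.parquet) and also renames events_events.parquet / events_event_*.parquet
to them. Files are only renamed; when the target already exists the file is
skipped, not merged.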
This script will:
1. Find all old-named files in data/gold/
2. Rename them to new naming convention
3. Backup old files before renaming
4. Generate a migration report
"""
import shutil
from pathlib import Path
from datetime import datetime
import logging
import sys
# Setup logging: INFO level, and a bare "%(message)s" format so records are
# emitted without any timestamp/level prefix.
logging.basicConfig(
    level=logging.INFO,
    format='%(message)s'
)
# Module-level logger named after this module.
# NOTE(review): the functions visible in this file report via print(), not
# through this logger -- confirm whether it is used elsewhere.
logger = logging.getLogger(__name__)
# Renaming map: old file name -> new file name.
# Insertion order matters: scan_directory walks this dict in order and
# rename_file refuses to overwrite an existing target, so when two old names
# share a target the first one renamed wins and later ones are skipped.
RENAME_MAP = {
    "meetings.parquet": "events.parquet",
    "meetings_calendar.parquet": "events.parquet",  # Same target as meetings.parquet; skipped (not merged) if it already exists
    "meetings_transcripts.parquet": "event_documents.parquet",
    "meetings_topics.parquet": "event_agenda_items.parquet",
    "meetings_demographics.parquet": "event_participants.parquet",
    "meetings_decisions.parquet": "event_bills.parquet",
    "contacts_meeting_attendance.parquet": "event_participants.parquet",  # Same target as meetings_demographics.parquet; skipped (not merged) if it already exists
    # Rename old events_event_* to new event_* naming
    "events_events.parquet": "events.parquet",
    "events_event_documents.parquet": "event_documents.parquet",
    "events_event_participants.parquet": "event_participants.parquet",
    "events_event_agenda_items.parquet": "event_agenda_items.parquet",
    "events_event_bills.parquet": "event_bills.parquet",
    "events_event_media.parquet": "event_media.parquet",
}
def backup_file(file_path: Path) -> Path:
    """Copy a file into a sibling ``.migration_backup`` directory.

    The backup is named ``<stem>_<YYYYmmdd_HHMMSS>.parquet``. If a backup with
    that name already exists (same file backed up twice within one second), a
    numeric ``_<n>`` suffix is appended so an earlier backup is never
    overwritten.

    Args:
        file_path: File to back up; it is resolved to an absolute path first.

    Returns:
        Absolute path of the backup copy that was written.

    Raises:
        OSError: If the backup directory cannot be created or the copy fails.
    """
    file_path = file_path.resolve()  # Convert to absolute path
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_dir = file_path.parent / ".migration_backup"
    backup_dir.mkdir(exist_ok=True)

    base_name = f"{file_path.stem}_{timestamp}"
    backup_path = backup_dir / f"{base_name}.parquet"
    # The timestamp only has one-second resolution; pick a free name instead
    # of silently clobbering a backup taken moments ago.
    counter = 1
    while backup_path.exists():
        backup_path = backup_dir / f"{base_name}_{counter}.parquet"
        counter += 1

    shutil.copy2(file_path, backup_path)

    # Show a cwd-relative path when possible; relative_to raises ValueError
    # when the backup lives outside the current working directory.
    try:
        shown = backup_path.relative_to(Path.cwd())
    except ValueError:
        shown = backup_path
    print(f" 📦 Backed up to: {shown}")
    return backup_path
def cleanup_backups(directory: Path, dry_run: bool = False) -> int:
    """Remove every ``.migration_backup`` directory found under *directory*.

    The directories are listed first (with the number of ``*.parquet`` files
    in each). Unless *dry_run* is set, the user must type ``yes`` at an
    interactive prompt before anything is deleted.

    Args:
        directory: Root directory that is searched recursively.
        dry_run: When true, only list what would be deleted.

    Returns:
        In dry-run mode, the number of backup directories found; otherwise the
        number of directories actually deleted (0 if none were found or the
        user declined).
    """

    def _shown(path: Path) -> Path:
        # Prefer a cwd-relative path for display; relative_to raises
        # ValueError when the path is not located under the cwd.
        try:
            return path.relative_to(Path.cwd())
        except ValueError:
            return path

    # Only real directories qualify; a stray file with this name cannot be
    # removed by rmtree and must not be counted as a backup directory.
    backup_dirs = [
        candidate
        for candidate in directory.rglob(".migration_backup")
        if candidate.is_dir()
    ]
    if not backup_dirs:
        print("No backup directories found")
        return 0

    print(f"\nFound {len(backup_dirs)} backup directories:")
    for backup_dir in backup_dirs:
        print(f" 📦 {_shown(backup_dir)}")
        file_count = sum(1 for _ in backup_dir.glob("*.parquet"))
        print(f" ({file_count} files)")

    if dry_run:
        print("\n[DRY RUN] Would delete these backup directories")
        return len(backup_dirs)

    print("\n⚠️ This will permanently delete all backup files!")
    try:
        response = input("Are you sure you want to continue? (yes/no): ")
    except EOFError:
        # No interactive stdin: treat as "no" rather than crashing.
        response = ""
    if response.strip().lower() != "yes":
        print("Cancelled - no backups were deleted")
        return 0

    deleted = 0
    for backup_dir in backup_dirs:
        try:
            shutil.rmtree(backup_dir)
        except OSError as e:
            # Best effort: report the failure and keep going with the rest.
            print(f"❌ Error deleting {backup_dir}: {e}")
            continue
        print(f"✅ Deleted: {_shown(backup_dir)}")
        deleted += 1
    return deleted
def rename_file(old_path: Path, new_name: str, dry_run: bool = False, skip_backup: bool = False) -> bool:
    """Rename a single file, in place, to its new-convention name.

    The file stays in its directory; only the file name changes. An existing
    target is never overwritten.

    Args:
        old_path: File to rename; resolved to an absolute path first.
        new_name: New file name (no directory component).
        dry_run: When true, report what would happen but change nothing.
        skip_backup: When true, do not call ``backup_file`` before renaming.

    Returns:
        True if the file was renamed (or would be, in dry-run mode); False if
        the target already exists and the file was left untouched.

    Raises:
        OSError: If the backup or the rename itself fails.
    """
    old_path = old_path.resolve()  # Convert to absolute path
    new_path = old_path.parent / new_name
    cwd = Path.cwd()

    def _shown(path: Path) -> Path:
        # relative_to raises ValueError for paths outside the cwd (e.g. when
        # an absolute --directory elsewhere is used); fall back to the
        # absolute path instead of aborting the migration.
        try:
            return path.relative_to(cwd)
        except ValueError:
            return path

    print(f"\n📄 {_shown(old_path)}")
    print(f" → {_shown(new_path)}")

    if new_path.exists():
        print(f" ⚠️ Target already exists: {new_path.name}")
        print(" Consider merging or manually resolving")
        return False

    if dry_run:
        print(" [DRY RUN] Would rename this file")
        return True

    # Create backup unless skipped
    if not skip_backup:
        backup_file(old_path)
    else:
        print(" ⏭️ Skipping backup (--no-backup)")

    # Rename
    old_path.rename(new_path)
    print(" ✅ Renamed successfully")
    return True
def scan_directory(directory: Path, dry_run: bool = False, skip_backup: bool = False):
    """Rename every old-named file found under *directory*.

    For each entry of ``RENAME_MAP`` (in insertion order) the directory tree is
    searched recursively for the old file name and each match is handed to
    ``rename_file``. Files located inside a ``.migration_backup`` directory
    are ignored.

    Args:
        directory: Root directory that is searched recursively.
        dry_run: Passed through to ``rename_file``; nothing is changed on disk.
        skip_backup: Passed through to ``rename_file``.

    Returns:
        A ``(renamed_count, skipped_count)`` tuple. Files whose target already
        exists, or whose rename fails with an ``OSError``, count as skipped.
    """
    renamed_count = 0
    skipped_count = 0
    # Targets already claimed by an earlier (simulated) rename. In a real run
    # the target exists on disk and rename_file rejects it; in a dry run
    # nothing is created, so collisions must be tracked here to keep the
    # dry-run summary consistent with what a real run would do.
    planned_targets = set()

    for old_name, new_name in RENAME_MAP.items():
        # Materialize the matches before renaming anything so the tree walk
        # is not affected by the renames; sort for a deterministic order.
        for old_file in sorted(directory.rglob(old_name)):
            # Skip backup directories: compare whole path components, not a
            # substring, so unrelated names containing the text still match.
            if ".migration_backup" in old_file.parts:
                continue

            target = old_file.resolve().parent / new_name
            if dry_run and target in planned_targets:
                print(f"\n⚠️ {old_file}")
                print(f" Target {new_name} would already exist after an earlier rename; would be skipped")
                skipped_count += 1
                continue

            try:
                success = rename_file(old_file, new_name, dry_run=dry_run, skip_backup=skip_backup)
            except OSError as exc:
                # One failing file should not abort the whole migration.
                print(f" ❌ Error renaming {old_file}: {exc}")
                success = False

            if success:
                renamed_count += 1
                planned_targets.add(target)
            else:
                skipped_count += 1

    return renamed_count, skipped_count
def main():
    """Command-line entry point for the file naming migration.

    Options:
        --dry-run          Report what would happen without changing anything.
        --directory PATH   Directory to scan (default: ``data/gold``).
        --no-backup        Rename without creating backups first.
        --cleanup-backups  Delete ``.migration_backup`` directories instead of
                           migrating (honours ``--dry-run``).

    Exits with status 1 when the directory to scan does not exist or is not a
    directory.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Migrate to new events_ naming convention")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be renamed without actually renaming"
    )
    parser.add_argument(
        "--directory",
        type=str,
        default="data/gold",
        help="Directory to scan for files (default: data/gold)"
    )
    parser.add_argument(
        "--no-backup",
        action="store_true",
        help="Skip creating backups (NOT RECOMMENDED unless you have external backups)"
    )
    parser.add_argument(
        "--cleanup-backups",
        action="store_true",
        help="Remove all .migration_backup directories"
    )
    args = parser.parse_args()

    separator = "=" * 80
    gold_dir = Path(args.directory)
    # A regular file would pass a plain exists() check but cannot be scanned.
    if not gold_dir.is_dir():
        print(f"❌ Directory not found: {gold_dir}")
        sys.exit(1)

    # Handle cleanup mode
    if args.cleanup_backups:
        print(separator)
        print("🗑️ Cleanup Backup Directories")
        print(separator)
        print(f"Scanning: {gold_dir.absolute()}")
        deleted = cleanup_backups(gold_dir, dry_run=args.dry_run)
        print("")
        print(separator)
        if args.dry_run:
            print(f"Would delete {deleted} backup directories")
        else:
            print(f"✅ Deleted {deleted} backup directories")
        print(separator)
        return

    print(separator)
    if args.dry_run:
        print("🔍 DRY RUN: File Naming Migration")
    else:
        print("🔄 File Naming Migration")
    print(separator)
    print(f"Scanning: {gold_dir.absolute()}")
    if args.no_backup:
        print("⚠️ Backups DISABLED - files will be renamed without backup!")
    print("")
    print("Renaming map:")
    for old, new in RENAME_MAP.items():
        print(f" {old} → {new}")
    print("")

    # Scan and rename
    renamed, skipped = scan_directory(gold_dir, dry_run=args.dry_run, skip_backup=args.no_backup)

    print("")
    print(separator)
    print("Migration Summary")
    print(separator)
    print(f"✅ Renamed: {renamed} files")
    print(f"⚠️ Skipped: {skipped} files (target exists or error)")
    if args.dry_run:
        print("")
        print("This was a DRY RUN. No files were actually renamed.")
        print("Run without --dry-run to perform the migration.")
    else:
        print("")
        print("✅ Migration complete!")
        if not args.no_backup:
            print("Backups are stored in .migration_backup directories")
            print("Run with --cleanup-backups to remove them after verification")
    print("")
# Run the migration only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|