Spaces:
Running
Running
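#!/usr/bin/env python3
"""
NUCLEAR CLEANUP: Delete corrupted conversations and fix the index.

A conversation counts as corrupted when its ``participants`` field is not a
list of exactly two strings. Corrupted conversations are deleted together
with their messages, then the ``participants_1`` index is dropped and
recreated as a unique index.
"""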
import asyncio
import logging
import sys
from pathlib import Path

# Prepend the parent of this script's directory to the import path. This must
# run before the ``app`` import below (presumably that directory is where the
# ``app`` package lives -- confirm against the repository layout).
sys.path.insert(0, str(Path(__file__).parent.parent))

from app.database import connect_db, disconnect_db, get_db  # noqa: E402

# Emit INFO-level records so the step-by-step progress messages are visible.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _find_corruption(participants):
    """Classify a ``participants`` value as valid or corrupted.

    A valid value is a list of exactly two strings.

    Returns:
        ``None`` when the value is valid; otherwise a ``(label, detail)``
        pair where ``label`` names the kind of corruption and ``detail`` is
        the extra text appended to the log line (may be empty).
    """
    # Strings get their own label even though they are also "not a list".
    if isinstance(participants, str):
        return "string", f" -> '{participants}'"
    if not isinstance(participants, list):
        return "not list", f" -> {type(participants)}"
    if len(participants) != 2:
        return "wrong length", f" -> {participants}"
    if not all(isinstance(member, str) for member in participants):
        return "bad participant type", ""
    return None


async def nuclear_cleanup() -> None:
    """Aggressively fix all conversation issues.

    Step 1 scans every conversation, deletes each one whose ``participants``
    field is not a list of exactly two strings, and deletes the messages that
    reference it. Step 2 drops the ``participants_1`` index (best effort) and
    recreates a unique index on ``participants``.

    Raises:
        Exception: any error from connecting, scanning, deleting or creating
            the index is logged with its traceback and re-raised. The
            database is disconnected in every case.
    """
    try:
        await connect_db()
        db = await get_db()

        logger.info("Step 1: Finding ALL conversations...")
        # Only ``participants`` (plus the always-returned ``_id``) is
        # inspected, so avoid transferring the rest of each document.
        all_convs = await db.conversations.find(
            {}, {"participants": 1}
        ).to_list(length=None)
        logger.info("Total conversations in DB: %d", len(all_convs))

        # Check each conversation
        corrupted_ids = []
        for conv in all_convs:
            conv_id = conv["_id"]
            problem = _find_corruption(conv.get("participants"))
            if problem is None:
                continue
            label, detail = problem
            logger.info(" β CORRUPTED (%s): %s%s", label, conv_id, detail)
            corrupted_ids.append(conv_id)

        logger.info("\nFound %d corrupted conversations", len(corrupted_ids))

        # Delete corrupted conversations and their messages
        for conv_id in corrupted_ids:
            # The message schema is not visible here: match the conversation
            # id both in its native form and as a string so that no messages
            # of a deleted conversation are left orphaned.
            await db.messages.delete_many(
                {"conversation_id": {"$in": [conv_id, str(conv_id)]}}
            )
            await db.conversations.delete_one({"_id": conv_id})
            logger.info(" β Deleted: %s", conv_id)

        # Step 2: Drop and recreate the index
        logger.info("\nStep 2: Fixing participants index...")
        try:
            await db.conversations.drop_index("participants_1")
            logger.info(" Dropped old participants_1 index")
        except Exception as e:
            # Deliberate best effort: a missing index must not abort the run.
            logger.info(" No existing index to drop: %s", e)

        # Recreate with proper unique index
        # NOTE(review): ``participants`` is an array, and MongoDB enforces a
        # unique index on an array field per element across documents. That
        # would reject a second conversation for any participant, and this
        # call will fail if such documents already exist -- confirm this is
        # the intended constraint.
        await db.conversations.create_index("participants", unique=True)
        logger.info(" β Created new unique index on participants")

        banner = "=" * 60
        logger.info("\n%s", banner)
        logger.info("β NUCLEAR CLEANUP COMPLETE!")
        logger.info("Deleted %d corrupted conversations", len(corrupted_ids))
        logger.info("Index recreated")
        logger.info("%s\n", banner)
    except Exception as e:
        # ``exception`` keeps the original message and adds the traceback.
        logger.exception("Cleanup failed: %s", e)
        raise
    finally:
        await disconnect_db()
if __name__ == "__main__":
    # Announce the run, then drive the cleanup coroutine to completion.
    logger.info("π₯ NUCLEAR CLEANUP - Fixing all conversation issues...")
    cleanup_coro = nuclear_cleanup()
    asyncio.run(cleanup_coro)