#!/usr/bin/env python3
"""
Migration: Rescue misrouted viewing reviews.

Two classes of bad data this script fixes:

CLASS A — person_reviews collection
    dm_brain.py (before the bug fix) wrote viewing reviews to a legacy
    `person_reviews` collection using field `reviewed_id` instead of
    `target_id`. These docs never showed up on the landlord's profile.

CLASS B — reviews collection with ObjectId target_id
    viewing_notification_jobs.py stored `landlord_id` (a raw ObjectId) in the
    pending_viewing_review session, so dm_brain wrote the review with
    target_id = ObjectId(...) instead of a plain string. MongoDB's $match on
    the string version found nothing.

Both classes are migrated into `reviews` with:
    target_type   = "user"
    target_id     = str(landlord_id)
    review_source = "viewing"

After migrating, all affected users' ratings are recalculated from scratch.

Run:
    python scripts/migrate_viewing_reviews.py           # dry-run (shows what would change)
    python scripts/migrate_viewing_reviews.py --apply   # actually writes to MongoDB
"""

import asyncio
import sys
import os
from datetime import datetime, timezone

# Make the project root importable so `app.config` resolves when this file is
# run directly as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from motor.motor_asyncio import AsyncIOMotorClient
from bson import ObjectId
from app.config import settings

# Writes happen only when --apply is passed on the command line.
DRY_RUN = "--apply" not in sys.argv


# ── helpers ──────────────────────────────────────────────────────────────────

def _str_id(val) -> str:
    """Safely coerce ObjectId or string to string (None becomes "")."""
    return "" if val is None else str(val)


async def _get_db():
    """Connect to MongoDB, verify the connection with a ping, and return the database.

    The caller owns the underlying client and should close it via
    ``db.client.close()`` when done.
    """
    client = AsyncIOMotorClient(settings.MONGODB_URL, serverSelectionTimeoutMS=5000)
    try:
        await client.admin.command("ping")
    except Exception:
        # Don't leak the client (and its background tasks) if the server is unreachable.
        client.close()
        raise
    return client[settings.MONGODB_DATABASE]


def _is_objectid(val) -> bool:
    """Return True if *val* is a BSON ObjectId instance."""
    return isinstance(val, ObjectId)


async def _find_in_reviews(db, reviewer_id: str, target_id: str):
    """Return matching review doc from canonical collection, or None."""
    return await db.reviews.find_one({
        "reviewer_id": reviewer_id,
        "target_type": "user",
        "target_id": target_id,
    })


async def _already_in_reviews(db, reviewer_id: str, target_id: str) -> bool:
    """Return True if a user review by *reviewer_id* for *target_id* exists in `reviews`."""
    return await _find_in_reviews(db, reviewer_id, target_id) is not None


async def _recalculate_user_rating(db, user_id: str):
    """Recompute rating / reviews_count for one user from the reviews collection.

    Only reviews whose ``target_id`` is the *string* ``user_id`` are counted.
    The users collection is updated only when not in dry-run mode.

    Returns:
        (count, avg) — number of reviews and their mean rating rounded to
        2 decimals (0.0 when there are no reviews).
    """
    reviews = await db.reviews.find({
        "target_type": "user",
        "target_id": user_id,
    }).to_list(length=None)

    count = len(reviews)
    # `or 0` also covers documents where rating is stored as an explicit null,
    # which would otherwise make sum() raise TypeError mid-migration.
    avg = round(sum(r.get("rating") or 0 for r in reviews) / count, 2) if count else 0.0

    if not DRY_RUN:
        await db.users.update_one(
            {"_id": ObjectId(user_id)},
            {"$set": {"rating": avg, "reviews_count": count}},
        )
    return count, avg


# ── CLASS A — migrate from person_reviews ────────────────────────────────────

async def migrate_person_reviews(db) -> list[str]:
    """
    Move docs from `person_reviews` into `reviews`.

    For each legacy doc: if a matching review (same reviewer and target) already
    exists in `reviews`, it is patched with review_source="viewing" when that is
    missing, otherwise skipped. If none exists, a canonical review doc is
    inserted and the legacy doc is flagged `_migrated` (it is not deleted).

    Returns list of affected landlord user_ids (one entry per migrated/patched
    doc, so it may contain duplicates).
    """
    print("\n" + "=" * 60)
    print("CLASS A — person_reviews collection")
    print("=" * 60)

    docs = await db["person_reviews"].find({}).to_list(length=None)
    print(f"Found {len(docs)} document(s) in person_reviews")

    if not docs:
        return []

    affected_users: list[str] = []
    migrated = skipped = errors = 0

    for doc in docs:
        doc_id = doc["_id"]

        # Field mapping: old schema used `reviewed_id` as the landlord's id
        raw_target = doc.get("reviewed_id") or doc.get("target_id")
        if not raw_target:
            print(f" ⚠ Doc {doc_id}: no reviewed_id / target_id — skipping")
            errors += 1
            continue

        target_id = _str_id(raw_target)
        reviewer_id = _str_id(doc.get("reviewer_id", ""))

        if not target_id or not reviewer_id:
            print(f" ⚠ Doc {doc_id}: missing reviewer_id or target_id — skipping")
            errors += 1
            continue

        # If the review already exists in the canonical collection, check whether
        # it's missing review_source="viewing" (saved before the schema change).
        existing = await _find_in_reviews(db, reviewer_id, target_id)
        if existing:
            if existing.get("review_source") != "viewing":
                print(f" ~ Doc {doc_id}: in reviews but missing review_source=viewing — patching")
                if not DRY_RUN:
                    patch = {"review_source": "viewing"}
                    # Prefer the legacy doc's property name; only set it when non-empty.
                    property_name_old = (
                        doc.get("property_name")
                        or doc.get("property")
                        or existing.get("property_name")
                        or ""
                    )
                    if property_name_old:
                        patch["property_name"] = property_name_old
                    await db["reviews"].update_one(
                        {"_id": existing["_id"]},
                        {"$set": patch},
                    )
                affected_users.append(target_id)
                migrated += 1
            else:
                print(f" ✓ Doc {doc_id}: already correct in reviews — skipping")
                skipped += 1
            continue

        # Build canonical review doc
        new_doc = {
            "target_type": "user",
            "target_id": target_id,
            "reviewer_id": reviewer_id,
            "reviewer_name": doc.get("reviewer_name") or "",
            "rating": doc.get("rating", 0),
            "review_text": doc.get("review_text") or doc.get("comment") or "",
            # Timezone-aware "now"; the driver stores it as the same UTC instant
            # the deprecated datetime.utcnow() produced.
            "created_at": doc.get("created_at", datetime.now(timezone.utc)),
            "review_source": "viewing",
            "property_name": doc.get("property_name") or doc.get("property") or "",
        }

        # Landlord lookup is for log output only; skip it for ids that cannot
        # be parsed as an ObjectId.
        landlord_name = "(unknown)"
        landlord = None
        if ObjectId.is_valid(target_id):
            landlord = await db["users"].find_one({"_id": ObjectId(target_id)})
        if landlord:
            landlord_name = f"{landlord.get('firstName','')} {landlord.get('lastName','')}".strip()

        print(f" → Migrating doc {doc_id}: "
              f"{new_doc['rating']}★ by {reviewer_id} "
              f"for landlord {landlord_name} ({target_id})")

        if not DRY_RUN:
            await db["reviews"].insert_one(new_doc)
            # Mark the old doc so we know it's been processed (don't delete yet)
            await db["person_reviews"].update_one(
                {"_id": doc_id},
                {"$set": {"_migrated": True, "_migrated_at": datetime.now(timezone.utc)}},
            )

        affected_users.append(target_id)
        migrated += 1

    print(f"\n Migrated: {migrated} | Already present: {skipped} | Errors: {errors}")
    return affected_users


# ── CLASS B — fix ObjectId target_id in reviews ──────────────────────────────

async def fix_objectid_target_ids(db) -> list[str]:
    """
    Find user reviews in `reviews` where target_id is a BSON ObjectId (not a string)
    and convert them to strings. `review_source` is set to "viewing" when the
    doc has no truthy value for it.

    Returns list of affected landlord user_ids.
    """
    print("\n" + "=" * 60)
    print("CLASS B — reviews with ObjectId target_id")
    print("=" * 60)

    # Motor returns ObjectId fields as Python ObjectId objects; we can match
    # documents where target_id is not a string by checking the $type operator.
    # BSON type 7 = ObjectId, type 2 = string.
    cursor = db["reviews"].find({
        "target_type": "user",
        "target_id": {"$type": "objectId"},  # stored as ObjectId, should be string
    })
    docs = await cursor.to_list(length=None)
    print(f"Found {len(docs)} review(s) with ObjectId target_id")

    affected_users: list[str] = []

    for doc in docs:
        doc_id = doc["_id"]
        old_val = doc["target_id"]  # ObjectId
        new_val = str(old_val)      # string

        # Landlord lookup is for log output only.
        landlord = await db["users"].find_one({"_id": old_val})
        landlord_name = ""
        if landlord:
            landlord_name = f"{landlord.get('firstName','')} {landlord.get('lastName','')}".strip()

        print(f" → Fix doc {doc_id}: target_id {old_val} → '{new_val}' "
              f"(landlord: {landlord_name or 'not found'})")

        if not DRY_RUN:
            await db["reviews"].update_one(
                {"_id": doc_id},
                {"$set": {
                    "target_id": new_val,
                    "review_source": doc.get("review_source") or "viewing",
                }},
            )

        affected_users.append(new_val)

    print(f"\n Fixed: {len(docs)}")
    return affected_users


# ── Rating recalculation ──────────────────────────────────────────────────────

async def recalculate_affected_ratings(db, user_ids: list[str]):
    """Recalculate and print rating / review count for each distinct affected user.

    Ids that are empty or not valid ObjectId strings are ignored. In dry-run
    mode nothing has been migrated, so the printed figures are computed from
    the reviews collection as it currently stands.
    """
    print("\n" + "=" * 60)
    print("Recalculating ratings for affected landlords")
    print("=" * 60)

    # De-duplicate; sort so the log order is stable between runs.
    unique_ids = sorted({uid for uid in user_ids if uid and ObjectId.is_valid(uid)})
    if not unique_ids:
        print(" No users to update.")
        return

    for uid in unique_ids:
        user = await db["users"].find_one({"_id": ObjectId(uid)})
        name = ""
        if user:
            name = f"{user.get('firstName','')} {user.get('lastName','')}".strip()

        count, avg = await _recalculate_user_rating(db, uid)
        flag = "" if DRY_RUN else " ✅"
        print(f" {name or uid}: {count} review(s), avg {avg}★{flag}")


# ── main ─────────────────────────────────────────────────────────────────────

async def main():
    """Run both migrations, then recalculate ratings for every affected user."""
    print("\n" + "=" * 60)
    print("VIEWING REVIEWS MIGRATION")
    if DRY_RUN:
        print("MODE: DRY RUN — no changes will be written")
        print(" Pass --apply to commit changes")
    else:
        print("MODE: APPLY — changes will be written to MongoDB")
    print("=" * 60)
    print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    db = await _get_db()
    try:
        print(f"Connected — db: {settings.MONGODB_DATABASE}")

        affected_a = await migrate_person_reviews(db)
        affected_b = await fix_objectid_target_ids(db)

        all_affected = affected_a + affected_b
        await recalculate_affected_ratings(db, all_affected)

        print("\n" + "=" * 60)
        total = len(affected_a) + len(affected_b)
        if DRY_RUN:
            print(f"DRY RUN COMPLETE — {total} review(s) would be fixed.")
            print("Run with --apply to commit.")
        else:
            print(f"MIGRATION COMPLETE — {total} review(s) fixed.")
        print(f"Finished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)
    finally:
        # Release the connection pool even if a migration step raised.
        db.client.close()


if __name__ == "__main__":
    asyncio.run(main())