Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Migration: Rescue misrouted viewing reviews. | |
| Two classes of bad data this script fixes: | |
| CLASS A — person_reviews collection | |
| dm_brain.py (before the bug fix) wrote viewing reviews to a legacy | |
| `person_reviews` collection using field `reviewed_id` instead of | |
| `target_id`. These docs never showed up on the landlord's profile. | |
| CLASS B — reviews collection with ObjectId target_id | |
| viewing_notification_jobs.py stored `landlord_id` (a raw ObjectId) in | |
| the pending_viewing_review session, so dm_brain wrote the review with | |
| target_id = <ObjectId object> instead of a plain string. MongoDB's | |
| $match on the string version found nothing. | |
| Both classes are migrated into `reviews` with: | |
| target_type = "user" | |
| target_id = str(landlord_id) | |
| review_source = "viewing" | |
| After migrating, all affected users' ratings are recalculated from scratch. | |
| Run: | |
| python scripts/migrate_viewing_reviews.py # dry-run (shows what would change) | |
| python scripts/migrate_viewing_reviews.py --apply # actually writes to MongoDB | |
| """ | |
| import asyncio | |
| import sys | |
| import os | |
| from datetime import datetime | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from bson import ObjectId | |
| from app.config import settings | |
# Safety default: writes to MongoDB happen only when "--apply" appears on the
# command line; otherwise the script just prints what it would change.
DRY_RUN = not any(arg == "--apply" for arg in sys.argv)
| # ββ helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _str_id(val) -> str: | |
| """Safely coerce ObjectId or string to string.""" | |
| if val is None: | |
| return "" | |
| return str(val) | |
async def _get_db():
    """Open a Motor client, ping the server, and return the configured database.

    Connection details come from ``settings.MONGODB_URL`` and
    ``settings.MONGODB_DATABASE``. The ping is expected to raise if no server
    can be selected within the 5 s ``serverSelectionTimeoutMS`` window, so an
    unreachable database fails the script up front.
    """
    mongo = AsyncIOMotorClient(
        settings.MONGODB_URL,
        serverSelectionTimeoutMS=5000,
    )
    await mongo.admin.command("ping")
    database_name = settings.MONGODB_DATABASE
    return mongo[database_name]
def _is_objectid(val) -> bool:
    """Tell whether *val* is a BSON ``ObjectId`` object (not, e.g., its hex string)."""
    is_oid = isinstance(val, ObjectId)
    return is_oid
async def _find_in_reviews(db, reviewer_id: str, target_id: str):
    """Return the user review by *reviewer_id* for *target_id* in ``reviews``, or None.

    ``target_id`` is matched both as the plain string and, when it is a valid
    ObjectId hex string, as a BSON ObjectId. Some reviews were stored with an
    ObjectId ``target_id`` (the Class B problem). A string-only match let the
    Class A migration miss them and insert a second review, which the Class B
    fix then also turned into a string-keyed review. The landlord ended up with
    two reviews from the same reviewer.
    """
    target_forms = [target_id]
    if ObjectId.is_valid(target_id):
        target_forms.append(ObjectId(target_id))
    return await db.reviews.find_one({
        "reviewer_id": reviewer_id,
        "target_type": "user",
        "target_id": {"$in": target_forms},
    })
async def _already_in_reviews(db, reviewer_id: str, target_id: str) -> bool:
    """Report whether ``_find_in_reviews`` locates a matching canonical review."""
    match = await _find_in_reviews(db, reviewer_id, target_id)
    return match is not None
| async def _recalculate_user_rating(db, user_id: str): | |
| """Recompute rating / reviews_count for one user from the reviews collection.""" | |
| reviews = await db.reviews.find({ | |
| "target_type": "user", | |
| "target_id": user_id, | |
| }).to_list(length=None) | |
| count = len(reviews) | |
| avg = round(sum(r.get("rating", 0) for r in reviews) / count, 2) if count else 0.0 | |
| if not DRY_RUN: | |
| await db.users.update_one( | |
| {"_id": ObjectId(user_id)}, | |
| {"$set": {"rating": avg, "reviews_count": count}}, | |
| ) | |
| return count, avg | |
| # ββ CLASS A β migrate from person_reviews ββββββββββββββββββββββββββββββββββββ | |
| async def migrate_person_reviews(db) -> list[str]: | |
| """ | |
| Move docs from `person_reviews` into `reviews`. | |
| Returns list of affected landlord user_ids. | |
| """ | |
| print("\n" + "=" * 60) | |
| print("CLASS A β person_reviews collection") | |
| print("=" * 60) | |
| docs = await db["person_reviews"].find({}).to_list(length=None) | |
| print(f"Found {len(docs)} document(s) in person_reviews") | |
| if not docs: | |
| return [] | |
| affected_users: list[str] = [] | |
| migrated = skipped = errors = 0 | |
| for doc in docs: | |
| doc_id = doc["_id"] | |
| # Field mapping: old schema used `reviewed_id` as the landlord's id | |
| raw_target = doc.get("reviewed_id") or doc.get("target_id") | |
| if not raw_target: | |
| print(f" β Doc {doc_id}: no reviewed_id / target_id β skipping") | |
| errors += 1 | |
| continue | |
| target_id = _str_id(raw_target) | |
| reviewer_id = _str_id(doc.get("reviewer_id", "")) | |
| if not target_id or not reviewer_id: | |
| print(f" β Doc {doc_id}: missing reviewer_id or target_id β skipping") | |
| errors += 1 | |
| continue | |
| # If the review already exists in the canonical collection, check whether | |
| # it's missing review_source="viewing" (saved before today's schema change). | |
| existing = await _find_in_reviews(db, reviewer_id, target_id) | |
| if existing: | |
| needs_patch = existing.get("review_source") != "viewing" | |
| property_name_old = ( | |
| doc.get("property_name") or doc.get("property") or | |
| existing.get("property_name") or "" | |
| ) | |
| if needs_patch: | |
| print(f" ~ Doc {doc_id}: in reviews but missing review_source=viewing β patching") | |
| if not DRY_RUN: | |
| patch = {"review_source": "viewing"} | |
| if property_name_old: | |
| patch["property_name"] = property_name_old | |
| await db["reviews"].update_one( | |
| {"_id": existing["_id"]}, | |
| {"$set": patch}, | |
| ) | |
| affected_users.append(target_id) | |
| migrated += 1 | |
| else: | |
| print(f" β Doc {doc_id}: already correct in reviews β skipping") | |
| skipped += 1 | |
| continue | |
| # Build canonical review doc | |
| new_doc = { | |
| "target_type": "user", | |
| "target_id": target_id, | |
| "reviewer_id": reviewer_id, | |
| "reviewer_name": doc.get("reviewer_name") or "", | |
| "rating": doc.get("rating", 0), | |
| "review_text": doc.get("review_text") or doc.get("comment") or "", | |
| "created_at": doc.get("created_at", datetime.utcnow()), | |
| "review_source": "viewing", | |
| "property_name": doc.get("property_name") or doc.get("property") or "", | |
| } | |
| landlord_name = "(unknown)" | |
| landlord = await db["users"].find_one({"_id": ObjectId(target_id)}) \ | |
| if ObjectId.is_valid(target_id) else None | |
| if landlord: | |
| landlord_name = f"{landlord.get('firstName','')} {landlord.get('lastName','')}".strip() | |
| print(f" β Migrating doc {doc_id}: " | |
| f"{new_doc['rating']}β by {reviewer_id} " | |
| f"for landlord {landlord_name} ({target_id})") | |
| if not DRY_RUN: | |
| await db["reviews"].insert_one(new_doc) | |
| # Mark the old doc so we know it's been processed (don't delete yet) | |
| await db["person_reviews"].update_one( | |
| {"_id": doc_id}, | |
| {"$set": {"_migrated": True, "_migrated_at": datetime.utcnow()}}, | |
| ) | |
| affected_users.append(target_id) | |
| migrated += 1 | |
| print(f"\n Migrated: {migrated} | Already present: {skipped} | Errors: {errors}") | |
| return affected_users | |
# ── CLASS B — fix ObjectId target_id in reviews ──────────────────────────────
async def _find_string_keyed_duplicate(db, doc: dict, target_str: str):
    """Return another user review by *doc*'s reviewer targeting *target_str*, or None.

    The reviewer id is matched both in its stored form and as a string,
    because Class A writes stringified reviewer ids. Returns None when *doc*
    has no reviewer_id, since there is nothing to compare against.
    """
    reviewer_raw = doc.get("reviewer_id")
    if not reviewer_raw:
        return None
    reviewer_forms = [reviewer_raw]
    if not isinstance(reviewer_raw, str):
        reviewer_forms.append(_str_id(reviewer_raw))
    return await db["reviews"].find_one({
        "_id": {"$ne": doc["_id"]},
        "reviewer_id": {"$in": reviewer_forms},
        "target_type": "user",
        "target_id": target_str,
    })


async def fix_objectid_target_ids(db) -> list[str]:
    """
    Find user reviews in `reviews` where target_id is a BSON ObjectId
    (not a string) and convert them to strings.

    A review is left untouched, and reported, when the same reviewer already
    has a review with the string form of that target_id (for example one
    written by Class A). Converting it would give the landlord two reviews
    from one reviewer, and both would count toward the rating.
    Nothing is written in dry-run mode.

    Returns the landlord user_ids (strings, one entry per converted review).
    """
    print("\n" + "=" * 60)
    print("CLASS B β reviews with ObjectId target_id")
    print("=" * 60)
    # `$type: "objectId"` (BSON type 7) selects only docs whose target_id was
    # stored as an ObjectId rather than a string (BSON type 2).
    cursor = db["reviews"].find({
        "target_type": "user",
        "target_id": {"$type": "objectId"},  # stored as ObjectId, should be string
    })
    docs = await cursor.to_list(length=None)
    print(f"Found {len(docs)} review(s) with ObjectId target_id")
    affected_users: list[str] = []
    fixed = duplicates = 0
    for doc in docs:
        doc_id = doc["_id"]
        old_val = doc["target_id"]  # ObjectId
        new_val = str(old_val)  # string
        landlord = await db["users"].find_one({"_id": old_val})
        landlord_name = ""
        if landlord:
            landlord_name = f"{landlord.get('firstName','')} {landlord.get('lastName','')}".strip()
        duplicate = await _find_string_keyed_duplicate(db, doc, new_val)
        if duplicate:
            print(f" ! Skip doc {doc_id}: review {duplicate['_id']} by the same reviewer "
                  f"already has target_id '{new_val}' "
                  f"(landlord: {landlord_name or 'not found'})")
            duplicates += 1
            continue
        print(f" β Fix doc {doc_id}: target_id {old_val} β '{new_val}' "
              f"(landlord: {landlord_name or 'not found'})")
        if not DRY_RUN:
            await db["reviews"].update_one(
                {"_id": doc_id},
                {"$set": {
                    "target_id": new_val,
                    "review_source": doc.get("review_source") or "viewing",
                }},
            )
        affected_users.append(new_val)
        fixed += 1
    print(f"\n Fixed: {fixed}")
    if duplicates:
        print(f" Skipped (would duplicate an existing review): {duplicates}")
    return affected_users
# ── Rating recalculation ─────────────────────────────────────────────────────
async def recalculate_affected_ratings(db, user_ids: list[str]):
    """Recompute and print the rating of each distinct, valid user id in *user_ids*.

    Blank ids and ids that are not valid ObjectId strings are ignored.
    NOTE: in dry-run mode the migration steps have written nothing, so the
    figures printed here describe the reviews as they currently are, not as
    they would be after migration.
    """
    print("\n" + "=" * 60)
    print("Recalculating ratings for affected landlords")
    print("=" * 60)
    distinct_ids = list({uid for uid in user_ids if uid and ObjectId.is_valid(uid)})
    if not distinct_ids:
        print(" No users to update.")
        return
    done_marker = "" if DRY_RUN else " β "
    for uid in distinct_ids:
        user_doc = await db["users"].find_one({"_id": ObjectId(uid)})
        display = (
            f"{user_doc.get('firstName','')} {user_doc.get('lastName','')}".strip()
            if user_doc else ""
        )
        count, avg = await _recalculate_user_rating(db, uid)
        print(f" {display or uid}: {count} review(s), avg {avg}β {done_marker}")
# ── main ─────────────────────────────────────────────────────────────────────
async def main():
    """Run both migrations, recalculate affected ratings, and print a summary.

    The MongoDB client behind the database returned by ``_get_db`` is closed
    before returning, including when one of the steps raises.
    """
    print("\n" + "=" * 60)
    print("VIEWING REVIEWS MIGRATION")
    if DRY_RUN:
        print("MODE: DRY RUN β no changes will be written")
        print(" Pass --apply to commit changes")
    else:
        print("MODE: APPLY β changes will be written to MongoDB")
    print("=" * 60)
    print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    db = await _get_db()
    try:
        print(f"Connected β db: {settings.MONGODB_DATABASE}")
        affected_a = await migrate_person_reviews(db)
        affected_b = await fix_objectid_target_ids(db)
        all_affected = affected_a + affected_b
        await recalculate_affected_ratings(db, all_affected)
        print("\n" + "=" * 60)
        total = len(affected_a) + len(affected_b)
        if DRY_RUN:
            print(f"DRY RUN COMPLETE β {total} review(s) would be fixed.")
            print("Run with --apply to commit.")
        else:
            print(f"MIGRATION COMPLETE β {total} review(s) fixed.")
        print(f"Finished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 60)
    finally:
        # _get_db() hands back only the database handle; close its owning
        # client so the connection pool is released even on failure.
        db.client.close()


if __name__ == "__main__":
    asyncio.run(main())