AIDA / scripts /migrate_viewing_reviews.py
destinyebuka's picture
new update
9a0c720
#!/usr/bin/env python3
"""
Migration: Rescue misrouted viewing reviews.

Two classes of bad data this script fixes:

CLASS A — person_reviews collection
    dm_brain.py (before the bug fix) wrote viewing reviews to a legacy
    `person_reviews` collection using the field `reviewed_id` instead of
    `target_id`. These docs never showed up on the landlord's profile.

CLASS B — reviews collection with ObjectId target_id
    viewing_notification_jobs.py stored `landlord_id` (a raw ObjectId) in
    the pending_viewing_review session, so dm_brain wrote the review with
    target_id = <ObjectId object> instead of a plain string. MongoDB's
    $match on the string version found nothing.

Both classes are migrated into `reviews` with:
    target_type   = "user"
    target_id     = str(landlord_id)
    review_source = "viewing"

After migrating, all affected users' ratings are recalculated from scratch.

Run:
    python scripts/migrate_viewing_reviews.py           # dry-run (shows what would change)
    python scripts/migrate_viewing_reviews.py --apply   # actually writes to MongoDB
"""
import asyncio
import sys
import os
from datetime import datetime
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from motor.motor_asyncio import AsyncIOMotorClient
from bson import ObjectId
from app.config import settings
# Safety default: the script only reports what it would do; writes happen
# solely when the operator passes --apply on the command line.
DRY_RUN = not any(arg == "--apply" for arg in sys.argv)
# ── helpers ──────────────────────────────────────────────────────────────────
def _str_id(val) -> str:
"""Safely coerce ObjectId or string to string."""
if val is None:
return ""
return str(val)
async def _get_db():
    """Connect to MongoDB, check the server answers, and return the app database.

    The ``ping`` runs under a 5 s server-selection timeout, so an unreachable
    MONGODB_URL raises here before any migration work starts.
    """
    mongo = AsyncIOMotorClient(
        settings.MONGODB_URL,
        serverSelectionTimeoutMS=5000,
    )
    await mongo.admin.command("ping")
    database_name = settings.MONGODB_DATABASE
    return mongo[database_name]
def _is_objectid(val) -> bool:
    """Tell whether *val* is a BSON ``ObjectId`` instance (not e.g. its hex string)."""
    is_oid = isinstance(val, ObjectId)
    return is_oid
def _id_match_forms(value: str) -> list[object]:
    """Return the stored forms an id may take: the string, plus its ObjectId if valid."""
    forms: list[object] = [value]
    if ObjectId.is_valid(value):
        forms.append(ObjectId(value))
    return forms


async def _find_in_reviews(db, reviewer_id: str, target_id: str):
    """Return the matching user review from the canonical collection, or None.

    Both ids are matched in string form and, when they are valid ObjectId
    hex strings, in ObjectId form as well. The CLASS B bug left reviews whose
    ``target_id`` is an ObjectId; matching only the string form would miss
    them. CLASS A would then insert a duplicate, and CLASS B would later turn
    the original into a second string-keyed copy of the same review.
    """
    return await db.reviews.find_one({
        "reviewer_id": {"$in": _id_match_forms(reviewer_id)},
        "target_type": "user",
        "target_id": {"$in": _id_match_forms(target_id)},
    })
async def _already_in_reviews(db, reviewer_id: str, target_id: str) -> bool:
    """Return True when *reviewer_id* already has a user review of *target_id*.

    Boolean convenience wrapper around ``_find_in_reviews``.
    """
    existing_review = await _find_in_reviews(db, reviewer_id, target_id)
    return existing_review is not None
async def _recalculate_user_rating(db, user_id: str):
    """Recompute ``rating`` / ``reviews_count`` for one user from ``reviews``.

    Counts every ``target_type="user"`` review whose ``target_id`` equals
    *user_id* as a string or, when *user_id* is a valid ObjectId hex string,
    as an ObjectId. When this runs after ``fix_objectid_target_ids`` in
    --apply mode (as ``main`` does), no ObjectId-keyed user reviews remain,
    so the extra form changes nothing. In dry-run mode it lets the reported
    figures include the reviews CLASS B would fix. Legacy ``person_reviews``
    docs that CLASS A would copy are still not counted in dry-run mode,
    because they are not in ``reviews`` yet.

    A missing or null ``rating`` counts as 0; a null previously crashed ``sum``.

    Writes to ``users`` only when not in dry-run mode. *user_id* must then be
    a valid ObjectId hex string.

    Returns:
        ``(count, avg)``: the number of reviews, and the average rating
        rounded to 2 places (0.0 when there are none).
    """
    target_forms: list[object] = [user_id]
    if ObjectId.is_valid(user_id):
        target_forms.append(ObjectId(user_id))

    reviews = await db.reviews.find({
        "target_type": "user",
        "target_id": {"$in": target_forms},
    }).to_list(length=None)

    count = len(reviews)
    # `or 0` also covers an explicit null rating, which `.get(..., 0)` would not.
    total = sum(r.get("rating") or 0 for r in reviews)
    avg = round(total / count, 2) if count else 0.0

    if not DRY_RUN:
        await db.users.update_one(
            {"_id": ObjectId(user_id)},
            {"$set": {"rating": avg, "reviews_count": count}},
        )
    return count, avg
# ── CLASS A — migrate from person_reviews ──────────────────────────────────────
def _person_review_to_canonical(doc: dict, target_id: str, reviewer_id: str) -> dict:
    """Build the canonical `reviews` document for one legacy `person_reviews` doc.

    Text fields fall back through legacy field names (`comment` for the
    review text, `property` for the property name) and default to "".
    """
    return {
        "target_type": "user",
        "target_id": target_id,
        "reviewer_id": reviewer_id,
        "reviewer_name": doc.get("reviewer_name") or "",
        "rating": doc.get("rating", 0),
        "review_text": doc.get("review_text") or doc.get("comment") or "",
        # `or` rather than a .get() default: an explicit null created_at
        # would otherwise be copied into `reviews` as null.
        "created_at": doc.get("created_at") or datetime.utcnow(),
        "review_source": "viewing",
        "property_name": doc.get("property_name") or doc.get("property") or "",
    }


async def migrate_person_reviews(db) -> list[str]:
    """
    Move docs from `person_reviews` into `reviews`.

    Per legacy doc:
      * The landlord id comes from `reviewed_id` (old schema), falling back
        to `target_id`. Docs missing it, or missing `reviewer_id`, are
        counted as errors and skipped.
      * If the review already exists in `reviews` without
        review_source="viewing", that review is patched in place (plus
        property_name when one is known). If it is already tagged, it is
        skipped.
      * Otherwise a canonical review is inserted and the legacy doc is
        flagged `_migrated`; it is not deleted.
    No writes happen in dry-run mode.

    Returns list of affected landlord user_ids (may contain duplicates).
    """
    print("\n" + "=" * 60)
    print("CLASS A β€” person_reviews collection")
    print("=" * 60)
    docs = await db["person_reviews"].find({}).to_list(length=None)
    print(f"Found {len(docs)} document(s) in person_reviews")
    if not docs:
        return []

    affected_users: list[str] = []
    migrated = skipped = errors = 0
    for doc in docs:
        doc_id = doc["_id"]
        # Field mapping: old schema used `reviewed_id` as the landlord's id
        raw_target = doc.get("reviewed_id") or doc.get("target_id")
        if not raw_target:
            print(f" ⚠ Doc {doc_id}: no reviewed_id / target_id β€” skipping")
            errors += 1
            continue
        target_id = _str_id(raw_target)
        reviewer_id = _str_id(doc.get("reviewer_id", ""))
        if not target_id or not reviewer_id:
            print(f" ⚠ Doc {doc_id}: missing reviewer_id or target_id β€” skipping")
            errors += 1
            continue

        # The review may already be in the canonical collection (e.g. saved
        # before review_source existed). Then only tag it; never duplicate it.
        existing = await _find_in_reviews(db, reviewer_id, target_id)
        if existing:
            if existing.get("review_source") == "viewing":
                print(f" βœ“ Doc {doc_id}: already correct in reviews β€” skipping")
                skipped += 1
                continue
            print(f" ~ Doc {doc_id}: in reviews but missing review_source=viewing β€” patching")
            if not DRY_RUN:
                patch = {"review_source": "viewing"}
                property_name = (
                    doc.get("property_name") or doc.get("property")
                    or existing.get("property_name") or ""
                )
                if property_name:
                    patch["property_name"] = property_name
                await db["reviews"].update_one(
                    {"_id": existing["_id"]},
                    {"$set": patch},
                )
            affected_users.append(target_id)
            migrated += 1
            continue

        new_doc = _person_review_to_canonical(doc, target_id, reviewer_id)

        landlord = None
        if ObjectId.is_valid(target_id):
            landlord = await db["users"].find_one({"_id": ObjectId(target_id)})
        landlord_name = "(unknown)"
        if landlord:
            landlord_name = f"{landlord.get('firstName','')} {landlord.get('lastName','')}".strip()
        print(f" β†’ Migrating doc {doc_id}: "
              f"{new_doc['rating']}β˜… by {reviewer_id} "
              f"for landlord {landlord_name} ({target_id})")

        if not DRY_RUN:
            await db["reviews"].insert_one(new_doc)
            # Mark the old doc so we know it's been processed (don't delete yet)
            await db["person_reviews"].update_one(
                {"_id": doc_id},
                {"$set": {"_migrated": True, "_migrated_at": datetime.utcnow()}},
            )
        affected_users.append(target_id)
        migrated += 1

    print(f"\n Migrated: {migrated} | Already present: {skipped} | Errors: {errors}")
    return affected_users
# ── CLASS B — fix ObjectId target_id in reviews ────────────────────────────────
async def fix_objectid_target_ids(db) -> list[str]:
    """
    Find user reviews in `reviews` where target_id is a BSON ObjectId
    (not a string) and convert them to strings.

    Each converted review also gets review_source="viewing" unless it
    already has a truthy review_source. No writes happen in dry-run mode.

    Returns list of affected landlord user_ids (string form, one entry per
    review found).
    """
    print("\n" + "=" * 60)
    print("CLASS B β€” reviews with ObjectId target_id")
    print("=" * 60)
    # `$type: "objectId"` (BSON type 7) selects exactly the docs whose
    # target_id was stored as an ObjectId instead of a string (BSON type 2).
    docs = await db["reviews"].find({
        "target_type": "user",
        "target_id": {"$type": "objectId"},
    }).to_list(length=None)
    print(f"Found {len(docs)} review(s) with ObjectId target_id")

    affected_users: list[str] = []
    for doc in docs:
        doc_id = doc["_id"]
        old_val = doc["target_id"]  # an ObjectId, guaranteed by the $type filter
        new_val = str(old_val)
        # The raw ObjectId is used directly as the users `_id` lookup key.
        landlord = await db["users"].find_one({"_id": old_val})
        landlord_name = ""
        if landlord:
            landlord_name = f"{landlord.get('firstName','')} {landlord.get('lastName','')}".strip()
        print(f" β†’ Fix doc {doc_id}: target_id {old_val} β†’ '{new_val}' "
              f"(landlord: {landlord_name or 'not found'})")
        if not DRY_RUN:
            await db["reviews"].update_one(
                {"_id": doc_id},
                {"$set": {
                    "target_id": new_val,
                    "review_source": doc.get("review_source") or "viewing",
                }},
            )
        affected_users.append(new_val)

    print(f"\n Fixed: {len(docs)}")
    return affected_users
# ── Rating recalculation ──────────────────────────────────────────────────────
async def recalculate_affected_ratings(db, user_ids: list[str]):
    """Recompute and report rating / reviews_count for each affected landlord.

    Empty ids and ids that are not valid ObjectId hex strings are dropped,
    and duplicates are collapsed. The remaining ids are processed in sorted
    order, so the log order is reproducible across runs. A plain set's
    iteration order for strings varies per process because of hash
    randomization, which made dry-run and --apply logs hard to compare.
    """
    print("\n" + "=" * 60)
    print("Recalculating ratings for affected landlords")
    print("=" * 60)
    unique_ids = sorted({uid for uid in user_ids if uid and ObjectId.is_valid(uid)})
    if not unique_ids:
        print(" No users to update.")
        return

    # Loop-invariant: the success marker only appears when changes are written.
    flag = "" if DRY_RUN else " βœ…"
    for uid in unique_ids:
        user = await db["users"].find_one({"_id": ObjectId(uid)})
        name = ""
        if user:
            name = f"{user.get('firstName','')} {user.get('lastName','')}".strip()
        count, avg = await _recalculate_user_rating(db, uid)
        print(f" {name or uid}: {count} review(s), avg {avg}β˜…{flag}")
# ── main ─────────────────────────────────────────────────────────────────────
async def main():
    """Entry point: run both migrations, then refresh ratings of affected landlords.

    Prints a banner describing the mode (dry run unless --apply was passed),
    connects to MongoDB, runs CLASS A and then CLASS B, recalculates ratings
    for every landlord id either step reported, and prints a summary.
    """
    rule = "=" * 60
    timestamp_fmt = "%Y-%m-%d %H:%M:%S"

    print("\n" + rule)
    print("VIEWING REVIEWS MIGRATION")
    if DRY_RUN:
        mode_lines = (
            "MODE: DRY RUN β€” no changes will be written",
            " Pass --apply to commit changes",
        )
    else:
        mode_lines = ("MODE: APPLY β€” changes will be written to MongoDB",)
    for line in mode_lines:
        print(line)
    print(rule)
    print(f"Started: {datetime.now().strftime(timestamp_fmt)}")

    db = await _get_db()
    print(f"Connected β€” db: {settings.MONGODB_DATABASE}")

    # Recalculation runs last so it sees the results of both migration steps.
    from_person_reviews = await migrate_person_reviews(db)
    from_objectid_fix = await fix_objectid_target_ids(db)
    await recalculate_affected_ratings(db, from_person_reviews + from_objectid_fix)

    print("\n" + rule)
    total = len(from_person_reviews) + len(from_objectid_fix)
    if not DRY_RUN:
        print(f"MIGRATION COMPLETE β€” {total} review(s) fixed.")
    else:
        print(f"DRY RUN COMPLETE β€” {total} review(s) would be fixed.")
        print("Run with --apply to commit.")
    print(f"Finished: {datetime.now().strftime(timestamp_fmt)}")
    print(rule)


if __name__ == "__main__":
    # Script entry point: drive the async migration to completion.
    asyncio.run(main())