# AIDA / scripts /backfill_alerts.py
# destinyebuka's picture
# fyp
# 5abadff
#!/usr/bin/env python3
"""
One-time Backfill Script: Send notifications for existing listings matching alerts.
This script:
1. Fetches all active search_alerts
2. Fetches all published listings
3. Checks each listing against each alert
4. Sends DM notifications for matches (avoiding duplicates)
Usage:
cd python-Backend/lojiz-backend/AIDA
python -m scripts.backfill_alerts
Or run as standalone:
python scripts/backfill_alerts.py
"""
import asyncio
import sys
import os
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from datetime import datetime
from typing import List, Dict, Any
async def run_backfill():
    """Send notifications for existing listings that match active search alerts.

    Steps, in order:

    1. Open the database connection with ``connect_db`` and obtain a handle
       from ``get_db``.
    2. Load every ``search_alerts`` document with ``is_active: True`` into a
       ``SearchAlert`` model. Return early if there are none.
    3. Load at most 1000 ``listings`` documents whose ``status`` is
       ``"active"``. Return early if there are none; print a warning if the
       cap was reached, because further listings may exist.
    4. For every (listing, alert) pair whose listing id is not already in the
       alert's ``notified_listing_ids``, await ``check_listing_matches_alert``.
       On a match, await ``notify_user_of_match`` and then add the listing id
       to the alert's ``notified_listing_ids`` with ``$addToSet``.
    5. Print a summary of the counters.

    Progress and failures are reported with ``print``. A notification failure
    or a bookkeeping failure for one pair is reported and the run continues;
    the two cases are reported separately so that a sent-but-unrecorded
    notification is not mislabelled as unsent. ``disconnect_db`` is always
    awaited on the way out, including on the early returns.

    Returns:
        None.
    """
    # Project imports are deferred to call time so that the sys.path setup
    # done at module import has already taken effect.
    from app.database import get_db, connect_db, disconnect_db
    from app.models.search_alert import SearchAlert
    from app.services.alert_service import (
        check_listing_matches_alert,
        notify_user_of_match,
    )

    # Safety cap on how many listings a single run loads into memory.
    listing_limit = 1000

    print("="*60)
    print(" ALERT BACKFILL SCRIPT")
    print("="*60)
    print(f"Started at: {datetime.now().isoformat()}")
    print()

    # Initialize DB connection
    await connect_db()
    try:
        db = await get_db()

        # 1. Fetch all active alerts
        print(" Fetching active search alerts...")
        alerts_cursor = db.search_alerts.find({"is_active": True})
        alerts = [SearchAlert(**doc) async for doc in alerts_cursor]
        print(f" Found {len(alerts)} active alerts")
        if not alerts:
            print(" No active alerts found. Nothing to backfill.")
            return

        # Print alert summary
        for alert in alerts:
            print(f" - User {alert.user_id}: '{alert.user_query}'")
        print()

        # 2. Fetch listings with status "active" (capped at listing_limit)
        print(" Fetching published listings...")
        listings_cursor = db.listings.find({"status": "active"})
        listings = await listings_cursor.to_list(length=listing_limit)
        print(f" Found {len(listings)} active listings")
        if len(listings) >= listing_limit:
            # The cap truncates silently otherwise, which would make the
            # summary look like full coverage.
            print(
                f" WARNING: listing limit of {listing_limit} reached; "
                "additional active listings may not have been checked."
            )
        print()
        if not listings:
            print(" No active listings found. Nothing to match.")
            return

        # 3. Check each listing against each alert
        print(" Checking matches...")
        matches_found = 0
        notifications_sent = 0
        skipped_duplicates = 0

        # Build each alert's already-notified id set once, instead of
        # re-reading and linearly scanning the list for every pair.
        alert_states = [
            (alert, set(alert.notified_listing_ids or []))
            for alert in alerts
        ]

        for listing in listings:
            listing_id = str(listing["_id"])
            listing_title = listing.get("title", "Untitled")
            listing_location = listing.get("location", "Unknown")

            for alert, already_notified in alert_states:
                # Skip pairs that were notified by an earlier run.
                if listing_id in already_notified:
                    skipped_duplicates += 1
                    continue

                if not await check_listing_matches_alert(listing, alert):
                    continue

                matches_found += 1
                print(f" MATCH: '{listing_title}' ({listing_location}) -> User {alert.user_id}")

                # Send the notification; a failure here means nothing was sent.
                try:
                    await notify_user_of_match(alert, listing)
                except Exception as e:
                    print(f" Failed to notify: {e}")
                    continue

                notifications_sent += 1
                already_notified.add(listing_id)
                print(" Notification sent!")

                # Record the listing on the alert to prevent duplicates on a
                # later run. Handled separately from the send so a failure
                # here is not reported as a failed notification.
                try:
                    await db.search_alerts.update_one(
                        {"_id": alert.id},
                        {"$addToSet": {"notified_listing_ids": listing_id}},
                    )
                except Exception as e:
                    print(
                        " WARNING: notification sent but could not record "
                        f"listing {listing_id} on alert {alert.id}: {e}"
                    )

        # 4. Summary
        print()
        print("="*60)
        print(" BACKFILL SUMMARY")
        print("="*60)
        print(f" Alerts processed: {len(alerts)}")
        print(f" Listings checked: {len(listings)}")
        print(f" Matches found: {matches_found}")
        print(f" Notifications sent: {notifications_sent}")
        print(f" Duplicates skipped: {skipped_duplicates}")
        print()
        print(f"Completed at: {datetime.now().isoformat()}")
        print("="*60)
    finally:
        await disconnect_db()
if __name__ == "__main__":
    # Script entry point: announce the start, run the backfill coroutine to
    # completion on a fresh event loop, then announce the end.
    start_banner = " Starting Alert Backfill Script..."
    end_banner = " Script completed!"

    print(f"\n{start_banner}\n")
    asyncio.run(run_backfill())
    print(f"\n{end_banner}\n")