# Source: AIDA / app / services / alert_service.py
# Page header residue: "destinyebuka's picture", commit message "fyp", revision e5a08b1
import logging
from datetime import datetime
from typing import Optional, Tuple
from bson import ObjectId
from app.database import get_db
from app.models.search_alert import SearchAlert, AlertStatus
from app.services.conversation_service import conversation_service
from app.models.conversation import Conversation
from app.models.message import Message
from app.services.listing_service import enrich_listings_batch
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Identifier used for AIDA's side of system conversations: it is one of the two
# conversation participants and the sender ID of alert notifications.
AIDA_BOT_ID = "AIDA_BOT"
# ============================================================
# SEMANTIC DEDUPLICATION
# ============================================================
async def find_similar_alert(user_id: str, params: dict) -> Optional[SearchAlert]:
    """
    Find an existing alert with similar criteria (semantic deduplication).

    Instead of exact text matching, each candidate alert is scored on four
    criteria worth up to 25 points each:
    - Location: substring match in either direction, or both unspecified
    - Listing type: equal (25), or exactly one side unspecified (15)
    - Max price: within 20% (25), within 40% (15), or both unspecified (25)
    - Bedrooms: equal (25), within ±1 (15), or both unspecified (25)

    Args:
        user_id: Owner of the alerts to compare against.
        params: Search parameters of the alert about to be created.

    Returns:
        The first alert scoring 75 or more, else None.
    """
    db = await get_db()

    # Candidates: the user's active/paused alerts. None is included in the
    # status list so documents without a status still qualify (presumably
    # legacy records - see the is_active backward-compatibility flag).
    cursor = db.search_alerts.find({
        "user_id": user_id,
        "status": {"$in": [AlertStatus.ACTIVE.value, AlertStatus.PAUSED.value, None]},
        "is_active": True  # Backward compatibility
    })

    new_location = (params.get("location") or "").lower().strip()
    new_type = (params.get("listing_type") or "").lower().strip()
    new_max_price = params.get("max_price")
    new_bedrooms = params.get("bedrooms")

    async for doc in cursor:
        # `or {}` also covers a search_params field explicitly stored as None.
        existing_params = doc.get("search_params") or {}
        match_score = 0

        # Location match (fuzzy - contains or contained by)
        existing_loc = (existing_params.get("location") or "").lower().strip()
        if new_location and existing_loc:
            if new_location in existing_loc or existing_loc in new_location:
                match_score += 25
        elif not new_location and not existing_loc:
            match_score += 25  # Both unspecified = match

        # Type match
        existing_type = (existing_params.get("listing_type") or "").lower().strip()
        if new_type == existing_type:
            match_score += 25
        elif not new_type or not existing_type:
            match_score += 15  # One unspecified = partial match

        # Price match (within 20% tolerance)
        existing_max = existing_params.get("max_price")
        if new_max_price and existing_max:
            try:
                diff_ratio = abs(new_max_price - existing_max) / max(new_max_price, existing_max)
                if diff_ratio <= 0.2:
                    match_score += 25
                elif diff_ratio <= 0.4:
                    match_score += 15  # Close enough
            except (TypeError, ZeroDivisionError):
                pass  # Non-numeric price: contributes nothing to the score
        elif not new_max_price and not existing_max:
            match_score += 25  # Both unspecified

        # Bedrooms match
        existing_beds = existing_params.get("bedrooms")
        if new_bedrooms is not None and existing_beds is not None:
            if new_bedrooms == existing_beds:
                match_score += 25
            else:
                try:
                    if abs(new_bedrooms - existing_beds) <= 1:
                        match_score += 15  # Close enough
                except TypeError:
                    # Non-numeric bedroom values: treat as "not close" rather
                    # than aborting the whole lookup (mirrors price handling).
                    pass
        elif new_bedrooms is None and existing_beds is None:
            match_score += 25  # Both unspecified

        # If 75%+ match, consider it a duplicate
        if match_score >= 75:
            logger.info(f"Found similar alert (score={match_score}): {doc.get('user_query')}")
            return SearchAlert(**doc)

    return None
async def create_alert(
    user_id: str,
    query: str,
    params: dict,
    use_voice: bool = False,
    alert_type: str = "standard",
) -> Tuple[SearchAlert, Optional[SearchAlert]]:
    """
    Create a search alert unless a semantically similar one already exists.

    Args:
        user_id: User ID to create the alert for
        query: Original search query text
        params: Search parameters (location, price, type, etc.)
        use_voice: If True, send notifications as voice messages
        alert_type: "standard" | "roommate" | "short_stay" | "viewing"

    Returns:
        Tuple of (alert, existing_similar):
        - alert is the alert to use (newly created, or the existing one)
        - existing_similar is that same existing alert when a similar one
          was found, and None when a new alert was inserted
    """
    db = await get_db()

    # Step 1: reuse a similar alert when one exists, syncing its voice flag.
    existing = await find_similar_alert(user_id, params)
    if existing:
        logger.info(f"Semantic dedup: Found similar alert {existing.id} for user {user_id}")
        voice_pref_changed = existing.use_voice != use_voice
        if voice_pref_changed:
            await db.search_alerts.update_one(
                {"_id": ObjectId(existing.id)},
                {"$set": {"use_voice": use_voice}},
            )
            existing.use_voice = use_voice
        return (existing, existing)

    # Step 2: no duplicate - build normalized criteria (text fields are
    # lowercased/stripped, numeric fields are passed through) and insert.
    normalized_criteria = {
        text_key: (params.get(text_key) or "").lower().strip()
        for text_key in ("location", "listing_type")
    }
    normalized_criteria.update(
        {raw_key: params.get(raw_key) for raw_key in ("max_price", "min_price", "bedrooms")}
    )

    payload = SearchAlert(
        user_id=user_id,
        user_query=query,
        search_params=params,
        use_voice=use_voice,
        alert_type=alert_type,
        status=AlertStatus.ACTIVE,
        normalized_criteria=normalized_criteria,
    ).model_dump(by_alias=True, exclude=["id"])

    insert_result = await db.search_alerts.insert_one(payload)
    # Re-read the stored document so the returned model carries its _id.
    stored_doc = await db.search_alerts.find_one({"_id": insert_result.inserted_id})
    return (SearchAlert(**stored_doc), None)
# ============================================================
# ALERT ENRICHMENT — called by specialist agents
# ============================================================
async def enrich_roommate_alert(alert_id: str, lifestyle_data: dict) -> bool:
    """
    Store lifestyle/matching criteria on an alert and mark it as a roommate alert.

    Called by Matcher after collecting the seeker's profile.

    Args:
        alert_id: String form of the alert's ObjectId.
        lifestyle_data: Value written to the alert's `matching_criteria` field.

    Returns:
        True if the update modified a document; False if nothing was modified
        or the update raised (the error is logged, not propagated).
    """
    db = await get_db()
    try:
        result = await db.search_alerts.update_one(
            {"_id": ObjectId(alert_id)},
            {"$set": {"matching_criteria": lifestyle_data, "alert_type": "roommate"}},
        )
        enriched = result.modified_count > 0
    except Exception as e:
        logger.error(f"Failed to enrich roommate alert {alert_id}: {e}")
        return False

    # Only report success when a document actually changed.
    if enriched:
        logger.info(f"Enriched roommate alert {alert_id} with lifestyle data")
    else:
        logger.warning(f"Roommate alert {alert_id} was not modified (alert not found or data unchanged)")
    return enriched
async def enrich_shortstay_alert(alert_id: str, stay_criteria: dict) -> bool:
    """
    Store stay criteria (check_in, check_out, guests) on an alert and mark it short-stay.

    Called by Concierge after collecting booking details.

    Args:
        alert_id: String form of the alert's ObjectId.
        stay_criteria: Value written to the alert's `stay_criteria` field.

    Returns:
        True if the update modified a document; False if nothing was modified
        or the update raised (the error is logged, not propagated).
    """
    db = await get_db()
    try:
        result = await db.search_alerts.update_one(
            {"_id": ObjectId(alert_id)},
            {"$set": {"stay_criteria": stay_criteria, "alert_type": "short_stay"}},
        )
        enriched = result.modified_count > 0
    except Exception as e:
        logger.error(f"Failed to enrich short-stay alert {alert_id}: {e}")
        return False

    # Only report success when a document actually changed.
    if enriched:
        logger.info(f"Enriched short-stay alert {alert_id} with stay criteria")
    else:
        logger.warning(f"Short-stay alert {alert_id} was not modified (alert not found or data unchanged)")
    return enriched
async def enrich_viewing_alert(alert_id: str, viewing_criteria: dict) -> bool:
    """
    Store viewing preferences on a rental/sale alert and mark it as a viewing alert.

    Called by Broker after collecting the user's scheduling preferences.

    Args:
        alert_id: String form of the alert's ObjectId.
        viewing_criteria: Value written to the alert's `viewing_criteria` field.

    Returns:
        True if the update modified a document; False if nothing was modified
        or the update raised (the error is logged, not propagated).
    """
    db = await get_db()
    try:
        result = await db.search_alerts.update_one(
            {"_id": ObjectId(alert_id)},
            {"$set": {"viewing_criteria": viewing_criteria, "alert_type": "viewing"}},
        )
        enriched = result.modified_count > 0
    except Exception as e:
        logger.error(f"Failed to enrich viewing alert {alert_id}: {e}")
        return False

    # Only report success when a document actually changed.
    if enriched:
        logger.info(f"Enriched viewing alert {alert_id} with viewing criteria")
    else:
        logger.warning(f"Viewing alert {alert_id} was not modified (alert not found or data unchanged)")
    return enriched
async def delete_alert(alert_id: str, user_id: str) -> bool:
    """
    Soft-delete a user's alert by setting `is_active` to False.

    The filter includes `user_id`, so only an alert owned by that user is touched.

    Returns:
        True if a document was modified; False if nothing was modified or the
        update raised (the error is logged, not propagated).
    """
    db = await get_db()
    try:
        result = await db.search_alerts.update_one(
            {"_id": ObjectId(alert_id), "user_id": user_id},
            {"$set": {"is_active": False}}
        )
        return result.modified_count > 0
    except Exception as e:
        # Previously swallowed silently; log it like the other alert operations do.
        logger.error(f"Failed to delete alert {alert_id}: {e}")
        return False
async def delete_alert_by_criteria(user_id: str, criteria: dict) -> int:
    """
    Soft-delete the user's active alerts that match the given search criteria.

    Used when user says "stop notifying me about houses in Cotonou" or
    "I found what I wanted" while replying to a property card.

    An alert is matched when ANY of the following holds:
    - criteria location and alert location are both non-empty and one
      contains the other (case-insensitive)
    - criteria listing_type equals the alert's listing_type (case-insensitive)
    - neither location nor listing_type was given, and `property_location`
      overlaps the alert's location in the same substring sense

    Args:
        user_id: User ID
        criteria: Search criteria to match against alert search_params
            (location, listing_type, property_location)

    Returns:
        Number of alerts soft-deleted (is_active set to False)
    """
    db = await get_db()

    def _locations_overlap(left: str, right: str) -> bool:
        # Both sides must be non-empty: "" is a substring of every string, so
        # an alert without a location would otherwise match any criteria.
        return bool(left) and bool(right) and (left in right or right in left)

    location = (criteria.get("location") or "").lower().strip()
    listing_type = (criteria.get("listing_type") or "").lower()
    property_location = (criteria.get("property_location") or "").lower().strip()

    deleted_count = 0

    # Find all active alerts for user
    cursor = db.search_alerts.find({"user_id": user_id, "is_active": True})
    async for alert in cursor:
        alert_params = alert.get("search_params") or {}
        alert_location = (alert_params.get("location") or "").lower().strip()
        alert_type = (alert_params.get("listing_type") or "").lower()

        # Match by location
        matched = _locations_overlap(location, alert_location)

        # Match by listing type
        if listing_type and listing_type == alert_type:
            matched = True

        # No explicit criteria: the user replied "stop notifying me" to a
        # property card, so fall back to that property's location.
        if not location and not listing_type and _locations_overlap(property_location, alert_location):
            matched = True

        # Soft delete matched alerts
        if matched:
            await db.search_alerts.update_one(
                {"_id": alert["_id"]},
                {"$set": {"is_active": False}}
            )
            deleted_count += 1
            logger.info(f"Deleted alert {alert['_id']} for user {user_id} - matched criteria: {criteria}")

    return deleted_count
async def get_user_alerts(user_id: str) -> list[SearchAlert]:
    """Return every alert of the user whose `is_active` flag is True, as SearchAlert models."""
    db = await get_db()
    active_filter = {"user_id": user_id, "is_active": True}
    return [SearchAlert(**record) async for record in db.search_alerts.find(active_filter)]
async def check_listing_matches_alert(listing: dict, alert: SearchAlert) -> bool:
    """
    Decide whether a listing satisfies every criterion the alert specifies.

    Criteria absent (or falsy) in the alert's search_params are not enforced:
    - location: alert location must be contained in the listing location
      (case-insensitive)
    - listing_type: must be equal (case-insensitive)
    - min_price / max_price: enforced only when the listing has a price
    - currency: must be exactly equal

    Returns:
        True when no specified criterion rejects the listing, else False.
    """
    criteria = alert.search_params

    # 1. Location - substring containment
    wanted_location = criteria.get("location")
    if wanted_location:
        listing_location = (listing.get("location") or "").lower()
        if wanted_location.lower() not in listing_location:
            return False

    # 2. Listing type - exact, case-insensitive
    wanted_type = criteria.get("listing_type")
    if wanted_type:
        listing_kind = (listing.get("listing_type") or "").lower()
        if wanted_type.lower() != listing_kind:
            return False

    # 3. Price range - skipped entirely when the listing carries no price
    listing_price = listing.get("price")
    if listing_price is not None:
        floor = criteria.get("min_price")
        if floor and listing_price < floor:
            return False
        ceiling = criteria.get("max_price")
        if ceiling and listing_price > ceiling:
            return False

    # 4. Currency - exact match when the alert names one
    wanted_currency = criteria.get("currency")
    if wanted_currency and listing.get("currency") != wanted_currency:
        return False

    return True
async def process_new_listing(listing: dict) -> int:
    """
    Check a newly created listing against stored alerts and notify matching users.

    Only alerts that are still searching are considered: `is_active` must be
    True and `status` must be ACTIVE (or unset, for documents without a
    status). Paused and pursuing alerts are skipped, in line with the
    lifecycle described by pause_alert / pursue_listing. Alerts owned by the
    listing's owner are excluded so nobody is notified about their own listing.

    Args:
        listing: The new listing document.

    Returns:
        The number of notifications sent.

    Raises:
        Any exception raised while matching or notifying is logged and
        re-raised, which stops processing of the remaining alerts.
    """
    db = await get_db()
    listing_owner_id = str(listing.get("user_id", ""))
    notification_count = 0
    # Enrichment does not depend on the alert, so do it once on the first match.
    enriched_listing = None

    cursor = db.search_alerts.find({
        "is_active": True,
        "status": {"$in": [AlertStatus.ACTIVE.value, None]},
        "user_id": {"$ne": listing_owner_id}  # Skip owner's own alerts
    })
    async for doc in cursor:
        alert = SearchAlert(**doc)
        try:
            if await check_listing_matches_alert(listing, alert):
                # Ensure listing is enriched before notifying
                if enriched_listing is None:
                    enriched_listing = (await enrich_listings_batch([listing], db))[0]
                await notify_user_of_match(alert, enriched_listing)
                notification_count += 1
        except Exception as e:
            logger.error(f"Error checking/notifying alert {alert.id} for listing {listing.get('_id')}: {e}")
            logger.error(f"Alert params: {alert.search_params}")
            logger.error(f"Listing price: {listing.get('price')}")
            raise  # Re-raise to see in backfill
    return notification_count
async def notify_user_of_match(alert: SearchAlert, listing: dict):
    """
    Send the alert's owner a direct message from AIDA about a matching listing.

    Steps:
    1. Find or create the conversation between AIDA_BOT_ID and the user.
    2. Look up the user's first name for personalization (falls back to "there").
    3. Generate the message text in the user's language and hand off to
       _continue_notify_user_of_match, which sends it.

    Args:
        alert: The alert that matched.
        listing: The listing to present (expected to be enriched by the caller).

    Raises:
        Re-raises conversation-creation errors that are not a duplicate-key
        race, or a duplicate-key error after which no conversation is found.
    """
    db = await get_db()

    # 1. Find or Create conversation between AIDA_BOT and User
    # We must do this manually because start_or_get_conversation enforces listing_id checks
    # which don't apply to AIDA-initiated system messages.
    participants = sorted([AIDA_BOT_ID, alert.user_id])
    participants_key = "::".join(participants)

    # Use participants_key for exact match
    existing_conversation = await db.conversations.find_one({
        "participants_key": participants_key
    })
    if existing_conversation:
        conv_id = str(existing_conversation["_id"])
        logger.info(f"Using existing AIDA conversation: {conv_id}")
    else:
        # Create new system conversation
        conversation_doc = Conversation.create_document(
            listing_id="system",  # Generic ID for system chats
            participants=participants,
            listing_title="AIDA Assistant",
            listing_image=None
        )
        try:
            result = await db.conversations.insert_one(conversation_doc)
            conv_id = str(result.inserted_id)
            logger.info(f"Created new AIDA conversation: {conv_id}")
        except Exception as e:
            # Handle race condition: conversation was created between our check and insert
            if "duplicate key" in str(e).lower() or "E11000" in str(e):
                logger.warning("Conversation already exists (race condition), querying again...")
                existing_conversation = await db.conversations.find_one({
                    "participants_key": participants_key
                })
                if existing_conversation:
                    conv_id = str(existing_conversation["_id"])
                    logger.info(f"Found existing conversation after race: {conv_id}")
                else:
                    logger.error(f"Failed to find conversation after duplicate key error: {e}")
                    raise
            else:
                logger.error(f"Failed to create conversation: {e}")
                raise

    # 2. Get user's name for personalization
    # Try both string ID and ObjectId since user collection might have either
    user = await db.users.find_one({"_id": alert.user_id})
    if not user:
        try:
            user = await db.users.find_one({"_id": ObjectId(alert.user_id)})
        except Exception as e:
            # Best-effort lookup (e.g. user_id is not a valid ObjectId).
            # `except Exception` rather than a bare except, so task
            # cancellation is not swallowed here.
            logger.debug(f"ObjectId lookup failed for user {alert.user_id}: {e}")

    user_name = "there"  # Default fallback
    if user:
        # Try to get first name from various possible field names.
        # A whitespace-only `name` splits to an empty list, so guard the index.
        name_parts = (user.get("name") or "").split()
        first_name = (
            user.get("firstName") or
            user.get("first_name") or
            (name_parts[0] if name_parts else None)
        )
        if first_name:
            user_name = first_name
    logger.info(f"Personalization: Found user name '{user_name}' for user {alert.user_id}")

    # 3. Construct Message using AI (dynamic, personalized & language-aware)
    # Get user's language from alert params (default to English)
    user_language = (alert.search_params.get("user_language") or "en").lower()

    # Generate message using AI for natural variety
    text = await _generate_alert_message_with_ai(user_name, alert.user_query, user_language)
    logger.info(f"Alert notification using language: {user_language}")

    # Continue with sending the message
    await _continue_notify_user_of_match(alert, listing, conv_id, text)
async def _generate_alert_message_with_ai(user_name: str, search_query: str, language: str) -> str:
    """
    Generate a dynamic, personal alert message using AI in any language.

    Args:
        user_name: Name used to address the user in the message.
        search_query: The user's original search request, quoted in the prompt.
        language: Language code; known codes are mapped to a descriptive
            name, unknown codes are passed to the model upper-cased.

    Returns:
        The AI-generated message, or a fixed English template when generation
        fails for any reason (including an unavailable client/config) or the
        model returns empty content.
    """
    # Map common language codes to full names for clearer instructions
    language_names = {
        "en": "English",
        "fr": "French (use tu form, casual)",
        "es": "Spanish (use tú form, casual)",
        "pt": "Portuguese (use você form, casual)",
        "de": "German (use du form, casual)",
        "it": "Italian (use tu form, casual)",
        "ar": "Arabic",
        "zh": "Chinese (Simplified)",
        "ja": "Japanese",
        "ko": "Korean",
        "sw": "Swahili",
        "ha": "Hausa",
        "yo": "Yoruba",
        "ig": "Igbo",
    }
    # Get language name, or use the code directly if not in our map
    lang_name = language_names.get(language.lower(), language.upper())

    try:
        # Imported inside the try so a missing client library or config
        # degrades to the fallback message instead of aborting the notification.
        from openai import AsyncOpenAI
        from app.config import settings

        client = AsyncOpenAI(
            api_key=settings.MIMO_API_KEY,
            base_url=settings.MIMO_BASE_URL,
        )
        prompt = f"""Generate a short, friendly notification message for a property search alert.
Context:
- User's name: {user_name}
- Their original search request: "{search_query}"
- A matching property was just found NOW (this is a new listing!)
Style guidelines:
- Start with something like "Hey {user_name}! Remember when you asked me to keep an eye out for [their search]? Guess what—I just found something!"
- Be warm, casual, and excited like a helpful friend who just discovered something great
- Use 1-2 relevant emojis naturally
- Reference that they TOLD you to watch for this type of property
- Express genuine excitement—you've been searching and finally found it!
- End with something like "check it out" or "take a look below"
- Keep it under 100 words
- IMPORTANT: Write the entire message in {lang_name}
Write ONLY the message, nothing else. Be creative and vary your approach."""
        response = await client.chat.completions.create(
            model=settings.MIMO_MODEL,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=200,
            temperature=0.9,  # High temperature for variety
        )
        # content can be None or blank; never send an empty notification.
        generated_text = (response.choices[0].message.content or "").strip()
        if generated_text:
            logger.info(f"AI generated alert message in {lang_name}")
            return generated_text
        logger.warning("AI message generation returned empty content, using fallback")
    except Exception as e:
        logger.warning(f"AI message generation failed, using fallback: {e}")

    # Fallback to English template if AI fails
    return f"Hey {user_name}! 👋\n\nI found a property matching your search for '{search_query}'. I think you'll love this one!\n\nCheck it out below! 👇"
# Second half of notify_user_of_match, split out into a helper: builds the property card and sends the message.
async def _continue_notify_user_of_match(alert, listing, conv_id, text):
    """
    Deliver an alert notification once its text has been generated.

    Builds the property card from the listing, optionally renders the text
    as a voice message (falling back to text if that fails), sends the
    message as AIDA via conversation_service, and stamps the alert's
    `last_notified_at`.

    Args:
        alert: The matched alert (its `use_voice`, `search_params`, `id` and
            `user_id` are read).
        listing: Listing dict used to populate the property card.
        conv_id: ID of the AIDA conversation to post into.
        text: Message text to send.
    """
    db = await get_db()
    wants_voice = getattr(alert, 'use_voice', False)

    # Property card shown alongside the message
    images = listing.get("images")
    cover_image = images[0] if images else None
    property_card = {
        "listing_id": str(listing.get("_id") or listing.get("id")),
        "title": listing.get("title", ""),
        "price": listing.get("price", 0),
        "currency": listing.get("currency", "NGN"),
        "bedrooms": listing.get("bedrooms", 0),
        "bathrooms": listing.get("bathrooms", 0),
        "location": listing.get("location", ""),
        "image_url": cover_image,
        "listing_type": listing.get("listing_type", ""),
        # Owner info
        "owner_name": listing.get("owner_name", "Unknown"),
        "owner_profile_picture": listing.get("owner_profile_picture"),
        "rating": listing.get("rating", 0.0),
        "reviews_count": listing.get("reviews_count", 0),
        "owner_is_verified": listing.get("owner_is_verified", False),
    }

    # Text card by default; upgraded to a voice message when requested and
    # audio generation succeeds.
    message_type = "property_inquiry"
    voice_media = None
    if wants_voice:
        try:
            from app.services.voice_service import voice_service

            speech_language = (alert.search_params.get("user_language") or "en").lower()
            audio_bytes, duration = await voice_service.text_to_speech(text, language=speech_language)
            audio_url = await voice_service.upload_audio_to_r2(audio_bytes)
            voice_media = {
                "url": audio_url,
                "type": "audio",
                "duration": int(duration),
                "mimeType": "audio/mpeg"
            }
            message_type = "voice"
            logger.info(f"🔊 Generated voice notification: {audio_url}")
        except Exception as e:
            logger.warning(f"Failed to generate voice notification, falling back to text: {e}")

    # Send as AIDA
    await conversation_service.send_message(
        conversation_id=conv_id,
        current_user_id=AIDA_BOT_ID,
        current_user_name="AIDA",
        current_user_avatar=None,
        message_type=message_type,
        content=text,
        property_card=property_card,
        media=voice_media  # None for text messages
    )

    # Record when the alert last notified; the _id filter needs an ObjectId.
    raw_id = alert.id
    if isinstance(raw_id, ObjectId):
        alert_oid = raw_id
    elif raw_id:
        alert_oid = ObjectId(raw_id)
    else:
        alert_oid = None
    if alert_oid:
        await db.search_alerts.update_one(
            {"_id": alert_oid},
            {"$set": {"last_notified_at": datetime.utcnow()}}
        )

    logger.info(f"Notified user {alert.user_id} of listing match {listing.get('_id')} (voice={getattr(alert, 'use_voice', False)})")
# ============================================================
# ALERT LIFECYCLE MANAGEMENT
# ============================================================
async def pause_alert(alert_id: str, user_id: str) -> bool:
    """
    Set an alert's status to PAUSED while it awaits the user's response.

    Intended for the moment AIDA has sent matches to the user: the alert
    waits for the user's decision before searching again.

    Returns:
        True if the user's alert was modified; False if nothing changed or
        the update raised (the error is logged).
    """
    db = await get_db()
    try:
        selector = {"_id": ObjectId(alert_id), "user_id": user_id}
        change = {"$set": {"status": AlertStatus.PAUSED.value}}
        outcome = await db.search_alerts.update_one(selector, change)
        was_paused = outcome.modified_count > 0
        if was_paused:
            logger.info(f"Alert {alert_id} paused - awaiting user response")
        return was_paused
    except Exception as e:
        logger.error(f"Failed to pause alert {alert_id}: {e}")
        return False
async def pursue_listing(alert_id: str, user_id: str, listing_id: str) -> bool:
    """
    Mark an alert as PURSUING a specific listing the user is interested in.

    Stores the listing ID in `linked_listing_id`. The alert is meant to stop
    searching while the user pursues this property and can be resumed if the
    deal falls through.

    Returns:
        True if the user's alert was modified; False if nothing changed or
        the update raised (the error is logged).
    """
    db = await get_db()
    try:
        selector = {"_id": ObjectId(alert_id), "user_id": user_id}
        change = {
            "$set": {
                "status": AlertStatus.PURSUING.value,
                "linked_listing_id": listing_id
            }
        }
        outcome = await db.search_alerts.update_one(selector, change)
        now_pursuing = outcome.modified_count > 0
        if now_pursuing:
            logger.info(f"Alert {alert_id} now pursuing listing {listing_id}")
        return now_pursuing
    except Exception as e:
        logger.error(f"Failed to update alert {alert_id} to pursuing: {e}")
        return False
async def complete_alert(alert_id: str, user_id: str) -> bool:
    """
    Close an alert because the user found what they wanted.

    Sets status to COMPLETED and `is_active` to False. This is a terminal
    state: notifications stop, though the alert can be reactivated manually.

    Returns:
        True if the user's alert was modified; False if nothing changed or
        the update raised (the error is logged).
    """
    db = await get_db()
    try:
        selector = {"_id": ObjectId(alert_id), "user_id": user_id}
        change = {
            "$set": {
                "status": AlertStatus.COMPLETED.value,
                "is_active": False  # Backward compatibility
            }
        }
        outcome = await db.search_alerts.update_one(selector, change)
        was_completed = outcome.modified_count > 0
        if was_completed:
            logger.info(f"Alert {alert_id} completed - user found what they wanted!")
        return was_completed
    except Exception as e:
        logger.error(f"Failed to complete alert {alert_id}: {e}")
        return False
async def resume_alert(alert_id: str, user_id: str) -> bool:
    """
    Put a paused/pursuing alert back into the ACTIVE state.

    Called when the user says things like "deal fell through",
    "keep searching" or "resume my search". Clears `linked_listing_id`
    and sets `is_active` to True.

    Returns:
        True if the user's alert was modified; False if nothing changed or
        the update raised (the error is logged).
    """
    db = await get_db()
    try:
        selector = {"_id": ObjectId(alert_id), "user_id": user_id}
        change = {
            "$set": {
                "status": AlertStatus.ACTIVE.value,
                "linked_listing_id": None,
                "is_active": True  # Backward compatibility
            }
        }
        outcome = await db.search_alerts.update_one(selector, change)
        was_resumed = outcome.modified_count > 0
        if was_resumed:
            logger.info(f"Alert {alert_id} resumed - back to searching")
        return was_resumed
    except Exception as e:
        logger.error(f"Failed to resume alert {alert_id}: {e}")
        return False
# ============================================================
# NATURAL ALERT DISAMBIGUATION
# ============================================================
async def describe_alerts_naturally(user_id: str) -> list[dict]:
    """
    Get the user's active alerts with natural language descriptions.

    Instead of: "Alert ID: a1b2c3 - Location: Cotonou"
    Produces:   "2-bedroom rental in Cotonou under XOF20,000"

    The description is assembled from bedrooms, listing type, location and
    max (or else min) price. When no criterion yields anything beyond the
    generic word "property", the first 50 characters of the original query
    are used instead.

    Returns:
        One dict per alert with keys: id, description, status,
        original_query, created_at (ISO string or None).
    """
    type_labels = {
        "rent": "rental",
        "sale": "for sale",
        "roommate": "roommate",
        "short-stay": "short-stay",
    }

    def _format_price(value) -> str:
        # Thousands separators for numbers; plain text for anything that
        # does not support the ',' format (e.g. a price stored as a string).
        try:
            return f"{value:,}"
        except (TypeError, ValueError):
            return str(value)

    alerts = await get_user_alerts(user_id)
    descriptions = []
    for alert in alerts:
        params = alert.search_params

        # Property type description
        listing_type = (params.get("listing_type") or "").lower()
        parts = [type_labels.get(listing_type, "property")]

        # Bedrooms go in front: "3-bedroom rental"
        if params.get("bedrooms"):
            parts.insert(0, f"{params['bedrooms']}-bedroom")

        # Location
        if params.get("location"):
            parts.append(f"in {params['location']}")

        # Price - `or ""` so a currency stored as None is not rendered as "None"
        currency = params.get("currency") or ""
        if params.get("max_price"):
            parts.append(f"under {currency}{_format_price(params['max_price'])}")
        elif params.get("min_price"):
            parts.append(f"above {currency}{_format_price(params['min_price'])}")

        description = " ".join(parts).strip()
        if not description or description == "property":
            description = alert.user_query[:50] if alert.user_query else "property search"

        descriptions.append({
            "id": str(alert.id),
            "description": description,
            "status": alert.status.value if hasattr(alert.status, 'value') else str(alert.status),
            "original_query": alert.user_query,
            "created_at": alert.created_at.isoformat() if alert.created_at else None,
        })
    return descriptions
async def find_alert_by_description(user_id: str, user_response: str) -> Optional[SearchAlert]:
    """
    Match the user's free-text response to one of their active alerts.

    User says: "The Cotonou one" or "my roommate search"
    Each alert is scored and the highest scorer wins:
    - location mentioned: +40
    - listing type (or a related word) mentioned: +30
    - bedroom count mentioned: +20
    - each word longer than 3 characters from the original query: +8
    - ordinal reference ("first", "second", "last") to the alert's position: +20

    Returns:
        - None if the user has no alerts
        - the only alert if there is exactly one
        - the first alert returned by get_user_alerts if the response is empty
        - otherwise the best-scoring alert when its score is at least 20,
          else None
    """
    alerts = await get_user_alerts(user_id)
    if not alerts:
        return None
    if len(alerts) == 1:
        # Only one alert, must be this one
        return alerts[0]
    # Guard against None/empty response - fall back to the first alert
    if not user_response:
        return alerts[0]

    # Related words that count as a mention of the listing type.
    type_synonyms = {
        "rent": ("rental",),
        "roommate": ("room", "flatmate"),
        "sale": ("buy", "purchase"),
        "short-stay": ("stay", "vacation"),
    }

    user_lower = user_response.lower().strip()
    last_position = len(alerts) - 1
    best_match = None
    best_score = 0

    for position, alert in enumerate(alerts):
        params = alert.search_params
        score = 0

        # Check location mention
        loc = (params.get("location") or "").lower()
        if loc and loc in user_lower:
            score += 40

        # Check type mention. Scored once: several related words contain the
        # type itself ("rental" contains "rent"), which must not count twice.
        alert_type = (params.get("listing_type") or "").lower()
        if alert_type:
            mentions = (alert_type, *type_synonyms.get(alert_type, ()))
            if any(term in user_lower for term in mentions):
                score += 30

        # Check bedrooms mention
        beds = params.get("bedrooms")
        if beds and str(beds) in user_lower:
            score += 20

        # Check keywords from original query
        if alert.user_query:
            for word in alert.user_query.lower().split():
                if len(word) > 3 and word in user_lower:
                    score += 8

        # Boost for ordinal references
        if "first" in user_lower and position == 0:
            score += 20
        if "second" in user_lower and position == 1:
            score += 20
        if "last" in user_lower and position == last_position:
            score += 20

        if score > best_score:
            best_score = score
            best_match = alert

    # Minimum threshold
    if best_score >= 20:
        logger.info(f"Matched user response '{user_response[:30]}' to alert with score {best_score}")
        return best_match

    logger.info(f"No confident match for '{user_response[:30]}' (best score: {best_score})")
    return None
async def get_disambiguation_message(user_id: str) -> Optional[str]:
    """
    Build a natural-language question asking which alert the user means.

    Lists at most the first 4 alert descriptions (annotating paused and
    pursuing ones) and, when there are more, how many were left out.

    Returns:
        The message text, or None if the user has 0-1 alerts and there is
        nothing to disambiguate.
    """
    descriptions = await describe_alerts_naturally(user_id)
    total = len(descriptions)
    if total <= 1:
        return None  # Nothing to disambiguate

    shown_limit = 4
    status_notes = {"paused": " (paused)", "pursuing": " (pursuing)"}

    # Collect the pieces and join once at the end.
    pieces = ["Which search are you talking about? 🤔\n\n"]
    pieces.extend(
        f"• The {entry['description']}{status_notes.get(entry['status'], '')}?\n"
        for entry in descriptions[:shown_limit]
    )
    if total > shown_limit:
        pieces.append(f"\n...and {total - shown_limit} more.\n")
    pieces.append("\nJust tell me which one!")
    return "".join(pieces)