Spaces:
Running
Running
| import logging | |
| from datetime import datetime | |
| from typing import Optional, Tuple | |
| from bson import ObjectId | |
| from app.database import get_db | |
| from app.models.search_alert import SearchAlert, AlertStatus | |
| from app.services.conversation_service import conversation_service | |
| from app.models.conversation import Conversation | |
| from app.models.message import Message | |
| from app.services.listing_service import enrich_listings_batch | |
| logger = logging.getLogger(__name__) | |
| AIDA_BOT_ID = "AIDA_BOT" | |
| # ============================================================ | |
| # SEMANTIC DEDUPLICATION | |
| # ============================================================ | |
| async def find_similar_alert(user_id: str, params: dict) -> Optional[SearchAlert]: | |
| """ | |
| Find existing alert with similar criteria (semantic deduplication). | |
| Instead of exact text matching, we compare: | |
| - Location (fuzzy match) | |
| - Listing type | |
| - Price range (within 20%) | |
| - Bedrooms (within ±1) | |
| Returns the similar alert if found (75%+ match), else None. | |
| """ | |
| db = await get_db() | |
| # Get all active/paused alerts for user | |
| cursor = db.search_alerts.find({ | |
| "user_id": user_id, | |
| "status": {"$in": [AlertStatus.ACTIVE.value, AlertStatus.PAUSED.value, None]}, | |
| "is_active": True # Backward compatibility | |
| }) | |
| new_location = (params.get("location") or "").lower().strip() | |
| new_type = (params.get("listing_type") or "").lower().strip() | |
| new_max_price = params.get("max_price") | |
| new_min_price = params.get("min_price") | |
| new_bedrooms = params.get("bedrooms") | |
| async for doc in cursor: | |
| existing_params = doc.get("search_params", {}) | |
| match_score = 0 | |
| # Location match (fuzzy - contains or contained by) | |
| existing_loc = (existing_params.get("location") or "").lower().strip() | |
| if new_location and existing_loc: | |
| if new_location in existing_loc or existing_loc in new_location: | |
| match_score += 25 | |
| elif not new_location and not existing_loc: | |
| match_score += 25 # Both unspecified = match | |
| # Type match | |
| existing_type = (existing_params.get("listing_type") or "").lower().strip() | |
| if new_type == existing_type: | |
| match_score += 25 | |
| elif not new_type or not existing_type: | |
| match_score += 15 # One unspecified = partial match | |
| # Price match (within 20% tolerance) | |
| existing_max = existing_params.get("max_price") | |
| if new_max_price and existing_max: | |
| try: | |
| diff_ratio = abs(new_max_price - existing_max) / max(new_max_price, existing_max) | |
| if diff_ratio <= 0.2: | |
| match_score += 25 | |
| elif diff_ratio <= 0.4: | |
| match_score += 15 # Close enough | |
| except (TypeError, ZeroDivisionError): | |
| pass | |
| elif not new_max_price and not existing_max: | |
| match_score += 25 # Both unspecified | |
| # Bedrooms match | |
| existing_beds = existing_params.get("bedrooms") | |
| if new_bedrooms is not None and existing_beds is not None: | |
| if new_bedrooms == existing_beds: | |
| match_score += 25 | |
| elif abs(new_bedrooms - existing_beds) <= 1: | |
| match_score += 15 # Close enough | |
| elif new_bedrooms is None and existing_beds is None: | |
| match_score += 25 # Both unspecified | |
| # If 75%+ match, consider it a duplicate | |
| if match_score >= 75: | |
| logger.info(f"Found similar alert (score={match_score}): {doc.get('user_query')}") | |
| return SearchAlert(**doc) | |
| return None | |
| async def create_alert( | |
| user_id: str, | |
| query: str, | |
| params: dict, | |
| use_voice: bool = False, | |
| alert_type: str = "standard", | |
| ) -> Tuple[SearchAlert, Optional[SearchAlert]]: | |
| """ | |
| Create a new search alert with semantic deduplication. | |
| Args: | |
| user_id: User ID to create alert for | |
| query: Original search query text | |
| params: Search parameters (location, price, type, etc.) | |
| use_voice: If True, send notifications as voice messages | |
| alert_type: "standard" | "roommate" | "short_stay" | "viewing" | |
| Returns: | |
| Tuple of (alert, existing_similar): | |
| - If existing_similar is not None, a similar alert already exists | |
| - The first value is the alert to use (either new or existing) | |
| """ | |
| db = await get_db() | |
| # STEP 1: Check for semantically similar alert | |
| similar = await find_similar_alert(user_id, params) | |
| if similar: | |
| logger.info(f"Semantic dedup: Found similar alert {similar.id} for user {user_id}") | |
| if similar.use_voice != use_voice: | |
| await db.search_alerts.update_one( | |
| {"_id": ObjectId(similar.id)}, | |
| {"$set": {"use_voice": use_voice}}, | |
| ) | |
| similar.use_voice = use_voice | |
| return (similar, similar) # Second value indicates it already existed | |
| # STEP 2: Create new alert with normalized criteria | |
| normalized = { | |
| "location": (params.get("location") or "").lower().strip(), | |
| "listing_type": (params.get("listing_type") or "").lower().strip(), | |
| "max_price": params.get("max_price"), | |
| "min_price": params.get("min_price"), | |
| "bedrooms": params.get("bedrooms"), | |
| } | |
| alert = SearchAlert( | |
| user_id=user_id, | |
| user_query=query, | |
| search_params=params, | |
| use_voice=use_voice, | |
| alert_type=alert_type, | |
| status=AlertStatus.ACTIVE, | |
| normalized_criteria=normalized, | |
| ) | |
| new_alert = await db.search_alerts.insert_one( | |
| alert.model_dump(by_alias=True, exclude=["id"]) | |
| ) | |
| created_alert = await db.search_alerts.find_one({"_id": new_alert.inserted_id}) | |
| return (SearchAlert(**created_alert), None) # None = newly created | |
| # ============================================================ | |
| # ALERT ENRICHMENT — called by specialist agents | |
| # ============================================================ | |
| async def enrich_roommate_alert(alert_id: str, lifestyle_data: dict) -> bool: | |
| """ | |
| Append lifestyle/matching criteria to a roommate alert. | |
| Called by Matcher after collecting the seeker's profile. | |
| """ | |
| db = await get_db() | |
| try: | |
| result = await db.search_alerts.update_one( | |
| {"_id": ObjectId(alert_id)}, | |
| {"$set": {"matching_criteria": lifestyle_data, "alert_type": "roommate"}}, | |
| ) | |
| logger.info(f"Enriched roommate alert {alert_id} with lifestyle data") | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"Failed to enrich roommate alert {alert_id}: {e}") | |
| return False | |
| async def enrich_shortstay_alert(alert_id: str, stay_criteria: dict) -> bool: | |
| """ | |
| Append stay criteria (check_in, check_out, guests) to a short-stay alert. | |
| Called by Concierge after collecting booking details. | |
| """ | |
| db = await get_db() | |
| try: | |
| result = await db.search_alerts.update_one( | |
| {"_id": ObjectId(alert_id)}, | |
| {"$set": {"stay_criteria": stay_criteria, "alert_type": "short_stay"}}, | |
| ) | |
| logger.info(f"Enriched short-stay alert {alert_id} with stay criteria") | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"Failed to enrich short-stay alert {alert_id}: {e}") | |
| return False | |
| async def enrich_viewing_alert(alert_id: str, viewing_criteria: dict) -> bool: | |
| """ | |
| Append viewing preferences to a rental/sale alert. | |
| Called by Broker after collecting the user's scheduling preferences. | |
| """ | |
| db = await get_db() | |
| try: | |
| result = await db.search_alerts.update_one( | |
| {"_id": ObjectId(alert_id)}, | |
| {"$set": {"viewing_criteria": viewing_criteria, "alert_type": "viewing"}}, | |
| ) | |
| logger.info(f"Enriched viewing alert {alert_id} with viewing criteria") | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"Failed to enrich viewing alert {alert_id}: {e}") | |
| return False | |
| async def delete_alert(alert_id: str, user_id: str) -> bool: | |
| """Delete a user's alert (soft delete).""" | |
| db = await get_db() | |
| try: | |
| result = await db.search_alerts.update_one( | |
| {"_id": ObjectId(alert_id), "user_id": user_id}, | |
| {"$set": {"is_active": False}} | |
| ) | |
| return result.modified_count > 0 | |
| except Exception: | |
| return False | |
| async def delete_alert_by_criteria(user_id: str, criteria: dict) -> int: | |
| """ | |
| Delete alerts based on search criteria. | |
| Used when user says "stop notifying me about houses in Cotonou" or | |
| "I found what I wanted" while replying to a property card. | |
| Args: | |
| user_id: User ID | |
| criteria: Search criteria to match against alert search_params | |
| (location, listing_type, min_price, max_price, bedrooms, bathrooms) | |
| Returns: | |
| Number of alerts deleted | |
| """ | |
| db = await get_db() | |
| # Build query to find matching alerts | |
| query = { | |
| "user_id": user_id, | |
| "is_active": True, | |
| } | |
| # Match alerts by location (case-insensitive partial match) | |
| location = (criteria.get("location") or "").lower().strip() | |
| deleted_count = 0 | |
| # Find all active alerts for user | |
| cursor = db.search_alerts.find(query) | |
| async for alert in cursor: | |
| alert_params = alert.get("search_params") or {} | |
| alert_location = (alert_params.get("location") or "").lower() | |
| alert_type = (alert_params.get("listing_type") or "").lower() | |
| matched = False | |
| # Match by location | |
| if location and (location in alert_location or alert_location in location): | |
| matched = True | |
| # Match by listing type | |
| if criteria.get("listing_type"): | |
| if (criteria["listing_type"] or "").lower() == alert_type: | |
| matched = True | |
| # If no specific criteria, but user is replying to property in that location | |
| if not location and not criteria.get("listing_type"): | |
| # This happens when user swipes on a property card and says "stop notifying me" | |
| # We need to match based on broader context - delete alerts matching the property's location | |
| if criteria.get("property_location"): | |
| prop_loc = (criteria["property_location"] or "").lower() | |
| if prop_loc in alert_location or alert_location in prop_loc: | |
| matched = True | |
| # Soft delete matched alerts | |
| if matched: | |
| await db.search_alerts.update_one( | |
| {"_id": alert["_id"]}, | |
| {"$set": {"is_active": False}} | |
| ) | |
| deleted_count += 1 | |
| logger.info(f"Deleted alert {alert['_id']} for user {user_id} - matched criteria: {criteria}") | |
| return deleted_count | |
| async def get_user_alerts(user_id: str) -> list[SearchAlert]: | |
| """Get all active alerts for a user.""" | |
| db = await get_db() | |
| alerts = [] | |
| cursor = db.search_alerts.find({"user_id": user_id, "is_active": True}) | |
| async for doc in cursor: | |
| alerts.append(SearchAlert(**doc)) | |
| return alerts | |
| async def check_listing_matches_alert(listing: dict, alert: SearchAlert) -> bool: | |
| """ | |
| Check if a listing matches an alert's STRICT criteria. | |
| We reuse the 'strict' logic principles here manually. | |
| """ | |
| params = alert.search_params | |
| # 1. Location (Must match) | |
| if params.get("location"): | |
| # Simple string containment or equality for robustness | |
| loc_param = (params["location"] or "").lower() | |
| loc_listing = (listing.get("location") or "").lower() | |
| if loc_param not in loc_listing: | |
| return False | |
| # 2. Listing Type (Must match) | |
| if params.get("listing_type"): | |
| type_param = (params["listing_type"] or "").lower() | |
| type_listing = (listing.get("listing_type") or "").lower() | |
| if type_param != type_listing: | |
| return False | |
| # 3. Price (Must be within range) | |
| price = listing.get("price") | |
| if price is not None: # Only check price if it exists | |
| if params.get("min_price") and price < params["min_price"]: | |
| return False | |
| if params.get("max_price") and price > params["max_price"]: | |
| return False | |
| # 4. Currency (Must match if specified) | |
| if params.get("currency"): | |
| if listing.get("currency") != params["currency"]: | |
| return False | |
| return True | |
| async def process_new_listing(listing: dict) -> int: | |
| """ | |
| Called when a new listing is created. Checks all alerts and notifies users. | |
| Returns the number of notifications sent. | |
| """ | |
| db = await get_db() | |
| listing_owner_id = str(listing.get("user_id", "")) | |
| notification_count = 0 | |
| # Get all active alerts (excluding the listing owner - don't notify about own listings) | |
| cursor = db.search_alerts.find({ | |
| "is_active": True, | |
| "user_id": {"$ne": listing_owner_id} # Skip owner's own alerts | |
| }) | |
| async for doc in cursor: | |
| alert = SearchAlert(**doc) | |
| try: | |
| # Check match | |
| if await check_listing_matches_alert(listing, alert): | |
| # Ensure listing is enriched before notifying | |
| enriched_listing = (await enrich_listings_batch([listing], db))[0] | |
| await notify_user_of_match(alert, enriched_listing) | |
| notification_count += 1 | |
| except Exception as e: | |
| logger.error(f"Error checking/notifying alert {alert.id} for listing {listing.get('_id')}: {e}") | |
| logger.error(f"Alert params: {alert.search_params}") | |
| logger.error(f"Listing price: {listing.get('price')}") | |
| raise # Re-raise to see in backfill | |
| return notification_count | |
| async def notify_user_of_match(alert: SearchAlert, listing: dict): | |
| """ | |
| Sends a DM to the user with the listing. | |
| """ | |
| db = await get_db() | |
| # 1. Find or Create conversation between AIDA_BOT and User | |
| # We must do this manually because start_or_get_conversation enforces listing_id checks | |
| # which don't apply to AIDA-initiated system messages. | |
| participants = sorted([AIDA_BOT_ID, alert.user_id]) | |
| participants_key = "::".join(participants) | |
| # Use participants_key for exact match | |
| existing_conversation = await db.conversations.find_one({ | |
| "participants_key": participants_key | |
| }) | |
| if existing_conversation: | |
| conv_id = str(existing_conversation["_id"]) | |
| logger.info(f"Using existing AIDA conversation: {conv_id}") | |
| else: | |
| # Create new system conversation | |
| conversation_doc = Conversation.create_document( | |
| listing_id="system", # Generic ID for system chats | |
| participants=participants, | |
| listing_title="AIDA Assistant", | |
| listing_image=None | |
| ) | |
| try: | |
| result = await db.conversations.insert_one(conversation_doc) | |
| conv_id = str(result.inserted_id) | |
| logger.info(f"Created new AIDA conversation: {conv_id}") | |
| except Exception as e: | |
| # Handle race condition: conversation was created between our check and insert | |
| if "duplicate key" in str(e).lower() or "E11000" in str(e): | |
| logger.warning(f"Conversation already exists (race condition), querying again...") | |
| existing_conversation = await db.conversations.find_one({ | |
| "participants_key": participants_key | |
| }) | |
| if existing_conversation: | |
| conv_id = str(existing_conversation["_id"]) | |
| logger.info(f"Found existing conversation after race: {conv_id}") | |
| else: | |
| logger.error(f"Failed to find conversation after duplicate key error: {e}") | |
| raise | |
| else: | |
| logger.error(f"Failed to create conversation: {e}") | |
| raise | |
| # 2. Get user's name for personalization | |
| # Try both string ID and ObjectId since user collection might have either | |
| user = await db.users.find_one({"_id": alert.user_id}) | |
| if not user: | |
| # Try with ObjectId | |
| try: | |
| user = await db.users.find_one({"_id": ObjectId(alert.user_id)}) | |
| except: | |
| pass | |
| user_name = "there" # Default fallback | |
| if user: | |
| # Try to get first name from various possible field names | |
| first_name = ( | |
| user.get("firstName") or | |
| user.get("first_name") or | |
| (user.get("name", "").split()[0] if user.get("name") else None) | |
| ) | |
| if first_name: | |
| user_name = first_name | |
| logger.info(f"Personalization: Found user name '{user_name}' for user {alert.user_id}") | |
| # 3. Construct Message using AI (dynamic, personalized & language-aware) | |
| # Get user's language from alert params (default to English) | |
| user_language = (alert.search_params.get("user_language") or "en").lower() | |
| # Generate message using AI for natural variety | |
| text = await _generate_alert_message_with_ai(user_name, alert.user_query, user_language) | |
| logger.info(f"Alert notification using language: {user_language}") | |
| # Continue with sending the message | |
| await _continue_notify_user_of_match(alert, listing, conv_id, text) | |
| async def _generate_alert_message_with_ai(user_name: str, search_query: str, language: str) -> str: | |
| """Generate a dynamic, personal alert message using AI in any language.""" | |
| from openai import AsyncOpenAI | |
| from app.config import settings | |
| # Map common language codes to full names for clearer instructions | |
| language_names = { | |
| "en": "English", | |
| "fr": "French (use tu form, casual)", | |
| "es": "Spanish (use tú form, casual)", | |
| "pt": "Portuguese (use você form, casual)", | |
| "de": "German (use du form, casual)", | |
| "it": "Italian (use tu form, casual)", | |
| "ar": "Arabic", | |
| "zh": "Chinese (Simplified)", | |
| "ja": "Japanese", | |
| "ko": "Korean", | |
| "sw": "Swahili", | |
| "ha": "Hausa", | |
| "yo": "Yoruba", | |
| "ig": "Igbo", | |
| } | |
| # Get language name, or use the code directly if not in our map | |
| lang_name = language_names.get(language.lower(), language.upper()) | |
| try: | |
| client = AsyncOpenAI( | |
| api_key=settings.MIMO_API_KEY, | |
| base_url=settings.MIMO_BASE_URL, | |
| ) | |
| prompt = f"""Generate a short, friendly notification message for a property search alert. | |
| Context: | |
| - User's name: {user_name} | |
| - Their original search request: "{search_query}" | |
| - A matching property was just found NOW (this is a new listing!) | |
| Style guidelines: | |
| - Start with something like "Hey {user_name}! Remember when you asked me to keep an eye out for [their search]? Guess what—I just found something!" | |
| - Be warm, casual, and excited like a helpful friend who just discovered something great | |
| - Use 1-2 relevant emojis naturally | |
| - Reference that they TOLD you to watch for this type of property | |
| - Express genuine excitement—you've been searching and finally found it! | |
| - End with something like "check it out" or "take a look below" | |
| - Keep it under 100 words | |
| - IMPORTANT: Write the entire message in {lang_name} | |
| Write ONLY the message, nothing else. Be creative and vary your approach.""" | |
| response = await client.chat.completions.create( | |
| model=settings.MIMO_MODEL, | |
| messages=[{"role": "user", "content": prompt}], | |
| max_tokens=200, | |
| temperature=0.9, # High temperature for variety | |
| ) | |
| generated_text = response.choices[0].message.content.strip() | |
| logger.info(f"AI generated alert message in {lang_name}") | |
| return generated_text | |
| except Exception as e: | |
| logger.warning(f"AI message generation failed, using fallback: {e}") | |
| # Fallback to English template if AI fails | |
| return f"Hey {user_name}! 👋\n\nI found a property matching your search for '{search_query}'. I think you'll love this one!\n\nCheck it out below! 👇" | |
| # Back in notify_user_of_match function - FIXED INDENTATION | |
| async def _continue_notify_user_of_match(alert, listing, conv_id, text): | |
| """Continue notification after AI message generation (helper to fix indentation).""" | |
| db = await get_db() | |
| # 4. Prepare Property Card (Listing Data) | |
| property_card = { | |
| "listing_id": str(listing.get("_id") or listing.get("id")), | |
| "title": listing.get("title", ""), | |
| "price": listing.get("price", 0), | |
| "currency": listing.get("currency", "NGN"), | |
| "bedrooms": listing.get("bedrooms", 0), | |
| "bathrooms": listing.get("bathrooms", 0), | |
| "location": listing.get("location", ""), | |
| "image_url": listing.get("images", [None])[0] if listing.get("images") else None, | |
| "listing_type": listing.get("listing_type", ""), | |
| # Owner Info (New) | |
| "owner_name": listing.get("owner_name", "Unknown"), | |
| "owner_profile_picture": listing.get("owner_profile_picture"), | |
| "rating": listing.get("rating", 0.0), | |
| "reviews_count": listing.get("reviews_count", 0), | |
| "owner_is_verified": listing.get("owner_is_verified", False), | |
| } | |
| # 5. Check if we should send as voice message | |
| message_type = "property_inquiry" # Default for listing cards | |
| voice_media = None | |
| if getattr(alert, 'use_voice', False): | |
| # Generate voice audio for AIDA's notification message | |
| try: | |
| from app.services.voice_service import voice_service | |
| user_language = (alert.search_params.get("user_language") or "en").lower() | |
| audio_bytes, duration = await voice_service.text_to_speech(text, language=user_language) | |
| audio_url = await voice_service.upload_audio_to_r2(audio_bytes) | |
| voice_media = { | |
| "url": audio_url, | |
| "type": "audio", | |
| "duration": int(duration), | |
| "mimeType": "audio/mpeg" | |
| } | |
| message_type = "voice" | |
| logger.info(f"🔊 Generated voice notification: {audio_url}") | |
| except Exception as e: | |
| logger.warning(f"Failed to generate voice notification, falling back to text: {e}") | |
| # 6. Send Message via Service | |
| await conversation_service.send_message( | |
| conversation_id=conv_id, | |
| current_user_id=AIDA_BOT_ID, | |
| current_user_name="AIDA", | |
| current_user_avatar=None, | |
| message_type=message_type, | |
| content=text, | |
| property_card=property_card, | |
| media=voice_media # Will be None for text messages | |
| ) | |
| # 7. Update Alert stats (use ObjectId for _id query) | |
| alert_id = alert.id if isinstance(alert.id, ObjectId) else ObjectId(alert.id) if alert.id else None | |
| if alert_id: | |
| await db.search_alerts.update_one( | |
| {"_id": alert_id}, | |
| {"$set": {"last_notified_at": datetime.utcnow()}} | |
| ) | |
| logger.info(f"Notified user {alert.user_id} of listing match {listing.get('_id')} (voice={getattr(alert, 'use_voice', False)})") | |
| # ============================================================ | |
| # ALERT LIFECYCLE MANAGEMENT | |
| # ============================================================ | |
| async def pause_alert(alert_id: str, user_id: str) -> bool: | |
| """ | |
| Pause alert when matches are found and awaiting user response. | |
| This happens when AIDA finds matches and sends them to the user. | |
| The alert waits for user to decide before searching again. | |
| """ | |
| db = await get_db() | |
| try: | |
| result = await db.search_alerts.update_one( | |
| {"_id": ObjectId(alert_id), "user_id": user_id}, | |
| {"$set": {"status": AlertStatus.PAUSED.value}} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Alert {alert_id} paused - awaiting user response") | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"Failed to pause alert {alert_id}: {e}") | |
| return False | |
| async def pursue_listing(alert_id: str, user_id: str, listing_id: str) -> bool: | |
| """ | |
| User is interested in a specific listing. | |
| Alert stops searching while user pursues this property. | |
| Can be resumed if deal falls through. | |
| """ | |
| db = await get_db() | |
| try: | |
| result = await db.search_alerts.update_one( | |
| {"_id": ObjectId(alert_id), "user_id": user_id}, | |
| {"$set": { | |
| "status": AlertStatus.PURSUING.value, | |
| "linked_listing_id": listing_id | |
| }} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Alert {alert_id} now pursuing listing {listing_id}") | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"Failed to update alert {alert_id} to pursuing: {e}") | |
| return False | |
| async def complete_alert(alert_id: str, user_id: str) -> bool: | |
| """ | |
| User found what they wanted - close the alert permanently. | |
| This is a terminal state. The alert can be reactivated manually | |
| but stops all notifications. | |
| """ | |
| db = await get_db() | |
| try: | |
| result = await db.search_alerts.update_one( | |
| {"_id": ObjectId(alert_id), "user_id": user_id}, | |
| {"$set": { | |
| "status": AlertStatus.COMPLETED.value, | |
| "is_active": False # Backward compatibility | |
| }} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Alert {alert_id} completed - user found what they wanted!") | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"Failed to complete alert {alert_id}: {e}") | |
| return False | |
| async def resume_alert(alert_id: str, user_id: str) -> bool: | |
| """ | |
| Resume a paused/pursuing alert. | |
| Called when: | |
| - User says "deal fell through" | |
| - User says "keep searching" | |
| - User says "resume my search" | |
| """ | |
| db = await get_db() | |
| try: | |
| result = await db.search_alerts.update_one( | |
| {"_id": ObjectId(alert_id), "user_id": user_id}, | |
| {"$set": { | |
| "status": AlertStatus.ACTIVE.value, | |
| "linked_listing_id": None, | |
| "is_active": True # Backward compatibility | |
| }} | |
| ) | |
| if result.modified_count > 0: | |
| logger.info(f"Alert {alert_id} resumed - back to searching") | |
| return result.modified_count > 0 | |
| except Exception as e: | |
| logger.error(f"Failed to resume alert {alert_id}: {e}") | |
| return False | |
| # ============================================================ | |
| # NATURAL ALERT DISAMBIGUATION | |
| # ============================================================ | |
| async def describe_alerts_naturally(user_id: str) -> list[dict]: | |
| """ | |
| Get user's alerts with natural language descriptions. | |
| Instead of: "Alert ID: a1b2c3 - Location: Cotonou" | |
| Returns: "The apartment in Cotonou around 20k with AC" | |
| """ | |
| alerts = await get_user_alerts(user_id) | |
| descriptions = [] | |
| for alert in alerts: | |
| params = alert.search_params | |
| # Build natural description | |
| parts = [] | |
| # Property type description | |
| listing_type = (params.get("listing_type") or "").lower() | |
| if listing_type == "rent": | |
| parts.append("rental") | |
| elif listing_type == "sale": | |
| parts.append("for sale") | |
| elif listing_type == "roommate": | |
| parts.append("roommate") | |
| elif listing_type == "short-stay": | |
| parts.append("short-stay") | |
| else: | |
| parts.append("property") | |
| # Bedrooms | |
| if params.get("bedrooms"): | |
| parts.insert(0, f"{params['bedrooms']}-bedroom") | |
| # Location | |
| if params.get("location"): | |
| parts.append(f"in {params['location']}") | |
| # Price | |
| if params.get("max_price"): | |
| currency = params.get("currency", "") | |
| parts.append(f"under {currency}{params['max_price']:,}") | |
| elif params.get("min_price"): | |
| currency = params.get("currency", "") | |
| parts.append(f"above {currency}{params['min_price']:,}") | |
| description = " ".join(parts).strip() | |
| if not description or description == "property": | |
| description = alert.user_query[:50] if alert.user_query else "property search" | |
| descriptions.append({ | |
| "id": str(alert.id), | |
| "description": description, | |
| "status": alert.status.value if hasattr(alert.status, 'value') else str(alert.status), | |
| "original_query": alert.user_query, | |
| "created_at": alert.created_at.isoformat() if alert.created_at else None, | |
| }) | |
| return descriptions | |
| async def find_alert_by_description(user_id: str, user_response: str) -> Optional[SearchAlert]: | |
| """ | |
| Semantic match user's response to their alerts. | |
| User says: "The Cotonou one" or "my roommate search" | |
| We find the matching alert using fuzzy matching. | |
| """ | |
| alerts = await get_user_alerts(user_id) | |
| if not alerts: | |
| return None | |
| if len(alerts) == 1: | |
| # Only one alert, must be this one | |
| return alerts[0] | |
| # Guard against None/empty response — fall back to most recent alert | |
| if not user_response: | |
| return alerts[0] | |
| user_lower = user_response.lower().strip() | |
| best_match = None | |
| best_score = 0 | |
| for alert in alerts: | |
| params = alert.search_params | |
| score = 0 | |
| # Check location mention | |
| loc = (params.get("location") or "").lower() | |
| if loc and loc in user_lower: | |
| score += 40 | |
| # Check type mention | |
| alert_type = (params.get("listing_type") or "").lower() | |
| if alert_type: | |
| if alert_type in user_lower: | |
| score += 30 | |
| # Also check related words | |
| if alert_type == "rent" and "rental" in user_lower: | |
| score += 30 | |
| if alert_type == "roommate" and ("room" in user_lower or "flatmate" in user_lower): | |
| score += 30 | |
| if alert_type == "sale" and ("buy" in user_lower or "purchase" in user_lower): | |
| score += 30 | |
| if alert_type == "short-stay" and ("stay" in user_lower or "vacation" in user_lower): | |
| score += 30 | |
| # Check bedrooms mention | |
| beds = params.get("bedrooms") | |
| if beds and str(beds) in user_lower: | |
| score += 20 | |
| # Check keywords from original query | |
| if alert.user_query: | |
| query_words = alert.user_query.lower().split() | |
| for word in query_words: | |
| if len(word) > 3 and word in user_lower: | |
| score += 8 | |
| # Boost for ordinal references | |
| if "first" in user_lower and alerts.index(alert) == 0: | |
| score += 20 | |
| if "second" in user_lower and alerts.index(alert) == 1: | |
| score += 20 | |
| if "last" in user_lower and alerts.index(alert) == len(alerts) - 1: | |
| score += 20 | |
| if score > best_score: | |
| best_score = score | |
| best_match = alert | |
| # Minimum threshold | |
| if best_score >= 20: | |
| logger.info(f"Matched user response '{user_response[:30]}' to alert with score {best_score}") | |
| return best_match | |
| logger.info(f"No confident match for '{user_response[:30]}' (best score: {best_score})") | |
| return None | |
| async def get_disambiguation_message(user_id: str) -> Optional[str]: | |
| """ | |
| Generate natural language disambiguation message when user | |
| has multiple alerts and says something ambiguous. | |
| Returns: | |
| Natural message listing alerts, or None if user has 0-1 alerts | |
| """ | |
| descriptions = await describe_alerts_naturally(user_id) | |
| if len(descriptions) <= 1: | |
| return None # No need to disambiguate | |
| # Build natural message | |
| message = "Which search are you talking about? 🤔\n\n" | |
| # Limit to 4 most recent/active | |
| for desc in descriptions[:4]: | |
| status_emoji = "" | |
| if desc["status"] == "paused": | |
| status_emoji = " (paused)" | |
| elif desc["status"] == "pursuing": | |
| status_emoji = " (pursuing)" | |
| message += f"• The {desc['description']}{status_emoji}?\n" | |
| if len(descriptions) > 4: | |
| message += f"\n...and {len(descriptions) - 4} more.\n" | |
| message += "\nJust tell me which one!" | |
| return message | |