import os
import json
import requests
import logging
import numpy as np
import pandas as pd
from datetime import datetime
from typing import Dict, List, Optional, Any
import pickle
import math
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel, pipeline
import torch
import warnings
warnings.filterwarnings('ignore')

# Parallel processing imports
import concurrent.futures
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# Import centralized configuration
from config import get_backend_url, get_ngrok_headers, get_api_endpoint, get_email_config

# ChromaDB imports
import chromadb
from chromadb.config import Settings
import uuid

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AIRecommendationEngine:
    def __init__(self):
        self.device = 'cpu'  # Force CPU usage
        logger.info("🤖 Initializing AI Recommendation Engine with ChromaDB (CPU-based)")

        # Set Hugging Face cache directories for Hugging Face Spaces
        self.setup_cache_directories()

        # Configuration for API endpoints and settings
        self.config = {
            'backend_url': get_backend_url(),
            'headers': get_ngrok_headers()
        }

        # Email configuration: uses the real SendGrid API when the network is
        # available, falls back to a mock service otherwise
        self.email_config = get_email_config()

        # API configuration - no timeout, so a complete data fetch can finish
        self.properties_api = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails"

        # ChromaDB collections (populated in initialize_chromadb)
        self.properties_collection = None
        self.user_preferences_collection = None

        # Initialize ChromaDB and the AI models
        self.initialize_chromadb()
        self.initialize_ai_models()
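
    # A minimal usage sketch (assumptions: `config.py` provides the imported
    # helpers and the backend is reachable; `engine` is a hypothetical name):
    #
    #   engine = AIRecommendationEngine()
    #   engine.fetch_and_store_all_properties()   # populate ChromaDB
    #
    # Construction is side-effectful: it creates cache directories, opens (or
    # falls back to an in-memory) ChromaDB client, and loads the CPU models.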

    def setup_cache_directories(self):
        """Setup cache directories for Hugging Face Spaces compatibility"""
        try:
            # Create the root cache directory with proper permissions
            cache_dir = "/tmp/hf_cache"
            os.makedirs(cache_dir, exist_ok=True)
            os.chmod(cache_dir, 0o755)

            # Point the Hugging Face caches at the writable location
            os.environ['HF_HOME'] = cache_dir
            os.environ['TRANSFORMERS_CACHE'] = f"{cache_dir}/transformers"
            os.environ['HF_DATASETS_CACHE'] = f"{cache_dir}/datasets"
            os.environ['HF_HUB_CACHE'] = f"{cache_dir}/hub"

            # Create the subdirectories
            for subdir in ['transformers', 'datasets', 'hub', 'chroma']:
                subdir_path = f"{cache_dir}/{subdir}"
                os.makedirs(subdir_path, exist_ok=True)
                os.chmod(subdir_path, 0o755)

            logger.info(f"✅ Cache directories setup complete: {cache_dir}")
        except Exception as e:
            # Fall back to the default cache locations
            logger.warning(f"⚠️ Cache directory setup failed: {e}")

    def initialize_chromadb(self):
        """Initialize ChromaDB with robust containerized environment support"""
        try:
            logger.info("🗄️ Initializing ChromaDB with robust containerized support...")
            import tempfile

            # Multiple database path strategies for different environments
            db_paths = [
                "/tmp/chromadb_properties",            # Container-friendly
                "/tmp/hf_cache/chromadb",              # Hugging Face cache
                os.path.abspath("./property_db"),      # Local development
                tempfile.mkdtemp(prefix="chromadb_")   # Fallback temp directory
            ]

            chroma_client = None
            selected_path = None

            # Try each path until one works
            for db_path in db_paths:
                try:
                    logger.info(f"🔄 Trying ChromaDB path: {db_path}")
                    os.makedirs(db_path, exist_ok=True)
                    # Set proper permissions for containerized environments
                    os.chmod(db_path, 0o755)

                    # Create the client and verify it by listing collections
                    chroma_client = chromadb.PersistentClient(path=db_path)
                    chroma_client.list_collections()
                    logger.info(f"✅ ChromaDB client test successful at: {db_path}")
                    selected_path = db_path
                    break
                except Exception as path_error:
                    logger.warning(f"⚠️ Failed to initialize ChromaDB at {db_path}: {path_error}")
                    continue

            if chroma_client is None:
                logger.error("❌ All ChromaDB paths failed - falling back to in-memory client")
                chroma_client = chromadb.Client()
                selected_path = "in-memory"

            self.chroma_client = chroma_client
            logger.info(f"🗄️ ChromaDB initialized successfully at: {selected_path}")

            # Create the robust embedding function
            embedding_function = self.create_robust_embedding_function()

            # Initialize the properties collection with retry logic
            collection_name = "properties_main_collection"
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    # Try to get an existing collection first
                    self.properties_collection = self.chroma_client.get_collection(collection_name)
                    count = self.properties_collection.count()
                    logger.info(f"📚 Using existing ChromaDB collection '{collection_name}' with {count} properties")
                    break
                except Exception:
                    logger.info(f"📚 Attempt {attempt + 1}/{max_retries}: Creating new ChromaDB collection '{collection_name}'")
                    try:
                        # Delete the collection if a stale one exists
                        try:
                            self.chroma_client.delete_collection(collection_name)
                            logger.info(f"🗑️ Deleted existing collection '{collection_name}'")
                        except Exception:
                            pass

                        # Create a new collection
                        self.properties_collection = self.chroma_client.create_collection(
                            name=collection_name,
                            metadata={"description": "Real estate properties with robust embeddings"},
                            embedding_function=embedding_function
                        )
                        logger.info(f"📚 Successfully created ChromaDB collection '{collection_name}'")
                        break
                    except Exception as create_error:
                        logger.error(f"❌ Attempt {attempt + 1} failed to create collection: {create_error}")
                        if attempt == max_retries - 1:
                            raise create_error
                        time.sleep(1)  # Wait before retrying

            # Create the user preferences collection
            try:
                self.user_preferences_collection = self.chroma_client.get_collection("user_preferences_robust")
                logger.info("📚 Using existing robust user preferences collection")
            except Exception:
                self.user_preferences_collection = self.chroma_client.create_collection(
                    name="user_preferences_robust",
                    metadata={"description": "User behavior and preferences with robust embeddings"},
                    embedding_function=embedding_function
                )
                logger.info("📚 Created new robust user preferences collection")

            logger.info("✅ Robust ChromaDB initialized successfully")
            logger.info(f"📊 Properties in DB: {self.properties_collection.count()}")

            # Smoke-test the embedding function
            try:
                test_embedding = embedding_function(["Test property for robust embedding function"])
                if test_embedding and len(test_embedding) > 0 and len(test_embedding[0]) > 0:
                    logger.info(f"✅ Robust embedding function test successful - embedding dimension: {len(test_embedding[0])}")
                else:
                    logger.warning("⚠️ Robust embedding function returned empty result")
            except Exception as test_error:
                logger.warning(f"⚠️ Robust embedding function test failed: {test_error}")
        except Exception as e:
            logger.error(f"❌ Failed to initialize robust ChromaDB: {e}")
            # Leave the collections as None so other methods can degrade gracefully
            self.properties_collection = None
            self.user_preferences_collection = None
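
    # Hedged sketch: after construction you can sanity-check the store
    # directly (`engine` is a hypothetical instance name):
    #
    #   engine = AIRecommendationEngine()
    #   print(engine.chroma_client.list_collections())
    #   if engine.properties_collection is not None:
    #       print(engine.properties_collection.count())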

    def get_custom_embedding_function(self):
        """Get a custom embedding function with a proper cache directory"""
        try:
            from chromadb.utils import embedding_functions

            # Cache directories for every embedding backend
            cache_dir = '/tmp/hf_cache'
            chroma_cache_dir = f'{cache_dir}/chroma'
            onnx_cache_dir = f'{cache_dir}/onnx'

            # Create all necessary cache directories with proper permissions
            for dir_path in [cache_dir, chroma_cache_dir, onnx_cache_dir]:
                os.makedirs(dir_path, exist_ok=True)
                os.chmod(dir_path, 0o755)

            # Point each library at its writable cache location
            os.environ['CHROMA_CACHE_DIR'] = chroma_cache_dir
            os.environ['ONNXRUNTIME_CACHE_DIR'] = onnx_cache_dir
            os.environ['HF_HOME'] = cache_dir
            os.environ['TRANSFORMERS_CACHE'] = f'{cache_dir}/transformers'
            os.environ['SENTENCE_TRANSFORMERS_HOME'] = f'{cache_dir}/sentence_transformers'

            # Try the ONNX embedding function first (faster, smaller)
            try:
                logger.info("🔄 Trying ONNX embedding function...")
                embedding_func = embedding_functions.ONNXMiniLM_L6_V2()
                logger.info("✅ ONNX embedding function created successfully")
                return embedding_func
            except Exception as onnx_error:
                logger.warning(f"⚠️ ONNX embedding function failed: {onnx_error}")

            # Fall back to sentence-transformers
            try:
                logger.info("🔄 Trying SentenceTransformer embedding function...")
                embedding_func = embedding_functions.SentenceTransformerEmbeddingFunction(
                    model_name='all-MiniLM-L6-v2',
                    cache_folder=chroma_cache_dir
                )
                logger.info("✅ SentenceTransformer embedding function created successfully")
                return embedding_func
            except Exception as st_error:
                logger.warning(f"⚠️ SentenceTransformer embedding function failed: {st_error}")

            # Next fallback: ChromaDB's default embedding function
            try:
                logger.info("🔄 Trying default embedding function...")
                embedding_func = embedding_functions.DefaultEmbeddingFunction()
                logger.info("✅ Default embedding function created successfully")
                return embedding_func
            except Exception as default_error:
                logger.warning(f"⚠️ Default embedding function failed: {default_error}")

            # Ultimate fallback - a simple hash-based embedding function
            try:
                logger.info("🔄 Creating simple fallback embedding function...")
                embedding_func = self.create_simple_embedding_function()
                logger.info("✅ Simple fallback embedding function created successfully")
                return embedding_func
            except Exception as simple_error:
                logger.error(f"❌ All embedding functions failed: {simple_error}")
                return None
        except Exception as e:
            logger.error(f"❌ Error in get_custom_embedding_function: {e}")
            return None

    def create_robust_embedding_function(self):
        """Create a robust embedding function that works in containerized environments"""

        class RobustEmbeddingFunction:
            def __init__(self):
                self.embedding_dim = 384  # Standard MiniLM dimension
                self.cache = {}           # Simple cache for performance
                logger.info(f"🔧 Created robust embedding function with dimension: {self.embedding_dim}")

            def __call__(self, input):
                """Generate robust embeddings that work in any environment"""
                # Normalize the input to a list of texts first, so the
                # exception fallback below can always reference it safely
                if isinstance(input, str):
                    texts = [input]
                elif isinstance(input, list):
                    texts = input
                else:
                    texts = [str(input)]

                try:
                    if not texts:
                        return [[0.0] * self.embedding_dim]

                    embeddings = []
                    for text in texts:
                        if not text or len(text.strip()) == 0:
                            embeddings.append([0.0] * self.embedding_dim)
                            continue

                        # Check the cache first
                        text_key = text.lower().strip()
                        if text_key in self.cache:
                            embeddings.append(self.cache[text_key])
                            continue

                        # Create an embedding from the text's characteristics
                        embedding = self._generate_robust_embedding(text)

                        # Cache the result
                        self.cache[text_key] = embedding
                        embeddings.append(embedding)
                    return embeddings
                except Exception as e:
                    logger.warning(f"⚠️ Robust embedding failed, using fallback: {e}")
                    return [[0.0] * self.embedding_dim for _ in texts]

            def _generate_robust_embedding(self, text):
                """Generate an embedding using multiple strategies for robustness"""
                import hashlib

                # Strategy 1: MD5 hash bytes (16 bytes)
                hash_bytes = hashlib.md5(text.encode('utf-8')).digest()
                # Strategy 2: SHA256 bytes for additional entropy (32 bytes)
                sha_bytes = hashlib.sha256(text.encode('utf-8')).digest()
                # Strategy 3: Character frequency statistics
                char_freq = {}
                for char in text:
                    char_freq[char] = char_freq.get(char, 0) + 1

                # Combine the strategies into one deterministic vector
                embedding = []
                for i in range(self.embedding_dim):
                    if i < 16:    # Dimensions 0-15 come from the MD5 bytes
                        embedding.append(float(hash_bytes[i]) / 255.0)
                    elif i < 48:  # Dimensions 16-47 come from the SHA256 bytes
                        embedding.append(float(sha_bytes[i - 16]) / 255.0)
                    else:         # Remaining dimensions mix characters, frequencies, and hashes
                        char_idx = i % len(text) if text else 0
                        char_val = ord(text[char_idx]) if char_idx < len(text) else 0
                        freq_val = char_freq.get(text[char_idx] if char_idx < len(text) else ' ', 0)
                        combined_val = (char_val + freq_val + hash_bytes[i % 16] + sha_bytes[i % 32]) % 256
                        embedding.append(float(combined_val) / 255.0)
                return embedding

        return RobustEmbeddingFunction()
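
    # Hedged sketch of the hash-based embedding's behavior: the vectors are
    # deterministic per text, so identical inputs always map to identical
    # embeddings (useful when no ML backend is available, though the vectors
    # carry no semantic meaning). `engine` and `fn` are hypothetical names:
    #
    #   fn = engine.create_robust_embedding_function()
    #   a, b = fn(["2BHK flat in Pune"]), fn(["2BHK flat in Pune"])
    #   assert a == b and len(a[0]) == 384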

    def initialize_ai_models(self):
        """Initialize AI models for analysis and recommendations"""
        try:
            logger.info("🧠 Initializing AI models for enhanced recommendations...")
            # Small AI models dedicated to property recommendations
            self.initialize_recommendation_models()
            # Existing fallback models (sentiment analysis, etc.)
            self.initialize_fallback_models()
            logger.info("✅ AI models initialized successfully")
        except Exception as e:
            logger.error(f"❌ Error initializing AI models: {e}")
            self.initialize_fallback_models()

    def initialize_recommendation_models(self):
        """Initialize small AI models specifically for property recommendations"""
        try:
            logger.info("🔍 Initializing recommendation AI models...")

            # Small transformer model for property analysis
            model_name = "sentence-transformers/all-MiniLM-L6-v2"  # Small, fast model
            logger.info(f"📥 Loading recommendation model: {model_name}")
            self.recommendation_tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.recommendation_model = AutoModel.from_pretrained(model_name)

            # Run on CPU in inference mode for compatibility
            self.recommendation_model.to(self.device)
            self.recommendation_model.eval()

            # Rule-based helpers for classification, pricing, and preferences
            self.property_classifier = self.initialize_property_classifier()
            self.price_predictor = self.initialize_price_predictor()
            self.preference_analyzer = self.initialize_preference_analyzer()

            logger.info("✅ Recommendation AI models loaded successfully")
        except Exception as e:
            logger.warning(f"⚠️ Could not load recommendation models: {e}")
            self.recommendation_tokenizer = None
            self.recommendation_model = None
            self.property_classifier = None
            self.price_predictor = None
            self.preference_analyzer = None

    def initialize_property_classifier(self):
        """Initialize a small model for property type classification"""
        try:
            # Simple rule-based classifier with ML enhancement
            classifier = {
                'luxury_threshold': 50_000_000,     # 50M
                'premium_threshold': 25_000_000,    # 25M
                'mid_range_threshold': 10_000_000,  # 10M
                'affordable_threshold': 5_000_000,  # 5M
                'property_type_weights': {
                    'Apartment': 1.0,
                    'Villa': 1.5,
                    'Penthouse': 2.0,
                    'Townhouse': 1.2,
                    'Plot': 1.3,
                    'Commercial': 1.8
                },
                'location_weights': {
                    'premium': 1.5,
                    'upcoming': 1.2,
                    'established': 1.0,
                    'developing': 0.8
                }
            }
            return classifier
        except Exception as e:
            logger.warning(f"⚠️ Property classifier initialization failed: {e}")
            return None

    def initialize_price_predictor(self):
        """Initialize a simple price range predictor"""
        try:
            predictor = {
                'price_ranges': {
                    'ultra_luxury': (100_000_000, float('inf')),  # 100M+
                    'luxury': (50_000_000, 100_000_000),          # 50M-100M
                    'premium': (25_000_000, 50_000_000),          # 25M-50M
                    'mid_range': (10_000_000, 25_000_000),        # 10M-25M
                    'affordable': (5_000_000, 10_000_000),        # 5M-10M
                    'budget': (0, 5_000_000)                      # 0-5M
                },
                'price_preferences': {
                    'conservative': 0.7,  # Prefer lower prices
                    'balanced': 1.0,      # No preference
                    'premium': 1.3        # Prefer higher prices
                }
            }
            return predictor
        except Exception as e:
            logger.warning(f"⚠️ Price predictor initialization failed: {e}")
            return None

    def initialize_preference_analyzer(self):
        """Initialize the preference analysis model"""
        try:
            analyzer = {
                'engagement_weights': {
                    'high': 1.5,
                    'medium': 1.0,
                    'low': 0.7
                },
                'viewing_pattern_weights': {
                    'morning': 1.1,
                    'afternoon': 1.0,
                    'evening': 0.9
                },
                'property_type_preferences': {
                    'diverse': 1.2,
                    'focused': 1.0,
                    'exploratory': 1.1
                }
            }
            return analyzer
        except Exception as e:
            logger.warning(f"⚠️ Preference analyzer initialization failed: {e}")
            return None
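
    # Hedged sketch (not this module's actual scoring code) of how the rule
    # tables above can combine: a price-tier lookup plus multiplicative
    # weights. `_tier` and `engine` are hypothetical names:
    #
    #   def _tier(price, predictor):
    #       for tier, (lo, hi) in predictor['price_ranges'].items():
    #           if lo <= price < hi:
    #               return tier
    #
    #   clf = engine.property_classifier
    #   score = clf['property_type_weights'].get('Villa', 1.0) * clf['location_weights']['premium']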

    def initialize_fallback_models(self):
        """Initialize fallback models if the main models fail"""
        logger.info("🔄 Initializing fallback models...")
        try:
            # `pipeline` is imported at module level; device=-1 keeps it on CPU
            self.sentiment_analyzer = pipeline("sentiment-analysis", device=-1)
            self.tokenizer = None
            self.embedding_model = None
            self.behavior_classifier = None
            logger.info("✅ Fallback models loaded")
        except Exception as e:
            logger.error(f"❌ Fallback models failed: {e}")
            self.sentiment_analyzer = None
            self.tokenizer = None
            self.embedding_model = None
            self.behavior_classifier = None

    def get_property_embeddings(self, property_text: str) -> np.ndarray:
        """Generate embeddings for property text using the Hugging Face model"""
        try:
            if self.tokenizer and self.embedding_model:
                inputs = self.tokenizer(property_text, return_tensors='pt',
                                        truncation=True, padding=True, max_length=512)
                with torch.no_grad():
                    outputs = self.embedding_model(**inputs)
                    # Mean-pool the token embeddings into one sentence vector
                    embeddings = outputs.last_hidden_state.mean(dim=1)
                return embeddings.numpy().flatten()
            else:
                # Fallback to TF-IDF
                return self.get_tfidf_embeddings(property_text)
        except Exception as e:
            logger.error(f"❌ Error generating embeddings: {e}")
            return self.get_tfidf_embeddings(property_text)

    def get_tfidf_embeddings(self, text: str) -> np.ndarray:
        """Fallback TF-IDF embeddings"""
        vectorizer = TfidfVectorizer(max_features=100, stop_words='english')
        try:
            tfidf = vectorizer.fit_transform([text])
            return tfidf.toarray().flatten()
        except Exception:
            return np.random.rand(100)  # Random fallback of the same length
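
    # Hedged sketch of the mean-pooling step above, assuming a loaded MiniLM
    # tokenizer/model pair (`tok`, `model` are hypothetical names); the
    # shapes are the standard transformers ones:
    #
    #   inputs = tok("3 bed villa", return_tensors='pt')   # [1, seq_len]
    #   hidden = model(**inputs).last_hidden_state         # [1, seq_len, 384]
    #   vec = hidden.mean(dim=1)                           # [1, 384]
    #
    # Note this simple mean ignores the attention mask, so padding tokens
    # (when batching) would dilute the average.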

    def analyze_user_behavior_with_ai(self, analysis_data: Dict) -> Dict:
        """Use AI to analyze user behavior patterns"""
        logger.info("🧠 Analyzing user behavior with AI...")
        try:
            # Extract the behavior patterns
            lead_qual = analysis_data.get('data', {}).get('lead_qualification', {})
            analytics = analysis_data.get('data', {}).get('analytics', {})
            properties = analysis_data.get('data', {}).get('properties', [])

            # Create a behavior text for AI analysis
            behavior_text = self.create_behavior_description(lead_qual, analytics, properties)

            # Sentiment analysis of user engagement
            sentiment_result = None
            if self.sentiment_analyzer:
                try:
                    sentiment_result = self.sentiment_analyzer(behavior_text[:512])
                except Exception as e:
                    logger.warning(f"⚠️ Sentiment analysis failed: {e}")

            # AI-enhanced insights
            ai_insights = {
                'behavior_summary': behavior_text,
                'sentiment_analysis': sentiment_result,
                'ai_personality_type': self.determine_personality_type(analytics),
                'buying_motivation': self.analyze_buying_motivation(properties),
                'recommendation_strategy': self.determine_recommendation_strategy(lead_qual),
                'urgency_level': self.calculate_urgency_level(analytics),
                'property_preferences': self.extract_ai_preferences(properties)
            }
            logger.info("✅ AI behavior analysis completed")
            return ai_insights
        except Exception as e:
            logger.error(f"❌ AI behavior analysis failed: {e}")
            return {'error': str(e)}

    def create_behavior_description(self, lead_qual: Dict, analytics: Dict, properties: List) -> str:
        """Create a text description of user behavior for AI analysis"""
        lead_status = lead_qual.get('lead_status', 'UNKNOWN')
        engagement_level = analytics.get('engagement_level', 'Unknown')
        total_views = len(properties)

        # Average price of the properties the user showed interest in
        prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0]
        avg_price = sum(prices) / len(prices) if prices else 0

        # Property types viewed
        prop_types = list(set(p.get('propertyTypeName', 'Unknown') for p in properties))

        behavior_text = f"""
        Customer shows {engagement_level.lower()} engagement with {lead_status.lower()} lead status.
        They have viewed {total_views} properties with an average price interest of ₹{avg_price:,.0f}.
        Property types of interest include: {', '.join(prop_types[:3])}.
        Their viewing pattern suggests they are a {self.get_buyer_archetype(analytics)} buyer.
        """
        return behavior_text.strip()

    def determine_personality_type(self, analytics: Dict) -> str:
        """Determine the customer personality type from engagement-level heuristics"""
        engagement = analytics.get('engagement_level', 'Medium')

        if engagement == 'Very High':
            return "Decisive Decision Maker"
        elif engagement == 'High':
            return "Research-Oriented Buyer"
        elif engagement == 'Medium':
            return "Cautious Evaluator"
        else:
            return "Passive Browser"

    def analyze_buying_motivation(self, properties: List) -> str:
        """Analyze what motivates the buyer based on property choices"""
        if not properties:
            return "Unknown motivation"

        # Analyze the price ranges
        prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0]
        if not prices:
            return "Price-conscious buyer"

        price_range = max(prices) - min(prices)
        avg_price = sum(prices) / len(prices)

        if price_range < avg_price * 0.2:  # Consistent price range
            return "Budget-focused with clear price target"
        elif avg_price > 20_000_000:  # High-value properties
            return "Luxury-seeking with investment focus"
        elif len(set(p.get('propertyTypeName') for p in properties)) > 3:
            return "Exploring options, needs guidance"
        else:
            return "Value-conscious with specific requirements"

    def determine_recommendation_strategy(self, lead_qual: Dict) -> str:
        """Determine the best recommendation strategy"""
        lead_status = lead_qual.get('lead_status', 'COLD')
        lead_score = lead_qual.get('lead_score', 0)

        if lead_status == 'HOT' or lead_score > 70:
            return "aggressive_immediate"   # Show premium properties, create urgency
        elif lead_status == 'WARM' or lead_score > 50:
            return "nurturing_educational"  # Provide comparisons, detailed info
        elif lead_status == 'LUKEWARM' or lead_score > 30:
            return "engagement_building"    # Variety of options, build interest
        else:
            return "awareness_creation"     # Basic information, build trust

    def calculate_urgency_level(self, analytics: Dict) -> str:
        """Calculate how urgent the customer's need is"""
        recent_activity = analytics.get('viewing_patterns', {}).get('peak_viewing_time')
        engagement = analytics.get('engagement_level', 'Low')

        if engagement in ['Very High', 'High'] and recent_activity:
            return "High"
        elif engagement == 'Medium':
            return "Medium"
        else:
            return "Low"

    def extract_ai_preferences(self, properties: List) -> Dict:
        """Extract detailed preferences using AI analysis"""
        if not properties:
            return {}

        # Features that attract the user
        features = {
            'preferred_types': [],
            'price_sensitivity': 'medium',
            'location_preferences': [],
            'feature_importance': {}
        }

        # Property type preference, weighted by view counts
        type_counts = {}
        for prop in properties:
            prop_type = prop.get('propertyTypeName', 'Unknown')
            type_counts[prop_type] = type_counts.get(prop_type, 0) + prop.get('viewCount', 1)

        # Keep just the top three type names as strings, not (name, count) tuples
        features['preferred_types'] = [
            prop_type for prop_type, count in
            sorted(type_counts.items(), key=lambda x: x[1], reverse=True)[:3]
        ]

        # Price sensitivity from the coefficient of variation of viewed prices
        prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0]
        if prices:
            price_std = np.std(prices)
            price_mean = np.mean(prices)
            cv = price_std / price_mean if price_mean > 0 else 0
            if cv < 0.2:
                features['price_sensitivity'] = 'high'   # Very specific about price
            elif cv > 0.5:
                features['price_sensitivity'] = 'low'    # Flexible on price
            else:
                features['price_sensitivity'] = 'medium'
        return features
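
    # Worked example of the coefficient-of-variation rule above (illustrative
    # numbers only): prices [10M, 11M, 10.5M] give mean ≈ 10.5M and
    # std ≈ 0.41M, so cv ≈ 0.04 < 0.2 → 'high' sensitivity (a tight, specific
    # budget); prices [5M, 12M, 30M] give cv ≈ 0.67 > 0.5 → 'low' sensitivity.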

    def get_buyer_archetype(self, analytics: Dict) -> str:
        """Determine the buyer archetype"""
        engagement = analytics.get('engagement_level', 'Medium')
        opportunity_score = analytics.get('opportunity_score', 50)

        if engagement == 'Very High' and opportunity_score > 80:
            return "motivated"
        elif engagement in ['High', 'Medium'] and opportunity_score > 60:
            return "serious"
        else:
            return "casual"

    def fetch_and_store_all_properties(self) -> bool:
        """Fetch ALL properties from the API in parallel and store them in ChromaDB"""
        try:
            logger.info("🚀 Starting PARALLEL property fetching for faster performance...")

            # Fetch pages concurrently for speed
            all_properties = self.fetch_all_properties_parallel(max_workers=8)

            if not all_properties:
                logger.warning("⚠️ No properties fetched from API, falling back to sequential method...")
                return self.fetch_all_properties_sequential()

            logger.info(f"🎉 Successfully fetched {len(all_properties)} total properties using parallel processing!")

            # Store all properties in ChromaDB
            return self.store_properties_in_chromadb(all_properties)
        except Exception as e:
            logger.error(f"❌ Error in parallel property fetch: {e}")
            logger.info("🔄 Falling back to sequential fetching...")
            return self.fetch_all_properties_sequential()

    def fetch_all_properties_sequential(self) -> bool:
        """Fallback sequential property fetching method"""
        try:
            logger.info("🌐 Fetching properties using sequential method...")
            all_properties = []
            page = 1
            page_size = 100
            consecutive_errors = 0

            while True:
                try:
                    params = {
                        'pageNumber': page,
                        'pageSize': page_size
                    }
                    logger.info(f"📄 Fetching page {page}...")
                    response = requests.get(self.properties_api, params=params, verify=False, timeout=None)
                    response.raise_for_status()
                    data = response.json()

                    # Handle the different response structures the backend may return
                    if isinstance(data, list):
                        properties_batch = data
                    elif isinstance(data, dict):
                        if 'data' in data:
                            properties_batch = data['data']
                        elif 'properties' in data:
                            properties_batch = data['properties']
                        elif 'results' in data:
                            properties_batch = data['results']
                        else:
                            properties_batch = list(data.values()) if data else []
                    else:
                        logger.warning(f"Unexpected response type on page {page}: {type(data)}")
                        break

                    if not properties_batch:
                        logger.info(f"📄 Page {page} is empty - reached end")
                        break

                    all_properties.extend(properties_batch)
                    logger.info(f"✅ Page {page}: {len(properties_batch)} properties (Total: {len(all_properties)})")
                    consecutive_errors = 0
                    page += 1

                    if page > 1000:  # Safety limit
                        logger.warning("⚠️ Reached page limit (1000) - stopping fetch")
                        break
                except Exception as page_error:
                    logger.error(f"❌ Error fetching page {page}: {page_error}")
                    # Skip the failed page, but give up after 5 consecutive failures
                    page += 1
                    consecutive_errors += 1
                    if consecutive_errors >= 5:
                        break
                    continue

            logger.info(f"🎉 Sequential fetch completed: {len(all_properties)} total properties!")

            if all_properties:
                return self.store_properties_in_chromadb(all_properties)
            else:
                logger.warning("⚠️ No properties fetched from API")
                return False
        except Exception as e:
            logger.error(f"❌ Error in sequential property fetch: {e}")
            import traceback
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return False

    def store_properties_in_chromadb(self, properties: List[Dict]):
        """Store properties in ChromaDB with embeddings"""
        try:
            logger.info(f"🗄️ Storing {len(properties)} properties in ChromaDB...")

            # Ensure ChromaDB is properly initialized
            if self.properties_collection is None:
                logger.error("❌ ChromaDB properties collection not initialized, cannot store properties")
                return False

            # Clear existing properties to avoid duplicates. ChromaDB has no
            # clear method, so the collection is recreated instead.
            existing_count = self.properties_collection.count()
            if existing_count > 0:
                logger.info(f"🗑️ Clearing {existing_count} existing properties...")
                collection_name = self.properties_collection.name
                self.chroma_client.delete_collection(collection_name)

                # Recreate with the same embedding function used in
                # initialize_chromadb, so stored and queried vectors match
                embedding_function = self.create_robust_embedding_function()
                if embedding_function is None:
                    logger.error("❌ No embedding function available - cannot recreate collection")
                    return False
                self.properties_collection = self.chroma_client.create_collection(
                    name=collection_name,
                    metadata={"description": "Real estate properties with embeddings"},
                    embedding_function=embedding_function
                )

            # Process properties in batches for efficiency
            batch_size = 50
            total_stored = 0

            for i in range(0, len(properties), batch_size):
                batch = properties[i:i + batch_size]

                # Prepare the data for ChromaDB
                documents = []
                metadatas = []
                ids = []

                for prop in batch:
                    # Comprehensive text description used for the embeddings
                    description = self.create_property_description(prop)
                    documents.append(description)

                    # Store the property metadata, tolerating both naming schemes
                    metadata = {
                        'id': str(prop.get('id', prop.get('propertyId', f'prop_{i}'))),
                        'propertyName': prop.get('propertyName', ''),
                        'propertyTypeName': prop.get('propertyTypeName', prop.get('typeName', '')),
                        'price': float(prop.get('price', 0) or prop.get('marketValue', 0) or 0),
                        'address': prop.get('address', prop.get('location', '')),
                        'beds': int(prop.get('beds', 0) or prop.get('bedrooms', 0) or 0),
                        'baths': int(prop.get('baths', 0) or prop.get('bathrooms', 0) or 0),
                        'sqft': float(prop.get('totalSquareFeet', 0) or prop.get('area', 0) or 0),
                        'description': prop.get('description', ''),
                        'features': json.dumps(prop.get('features', [])) if prop.get('features') else '[]'
                    }
                    # ChromaDB rejects None values, so clean them out
                    metadata = self.clean_metadata_for_chromadb(metadata)
                    metadatas.append(metadata)

                    # Use the property ID as the document ID
                    prop_id = str(prop.get('id', prop.get('propertyId', f'prop_{total_stored + len(ids)}')))
                    ids.append(prop_id)

                # Add the batch to ChromaDB
                try:
                    # Validate that all metadatas are clean
                    for idx, metadata in enumerate(metadatas):
                        for key, value in metadata.items():
                            if value is None:
                                logger.warning(f"⚠️ Found None value in metadata[{idx}][{key}], this should have been cleaned")

                    self.properties_collection.add(
                        documents=documents,
                        metadatas=metadatas,
                        ids=ids
                    )
                    total_stored += len(batch)
                    logger.info(f"📊 Stored batch {i//batch_size + 1}: {len(batch)} properties (Total: {total_stored})")
                except Exception as batch_error:
                    logger.error(f"❌ Error storing batch {i//batch_size + 1}: {batch_error}")

                    # Diagnose embedding function errors specifically
                    if "embedding function" in str(batch_error).lower():
                        logger.error("🔧 This appears to be an embedding function issue. Checking collection setup...")
                        if self.properties_collection is None:
                            logger.error("❌ Properties collection is None - ChromaDB not properly initialized")
                        else:
                            logger.error("❌ Collection exists but embedding function may be missing")

                    # Log the problematic data for debugging
                    logger.error(f"Batch size: {len(batch)}, Documents: {len(documents)}, Metadatas: {len(metadatas)}, IDs: {len(ids)}")
                    if metadatas:
                        logger.error(f"Sample metadata keys: {list(metadatas[0].keys())}")
                        for idx, metadata in enumerate(metadatas[:3]):  # First 3 only
                            logger.error(f"Metadata {idx}: {metadata}")
                    if documents:
                        logger.error(f"Sample document: {documents[0][:200]}...")
                    raise  # Re-raise to stop processing

            logger.info(f"✅ Successfully stored {total_stored} properties in ChromaDB!")
            return True
        except Exception as e:
            logger.error(f"❌ Error storing properties in ChromaDB: {e}")
            import traceback
            logger.error(f"Full traceback: {traceback.format_exc()}")
            return False

    def create_property_description(self, prop: Dict) -> str:
        """Create a comprehensive text description for property embeddings"""
        parts = []

        # Basic info
        name = prop.get('propertyName', prop.get('name', ''))
        if name:
            parts.append(f"Property: {name}")

        property_type = prop.get('propertyTypeName', prop.get('typeName', ''))
        if property_type:
            parts.append(f"Type: {property_type}")

        # Location
        location = prop.get('address', prop.get('location', ''))
        if location:
            parts.append(f"Location: {location}")

        # Price
        price = prop.get('price', 0) or prop.get('marketValue', 0) or 0
        if price > 0:
            parts.append(f"Price: ${price:,.0f}")

        # Specs
        beds = prop.get('beds', 0) or prop.get('bedrooms', 0) or 0
        baths = prop.get('baths', 0) or prop.get('bathrooms', 0) or 0
        sqft = prop.get('totalSquareFeet', 0) or prop.get('area', 0) or 0
        if beds > 0:
            parts.append(f"{beds} bedrooms")
        if baths > 0:
            parts.append(f"{baths} bathrooms")
        if sqft > 0:
            parts.append(f"{sqft:,.0f} sq ft")

        # Description
        description = prop.get('description', '')
        if description:
            parts.append(f"Description: {description}")

        # Features (either a list or a pre-joined string)
        features = prop.get('features', [])
        if isinstance(features, list) and features:
            parts.append(f"Features: {', '.join(features)}")
        elif isinstance(features, str) and features:
            parts.append(f"Features: {features}")

        return '. '.join(parts)
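
    # Illustrative input/output for create_property_description (made-up data):
    #
    #   create_property_description({'propertyName': 'Sea View', 'beds': 3,
    #                                 'price': 12000000, 'address': 'Pune'})
    #   -> "Property: Sea View. Location: Pune. Price: $12,000,000. 3 bedrooms"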

    def clean_metadata_for_chromadb(self, metadata: Dict) -> Dict:
        """Clean metadata so no None values reach ChromaDB storage"""
        cleaned = {}
        for key, value in metadata.items():
            if value is None:
                # Set type-appropriate defaults
                if key in ['price', 'sqft']:
                    cleaned[key] = 0.0
                elif key in ['beds', 'baths']:
                    cleaned[key] = 0
                else:
                    cleaned[key] = ''
            elif isinstance(value, str):
                # Normalize strings and guard against empty values
                cleaned[key] = str(value).strip() if value else ''
            elif isinstance(value, bool):
                # Convert booleans to strings for ChromaDB compatibility.
                # Checked before int/float, since bool is a subclass of int.
                cleaned[key] = str(value).lower()
            elif isinstance(value, (int, float)):
                # Handle NaN (NaN != NaN) and infinity values
                if value != value or value in (float('inf'), float('-inf')):
                    cleaned[key] = 0.0 if key in ['price', 'sqft'] else 0
                else:
                    # Coerce to the expected numeric type per field
                    if key in ['price', 'sqft']:
                        cleaned[key] = float(value)
                    elif key in ['beds', 'baths']:
                        cleaned[key] = int(value)
                    else:
                        cleaned[key] = value
            else:
                # Convert any other type to a string
                try:
                    cleaned[key] = str(value)
                except Exception:
                    cleaned[key] = ''
        return cleaned
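
    # Illustrative cleaning behavior (made-up values):
    #
    #   clean_metadata_for_chromadb({'price': None, 'beds': float('nan'),
    #                                 'address': None, 'featured': True})
    #   -> {'price': 0.0, 'beds': 0, 'address': '', 'featured': 'true'}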

    def find_similar_properties_ai(self, user_analysis: Dict, ai_insights: Dict) -> List[Dict]:
        """Find similar properties using the enhanced AI recommendation system"""
        try:
            logger.info("🔍 Finding AI recommendations using enhanced system...")
            recommendations = self.get_enhanced_ai_recommendations(user_analysis, ai_insights, count=15)

            if recommendations:
                logger.info(f"✅ Found {len(recommendations)} enhanced AI recommendations")
                return recommendations
            else:
                logger.warning("⚠️ No enhanced recommendations found, using fallback")
                return self.get_fallback_recommendations(10)
        except Exception as e:
            logger.error(f"❌ Error in enhanced AI recommendations: {e}")
            return self.get_fallback_recommendations(10)

    def create_multiple_user_queries(self, viewed_properties: List[Dict], preferences: Dict, personality_type: str) -> List[str]:
        """Create multiple search queries for better property coverage"""
        queries = []

        # Query 1: Based on the user's actual viewed properties
        if viewed_properties:
            primary_query = self.create_user_preference_query(viewed_properties, preferences, personality_type)
            queries.append(primary_query)

        # Query 2: Based on the preferred property types
        property_types = preferences.get('preferred_types', [])
        if property_types:
            # Tolerate both strings and (name, count) tuples in property_types
            type_strings = []
            for prop_type in property_types:
                if isinstance(prop_type, tuple):
                    # For a tuple, take the first element (the type name)
                    type_strings.append(str(prop_type[0]) if prop_type else 'Property')
                elif isinstance(prop_type, str):
                    type_strings.append(prop_type)
                else:
                    type_strings.append(str(prop_type))
            if type_strings:
                queries.append(f"Property types: {', '.join(type_strings)}. Modern amenities and good location.")

        # Query 3: Based on the price range preferences
        price_range = preferences.get('price_range', {})
        if price_range:
            min_price = price_range.get('min', 0)
            max_price = price_range.get('max', 0)
            if min_price > 0 or max_price > 0:
                queries.append(f"Properties in price range {min_price} to {max_price}. Good value for money.")

        # Query 4: Generic high-quality properties
        queries.append("High-quality properties with modern amenities, good location, and excellent facilities.")

        # Query 5: Diverse property types
        queries.append("Diverse mix of Villas, Flats, Apartments, and Houses with different price ranges.")

        return queries[:5]  # Limit to 5 queries for performance

    def enhance_recommendations_with_ai_parallel(self, properties: List[Dict], viewed_properties: List[Dict],
                                                 preferences: Dict, strategy: str) -> List[Dict]:
        """Enhanced AI scoring with parallel processing for speed"""
        logger.info(f"🧠 Enhancing {len(properties)} properties with AI analysis using parallel processing")
        if not properties:
            return []

        # Process the properties in parallel batches
        batch_size = 20
        processed_properties = []

        with ThreadPoolExecutor(max_workers=6) as executor:
            # Split the properties into batches
            batches = [properties[i:i + batch_size] for i in range(0, len(properties), batch_size)]

            # Submit the batch processing tasks
            future_to_batch = {
                executor.submit(
                    self.process_property_batch_ai,
                    batch, viewed_properties, preferences, strategy
                ): batch_idx
                for batch_idx, batch in enumerate(batches)
            }

            for future in as_completed(future_to_batch):
                batch_idx = future_to_batch[future]
                try:
                    batch_results = future.result()
                    processed_properties.extend(batch_results)
                    logger.info(f"✅ Processed batch {batch_idx + 1}/{len(batches)} with AI analysis")
                except Exception as e:
                    logger.error(f"❌ Failed to process batch {batch_idx + 1}: {e}")

        # Sort by the AI score, best first
        processed_properties.sort(key=lambda x: x.get('ai_score', 0), reverse=True)
        return processed_properties

    def process_property_batch_ai(self, properties_batch: List[Dict], viewed_properties: List[Dict],
                                  preferences: Dict, strategy: str) -> List[Dict]:
        """Process a batch of properties with AI analysis"""
        enhanced_batch = []
        for prop in properties_batch:
            try:
                # Calculate the AI score from user preferences and behavior
                ai_score = self.calculate_ai_property_score(prop, viewed_properties, preferences, strategy)

                # Attach the AI insights
                prop['ai_score'] = ai_score
                prop['ai_match_reasons'] = self.generate_match_reasons(prop, preferences)
                prop['ai_recommendation_confidence'] = min(ai_score / 100, 1.0)
                enhanced_batch.append(prop)
            except Exception as e:
                logger.error(f"❌ Error processing property {prop.get('id', 'unknown')}: {e}")
                # Include the property without AI enhancement
                prop['ai_score'] = 50  # Default score
                enhanced_batch.append(prop)
        return enhanced_batch

    def get_guaranteed_additional_recommendations(self, current_recommendations: List[Dict],
                                                  viewed_properties: List[Dict], needed_count: int) -> List[Dict]:
        """Get additional recommendations with broader search criteria to guarantee a minimum count"""
        logger.info(f"➕ Getting {needed_count} additional recommendations with broader criteria...")
        if self.properties_collection is None or needed_count <= 0:
            return []

        try:
            # Collect the IDs to exclude
            excluded_ids = set()
            for prop in current_recommendations + viewed_properties:
                prop_id = str(prop.get('id', prop.get('propertyId', '')))
                if prop_id:
                    excluded_ids.add(prop_id)

            # Run several broader queries in parallel
            broader_queries = [
                "Properties with good amenities and location",
                "Affordable housing options with modern features",
                "Premium properties with luxury amenities",
                "Family-friendly properties with good connectivity",
                "Investment properties with high potential"
            ]

            additional_properties = []
            with ThreadPoolExecutor(max_workers=3) as executor:
                future_to_query = {
                    executor.submit(
                        self.search_properties_with_query,
                        query, excluded_ids, needed_count
                    ): query
                    for query in broader_queries
                }
                for future in as_completed(future_to_query):
                    try:
                        properties = future.result()
                        additional_properties.extend(properties)
                        if len(additional_properties) >= needed_count:
                            break
                    except Exception as e:
                        logger.error(f"❌ Broader search failed: {e}")

            # Remove duplicates
            seen_ids = set()
            unique_additional = []
            for prop in additional_properties:
                prop_id = str(prop.get('id', prop.get('propertyId', '')))
                if prop_id and prop_id not in seen_ids and prop_id not in excluded_ids:
                    seen_ids.add(prop_id)
                    unique_additional.append(prop)
                    if len(unique_additional) >= needed_count:
                        break

            logger.info(f"➕ Added {len(unique_additional)} additional recommendations")
            return unique_additional[:needed_count]
        except Exception as e:
            logger.error(f"❌ Error getting additional recommendations: {e}")
            return []

    def get_diverse_properties(self, current_recommendations: List[Dict],
                               viewed_properties: List[Dict], needed_count: int) -> List[Dict]:
        """Get diverse properties to ensure variety in the recommendations"""
        logger.info(f"🎲 Getting {needed_count} diverse properties...")
        if self.properties_collection is None or needed_count <= 0:
            return []

        try:
            # Collect all property IDs to exclude
            excluded_ids = set()
            for prop in current_recommendations + viewed_properties:
                prop_id = str(prop.get('id', prop.get('propertyId', '')))
                if prop_id:
                    excluded_ids.add(prop_id)

            # Sample from different property types
            diverse_queries = [
                "Villa with luxury amenities",
                "Apartment with modern facilities",
                "House with family features",
                "Flat with good connectivity",
                "Commercial property with investment potential"
            ]

            diverse_properties = []
            for query in diverse_queries:
                try:
                    results = self.properties_collection.query(
                        query_texts=[query],
                        n_results=min(5, needed_count),
                        include=["metadatas", "documents", "distances"]
                    )
                    if results and results['metadatas']:
                        for metadata in results['metadatas'][0]:
                            prop_id = metadata.get('id', '')
                            if prop_id not in excluded_ids:
                                diverse_properties.append(metadata)
                                excluded_ids.add(prop_id)
                                if len(diverse_properties) >= needed_count:
                                    break
                    if len(diverse_properties) >= needed_count:
                        break
                except Exception as e:
                    logger.error(f"❌ Error in diverse search with query '{query}': {e}")
                    continue

            logger.info(f"🎲 Found {len(diverse_properties)} diverse properties")
            return diverse_properties[:needed_count]
        except Exception as e:
            logger.error(f"❌ Error getting diverse properties: {e}")
            return []

    def get_emergency_fallback_properties(self, needed_count: int) -> List[Dict]:
        """Emergency fallback when all else fails - return any available properties"""
        logger.warning(f"🚨 Emergency fallback: getting {needed_count} of any available properties")
        try:
            if self.properties_collection is None:
                return []

            # Get any properties from ChromaDB
            results = self.properties_collection.query(
                query_texts=["property"],
                n_results=needed_count,
                include=["metadatas", "documents"]
            )
            if results and results['metadatas']:
                properties = results['metadatas'][0]
                logger.info(f"🚨 Emergency fallback provided {len(properties)} properties")
                return properties
        except Exception as e:
            logger.error(f"❌ Emergency fallback failed: {e}")
        # Return an empty list when the query yields nothing (or fails)
        return []

    def create_user_preference_query(self, viewed_properties: List[Dict], preferences: Dict, personality_type: str) -> str:
        """Create a comprehensive query based on user behavior and preferences"""
        query_parts = []

        # Property types from the user's behavior
        property_types = set()
        for prop in viewed_properties:
            prop_type = prop.get('propertyTypeName', '')
            if prop_type:
                property_types.add(prop_type)
        if property_types:
            query_parts.append(f"Property types: {', '.join(property_types)}")

        # Price range from the viewed properties
        prices = [p.get('price', 0) for p in viewed_properties if p.get('price', 0) > 0]
        if prices:
            min_price = min(prices)
            max_price = max(prices)
            avg_price = sum(prices) / len(prices)
            query_parts.append(f"Price range {min_price} to {max_price}, average {avg_price}")

        # Location preferences
        locations = set()
        for prop in viewed_properties:
            address = prop.get('address', '')
            if address:
                # Take the city/area as the first comma-separated part of the address
                parts = address.split(',')
                if parts:
                    locations.add(parts[0].strip())
        if locations:
            query_parts.append(f"Preferred locations: {', '.join(locations)}")

        # Specification preferences
        beds_list = [p.get('beds', 0) for p in viewed_properties if p.get('beds', 0) > 0]
        if beds_list:
            avg_beds = sum(beds_list) / len(beds_list)
            query_parts.append(f"Preferred {avg_beds:.0f} bedrooms")

        # Personality-based preferences (match on prefix, since the
        # personality labels are phrases like "Decisive Decision Maker")
        if personality_type.startswith('Decisive'):
            query_parts.append("Premium properties, luxury features, move-in ready")
        elif personality_type.startswith('Research-Oriented'):
            query_parts.append("Good value, detailed specifications, investment potential")
        elif personality_type.startswith('Cautious'):
            query_parts.append("Established neighborhoods, reliable developers, good amenities")
        else:
            query_parts.append("Modern amenities, good connectivity, family-friendly")

        return '. '.join(query_parts)
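
    # Illustrative query produced by create_user_preference_query (made-up
    # data, wrapped here for readability):
    #
    #   "Property types: Villa. Price range 9000000 to 12000000, average
    #    10500000.0. Preferred locations: Pune. Preferred 3 bedrooms. Premium
    #    properties, luxury features, move-in ready"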

    def search_similar_properties_chromadb(self, query: str, viewed_properties: List[Dict], min_results: int = 15) -> List[Dict]:
        """Search for similar properties in ChromaDB"""
        try:
            # Auto-initialize ChromaDB if not available
            if self.properties_collection is None:
                logger.warning("⚠️ ChromaDB properties collection not initialized, auto-initializing...")
                self.initialize_chromadb()

                # If still unavailable after init, try to fetch properties
                if self.properties_collection is None:
                    logger.warning("⚠️ ChromaDB still not available, fetching properties...")
                    self.auto_fetch_and_store_properties()

                # Final check
                if self.properties_collection is None:
                    logger.error("❌ ChromaDB properties collection not available after auto-setup")
                    return []

            # Auto-populate if the collection is empty
            try:
                collection_count = self.properties_collection.count()
                logger.info(f"📊 ChromaDB collection has {collection_count} properties")
                if collection_count == 0:
                    logger.warning("⚠️ ChromaDB collection is empty, auto-fetching properties...")
                    self.auto_fetch_and_store_properties()
                else:
                    logger.info(f"✅ ChromaDB collection has {collection_count} properties - ready for recommendations")
            except Exception as count_error:
                logger.warning(f"⚠️ Collection access error: {count_error}")
                # Reinitialize the collection and try to fetch properties anyway
                logger.info("🔄 Reinitializing ChromaDB collection...")
                self.initialize_chromadb()
                self.auto_fetch_and_store_properties()

            # Collect the IDs of viewed properties to exclude them from results
            viewed_ids = set()
            for prop in viewed_properties:
                prop_id = str(prop.get('propertyId', prop.get('id', '')))
                if prop_id:
                    viewed_ids.add(prop_id)

            # Perform the semantic search; over-fetch so exclusions still
            # leave enough results, and keep n_results at least 1
            collection_count = self.properties_collection.count()
            n_results = max(1, min(min_results * 2, collection_count, 100))
            logger.info(f"🔍 Querying ChromaDB: '{query}' (requesting {n_results} results from {collection_count} total)")
            results = self.properties_collection.query(
                query_texts=[query],
                n_results=n_results,
                include=["metadatas", "documents", "distances"]
            )

            # Convert the results to the property format
            similar_properties = []
            if results['metadatas'] and results['metadatas'][0]:
                for i, metadata in enumerate(results['metadatas'][0]):
                    # Skip properties the user has already viewed
                    if metadata.get('id') in viewed_ids:
                        continue

                    # Safely extract the metadata fields with fallbacks
                    try:
                        property_data = {
                            'id': metadata.get('id', ''),
                            'propertyId': metadata.get('id', ''),
                            'propertyName': metadata.get('propertyName', metadata.get('name', 'Unknown Property')),
                            'propertyTypeName': metadata.get('propertyTypeName', metadata.get('typeName', metadata.get('type', 'Property'))),
                            'price': float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0),
                            'marketValue': float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0),
                            'address': metadata.get('address', metadata.get('location', '')),
                            'location': metadata.get('address', metadata.get('location', '')),
                            'beds': int(metadata.get('beds', 0) or metadata.get('bedrooms', 0) or 0),
                            'baths': int(metadata.get('baths', 0) or metadata.get('bathrooms', 0) or 0),
                            'totalSquareFeet': float(metadata.get('sqft', 0) or metadata.get('area', 0) or 0),
                            'description': metadata.get('description', ''),
                            'features': self.safe_json_parse(metadata.get('features', '[]')),
                            'similarity_score': 1 - results['distances'][0][i],  # Convert distance to similarity
                            'chromadb_rank': i + 1
                        }
                        # Pass through any additional fields
                        for key, value in metadata.items():
                            if key not in property_data:
                                property_data[key] = value
                        similar_properties.append(property_data)

                        # Stop once we have enough
                        if len(similar_properties) >= min_results:
                            break
                    except Exception as field_error:
                        logger.warning(f"⚠️ Error processing metadata field: {field_error}")
                        # Fall back to a minimal property object
                        try:
                            minimal_property = {
                                'id': metadata.get('id', f'prop_{i}'),
                                'propertyId': metadata.get('id', f'prop_{i}'),
                                'propertyName': 'Property',
                                'propertyTypeName': 'Property',
                                'price': 0.0,
                                'marketValue': 0.0,
                                'address': '',
                                'location': '',
                                'beds': 0,
                                'baths': 0,
                                'totalSquareFeet': 0.0,
                                'description': '',
                                'features': [],
                                'similarity_score': 1 - results['distances'][0][i],
                                'chromadb_rank': i + 1
                            }
                            similar_properties.append(minimal_property)
                        except Exception as minimal_error:
                            logger.error(f"❌ Failed to create minimal property: {minimal_error}")
                            continue

            logger.info(f"🎯 ChromaDB found {len(similar_properties)} similar properties")
            return similar_properties
        except Exception as e:
            logger.error(f"❌ Error searching ChromaDB: {e}")
            return []
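
    # Hedged usage sketch for the ChromaDB search (assumes a populated
    # engine; `engine` is a hypothetical instance name):
    #
    #   hits = engine.search_similar_properties_chromadb(
    #       "3 bedroom villa in Pune under 15000000",
    #       viewed_properties=[], min_results=5)
    #   for h in hits:
    #       print(h['propertyName'], round(h['similarity_score'], 3))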

    def auto_fetch_and_store_properties(self):
        """Automatically fetch and store properties in ChromaDB"""
        try:
            logger.info("🚀 Auto-fetching properties for ChromaDB...")

            # Fetch the properties from the backend
            success = self.fetch_all_properties_parallel(
                max_workers=5,
                page_size=100,
                max_pages=10
            )
            if success:
                logger.info("✅ Auto-fetched and stored properties in ChromaDB successfully")
                return True

            logger.warning("⚠️ Failed to auto-fetch from backend, using fallback properties...")
            # Create some fallback properties for testing
            fallback_properties = self.create_fallback_properties()
            fallback_success = self._store_properties_in_chromadb_parallel(fallback_properties)
            if fallback_success:
                logger.info(f"✅ Stored {len(fallback_properties)} fallback properties in ChromaDB")
                return True
            logger.error("❌ Failed to store fallback properties")
            return False
        except Exception as e:
            logger.error(f"❌ Error in auto-fetch and store: {e}")
            return False
| def create_fallback_properties(self) -> List[Dict]: | |
| """Create fallback properties for testing when backend is unavailable""" | |
| fallback_properties = [] | |
| property_types = ['Apartment', 'Villa', 'House', 'Condo', 'Townhouse'] | |
| locations = ['Mumbai', 'Delhi', 'Bangalore', 'Chennai', 'Hyderabad', 'Pune', 'Kolkata'] | |
| for i in range(50): # Create 50 fallback properties | |
| property_data = { | |
| 'propertyId': f'fallback_{i+1}', | |
| 'property_name': f'Fallback Property {i+1}', | |
| 'propertyType': property_types[i % len(property_types)], | |
| 'price': 1000000 + (i * 100000), # Varying prices | |
| 'location': locations[i % len(locations)], | |
| 'beds': (i % 4) + 1, | |
| 'baths': (i % 3) + 1, | |
| 'totalSquareFeet': 800 + (i * 50), | |
| 'description': f'Beautiful {property_types[i % len(property_types)]} in {locations[i % len(locations)]} with modern amenities', | |
| 'features': ['Parking', 'Security', 'Garden', 'Gym'][:(i % 4) + 1], | |
| 'yearBuilt': 2020 - (i % 10), | |
| 'propertyStatus': 'For Sale', | |
| 'listingDate': '2024-01-01', | |
| 'agent': f'Agent {i % 10 + 1}', | |
| 'is_fallback': True | |
| } | |
| fallback_properties.append(property_data) | |
| logger.info(f"✅ Created {len(fallback_properties)} fallback properties") | |
| return fallback_properties | |
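| # Hypothetical usage sketch (nothing here executes; `engine` stands in for an | |
| # initialized AIRecommendationEngine): | |
| #   props = engine.create_fallback_properties() | |
| #   props[0]['propertyId']  # -> 'fallback_1' | |
| #   props[0]['beds']        # -> 1 (bed counts cycle through 1-4) | |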
| def _get_properties_with_multiple_queries(self, queries: List[str], viewed_properties: List[Dict], count: int) -> List[Dict]: | |
| """Helper function to get properties using multiple queries with fallback""" | |
| all_results = [] | |
| for query in queries: | |
| try: | |
| results = self.search_similar_properties_chromadb(query, viewed_properties, count) | |
| all_results.extend(results) | |
| if len(all_results) >= count: | |
| break | |
| except Exception as e: | |
| logger.warning(f"⚠️ Query '{query}' failed: {e}") | |
| continue | |
| # Remove duplicates while preserving order | |
| seen = set() | |
| unique_results = [] | |
| for prop in all_results: | |
| prop_id = str(prop.get('id', prop.get('propertyId', ''))) | |
| if prop_id not in seen: | |
| seen.add(prop_id) | |
| unique_results.append(prop) | |
| if len(unique_results) >= count: | |
| break | |
| # If still not enough, try generic queries | |
| if len(unique_results) < count: | |
| logger.info(f"🔄 Only {len(unique_results)} properties found, trying generic queries...") | |
| generic_queries = [ | |
| "modern property good amenities", | |
| "residential property parking security", | |
| "apartment villa house property", | |
| "property with good facilities" | |
| ] | |
| for query in generic_queries: | |
| try: | |
| results = self.search_similar_properties_chromadb(query, [], count) | |
| for prop in results: | |
| prop_id = str(prop.get('id', prop.get('propertyId', ''))) | |
| if prop_id not in seen and len(unique_results) < count: | |
| seen.add(prop_id) | |
| unique_results.append(prop) | |
| if len(unique_results) >= count: | |
| break | |
| except Exception as e: | |
| logger.warning(f"⚠️ Generic query '{query}' failed: {e}") | |
| continue | |
| return unique_results[:count] | |
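| # Illustrative call (hypothetical query strings; `engine` assumed initialized): | |
| #   recs = engine._get_properties_with_multiple_queries( | |
| #       ["2 bhk apartment in Pune", "apartment parking security"], | |
| #       viewed_properties=[], count=10) | |
| # Specific queries run first; the generic queries only fill leftover slots. | |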
| def get_multi_ai_property_recommendations(self, analysis_data: Dict, ai_insights: Dict, email_type: str, count: int = 15) -> List[Dict]: | |
| """Get property recommendations using multiple AI models based on email type""" | |
| try: | |
| logger.info(f"🤖 Getting {email_type} recommendations using specialized AI model...") | |
| # Extract customer preferences and behavior patterns | |
| preferences = self.extract_customer_preferences(analysis_data, ai_insights) | |
| behavioral_patterns = self.analyze_behavioral_patterns(analysis_data, ai_insights) | |
| viewed_properties = analysis_data.get('data', {}).get('properties', []) | |
| # Use different AI strategies based on email type | |
| if email_type == 'property_based': | |
| return self._get_property_type_based_recommendations(preferences, viewed_properties, count) | |
| elif email_type == 'price_based': | |
| return self._get_price_based_recommendations(preferences, viewed_properties, count) | |
| elif email_type == 'location_based': | |
| return self._get_location_based_recommendations(preferences, viewed_properties, count) | |
| elif email_type == 'similarity_based': | |
| return self._get_similarity_based_recommendations(viewed_properties, preferences, count) | |
| elif email_type == 'behavioral_based': | |
| return self._get_behavioral_based_recommendations(behavioral_patterns, viewed_properties, count) | |
| elif email_type == 'premium_properties': | |
| return self._get_premium_properties_recommendations(preferences, count) | |
| elif email_type == 'budget_friendly': | |
| return self._get_budget_friendly_recommendations(preferences, count) | |
| elif email_type == 'trending_properties': | |
| return self._get_trending_properties_recommendations(preferences, count) | |
| elif email_type == 'family_oriented': | |
| return self._get_family_oriented_recommendations(preferences, count) | |
| elif email_type == 'investment_opportunities': | |
| return self._get_investment_opportunities_recommendations(preferences, ai_insights, count) | |
| else: | |
| # Fallback to general recommendations | |
| return self.get_enhanced_ai_recommendations(analysis_data, ai_insights, count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in multi-AI property recommendations for {email_type}: {e}") | |
| return [] | |
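| # Example dispatch (hypothetical inputs): each supported email_type routes to | |
| # one of the specialized helpers below; unknown types fall back to the general path: | |
| #   recs = engine.get_multi_ai_property_recommendations( | |
| #       analysis_data, ai_insights, email_type='price_based', count=15) | |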
| def _get_property_type_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: | |
| """AI Model 1: Property type preference analysis""" | |
| try: | |
| preferred_types = preferences.get('preferred_property_types', []) | |
| if not preferred_types: | |
| # Analyze viewed properties to infer preferences | |
| type_counts = {} | |
| for prop in viewed_properties: | |
| prop_type = prop.get('propertyType', '') | |
| if prop_type: | |
| type_counts[prop_type] = type_counts.get(prop_type, 0) + 1 | |
| preferred_types = [prop_type for prop_type, _ in sorted(type_counts.items(), key=lambda x: x[1], reverse=True)] | |
| # Build queries based on property type preferences | |
| queries = [] | |
| if preferred_types: | |
| queries.extend([ | |
| f"property type {' or '.join(preferred_types[:3])} with modern amenities", | |
| f"{preferred_types[0]} property modern facilities", | |
| f"residential {preferred_types[0]} premium amenities" | |
| ]) | |
| else: | |
| queries.extend([ | |
| "modern property with good amenities", | |
| "residential apartment villa house premium", | |
| "property with parking security amenities" | |
| ]) | |
| return self._get_properties_with_multiple_queries(queries, viewed_properties, count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in property type based recommendations: {e}") | |
| return [] | |
| def _get_price_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: | |
| """AI Model 2: Price range optimization""" | |
| try: | |
| price_range = preferences.get('price_range', {}) | |
| min_price = price_range.get('min_price', 0) | |
| max_price = price_range.get('max_price', 0) | |
| # Build price-based queries | |
| queries = [] | |
| if min_price > 0 and max_price > 0: | |
| avg_price = (min_price + max_price) / 2 | |
| queries.extend([ | |
| f"property price range {min_price} to {max_price} rupees affordable good value", | |
| f"affordable property around {avg_price} rupees", | |
| f"budget property good value for money under {max_price}" | |
| ]) | |
| else: | |
| # Analyze viewed properties for price patterns | |
| prices = [prop.get('price', 0) for prop in viewed_properties if prop.get('price', 0) > 0] | |
| if prices: | |
| avg_price = sum(prices) / len(prices) | |
| queries.extend([ | |
| f"property around {avg_price} rupees good value for money", | |
| f"affordable property budget range", | |
| f"good value property reasonable price" | |
| ]) | |
| else: | |
| queries.extend([ | |
| "affordable property good value for money", | |
| "budget friendly property reasonable price", | |
| "economical property good investment" | |
| ]) | |
| return self._get_properties_with_multiple_queries(queries, viewed_properties, count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in price based recommendations: {e}") | |
| return [] | |
| def _get_location_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: | |
| """AI Model 3: Location preference analysis""" | |
| try: | |
| preferred_locations = preferences.get('preferred_locations', []) | |
| if not preferred_locations: | |
| # Extract locations from viewed properties | |
| locations = [prop.get('location', '') for prop in viewed_properties if prop.get('location')] | |
| preferred_locations = list(set(locations)) | |
| queries = [] | |
| if preferred_locations: | |
| queries.extend([ | |
| f"property in {' or '.join(preferred_locations[:3])} good location connectivity", | |
| f"property near {preferred_locations[0]} transport", | |
| f"residential property {preferred_locations[0]} amenities" | |
| ]) | |
| else: | |
| queries.extend([ | |
| "property in prime location good connectivity transport", | |
| "property near metro station bus transport", | |
| "property central location amenities nearby" | |
| ]) | |
| return self._get_properties_with_multiple_queries(queries, viewed_properties, count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in location based recommendations: {e}") | |
| return [] | |
| def _get_similarity_based_recommendations(self, viewed_properties: List[Dict], preferences: Dict, count: int) -> List[Dict]: | |
| """AI Model 4: Semantic similarity analysis""" | |
| try: | |
| if not viewed_properties: | |
| queries = ["modern apartment villa house property", "residential property good amenities"] | |
| else: | |
| # Create queries based on viewed property characteristics | |
| most_viewed = viewed_properties[0] | |
| property_type = most_viewed.get('propertyType', 'apartment') | |
| location = most_viewed.get('location', '') | |
| beds = most_viewed.get('beds', 2) | |
| queries = [ | |
| f"similar to {property_type} in {location} with {beds} bedrooms modern amenities", | |
| f"{property_type} property modern facilities parking", | |
| f"residential {property_type} good amenities security" | |
| ] | |
| return self._get_properties_with_multiple_queries(queries, viewed_properties, count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in similarity based recommendations: {e}") | |
| return [] | |
| def _get_behavioral_based_recommendations(self, behavioral_patterns: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: | |
| """AI Model 5: Behavioral pattern analysis""" | |
| try: | |
| engagement_level = behavioral_patterns.get('engagement_level', 'medium') | |
| if engagement_level == 'high': | |
| queries = ["premium property luxury amenities high-end features", "luxury villa penthouse executive", "high-end property modern amenities"] | |
| elif engagement_level == 'low': | |
| queries = ["simple property basic amenities affordable", "budget property economical good value", "affordable residential property"] | |
| else: | |
| queries = ["balanced property good amenities reasonable price", "moderate property facilities parking", "residential property good value"] | |
| return self._get_properties_with_multiple_queries(queries, viewed_properties, count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in behavioral based recommendations: {e}") | |
| return [] | |
| def _get_premium_properties_recommendations(self, preferences: Dict, count: int) -> List[Dict]: | |
| """AI Model 6: Premium property analysis""" | |
| try: | |
| queries = [ | |
| "luxury premium property high-end amenities villa penthouse executive", | |
| "premium villa luxury amenities swimming pool", | |
| "high-end property executive modern facilities" | |
| ] | |
| return self._get_properties_with_multiple_queries(queries, [], count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in premium properties recommendations: {e}") | |
| return [] | |
| def _get_budget_friendly_recommendations(self, preferences: Dict, count: int) -> List[Dict]: | |
| """AI Model 7: Budget optimization analysis""" | |
| try: | |
| queries = [ | |
| "affordable budget friendly property good value money economical", | |
| "budget property affordable reasonable price", | |
| "economical property good investment value" | |
| ] | |
| return self._get_properties_with_multiple_queries(queries, [], count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in budget friendly recommendations: {e}") | |
| return [] | |
| def _get_trending_properties_recommendations(self, preferences: Dict, count: int) -> List[Dict]: | |
| """AI Model 8: Market trend analysis""" | |
| try: | |
| queries = [ | |
| "trending popular property new development modern design latest", | |
| "new property modern design contemporary", | |
| "popular property development amenities" | |
| ] | |
| return self._get_properties_with_multiple_queries(queries, [], count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in trending properties recommendations: {e}") | |
| return [] | |
| def _get_family_oriented_recommendations(self, preferences: Dict, count: int) -> List[Dict]: | |
| """AI Model 9: Family suitability analysis""" | |
| try: | |
| queries = [ | |
| "family friendly property multiple bedrooms school nearby children playground safety", | |
| "family property children school park nearby", | |
| "residential property family multiple bedrooms" | |
| ] | |
| return self._get_properties_with_multiple_queries(queries, [], count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in family oriented recommendations: {e}") | |
| return [] | |
| def _get_investment_opportunities_recommendations(self, preferences: Dict, ai_insights: Dict, count: int) -> List[Dict]: | |
| """AI Model 10: Investment potential analysis""" | |
| try: | |
| queries = [ | |
| "investment property rental yield capital appreciation commercial residential ROI", | |
| "investment opportunity property rental income", | |
| "commercial property investment good returns" | |
| ] | |
| return self._get_properties_with_multiple_queries(queries, [], count) | |
| except Exception as e: | |
| logger.error(f"❌ Error in investment opportunities recommendations: {e}") | |
| return [] | |
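| # Note: all ten strategy helpers above share one pattern - build a small set of | |
| # natural-language queries for the embedding index, then delegate retrieval, | |
| # de-duplication, and generic-query backfill to _get_properties_with_multiple_queries(). | |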
| def enhance_recommendations_with_ai(self, similar_properties: List[Dict], viewed_properties: List[Dict], | |
| preferences: Dict, strategy: str) -> List[Dict]: | |
| """Enhance ChromaDB results with additional AI analysis""" | |
| try: | |
| enhanced_properties = [] | |
| for prop in similar_properties: | |
| # Calculate additional AI scores | |
| ai_score = self.calculate_comprehensive_ai_score(prop, viewed_properties, preferences, strategy) | |
| # Combine ChromaDB similarity with AI analysis | |
| chromadb_score = prop.get('similarity_score', 0.5) | |
| combined_score = (chromadb_score * 0.6) + (ai_score * 0.4) # Weight ChromaDB higher | |
| # Add AI-generated recommendation reason | |
| recommendation_reason = self.generate_advanced_recommendation_reason(prop, preferences, strategy) | |
| # Enhanced property data | |
| enhanced_prop = prop.copy() | |
| enhanced_prop.update({ | |
| 'ai_similarity_score': combined_score, | |
| 'chromadb_similarity': chromadb_score, | |
| 'ai_analysis_score': ai_score, | |
| 'recommendation_reason': recommendation_reason, | |
| 'recommendation_confidence': self.calculate_recommendation_confidence(prop, viewed_properties) | |
| }) | |
| enhanced_properties.append(enhanced_prop) | |
| # Sort by combined score | |
| enhanced_properties.sort(key=lambda x: x['ai_similarity_score'], reverse=True) | |
| logger.info(f"🧠 Enhanced {len(enhanced_properties)} properties with AI analysis") | |
| return enhanced_properties | |
| except Exception as e: | |
| logger.error(f"❌ Error enhancing recommendations: {e}") | |
| return similar_properties # Return original if enhancement fails | |
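| # Worked example of the blended score (illustrative numbers only): | |
| #   chromadb_score = 0.80, ai_score = 0.50 | |
| #   combined = 0.80 * 0.6 + 0.50 * 0.4 = 0.68 | |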
| def get_additional_recommendations(self, current_recommendations: List[Dict], | |
| viewed_properties: List[Dict], needed_count: int) -> List[Dict]: | |
| """Get additional recommendations to reach minimum count""" | |
| try: | |
| if needed_count <= 0: | |
| return [] | |
| # Ensure ChromaDB is properly initialized | |
| if self.properties_collection is None: | |
| logger.error("❌ ChromaDB properties collection not initialized, cannot get additional recommendations") | |
| return [] | |
| # Get excluded IDs | |
| excluded_ids = set() | |
| for prop in current_recommendations + viewed_properties: | |
| prop_id = str(prop.get('id', prop.get('propertyId', ''))) | |
| if prop_id: | |
| excluded_ids.add(prop_id) | |
| # Get additional properties using broader search | |
| results = self.properties_collection.query( | |
| query_texts=["modern property with good amenities"], # Generic query | |
| n_results=needed_count * 3, # Get extra for filtering | |
| include=["metadatas", "documents", "distances"] | |
| ) | |
| additional_properties = [] | |
| if results['metadatas'] and results['metadatas'][0]: | |
| for i, metadata in enumerate(results['metadatas'][0]): | |
| prop_id = metadata.get('id', f'prop_{i}') | |
| if prop_id in excluded_ids: | |
| continue | |
| # Convert to property format (use .get with defaults so sparse metadata cannot raise KeyError) | |
| property_data = { | |
| 'id': prop_id, | |
| 'propertyId': prop_id, | |
| 'propertyName': metadata.get('propertyName', 'Property'), | |
| 'propertyTypeName': metadata.get('propertyTypeName', 'Property'), | |
| 'price': metadata.get('price', 0.0), | |
| 'marketValue': metadata.get('price', 0.0), | |
| 'address': metadata.get('address', ''), | |
| 'beds': metadata.get('beds', 0), | |
| 'baths': metadata.get('baths', 0), | |
| 'totalSquareFeet': metadata.get('sqft', 0.0), | |
| 'description': metadata.get('description', ''), | |
| 'features': self.safe_json_parse(metadata.get('features', '[]')), | |
| 'ai_similarity_score': 0.3, # Lower score for generic recommendations | |
| 'recommendation_reason': "Popular choice with good features and location" | |
| } | |
| additional_properties.append(property_data) | |
| excluded_ids.add(prop_id) | |
| if len(additional_properties) >= needed_count: | |
| break | |
| logger.info(f"➕ Added {len(additional_properties)} additional recommendations") | |
| return additional_properties | |
| except Exception as e: | |
| logger.error(f"❌ Error getting additional recommendations: {e}") | |
| return [] | |
| def calculate_comprehensive_ai_score(self, property_item: Dict, viewed_properties: List, | |
| preferences: Dict, strategy: str) -> float: | |
| """Calculate comprehensive AI score for a property""" | |
| try: | |
| score = 0.0 | |
| # Base similarity from ChromaDB | |
| base_score = property_item.get('similarity_score', 0.5) | |
| score += base_score * 0.3 | |
| # Property type preference | |
| prop_type = property_item.get('propertyTypeName', '') | |
| viewed_types = [p.get('propertyTypeName', '') for p in viewed_properties] | |
| if prop_type in viewed_types: | |
| score += 0.2 | |
| # Price compatibility | |
| prop_price = property_item.get('price', 0) | |
| viewed_prices = [p.get('price', 0) for p in viewed_properties if p.get('price', 0) > 0] | |
| if viewed_prices and prop_price > 0: | |
| avg_viewed_price = sum(viewed_prices) / len(viewed_prices) | |
| price_diff = abs(prop_price - avg_viewed_price) / avg_viewed_price | |
| price_score = max(0, 1 - price_diff) # Higher score for similar prices | |
| score += price_score * 0.2 | |
| # Feature matching | |
| prop_features = property_item.get('features', []) | |
| if isinstance(prop_features, str): | |
| prop_features = self.safe_json_parse(prop_features) if prop_features else [] | |
| feature_match_count = 0 | |
| total_viewed_features = set() | |
| for viewed_prop in viewed_properties: | |
| viewed_features = viewed_prop.get('features', []) | |
| if isinstance(viewed_features, list): | |
| total_viewed_features.update(viewed_features) | |
| if total_viewed_features and prop_features: | |
| feature_match_count = len(set(prop_features) & total_viewed_features) | |
| feature_score = min(1.0, feature_match_count / len(total_viewed_features)) | |
| score += feature_score * 0.15 | |
| # Strategy bonus | |
| if strategy == 'aggressive_immediate' and prop_price > 15000000: | |
| score += 0.1 # Prefer premium properties | |
| elif strategy == 'nurturing_educational' and 5000000 <= prop_price <= 20000000: | |
| score += 0.1 # Prefer mid-range | |
| # Specification matching | |
| prop_beds = property_item.get('beds', 0) | |
| viewed_beds = [p.get('beds', 0) for p in viewed_properties if p.get('beds', 0) > 0] | |
| if viewed_beds and prop_beds > 0: | |
| avg_beds = sum(viewed_beds) / len(viewed_beds) | |
| beds_diff = abs(prop_beds - avg_beds) | |
| if beds_diff <= 1: # Within 1 bedroom | |
| score += 0.05 | |
| return min(score, 1.0) | |
| except Exception as e: | |
| logger.error(f"Error calculating AI score: {e}") | |
| return 0.5 # Default score | |
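| # Score budget for reference: base similarity 0.30 + type match 0.20 + price | |
| # proximity 0.20 + feature overlap 0.15 + strategy bonus 0.10 + bedroom match | |
| # 0.05 = 1.00, and min(score, 1.0) clamps any overshoot. | |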
| def generate_advanced_recommendation_reason(self, property_item: Dict, preferences: Dict, strategy: str) -> str: | |
| """Generate advanced AI recommendation reason""" | |
| try: | |
| reasons = [] | |
| # Property type appeal | |
| prop_type = property_item.get('propertyTypeName', '') | |
| if prop_type: | |
| reasons.append(f"Excellent {prop_type.lower()} option") | |
| # Price positioning | |
| price = property_item.get('price', 0) | |
| if price > 20000000: | |
| reasons.append("premium location and features") | |
| elif price > 10000000: | |
| reasons.append("great value in mid-range segment") | |
| else: | |
| reasons.append("affordable with good potential") | |
| # Specification highlights | |
| beds = property_item.get('beds', 0) | |
| if beds >= 3: | |
| reasons.append("spacious family accommodation") | |
| elif beds == 2: | |
| reasons.append("perfect for couples or small families") | |
| # Location appeal | |
| address = property_item.get('address', '') | |
| if 'Gachibowli' in address or 'Hitech City' in address: | |
| reasons.append("prime IT corridor location") | |
| elif 'Banjara Hills' in address or 'Jubilee Hills' in address: | |
| reasons.append("prestigious neighborhood") | |
| else: | |
| reasons.append("well-connected area") | |
| # Strategy-specific reasons | |
| if strategy == 'aggressive_immediate': | |
| reasons.append("immediate possession available") | |
| elif strategy == 'nurturing_educational': | |
| reasons.append("excellent investment opportunity") | |
| else: | |
| reasons.append("suits your viewing preferences") | |
| return "Perfect match with " + ", ".join(reasons[:3]) | |
| except Exception as e: | |
| logger.error(f"Error generating recommendation reason: {e}") | |
| return "Recommended based on your preferences and AI analysis" | |
| def calculate_recommendation_confidence(self, property_item: Dict, viewed_properties: List) -> float: | |
| """Calculate confidence level for recommendation""" | |
| try: | |
| confidence = 0.5 # Base confidence | |
| # Higher confidence for properties similar to heavily viewed ones | |
| prop_type = property_item.get('propertyTypeName', '') | |
| type_views = sum(1 for p in viewed_properties if p.get('propertyTypeName') == prop_type) | |
| if type_views > 0: | |
| confidence += min(0.3, type_views * 0.1) | |
| # Price range confidence | |
| prop_price = property_item.get('price', 0) | |
| viewed_prices = [p.get('price', 0) for p in viewed_properties if p.get('price', 0) > 0] | |
| if viewed_prices and prop_price > 0: | |
| avg_price = sum(viewed_prices) / len(viewed_prices) | |
| price_variance = abs(prop_price - avg_price) / avg_price | |
| if price_variance < 0.2: # Within 20% of average | |
| confidence += 0.2 | |
| return min(confidence, 1.0) | |
| except Exception as e: | |
| logger.error(f"Error calculating confidence: {e}") | |
| return 0.5 | |
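| # Illustrative confidence arithmetic: base 0.5 + min(0.3, type_views * 0.1) | |
| # + 0.2 when the price sits within 20% of the viewed average, capped at 1.0. | |
| # Three same-type views plus a close price -> 0.5 + 0.3 + 0.2 = 1.0. | |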
| def score_properties_with_ai(self, all_properties: List, viewed_properties: List, | |
| preferences: Dict, strategy: str) -> List[Dict]: | |
| """Score properties using AI analysis""" | |
| scored_properties = [] | |
| # Get viewed property IDs to exclude | |
| viewed_ids = set() | |
| for p in viewed_properties: | |
| if isinstance(p, dict): | |
| viewed_ids.add(str(p.get('propertyId', ''))) | |
| for i, prop in enumerate(all_properties): | |
| try: | |
| # Ensure property is a dictionary | |
| if not isinstance(prop, dict): | |
| logger.warning(f"Property {i} is not a dictionary: {type(prop)}") | |
| continue | |
| # Skip already viewed properties | |
| prop_id = str(prop.get('propertyId', prop.get('id', ''))) | |
| if prop_id in viewed_ids: | |
| continue | |
| # Calculate AI-based similarity score | |
| score = self.calculate_ai_similarity_score(prop, viewed_properties, preferences, strategy) | |
| if score > 0.3: # Minimum threshold | |
| prop_copy = prop.copy() | |
| prop_copy['ai_similarity_score'] = score | |
| prop_copy['recommendation_reason'] = self.generate_recommendation_reason(prop, preferences, strategy) | |
| scored_properties.append(prop_copy) | |
| except Exception as e: | |
| logger.warning(f"Error processing property {i}: {e}") | |
| continue | |
| # Sort by AI score | |
| scored_properties.sort(key=lambda x: x['ai_similarity_score'], reverse=True) | |
| return scored_properties | |
| def calculate_ai_similarity_score(self, property_item: Dict, viewed_properties: List, | |
| preferences: Dict, strategy: str) -> float: | |
| """Calculate similarity score using AI analysis""" | |
| score = 0.0 | |
| # Property type preference (30% weight) | |
| preferred_types = dict(preferences.get('preferred_types', [])) | |
| prop_type = property_item.get('propertyTypeName', '') or property_item.get('typeName', '') | |
| if prop_type in preferred_types: | |
| score += 0.3 * (preferred_types[prop_type] / max(preferred_types.values())) | |
| # Price similarity (25% weight) | |
| viewed_prices = [p.get('price', 0) for p in viewed_properties if p.get('price', 0) > 0] | |
| if viewed_prices: | |
| avg_viewed_price = sum(viewed_prices) / len(viewed_prices) | |
| # Try multiple price field names | |
| prop_price = property_item.get('price', 0) or property_item.get('marketValue', 0) or property_item.get('amount', 0) | |
| if prop_price > 0: | |
| price_ratio = min(prop_price, avg_viewed_price) / max(prop_price, avg_viewed_price) | |
| score += 0.25 * price_ratio | |
| # Strategy-based scoring (25% weight) | |
| strategy_bonus = self.get_strategy_bonus(property_item, strategy) | |
| score += 0.25 * strategy_bonus | |
| # Feature similarity using AI (20% weight) | |
| feature_score = self.calculate_feature_similarity_ai(property_item, viewed_properties) | |
| score += 0.2 * feature_score | |
| return min(score, 1.0) | |
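| # The four weights above (0.30 type + 0.25 price + 0.25 strategy + 0.20 features) | |
| # sum to 1.0, so the score stays in [0, 1] even before the min() clamp. | |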
| def get_strategy_bonus(self, property_item: Dict, strategy: str) -> float: | |
| """Get bonus score based on recommendation strategy""" | |
| price = property_item.get('price', 0) | |
| if strategy == 'aggressive_immediate': | |
| # Prefer premium properties for hot leads | |
| return 1.0 if price > 15000000 else 0.5 | |
| elif strategy == 'nurturing_educational': | |
| # Prefer mid-range with good value | |
| return 1.0 if 5000000 <= price <= 20000000 else 0.7 | |
| elif strategy == 'engagement_building': | |
| # Prefer variety and attractive options | |
| return 0.8 # Neutral bonus | |
| else: # awareness_creation | |
| # Prefer budget-friendly options | |
| return 1.0 if price < 10000000 else 0.3 | |
| def calculate_feature_similarity_ai(self, property_item: Dict, viewed_properties: List) -> float: | |
| """Calculate feature similarity using AI embeddings""" | |
| try: | |
| # Create text description of the property | |
| prop_text = self.create_property_description(property_item) | |
| # Get embeddings for current property | |
| prop_embedding = self.get_property_embeddings(prop_text) | |
| # Calculate similarity with viewed properties | |
| similarities = [] | |
| for viewed_prop in viewed_properties: | |
| viewed_text = self.create_property_description(viewed_prop) | |
| viewed_embedding = self.get_property_embeddings(viewed_text) | |
| # Calculate cosine similarity | |
| similarity = cosine_similarity([prop_embedding], [viewed_embedding])[0][0] | |
| similarities.append(similarity) | |
| return max(similarities) if similarities else 0.0 | |
| except Exception as e: | |
| logger.warning(f"⚠️ Feature similarity calculation failed: {e}") | |
| return 0.5 # Default similarity | |
| def create_property_description(self, property_item: Dict) -> str: | |
| """Create a text description of property for AI analysis""" | |
| name = property_item.get('propertyName', 'Property') | |
| prop_type = property_item.get('propertyTypeName', 'Residential') | |
| price = property_item.get('price', 0) | |
| description = f"{name} is a {prop_type} property priced at ₹{price:,.0f}" | |
| # Add more features if available | |
| if property_item.get('locationName'): | |
| description += f" located in {property_item['locationName']}" | |
| return description | |
| def generate_recommendation_reason(self, property_item: Dict, preferences: Dict, strategy: str) -> str: | |
| """Generate AI-powered recommendation reason""" | |
| reasons = [] | |
| # Property type match | |
| preferred_types = dict(preferences.get('preferred_types', [])) | |
| prop_type = property_item.get('propertyTypeName', '') | |
| if prop_type in preferred_types: | |
| reasons.append(f"Matches your preference for {prop_type} properties") | |
| # Price consideration | |
| price = property_item.get('price', 0) | |
| if strategy == 'aggressive_immediate': | |
| reasons.append("Premium property perfect for immediate purchase") | |
| elif strategy == 'nurturing_educational': | |
| reasons.append("Great value proposition with excellent features") | |
| elif strategy == 'engagement_building': | |
| reasons.append("Interesting option to explore based on your browsing pattern") | |
| else: | |
| reasons.append("Budget-friendly option worth considering") | |
| return '; '.join(reasons) if reasons else "Recommended based on your viewing history" | |
| def filter_recommendations(self, scored_properties: List, viewed_properties: List) -> List[Dict]: | |
| """Filter and refine recommendations""" | |
| # Remove duplicates and ensure variety | |
| filtered = [] | |
| seen_types = set() | |
| price_ranges = [] | |
| for prop in scored_properties: | |
| prop_type = prop.get('propertyTypeName', 'Unknown') | |
| price = prop.get('price', 0) | |
| # Ensure variety in property types (max 3 per type) | |
| type_count = sum(1 for p in filtered if p.get('propertyTypeName') == prop_type) | |
| if type_count >= 3: | |
| continue | |
| # Ensure price variety | |
| price_range = self.get_price_range(price) | |
| range_count = sum(1 for p in filtered if self.get_price_range(p.get('price', 0)) == price_range) | |
| if range_count >= 4: # Max 4 properties per price range | |
| continue | |
| filtered.append(prop) | |
| if len(filtered) >= 10: # Limit recommendations | |
| break | |
| return filtered | |
| def get_price_range(self, price: float) -> str: | |
| """Categorize price into ranges""" | |
| if price < 5000000: | |
| return "budget" | |
| elif price < 15000000: | |
| return "mid-range" | |
| elif price < 30000000: | |
| return "premium" | |
| else: | |
| return "luxury" | |
| def generate_personalized_email(self, user_analysis: Dict, ai_insights: Dict, | |
| recommendations: List[Dict], recipient_email: str, email_type: str = None) -> Dict: | |
| """Generate personalized email using AI insights""" | |
| logger.info(f"📧 Generating personalized email for {recipient_email}") | |
| try: | |
| # Extract key information | |
| customer_id = user_analysis.get('customer_id') | |
| lead_status = user_analysis.get('data', {}).get('lead_qualification', {}).get('lead_status', 'UNKNOWN') | |
| personality_type = ai_insights.get('ai_personality_type', 'Valued Customer') | |
| buying_motivation = ai_insights.get('buying_motivation', 'finding the perfect property') | |
| urgency_level = ai_insights.get('urgency_level', 'Medium') | |
| # Generate personalized content | |
| email_content = self.create_email_content( | |
| customer_id, lead_status, personality_type, buying_motivation, | |
| urgency_level, recommendations, email_type | |
| ) | |
| # For preview mode, don't actually send the email | |
| if recipient_email == 'preview@example.com': | |
| return { | |
| 'success': True, | |
| 'recipient': recipient_email, | |
| 'subject': email_content['subject'], | |
| 'html_content': email_content['html_content'], | |
| 'text_content': email_content['text_content'], | |
| 'recommendations_count': len(recommendations), | |
| 'personalization': { | |
| 'personality_type': personality_type, | |
| 'buying_motivation': buying_motivation, | |
| 'urgency_level': urgency_level | |
| } | |
| } | |
| # Send email for real recipients | |
| success = self.send_email(recipient_email, email_content) | |
| return { | |
| 'success': success, | |
| 'recipient': recipient_email, | |
| 'subject': email_content['subject'], | |
| 'html_content': email_content['html_content'], | |
| 'text_content': email_content['text_content'], | |
| 'recommendations_count': len(recommendations), | |
| 'personalization': { | |
| 'personality_type': personality_type, | |
| 'buying_motivation': buying_motivation, | |
| 'urgency_level': urgency_level | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error generating email: {e}") | |
| return {'success': False, 'error': str(e)} | |
| def create_email_content(self, customer_id: int, lead_status: str, personality_type: str, | |
| buying_motivation: str, urgency_level: str, recommendations: List[Dict], email_type: str = None) -> Dict: | |
| """Create personalized email content based on AI insights""" | |
| # Personalized greeting | |
| greeting = self.get_personalized_greeting(personality_type, lead_status) | |
| # Subject line based on urgency, personality, and email type | |
| subject = self.generate_subject_line(personality_type, urgency_level, len(recommendations), email_type) | |
| # Prepare recommendations for email - cap at 10 properties, warn below if fewer are available | |
| recommendations_to_show = recommendations[:10] | |
| logger.info(f"📧 Including {len(recommendations_to_show)} properties in email (target: 10)") | |
| # If we have fewer than 10, log a warning but still proceed | |
| if len(recommendations_to_show) < 10: | |
| logger.warning(f"⚠️ Only {len(recommendations_to_show)} properties available for email (target was 10)") | |
| # Main content | |
| html_content = f""" | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
| <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css"> | |
| <style> | |
| body {{ font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; line-height: 1.6; color: #333; margin: 0; padding: 0; }} | |
| .container {{ max-width: 650px; margin: 0 auto; background: white; box-shadow: 0 0 20px rgba(0,0,0,0.1); }} | |
| .header {{ background: linear-gradient(135deg, #2c3e50 0%, #34495e 100%); color: white; padding: 40px; text-align: center; border-radius: 0; }} | |
| .content {{ background: #ffffff; padding: 40px; }} | |
| .property-card {{ | |
| background: white; | |
| margin: 25px 0; | |
| border-radius: 12px; | |
| box-shadow: 0 4px 15px rgba(0,0,0,0.1); | |
| border: 1px solid #e0e0e0; | |
| transition: all 0.3s ease; | |
| }} | |
| .property-header {{ | |
| display: flex; | |
| justify-content: space-between; | |
| align-items: center; | |
| padding: 20px 25px 15px; | |
| border-bottom: 1px solid #f0f0f0; | |
| }} | |
| .property-name {{ | |
| font-size: 20px; | |
| font-weight: 600; | |
| margin: 0; | |
| color: #2c3e50; | |
| flex: 1; | |
| }} | |
| .property-price {{ | |
| font-size: 24px; | |
| font-weight: bold; | |
| color: #27ae60; | |
| background: #f8f9fa; | |
| padding: 8px 15px; | |
| border-radius: 6px; | |
| }} | |
| .property-details {{ padding: 20px 25px; }} | |
| .property-type-location {{ | |
| display: flex; | |
| gap: 15px; | |
| margin-bottom: 15px; | |
| flex-wrap: wrap; | |
| }} | |
| .property-type, .property-location {{ | |
| background: #ecf0f1; | |
| padding: 6px 12px; | |
| border-radius: 20px; | |
| font-size: 14px; | |
| color: #2c3e50; | |
| }} | |
| .property-specs {{ | |
| display: flex; | |
| gap: 15px; | |
| margin: 15px 0; | |
| flex-wrap: wrap; | |
| }} | |
| .spec-item {{ | |
| background: #3498db; | |
| color: white; | |
| padding: 8px 12px; | |
| border-radius: 6px; | |
| font-size: 14px; | |
| font-weight: 500; | |
| }} | |
| .property-description {{ | |
| margin: 15px 0; | |
| color: #555; | |
| line-height: 1.6; | |
| }} | |
| .property-features {{ | |
| display: flex; | |
| gap: 8px; | |
| margin: 15px 0; | |
| flex-wrap: wrap; | |
| }} | |
| .feature-tag {{ | |
| background: #e8f5e8; | |
| color: #27ae60; | |
| padding: 4px 10px; | |
| border-radius: 15px; | |
| font-size: 12px; | |
| border: 1px solid #27ae60; | |
| }} | |
| .feature-tag.more {{ | |
| background: #f8f9fa; | |
| color: #6c757d; | |
| border: 1px solid #dee2e6; | |
| }} | |
| .recommendation-reason {{ | |
| background: linear-gradient(90deg, #fff3cd 0%, #fef9e7 100%); | |
| border-left: 4px solid #ffc107; | |
| padding: 15px; | |
| border-radius: 6px; | |
| margin: 20px 0; | |
| font-style: italic; | |
| color: #856404; | |
| }} | |
| .property-actions {{ | |
| display: flex; | |
| gap: 12px; | |
| margin-top: 20px; | |
| justify-content: center; | |
| }} | |
| .view-property-btn {{ | |
| background: #3498db; | |
| color: white; | |
| padding: 12px 24px; | |
| text-decoration: none; | |
| border-radius: 6px; | |
| font-weight: 500; | |
| display: inline-flex; | |
| align-items: center; | |
| gap: 8px; | |
| }} | |
| .contact-btn {{ | |
| background: #27ae60; | |
| color: white; | |
| padding: 12px 24px; | |
| text-decoration: none; | |
| border-radius: 6px; | |
| font-weight: 500; | |
| display: inline-flex; | |
| align-items: center; | |
| gap: 8px; | |
| }} | |
| .cta-button {{ | |
| background: #e74c3c; | |
| color: white; | |
| padding: 15px 30px; | |
| text-decoration: none; | |
| border-radius: 6px; | |
| display: inline-block; | |
| margin: 30px 0; | |
| font-weight: 600; | |
| }} | |
| .footer {{ | |
| background: #2c3e50; | |
| color: white; | |
| padding: 30px; | |
| text-align: center; | |
| }} | |
| .ai-insight {{ | |
| background: linear-gradient(90deg, #e3f2fd 0%, #f3e5f5 100%); | |
| border-left: 4px solid #2196f3; | |
| padding: 20px; | |
| margin: 30px 0; | |
| border-radius: 6px; | |
| }} | |
| .fas {{ margin-right: 5px; }} | |
| @media (max-width: 600px) {{ | |
| .property-header {{ flex-direction: column; gap: 10px; }} | |
| .property-type-location {{ flex-direction: column; }} | |
| .property-actions {{ flex-direction: column; }} | |
| }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="container"> | |
| <div class="header"> | |
| <h1>🏠 Personalized Property Recommendations</h1> | |
| <p>AI-Curated Properties Just for You</p> | |
| </div> | |
| <div class="content"> | |
| <h2>{greeting}</h2> | |
| <div class="ai-insight"> | |
| <h3>🤖 AI Analysis Insights</h3> | |
| <p><strong>Your Profile:</strong> {personality_type}</p> | |
| <p><strong>Buying Motivation:</strong> {buying_motivation}</p> | |
| <p><strong>Current Status:</strong> {lead_status} Lead</p> | |
| </div> | |
| <p>Based on your recent property viewing activity and our AI analysis, we've curated these perfect matches for you:</p> | |
| <div style="background: #f8f9fa; padding: 20px; border-radius: 8px; margin: 20px 0; text-align: center;"> | |
| <h3 style="margin: 0; color: #2c3e50;">🎯 {len(recommendations_to_show)} Personalized Property Recommendations</h3> | |
| <p style="margin: 10px 0 0 0; color: #666;">Handpicked by our AI based on your preferences and behavior</p> | |
| </div> | |
| <h3>🏠 Your Property Matches</h3> | |
| """ | |
| # Add property recommendations - already defined above | |
| for i, prop in enumerate(recommendations_to_show, 1): | |
| # Try multiple price field names | |
| price = prop.get('price', 0) or prop.get('marketValue', 0) or prop.get('amount', 0) | |
| logger.info(f"Property {i} price fields - price: {prop.get('price')}, marketValue: {prop.get('marketValue')}, amount: {prop.get('amount')}") | |
| if price == 0: | |
| # If still no price, try to extract from other fields | |
| price_text = prop.get('priceText', '') or prop.get('priceDisplay', '') | |
| if price_text: | |
| # Try to extract numeric value from price text | |
| import re | |
| price_match = re.findall(r'\d+', str(price_text).replace(',', ''))  # commas already stripped, so match plain digit runs | |
| if price_match: | |
| try: | |
| price = int(price_match[0]) | |
| except ValueError: | |
| price = 0 | |
| # Extract comprehensive property details | |
| name = prop.get('propertyName', '') or prop.get('name', '') or prop.get('title', '') or f'Premium Property {i}' | |
| prop_type = prop.get('propertyTypeName', '') or prop.get('typeName', '') or prop.get('type', '') or 'Residential' | |
| description = prop.get('description', '') or prop.get('desc', '') or 'Excellent property opportunity with modern amenities and prime location.' | |
| location = prop.get('address', '') or prop.get('location', '') or prop.get('area', '') or 'Prime Location' | |
| # Property specifications | |
| beds = prop.get('beds', 0) or prop.get('bedrooms', 0) or prop.get('bhk', 0) | |
| baths = prop.get('baths', 0) or prop.get('bathrooms', 0) or prop.get('bath', 0) | |
| sqft = prop.get('totalSquareFeet', 0) or prop.get('area', 0) or prop.get('sqft', 0) or prop.get('size', 0) | |
| # Features | |
| features = prop.get('features', []) or prop.get('amenities', []) or [] | |
| if isinstance(features, str): | |
| features = [f.strip() for f in features.split(',') if f.strip()] | |
| elif not features: | |
| # Default features based on property type | |
| if 'villa' in prop_type.lower(): | |
| features = ['Parking', 'Garden', 'Security', 'Swimming Pool'] | |
| elif 'flat' in prop_type.lower() or 'apartment' in prop_type.lower(): | |
| features = ['Lift', 'Security', 'Parking', 'Gym'] | |
| else: | |
| features = ['Parking', 'Security', 'CCTV', 'Power Backup'] | |
| # Create property link | |
| property_id = prop.get('id', '') or prop.get('propertyId', '') or f'prop_{i}' | |
| property_link = f"https://your-property-website.com/property/{property_id}" | |
| reason = prop.get('recommendation_reason', 'Recommended based on your viewing patterns and preferences') | |
| # Format price display | |
| if price > 0: | |
| price_display = f"₹{price:,.0f}" | |
| else: | |
| price_display = "Price on Request" | |
| # Format specifications | |
| specs_html = "" | |
| if beds > 0: | |
| specs_html += f"<span class='spec-item'><i class='fas fa-bed'></i> {beds} Bed{'s' if beds > 1 else ''}</span>" | |
| if baths > 0: | |
| specs_html += f"<span class='spec-item'><i class='fas fa-bath'></i> {baths} Bath{'s' if baths > 1 else ''}</span>" | |
| if sqft > 0: | |
| specs_html += f"<span class='spec-item'><i class='fas fa-ruler-combined'></i> {sqft:,.0f} sq ft</span>" | |
| # Format features | |
| features_html = "" | |
| if features: | |
| top_features = features[:4] # Show top 4 features | |
| features_html = "".join([f"<span class='feature-tag'>{feature}</span>" for feature in top_features]) | |
| if len(features) > 4: | |
| features_html += f"<span class='feature-tag more'>+{len(features) - 4} more</span>" | |
| html_content += f""" | |
| <div class="property-card"> | |
| <div class="property-header"> | |
| <h3 class="property-name">{name}</h3> | |
| <div class="property-price">{price_display}</div> | |
| </div> | |
| <div class="property-details"> | |
| <div class="property-type-location"> | |
| <span class="property-type"><i class="fas fa-home"></i> {prop_type}</span> | |
| <span class="property-location"><i class="fas fa-map-marker-alt"></i> {location}</span> | |
| </div> | |
| {f'<div class="property-specs">{specs_html}</div>' if specs_html else ''} | |
| <div class="property-description"> | |
| <p>{description}</p> | |
| </div> | |
| {f'<div class="property-features">{features_html}</div>' if features_html else ''} | |
| <div class="recommendation-reason"> | |
| <i class="fas fa-lightbulb"></i> <strong>AI Recommendation:</strong> {reason} | |
| </div> | |
| <div class="property-actions"> | |
| <a href="{property_link}" class="view-property-btn" target="_blank"> | |
| <i class="fas fa-eye"></i> View Property Details | |
| </a> | |
| <a href="tel:+919876543210" class="contact-btn"> | |
| <i class="fas fa-phone"></i> Call Now | |
| </a> | |
| </div> | |
| </div> | |
| </div> | |
| """ | |
| # Add call-to-action based on urgency | |
| cta_text = self.get_cta_text(urgency_level) | |
| html_content += f""" | |
| <div style="text-align: center;"> | |
| <a href="#" class="cta-button">{cta_text}</a> | |
| </div> | |
| <div class="ai-insight"> | |
| <h3>💡 AI Recommendation</h3> | |
| <p>{self.get_ai_recommendation_text(personality_type, urgency_level)}</p> | |
| </div> | |
| <p>Our AI system analyzed your viewing patterns, preferences, and behavior to find these properties. Each recommendation is personally curated for your specific needs.</p> | |
| <p>Questions? Reply to this email or call us directly. We're here to help you find your perfect property!</p> | |
| </div> | |
| <div class="footer"> | |
| <p>🤖 Powered by AI Real Estate Recommendations</p> | |
| <p>Customer ID: {customer_id} | Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}</p> | |
| </div> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| return { | |
| 'subject': subject, | |
| 'html_content': html_content, | |
| 'text_content': self.html_to_text(html_content) | |
| } | |
| def get_personalized_greeting(self, personality_type: str, lead_status: str) -> str: | |
| """Generate personalized greeting""" | |
| if personality_type == "Decisive Decision Maker": | |
| return "Hello, Decisive Property Investor!" | |
| elif personality_type == "Research-Oriented Buyer": | |
| return "Dear Informed Property Seeker," | |
| elif personality_type == "Cautious Evaluator": | |
| return "Hello, Thoughtful Property Explorer," | |
| else: | |
| return "Dear Valued Property Browser," | |
| def generate_subject_line(self, personality_type: str, urgency_level: str, count: int, email_type: str = None) -> str: | |
| """Generate compelling subject line""" | |
| # Use email type specific subjects if available | |
| if email_type: | |
| type_subjects = { | |
| 'property_based': f"🏠 {count} Property-Based Recommendations Just for You", | |
| 'price_based': f"💰 {count} Price-Based Property Matches Within Your Budget", | |
| 'location_based': f"📍 {count} Location-Based Properties in Your Preferred Areas", | |
| 'similarity_based': f"🔍 {count} Properties Similar to Your Viewed Listings", | |
| 'behavioral_based': f"🧠 {count} Behavioral-Based Property Recommendations", | |
| 'premium_properties': f"⭐ {count} Premium Luxury Properties Just for You", | |
| 'budget_friendly': f"💵 {count} Budget-Friendly Value Properties", | |
| 'trending_properties': f"📈 {count} Trending Properties in Today's Market", | |
| 'family_oriented': f"👨👩👧👦 {count} Family-Oriented Properties with Great Amenities", | |
| 'investment_opportunities': f"💼 {count} Investment Properties with High ROI Potential" | |
| } | |
| if email_type in type_subjects: | |
| return type_subjects[email_type] | |
| # Fallback to original logic | |
| if urgency_level == "High": | |
| return f"🔥 {count} Hot Properties - Perfect Match for You!" | |
| elif personality_type == "Decisive Decision Maker": | |
| return f"⚡ {count} Premium Properties - Ready for Immediate Purchase" | |
| elif personality_type == "Research-Oriented Buyer": | |
| return f"📊 {count} Carefully Selected Properties + Detailed Analysis" | |
| else: | |
| return f"🏠 {count} Personalized Property Recommendations Just for You" | |
| def get_cta_text(self, urgency_level: str) -> str: | |
| """Get call-to-action text based on urgency""" | |
| if urgency_level == "High": | |
| return "🔥 View Properties Now - Limited Time!" | |
| elif urgency_level == "Medium": | |
| return "📞 Schedule Viewing Today" | |
| else: | |
| return "💭 Explore These Options" | |
| def get_ai_recommendation_text(self, personality_type: str, urgency_level: str) -> str: | |
| """Get AI recommendation text""" | |
| if personality_type == "Decisive Decision Maker": | |
| return "Our AI suggests focusing on 1-2 properties for immediate action. Your profile indicates you prefer quick decisions with clear value propositions." | |
| elif personality_type == "Research-Oriented Buyer": | |
| return "Our AI recommends comparing these properties in detail. We can provide additional market analysis and property reports to support your research." | |
| else: | |
| return "Our AI suggests exploring these options at your own pace. Each property offers unique value that aligns with your viewing patterns." | |
| def html_to_text(self, html_content: str) -> str: | |
| """Convert HTML to plain text""" | |
| # Simple HTML to text conversion | |
| import re | |
| # Drop <style>/<script> blocks first so raw CSS/JS doesn't leak into the plain-text body | |
| text = re.sub(r'<(style|script)[^>]*>.*?</\1>', '', html_content, flags=re.DOTALL | re.IGNORECASE) | |
| text = re.sub(r'<[^>]+>', '', text) | |
| text = re.sub(r'\s+', ' ', text) | |
| return text.strip() | |
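| # Minimal sketch (hypothetical input): | |
| #   engine.html_to_text('<p>Hello <b>world</b></p>')  # -> 'Hello world' | |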
| def send_email(self, recipient_email: str, email_content: Dict) -> bool: | |
| """Send personalized email with SendGrid support""" | |
| try: | |
| logger.info(f"📧 Attempting to send real email to {recipient_email}") | |
| # Real email sending - always attempt SendGrid | |
| import requests | |
| # SendGrid API endpoint | |
| url = "https://api.sendgrid.com/v3/mail/send" | |
| # Prepare email data | |
| email_data = { | |
| "personalizations": [ | |
| { | |
| "to": [ | |
| { | |
| "email": recipient_email | |
| } | |
| ] | |
| } | |
| ], | |
| "from": { | |
| "email": self.email_config['sender_email'] | |
| }, | |
| "subject": email_content['subject'], | |
| "content": [ | |
| { | |
| "type": "text/plain", | |
| "value": email_content['text_content'] | |
| }, | |
| { | |
| "type": "text/html", | |
| "value": email_content['html_content'] | |
| } | |
| ] | |
| } | |
| # Send email with SendGrid API | |
| headers = { | |
| "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}", | |
| "Content-Type": "application/json" | |
| } | |
| response = requests.post(url, json=email_data, headers=headers, timeout=30) | |
| if response.status_code == 202: | |
| logger.info(f"✅ SendGrid email sent successfully to {recipient_email}") | |
| return True | |
| elif (response.status_code == 403 and "messaging limits" in response.text) or \ | |
| (response.status_code == 401 and "Maximum credits exceeded" in response.text): | |
| logger.warning(f"⚠️ SendGrid limits/credits exceeded - trying SMTP fallback...") | |
| # Try SMTP fallback first | |
| if self._send_via_smtp(recipient_email, email_content): | |
| logger.info(f"✅ SMTP fallback successful for {recipient_email}") | |
| return True | |
| else: | |
| # Save email to file if SMTP also fails | |
| logger.info("📁 Both SendGrid and SMTP failed - saving email to file...") | |
| self._save_email_to_file(recipient_email, email_content) | |
| return True # Return True since email was "processed" (saved to file) | |
| else: | |
| logger.error(f"❌ SendGrid API error: {response.status_code} - {response.text}") | |
| # Save email to file for manual sending | |
| logger.info("📁 Saving email to file for manual sending...") | |
| # Try SMTP fallback | |
| if self._send_via_smtp(recipient_email, email_content): | |
| logger.info(f"✅ SMTP fallback successful for {recipient_email}") | |
| return True | |
| else: | |
| # Save to file as last resort | |
| self._save_email_to_file(recipient_email, email_content) | |
| return False | |
| except Exception as e: | |
| logger.error(f"❌ Failed to send SendGrid email to {recipient_email}: {e}") | |
| # Save email to file for manual sending instead of using mock service | |
| logger.info("📁 Saving email to file for manual sending...") | |
| try: | |
| self._save_email_to_file(recipient_email, email_content) | |
| except Exception as save_error: | |
| logger.error(f"❌ Failed to save email to file: {save_error}") | |
| # Return True anyway since the email content itself was generated successfully | |
| return True | |
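| # Delivery fallback chain for reference: SendGrid API -> Gmail SMTP -> HTML file | |
| # on disk. A True return therefore means "processed", not necessarily "delivered"; | |
| # callers needing delivery guarantees should check the logs and ./saved_emails. | |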
| def _send_mock_email(self, recipient_email: str, email_content: Dict) -> bool: | |
| """Mock email service for environments without network access""" | |
| try: | |
| logger.info(f"📧 Mock email service - Logging email content for {recipient_email}") | |
| # Log email details | |
| logger.info(f"📧 Email Subject: {email_content['subject']}") | |
| logger.info(f"📧 Email From: {self.email_config['sender_email']}") | |
| logger.info(f"📧 Email To: {recipient_email}") | |
| # Log a preview of the email content | |
| text_preview = email_content['text_content'][:500] + "..." if len(email_content['text_content']) > 500 else email_content['text_content'] | |
| logger.info(f"📧 Email Text Preview: {text_preview}") | |
| # Log the number of properties in the email | |
| if 'html_content' in email_content: | |
| property_count = email_content['html_content'].count('class="property-card"')  # count card divs, not the CSS selector | |
| logger.info(f"📧 Email contains {property_count} property recommendations") | |
| # Save email content to a file for manual sending | |
| self._save_email_to_file(recipient_email, email_content) | |
| logger.info(f"✅ Mock email logged successfully for {recipient_email}") | |
| logger.info(f"📧 Email would be sent with {len(email_content.get('html_content', ''))} characters of HTML content") | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ Failed to log mock email for {recipient_email}: {e}") | |
| return False | |
| def _save_email_to_file(self, recipient_email: str, email_content: Dict): | |
| """Save email content to a file for manual sending""" | |
| try: | |
| import os | |
| from datetime import datetime | |
| # Create emails directory if it doesn't exist - use current directory instead of /tmp | |
| emails_dir = "./saved_emails" | |
| os.makedirs(emails_dir, exist_ok=True) | |
| # Create filename with timestamp and email type info | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # Extract email type from subject if possible | |
| subject = email_content.get('subject', 'email') | |
| email_type = 'unknown' | |
| if 'Property-Based' in subject or 'property type' in subject.lower(): | |
| email_type = 'property_based' | |
| elif 'Price-Based' in subject or 'budget' in subject.lower(): | |
| email_type = 'price_based' | |
| elif 'Location-Based' in subject or 'location' in subject.lower(): | |
| email_type = 'location_based' | |
| elif 'Premium' in subject or 'luxury' in subject.lower(): | |
| email_type = 'premium' | |
| elif 'Budget' in subject or 'affordable' in subject.lower(): | |
| email_type = 'budget_friendly' | |
| elif 'Family' in subject or 'family' in subject.lower(): | |
| email_type = 'family_oriented' | |
| elif 'Investment' in subject or 'investment' in subject.lower(): | |
| email_type = 'investment' | |
| elif 'Trending' in subject or 'trending' in subject.lower(): | |
| email_type = 'trending' | |
| filename = f"{emails_dir}/{email_type}_{timestamp}.html" | |
| # Create complete HTML email with better styling | |
| full_html = f""" | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <title>{email_content['subject']}</title> | |
| <style> | |
| .email-wrapper {{ | |
| max-width: 800px; | |
| margin: 20px auto; | |
| font-family: Arial, sans-serif; | |
| background: #f5f5f5; | |
| padding: 20px; | |
| }} | |
| .email-header {{ | |
| background: #2c3e50; | |
| color: white; | |
| padding: 20px; | |
| border-radius: 5px; | |
| margin-bottom: 20px; | |
| }} | |
| .email-content {{ | |
| background: white; | |
| padding: 20px; | |
| border-radius: 5px; | |
| box-shadow: 0 2px 5px rgba(0,0,0,0.1); | |
| }} | |
| .status-banner {{ | |
| background: #f39c12; | |
| color: white; | |
| padding: 15px; | |
| margin-bottom: 20px; | |
| border-radius: 5px; | |
| text-align: center; | |
| font-weight: bold; | |
| }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="email-wrapper"> | |
| <div class="status-banner"> | |
| 📧 EMAIL SAVED DUE TO SENDGRID LIMITS - READY FOR MANUAL REVIEW | |
| </div> | |
| <div class="email-header"> | |
| <h2>📧 Email Details</h2> | |
| <p><strong>📧 To:</strong> {recipient_email}</p> | |
| <p><strong>📝 Subject:</strong> {email_content['subject']}</p> | |
| <p><strong>🕒 Generated:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p> | |
| <p><strong>📋 Type:</strong> {email_type.replace('_', ' ').title()}</p> | |
| <p><strong>📁 File:</strong> {filename}</p> | |
| </div> | |
| <div class="email-content"> | |
| {email_content['html_content']} | |
| </div> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| # Save to file | |
| with open(filename, 'w', encoding='utf-8') as f: | |
| f.write(full_html) | |
| logger.info(f"📁 Email saved to file: {filename}") | |
| logger.info(f"💡 Open this file in your browser to view the beautiful email content") | |
| logger.info(f"🎯 Email type: {email_type.replace('_', ' ').title()}") | |
| except Exception as e: | |
| logger.error(f"❌ Failed to save email to file: {e}") | |
| def _send_via_smtp(self, recipient_email: str, email_content: Dict) -> bool: | |
| """Send email via Gmail SMTP with authentication""" | |
| try: | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| # Gmail SMTP configuration | |
| smtp_server = 'smtp.gmail.com' | |
| smtp_port = 587 | |
| # Credentials are read from the environment rather than hardcoded: | |
| # committing a live app password to source leaks it to anyone with read | |
| # access. The variable names below are project conventions, not Gmail | |
| # requirements. | |
| smtp_username = os.environ.get('GMAIL_SMTP_USERNAME', '') | |
| smtp_password = os.environ.get('GMAIL_SMTP_APP_PASSWORD', '')  # Gmail app password | |
| sender_email = os.environ.get('GMAIL_SENDER_EMAIL', smtp_username) | |
| # Extract email components | |
| subject = email_content.get('subject', 'Property Recommendations') | |
| html_content = email_content.get('html_content', '') | |
| text_content = email_content.get('text_content', '') | |
| # Create the message as multipart/alternative so mail clients render | |
| # exactly one of the two bodies (the default 'mixed' subtype would | |
| # show both parts) | |
| msg = MIMEMultipart('alternative') | |
| msg['From'] = sender_email | |
| msg['To'] = recipient_email | |
| msg['Subject'] = subject | |
| # Attach plain text first: in multipart/alternative the last part is | |
| # preferred, so the HTML body must come after the text fallback | |
| if text_content: | |
| msg.attach(MIMEText(text_content, 'plain')) | |
| msg.attach(MIMEText(html_content, 'html')) | |
| # Send email via Gmail SMTP | |
| server = smtplib.SMTP(smtp_server, smtp_port) | |
| server.starttls() # Enable TLS | |
| server.login(smtp_username, smtp_password) | |
| # Send email | |
| text = msg.as_string() | |
| server.sendmail(sender_email, recipient_email, text) | |
| server.quit() | |
| logger.info(f"✅ Gmail SMTP email sent successfully to {recipient_email}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ Gmail SMTP error: {e}") | |
| return False | |
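| # Illustrative usage sketch (hypothetical recipient and content; the keys | |
| # mirror what _send_via_smtp reads above): | |
| #   ok = engine._send_via_smtp('lead@example.com', { | |
| #       'subject': 'Property Recommendations', | |
| #       'html_content': '<p>Hello!</p>', | |
| #       'text_content': 'Hello!', | |
| #   })  # True on success, False on any SMTP failure | |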
| def process_customer_with_ai(self, customer_id: int, recipient_email: str, analysis_data: Dict) -> Dict: | |
| """Complete AI processing pipeline for a customer""" | |
| logger.info(f"🤖 Starting AI processing for customer {customer_id}") | |
| try: | |
| # Step 1: AI behavior analysis | |
| ai_insights = self.analyze_user_behavior_with_ai(analysis_data) | |
| # Step 2: Find similar properties using AI | |
| recommendations = self.find_similar_properties_ai(analysis_data, ai_insights) | |
| # Step 3: Generate and send personalized email | |
| email_result = self.generate_personalized_email( | |
| analysis_data, ai_insights, recommendations, recipient_email | |
| ) | |
| # Return comprehensive results | |
| return { | |
| 'success': True, | |
| 'customer_id': customer_id, | |
| 'ai_insights': ai_insights, | |
| 'recommendations': recommendations, | |
| 'email_result': email_result, | |
| 'processing_time': datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ AI processing failed for customer {customer_id}: {e}") | |
| return { | |
| 'success': False, | |
| 'customer_id': customer_id, | |
| 'error': str(e) | |
| } | |
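| # Illustrative pipeline sketch (hypothetical ID/email): the three steps run | |
| # strictly in order, and any exception collapses into the error dict. | |
| #   result = engine.process_customer_with_ai(42, 'lead@example.com', analysis_data) | |
| #   if result['success']: | |
| #       print(len(result['recommendations']), result['email_result']) | |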
| def fetch_properties_page(self, page: int, max_retries: int = 3) -> List[Dict]: | |
| """Fetch a single page of properties with retry logic""" | |
| for attempt in range(max_retries): | |
| try: | |
| # Use the correct API endpoint with proper parameters | |
| url = f"{self.properties_api}" | |
| params = { | |
| 'pageNumber': page, | |
| 'pageSize': 100 | |
| } | |
| # Send the ngrok headers like the other fetch paths; without them the | |
| # free ngrok tier can answer with an HTML warning page instead of JSON | |
| response = requests.get(url, params=params, headers=self.config.get('headers', {}), verify=False, timeout=None) | |
| if response.status_code == 200: | |
| data = response.json() | |
| # Handle different response structures | |
| if isinstance(data, list): | |
| properties = data | |
| elif isinstance(data, dict): | |
| if 'data' in data: | |
| properties = data['data'] | |
| elif 'properties' in data: | |
| properties = data['properties'] | |
| elif 'results' in data: | |
| properties = data['results'] | |
| else: | |
| properties = list(data.values()) if data else [] | |
| else: | |
| properties = [] | |
| if properties: | |
| logger.info(f"✅ Page {page}: {len(properties)} properties") | |
| return properties | |
| else: | |
| logger.info(f"📄 Page {page} is empty - reached end") | |
| return [] | |
| else: | |
| logger.warning(f"⚠️ Page {page} returned status {response.status_code}") | |
| if attempt < max_retries - 1: | |
| time.sleep(2) # Wait before retry | |
| continue | |
| return [] | |
| except Exception as e: | |
| logger.error(f"❌ Error fetching page {page}, attempt {attempt + 1}: {e}") | |
| if attempt < max_retries - 1: | |
| time.sleep(2) # Wait before retry | |
| continue | |
| return [] | |
| return [] | |
| def fetch_all_properties_parallel(self, max_workers: int = 50, page_size: int = 600, max_pages: int = 20) -> bool: | |
| """ULTRA-ENHANCED parallel property fetch optimized for maximum speed and 600+ properties""" | |
| logger.info(f"🚀 Starting ULTRA-ENHANCED parallel property fetch: {max_workers} workers, {page_size} per page, max {max_pages} pages") | |
| # Validate parameters | |
| if page_size <= 0: | |
| logger.error("❌ Invalid page_size: must be greater than 0") | |
| return False | |
| if max_workers <= 0: | |
| logger.error("❌ Invalid max_workers: must be greater than 0") | |
| return False | |
| if max_pages <= 0: | |
| logger.error("❌ Invalid max_pages: must be greater than 0") | |
| return False | |
| try: | |
| # Enhanced total count detection with multiple fallbacks | |
| total_count = self._get_total_property_count() | |
| # Calculate optimal pages needed | |
| if total_count > 0: | |
| # Ceiling division avoids requesting a spurious extra page when | |
| # total_count divides evenly by page_size | |
| estimated_pages = math.ceil(total_count / page_size) | |
| max_pages = min(estimated_pages, max_pages) | |
| logger.info(f"📄 Optimized pages needed: {estimated_pages}, using max: {max_pages}") | |
| else: | |
| logger.info(f"📄 Using default max_pages: {max_pages}") | |
| # Accumulators: fetched properties plus any pages that need a retry pass | |
| all_properties = [] | |
| failed_pages = [] | |
| # Parallel fetching with a single thread pool | |
| with ThreadPoolExecutor(max_workers=max_workers) as executor: | |
| # Submit all page fetch tasks up front so they run concurrently | |
| futures = {} | |
| for page in range(1, max_pages + 1): | |
| future = executor.submit(self._fetch_property_page_enhanced, page, page_size) | |
| futures[future] = page # Use future as key, page as value | |
| logger.info(f"🚀 Submitted {len(futures)} parallel fetch tasks with {max_workers} workers for ultra-fast processing") | |
| # Collect results as they complete with enhanced progress tracking | |
| completed_pages = 0 | |
| for future in concurrent.futures.as_completed(futures.keys()): | |
| page = futures[future] # Get the page number from the future | |
| try: | |
| properties = future.result() # No timeout - let it take as long as needed | |
| completed_pages += 1 | |
| if properties: | |
| all_properties.extend(properties) | |
| progress = (completed_pages / max_pages) * 100 | |
| logger.info(f"✅ Page {page}: {len(properties)} properties fetched (Total: {len(all_properties)}, Progress: {progress:.1f}%)") | |
| else: | |
| progress = (completed_pages / max_pages) * 100 | |
| logger.info(f"📄 Page {page}: No properties found (Progress: {progress:.1f}%)") | |
| # Keep collecting: futures complete out of order, so an empty | |
| # high-numbered page does not mean earlier pages are finished, | |
| # and breaking here would discard their results | |
| continue | |
| except Exception as e: | |
| completed_pages += 1 | |
| progress = (completed_pages / max_pages) * 100 | |
| logger.error(f"❌ Error fetching page {page}: {e} (Progress: {progress:.1f}%)") | |
| failed_pages.append(page) | |
| continue | |
| # Retry failed pages with exponential backoff | |
| if failed_pages: | |
| logger.info(f"🔄 Retrying {len(failed_pages)} failed pages...") | |
| retry_properties = self._retry_failed_pages(failed_pages, page_size, max_workers) | |
| all_properties.extend(retry_properties) | |
| logger.info(f"📦 Total properties fetched: {len(all_properties)} (Failed pages: {len(failed_pages)})") | |
| # Enhanced ChromaDB storage with parallel processing | |
| if all_properties: | |
| success = self._store_properties_in_chromadb_parallel(all_properties) | |
| if success: | |
| logger.info(f"💾 Successfully stored {len(all_properties)} properties in ChromaDB") | |
| # Schedule next update in 24 hours | |
| self._schedule_property_update() | |
| return True | |
| else: | |
| logger.error("❌ Failed to store properties in ChromaDB") | |
| return False | |
| else: | |
| logger.warning("⚠️ No properties fetched") | |
| return False | |
| except Exception as e: | |
| logger.error(f"❌ Error in enhanced parallel property fetch: {e}") | |
| import traceback | |
| logger.error(f"Full traceback: {traceback.format_exc()}") | |
| return False | |
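| # Tuning sketch (assumed workload): workers beyond the page count just idle, | |
| # e.g. 50 workers for 20 pages leaves 30 threads unused, so sizing workers | |
| # to min(max_workers, max_pages) is usually enough: | |
| #   engine.fetch_all_properties_parallel(max_workers=20, page_size=600, max_pages=20) | |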
| def _get_total_property_count(self) -> int: | |
| """Enhanced method to get total property count with multiple fallbacks""" | |
| try: | |
| # Try multiple endpoints and response structures | |
| endpoints = [ | |
| f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails", | |
| f"{self.config['backend_url']}/api/Property/count", | |
| f"{self.config['backend_url']}/api/Property/total" | |
| ] | |
| for endpoint in endpoints: | |
| try: | |
| response = requests.get( | |
| endpoint, | |
| params={'pageNumber': 1, 'pageSize': 1}, | |
| timeout=30, | |
| headers=self.config.get('headers', {}) | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| # Try multiple possible field names for total count | |
| count_fields = ['totalCount', 'total', 'count', 'totalRecords', 'recordCount'] | |
| for field in count_fields: | |
| if isinstance(data, dict) and field in data: | |
| total_count = data.get(field, 0) | |
| logger.info(f"📊 Total properties found via {endpoint}: {total_count}") | |
| return total_count | |
| # If no count field found, estimate from first page | |
| if isinstance(data, list) and len(data) > 0: | |
| estimated_count = len(data) * 50 # Rough heuristic, not a true count: assume ~50 pages of this size | |
| logger.info(f"📊 Estimated total properties from {endpoint}: {estimated_count}") | |
| return estimated_count | |
| except Exception as e: | |
| logger.debug(f"⚠️ Failed to get count from {endpoint}: {e}") | |
| continue | |
| logger.warning("⚠️ Could not determine total property count, using default") | |
| return 0 | |
| except Exception as e: | |
| logger.error(f"❌ Error getting total property count: {e}") | |
| return 0 | |
| def _fetch_property_page_enhanced(self, page: int, page_size: int) -> List[Dict]: | |
| """Enhanced property page fetching with better error handling and retry logic""" | |
| max_retries = 3 | |
| retry_delay = 2 | |
| for attempt in range(max_retries): | |
| try: | |
| logger.info(f"📄 Fetching page {page} with {page_size} properties (attempt {attempt + 1}/{max_retries})...") | |
| # Log the exact URL and parameters being sent | |
| url = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails" | |
| params = {'pageNumber': page, 'pageSize': page_size} | |
| logger.info(f"🌐 Making request to: {url}") | |
| logger.info(f"📋 Parameters: {params}") | |
| # A finite timeout makes the Timeout handler below reachable; with no | |
| # timeout set, requests never raises Timeout and the retry/backoff | |
| # path would be dead code (120s is a generous, assumed default) | |
| response = requests.get( | |
| url, | |
| params=params, | |
| headers=self.config.get('headers', {}), | |
| timeout=120 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| # Log the raw response structure | |
| logger.info(f"📥 Raw response type: {type(data)}") | |
| if isinstance(data, dict): | |
| logger.info(f"📥 Response keys: {list(data.keys())}") | |
| # Handle different response structures | |
| if isinstance(data, list): | |
| properties = data | |
| logger.info(f"📥 Direct list response with {len(properties)} properties") | |
| elif isinstance(data, dict): | |
| # Try different possible field names | |
| for field in ['data', 'properties', 'items', 'results']: | |
| if field in data and isinstance(data[field], list): | |
| properties = data[field] | |
| logger.info(f"📥 Found properties in '{field}' field: {len(properties)} properties") | |
| break | |
| else: | |
| # for-else: runs only when no break fired, i.e. no known field matched | |
| properties = [] | |
| logger.warning("⚠️ No properties found in response data") | |
| else: | |
| properties = [] | |
| logger.warning(f"⚠️ Unexpected response type: {type(data)}") | |
| # Clean and validate properties | |
| cleaned_properties = [] | |
| for prop in properties: | |
| if isinstance(prop, dict) and (prop.get('id') or prop.get('propertyId')): | |
| cleaned_prop = self._clean_property_data(prop) | |
| cleaned_properties.append(cleaned_prop) | |
| logger.info(f"✅ Page {page}: {len(cleaned_properties)} valid properties fetched (requested: {page_size})") | |
| return cleaned_properties | |
| elif response.status_code == 404: | |
| logger.info(f"📄 Page {page}: No more data (404)") | |
| return [] | |
| else: | |
| logger.warning(f"⚠️ Page {page}: HTTP {response.status_code}") | |
| except requests.exceptions.Timeout: | |
| logger.warning(f"⏰ Page {page}: Timeout on attempt {attempt + 1}") | |
| if attempt < max_retries - 1: | |
| time.sleep(retry_delay * (2 ** attempt)) # Exponential backoff | |
| continue | |
| except Exception as e: | |
| logger.error(f"❌ Page {page}: Error on attempt {attempt + 1}: {e}") | |
| if attempt < max_retries - 1: | |
| time.sleep(retry_delay * (2 ** attempt)) | |
| continue | |
| logger.error(f"❌ Page {page}: All attempts failed") | |
| return [] | |
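| # Backoff arithmetic for the retries above: with retry_delay=2 and | |
| # max_retries=3, the sleeps between attempts are 2 * 2**0 = 2s and | |
| # 2 * 2**1 = 4s; the third failure returns [] without sleeping. | |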
| def _fetch_property_page(self, page: int, page_size: int) -> List[Dict]: | |
| """Legacy method - now calls enhanced version""" | |
| return self._fetch_property_page_enhanced(page, page_size) | |
| def calculate_ai_property_score(self, property_data: Dict, viewed_properties: List[Dict], | |
| preferences: Dict, strategy: str) -> float: | |
| """Calculate AI-based property score for recommendations""" | |
| try: | |
| # Base score of 50; the except handler also resets to 50.0, so no | |
| # separate pre-try initialization is needed | |
| score = 50.0 | |
| # Property type preference | |
| prop_type = property_data.get('propertyTypeName', '').lower() | |
| preferred_types_raw = preferences.get('preferred_types', []) | |
| # Handle both strings and tuples in preferred_types | |
| preferred_types = [] | |
| for item in preferred_types_raw: | |
| if isinstance(item, tuple): | |
| preferred_types.append(str(item[0]).lower() if item else '') | |
| elif isinstance(item, str): | |
| preferred_types.append(item.lower()) | |
| else: | |
| preferred_types.append(str(item).lower()) | |
| if prop_type in preferred_types: | |
| score += 20 | |
| # Price range matching | |
| prop_price = float(property_data.get('price', 0) or 0) | |
| price_range = preferences.get('price_range', {}) | |
| min_price = price_range.get('min', 0) | |
| max_price = price_range.get('max', float('inf')) | |
| if min_price <= prop_price <= max_price: | |
| score += 15 | |
| elif prop_price < min_price: | |
| score -= 10 # Below the budget floor; suspiciously cheap listings get a mild penalty | |
| else: | |
| score -= 20 # Above the budget ceiling; a heavier penalty | |
| # Location and amenities (json is already imported at module level) | |
| features = property_data.get('features', []) | |
| if isinstance(features, str): | |
| try: | |
| features = json.loads(features) | |
| except (json.JSONDecodeError, TypeError): | |
| features = [] | |
| preferred_features = preferences.get('preferred_features', []) | |
| # Lowercase the feature list once instead of rebuilding it per preference | |
| features_lower = [feat.lower() for feat in features] | |
| feature_matches = sum(1 for f in preferred_features if f.lower() in features_lower) | |
| score += feature_matches * 3 | |
| # Property size scoring | |
| sqft = float(property_data.get('sqft', 0) or 0) | |
| beds = int(property_data.get('beds', 0) or 0) | |
| baths = int(property_data.get('baths', 0) or 0) | |
| # Reasonable size scoring | |
| if 500 <= sqft <= 5000: | |
| score += 10 | |
| if 1 <= beds <= 6: | |
| score += 5 | |
| if 1 <= baths <= 4: | |
| score += 5 | |
| # Strategy-based scoring | |
| if strategy == 'motivated': | |
| score += 10 # Boost high-quality matches | |
| elif strategy == 'casual': | |
| score += 5 # Moderate boost | |
| # Similarity to viewed properties | |
| if viewed_properties: | |
| similarity_score = self.calculate_property_similarity(property_data, viewed_properties) | |
| score += similarity_score * 15 | |
| # Ensure score is in reasonable range | |
| score = max(0, min(100, score)) | |
| except Exception as e: | |
| logger.error(f"❌ Error calculating AI score: {e}") | |
| score = 50.0 # Default score | |
| return score | |
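| # Worked example (hypothetical inputs): base 50 + type match 20 + in-budget | |
| # 15 + two feature matches 6 + size bonuses 20 + 'motivated' strategy 10 | |
| # = 121, which the final clamp reduces to 100. | |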
| def calculate_property_similarity(self, property_data: Dict, viewed_properties: List[Dict]) -> float: | |
| """Calculate similarity between a property and user's viewed properties""" | |
| if not viewed_properties: | |
| return 0.0 | |
| similarities = [] | |
| for viewed_prop in viewed_properties: | |
| similarity = 0.0 | |
| # Type similarity | |
| if (property_data.get('propertyTypeName', '').lower() == | |
| viewed_prop.get('propertyTypeName', '').lower()): | |
| similarity += 0.3 | |
| # Price similarity (within 50% range) | |
| prop_price = float(property_data.get('price', 0) or 0) | |
| viewed_price = float(viewed_prop.get('price', 0) or viewed_prop.get('marketValue', 0) or 0) | |
| if prop_price > 0 and viewed_price > 0: | |
| price_ratio = min(prop_price, viewed_price) / max(prop_price, viewed_price) | |
| if price_ratio > 0.5: # Within 50% price range | |
| similarity += 0.4 * price_ratio | |
| # Size similarity | |
| prop_sqft = float(property_data.get('sqft', 0) or 0) | |
| viewed_sqft = float(viewed_prop.get('totalSquareFeet', 0) or 0) | |
| if prop_sqft > 0 and viewed_sqft > 0: | |
| size_ratio = min(prop_sqft, viewed_sqft) / max(prop_sqft, viewed_sqft) | |
| if size_ratio > 0.7: # Within 30% size range | |
| similarity += 0.3 * size_ratio | |
| similarities.append(similarity) | |
| return max(similarities) if similarities else 0.0 | |
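| # Worked example (hypothetical values): same type contributes 0.3, a 0.9 | |
| # price ratio adds 0.4 * 0.9 = 0.36, and a 0.95 size ratio adds | |
| # 0.3 * 0.95 = 0.285, for ~0.945; the best-matching viewed property wins | |
| # (max, not average). | |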
| def generate_match_reasons(self, property_data: Dict, preferences: Dict) -> List[str]: | |
| """Generate AI explanations for why this property matches user preferences""" | |
| reasons = [] | |
| try: | |
| # Property type match | |
| prop_type = property_data.get('propertyTypeName', '') | |
| preferred_types_raw = preferences.get('preferred_types', []) | |
| # Handle both strings and tuples in preferred_types | |
| preferred_types = [] | |
| for item in preferred_types_raw: | |
| if isinstance(item, tuple): | |
| preferred_types.append(str(item[0]) if item else '') | |
| elif isinstance(item, str): | |
| preferred_types.append(item) | |
| else: | |
| preferred_types.append(str(item)) | |
| # Compare case-insensitively, matching the scoring logic elsewhere | |
| if prop_type and prop_type.lower() in [t.lower() for t in preferred_types]: | |
| reasons.append(f"Matches your preferred property type: {prop_type}") | |
| # Price range match | |
| prop_price = float(property_data.get('price', 0) or 0) | |
| price_range = preferences.get('price_range', {}) | |
| min_price = price_range.get('min', 0) | |
| max_price = price_range.get('max', float('inf')) | |
| if min_price <= prop_price <= max_price: | |
| reasons.append(f"Within your budget range: ₹{prop_price:,.0f}") | |
| # Feature matches | |
| features = property_data.get('features', []) | |
| if isinstance(features, str): | |
| try: | |
| features = json.loads(features)  # json is imported at module level | |
| except (json.JSONDecodeError, TypeError): | |
| features = [] | |
| preferred_features = preferences.get('preferred_features', []) | |
| features_lower = [feat.lower() for feat in features] | |
| matching_features = [f for f in preferred_features if f.lower() in features_lower] | |
| if matching_features: | |
| reasons.append(f"Has your preferred amenities: {', '.join(matching_features[:3])}") | |
| # Size appropriateness | |
| beds = int(property_data.get('beds', 0) or 0) | |
| if beds > 0: | |
| reasons.append(f"Suitable size with {beds} bedroom{'s' if beds != 1 else ''}") | |
| # Default reasons if none found | |
| if not reasons: | |
| reasons = [ | |
| "Good property in desirable location", | |
| "Competitive pricing for the area", | |
| "Modern amenities and facilities" | |
| ] | |
| except Exception as e: | |
| logger.error(f"❌ Error generating match reasons: {e}") | |
| reasons = ["AI-recommended based on your preferences"] | |
| return reasons[:3] # Limit to top 3 reasons | |
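| # Illustrative output (hypothetical property): up to three strings such as | |
| #   ['Matches your preferred property type: Villa', | |
| #    'Within your budget range: ₹12,500,000', | |
| #    'Has your preferred amenities: parking, garden'] | |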
| def search_properties_with_query(self, query: str, excluded_ids: set, max_results: int) -> List[Dict]: | |
| """Search properties with a specific query and exclusions""" | |
| try: | |
| if self.properties_collection is None: | |
| return [] | |
| results = self.properties_collection.query( | |
| query_texts=[query], | |
| n_results=max_results * 2, # Get extra for filtering | |
| include=["metadatas", "documents", "distances"] | |
| ) | |
| properties = [] | |
| if results and results['metadatas']: | |
| for metadata in results['metadatas'][0]: | |
| prop_id = metadata.get('id', '') | |
| if prop_id and prop_id not in excluded_ids: | |
| properties.append(metadata) | |
| if len(properties) >= max_results: | |
| break | |
| return properties | |
| except Exception as e: | |
| logger.error(f"❌ Error searching with query '{query}': {e}") | |
| return [] | |
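| # Illustrative call (hypothetical IDs): fetching 2x max_results leaves room | |
| # for the exclusion filter to drop already-seen listings. | |
| #   seen = {'prop-101', 'prop-205'} | |
| #   fresh = engine.search_properties_with_query('3BHK villa with garden', seen, 5) | |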
| def test_sendgrid_email(self, recipient_email: str = None) -> Dict: | |
| """Test SendGrid email functionality""" | |
| if not recipient_email: | |
| recipient_email = self.email_config['sender_email'] # Send to self for testing | |
| logger.info(f"🧪 Testing SendGrid email to {recipient_email}") | |
| try: | |
| # First, validate the API key | |
| validation_result = self.validate_sendgrid_api_key() | |
| if not validation_result['valid']: | |
| return { | |
| 'success': False, | |
| 'error': f"SendGrid API key validation failed: {validation_result['error']}", | |
| 'recipient': recipient_email, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| # SendGrid API endpoint | |
| url = "https://api.sendgrid.com/v3/mail/send" | |
| # Simple test email | |
| email_data = { | |
| "personalizations": [ | |
| { | |
| "to": [ | |
| { | |
| "email": recipient_email | |
| } | |
| ] | |
| } | |
| ], | |
| "from": { | |
| "email": self.email_config['sender_email'] | |
| }, | |
| "subject": "🧪 SendGrid Test Email - AI Lead Analysis System", | |
| "content": [ | |
| { | |
| "type": "text/plain", | |
| "value": f"SendGrid Test Email - AI Lead Analysis System\n\nTest successful at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" | |
| }, | |
| { | |
| "type": "text/html", | |
| "value": f""" | |
| <html> | |
| <body> | |
| <h2>🧪 SendGrid Test Email</h2> | |
| <p>This is a test email to verify SendGrid integration is working correctly.</p> | |
| <p><strong>Test Details:</strong></p> | |
| <ul> | |
| <li>✅ SendGrid API: Connected</li> | |
| <li>✅ Authentication: Successful</li> | |
| <li>✅ Sender: {self.email_config['sender_email']}</li> | |
| <li>✅ Recipient: {recipient_email}</li> | |
| <li>✅ Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</li> | |
| </ul> | |
| <p>🎉 SendGrid email system is working!</p> | |
| <hr> | |
| <p><em>Sent by AI Lead Analysis System</em></p> | |
| </body> | |
| </html> | |
| """ | |
| } | |
| ] | |
| } | |
| # Send email with SendGrid API | |
| headers = { | |
| "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}", | |
| "Content-Type": "application/json" | |
| } | |
| response = requests.post(url, json=email_data, headers=headers, timeout=30) | |
| if response.status_code == 202: | |
| logger.info(f"✅ SendGrid test email sent successfully to {recipient_email}") | |
| return { | |
| 'success': True, | |
| 'recipient': recipient_email, | |
| 'subject': email_data['subject'], | |
| 'timestamp': datetime.now().isoformat(), | |
| 'message': 'SendGrid test email sent successfully' | |
| } | |
| else: | |
| logger.error(f"❌ SendGrid API error: {response.status_code} - {response.text}") | |
| return { | |
| 'success': False, | |
| 'error': f"SendGrid API error: {response.status_code} - {response.text}", | |
| 'recipient': recipient_email, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Failed to send SendGrid test email: {e}") | |
| return { | |
| 'success': False, | |
| 'error': str(e), | |
| 'recipient': recipient_email, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| def validate_sendgrid_api_key(self) -> Dict: | |
| """Validate SendGrid API key""" | |
| try: | |
| logger.info("🔑 Validating SendGrid API key...") | |
| url = "https://api.sendgrid.com/v3/user/profile" | |
| headers = { | |
| "Authorization": f"Bearer {self.email_config['sendgrid_api_key']}", | |
| "Content-Type": "application/json" | |
| } | |
| response = requests.get(url, headers=headers, timeout=30) | |
| if response.status_code == 200: | |
| profile_data = response.json() | |
| logger.info("✅ SendGrid API key is valid") | |
| return { | |
| 'success': True, | |
| 'valid': True, | |
| 'account_info': { | |
| 'email': profile_data.get('email', 'Unknown'), | |
| 'first_name': profile_data.get('first_name', 'Unknown'), | |
| 'last_name': profile_data.get('last_name', 'Unknown') | |
| } | |
| } | |
| else: | |
| logger.error(f"❌ SendGrid API key validation failed: {response.status_code}") | |
| return { | |
| 'success': False, | |
| 'valid': False, | |
| 'error': f"API key validation failed: {response.status_code}" | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error validating SendGrid API key: {e}") | |
| return { | |
| 'success': False, | |
| 'valid': False, | |
| 'error': str(e) | |
| } | |
| def analyze_automated_email_triggers(self, customer_analysis: Dict) -> Dict: | |
| """Analyze customer behavior and determine automated email triggers based on AI analysis""" | |
| logger.info("🤖 Analyzing automated email triggers based on customer behavior...") | |
| try: | |
| # Extract customer data from the actual analysis structure | |
| customer_data = customer_analysis.get('data', {}) | |
| analytics = customer_data.get('analytics', {}) | |
| lead_qual = customer_data.get('lead_qualification', {}) | |
| properties = customer_data.get('properties', []) | |
| if not properties: | |
| return { | |
| 'success': False, | |
| 'error': 'No property data available for analysis' | |
| } | |
| # Extract key metrics from the actual analysis | |
| engagement_level = analytics.get('engagement_level', 'Low') | |
| lead_score = lead_qual.get('lead_score', 0) | |
| lead_status = lead_qual.get('lead_status', 'UNKNOWN') | |
| # Get viewing patterns | |
| viewing_patterns = analytics.get('viewing_patterns', {}) | |
| total_views = sum(viewing_patterns.get(k, 0) for k in ('morning_views', 'afternoon_views', 'evening_views')) | |
| # Get price preferences | |
| price_prefs = analytics.get('price_preferences', {}) | |
| avg_price = price_prefs.get('avg_price', 0) | |
| max_price = price_prefs.get('max_price', 0) | |
| min_price = price_prefs.get('min_price', 0) | |
| price_range = price_prefs.get('price_range', 0) | |
| # Get property type preferences | |
| preferred_types = analytics.get('preferred_property_types', []) | |
| # Get lead timeline for inactivity analysis | |
| lead_timeline = analytics.get('lead_timeline', []) | |
| days_since_last_view = 0 | |
| if lead_timeline: | |
| last_view_date = lead_timeline[0].get('date', '') | |
| if last_view_date: | |
| try: | |
| from datetime import timezone | |
| last_view = datetime.fromisoformat(last_view_date.replace('Z', '+00:00')) | |
| # Subtract aware-from-aware (or naive-from-naive): mixing the | |
| # two raises TypeError and would silently force the fallback | |
| now = datetime.now(timezone.utc) if last_view.tzinfo else datetime.now() | |
| days_since_last_view = (now - last_view).days | |
| except (ValueError, TypeError): | |
| days_since_last_view = 14 # Arbitrary fallback when the date can't be parsed | |
| # Analyze different trigger conditions based on actual data | |
| triggers = [] | |
| # 1. Inactivity Alert Trigger (High Priority) | |
| if days_since_last_view > 7: | |
| triggers.append({ | |
| 'trigger_type': 'inactivity_alert', | |
| 'condition': f"Customer hasn't viewed properties in {days_since_last_view} days", | |
| 'reason': 'Customer may need re-engagement with fresh property listings', | |
| 'priority': 'high', | |
| 'recommendation_count': 5 | |
| }) | |
| # 2. Property Type Interest Trigger (Medium Priority) | |
| if len(preferred_types) >= 3: | |
| triggers.append({ | |
| 'trigger_type': 'property_type_interest', | |
| 'condition': f"Customer interested in {len(preferred_types)} property types: {', '.join(preferred_types[:3])}", | |
| 'reason': 'Customer exploring multiple property categories, showing diverse interests', | |
| 'priority': 'medium', | |
| 'recommendation_count': 7 | |
| }) | |
| # 3. Price Range Diversity Trigger (Medium Priority) | |
| if price_range > 10000000: # 1 crore range | |
| triggers.append({ | |
| 'trigger_type': 'price_range_diversity', | |
| 'condition': f"Customer exploring properties from ₹{min_price:,.0f} to ₹{max_price:,.0f}", | |
| 'reason': 'Customer showing interest across different price segments', | |
| 'priority': 'medium', | |
| 'recommendation_count': 6 | |
| }) | |
| # 4. Lead Score Milestone Trigger (High Priority) | |
| if lead_score >= 40: # LUKEWARM or better | |
| triggers.append({ | |
| 'trigger_type': 'lead_score_milestone', | |
| 'condition': f"Customer has {lead_status.lower()} lead status with score {lead_score}/100", | |
| 'reason': 'Customer showing qualified interest, ready for targeted recommendations', | |
| 'priority': 'high', | |
| 'recommendation_count': 8 | |
| }) | |
| # 5. Viewing Pattern Trigger (Medium Priority) | |
| if total_views >= 10: | |
| triggers.append({ | |
| 'trigger_type': 'viewing_pattern', | |
| 'condition': f"Customer has viewed {total_views} properties with peak time: {viewing_patterns.get('peak_viewing_time', 'unknown')}", | |
| 'reason': 'Customer actively researching properties, ready for curated recommendations', | |
| 'priority': 'medium', | |
| 'recommendation_count': 6 | |
| }) | |
| # 6. Engagement Level Trigger (Based on actual engagement score) | |
| if engagement_level == 'Low' and lead_score < 50: | |
| triggers.append({ | |
| 'trigger_type': 'engagement_boost', | |
| 'condition': f"Customer has {engagement_level.lower()} engagement ({lead_score}/100 score)", | |
| 'reason': 'Customer needs engagement boost with compelling property options', | |
| 'priority': 'high', | |
| 'recommendation_count': 5 | |
| }) | |
| # 7. Conversion Probability Trigger (High Priority) | |
| conversion_prob = analytics.get('conversion_probability', {}) | |
| final_prob = conversion_prob.get('final_probability', 0) | |
| if final_prob >= 50: | |
| triggers.append({ | |
| 'trigger_type': 'high_conversion_potential', | |
| 'condition': f"Customer has {final_prob:.1f}% conversion probability", | |
| 'reason': 'High-value prospect needing premium property recommendations', | |
| 'priority': 'high', | |
| 'recommendation_count': 10 | |
| }) | |
| # Generate AI insights based on actual analysis | |
| ai_insights = { | |
| 'personality_type': 'Explorer' if len(preferred_types) >= 3 else 'Focused', | |
| 'decision_making_style': 'Analytical' if total_views >= 10 else 'Casual', | |
| 'urgency_level': 'High' if days_since_last_view > 7 else 'Medium', | |
| 'budget_confidence': 'Flexible' if price_range > 10000000 else 'Specific', | |
| 'location_focus': 'Diverse' if len(preferred_types) >= 3 else 'Specific', | |
| 'engagement_trend': 'Declining' if days_since_last_view > 7 else 'Stable', | |
| 'conversion_readiness': 'High' if final_prob >= 50 else 'Medium' | |
| } | |
| return { | |
| 'success': True, | |
| 'triggers': triggers, | |
| 'total_triggers': len(triggers), | |
| 'ai_insights': ai_insights, | |
| 'analysis_summary': { | |
| 'engagement_level': engagement_level, | |
| 'lead_score': lead_score, | |
| 'lead_status': lead_status, | |
| 'total_properties_viewed': len(properties), | |
| 'total_views': total_views, | |
| 'average_price_range': f"₹{avg_price:,.0f}", | |
| 'price_range_diversity': f"₹{min_price:,.0f} - ₹{max_price:,.0f}", | |
| 'preferred_property_types_count': len(preferred_types), | |
| 'days_since_last_view': days_since_last_view, | |
| 'conversion_probability': f"{final_prob:.1f}%", | |
| 'peak_viewing_time': viewing_patterns.get('peak_viewing_time', 'unknown') | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing automated email triggers: {e}") | |
| return { | |
| 'success': False, | |
| 'error': str(e) | |
| } | |
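| # Illustrative trigger shape (hypothetical values): each trigger pairs a | |
| # machine-readable type with the human-readable condition and a sizing hint | |
| # for the email, e.g. | |
| #   {'trigger_type': 'inactivity_alert', | |
| #    'condition': "Customer hasn't viewed properties in 12 days", | |
| #    'reason': 'Customer may need re-engagement with fresh property listings', | |
| #    'priority': 'high', 'recommendation_count': 5} | |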
| def generate_automated_email_content(self, trigger: Dict, customer_analysis: Dict, ai_insights: Dict) -> Dict: | |
| """Generate email content based on trigger type and actual customer analysis""" | |
| logger.info(f"📝 Generating automated email content for trigger: {trigger['trigger_type']}") | |
| try: | |
| # Get customer data from actual analysis structure | |
| customer_data = customer_analysis.get('data', {}) | |
| analytics = customer_data.get('analytics', {}) | |
| lead_qual = customer_data.get('lead_qualification', {}) | |
| properties = customer_data.get('properties', []) | |
| # Generate personalized greeting | |
| customer_name = f"Customer {customer_analysis.get('customer_id', 'Unknown')}" | |
| greeting = f"Hello {customer_name}," | |
| # Generate content based on trigger type and actual data | |
| trigger_type = trigger['trigger_type'] | |
| if trigger_type == 'inactivity_alert': | |
| last_view_date = analytics.get('lead_timeline', [{}])[0].get('date', '') | |
| days_ago = 13 # Arbitrary fallback when the last-view date is missing or unparseable | |
| if last_view_date: | |
| try: | |
| from datetime import timezone | |
| last_view = datetime.fromisoformat(last_view_date.replace('Z', '+00:00')) | |
| now = datetime.now(timezone.utc) if last_view.tzinfo else datetime.now() | |
| days_ago = (now - last_view).days | |
| except (ValueError, TypeError): | |
| pass | |
| content = f""" | |
| <p>We noticed you haven't been browsing properties for {days_ago} days. The real estate market is constantly evolving with exciting new opportunities!</p> | |
| <p>Based on your previous interest in {len(analytics.get('preferred_property_types', []))} different property types, we've curated some fresh listings that might reignite your search for the perfect property.</p> | |
| """ | |
| subject = "🆕 Fresh Properties to Rekindle Your Search" | |
| elif trigger_type == 'property_type_interest': | |
| preferred_types = analytics.get('preferred_property_types', []) | |
| content = f""" | |
| <p>Your interest in {', '.join(preferred_types[:3])} properties shows you have excellent taste and diverse preferences!</p> | |
| <p>We've found some amazing properties across these categories that match your sophisticated requirements and might be perfect for your next investment.</p> | |
| """ | |
| subject = "🏠 Diverse Property Options for You" | |
| elif trigger_type == 'price_range_diversity': | |
| price_prefs = analytics.get('price_preferences', {}) | |
| min_price = price_prefs.get('min_price', 0) | |
| max_price = price_prefs.get('max_price', 0) | |
| content = f""" | |
| <p>We see you're exploring properties from ₹{min_price:,.0f} to ₹{max_price:,.0f} - that's an impressive range showing you're considering various investment options!</p> | |
| <p>Here are some outstanding properties across your preferred price segments that offer exceptional value and investment potential.</p> | |
| """ | |
| subject = "💰 Properties Across Your Investment Range" | |
| elif trigger_type == 'lead_score_milestone': | |
| lead_score = lead_qual.get('lead_score', 0) | |
| lead_status = lead_qual.get('lead_status', 'UNKNOWN') | |
| content = f""" | |
| <p>Congratulations! Your lead score of {lead_score}/100 puts you in our {lead_status.lower()} category of prospects.</p> | |
| <p>As a qualified customer, we're excited to share these exclusive properties that match your refined preferences and investment criteria.</p> | |
| """ | |
| subject = "⭐ Exclusive Properties for Our Qualified Customer" | |
| elif trigger_type == 'viewing_pattern': | |
| viewing_patterns = analytics.get('viewing_patterns', {}) | |
| total_views = sum(viewing_patterns.get(k, 0) for k in ('morning_views', 'afternoon_views', 'evening_views')) | |
| peak_time = viewing_patterns.get('peak_viewing_time', 'your preferred time') | |
| content = f""" | |
| <p>You've viewed {total_views} properties with peak activity during {peak_time} - that's impressive research! You clearly know what you're looking for.</p> | |
| <p>Based on your extensive browsing patterns, here are some properties that align perfectly with your refined preferences and viewing behavior.</p> | |
| """ | |
| subject = "🎯 Targeted Recommendations Based on Your Research" | |
| elif trigger_type == 'engagement_boost': | |
| engagement_level = analytics.get('engagement_level', 'Low') | |
| lead_score = lead_qual.get('lead_score', 0) | |
| content = f""" | |
| <p>We noticed your current engagement level is {engagement_level.lower()} with a lead score of {lead_score}/100.</p> | |
| <p>To help you find your perfect property faster, we've curated some compelling options that might boost your interest and engagement with our platform.</p> | |
| """ | |
| subject = "🚀 Boost Your Property Search with These Gems" | |
| elif trigger_type == 'high_conversion_potential': | |
| conversion_prob = analytics.get('conversion_probability', {}) | |
| final_prob = conversion_prob.get('final_probability', 0) | |
| content = f""" | |
| <p>Excellent news! Our AI analysis shows you have a {final_prob:.1f}% conversion probability - you're a high-value prospect!</p> | |
| <p>As a premium customer with strong buying potential, we're excited to share these exclusive properties that match your sophisticated requirements.</p> | |
| """ | |
| subject = "💎 Premium Properties for High-Value Prospect" | |
| else: | |
| content = """ | |
| <p>We've analyzed your property viewing behavior and found some excellent recommendations for you.</p> | |
| <p>These properties are carefully selected based on your preferences, viewing patterns, and market trends.</p> | |
| """ | |
| subject = "🏠 Personalized Property Recommendations" | |
| # Get AI recommendations from the properties stored in ChromaDB. | |
| # This method already runs on the engine, so check our own collection | |
| # directly instead of routing through a get_ai_engine lookup | |
| if self.properties_collection is not None: | |
| # Use ChromaDB to find similar properties | |
| recommendations = self._get_ai_recommendations_from_chromadb( | |
| properties, analytics, trigger.get('recommendation_count', 5), trigger_type | |
| ) | |
| else: | |
| # Fallback to existing properties | |
| recommendations = properties[:trigger.get('recommendation_count', 5)] | |
| # Create email HTML | |
| email_content = self.create_automated_email_html( | |
| greeting, content, recommendations, trigger, customer_analysis | |
| ) | |
| return { | |
| 'success': True, | |
| 'subject': subject, | |
| 'html_content': email_content['html'], | |
| 'text_content': email_content['text'], | |
| 'recommendations_count': len(recommendations), | |
| 'trigger_type': trigger_type, | |
| 'priority': trigger['priority'] | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error generating automated email content: {e}") | |
| return { | |
| 'success': False, | |
| 'error': str(e) | |
| } | |
| def _get_ai_recommendations_from_chromadb(self, viewed_properties: List[Dict], analytics: Dict, count: int, trigger_type: str = None) -> List[Dict]: | |
| """Get AI recommendations from ChromaDB based on customer analysis and trigger type""" | |
| try: | |
| if not hasattr(self, 'properties_collection') or not self.properties_collection: | |
| return viewed_properties[:count] | |
| # Create different search queries based on trigger type | |
| preferred_types = analytics.get('preferred_property_types', []) | |
| price_prefs = analytics.get('price_preferences', {}) | |
| avg_price = price_prefs.get('avg_price', 0) | |
| min_price = price_prefs.get('min_price', 0) | |
| max_price = price_prefs.get('max_price', 0) | |
| # Different search strategies based on trigger type | |
| if trigger_type == 'inactivity_alert': | |
| # For inactive customers, show fresh/new properties | |
| query_text = f"New and fresh properties in {', '.join(preferred_types[:2])} categories around ₹{avg_price:,.0f}" | |
| count = min(count, 5) # Fewer properties for re-engagement | |
| elif trigger_type == 'property_type_interest': | |
| # For diverse interests, show variety across types | |
| query_text = f"Diverse properties across {', '.join(preferred_types)} types with good investment potential" | |
| count = min(count, 7) # More properties to show variety | |
| elif trigger_type == 'price_range_diversity': | |
| # For price diversity, show properties across different price ranges | |
| query_text = f"Properties from ₹{min_price:,.0f} to ₹{max_price:,.0f} with excellent value" | |
| count = min(count, 6) | |
| elif trigger_type == 'lead_score_milestone': | |
| # For qualified leads, show premium properties | |
| query_text = f"Premium {', '.join(preferred_types[:2])} properties for qualified customers around ₹{avg_price:,.0f}" | |
| count = min(count, 8) | |
| elif trigger_type == 'viewing_pattern': | |
| # For active researchers, show highly relevant properties | |
| query_text = f"Highly relevant {', '.join(preferred_types)} properties matching viewing patterns" | |
| count = min(count, 6) | |
| elif trigger_type == 'engagement_boost': | |
| # For low engagement, show compelling properties | |
| query_text = f"Compelling and attractive properties in {', '.join(preferred_types[:2])} categories" | |
| count = min(count, 5) | |
| elif trigger_type == 'high_conversion_potential': | |
| # For high conversion potential, show exclusive properties | |
| query_text = f"Exclusive and premium properties for high-value prospects in {', '.join(preferred_types)} categories" | |
| count = min(count, 10) | |
| else: | |
| # Default search | |
| query_text = f"Properties similar to {', '.join(preferred_types)} types around ₹{avg_price:,.0f} price range" | |
| logger.info(f"🔍 Searching ChromaDB with query: {query_text} for trigger: {trigger_type}") | |
| # Search in ChromaDB | |
| results = self.properties_collection.query( | |
| query_texts=[query_text], | |
| n_results=count * 2, # Get more to filter | |
| include=["metadatas", "documents", "distances"] | |
| ) | |
| if results and results['metadatas'] and results['metadatas'][0]: | |
| recommendations = [] | |
| for i, metadata in enumerate(results['metadatas'][0]): | |
| if len(recommendations) >= count: | |
| break | |
| # Safely extract metadata fields with fallbacks | |
| try: | |
| property_data = { | |
| 'id': metadata.get('id', f'prop_{i}'), | |
| 'propertyId': metadata.get('id', f'prop_{i}'), | |
| 'propertyName': metadata.get('propertyName', metadata.get('name', 'Property')), | |
| 'propertyTypeName': metadata.get('propertyTypeName', metadata.get('typeName', metadata.get('type', 'Property'))), | |
| 'price': float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0), | |
| 'marketValue': float(metadata.get('price', 0) or metadata.get('marketValue', 0) or 0), | |
| 'address': metadata.get('address', metadata.get('location', '')), | |
| 'location': metadata.get('address', metadata.get('location', '')), | |
| 'beds': int(metadata.get('beds', 0) or metadata.get('bedrooms', 0) or 0), | |
| 'baths': int(metadata.get('baths', 0) or metadata.get('bathrooms', 0) or 0), | |
| 'totalSquareFeet': float(metadata.get('sqft', 0) or metadata.get('area', 0) or 0), | |
| 'description': metadata.get('description', ''), | |
| 'features': self.safe_json_parse(metadata.get('features', '[]')), | |
| 'similarity_score': 1 - results['distances'][0][i] if results.get('distances') and results['distances'][0] else 0.5, | |
| 'chromadb_rank': i + 1 | |
| } | |
| # Add any additional fields that might exist | |
| for key, value in metadata.items(): | |
| if key not in property_data: | |
| property_data[key] = value | |
| recommendations.append(property_data) | |
| except Exception as field_error: | |
| logger.warning(f"⚠️ Error processing metadata field in _get_ai_recommendations_from_chromadb: {field_error}") | |
| # Try to create a minimal property object | |
| try: | |
| minimal_property = { | |
| 'id': metadata.get('id', f'prop_{i}'), | |
| 'propertyId': metadata.get('id', f'prop_{i}'), | |
| 'propertyName': 'Property', | |
| 'propertyTypeName': 'Property', | |
| 'price': 0.0, | |
| 'marketValue': 0.0, | |
| 'address': '', | |
| 'location': '', | |
| 'beds': 0, | |
| 'baths': 0, | |
| 'totalSquareFeet': 0.0, | |
| 'description': '', | |
| 'features': [], | |
| 'similarity_score': 0.5, | |
| 'chromadb_rank': i + 1 | |
| } | |
| recommendations.append(minimal_property) | |
| except Exception as minimal_error: | |
| logger.error(f"❌ Failed to create minimal property in _get_ai_recommendations_from_chromadb: {minimal_error}") | |
| continue | |
| logger.info(f"✅ Found {len(recommendations)} recommendations for trigger: {trigger_type}") | |
| return recommendations | |
| else: | |
| logger.warning(f"⚠️ No results from ChromaDB for trigger: {trigger_type}") | |
| return viewed_properties[:count] | |
| except Exception as e: | |
| logger.error(f"❌ Error in _get_ai_recommendations_from_chromadb: {e}") | |
| return viewed_properties[:count] | |
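| # Illustrative return shape (hypothetical listing): every recommendation is | |
| # a flat dict normalized from ChromaDB metadata, e.g. | |
| #   {'id': 'prop-17', 'propertyName': 'Lakeview Villa', 'price': 12500000.0, | |
| #    'beds': 3, 'baths': 2, 'similarity_score': 0.82, 'chromadb_rank': 1} | |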
| def create_automated_email_html(self, greeting: str, content: str, recommendations: List[Dict], | |
| trigger: Dict, customer_analysis: Dict) -> Dict: | |
| """Create HTML and text versions of automated email content""" | |
| try: | |
| logger.info("🎨 Creating automated email HTML content...") | |
| # Create property cards | |
| property_cards = "" | |
| for i, prop in enumerate(recommendations[:5]): # Limit to 5 properties | |
| property_cards += f""" | |
| <div style="border: 1px solid #ddd; border-radius: 8px; padding: 15px; margin: 15px 0; background: white;"> | |
| <h3 style="color: #2c3e50; margin: 0 0 10px 0;">{prop.get('propertyName', f'Property {i+1}')}</h3> | |
| <p style="color: #7f8c8d; margin: 5px 0;">📍 {prop.get('address', 'Address not available')}</p> | |
| <p style="color: #e74c3c; font-weight: bold; margin: 5px 0;">💰 ₹{prop.get('price', 0):,.0f}</p> | |
| <p style="color: #34495e; margin: 5px 0;">🏠 {prop.get('propertyTypeName', 'Property Type')}</p> | |
| <p style="color: #27ae60; margin: 5px 0;">⭐ AI Match Score: {prop.get('ai_score', 0):.1f}%</p> | |
| </div> | |
| """ | |
| # Create HTML content using string concatenation to avoid f-string backslash issues | |
| html_content = """ | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <title>Automated Property Recommendations</title> | |
| <style> | |
| body { font-family: Arial, sans-serif; line-height: 1.6; color: #333; } | |
| .container { max-width: 600px; margin: 0 auto; padding: 20px; } | |
| .header { background: #2c3e50; color: white; padding: 20px; text-align: center; border-radius: 8px; } | |
| .content { padding: 20px; background: #f8f9fa; border-radius: 8px; margin: 20px 0; } | |
| .trigger-info { background: #e8f5e8; padding: 15px; border-radius: 8px; margin: 15px 0; } | |
| .footer { text-align: center; color: #7f8c8d; font-size: 12px; margin-top: 30px; } | |
| </style> | |
| </head> | |
| <body> | |
| <div class="container"> | |
| <div class="header"> | |
| <h1>🤖 AI-Powered Property Recommendations</h1> | |
| <p>Automated Email Trigger: """ + trigger['trigger_type'].replace('_', ' ').title() + """</p> | |
| </div> | |
| <div class="content"> | |
| <p>""" + greeting + """</p> | |
| """ + content + """ | |
| <div class="trigger-info"> | |
| <h3>📊 Why You Received This Email:</h3> | |
| <p><strong>Condition:</strong> """ + trigger['condition'] + """</p> | |
| <p><strong>Reason:</strong> """ + trigger['reason'] + """</p> | |
| <p><strong>Priority:</strong> """ + trigger['priority'].title() + """</p> | |
| </div> | |
| <h2>🏠 Recommended Properties</h2> | |
| """ + property_cards + """ | |
| <div style="text-align: center; margin: 30px 0;"> | |
| <a href="#" style="background: #3498db; color: white; padding: 12px 30px; text-decoration: none; border-radius: 5px;"> | |
| View All Properties | |
| </a> | |
| </div> | |
| </div> | |
| <div class="footer"> | |
| <p>This email was automatically generated based on your property viewing behavior.</p> | |
| <p>Generated at: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """</p> | |
| </div> | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| # Create text version using string concatenation | |
| text_content = """ | |
| AI-Powered Property Recommendations | |
| """ + greeting + """ | |
| """ + content.replace('<p>', '').replace('</p>', '\n') + """ | |
| Why you received this email: | |
| Condition: """ + trigger['condition'] + """ | |
| Reason: """ + trigger['reason'] + """ | |
| Priority: """ + trigger['priority'].title() + """ | |
| Recommended Properties: | |
| """ | |
| for i, prop in enumerate(recommendations[:5]): # Limit to 5 in text version | |
| text_content += """ | |
| """ + str(i+1) + """. """ + prop.get('propertyName', f'Property {i+1}') + """ | |
| Address: """ + prop.get('address', 'Address not available') + """ | |
| Price: ₹""" + f"{prop.get('price', 0):,.0f}" + """ | |
| Type: """ + prop.get('propertyTypeName', 'Property Type') + """ | |
| AI Match Score: """ + f"{prop.get('ai_score', 0):.1f}" + """% | |
| """ | |
| text_content += """ | |
| Generated at: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """ | |
| """ | |
| return { | |
| 'html': html_content, | |
| 'text': text_content | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error creating automated email HTML: {e}") | |
| return { | |
| 'html': f"<p>Error generating email content: {e}</p>", | |
| 'text': f"Error generating email content: {e}" | |
| } | |
| def _store_properties_in_chromadb(self, properties: List[Dict]) -> bool: | |
| """Store properties in ChromaDB for AI analysis""" | |
| try: | |
| if not self.properties_collection: | |
| logger.error("❌ Properties collection not initialized") | |
| return False | |
| logger.info(f"💾 Storing {len(properties)} properties in ChromaDB...") | |
| # Process properties in batches | |
| batch_size = 100 | |
| total_stored = 0 | |
| for i in range(0, len(properties), batch_size): | |
| batch = properties[i:i + batch_size] | |
| # Prepare batch data for ChromaDB | |
| documents = [] | |
| metadatas = [] | |
| ids = [] | |
| for prop in batch: | |
| try: | |
| # Create property description | |
| description = self.create_property_description(prop) | |
| # Clean metadata | |
| metadata = self.clean_metadata_for_chromadb(prop) | |
| # Prefer the property's own ID so re-ingesting the same listing | |
| # replaces it instead of piling up random-UUID duplicates | |
| prop_id = str(prop.get('id') or prop.get('propertyId') or uuid.uuid4()) | |
| documents.append(description) | |
| metadatas.append(metadata) | |
| ids.append(prop_id) | |
| except Exception as e: | |
| logger.warning(f"⚠️ Skipping property due to error: {e}") | |
| continue | |
| if documents: | |
| try: | |
| # Upsert so batches containing already-stored IDs update in | |
| # place rather than raising duplicate-ID errors on add | |
| self.properties_collection.upsert( | |
| documents=documents, | |
| metadatas=metadatas, | |
| ids=ids | |
| ) | |
| total_stored += len(documents) | |
| logger.info(f"✅ Stored batch {i//batch_size + 1}: {len(documents)} properties") | |
| except Exception as e: | |
| logger.error(f"❌ Error storing batch {i//batch_size + 1}: {e}") | |
| continue | |
| logger.info(f"🎉 Successfully stored {total_stored} properties in ChromaDB") | |
| return total_stored > 0 | |
| except Exception as e: | |
| logger.error(f"❌ Error storing properties in ChromaDB: {e}") | |
| return False | |
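| # Batching note: with batch_size=100, 600 properties land as 6 upsert calls; | |
| # a failed batch is logged and skipped rather than aborting the run, so a | |
| # True return means "at least one batch stored", not "all batches stored". | |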
| def get_enhanced_ai_recommendations(self, customer_analysis: Dict, ai_insights: Dict, count: int = 10) -> List[Dict]: | |
| """Get enhanced AI recommendations using small AI models""" | |
| try: | |
| logger.info("🤖 Generating enhanced AI recommendations...") | |
| # Extract customer preferences and behavior patterns | |
| preferences = self.extract_customer_preferences(customer_analysis, ai_insights) | |
| # Get viewed properties for similarity analysis | |
| viewed_properties = customer_analysis.get('data', {}).get('properties', []) | |
| # Generate multiple recommendation strategies | |
| recommendations = [] | |
| # Strategy 1: Similarity-based recommendations | |
| similarity_recs = self.get_similarity_based_recommendations(viewed_properties, preferences, count // 3) | |
| recommendations.extend(similarity_recs) | |
| # Strategy 2: Preference-based recommendations | |
| preference_recs = self.get_preference_based_recommendations(preferences, viewed_properties, count // 3) | |
| recommendations.extend(preference_recs) | |
| # Strategy 3: Behavioral recommendations | |
| behavioral_recs = self.get_behavioral_recommendations(customer_analysis, ai_insights, count // 3) | |
| recommendations.extend(behavioral_recs) | |
| # Remove duplicates and limit to requested count | |
| unique_recommendations = self.remove_duplicate_recommendations(recommendations) | |
| final_recommendations = unique_recommendations[:count] | |
| # Enhance with AI explanations | |
| enhanced_recommendations = self.add_ai_explanations(final_recommendations, preferences, ai_insights) | |
| logger.info(f"✅ Generated {len(enhanced_recommendations)} enhanced AI recommendations") | |
| return enhanced_recommendations | |
| except Exception as e: | |
| logger.error(f"❌ Error generating enhanced AI recommendations: {e}") | |
| return self.get_fallback_recommendations(count) | |
| def extract_customer_preferences(self, customer_analysis: Dict, ai_insights: Dict) -> Dict: | |
| """Extract detailed customer preferences using AI models""" | |
| try: | |
| preferences = { | |
| 'price_range': self.analyze_price_preferences(customer_analysis), | |
| 'property_types': self.analyze_property_type_preferences(customer_analysis), | |
| 'locations': self.analyze_location_preferences(customer_analysis), | |
| 'features': self.analyze_feature_preferences(customer_analysis), | |
| 'behavioral_patterns': self.analyze_behavioral_patterns(customer_analysis, ai_insights), | |
| 'engagement_level': self.analyze_engagement_level(customer_analysis), | |
| 'urgency_level': ai_insights.get('urgency_level', 'Medium'), | |
| 'personality_type': ai_insights.get('personality_type', 'Balanced') | |
| } | |
| # Use AI models to enhance preferences | |
| if self.property_classifier: | |
| preferences['property_classification'] = self.classify_property_preferences(preferences) | |
| if self.price_predictor: | |
| preferences['price_prediction'] = self.predict_price_preferences(preferences) | |
| if self.preference_analyzer: | |
| preferences['preference_weights'] = self.calculate_preference_weights(preferences) | |
| return preferences | |
| except Exception as e: | |
| logger.error(f"❌ Error extracting customer preferences: {e}") | |
| return {} | |
| def analyze_price_preferences(self, customer_analysis: Dict) -> Dict: | |
| """Analyze price preferences using AI models""" | |
| try: | |
| properties = customer_analysis.get('data', {}).get('properties', []) | |
| if not properties: | |
| return {'preferred_range': 'mid_range', 'confidence': 0.5} | |
| prices = [p.get('price', 0) for p in properties if p.get('price', 0) > 0] | |
| if not prices: | |
| return {'preferred_range': 'mid_range', 'confidence': 0.5} | |
| avg_price = sum(prices) / len(prices) | |
| min_price = min(prices) | |
| max_price = max(prices) | |
| price_variance = np.var(prices) if len(prices) > 1 else 0 | |
| # Use price predictor to classify | |
| if self.price_predictor: | |
| for range_name, (min_val, max_val) in self.price_predictor['price_ranges'].items(): | |
| if min_val <= avg_price <= max_val: | |
| # Normalized variance can exceed 1 for widely scattered prices, | |
| # so clamp to [0, 1] rather than only capping the upper bound | |
| consistency = 1.0 - (price_variance / (avg_price ** 2)) if avg_price > 0 else 0.5 | |
| consistency = max(0.0, min(consistency, 1.0)) | |
| return { | |
| 'preferred_range': range_name, | |
| 'avg_price': avg_price, | |
| 'min_price': min_price, | |
| 'max_price': max_price, | |
| 'confidence': consistency, | |
| 'price_consistency': consistency | |
| } | |
| # Fallback classification | |
| if avg_price > 50000000: | |
| range_name = 'luxury' | |
| elif avg_price > 25000000: | |
| range_name = 'premium' | |
| elif avg_price > 10000000: | |
| range_name = 'mid_range' | |
| else: | |
| range_name = 'affordable' | |
| return { | |
| 'preferred_range': range_name, | |
| 'avg_price': avg_price, | |
| 'min_price': min_price, | |
| 'max_price': max_price, | |
| 'confidence': 0.7 | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing price preferences: {e}") | |
| return {'preferred_range': 'mid_range', 'confidence': 0.5} | |
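| # Worked example (hypothetical prices): views at ₹9M, ₹11M and ₹13M give an | |
| # average of ₹11M, which the fallback thresholds classify as 'mid_range' | |
| # (above ₹10M, at most ₹25M); the tight spread keeps normalized variance | |
| # around 0.02, so confidence stays near 1.0 after clamping. | |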
| def analyze_property_type_preferences(self, customer_analysis: Dict) -> Dict: | |
| """Analyze property type preferences using AI models""" | |
| try: | |
| properties = customer_analysis.get('data', {}).get('properties', []) | |
| if not properties: | |
| return {'preferred_types': [], 'confidence': 0.5} | |
| type_counts = {} | |
| type_engagement = {} | |
| for prop in properties: | |
| prop_type = prop.get('propertyTypeName', 'Unknown') | |
| engagement = prop.get('viewCount', 0) * prop.get('totalDuration', 0) | |
| if prop_type not in type_counts: | |
| type_counts[prop_type] = 0 | |
| type_engagement[prop_type] = 0 | |
| type_counts[prop_type] += 1 | |
| type_engagement[prop_type] += engagement | |
| # Calculate weighted preferences | |
| weighted_preferences = [] | |
| for prop_type, count in type_counts.items(): | |
| avg_engagement = type_engagement[prop_type] / count | |
| weight = self.property_classifier.get('property_type_weights', {}).get(prop_type, 1.0) if self.property_classifier else 1.0 | |
| weighted_score = count * avg_engagement * weight | |
| weighted_preferences.append((prop_type, weighted_score)) | |
| # Sort by weighted score | |
| weighted_preferences.sort(key=lambda x: x[1], reverse=True) | |
| preferred_types = [pt[0] for pt in weighted_preferences[:3]] | |
| confidence = min(len(preferred_types) / 3, 1.0) | |
| return { | |
| 'preferred_types': preferred_types, | |
| 'type_counts': type_counts, | |
| 'type_engagement': type_engagement, | |
| 'confidence': confidence, | |
| 'diversity_score': len(type_counts) / max(len(properties), 1) | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing property type preferences: {e}") | |
| return {'preferred_types': [], 'confidence': 0.5} | |
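| # Worked example of the weighting above (hypothetical numbers): a type viewed | |
| # 4 times with average engagement 120 and weight 1.0 scores 4 * 120 * 1.0 = 480, | |
| # so it outranks a type viewed 6 times with average engagement 50 (score 300). | |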
| def analyze_location_preferences(self, customer_analysis: Dict) -> Dict: | |
| """Analyze location preferences using AI models""" | |
| try: | |
| properties = customer_analysis.get('data', {}).get('properties', []) | |
| if not properties: | |
| return {'preferred_locations': [], 'confidence': 0.5} | |
| location_counts = {} | |
| location_engagement = {} | |
| for prop in properties: | |
| location = prop.get('location', 'Unknown') | |
| engagement = prop.get('viewCount', 0) * max(prop.get('totalDuration', 0), 1) # fall back to view count alone when duration is missing | |
| if location not in location_counts: | |
| location_counts[location] = 0 | |
| location_engagement[location] = 0 | |
| location_counts[location] += 1 | |
| location_engagement[location] += engagement | |
| # Calculate location preferences | |
| location_scores = [] | |
| for location, count in location_counts.items(): | |
| avg_engagement = location_engagement[location] / count | |
| score = count * avg_engagement | |
| location_scores.append((location, score)) | |
| location_scores.sort(key=lambda x: x[1], reverse=True) | |
| preferred_locations = [loc[0] for loc in location_scores[:5]] | |
| return { | |
| 'preferred_locations': preferred_locations, | |
| 'location_counts': location_counts, | |
| 'confidence': min(len(preferred_locations) / 5, 1.0) | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing location preferences: {e}") | |
| return {'preferred_locations': [], 'confidence': 0.5} | |
| def analyze_feature_preferences(self, customer_analysis: Dict) -> Dict: | |
| """Analyze feature preferences using AI models""" | |
| try: | |
| properties = customer_analysis.get('data', {}).get('properties', []) | |
| if not properties: | |
| return {'preferred_features': [], 'confidence': 0.5} | |
| # Extract features from property descriptions | |
| feature_counts = {} | |
| for prop in properties: | |
| description = prop.get('description', '') + ' ' + prop.get('propertyName', '') | |
| features = self.extract_features_from_text(description) | |
| for feature in features: | |
| if feature not in feature_counts: | |
| feature_counts[feature] = 0 | |
| feature_counts[feature] += 1 | |
| # Get top features | |
| sorted_features = sorted(feature_counts.items(), key=lambda x: x[1], reverse=True) | |
| preferred_features = [f[0] for f in sorted_features[:10]] | |
| return { | |
| 'preferred_features': preferred_features, | |
| 'feature_counts': feature_counts, | |
| 'confidence': min(len(preferred_features) / 10, 1.0) | |
| } | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing feature preferences: {e}") | |
| return {'preferred_features': [], 'confidence': 0.5} | |
| def extract_features_from_text(self, text: str) -> List[str]: | |
| """Extract property features from text using simple NLP""" | |
| try: | |
| features = [] | |
| text_lower = text.lower() | |
| # Common property features | |
| feature_keywords = { | |
| 'balcony': ['balcony', 'terrace', 'veranda'], | |
| 'parking': ['parking', 'garage', 'car space'], | |
| 'garden': ['garden', 'lawn', 'backyard'], | |
| 'swimming_pool': ['pool', 'swimming'], | |
| 'gym': ['gym', 'fitness', 'exercise'], | |
| 'security': ['security', 'guard', 'cctv'], | |
| 'elevator': ['elevator', 'lift'], | |
| 'modern': ['modern', 'contemporary', 'new'], | |
| 'luxury': ['luxury', 'premium', 'exclusive'], | |
| 'spacious': ['spacious', 'large', 'big'], | |
| 'corner': ['corner', 'end unit'], | |
| 'view': ['view', 'panoramic', 'city view'] | |
| } | |
| for feature, keywords in feature_keywords.items(): | |
| if any(keyword in text_lower for keyword in keywords): | |
| features.append(feature) | |
| return features | |
| except Exception as e: | |
| logger.error(f"❌ Error extracting features: {e}") | |
| return [] | |
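| # Example: extract_features_from_text("Modern apartment with balcony and gym") | |
| # returns ['balcony', 'gym', 'modern'], following the keyword-map order above. | |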
| def analyze_behavioral_patterns(self, customer_analysis: Dict, ai_insights: Dict) -> Dict: | |
| """Analyze behavioral patterns using AI models""" | |
| try: | |
| properties = customer_analysis.get('data', {}).get('properties', []) | |
| analytics = customer_analysis.get('data', {}).get('analytics', {}) | |
| patterns = { | |
| 'viewing_frequency': self.analyze_viewing_frequency(properties), | |
| 'engagement_timing': self.analyze_engagement_timing(properties), | |
| 'property_exploration': self.analyze_property_exploration(properties), | |
| 'decision_making_style': ai_insights.get('decision_making_style', 'Balanced'), | |
| 'urgency_level': ai_insights.get('urgency_level', 'Medium') | |
| } | |
| # Use preference analyzer to enhance patterns | |
| if self.preference_analyzer: | |
| patterns['engagement_weight'] = self.preference_analyzer['engagement_weights'].get( | |
| str(analytics.get('engagement_level', 'medium')).lower(), 1.0 # coerce to str in case the level is missing or non-string | |
| ) | |
| return patterns | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing behavioral patterns: {e}") | |
| return {} | |
| def analyze_viewing_frequency(self, properties: List[Dict]) -> str: | |
| """Analyze viewing frequency patterns""" | |
| try: | |
| if not properties: | |
| return 'low' | |
| total_views = sum(p.get('viewCount', 0) for p in properties) | |
| avg_views = total_views / len(properties) | |
| if avg_views > 5: | |
| return 'high' | |
| elif avg_views > 2: | |
| return 'medium' | |
| else: | |
| return 'low' | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing viewing frequency: {e}") | |
| return 'medium' | |
| def analyze_engagement_timing(self, properties: List[Dict]) -> str: | |
| """Analyze engagement timing patterns""" | |
| try: | |
| if not properties: | |
| return 'afternoon' | |
| morning_views = 0 | |
| afternoon_views = 0 | |
| evening_views = 0 | |
| for prop in properties: | |
| if prop.get('lastViewedAt'): | |
| try: | |
| view_time = datetime.fromisoformat(prop['lastViewedAt'].replace('Z', '+00:00')) | |
| hour = view_time.hour | |
| if 6 <= hour < 12: | |
| morning_views += prop.get('viewCount', 0) | |
| elif 12 <= hour < 18: | |
| afternoon_views += prop.get('viewCount', 0) | |
| else: | |
| evening_views += prop.get('viewCount', 0) | |
| except (ValueError, AttributeError): | |
| pass # skip timestamps that cannot be parsed | |
| if morning_views > max(afternoon_views, evening_views): | |
| return 'morning' | |
| elif afternoon_views > evening_views: | |
| return 'afternoon' | |
| else: | |
| return 'evening' | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing engagement timing: {e}") | |
| return 'afternoon' | |
| def analyze_property_exploration(self, properties: List[Dict]) -> str: | |
| """Analyze property exploration patterns""" | |
| try: | |
| if not properties: | |
| return 'focused' | |
| property_types = set(p.get('propertyTypeName', 'Unknown') for p in properties) | |
| price_range = max(p.get('price', 0) for p in properties) - min(p.get('price', 0) for p in properties) | |
| avg_price = sum(p.get('price', 0) for p in properties) / len(properties) | |
| if len(property_types) > 3: | |
| return 'diverse' | |
| elif price_range > avg_price * 0.5: | |
| return 'exploratory' | |
| else: | |
| return 'focused' | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing property exploration: {e}") | |
| return 'focused' | |
| def analyze_engagement_level(self, customer_analysis: Dict) -> str: | |
| """Analyze overall engagement level""" | |
| try: | |
| analytics = customer_analysis.get('data', {}).get('analytics', {}) | |
| engagement_score = analytics.get('engagement_level', 'Medium') | |
| if isinstance(engagement_score, str): | |
| return engagement_score.lower() | |
| else: | |
| return 'medium' | |
| except Exception as e: | |
| logger.error(f"❌ Error analyzing engagement level: {e}") | |
| return 'medium' | |
| def get_similarity_based_recommendations(self, viewed_properties: List[Dict], preferences: Dict, count: int) -> List[Dict]: | |
| """Get recommendations based on similarity to viewed properties""" | |
| try: | |
| if not viewed_properties: | |
| return [] | |
| # Create similarity queries based on viewed properties | |
| queries = [] | |
| for prop in viewed_properties[:3]: # Use top 3 most engaged properties | |
| query = self.create_similarity_query(prop, preferences) | |
| queries.append(query) | |
| # Search for similar properties | |
| similar_properties = [] | |
| for query in queries: | |
| results = self.search_similar_properties_chromadb(query, viewed_properties, max(1, count // len(queries))) # at least one result per query | |
| similar_properties.extend(results) | |
| # Score and rank properties | |
| scored_properties = [] | |
| for prop in similar_properties: | |
| score = self.calculate_similarity_score(prop, viewed_properties, preferences) | |
| prop['ai_similarity_score'] = score | |
| scored_properties.append(prop) | |
| # Sort by score and return top results | |
| scored_properties.sort(key=lambda x: x.get('ai_similarity_score', 0), reverse=True) | |
| return scored_properties[:count] | |
| except Exception as e: | |
| logger.error(f"❌ Error getting similarity-based recommendations: {e}") | |
| return [] | |
| def get_preference_based_recommendations(self, preferences: Dict, viewed_properties: List[Dict], count: int) -> List[Dict]: | |
| """Get recommendations based on customer preferences""" | |
| try: | |
| # Create preference-based queries | |
| queries = self.create_preference_queries(preferences) | |
| # Search for properties matching preferences | |
| preference_properties = [] | |
| for query in queries: | |
| results = self.search_similar_properties_chromadb(query, viewed_properties, max(1, count // len(queries))) # at least one result per query | |
| preference_properties.extend(results) | |
| # Score based on preference match | |
| scored_properties = [] | |
| for prop in preference_properties: | |
| score = self.calculate_preference_match_score(prop, preferences) | |
| prop['ai_preference_score'] = score | |
| scored_properties.append(prop) | |
| # Sort by score and return top results | |
| scored_properties.sort(key=lambda x: x.get('ai_preference_score', 0), reverse=True) | |
| return scored_properties[:count] | |
| except Exception as e: | |
| logger.error(f"❌ Error getting preference-based recommendations: {e}") | |
| return [] | |
| def get_behavioral_recommendations(self, customer_analysis: Dict, ai_insights: Dict, count: int) -> List[Dict]: | |
| """Get recommendations based on behavioral patterns""" | |
| try: | |
| behavioral_patterns = customer_analysis.get('data', {}).get('analytics', {}) | |
| personality_type = ai_insights.get('personality_type', 'Balanced') | |
| urgency_level = ai_insights.get('urgency_level', 'Medium') | |
| # Create behavioral queries | |
| queries = self.create_behavioral_queries(behavioral_patterns, personality_type, urgency_level) | |
| # Search for properties matching behavioral patterns | |
| behavioral_properties = [] | |
| for query in queries: | |
| results = self.search_similar_properties_chromadb(query, [], max(1, count // len(queries))) # at least one result per query | |
| behavioral_properties.extend(results) | |
| # Score based on behavioral match | |
| scored_properties = [] | |
| for prop in behavioral_properties: | |
| score = self.calculate_behavioral_match_score(prop, behavioral_patterns, personality_type, urgency_level) | |
| prop['ai_behavioral_score'] = score | |
| scored_properties.append(prop) | |
| # Sort by score and return top results | |
| scored_properties.sort(key=lambda x: x.get('ai_behavioral_score', 0), reverse=True) | |
| return scored_properties[:count] | |
| except Exception as e: | |
| logger.error(f"❌ Error getting behavioral recommendations: {e}") | |
| return [] | |
| def create_similarity_query(self, property_data: Dict, preferences: Dict) -> str: | |
| """Create a query for finding similar properties""" | |
| try: | |
| prop_type = property_data.get('propertyTypeName', '') | |
| location = property_data.get('location', '') | |
| price_range = self.get_price_range(property_data.get('price', 0)) | |
| # Extract key features | |
| description = property_data.get('description', '') + ' ' + property_data.get('propertyName', '') | |
| features = self.extract_features_from_text(description) | |
| # Build query | |
| query_parts = [prop_type, location, price_range] | |
| query_parts.extend(features[:3]) # Top 3 features | |
| query = f"{' '.join(query_parts)} property similar to {property_data.get('propertyName', 'this property')}" | |
| return query | |
| except Exception as e: | |
| logger.error(f"❌ Error creating similarity query: {e}") | |
| return f"{property_data.get('propertyTypeName', 'Property')} in {property_data.get('location', '')}" | |
| def create_preference_queries(self, preferences: Dict) -> List[str]: | |
| """Create queries based on customer preferences""" | |
| try: | |
| queries = [] | |
| # Price range query | |
| price_range = preferences.get('price_range', {}).get('preferred_range', 'mid_range') | |
| queries.append(f"{price_range} price range properties") | |
| # Property type queries | |
| preferred_types = preferences.get('property_types', {}).get('preferred_types', []) | |
| for prop_type in preferred_types[:2]: # Top 2 types | |
| queries.append(f"{prop_type} properties") | |
| # Location queries | |
| preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) | |
| for location in preferred_locations[:2]: # Top 2 locations | |
| queries.append(f"properties in {location}") | |
| # Feature queries | |
| preferred_features = preferences.get('features', {}).get('preferred_features', []) | |
| for feature in preferred_features[:2]: # Top 2 features | |
| queries.append(f"{feature} properties") | |
| return queries | |
| except Exception as e: | |
| logger.error(f"❌ Error creating preference queries: {e}") | |
| return ["luxury properties", "premium apartments"] | |
| def create_behavioral_queries(self, behavioral_patterns: Dict, personality_type: str, urgency_level: str) -> List[str]: | |
| """Create queries based on behavioral patterns""" | |
| try: | |
| queries = [] | |
| # Personality-based queries | |
| if personality_type == 'Analytical': | |
| queries.extend(["detailed property information", "comprehensive property features"]) | |
| elif personality_type == 'Impulsive': | |
| queries.extend(["exclusive properties", "limited time offers"]) | |
| elif personality_type == 'Conservative': | |
| queries.extend(["established properties", "proven locations"]) | |
| # Urgency-based queries | |
| if urgency_level == 'High': | |
| queries.extend(["immediate availability", "ready to move properties"]) | |
| elif urgency_level == 'Low': | |
| queries.extend(["upcoming projects", "pre-launch properties"]) | |
| # Engagement-based queries | |
| viewing_frequency = behavioral_patterns.get('viewing_frequency', 'medium') | |
| if viewing_frequency == 'high': | |
| queries.extend(["high engagement properties", "popular properties"]) | |
| elif viewing_frequency == 'low': | |
| queries.extend(["unique properties", "exclusive listings"]) | |
| return queries | |
| except Exception as e: | |
| logger.error(f"❌ Error creating behavioral queries: {e}") | |
| return ["premium properties", "luxury apartments"] | |
| def calculate_similarity_score(self, property_data: Dict, viewed_properties: List[Dict], preferences: Dict) -> float: | |
| """Calculate similarity score for a property""" | |
| try: | |
| score = 0.0 | |
| # Property type similarity | |
| prop_type = property_data.get('propertyTypeName', '') | |
| preferred_types = preferences.get('property_types', {}).get('preferred_types', []) | |
| if prop_type in preferred_types: | |
| score += 0.3 | |
| # Price range similarity | |
| price = property_data.get('price', 0) | |
| price_range = preferences.get('price_range', {}) | |
| if self.is_price_in_range(price, price_range): | |
| score += 0.3 | |
| # Location similarity | |
| location = property_data.get('location', '') | |
| preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) | |
| if location in preferred_locations: | |
| score += 0.2 | |
| # Feature similarity | |
| description = property_data.get('description', '') + ' ' + property_data.get('propertyName', '') | |
| features = self.extract_features_from_text(description) | |
| preferred_features = preferences.get('features', {}).get('preferred_features', []) | |
| feature_match = len(set(features) & set(preferred_features)) | |
| score += min(feature_match * 0.1, 0.2) | |
| return min(score, 1.0) | |
| except Exception as e: | |
| logger.error(f"❌ Error calculating similarity score: {e}") | |
| return 0.5 | |
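| # Scoring budget: type match 0.3 + price match 0.3 + location match 0.2 + | |
| # 0.1 per overlapping feature (capped at 0.2) = at most 1.0. | |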
| def calculate_preference_match_score(self, property_data: Dict, preferences: Dict) -> float: | |
| """Calculate preference match score for a property""" | |
| try: | |
| score = 0.0 | |
| # Price preference match | |
| price = property_data.get('price', 0) | |
| price_range = preferences.get('price_range', {}) | |
| if self.is_price_in_range(price, price_range): | |
| score += 0.4 | |
| # Property type preference match | |
| prop_type = property_data.get('propertyTypeName', '') | |
| preferred_types = preferences.get('property_types', {}).get('preferred_types', []) | |
| if prop_type in preferred_types: | |
| score += 0.3 | |
| # Location preference match | |
| location = property_data.get('location', '') | |
| preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) | |
| if location in preferred_locations: | |
| score += 0.2 | |
| # Feature preference match | |
| description = property_data.get('description', '') + ' ' + property_data.get('propertyName', '') | |
| features = self.extract_features_from_text(description) | |
| preferred_features = preferences.get('features', {}).get('preferred_features', []) | |
| feature_match = len(set(features) & set(preferred_features)) | |
| score += min(feature_match * 0.1, 0.1) | |
| return min(score, 1.0) | |
| except Exception as e: | |
| logger.error(f"❌ Error calculating preference match score: {e}") | |
| return 0.5 | |
| def calculate_behavioral_match_score(self, property_data: Dict, behavioral_patterns: Dict, personality_type: str, urgency_level: str) -> float: | |
| """Calculate behavioral match score for a property""" | |
| try: | |
| score = 0.0 | |
| # Personality-based scoring | |
| if personality_type == 'Analytical': | |
| # Prefer properties with detailed information | |
| description = property_data.get('description', '') | |
| if len(description) > 100: | |
| score += 0.3 | |
| elif personality_type == 'Impulsive': | |
| # Prefer exclusive or unique properties | |
| if 'exclusive' in property_data.get('propertyName', '').lower(): | |
| score += 0.3 | |
| elif personality_type == 'Conservative': | |
| # Prefer established properties | |
| if 'established' in property_data.get('location', '').lower(): | |
| score += 0.3 | |
| # Urgency-based scoring | |
| if urgency_level == 'High': | |
| # Prefer ready properties | |
| if 'ready' in property_data.get('description', '').lower(): | |
| score += 0.2 | |
| elif urgency_level == 'Low': | |
| # Prefer upcoming projects | |
| if 'upcoming' in property_data.get('description', '').lower(): | |
| score += 0.2 | |
| # Engagement-based scoring | |
| viewing_frequency = behavioral_patterns.get('viewing_frequency', 'medium') | |
| if viewing_frequency == 'high': | |
| # Prefer popular properties | |
| if property_data.get('viewCount', 0) > 10: | |
| score += 0.2 | |
| elif viewing_frequency == 'low': | |
| # Prefer unique properties | |
| if property_data.get('viewCount', 0) < 5: | |
| score += 0.2 | |
| return min(score, 1.0) | |
| except Exception as e: | |
| logger.error(f"❌ Error calculating behavioral match score: {e}") | |
| return 0.5 | |
| def is_price_in_range(self, price: float, price_range: Dict) -> bool: | |
| """Check if price is in the preferred range""" | |
| try: | |
| preferred_range = price_range.get('preferred_range', 'mid_range') | |
| avg_price = price_range.get('avg_price', 0) | |
| if preferred_range == 'luxury': | |
| return price > 50000000 | |
| elif preferred_range == 'premium': | |
| return 25000000 <= price <= 50000000 | |
| elif preferred_range == 'mid_range': | |
| return 10000000 <= price <= 25000000 | |
| elif preferred_range == 'affordable': | |
| return price <= 10000000 # no lower bound, matching the fallback classification above | |
| else: | |
| # Use average price with tolerance | |
| tolerance = 0.3 | |
| min_price = avg_price * (1 - tolerance) | |
| max_price = avg_price * (1 + tolerance) | |
| return min_price <= price <= max_price | |
| except Exception as e: | |
| logger.error(f"❌ Error checking price range: {e}") | |
| return True | |
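| # Price bands used above (matching the fallback classification): luxury > 50M, | |
| # premium 25M-50M, mid_range 10M-25M, affordable <= 10M; any other label falls | |
| # back to a +/-30% tolerance around the customer's average viewed price. | |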
| def remove_duplicate_recommendations(self, recommendations: List[Dict]) -> List[Dict]: | |
| """Remove duplicate recommendations based on property ID""" | |
| try: | |
| seen_ids = set() | |
| unique_recommendations = [] | |
| for rec in recommendations: | |
| prop_id = rec.get('id') or rec.get('propertyId') | |
| if prop_id and prop_id not in seen_ids: | |
| seen_ids.add(prop_id) | |
| unique_recommendations.append(rec) | |
| return unique_recommendations | |
| except Exception as e: | |
| logger.error(f"❌ Error removing duplicates: {e}") | |
| return recommendations | |
| def add_ai_explanations(self, recommendations: List[Dict], preferences: Dict, ai_insights: Dict) -> List[Dict]: | |
| """Add AI explanations to recommendations""" | |
| try: | |
| for rec in recommendations: | |
| explanation = self.generate_ai_explanation(rec, preferences, ai_insights) | |
| rec['ai_explanation'] = explanation | |
| rec['ai_confidence'] = self.calculate_ai_confidence(rec, preferences) | |
| return recommendations | |
| except Exception as e: | |
| logger.error(f"❌ Error adding AI explanations: {e}") | |
| return recommendations | |
| def generate_ai_explanation(self, property_data: Dict, preferences: Dict, ai_insights: Dict) -> str: | |
| """Generate AI explanation for why a property is recommended""" | |
| try: | |
| explanations = [] | |
| # Price explanation | |
| price = property_data.get('price', 0) | |
| price_range = preferences.get('price_range', {}) | |
| if self.is_price_in_range(price, price_range): | |
| explanations.append("Matches your preferred price range") | |
| # Property type explanation | |
| prop_type = property_data.get('propertyTypeName', '') | |
| preferred_types = preferences.get('property_types', {}).get('preferred_types', []) | |
| if prop_type in preferred_types: | |
| explanations.append(f"Matches your interest in {prop_type} properties") | |
| # Location explanation | |
| location = property_data.get('location', '') | |
| preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) | |
| if location in preferred_locations: | |
| explanations.append(f"Located in your preferred area: {location}") | |
| # Feature explanation | |
| description = property_data.get('description', '') + ' ' + property_data.get('propertyName', '') | |
| features = self.extract_features_from_text(description) | |
| preferred_features = preferences.get('features', {}).get('preferred_features', []) | |
| matching_features = set(features) & set(preferred_features) | |
| if matching_features: | |
| feature_list = ', '.join(list(matching_features)[:3]) | |
| explanations.append(f"Features you prefer: {feature_list}") | |
| # Behavioral explanation | |
| personality_type = ai_insights.get('personality_type', 'Balanced') | |
| if personality_type == 'Analytical': | |
| explanations.append("Suitable for your analytical decision-making style") | |
| elif personality_type == 'Impulsive': | |
| explanations.append("Perfect for your quick decision-making preference") | |
| if not explanations: | |
| explanations.append("Based on your overall property preferences and behavior patterns") | |
| return " | ".join(explanations) | |
| except Exception as e: | |
| logger.error(f"❌ Error generating AI explanation: {e}") | |
| return "Recommended based on your preferences and behavior analysis" | |
| def calculate_ai_confidence(self, property_data: Dict, preferences: Dict) -> float: | |
| """Calculate AI confidence score for a recommendation""" | |
| try: | |
| confidence = 0.5 # Base confidence | |
| # Price confidence | |
| price = property_data.get('price', 0) | |
| price_range = preferences.get('price_range', {}) | |
| if self.is_price_in_range(price, price_range): | |
| confidence += 0.2 | |
| # Type confidence | |
| prop_type = property_data.get('propertyTypeName', '') | |
| preferred_types = preferences.get('property_types', {}).get('preferred_types', []) | |
| if prop_type in preferred_types: | |
| confidence += 0.2 | |
| # Location confidence | |
| location = property_data.get('location', '') | |
| preferred_locations = preferences.get('locations', {}).get('preferred_locations', []) | |
| if location in preferred_locations: | |
| confidence += 0.1 | |
| return min(confidence, 1.0) | |
| except Exception as e: | |
| logger.error(f"❌ Error calculating AI confidence: {e}") | |
| return 0.5 | |
| def get_fallback_recommendations(self, count: int) -> List[Dict]: | |
| """Get fallback recommendations when AI models fail""" | |
| try: | |
| # First, try to debug the ChromaDB structure | |
| debug_info = self.debug_chromadb_metadata() | |
| if 'error' not in debug_info: | |
| logger.info(f"🔍 ChromaDB structure: {debug_info}") | |
| # Simple fallback based on ChromaDB search | |
| query = "luxury properties premium apartments" | |
| results = self.search_similar_properties_chromadb(query, [], count) | |
| for rec in results: | |
| rec['ai_explanation'] = "Recommended based on general property preferences" | |
| rec['ai_confidence'] = 0.6 | |
| return results | |
| except Exception as e: | |
| logger.error(f"❌ Error getting fallback recommendations: {e}") | |
| return [] | |
| def classify_property_preferences(self, preferences: Dict) -> Dict: | |
| """Classify property preferences using AI models""" | |
| try: | |
| if not self.property_classifier: | |
| return {} | |
| classification = { | |
| 'property_category': 'balanced', | |
| 'location_preference': 'established', | |
| 'price_sensitivity': 'moderate' | |
| } | |
| # Analyze property type diversity | |
| property_types = preferences.get('property_types', {}) | |
| diversity_score = property_types.get('diversity_score', 0) | |
| if diversity_score > 0.7: | |
| classification['property_category'] = 'diverse' | |
| elif diversity_score < 0.3: | |
| classification['property_category'] = 'focused' | |
| else: | |
| classification['property_category'] = 'balanced' | |
| # Analyze price sensitivity | |
| price_range = preferences.get('price_range', {}) | |
| price_consistency = price_range.get('price_consistency', 0.5) | |
| if price_consistency > 0.8: | |
| classification['price_sensitivity'] = 'low' | |
| elif price_consistency < 0.3: | |
| classification['price_sensitivity'] = 'high' | |
| else: | |
| classification['price_sensitivity'] = 'moderate' | |
| return classification | |
| except Exception as e: | |
| logger.error(f"❌ Error classifying property preferences: {e}") | |
| return {} | |
| def predict_price_preferences(self, preferences: Dict) -> Dict: | |
| """Predict price preferences using AI models""" | |
| try: | |
| if not self.price_predictor: | |
| return {} | |
| price_range = preferences.get('price_range', {}) | |
| avg_price = price_range.get('avg_price', 0) | |
| confidence = price_range.get('confidence', 0.5) | |
| prediction = { | |
| 'predicted_range': price_range.get('preferred_range', 'mid_range'), | |
| 'confidence': confidence, | |
| 'price_tolerance': 0.2, | |
| 'upsell_potential': 'medium' | |
| } | |
| # Predict upsell potential | |
| if avg_price > 50000000: | |
| prediction['upsell_potential'] = 'high' | |
| elif avg_price < 10000000: | |
| prediction['upsell_potential'] = 'low' | |
| else: | |
| prediction['upsell_potential'] = 'medium' | |
| # Adjust price tolerance based on confidence | |
| if confidence > 0.8: | |
| prediction['price_tolerance'] = 0.1 | |
| elif confidence < 0.3: | |
| prediction['price_tolerance'] = 0.4 | |
| return prediction | |
| except Exception as e: | |
| logger.error(f"❌ Error predicting price preferences: {e}") | |
| return {} | |
| def calculate_preference_weights(self, preferences: Dict) -> Dict: | |
| """Calculate preference weights using AI models""" | |
| try: | |
| if not self.preference_analyzer: | |
| return {} | |
| weights = { | |
| 'price_weight': 1.0, | |
| 'type_weight': 1.0, | |
| 'location_weight': 1.0, | |
| 'feature_weight': 1.0, | |
| 'engagement_weight': 1.0 | |
| } | |
| # Adjust weights based on engagement level | |
| engagement_level = preferences.get('engagement_level', 'medium') | |
| engagement_weight = self.preference_analyzer['engagement_weights'].get(engagement_level, 1.0) | |
| weights['engagement_weight'] = engagement_weight | |
| # Adjust weights based on behavioral patterns | |
| behavioral_patterns = preferences.get('behavioral_patterns', {}) | |
| viewing_frequency = behavioral_patterns.get('viewing_frequency', 'medium') | |
| if viewing_frequency == 'high': | |
| weights['feature_weight'] = 1.2 # High engagement users care more about features | |
| elif viewing_frequency == 'low': | |
| weights['price_weight'] = 1.3 # Low engagement users care more about price | |
| # Adjust weights based on property exploration | |
| property_exploration = behavioral_patterns.get('property_exploration', 'focused') | |
| if property_exploration == 'diverse': | |
| weights['type_weight'] = 1.2 | |
| elif property_exploration == 'focused': | |
| weights['location_weight'] = 1.2 | |
| return weights | |
| except Exception as e: | |
| logger.error(f"❌ Error calculating preference weights: {e}") | |
| return {} | |
| def debug_chromadb_metadata(self, sample_count: int = 3) -> Dict: | |
| """Debug method to inspect ChromaDB metadata structure""" | |
| try: | |
| if not self.properties_collection: | |
| return {'error': 'ChromaDB collection not initialized'} | |
| total_count = self.properties_collection.count() | |
| if total_count == 0: | |
| return {'error': 'No properties in ChromaDB'} | |
| # Get sample properties | |
| results = self.properties_collection.query( | |
| query_texts=["property"], | |
| n_results=sample_count, | |
| include=["metadatas", "documents"] | |
| ) | |
| debug_info = { | |
| 'total_properties': total_count, | |
| 'sample_metadata_keys': [], | |
| 'sample_metadata': [], | |
| 'field_mapping_issues': [] | |
| } | |
| if results and results['metadatas'] and results['metadatas'][0]: | |
| for i, metadata in enumerate(results['metadatas'][0]): | |
| # Get all keys | |
| keys = list(metadata.keys()) | |
| debug_info['sample_metadata_keys'].append(keys) | |
| # Check for missing expected fields | |
| expected_fields = ['propertyTypeName', 'propertyName', 'price', 'address', 'beds', 'baths'] | |
| missing_fields = [field for field in expected_fields if field not in keys] | |
| if missing_fields: | |
| debug_info['field_mapping_issues'].append({ | |
| 'sample_index': i, | |
| 'missing_fields': missing_fields, | |
| 'available_fields': keys | |
| }) | |
| # Store sample metadata | |
| debug_info['sample_metadata'].append(metadata) | |
| logger.info(f"🔍 ChromaDB Debug Info: {debug_info}") | |
| return debug_info | |
| except Exception as e: | |
| logger.error(f"❌ Error debugging ChromaDB metadata: {e}") | |
| return {'error': str(e)} | |
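| # The returned dict looks roughly like (hypothetical values): | |
| # {'total_properties': 1200, 'sample_metadata_keys': [['id', 'price', ...]], | |
| # 'sample_metadata': [...], 'field_mapping_issues': []}. | |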
| def _clean_property_data(self, prop: Dict) -> Dict: | |
| """Clean and standardize property data to fix JSON parsing issues""" | |
| try: | |
| cleaned = {} | |
| # Standardize ID field | |
| cleaned['id'] = str(prop.get('id', prop.get('propertyId', ''))) | |
| cleaned['propertyId'] = cleaned['id'] | |
| # Standardize basic fields | |
| cleaned['propertyName'] = str(prop.get('propertyName', prop.get('name', ''))) | |
| cleaned['propertyTypeName'] = str(prop.get('propertyTypeName', prop.get('typeName', prop.get('type', '')))) | |
| cleaned['price'] = float(prop.get('price', prop.get('marketValue', 0)) or 0) | |
| cleaned['address'] = str(prop.get('address', prop.get('location', ''))) | |
| cleaned['beds'] = int(prop.get('beds', prop.get('bedrooms', 0)) or 0) | |
| cleaned['baths'] = int(prop.get('baths', prop.get('bathrooms', 0)) or 0) | |
| cleaned['sqft'] = float(prop.get('totalSquareFeet', prop.get('area', prop.get('sqft', 0))) or 0) | |
| cleaned['description'] = str(prop.get('description', '')) | |
| # Enhanced features handling to fix JSON parsing errors | |
| features = prop.get('features', []) | |
| if isinstance(features, str): | |
| # Try to parse as JSON first | |
| try: | |
| parsed_features = json.loads(features) | |
| if isinstance(parsed_features, list): | |
| cleaned['features'] = parsed_features | |
| else: | |
| cleaned['features'] = [str(parsed_features)] | |
| except (json.JSONDecodeError, ValueError): | |
| # If JSON parsing fails, split by common delimiters | |
| if ',' in features: | |
| cleaned['features'] = [f.strip() for f in features.split(',') if f.strip()] | |
| elif ';' in features: | |
| cleaned['features'] = [f.strip() for f in features.split(';') if f.strip()] | |
| else: | |
| cleaned['features'] = [features.strip()] if features.strip() else [] | |
| elif isinstance(features, list): | |
| cleaned['features'] = [str(f) for f in features if f] | |
| else: | |
| cleaned['features'] = [] | |
| # Additional fields | |
| cleaned['viewCount'] = int(prop.get('viewCount', 0) or 0) | |
| cleaned['totalDuration'] = int(prop.get('totalDuration', 0) or 0) | |
| cleaned['lastViewedAt'] = str(prop.get('lastViewedAt', '')) | |
| return cleaned | |
| except Exception as e: | |
| logger.error(f"❌ Error cleaning property data: {e}") | |
| # Return minimal valid data | |
| return { | |
| 'id': str(prop.get('id', prop.get('propertyId', 'unknown'))), | |
| 'propertyId': str(prop.get('id', prop.get('propertyId', 'unknown'))), | |
| 'propertyName': 'Unknown Property', | |
| 'propertyTypeName': 'Property', | |
| 'price': 0.0, | |
| 'address': '', | |
| 'beds': 0, | |
| 'baths': 0, | |
| 'sqft': 0.0, | |
| 'description': '', | |
| 'features': [], | |
| 'viewCount': 0, | |
| 'totalDuration': 0, | |
| 'lastViewedAt': '' | |
| } | |
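| # Example (hypothetical raw record): {'propertyId': 7, 'name': 'Sea View', | |
| # 'marketValue': '12500000', 'bedrooms': 3, 'features': 'balcony, gym'} is | |
| # normalized to {'id': '7', 'propertyName': 'Sea View', 'price': 12500000.0, | |
| # 'beds': 3, 'features': ['balcony', 'gym'], ...}. | |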
| def _retry_failed_pages(self, failed_pages: List[int], page_size: int, max_workers: int) -> List[Dict]: | |
| """Retry failed pages with exponential backoff and reduced concurrency""" | |
| logger.info(f"🔄 Retrying {len(failed_pages)} failed pages with reduced concurrency...") | |
| retry_properties = [] | |
| retry_workers = max(1, max_workers // 2) # Reduce workers for retry | |
| with ThreadPoolExecutor(max_workers=retry_workers) as executor: | |
| futures = [] | |
| for page in failed_pages: | |
| future = executor.submit(self._fetch_property_page_enhanced, page, page_size) | |
| futures.append((page, future)) | |
| for page, future in futures: | |
| try: | |
| properties = future.result() # No timeout - let it take as long as needed | |
| if properties: | |
| retry_properties.extend(properties) | |
| logger.info(f"✅ Retry successful for page {page}: {len(properties)} properties") | |
| else: | |
| logger.warning(f"⚠️ Retry failed for page {page}: No properties returned") | |
| except Exception as e: | |
| logger.error(f"❌ Retry failed for page {page}: {e}") | |
| logger.info(f"🔄 Retry completed: {len(retry_properties)} properties recovered") | |
| return retry_properties | |
| def _store_properties_in_chromadb_parallel(self, properties: List[Dict]) -> bool: | |
| """Memory-efficient parallel ChromaDB storage with simple embeddings""" | |
| try: | |
| logger.info(f"🗄️ Storing {len(properties)} properties in ChromaDB with memory-efficient processing...") | |
| if self.properties_collection is None: | |
| logger.error("❌ ChromaDB properties collection not initialized") | |
| return False | |
| # Check existing properties but don't delete collection | |
| try: | |
| existing_count = self.properties_collection.count() | |
| logger.info(f"📊 Collection currently has {existing_count} properties") | |
| # Only clear if we have new properties and want to replace | |
| if existing_count > 0 and len(properties) > existing_count * 0.8: # Only clear if new data is substantial | |
| logger.info(f"🗑️ Clearing {existing_count} existing properties for fresh data...") | |
| # Clear items without deleting collection | |
| try: | |
| all_ids = self.properties_collection.get()['ids'] | |
| if all_ids: | |
| # Delete in batches to avoid memory issues | |
| batch_size = 100 | |
| for i in range(0, len(all_ids), batch_size): | |
| batch_ids = all_ids[i:i + batch_size] | |
| self.properties_collection.delete(ids=batch_ids) | |
| logger.info(f"✅ Cleared {len(all_ids)} existing properties") | |
| except Exception as clear_error: | |
| logger.warning(f"⚠️ Error clearing collection: {clear_error}") | |
| elif existing_count > 0: | |
| logger.info(f"📦 Keeping existing {existing_count} properties, adding {len(properties)} new ones") | |
| except Exception as count_error: | |
| logger.warning(f"⚠️ Could not check existing count: {count_error}") | |
| # Collection might be corrupted, reinitialize | |
| self.initialize_chromadb() | |
| # Process properties in smaller batches to avoid memory issues | |
| batch_size = 20 # Smaller batches to prevent memory overflow | |
| total_stored = 0 | |
| failed_batches = [] | |
| # Use fewer workers to reduce memory pressure | |
| with ThreadPoolExecutor(max_workers=5) as executor: # Reduced workers for memory efficiency | |
| futures = {} | |
| for i in range(0, len(properties), batch_size): | |
| batch = properties[i:i+batch_size] | |
| batch_num = i//batch_size + 1 | |
| future = executor.submit(self._process_property_batch_memory_efficient, batch, batch_num) | |
| futures[future] = batch_num | |
| logger.info(f"🚀 Submitted {len(futures)} memory-efficient storage tasks with 5 workers") | |
| completed_batches = 0 | |
| for future in concurrent.futures.as_completed(futures.keys()): | |
| batch_num = futures[future] | |
| try: | |
| batch_result = future.result() # No timeout | |
| completed_batches += 1 | |
| progress = (completed_batches / len(futures)) * 100 | |
| if batch_result['success']: | |
| total_stored += batch_result['count'] | |
| logger.info(f"📊 Batch {batch_num}: {batch_result['count']} properties stored (Total: {total_stored}, Progress: {progress:.1f}%)") | |
| else: | |
| failed_batches.append(batch_num) | |
| logger.error(f"❌ Batch {batch_num}: Failed to store properties (Progress: {progress:.1f}%)") | |
| except Exception as e: | |
| completed_batches += 1 | |
| progress = (completed_batches / len(futures)) * 100 | |
| failed_batches.append(batch_num) | |
| logger.error(f"❌ Batch {batch_num}: Error - {e} (Progress: {progress:.1f}%)") | |
| if failed_batches: | |
| logger.warning(f"⚠️ {len(failed_batches)} batches failed: {failed_batches}") | |
| logger.info(f"✅ Successfully stored {total_stored} properties in ChromaDB!") | |
| return total_stored > 0 | |
| except Exception as e: | |
| logger.error(f"❌ Error in parallel ChromaDB storage: {e}") | |
| import traceback | |
| logger.error(f"Full traceback: {traceback.format_exc()}") | |
| return False | |
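| # Sizing sketch: with batch_size = 20 and 5 workers, 1,000 properties become | |
| # 50 batches processed at most 5 at a time, so peak memory stays near five | |
| # 20-document embedding calls rather than one 1,000-document call. | |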
| def _process_property_batch(self, batch: List[Dict], batch_num: int) -> Dict: | |
| """Process a batch of properties for ChromaDB storage with memory-efficient approach""" | |
| try: | |
| documents = [] | |
| metadatas = [] | |
| ids = [] | |
| for prop in batch: | |
| # Create simple text description without heavy AI processing | |
| description = self._create_simple_property_description(prop) | |
| documents.append(description) | |
| # Prepare metadata with enhanced JSON handling | |
| metadata = { | |
| 'id': str(prop.get('id', prop.get('propertyId', f'prop_{len(ids)}'))), | |
| 'propertyName': str(prop.get('propertyName', '')), | |
| 'propertyTypeName': str(prop.get('propertyTypeName', '')), | |
| 'price': float(prop.get('price', 0) or 0), | |
| 'address': str(prop.get('address', '')), | |
| 'beds': int(prop.get('beds', 0) or 0), | |
| 'baths': int(prop.get('baths', 0) or 0), | |
| 'sqft': float(prop.get('sqft', 0) or 0), | |
| 'description': str(prop.get('description', '')), | |
| 'features': self._safe_json_stringify(prop.get('features', [])) | |
| } | |
| # Clean metadata | |
| metadata = self.clean_metadata_for_chromadb(metadata) | |
| metadatas.append(metadata) | |
| prop_id = str(prop.get('id', prop.get('propertyId', f'prop_{len(ids)}'))) | |
| ids.append(prop_id) | |
| # Add batch to ChromaDB using simple embedding function | |
| try: | |
| self.properties_collection.add( | |
| documents=documents, | |
| metadatas=metadatas, | |
| ids=ids | |
| ) | |
| return {'success': True, 'count': len(batch), 'error': None} | |
| except Exception as e: | |
| logger.error(f"❌ ChromaDB add error: {e}") | |
| return {'success': False, 'count': 0, 'error': str(e)} | |
| except Exception as e: | |
| logger.error(f"❌ Error processing batch {batch_num}: {e}") | |
| return {'success': False, 'count': 0, 'error': str(e)} | |
| def _create_simple_property_description(self, prop: Dict) -> str: | |
| """Create simple property description without heavy AI processing""" | |
| try: | |
| parts = [] | |
| # Basic property info | |
| if prop.get('propertyName'): | |
| parts.append(f"Property: {prop.get('propertyName')}") | |
| if prop.get('propertyTypeName'): | |
| parts.append(f"Type: {prop.get('propertyTypeName')}") | |
| if prop.get('price'): | |
| parts.append(f"Price: ${prop.get('price'):,.0f}") | |
| if prop.get('address'): | |
| parts.append(f"Address: {prop.get('address')}") | |
| if prop.get('beds') or prop.get('baths'): | |
| beds = prop.get('beds', 0) | |
| baths = prop.get('baths', 0) | |
| parts.append(f"Bedrooms: {beds}, Bathrooms: {baths}") | |
| if prop.get('sqft'): | |
| parts.append(f"Square Feet: {prop.get('sqft'):,.0f}") | |
| if prop.get('description'): | |
| desc = str(prop.get('description'))[:200] # Limit description length | |
| parts.append(f"Description: {desc}") | |
| # Features | |
| features = prop.get('features', []) | |
| if features: | |
| if isinstance(features, str): | |
| try: | |
| features = json.loads(features) | |
| except (json.JSONDecodeError, ValueError): | |
| features = [features] | |
| if isinstance(features, list) and features: | |
| feature_list = [str(f) for f in features[:10]] # Limit to 10 features | |
| parts.append(f"Features: {', '.join(feature_list)}") | |
| return " | ".join(parts) if parts else "Property" | |
| except Exception as e: | |
| logger.warning(f"⚠️ Error creating simple property description: {e}") | |
| return "Property" | |
| def _process_property_batch_memory_efficient(self, batch: List[Dict], batch_num: int) -> Dict: | |
| """Memory-efficient batch processing with minimal AI usage""" | |
| try: | |
| documents = [] | |
| metadatas = [] | |
| ids = [] | |
| for prop in batch: | |
| # Create ultra-simple text description without any AI processing | |
| description = self._create_ultra_simple_description(prop) | |
| documents.append(description) | |
| # Prepare minimal metadata to reduce memory usage | |
| metadata = { | |
| 'id': str(prop.get('id', prop.get('propertyId', f'prop_{len(ids)}'))), | |
| 'propertyName': str(prop.get('propertyName', ''))[:100], # Limit string length | |
| 'propertyTypeName': str(prop.get('propertyTypeName', ''))[:50], | |
| 'price': float(prop.get('price', 0) or 0), | |
| 'address': str(prop.get('address', ''))[:200], | |
| 'beds': int(prop.get('beds', 0) or 0), | |
| 'baths': int(prop.get('baths', 0) or 0), | |
| 'sqft': float(prop.get('sqft', 0) or 0), | |
| 'description': str(prop.get('description', ''))[:300], # Limit description | |
| 'features': self._safe_json_stringify((prop.get('features') or [])[:10] if isinstance(prop.get('features'), list) else prop.get('features', [])) # trim the list, not the JSON string, so it stays parseable | |
| } | |
| metadatas.append(metadata) | |
| prop_id = str(prop.get('id', prop.get('propertyId', f'prop_{len(ids)}'))) | |
| ids.append(prop_id) | |
| # Add batch to ChromaDB with error handling | |
| try: | |
| self.properties_collection.add( | |
| documents=documents, | |
| metadatas=metadatas, | |
| ids=ids | |
| ) | |
| return {'success': True, 'count': len(batch), 'error': None} | |
| except Exception as e: | |
| logger.error(f"❌ ChromaDB add error in batch {batch_num}: {e}") | |
| return {'success': False, 'count': 0, 'error': str(e)} | |
| except Exception as e: | |
| logger.error(f"❌ Error processing batch {batch_num}: {e}") | |
| return {'success': False, 'count': 0, 'error': str(e)} | |
| def _create_ultra_simple_description(self, prop: Dict) -> str: | |
| """Create ultra-simple property description without any AI processing""" | |
| try: | |
| parts = [] | |
| # Only essential information to minimize memory usage | |
| if prop.get('propertyName'): | |
| parts.append(prop.get('propertyName')) | |
| if prop.get('propertyTypeName'): | |
| parts.append(prop.get('propertyTypeName')) | |
| if prop.get('price'): | |
| parts.append(f"${prop.get('price'):,.0f}") | |
| if prop.get('address'): | |
| parts.append(prop.get('address')) | |
| if prop.get('beds') or prop.get('baths'): | |
| beds = prop.get('beds', 0) | |
| baths = prop.get('baths', 0) | |
| parts.append(f"{beds}bed {baths}bath") | |
| if prop.get('sqft'): | |
| parts.append(f"{prop.get('sqft'):,.0f}sqft") | |
| # Minimal features (only first 3) | |
| features = prop.get('features', []) | |
| if features: | |
| if isinstance(features, str): | |
| try: | |
| features = json.loads(features) | |
| except (json.JSONDecodeError, ValueError): | |
| features = [features] | |
| if isinstance(features, list) and features: | |
| feature_list = [str(f) for f in features[:3]] # Only 3 features | |
| parts.append(f"Features: {', '.join(feature_list)}") | |
| return " | ".join(parts) if parts else "Property" | |
| except Exception as e: | |
| logger.warning(f"⚠️ Error creating ultra-simple description: {e}") | |
| return "Property" | |
| def _safe_json_stringify(self, obj) -> str: | |
| """Safely convert object to JSON string without parsing errors""" | |
| try: | |
| if isinstance(obj, list): | |
| # Clean list items | |
| cleaned_list = [] | |
| for item in obj: | |
| if item is not None: | |
| cleaned_list.append(str(item).strip()) | |
| return json.dumps(cleaned_list) | |
| elif isinstance(obj, str): | |
| # If it's already a string, try to parse and re-stringify | |
| try: | |
| parsed = json.loads(obj) | |
| return json.dumps(parsed) | |
| except (json.JSONDecodeError, ValueError): | |
| # If parsing fails, return as simple string | |
| return json.dumps([obj]) | |
| else: | |
| return json.dumps([]) | |
| except Exception as e: | |
| logger.warning(f"⚠️ Error in safe JSON stringify: {e}") | |
| return json.dumps([]) | |
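| # Examples: _safe_json_stringify(['pool', 'gym']) -> '["pool", "gym"]'; | |
| # _safe_json_stringify('pool') -> '["pool"]'; _safe_json_stringify(None) -> '[]'. | |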
| def _schedule_property_update(self): | |
| """Schedule next property update in 24 hours""" | |
| try: | |
| import schedule | |
| import threading | |
| def update_properties(): | |
| logger.info("🔄 Starting scheduled property update...") | |
| self.fetch_all_properties_parallel() | |
| # Schedule update for 24 hours from now | |
| schedule.every(24).hours.do(update_properties) | |
| # Start scheduler in background thread | |
| def run_scheduler(): | |
| while True: | |
| schedule.run_pending() | |
| time.sleep(3600) # Check every hour | |
| scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) | |
| scheduler_thread.start() | |
| logger.info("⏰ Property update scheduled for every 24 hours") | |
| except Exception as e: | |
| logger.warning(f"⚠️ Could not schedule property update: {e}") | |
| def safe_json_parse(self, json_str): | |
| """Enhanced JSON parsing with better error handling""" | |
| try: | |
| if not json_str or json_str == '': | |
| return [] | |
| if isinstance(json_str, list): | |
| return json_str | |
| if isinstance(json_str, dict): | |
| return [json_str] | |
| # Try to parse JSON | |
| parsed = json.loads(json_str) | |
| if isinstance(parsed, list): | |
| return parsed | |
| elif isinstance(parsed, dict): | |
| return [parsed] | |
| else: | |
| return [] | |
| except (ValueError, TypeError, json.JSONDecodeError) as e: | |
| logger.warning(f"⚠️ Failed to parse JSON: {json_str[:50]}... Error: {e}") | |
| # Try to extract features from malformed JSON | |
| if isinstance(json_str, str): | |
| # Look for array-like patterns | |
| if '[' in json_str and ']' in json_str: | |
| start = json_str.find('[') | |
| end = json_str.rfind(']') + 1 | |
| try: | |
| extracted = json_str[start:end] | |
| parsed = json.loads(extracted) | |
| if isinstance(parsed, list): | |
| return parsed | |
| except (json.JSONDecodeError, ValueError): | |
| pass | |
| # Fallback: split by common delimiters | |
| if ',' in json_str: | |
| return [item.strip().strip('"\'') for item in json_str.split(',') if item.strip()] | |
| elif ';' in json_str: | |
| return [item.strip().strip('"\'') for item in json_str.split(';') if item.strip()] | |
| return [] | |
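| # Examples: safe_json_parse('["pool", "gym"]') -> ['pool', 'gym']; | |
| # safe_json_parse('pool, gym') -> ['pool', 'gym'] (comma fallback); | |
| # safe_json_parse('') -> []. | |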
| # Global instance | |
| ai_engine = AIRecommendationEngine() |
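| # Minimal usage sketch (hypothetical data; exercises only pure helpers, so no | |
| # backend or ChromaDB call is made beyond the initialization above): | |
| if __name__ == "__main__": | |
| sample = {'propertyName': 'Skyline Towers', 'propertyTypeName': 'Apartment', 'price': 18000000, 'address': 'Downtown', 'beds': 3, 'baths': 2, 'sqft': 1450, 'description': 'Modern apartment with balcony and gym'} | |
| print(ai_engine._create_simple_property_description(sample)) | |
| print(ai_engine.extract_features_from_text(sample['description'])) | |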