Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import requests | |
| import time | |
| from datetime import datetime, timedelta | |
| from flask import Flask, request, jsonify, render_template | |
| from flask_cors import CORS | |
| import logging | |
| from typing import Dict, List, Optional, Any | |
| import threading | |
| import math | |
| # Import centralized configuration | |
| from config import get_backend_url, get_ngrok_headers, get_api_endpoint, get_app_config | |
| # Import the AI recommendation engine | |
# Configure logging once, before anything below needs to report a problem.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Import the AI recommendation engine. Any failure (missing module, or an
# error raised while constructing the engine) leaves the service running with
# ``ai_engine = None`` so handlers can fall back to mock AI output.
try:
    from ai_recommendation_engine import AIRecommendationEngine
    ai_engine = AIRecommendationEngine()
    AI_ENGINE_AVAILABLE = True
except Exception as e:
    logger.warning(f"AI engine not available: {e}")
    ai_engine = None
    AI_ENGINE_AVAILABLE = False

# Import mock data service
try:
    from mock_data_service import mock_data_service
    MOCK_DATA_AVAILABLE = True
except ImportError:
    # Keep the name bound so an accidental reference fails with a clear
    # ``None`` attribute error instead of a NameError.
    mock_data_service = None
    MOCK_DATA_AVAILABLE = False
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that replaces non-finite floats with finite stand-ins.

    ``json.JSONEncoder.default`` is only consulted for objects the encoder
    cannot serialise natively, and floats are always handled natively. The
    payload is therefore sanitised up front in ``iterencode``; otherwise
    ``inf``/``nan`` would be emitted as the non-standard ``Infinity``/``NaN``
    tokens.

    Substitutions: ``+inf`` -> 999999999, ``-inf`` -> -999999999, ``nan`` -> 0.
    """

    _INF_SUBSTITUTE = 999999999
    _NAN_SUBSTITUTE = 0

    @classmethod
    def _sanitize(cls, obj):
        """Return a copy of *obj* with non-finite floats replaced, recursively."""
        if isinstance(obj, float):
            if math.isnan(obj):
                return cls._NAN_SUBSTITUTE
            if math.isinf(obj):
                return cls._INF_SUBSTITUTE if obj > 0 else -cls._INF_SUBSTITUTE
            return obj
        if isinstance(obj, dict):
            return {key: cls._sanitize(value) for key, value in obj.items()}
        if isinstance(obj, (list, tuple)):
            # Tuples are encoded as JSON arrays anyway, so a list is equivalent.
            return [cls._sanitize(item) for item in obj]
        return obj

    def iterencode(self, o, _one_shot=False):
        """Encode *o* after replacing every non-finite float it contains."""
        return super().iterencode(self._sanitize(o), _one_shot)

    def default(self, obj):
        """Fallback for objects the base encoder cannot serialise.

        Kept for direct callers; non-finite floats get their substitute and
        everything else is delegated to the base class, which raises
        ``TypeError``.
        """
        if isinstance(obj, float) and not math.isfinite(obj):
            return self._sanitize(obj)
        return super().default(obj)
def formatPrice(price):
    """Format a price in Indian currency.

    Args:
        price: Numeric amount; rounded to zero decimals for display.

    Returns:
        The amount prefixed with the rupee sign and grouped with commas,
        e.g. ``formatPrice(1234567)`` -> ``"\u20b91,234,567"``.
    """
    # The rupee sign is written as an escape so the literal cannot be
    # corrupted by a wrong source-file encoding.
    return f"\u20b9{price:,.0f}"
def formatDuration(seconds):
    """Format a duration in hours and minutes.

    Args:
        seconds: Duration in seconds (int or float). Fractional seconds are
            truncated so float input does not produce ``"1.0h 2.0m"``.

    Returns:
        A string such as ``"1h 2m"``; leftover seconds are dropped.
    """
    whole_seconds = int(seconds)
    hours, remainder = divmod(whole_seconds, 3600)
    minutes = remainder // 60
    return f"{hours}h {minutes}m"
| class EnhancedLeadQualificationAPI: | |
def __init__(self):
    """Create the Flask app, load configuration and start background work.

    Builds the app with CORS enabled, reads settings from ``config.py``
    (``CACHE_DURATION``, ``MAX_RETRIES`` and ``RETRY_DELAY`` may be overridden
    through environment variables), initialises the in-memory caches,
    registers the routes and the no-cache response hook, and finally starts
    the cache cleanup thread.
    """
    self.app = Flask(__name__)
    # NOTE(review): ``Flask.json_encoder`` was deprecated in Flask 2.2 and
    # removed in 2.3 - confirm the pinned Flask version still honours it.
    self.app.json_encoder = CustomJSONEncoder
    CORS(self.app)

    # Configuration - Centralized from config.py (read once, not per key)
    app_config = get_app_config()
    self.config = {
        'backend_url': get_backend_url(),
        'api_timeout': app_config['api_timeout'],
        'cache_duration': int(os.getenv('CACHE_DURATION', app_config['cache_duration'])),
        'max_retries': int(os.getenv('MAX_RETRIES', app_config['max_retries'])),
        'retry_delay': int(os.getenv('RETRY_DELAY', app_config['retry_delay'])),
        'ngrok_headers': get_ngrok_headers(),
    }

    # Cache for API responses
    self.cache = {}
    self.cache_timestamps = {}

    # AI processing status tracking
    self.ai_processing_status = {}

    # Setup routes
    self.setup_routes()

    # Add no-cache headers to all responses
    def add_no_cache_headers(response):
        """Add headers to prevent any caching"""
        response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
        response.headers['Pragma'] = 'no-cache'
        response.headers['Expires'] = '0'
        return response

    # Defining the hook alone has no effect; it must be registered on the app.
    self.app.after_request(add_no_cache_headers)

    # Start cache cleanup thread
    self.start_cache_cleanup()
def get_ai_engine(self):
    """Return the module-level AI engine, or ``None`` when it is unusable.

    A warning is logged when the engine is flagged unavailable or is falsy;
    any unexpected error is logged and also results in ``None``.
    """
    try:
        engine_ready = AI_ENGINE_AVAILABLE and ai_engine
        if not engine_ready:
            logger.warning("β οΈ AI engine not available")
            return None
        return ai_engine
    except Exception as e:
        logger.error(f"β Error getting AI engine: {e}")
        return None
| def setup_routes(self): | |
| """Setup all API routes including AI-enhanced endpoints and frontend""" | |
| # Frontend routes | |
def index():
    """Render the main AI dashboard with no customer pre-selected."""
    template_name = 'perfect_ai_dashboard.html'
    return render_template(template_name, customer_id=None)
def old_dashboard():
    """Render the previous dashboard template, kept for reference."""
    template_name = 'ai_lead_analysis.html'
    return render_template(template_name, customer_id=None)
def customer_analysis(customer_id):
    """Render the lead-analysis page with *customer_id* passed to the template."""
    template_name = 'ai_lead_analysis.html'
    return render_template(template_name, customer_id=customer_id)
def email_automation_dashboard():
    """Render the email automation dashboard template."""
    template_name = 'email_automation_dashboard.html'
    return render_template(template_name)
def health():
    """Report that the service is running, with the current server time."""
    payload = {
        'success': True,
        'service': 'AI Lead Analysis System',
        'status': 'running',
    }
    payload['timestamp'] = datetime.now().isoformat()
    return jsonify(payload)
def frontend_status():
    """Report frontend status, the templates served and the feature list.

    ``templates_available`` lists every template rendered by the frontend
    handlers in this module, not only the legacy analysis page.
    """
    return jsonify({
        'success': True,
        'service': 'AI Lead Analysis System',
        'status': 'running',
        'templates_available': [
            'perfect_ai_dashboard.html',
            'ai_lead_analysis.html',
            'email_automation_dashboard.html',
        ],
        'features': [
            'AI Lead Analysis Dashboard',
            'Email Automation Testing',
            'Behavioral Analysis',
            'Real-time Analytics',
        ],
        'timestamp': datetime.now().isoformat(),
    })
| # API routes | |
def health_check():
    """Report API health, backend URL, cache size and the real AI engine state.

    ``ai_status``/``ai_models`` reflect whether the engine actually loaded
    instead of always claiming ``enabled``/``loaded``.
    """
    engine_ready = bool(AI_ENGINE_AVAILABLE and ai_engine)
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'backend_url': self.config['backend_url'],
        'cache_size': len(self.cache),
        'ai_status': 'enabled' if engine_ready else 'disabled',
        'ai_models': 'loaded' if engine_ready else 'unavailable',
    })
def get_lead_analysis(customer_id):
    """Return the combined lead analysis for one customer as JSON.

    Serves the cached ``lead_analysis_<id>`` entry when one exists. Otherwise
    the customer's records are fetched from the backend, turned into processed
    lead data and analytics, cleaned for JSON, cached and returned.

    Responds with 404 JSON when the backend yields nothing and 500 JSON when
    any step raises.
    """
    logger.info(f"π Lead analysis request for customer {customer_id}")
    try:
        cache_key = f'lead_analysis_{customer_id}'
        hit = self.get_cached_data(cache_key)
        if hit:
            logger.info(f"π Returning cached data for customer {customer_id}")
            return jsonify(hit)

        # Try to fetch data from backend with improved fallback mechanism
        endpoint = f'/api/PropertyLeadQualification/customer/{customer_id}'
        logger.info(f"π Fetching data for customer {customer_id} with fallback support")
        backend_response = self.make_api_request(endpoint)
        is_record_list = isinstance(backend_response, list)
        received = len(backend_response) if is_record_list else 'No data'
        logger.info(f"π₯ Data received for customer {customer_id}: {received}")

        if not backend_response:
            not_found = {
                'success': False,
                'customer_id': customer_id,
                'error': 'No data found for this customer',
                'data': None
            }
            return jsonify(not_found), 404

        # Process lead data
        logger.info(f"βοΈ Processing lead data for customer {customer_id}")
        processed_data = self.process_lead_data(backend_response, customer_id)

        # Generate analytics
        logger.info(f"π§ Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(backend_response, customer_id)

        # Analytics keys exposed to the client, each with its fallback value.
        analytics_fields = (
            ('engagement_level', None),
            ('preferred_property_types', []),
            ('price_preferences', {}),
            ('viewing_patterns', {}),
            ('property_analysis', {}),
            ('recommendations', []),
            ('risk_assessment', {}),
            ('opportunity_score', None),
            ('lead_timeline', []),
            ('conversion_probability', {}),
        )
        analytics_view = {
            field: analytics.get(field, fallback)
            for field, fallback in analytics_fields
        }

        # Create combined response
        combined_response = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': analytics_view,
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            },
            'api_info': {
                'backend_url': self.config['backend_url'],
                'endpoint_called': endpoint,
                'response_time': datetime.now().isoformat(),
                'data_points': len(backend_response) if is_record_list else 0
            }
        }

        # Clean the response for JSON serialization
        cleaned_response = self.clean_for_json(combined_response)
        logger.info(f"β Combined response created successfully for customer {customer_id}")

        # Cache the result
        self.cache_data(cache_key, cleaned_response)
        logger.info(f"πΎ Combined data cached for customer {customer_id}")
        return jsonify(cleaned_response)
    except Exception as e:
        logger.error(f"β Error in lead analysis for customer {customer_id}: {e}")
        failure = {
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to generate lead analysis: {str(e)}',
            'data': None
        }
        return jsonify(failure), 500
def get_ai_analysis(customer_id):
    """Return the lead analysis for a customer enriched with AI insights.

    Starts from the cached ``lead_analysis_<id>`` entry, or builds the regular
    analysis from the backend when nothing is cached. AI insights and
    recommendations come from the AI engine, or from fixed mock values when
    the engine is unavailable. The enriched result is cleaned for JSON, cached
    under ``ai_analysis_<id>`` and returned.

    Responds with 404 JSON when the backend yields nothing and 500 JSON when
    any step raises.
    """
    import copy

    logger.info(f"π€ AI analysis request for customer {customer_id}")
    try:
        # First get the regular analysis
        cache_key = f'lead_analysis_{customer_id}'
        analysis_data = self.get_cached_data(cache_key)
        if not analysis_data:
            # Generate regular analysis first with improved fallback
            logger.info(f"π Fetching data for AI analysis - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                return jsonify({
                    'success': False,
                    'customer_id': customer_id,
                    'error': 'No data found for this customer'
                }), 404
            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }

        # Now enhance with AI analysis
        if AI_ENGINE_AVAILABLE and ai_engine:
            logger.info(f"π§ Generating AI insights for customer {customer_id}")
            ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
            # Find AI-recommended properties
            logger.info(f"π Finding AI recommendations for customer {customer_id}")
            ai_recommendations = ai_engine.find_similar_properties_ai(analysis_data, ai_insights)
        else:
            logger.warning("β οΈ AI engine not available, using mock AI insights")
            ai_insights = {
                'personality_type': 'Analytical',
                'decision_making_style': 'Data-driven',
                'preferred_communication': 'Email',
                'urgency_level': 'Medium',
                'budget_confidence': 'High',
                'location_preference': 'Premium areas',
                'property_type_preference': 'Luxury properties'
            }
            ai_recommendations = []

        # Combine everything. A deep copy is required: a shallow ``.copy()``
        # shares the nested 'data' dict, so adding the AI keys would silently
        # modify the cached plain lead analysis as well.
        enhanced_response = copy.deepcopy(analysis_data)
        enhanced_response['data']['ai_insights'] = ai_insights
        enhanced_response['data']['ai_recommendations'] = ai_recommendations
        enhanced_response['ai_enhanced'] = True
        enhanced_response['ai_timestamp'] = datetime.now().isoformat()

        # Clean for JSON serialization, as the other analysis handlers do.
        enhanced_response = self.clean_for_json(enhanced_response)

        # Cache AI-enhanced result
        ai_cache_key = f'ai_analysis_{customer_id}'
        self.cache_data(ai_cache_key, enhanced_response)
        logger.info(f"β AI analysis completed for customer {customer_id}")
        return jsonify(enhanced_response)
    except Exception as e:
        logger.error(f"β Error in AI analysis for customer {customer_id}: {e}")
        return jsonify({
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to generate AI analysis: {str(e)}',
            'ai_error': True
        }), 500
def send_ai_recommendations(customer_id):
    """Start background AI processing that emails recommendations.

    Expects a JSON object body with an ``email`` key. The request records a
    ``processing`` status for the customer, obtains the cached or freshly
    built lead analysis, then hands the work to a daemon thread and returns
    immediately. The thread stores ``completed`` or ``failed`` in
    ``self.ai_processing_status``.

    Responds with 400 JSON when no email is supplied (including a missing or
    non-object JSON body), 404 JSON when the backend yields nothing and 500
    JSON when starting the work raises.
    """
    logger.info(f"π§ AI recommendation email request for customer {customer_id}")
    try:
        # Get email from request. ``silent=True`` turns a missing/invalid JSON
        # body into ``None`` so it is reported as a 400 below, not a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        recipient_email = data.get('email')
        if not recipient_email:
            return jsonify({
                'success': False,
                'error': 'Email address is required'
            }), 400

        # Mark as processing
        self.ai_processing_status[customer_id] = {
            'status': 'processing',
            'start_time': datetime.now().isoformat(),
            'email': recipient_email
        }

        # Get or generate analysis data
        cache_key = f'lead_analysis_{customer_id}'
        analysis_data = self.get_cached_data(cache_key)
        if not analysis_data:
            # Generate analysis if not cached with improved fallback
            logger.info(f"π Fetching data for recommendations - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                self.ai_processing_status[customer_id]['status'] = 'failed'
                self.ai_processing_status[customer_id]['error'] = 'No data found'
                return jsonify({
                    'success': False,
                    'customer_id': customer_id,
                    'error': 'No data found for this customer'
                }), 404
            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }

        # Process with AI in background
        def process_ai_recommendations():
            """Run the AI (or mock) processing and record its outcome."""
            try:
                if AI_ENGINE_AVAILABLE and ai_engine:
                    result = ai_engine.process_customer_with_ai(customer_id, recipient_email, analysis_data)
                else:
                    # Mock AI processing result
                    result = {
                        'success': True,
                        'customer_id': customer_id,
                        'email_sent': True,
                        'recipient': recipient_email,
                        'subject': f"π€ AI-Powered Property Recommendations for Customer {customer_id}",
                        'ai_insights': {
                            'personality_type': 'Analytical',
                            'decision_making_style': 'Data-driven',
                            'preferred_communication': 'Email',
                            'urgency_level': 'Medium',
                            'budget_confidence': 'High'
                        },
                        'recommendations_count': 5,
                        'mock_data': True,
                        'message': 'Mock AI processing completed (AI engine not available)'
                    }
                self.ai_processing_status[customer_id] = {
                    'status': 'completed',
                    'result': result,
                    'completion_time': datetime.now().isoformat()
                }
                logger.info(f"β AI processing completed for customer {customer_id}")
            except Exception as e:
                self.ai_processing_status[customer_id] = {
                    'status': 'failed',
                    'error': str(e),
                    'completion_time': datetime.now().isoformat()
                }
                logger.error(f"β AI processing failed for customer {customer_id}: {e}")

        # Start background processing
        thread = threading.Thread(target=process_ai_recommendations, daemon=True)
        thread.start()

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'message': 'AI recommendation processing started',
            'status': 'processing',
            'email': recipient_email,
            'estimated_completion': '2-3 minutes'
        })
    except Exception as e:
        logger.error(f"β Error starting AI recommendations for customer {customer_id}: {e}")
        return jsonify({
            'success': False,
            'customer_id': customer_id,
            'error': f'Failed to start AI processing: {str(e)}'
        }), 500
def get_ai_status(customer_id):
    """Return the recorded AI processing status for *customer_id* as JSON.

    Customers with no recorded status get a ``not_started`` placeholder.
    """
    not_started = {
        'status': 'not_started',
        'message': 'No AI processing initiated for this customer'
    }
    current = self.ai_processing_status.get(customer_id, not_started)
    payload = {
        'customer_id': customer_id,
        'ai_processing': current,
        'timestamp': datetime.now().isoformat()
    }
    # Clean for JSON serialization
    return jsonify(self.clean_for_json(payload))
def batch_ai_analysis():
    """Return AI-enhanced analysis for several customers in one request.

    Expects a JSON object body with a ``customer_ids`` list. Each customer is
    processed independently: a failure for one customer is recorded in that
    customer's result entry and does not abort the batch.

    Responds with 400 JSON when ``customer_ids`` is missing, empty or not a
    list (including a missing or non-object JSON body) and 500 JSON when the
    batch as a whole fails.
    """
    try:
        # ``silent=True`` turns a missing/invalid JSON body into ``None`` so
        # it is reported as a 400 below instead of an AttributeError -> 500.
        data = request.get_json(silent=True)
        customer_ids = data.get('customer_ids', []) if isinstance(data, dict) else []
        if not isinstance(customer_ids, list) or not customer_ids:
            return jsonify({
                'success': False,
                'error': 'No customer IDs provided'
            }), 400

        results = {}
        for customer_id in customer_ids:
            try:
                # Get regular analysis with improved fallback
                logger.info(f"π Fetching data for batch analysis - customer {customer_id}")
                backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
                if not backend_response:
                    results[customer_id] = {
                        'success': False,
                        'error': 'No data found for this customer'
                    }
                    continue
                processed_data = self.process_lead_data(backend_response, customer_id)
                analytics = self.generate_analytics(backend_response, customer_id)
                analysis_data = {
                    'success': True,
                    'customer_id': customer_id,
                    'data': {
                        'lead_qualification': analytics.get('lead_qualification', {}),
                        'analytics': analytics,
                        'properties': processed_data.get('properties', []),
                        'summary': processed_data.get('summary', {})
                    }
                }

                # Add AI insights
                if AI_ENGINE_AVAILABLE and ai_engine:
                    ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
                    ai_recommendations = ai_engine.find_similar_properties_ai(analysis_data, ai_insights)
                else:
                    ai_insights = {
                        'personality_type': 'Analytical',
                        'decision_making_style': 'Data-driven',
                        'preferred_communication': 'Email',
                        'urgency_level': 'Medium',
                        'budget_confidence': 'High',
                        'location_preference': 'Premium areas',
                        'property_type_preference': 'Luxury properties'
                    }
                    ai_recommendations = []
                analysis_data['data']['ai_insights'] = ai_insights
                analysis_data['data']['ai_recommendations'] = ai_recommendations
                analysis_data['ai_enhanced'] = True
                results[customer_id] = self.clean_for_json(analysis_data)
            except Exception as e:
                results[customer_id] = {
                    'success': False,
                    'error': str(e)
                }

        return jsonify({
            'success': True,
            'results': results,
            'total_processed': len(customer_ids),
            'ai_enhanced': True
        })
    except Exception as e:
        logger.error(f"β Error in batch AI analysis: {e}")
        return jsonify({
            'success': False,
            'error': f'Batch AI analysis failed: {str(e)}'
        }), 500
def clear_cache():
    """Empty the response cache, its timestamps and the AI status table."""
    try:
        for store in (self.cache, self.cache_timestamps, self.ai_processing_status):
            store.clear()
        logger.info("ποΈ Cache and AI status cleared")
        outcome = {
            'success': True,
            'message': 'Cache and AI processing status cleared successfully'
        }
        return jsonify(outcome)
    except Exception as e:
        logger.error(f"β Error clearing cache: {e}")
        outcome = {
            'success': False,
            'error': f'Failed to clear cache: {str(e)}'
        }
        return jsonify(outcome), 500
def test_email():
    """Send a SendGrid test email through the AI engine.

    Accepts either a JSON object or form data with an optional ``email``
    field; a default recipient is used when none is given.

    Responds with 400 JSON for an unparsable or non-object body or an invalid
    address, and 500 JSON when the AI engine is unavailable or the send raises.
    """
    logger.info("π§ͺ Testing SendGrid email functionality")
    try:
        # Debug request information
        # NOTE(review): headers and raw body can carry credentials; consider
        # lowering these to DEBUG or redacting before production use.
        logger.info(f"Request Content-Type: {request.content_type}")
        logger.info(f"Request Headers: {dict(request.headers)}")
        logger.info(f"Request Method: {request.method}")
        logger.info(f"Request Data: {request.get_data()}")

        # Handle both JSON and form data with better error handling
        data = {}
        if request.is_json:
            try:
                data = request.get_json() or {}
                logger.info(f"Parsed JSON data: {data}")
            except Exception as json_error:
                logger.error(f"JSON parsing error: {json_error}")
                return jsonify({
                    'success': False,
                    'error': f'Invalid JSON: {str(json_error)}'
                }), 400
            # A JSON array/string/number has no ``.get``; reject it as a
            # client error instead of failing later with a 500.
            if not isinstance(data, dict):
                return jsonify({
                    'success': False,
                    'error': 'Invalid JSON: expected an object'
                }), 400
        else:
            try:
                data = request.form.to_dict() or {}
                logger.info(f"Parsed form data: {data}")
            except Exception as form_error:
                logger.error(f"Form parsing error: {form_error}")
                return jsonify({
                    'success': False,
                    'error': f'Invalid form data: {str(form_error)}'
                }), 400

        recipient_email = data.get('email', 'shaiksameermujahid@gmail.com')
        logger.info(f"π§ Testing email to: {recipient_email}")

        # Validate email format (non-string values are invalid, not a crash)
        if not isinstance(recipient_email, str) or '@' not in recipient_email:
            return jsonify({
                'success': False,
                'error': 'Invalid email address'
            }), 400

        # Get AI engine
        engine = self.get_ai_engine()
        if not engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Test SendGrid email
        test_result = engine.test_sendgrid_email(recipient_email)
        return jsonify(test_result)
    except Exception as e:
        # ``logger.exception`` records the message together with the traceback.
        logger.exception(f"Error testing email: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def test_email_simple():
    """Send a SendGrid test email to the fixed default recipient.

    Responds with 500 JSON when the AI engine is unavailable or the send
    raises; otherwise returns the engine's test result as JSON.
    """
    logger.info("π§ͺ Testing SendGrid email functionality (simple GET)")
    try:
        # Use default email for testing
        recipient_email = 'shaiksameermujahid@gmail.com'
        logger.info(f"π§ Testing email to: {recipient_email}")

        # Get AI engine
        engine = self.get_ai_engine()
        if engine:
            # Test SendGrid email
            return jsonify(engine.test_sendgrid_email(recipient_email))

        unavailable = {
            'success': False,
            'error': 'AI engine not available'
        }
        return jsonify(unavailable), 500
    except Exception as e:
        logger.error(f"Error testing email (simple): {e}")
        import traceback
        logger.error(f"Full traceback: {traceback.format_exc()}")
        failure = {
            'success': False,
            'error': str(e)
        }
        return jsonify(failure), 500
def send_automated_email():
    """Return a mock "email sent" response for a customer.

    Expects a JSON object body with ``customer_id`` and optional
    ``email_type`` and ``recipient_email``. No email is actually sent here:
    when the mock data service is available the response describes a
    generated mock email.

    Responds with 400 JSON when ``customer_id`` is missing (including a
    missing or non-object JSON body) and 500 JSON when mock data is
    unavailable or an error occurs.
    """
    try:
        # ``silent=True`` turns a missing/invalid JSON body into ``None`` so
        # it is reported as a 400 below instead of an AttributeError -> 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        customer_id = data.get('customer_id')
        email_type = data.get('email_type', 'behavioral_trigger')
        recipient_email = data.get('recipient_email', 'shaiksameermujahid@gmail.com')
        if not customer_id:
            return jsonify({
                'success': False,
                'error': 'Customer ID is required'
            }), 400

        # Generate mock email response (email service integrated)
        if MOCK_DATA_AVAILABLE:
            # NOTE(review): the result is unused; the call is kept in case the
            # lookup validates the customer or has side effects - confirm.
            customer_data = mock_data_service.get_customer_data(customer_id)
            # Tolerate a missing profile so the subject falls back to the ID.
            customer_profile = mock_data_service.get_customer_profile(customer_id) or {}
            customer_name = customer_profile.get('customerName', f'Customer {customer_id}')
            return jsonify({
                'success': True,
                'customer_id': customer_id,
                'recipient_email': recipient_email,
                'subject': f"π€ AI-Powered Recommendations for {customer_name}",
                'email_type': email_type,
                'timestamp': datetime.now().isoformat(),
                'mock_data': True,
                'message': 'Mock email generated successfully (integrated email service)'
            })
        else:
            return jsonify({
                'success': False,
                'error': 'Email service not available and no mock data'
            }), 500
    except Exception as e:
        logger.error(f"Error sending automated email: {e}")
        return jsonify({
            'success': False,
            'error': f'Email service not available: {str(e)}'
        }), 500
def get_email_status():
    """Report the email automation status and the configured addresses.

    ``ai_engine_loaded`` reflects whether the AI engine actually loaded
    instead of always reporting ``True``.
    """
    return jsonify({
        'success': True,
        'service_status': 'integrated',
        'email_config': {
            'sender': 'sameermujahid7777@gmail.com',
            'recipient': 'sameermujahid7777@gmail.com',
            'smtp_server': 'Integrated Service'
        },
        'ai_engine_loaded': bool(AI_ENGINE_AVAILABLE and ai_engine),
        'timestamp': datetime.now().isoformat(),
        'message': 'Email service integrated into main API service'
    })
def email_preview(customer_id):
    """Build an email preview for a customer and return it as JSON.

    Fetches the customer's analysis through ``make_api_request`` and passes it
    to the AI engine together with the ``email`` query parameter (a
    placeholder address when absent).

    Responds with 404 JSON when no usable customer data is returned and 500
    JSON when the engine is unavailable, reports failure, or an error occurs.
    """
    logger.info(f"π§ Generating email preview for customer {customer_id}")
    try:
        # Get customer data
        customer_data = self.make_api_request(f'/api/lead-analysis/{customer_id}')
        has_payload = bool(customer_data) and 'data' in customer_data
        if not has_payload:
            return jsonify({
                'success': False,
                'error': 'Customer data not found'
            }), 404

        # Process with AI engine
        engine = self.get_ai_engine()
        if not engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Generate email content
        recipient_email = request.args.get('email', 'customer@example.com')
        email_result = engine.process_customer_with_ai(
            customer_id, recipient_email, customer_data
        )
        if email_result.get('success'):
            preview = {
                'success': True,
                'customer_id': customer_id,
                'email_preview': email_result,
                'timestamp': datetime.now().isoformat()
            }
            return jsonify(preview)

        return jsonify({
            'success': False,
            'error': 'Failed to generate email content'
        }), 500
    except Exception as e:
        logger.error(f"Error generating email preview: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def email_preview_page(customer_id):
    """Render a standalone HTML page showing a customer's email preview.

    Text values interpolated into the page (customer id, recipient, subject,
    timestamp, recommendation count, error messages) are HTML-escaped because
    they may originate from the request. The generated email body itself is
    inserted as markup on purpose.

    Returns the page HTML, or a plain ``"Error: ..."`` string on failure.
    """
    from html import escape

    try:
        # Get email content
        # NOTE(review): this relies on ``self.get_email_preview`` and on an
        # 'email_content' key in its payload; the ``email_preview`` handler in
        # this module returns its payload under 'email_preview' - confirm.
        email_response = self.get_email_preview(customer_id)
        email_data = email_response.json
        if not email_data.get('success'):
            return f"Error: {escape(str(email_data.get('error', 'Unknown error')))}"
        email_content = email_data['email_content']

        safe_customer_id = escape(str(customer_id))
        recipient = escape(str(email_content['recipient_email']))
        subject = escape(str(email_content['subject']))
        generated = escape(str(email_content['timestamp']))
        recommendations = escape(str(email_data['recommendations_count']))
        # Deliberately not escaped: this is the rendered email markup.
        # NOTE(review): make sure it is generated from trusted templates.
        email_body = email_content['html_content']

        # Create HTML page with email content
        html_content = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Email Preview - Customer {safe_customer_id}</title>
<style>
body {{
font-family: Arial, sans-serif;
margin: 20px;
background-color: #f5f5f5;
}}
.container {{
max-width: 800px;
margin: 0 auto;
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}}
.header {{
background: #2c3e50;
color: white;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
}}
.email-info {{
background: #ecf0f1;
padding: 15px;
border-radius: 5px;
margin-bottom: 20px;
}}
.email-content {{
border: 1px solid #ddd;
border-radius: 5px;
overflow: hidden;
}}
.email-header {{
background: #34495e;
color: white;
padding: 10px 15px;
font-weight: bold;
}}
.email-body {{
padding: 0;
}}
.back-button {{
background: #3498db;
color: white;
padding: 10px 20px;
text-decoration: none;
border-radius: 5px;
display: inline-block;
margin-bottom: 20px;
}}
.back-button:hover {{
background: #2980b9;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>π§ Email Preview</h1>
<p>Customer ID: {safe_customer_id}</p>
</div>
<a href="/customer/{safe_customer_id}" class="back-button">β Back to Customer Analysis</a>
<div class="email-info">
<h3>Email Details:</h3>
<p><strong>To:</strong> {recipient}</p>
<p><strong>Subject:</strong> {subject}</p>
<p><strong>Generated:</strong> {generated}</p>
<p><strong>Recommendations:</strong> {recommendations} properties</p>
</div>
<div class="email-content">
<div class="email-header">
Email Content Preview
</div>
<div class="email-body">
{email_body}
</div>
</div>
</div>
</body>
</html>
"""
        return html_content
    except Exception as e:
        logger.error(f"Error creating email preview page: {e}")
        return f"Error: {escape(str(e))}"
def download_email(filename):
    """Send a saved email file from ``/tmp/emails`` as an attachment.

    ``filename`` comes from the request, so the resolved path must stay inside
    the emails directory; anything that escapes it (``..`` segments, absolute
    paths, symlinks pointing elsewhere) is treated as not found.

    Responds with 404 JSON when the file is missing or outside the directory
    and 500 JSON when sending fails.
    """
    try:
        import os
        emails_dir = os.path.realpath("/tmp/emails")
        file_path = os.path.realpath(os.path.join(emails_dir, filename))

        # Both paths are absolute and symlink-free, so the requested file is
        # inside the directory exactly when their common path is the directory.
        inside_emails_dir = os.path.commonpath([emails_dir, file_path]) == emails_dir
        if not inside_emails_dir or not os.path.isfile(file_path):
            return jsonify({
                'success': False,
                'error': 'Email file not found'
            }), 404

        from flask import send_file
        return send_file(file_path, as_attachment=True)
    except Exception as e:
        logger.error(f"Error downloading email file: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to download email: {str(e)}'
        }), 500
def list_emails():
    """List the ``.html`` files saved in ``/tmp/emails`` as JSON.

    Each entry carries the file name, size, ``st_ctime`` as an ISO timestamp
    under ``created`` and a download URL. Entries are ordered newest first; a
    missing directory yields an empty list. Responds with 500 JSON on error.
    """
    try:
        import os
        emails_dir = "/tmp/emails"
        emails = []
        if os.path.exists(emails_dir):
            html_names = [
                name for name in os.listdir(emails_dir) if name.endswith('.html')
            ]
            for name in html_names:
                info = os.stat(os.path.join(emails_dir, name))
                emails.append({
                    'filename': name,
                    'size': info.st_size,
                    'created': datetime.fromtimestamp(info.st_ctime).isoformat(),
                    'download_url': f'/api/download-email/{name}'
                })
            # Sort by creation time (newest first)
            emails = sorted(emails, key=lambda item: item['created'], reverse=True)
        return jsonify({
            'success': True,
            'emails': emails
        })
    except Exception as e:
        logger.error(f"Error listing email files: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to list emails: {str(e)}'
        }), 500
| def saved_emails_page(): | |
| """Page to view and download saved emails""" | |
| try: | |
| import os | |
| emails_dir = "/tmp/emails" | |
| if not os.path.exists(emails_dir): | |
| emails = [] | |
| else: | |
| emails = [] | |
| for filename in os.listdir(emails_dir): | |
| if filename.endswith('.html'): | |
| file_path = os.path.join(emails_dir, filename) | |
| file_stat = os.stat(file_path) | |
| emails.append({ | |
| 'filename': filename, | |
| 'size': file_stat.st_size, | |
| 'created': datetime.fromtimestamp(file_stat.st_ctime).strftime('%Y-%m-%d %H:%M:%S'), | |
| 'download_url': f'/api/download-email/{filename}' | |
| }) | |
| # Sort by creation time (newest first) | |
| emails.sort(key=lambda x: x['created'], reverse=True) | |
| # Generate email cards HTML | |
| email_cards_html = "" | |
| if emails: | |
| for email in emails: | |
| email_cards_html += f""" | |
| <div class="email-card"> | |
| <div class="row align-items-center"> | |
| <div class="col-md-8"> | |
| <h5><i class="fas fa-file-alt"></i> {email.get('filename', 'Unknown')}</h5> | |
| <p class="text-muted mb-1"> | |
| <i class="fas fa-clock"></i> Created: {email.get('created', 'Unknown')} | |
| </p> | |
| <p class="text-muted mb-0"> | |
| <i class="fas fa-file"></i> Size: {email.get('size', 0):,} bytes | |
| </p> | |
| </div> | |
| <div class="col-md-4 text-end"> | |
| <a href="{email.get('download_url', '#')}" class="download-btn"> | |
| <i class="fas fa-download"></i> Download HTML | |
| </a> | |
| </div> | |
| </div> | |
| </div> | |
| """ | |
| # Generate content section | |
| if emails: | |
| content_section = f""" | |
| <div class="row"> | |
| <div class="col-12"> | |
| <h3>π§ Generated Emails ({len(emails)} total)</h3> | |
| {email_cards_html} | |
| </div> | |
| </div> | |
| """ | |
| else: | |
| content_section = """ | |
| <div class="row"> | |
| <div class="col-12"> | |
| <div class="text-center py-5"> | |
| <i class="fas fa-inbox fa-3x text-muted mb-3"></i> | |
| <h3>No emails generated yet</h3> | |
| <p class="text-muted">Generate some AI recommendations to see saved emails here.</p> | |
| </div> | |
| </div> | |
| </div> | |
| """ | |
| html_content = f""" | |
| <!DOCTYPE html> | |
| <html lang="en"> | |
| <head> | |
| <meta charset="UTF-8"> | |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
| <title>Saved Emails</title> | |
| <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet"> | |
| <link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet"> | |
| <style> | |
| body {{ | |
| background-color: #f8f9fa; | |
| padding-top: 2rem; | |
| }} | |
| .header {{ | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| color: white; | |
| padding: 2rem 0; | |
| margin-bottom: 2rem; | |
| }} | |
| .email-card {{ | |
| background: white; | |
| border-radius: 10px; | |
| box-shadow: 0 2px 10px rgba(0,0,0,0.1); | |
| margin-bottom: 1rem; | |
| padding: 1.5rem; | |
| }} | |
| .download-btn {{ | |
| background: #28a745; | |
| color: white; | |
| border: none; | |
| padding: 0.5rem 1rem; | |
| border-radius: 5px; | |
| text-decoration: none; | |
| display: inline-block; | |
| }} | |
| .download-btn:hover {{ | |
| background: #218838; | |
| color: white; | |
| text-decoration: none; | |
| }} | |
| </style> | |
| </head> | |
| <body> | |
| <div class="header"> | |
| <div class="container"> | |
| <h1><i class="fas fa-envelope"></i> Saved Emails</h1> | |
| <p class="mb-0">Download and view generated email content</p> | |
| </div> | |
| </div> | |
| <div class="container"> | |
| <div class="row mb-4"> | |
| <div class="col-md-6"> | |
| <a href="/" class="btn btn-outline-primary"> | |
| <i class="fas fa-arrow-left"></i> Back to Home | |
| </a> | |
| </div> | |
| <div class="col-md-6 text-end"> | |
| <button class="btn btn-outline-secondary" onclick="location.reload()"> | |
| <i class="fas fa-sync"></i> Refresh | |
| </button> | |
| </div> | |
| </div> | |
| {content_section} | |
| </div> | |
| </body> | |
| </html> | |
| """ | |
| return html_content | |
| except Exception as e: | |
| logger.error(f"Error creating saved emails page: {e}") | |
| return f"Error: {str(e)}" | |
def get_properties_parallel():
    """Fetch property pages from the backend concurrently and return them as JSON.

    Query parameters (all optional integers; an unparsable value falls back
    to its default):
        workers:   number of fetch threads (default 8, clamped to at least 1)
        page_size: value sent as ``pageSize`` for every page (default 100)
        max_pages: highest page number requested (default 50)

    Pages are merged in page order. Fetching stops early once three
    consecutive pages come back empty; pages that have not started yet are
    cancelled so the request does not wait for them.

    Returns:
        A JSON response with the merged properties and fetch statistics, or
        a ``({'success': False, ...}, 500)`` pair on unexpected failure.
    """
    logger.info("π Fetching properties using parallel processing...")
    try:
        from concurrent.futures import ThreadPoolExecutor
        import requests

        # type=int makes a malformed value fall back to the default instead of
        # raising, matching how fetch_all_properties reads the same parameters.
        max_workers = max(1, request.args.get('workers', 8, type=int))
        page_size = request.args.get('page_size', 100, type=int)
        max_pages = request.args.get('max_pages', 50, type=int)
        logger.info(f"π§ Using {max_workers} workers, {page_size} properties per page, max {max_pages} pages")

        def fetch_properties_page(page_num):
            """Fetch a single page of properties; returns [] on any error."""
            try:
                url = f"{self.config['backend_url']}/api/Property/allPropertieswithfulldetails"
                params = {
                    'pageNumber': page_num,
                    'pageSize': page_size
                }
                # NOTE(review): timeout=None can block a worker forever and
                # verify=False disables TLS certificate checks. Both are kept
                # because they look deliberate here - confirm before changing.
                response = requests.get(url, params=params, timeout=None, verify=False)
                response.raise_for_status()
                data = response.json()
                # The backend may return a bare list or wrap it in a dict.
                if isinstance(data, list):
                    properties = data
                elif isinstance(data, dict):
                    properties = data.get('data', data.get('properties', data.get('results', [])))
                else:
                    properties = []
                logger.info(f"β Page {page_num}: {len(properties)} properties")
                return properties
            except Exception as e:
                logger.error(f"β Error fetching page {page_num}: {e}")
                return []

        all_properties = []
        completed_pages = 0
        empty_pages = 0          # every empty (or failed) page seen
        consecutive_empty = 0    # current run of empty pages, in page order
        empty_page_limit = 3

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit everything up front so pages download concurrently, but
            # consume results in page order so "consecutive" is meaningful and
            # the merged list is deterministic.
            submitted = [
                (page_num, executor.submit(fetch_properties_page, page_num))
                for page_num in range(1, max_pages + 1)
            ]
            for position, (page_num, future) in enumerate(submitted):
                try:
                    properties = future.result()  # No timeout - let it take as long as needed
                except Exception as e:
                    logger.error(f"β Failed to fetch page {page_num}: {e}")
                    continue

                if properties:
                    consecutive_empty = 0
                    all_properties.extend(properties)
                    completed_pages += 1
                    logger.info(f"β Parallel fetch - Page {page_num}: {len(properties)} properties (Total: {len(all_properties)})")
                    continue

                empty_pages += 1
                consecutive_empty += 1
                logger.info(f"π Page {page_num} is empty (Empty pages: {empty_pages})")
                if consecutive_empty >= empty_page_limit:
                    logger.info("π Stopping fetch - too many consecutive empty pages")
                    # Leaving the `with` block waits for outstanding work, so
                    # cancel whatever has not started to make the stop real.
                    for _, pending in submitted[position + 1:]:
                        pending.cancel()
                    break

        logger.info(f"π Parallel fetch completed: {len(all_properties)} total properties from {completed_pages} pages!")
        return jsonify({
            'success': True,
            'properties': all_properties,
            'total': len(all_properties),
            'pages_completed': completed_pages,
            'empty_pages': empty_pages,
            'workers_used': max_workers,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error in parallel property fetch: {e}")
        return jsonify({
            'success': False,
            'error': str(e),
            'properties': []
        }), 500
def analyze_automated_emails(customer_id):
    """Report which automated emails the AI engine would trigger for a customer.

    Loads the customer's lead data from the backend (falling back to the mock
    data service when that yields nothing and mock data is available), runs
    lead processing and analytics on it, and passes the assembled structure
    to the AI engine's trigger analysis.

    Returns:
        The trigger analysis as JSON on success; otherwise a
        ``{'success': False, 'error': ...}`` payload with status 404 (no
        property data) or 500 (engine unavailable, analysis failed, or an
        unexpected exception).
    """
    logger.info(f"π€ Analyzing automated email triggers for customer {customer_id}")

    def error_response(message, status):
        # Uniform failure payload shared by every early exit below.
        return jsonify({'success': False, 'error': message}), status

    try:
        logger.info(f"π Generating lead analysis for customer {customer_id} using AI model")

        # First choice: real lead data from the backend.
        lead_path = f'/api/PropertyLeadQualification/customer/{customer_id}'
        lead_rows = None
        try:
            logger.info(f"π Making backend API request to: {self.config['backend_url']}{lead_path}")
            lead_rows = self.make_api_request(lead_path)
            row_count = len(lead_rows) if isinstance(lead_rows, list) else 'No data'
            logger.info(f"π₯ Backend response received for customer {customer_id}: {row_count}")
        except Exception as backend_error:
            logger.warning(f"β οΈ Backend API failed: {backend_error}")

        # Second choice: mock data when the backend produced nothing usable.
        if not lead_rows and MOCK_DATA_AVAILABLE:
            logger.info(f"π Using mock data for customer {customer_id}")
            lead_rows = mock_data_service.get_customer_data(customer_id)
            logger.info(f"π₯ Mock data generated for customer {customer_id}: {len(lead_rows)} properties")

        if not lead_rows:
            return error_response('No property data found for this customer', 404)

        logger.info(f"βοΈ Processing lead data for customer {customer_id}")
        processed_data = self.process_lead_data(lead_rows, customer_id)

        logger.info(f"π§ Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(lead_rows, customer_id)

        # (key, default) pairs copied from the analytics result into the
        # payload handed to the AI engine; order defines the dict's key order.
        analytics_fields = (
            ('engagement_level', None),
            ('preferred_property_types', []),
            ('price_preferences', {}),
            ('viewing_patterns', {}),
            ('property_analysis', {}),
            ('recommendations', []),
            ('risk_assessment', {}),
            ('opportunity_score', None),
            ('lead_timeline', []),
            ('conversion_probability', {}),
        )
        customer_data = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': {
                    field: analytics.get(field, fallback)
                    for field, fallback in analytics_fields
                },
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            }
        }

        engine = self.get_ai_engine()
        if not engine:
            return error_response('AI engine not available', 500)

        trigger_analysis = engine.analyze_automated_email_triggers(customer_data)
        if trigger_analysis.get('success'):
            return jsonify(trigger_analysis)
        return error_response('Failed to analyze email triggers', 500)
    except Exception as unexpected:
        logger.error(f"Error analyzing automated emails: {unexpected}")
        return error_response(str(unexpected), 500)
def test_automated_emails(customer_id):
    """Generate and send every automated email currently triggered for a customer.

    The optional JSON body may carry ``email`` (recipient address). The
    customer's lead data is loaded from the backend (mock data is the
    fallback), analysed, and each trigger reported by the AI engine is turned
    into an email and sent. One failing trigger never aborts the others.

    Returns:
        JSON listing sent and failed emails with the analysis summary, or a
        ``{'success': False, 'error': ...}`` payload with status 404 / 500.
    """
    logger.info(f"π§ͺ Testing automated emails for customer {customer_id}")
    try:
        # silent=True: a missing or non-JSON body yields None instead of
        # raising, so the `or {}` default actually takes effect.
        data = request.get_json(silent=True) or {}
        # NOTE(review): the fallback recipient is a hard-coded personal
        # address; consider moving it to configuration.
        recipient_email = data.get('email', 'shaiksameermujahid@gmail.com')

        # Get customer analysis using our AI model (not from backend)
        logger.info(f"π Generating lead analysis for customer {customer_id} using AI model")

        # Try to fetch data from backend first for property data
        backend_response = None
        try:
            logger.info(f"π Making backend API request to: {self.config['backend_url']}/api/PropertyLeadQualification/customer/{customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            logger.info(f"π₯ Backend response received for customer {customer_id}: {len(backend_response) if isinstance(backend_response, list) else 'No data'}")
        except Exception as e:
            logger.warning(f"β οΈ Backend API failed: {e}")

        # If backend fails, use mock data
        if not backend_response and MOCK_DATA_AVAILABLE:
            logger.info(f"π Using mock data for customer {customer_id}")
            backend_response = mock_data_service.get_customer_data(customer_id)
            logger.info(f"π₯ Mock data generated for customer {customer_id}: {len(backend_response)} properties")

        if not backend_response:
            return jsonify({
                'success': False,
                'error': 'No property data found for this customer'
            }), 404

        # Process lead data using our AI model
        logger.info(f"βοΈ Processing lead data for customer {customer_id}")
        processed_data = self.process_lead_data(backend_response, customer_id)

        # Generate analytics using our AI model
        logger.info(f"π§ Generating analytics for customer {customer_id}")
        analytics = self.generate_analytics(backend_response, customer_id)

        # Create customer analysis data structure
        customer_data = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': {
                    'engagement_level': analytics.get('engagement_level'),
                    'preferred_property_types': analytics.get('preferred_property_types', []),
                    'price_preferences': analytics.get('price_preferences', {}),
                    'viewing_patterns': analytics.get('viewing_patterns', {}),
                    'property_analysis': analytics.get('property_analysis', {}),
                    'recommendations': analytics.get('recommendations', []),
                    'risk_assessment': analytics.get('risk_assessment', {}),
                    'opportunity_score': analytics.get('opportunity_score'),
                    'lead_timeline': analytics.get('lead_timeline', []),
                    'conversion_probability': analytics.get('conversion_probability', {})
                },
                'properties': processed_data.get('properties', []),
                'summary': processed_data.get('summary', {})
            }
        }

        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Analyze triggers using our AI model
        logger.info(f"π€ Analyzing automated email triggers for customer {customer_id}")
        trigger_analysis = ai_engine.analyze_automated_email_triggers(customer_data)
        if not trigger_analysis.get('success'):
            return jsonify({
                'success': False,
                'error': 'Failed to analyze email triggers'
            }), 500

        # Send emails for each trigger
        triggers = trigger_analysis.get('triggers', [])
        sent_emails = []
        failed_emails = []
        for trigger in triggers:
            # Resolved with .get() up front so the error handler below can
            # never raise a KeyError of its own and abort the whole batch
            # after some emails were already delivered.
            trigger_type = trigger.get('trigger_type', 'unknown')
            try:
                # Generate email content using our AI model
                email_content = ai_engine.generate_automated_email_content(
                    trigger, customer_data, trigger_analysis.get('ai_insights', {})
                )
                if not email_content.get('success'):
                    failed_emails.append({
                        'trigger_type': trigger_type,
                        'error': email_content.get('error', 'Failed to generate content')
                    })
                    continue

                if ai_engine.send_email(recipient_email, email_content):
                    # .get() keeps a delivered email from being reported as
                    # failed merely because an optional field is missing.
                    sent_emails.append({
                        'trigger_type': trigger_type,
                        'subject': email_content.get('subject'),
                        'priority': trigger.get('priority'),
                        'reason': trigger.get('reason')
                    })
                else:
                    failed_emails.append({
                        'trigger_type': trigger_type,
                        'error': 'Failed to send email'
                    })
            except Exception as e:
                failed_emails.append({
                    'trigger_type': trigger_type,
                    'error': str(e)
                })

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'total_triggers': len(triggers),
            'sent_emails': sent_emails,
            'failed_emails': failed_emails,
            'analysis_summary': trigger_analysis.get('analysis_summary', {}),
            'ai_insights': trigger_analysis.get('ai_insights', {}),
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Error testing automated emails: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def fetch_all_properties():
    """Ask the AI engine to fetch all properties in parallel and store them.

    Query parameters (optional integers): ``workers`` (default 10),
    ``page_size`` (default 100) and ``max_pages`` (default 50). They are
    forwarded unchanged to ``ai_engine.fetch_all_properties_parallel``.

    Returns:
        JSON with the stored-property count (``"Unknown"`` if the collection
        could not be counted) and the parameters used, or a
        ``{'success': False, 'error': ...}`` payload with status 500.
    """
    try:
        logger.info("π Starting fetch of ALL 600+ properties...")

        # Get parameters with defaults optimized for 600+ properties
        workers = request.args.get('workers', 10, type=int)
        page_size = request.args.get('page_size', 100, type=int)
        max_pages = request.args.get('max_pages', 50, type=int)
        logger.info(f"π Fetch parameters: {workers} workers, {page_size} per page, max {max_pages} pages")

        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Fetch all properties using parallel processing
        success = ai_engine.fetch_all_properties_parallel(
            max_workers=workers,
            page_size=page_size,
            max_pages=max_pages
        )
        if not success:
            return jsonify({
                'success': False,
                'error': 'Failed to fetch properties'
            }), 500

        # Get final count from the engine's collection, if it has one.
        total_stored = 0
        if hasattr(ai_engine, 'properties_collection') and ai_engine.properties_collection:
            try:
                total_stored = ai_engine.properties_collection.count()
            except Exception as count_error:
                # Counting is best-effort; the fetch itself already succeeded.
                # Catch Exception (not a bare except) so SystemExit and
                # KeyboardInterrupt still propagate, and log the cause.
                logger.warning(f"Could not count stored properties: {count_error}")
                total_stored = "Unknown"

        return jsonify({
            'success': True,
            'message': 'Successfully fetched and stored properties',
            'total_properties': total_stored,
            'parameters_used': {
                'workers': workers,
                'page_size': page_size,
                'max_pages': max_pages
            },
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error fetching all properties: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def send_multi_ai_recommendations(customer_id):
    """Send several AI recommendation emails, one per recommendation type.

    Optional JSON body:
        email:       recipient address (a hard-coded default is used if absent)
        email_count: how many of the ten recommendation types to send
                     (default 10; values above ten are capped, values below
                     one send nothing)

    Returns:
        JSON listing sent and failed emails, or a
        ``{'success': False, 'error': ...}`` payload with status 400
        (``email_count`` is not an integer), 404 (no customer data) or 500.
    """
    logger.info(f"π€ Multi-AI recommendation request for customer {customer_id}")
    try:
        # silent=True: a missing or non-JSON body yields None instead of
        # raising, so the `or {}` default actually takes effect.
        data = request.get_json(silent=True) or {}
        # NOTE(review): the fallback recipient is a hard-coded personal
        # address; consider moving it to configuration.
        recipient_email = data.get('email', 'shaiksameermujahid@gmail.com')

        # JSON clients may send the count as a string; coerce it once rather
        # than letting min()/range() fail with a TypeError further down.
        try:
            email_count = int(data.get('email_count', 10))  # Send 10 emails by default
        except (TypeError, ValueError):
            return jsonify({
                'success': False,
                'error': 'email_count must be an integer'
            }), 400

        # Get customer analysis data
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404

        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Generate AI insights
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)

        # Create different types of recommendations
        recommendation_types = [
            'property_based',
            'price_based',
            'location_based',
            'similarity_based',
            'behavioral_based',
            'premium_properties',
            'budget_friendly',
            'trending_properties',
            'family_oriented',
            'investment_opportunities'
        ]
        sent_emails = []
        failed_emails = []

        # max(..., 0) keeps a negative count from turning into a
        # "drop the last N" slice; it sends nothing, as before.
        for rec_type in recommendation_types[:max(email_count, 0)]:
            try:
                logger.info(f"π§ Generating {rec_type} recommendations for customer {customer_id}")
                # NOTE(review): rec_type only affects the subject line below;
                # the recommendation and email calls receive the same
                # arguments for every type - confirm that is intended.
                recommendations = ai_engine.get_enhanced_ai_recommendations(
                    analysis_data, ai_insights, count=5
                )
                # Generate personalized email
                email_result = ai_engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, recipient_email
                )
                if not email_result.get('success'):
                    failed_emails.append({
                        'type': rec_type,
                        'error': email_result.get('error', 'Failed to generate email')
                    })
                    continue

                # Customize subject based on recommendation type
                custom_subject = self._get_custom_subject(rec_type, customer_id, ai_insights)
                email_result['subject'] = custom_subject

                if ai_engine.send_email(recipient_email, email_result):
                    sent_emails.append({
                        'type': rec_type,
                        'subject': custom_subject,
                        'recommendations_count': len(recommendations),
                        'timestamp': datetime.now().isoformat()
                    })
                    logger.info(f"β {rec_type} email sent successfully")
                else:
                    failed_emails.append({
                        'type': rec_type,
                        'error': 'Failed to send email'
                    })
            except Exception as e:
                failed_emails.append({
                    'type': rec_type,
                    'error': str(e)
                })
                logger.error(f"β Error with {rec_type}: {e}")

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'requested_emails': email_count,
            'sent_emails': sent_emails,
            'failed_emails': failed_emails,
            'total_sent': len(sent_emails),
            'total_failed': len(failed_emails),
            'ai_insights': ai_insights,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error in multi-AI recommendations: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def get_chromadb_recommendations(customer_id):
    """Return similarity-, preference- and behaviour-based recommendations.

    Builds three recommendation lists through the AI engine, attaches AI
    explanations to each, and returns them keyed by recommendation type.

    Returns:
        JSON with the recommendations, the AI insights and the total count,
        or a ``{'success': False, 'error': ...}`` payload with status 404
        (no customer data) or 500.
    """
    logger.info(f"π ChromaDB recommendations for customer {customer_id}")
    try:
        # Get customer analysis data
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404

        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Generate AI insights
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)

        # Extracted once and reused: the inputs never change within this
        # request, so recomputing it for every call below was redundant.
        # NOTE(review): assumes extract_customer_preferences has no side
        # effects and that the engine calls do not mutate its result - confirm.
        preferences = ai_engine.extract_customer_preferences(analysis_data, ai_insights)
        viewed_properties = analysis_data['data']['properties']

        # Note the two calls take (properties, preferences) in opposite order.
        recommendations = {
            'similarity_based': ai_engine.get_similarity_based_recommendations(
                viewed_properties,
                preferences,
                count=5
            ),
            'preference_based': ai_engine.get_preference_based_recommendations(
                preferences,
                viewed_properties,
                count=5
            ),
            'behavioral_based': ai_engine.get_behavioral_recommendations(
                analysis_data, ai_insights, count=5
            )
        }

        # Add AI explanations (values are replaced under existing keys only,
        # which is safe while iterating over items()).
        for rec_type, recs in recommendations.items():
            recommendations[rec_type] = ai_engine.add_ai_explanations(
                recs,
                preferences,
                ai_insights
            )

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recommendations': recommendations,
            'ai_insights': ai_insights,
            'total_recommendations': sum(len(recs) for recs in recommendations.values()),
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error getting ChromaDB recommendations: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def test_sendgrid_connection():
    """Run the AI engine's SendGrid test email and report the outcome.

    Returns:
        JSON whose ``success`` mirrors the engine's test result (False when
        the result carries no ``success`` key), with the raw result under
        ``details``; or a ``{'success': False, 'error': ...}`` payload with
        status 500 when the engine is unavailable or an exception occurs.
    """
    logger.info("π§ͺ Testing SendGrid connection")
    try:
        engine = self.get_ai_engine()
        if not engine:
            unavailable = {
                'success': False,
                'error': 'AI engine not available'
            }
            return jsonify(unavailable), 500

        # The test message always goes to this fixed address.
        outcome = engine.test_sendgrid_email('shaiksameermujahid@gmail.com')
        report = {
            'success': outcome.get('success', False),
            'message': 'SendGrid connection test completed',
            'details': outcome,
            'timestamp': datetime.now().isoformat()
        }
        return jsonify(report)
    except Exception as failure:
        logger.error(f"β Error testing SendGrid: {failure}")
        error_body = {
            'success': False,
            'error': str(failure)
        }
        return jsonify(error_body), 500
def get_email_analysis_basis(customer_id):
    """Explain what each recommendation email type is based on for a customer.

    Combines the customer's analysis data with AI insights, extracted
    preferences and behavioural patterns, and pairs them with a short
    human-readable rationale per email type.

    Returns:
        JSON with ``analysis_basis`` and ``email_triggers``, or a
        ``{'success': False, 'error': ...}`` payload with status 404
        (no customer data) or 500.
    """
    logger.info(f"π Getting email analysis basis for customer {customer_id}")
    try:
        # Get customer analysis data
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404

        # Get AI engine
        ai_engine = self.get_ai_engine()
        if not ai_engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        # Generate AI insights
        ai_insights = ai_engine.analyze_user_behavior_with_ai(analysis_data)
        # Extract customer preferences
        preferences = ai_engine.extract_customer_preferences(analysis_data, ai_insights)
        # Analyze behavioral patterns
        behavioral_patterns = ai_engine.analyze_behavioral_patterns(analysis_data, ai_insights)

        # Other handlers in this file build these analytics fields with
        # .get() defaults (engagement_level may be None, and
        # conversion_probability may be an empty dict), so read them
        # defensively instead of failing the request with a KeyError.
        customer_section = analysis_data['data']
        analytics = customer_section.get('analytics') or {}
        conversion = analytics.get('conversion_probability') or {}
        price_range = preferences.get('price_range') or {}

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'analysis_basis': {
                'customer_preferences': preferences,
                'ai_insights': ai_insights,
                'behavioral_patterns': behavioral_patterns,
                'viewed_properties': (customer_section.get('properties') or [])[:5],  # Sample properties
                'lead_qualification': customer_section.get('lead_qualification', {}),
                'engagement_level': analytics.get('engagement_level'),
                'conversion_probability': conversion.get('final_probability')
            },
            'email_triggers': {
                'property_based': 'Based on property types you viewed most',
                'price_based': f"Based on your price range βΉ{price_range.get('min_price', 0)} - βΉ{price_range.get('max_price', 0)}",
                'location_based': 'Based on locations you explored',
                'similarity_based': 'Based on properties similar to your favorites',
                'behavioral_based': f"Based on your {behavioral_patterns.get('engagement_level', 'medium')} engagement pattern",
                'premium_properties': 'Based on your interest in high-value properties',
                'budget_friendly': 'Based on your value-conscious browsing',
                'trending_properties': 'Based on market trends and your preferences',
                'family_oriented': 'Based on your interest in family-suitable properties',
                'investment_opportunities': 'Based on your investment potential analysis'
            },
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error getting email analysis basis: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def preview_all_email_content(customer_id):
    """Build a preview of every recommendation email type without sending any.

    For each of the ten email types the AI engine is asked for up to 12
    recommendations (topped up from a generic similarity search when fewer
    than 10 come back) and for the personalised email content. A type that
    fails still appears in the result with an ``error`` entry.

    Returns:
        JSON with one preview per email type, or a
        ``{'success': False, 'error': ...}`` payload with status 404
        (no customer data) or 500.
    """
    logger.info(f"ποΈ Previewing all email content for customer {customer_id}")
    try:
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({
                'success': False,
                'error': 'No customer data found'
            }), 404

        engine = self.get_ai_engine()
        if not engine:
            return jsonify({
                'success': False,
                'error': 'AI engine not available'
            }), 500

        ai_insights = engine.analyze_user_behavior_with_ai(analysis_data)

        # Make sure the property collection exists before it is queried.
        collection_missing = (
            not hasattr(engine, 'properties_collection')
            or engine.properties_collection is None
        )
        if collection_missing:
            logger.info("π ChromaDB not initialized, initializing...")
            engine.initialize_chromadb()

        # Make sure the collection holds properties; any failure while
        # checking falls back to a fetch as well.
        try:
            if not engine.properties_collection:
                logger.warning("β οΈ ChromaDB collection is None, auto-fetching properties...")
                engine.auto_fetch_and_store_properties()
            else:
                collection_count = engine.properties_collection.count()
                logger.info(f"π ChromaDB has {collection_count} properties")
                if collection_count == 0:
                    logger.info("π ChromaDB is empty, auto-fetching properties...")
                    engine.auto_fetch_and_store_properties()
        except Exception as count_error:
            logger.warning(f"β οΈ Could not check ChromaDB count: {count_error}")
            logger.info("π Auto-fetching properties as fallback...")
            engine.auto_fetch_and_store_properties()

        def property_key(prop):
            # Identifier used to de-duplicate recommendations ('' if absent).
            return str(prop.get('id', prop.get('propertyId', '')))

        def property_summary(rec):
            # Compact view of one recommendation for the preview payload.
            return {
                'name': rec.get('property_name', rec.get('name', rec.get('title', 'Property'))),
                'price': rec.get('price', rec.get('marketValue', rec.get('amount', 0))),
                'type': rec.get('property_type', rec.get('type', rec.get('propertyTypeName', 'N/A'))),
                'score': rec.get('ai_score', rec.get('similarity_score', 0))
            }

        def failed_preview(kind, message):
            # Placeholder entry for an email type that could not be generated.
            return {
                'email_type': kind,
                'subject': self._get_custom_subject(kind, customer_id, ai_insights),
                'error': message,
                'recommendations_count': 0,
                'properties_included': []
            }

        email_types = [
            'property_based',
            'price_based',
            'location_based',
            'similarity_based',
            'behavioral_based',
            'premium_properties',
            'budget_friendly',
            'trending_properties',
            'family_oriented',
            'investment_opportunities'
        ]
        email_previews = []
        for email_type in email_types:
            try:
                recommendations = engine.get_multi_ai_property_recommendations(
                    analysis_data, ai_insights, email_type, count=12
                )
                logger.info(f"π€ AI Model for {email_type} preview: {len(recommendations)} properties")

                # Too few results: top up (to at most 12) from a generic
                # similarity search, skipping ids that are already present.
                if len(recommendations) < 10:
                    extra_candidates = engine.search_similar_properties_chromadb(
                        f"property {email_type.replace('_', ' ')} recommendation", [], 12
                    )
                    known_ids = {property_key(existing) for existing in recommendations}
                    for candidate in extra_candidates:
                        candidate_id = property_key(candidate)
                        if candidate_id in known_ids or len(recommendations) >= 12:
                            continue
                        recommendations.append(candidate)
                        known_ids.add(candidate_id)

                email_content = engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, 'preview@example.com', email_type
                )
                if not email_content.get('success'):
                    email_previews.append(
                        failed_preview(email_type, email_content.get('error', 'Failed to generate content'))
                    )
                    continue

                # The fallback subject is computed unconditionally, exactly as
                # a .get() default expression would be.
                fallback_subject = self._get_custom_subject(email_type, customer_id, ai_insights)
                email_previews.append({
                    'email_type': email_type,
                    'subject': email_content.get('subject', fallback_subject),
                    'html_content': email_content.get('html_content', ''),
                    'text_content': email_content.get('text_content', ''),
                    'recommendations_count': len(recommendations),
                    'properties_included': [property_summary(rec) for rec in recommendations[:3]],
                    'ai_insights': {
                        'personality_type': email_content.get('personalization', {}).get('personality_type', 'N/A'),
                        'decision_style': ai_insights.get('recommendation_strategy', 'N/A'),
                        'urgency_level': email_content.get('personalization', {}).get('urgency_level', 'N/A'),
                        'peak_activity': ai_insights.get('peak_time', 'N/A')
                    }
                })
            except Exception as type_error:
                logger.error(f"β Error generating {email_type}: {type_error}")
                email_previews.append(failed_preview(email_type, str(type_error)))

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'email_previews': email_previews,
            'total_emails': len(email_previews),
            'ai_insights': ai_insights,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as unexpected:
        logger.error(f"β Error previewing email content: {unexpected}")
        return jsonify({
            'success': False,
            'error': str(unexpected)
        }), 500
def test_send_all_10_emails(customer_id):
    """Send one test email per recommendation strategy and report the results.

    Reads an optional ``email`` field from the JSON request body (a hard-coded
    address is used when it is absent), loads the customer's analysis data,
    makes sure the AI engine's ChromaDB collection has been populated, and then
    walks through the ten email types: fetch recommendations, top them up from
    a fallback similarity search when fewer than ten came back, generate the
    email and send it.

    Returns:
        A JSON response summarising sent and failed emails. 404 when no
        customer data is found; 500 when the AI engine is unavailable or an
        unexpected error escapes the per-email handling.
    """
    logger.info(f"π§ͺ Testing all 10 email types for customer {customer_id}")
    try:
        payload = request.get_json() or {}
        recipient_email = payload.get('email', 'shaiksameermujahid@gmail.com')

        # Customer analysis data is required before anything else can happen.
        analysis_data = self._get_analysis_data_parallel(customer_id)
        if not analysis_data:
            return jsonify({'success': False, 'error': 'No customer data found'}), 404

        engine = self.get_ai_engine()
        if not engine:
            return jsonify({'success': False, 'error': 'AI engine not available'}), 500

        ai_insights = engine.analyze_user_behavior_with_ai(analysis_data)

        # Make sure the vector store holds properties before querying it.
        collection = getattr(engine, 'properties_collection', None)
        if collection:
            try:
                if collection.count() == 0:
                    logger.info("π ChromaDB is empty, auto-fetching properties...")
                    engine.auto_fetch_and_store_properties()
            except Exception as count_error:
                logger.warning(f"β οΈ Could not check ChromaDB count: {count_error}")
                # The count failed (or the fetch did): try fetching anyway.
                engine.auto_fetch_and_store_properties()
        else:
            logger.warning("β οΈ ChromaDB not initialized, auto-initializing...")
            engine.initialize_chromadb()
            engine.auto_fetch_and_store_properties()

        # Email type -> human-readable description of its query strategy.
        email_strategies = {
            'property_based': "Get properties matching customer's preferred property types",
            'price_based': "Get properties within customer's budget range",
            'location_based': "Get properties in customer's preferred locations",
            'similarity_based': "Get properties similar to customer's viewed properties",
            'behavioral_based': "Get properties based on customer behavior patterns",
            'premium_properties': "Get high-end luxury properties",
            'budget_friendly': "Get value-for-money properties",
            'trending_properties': "Get currently trending market properties",
            'family_oriented': "Get family-suitable properties with amenities",
            'investment_opportunities': "Get properties with high ROI potential",
        }

        def _property_key(prop):
            # A property is identified by 'id', falling back to 'propertyId'.
            return str(prop.get('id', prop.get('propertyId', '')))

        sent_emails, failed_emails = [], []

        for email_type, strategy in email_strategies.items():
            try:
                logger.info(f"π§ Processing {email_type} email with strategy: {strategy}")

                # Ask for 15 so that at least 10 are likely to come back.
                recommendations = engine.get_multi_ai_property_recommendations(
                    analysis_data, ai_insights, email_type, count=15
                )
                logger.info(f"π€ AI Model for {email_type} retrieved {len(recommendations)} properties")

                if len(recommendations) < 10:
                    logger.warning(f"β οΈ Only {len(recommendations)} properties for {email_type}, trying fallback...")
                    extras = engine.search_similar_properties_chromadb(
                        f"property {email_type.replace('_', ' ')} recommendation", [], 15
                    )
                    # Merge the fallback hits, skipping duplicates, capped at 15.
                    known_keys = {_property_key(rec) for rec in recommendations}
                    for candidate in extras:
                        key = _property_key(candidate)
                        if key not in known_keys and len(recommendations) < 15:
                            recommendations.append(candidate)
                            known_keys.add(key)
                    logger.info(f"π After fallback: {len(recommendations)} properties for {email_type}")

                email_result = engine.generate_personalized_email(
                    analysis_data, ai_insights, recommendations, recipient_email, email_type
                )

                # Anything that is not a dict carrying 'success' counts as success.
                if isinstance(email_result, dict) and 'success' in email_result:
                    email_content = email_result
                else:
                    email_content = {'success': True}

                if not email_content.get('success'):
                    failed_emails.append({
                        'type': email_type,
                        'strategy': strategy,
                        'error': email_content.get('error', 'Failed to generate email content')
                    })
                    continue

                custom_subject = self._get_custom_subject(email_type, customer_id, ai_insights)
                email_content['subject'] = custom_subject

                if not engine.send_email(recipient_email, email_content):
                    failed_emails.append({
                        'type': email_type,
                        'strategy': strategy,
                        'error': 'Failed to send email via SendGrid'
                    })
                    continue

                sent_emails.append({
                    'type': email_type,
                    'subject': custom_subject,
                    'strategy': strategy,
                    'recommendations_count': len(recommendations),
                    'properties_from_chromadb': True,
                    'timestamp': datetime.now().isoformat(),
                    'sample_properties': [
                        {
                            'name': rec.get('property_name', 'Property'),
                            'price': rec.get('price', 0),
                            'type': rec.get('property_type', 'N/A'),
                            'ai_score': rec.get('ai_score', 0)
                        }
                        for rec in recommendations[:3]
                    ]
                })
                logger.info(f"β {email_type} email sent successfully")
            except Exception as e:
                # One failing email type must not stop the remaining ones.
                failed_emails.append({
                    'type': email_type,
                    'strategy': strategy,
                    'error': str(e)
                })
                logger.error(f"β Error with {email_type}: {e}")

        return jsonify({
            'success': True,
            'customer_id': customer_id,
            'recipient_email': recipient_email,
            'test_results': {
                'total_attempted': len(email_strategies),
                'total_sent': len(sent_emails),
                'total_failed': len(failed_emails),
                'sent_emails': sent_emails,
                'failed_emails': failed_emails
            },
            'ai_insights': ai_insights,
            'chromadb_status': 'Active - Properties retrieved from vector database',
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error testing all 10 emails: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
def check_chromadb_status():
    """Report whether the AI engine's ChromaDB collection exists and its size.

    Returns:
        A JSON response. 500 when the AI engine is unavailable or an
        unexpected error occurs; otherwise a 200 payload describing the
        collection, including the case where the collection is missing or
        cannot be read.
    """
    logger.info("π Checking ChromaDB status")
    try:
        engine = self.get_ai_engine()
        if not engine:
            return jsonify({'success': False, 'error': 'AI engine not available'}), 500

        collection = getattr(engine, 'properties_collection', None)
        if not collection:
            # The engine exists but its collection was never set up.
            return jsonify({
                'success': True,
                'chromadb_initialized': False,
                'property_count': 0,
                'message': 'ChromaDB not initialized'
            })

        try:
            property_count = collection.count()
            collection_name = collection.name
        except Exception as count_error:
            # Reading the collection failed; this is reported without an
            # explicit error status code.
            return jsonify({
                'success': False,
                'chromadb_initialized': True,
                'error': f'Collection access error: {count_error}',
                'message': 'ChromaDB collection may be corrupted'
            })

        status = 'ready' if property_count > 0 else 'empty'
        return jsonify({
            'success': True,
            'chromadb_initialized': True,
            'collection_name': collection_name,
            'property_count': property_count,
            'status': status,
            'message': f'ChromaDB collection "{collection_name}" has {property_count} properties',
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error checking ChromaDB status: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
def list_saved_emails():
    """List the HTML files stored in ``./saved_emails``, newest first.

    Returns:
        A JSON response with one entry per file (name, path, email type taken
        from the filename prefix, modification time and size in KB), or a 500
        response when listing fails.
    """
    try:
        import glob

        emails_dir = "./saved_emails"
        if not os.path.exists(emails_dir):
            return jsonify({'saved_emails': [], 'message': 'No saved emails directory found'})

        def _describe(path):
            # Build the listing entry for a single saved email file.
            name = os.path.basename(path)
            stats = os.stat(path)
            # The email type is whatever precedes the first underscore.
            prefix = name.split('_')[0] if '_' in name else 'unknown'
            return {
                'filename': name,
                'filepath': path,
                'email_type': prefix.replace('_', ' ').title(),
                'created_time': datetime.fromtimestamp(stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S'),
                'size_kb': round(stats.st_size / 1024, 2)
            }

        html_paths = glob.glob(os.path.join(emails_dir, "*.html"))
        # The timestamp format sorts chronologically as plain text.
        saved_emails = sorted(
            (_describe(path) for path in html_paths),
            key=lambda entry: entry['created_time'],
            reverse=True,
        )

        return jsonify({
            'saved_emails': saved_emails,
            'total_count': len(saved_emails),
            'message': f'Found {len(saved_emails)} saved email files'
        })
    except Exception as e:
        logger.error(f"β Error listing saved emails: {e}")
        return jsonify({'error': str(e)}), 500
def view_saved_email(filename):
    """Serve a saved email file from ``./saved_emails`` as HTML.

    ``filename`` comes from the caller, so it is resolved and checked to lie
    inside the saved-emails directory before anything is read. This stops
    ``..`` segments, absolute paths or symlinks from exposing files elsewhere.

    Args:
        filename: Name of the file inside the saved-emails directory.

    Returns:
        The file contents with mimetype ``text/html``; ``404`` when the file
        does not exist, is not a regular file, or resolves outside the
        directory; ``500`` on any other error.
    """
    try:
        from flask import send_file

        emails_dir = os.path.realpath("./saved_emails")
        file_path = os.path.realpath(os.path.join(emails_dir, filename))

        try:
            inside_dir = os.path.commonpath([emails_dir, file_path]) == emails_dir
        except ValueError:
            # commonpath raises when the paths cannot be compared
            # (e.g. different drives on Windows): treat as outside.
            inside_dir = False

        # Require a regular file: a directory would pass a bare exists() check.
        if not inside_dir or not os.path.isfile(file_path):
            return "Email file not found", 404

        # An absolute path keeps the check above and the file actually served
        # pointing at the same location.
        return send_file(file_path, mimetype='text/html')
    except Exception as e:
        logger.error(f"β Error serving email file: {e}")
        return f"Error loading email: {str(e)}", 500
def update_debt_financing():
    """Record a debt financing update for a customer.

    Expects a JSON body containing ``customer_id``, ``debt_amount`` and
    ``financing_type``; ``interest_rate``, ``term_months``,
    ``monthly_payment`` and ``status`` are optional. The record is sent to
    the backend with a PUT request. When that call raises or returns a falsy
    value, a locally built success payload flagged ``'fallback': True`` is
    returned instead.

    Returns:
        A JSON response; 400 for a missing body or missing required field,
        500 on unexpected errors.
    """
    logger.info("π° Debt financing update request")
    try:
        payload = request.get_json()
        if not payload:
            return jsonify({'success': False, 'error': 'No data provided'}), 400

        # Report the first required field that is absent.
        required = ('customer_id', 'debt_amount', 'financing_type')
        missing = next((name for name in required if name not in payload), None)
        if missing is not None:
            return jsonify({
                'success': False,
                'error': f'Missing required field: {missing}'
            }), 400

        debt_record = {
            'customer_id': payload.get('customer_id'),
            'debt_amount': payload.get('debt_amount'),
            'financing_type': payload.get('financing_type'),
            'interest_rate': payload.get('interest_rate', 0),
            'term_months': payload.get('term_months', 0),
            'monthly_payment': payload.get('monthly_payment', 0),
            'status': payload.get('status', 'pending'),
            'updated_at': datetime.now().isoformat(),
            'created_at': datetime.now().isoformat()
        }

        # The backend gets the first attempt; a failure is logged, not raised.
        try:
            backend_response = self.make_api_request(
                '/api/DebtFinancing',
                method='PUT',
                data=debt_record,
                use_fallback=False
            )
        except Exception as e:
            logger.warning(f"β οΈ Backend API failed for debt financing: {e}")
            backend_response = None

        if not backend_response:
            logger.info("π Using fallback for debt financing update")
            backend_response = {
                'success': True,
                'message': 'Debt financing updated successfully (fallback mode)',
                'data': debt_record,
                'fallback': True
            }

        return jsonify(backend_response)
    except Exception as e:
        logger.error(f"β Error updating debt financing: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to update debt financing: {str(e)}'
        }), 500
def send_plain_text_email():
    """Send a plain text email through the backend's email endpoint.

    ``toEmail``, ``subject`` and ``body`` are read from the JSON body when the
    request is JSON, otherwise from the query string. All three are required
    and ``toEmail`` must contain ``@``. The values are forwarded as query
    parameters of a POST to ``/api/Email/sendPlainText`` on the configured
    backend.

    Returns:
        A JSON response: 400 for invalid input, 504 on timeout, 503 when the
        service cannot be reached, 500 for any other failure.
    """
    logger.info("π§ Plain text email send request")
    try:
        # JSON body takes precedence over query-string parameters.
        source = request.get_json() if request.is_json else request.args
        to_email = source.get('toEmail')
        subject = source.get('subject')
        body = source.get('body')

        # Reject the request on the first missing parameter, in this order.
        for param_name, param_value in (('toEmail', to_email), ('subject', subject), ('body', body)):
            if not param_value:
                return jsonify({
                    'success': False,
                    'error': f'{param_name} parameter is required'
                }), 400

        if '@' not in to_email:
            return jsonify({
                'success': False,
                'error': 'Invalid email address format'
            }), 400

        email_data = {'toEmail': to_email, 'subject': subject, 'body': body}
        external_api_url = f"{self.config['backend_url']}/api/Email/sendPlainText"

        try:
            logger.info(f"π Sending email via external API: {external_api_url}")
            response = requests.post(
                external_api_url,
                params=email_data,
                headers=self.config['ngrok_headers'],
                timeout=30
            )

            if response.status_code != 200:
                logger.error(f"β External email API failed: {response.status_code} - {response.text}")
                return jsonify({
                    'success': False,
                    'error': f'Email service failed: {response.status_code}',
                    'details': response.text
                }), 500

            logger.info(f"β Email sent successfully to {to_email}")
            return jsonify({
                'success': True,
                'message': 'Email sent successfully',
                'recipient': to_email,
                'subject': subject,
                'timestamp': datetime.now().isoformat()
            })
        except requests.exceptions.Timeout:
            logger.error("β° Email API request timeout")
            return jsonify({'success': False, 'error': 'Email service timeout'}), 504
        except requests.exceptions.ConnectionError:
            logger.error("π Email API connection error")
            return jsonify({'success': False, 'error': 'Email service unavailable'}), 503
        except Exception as e:
            logger.error(f"β Email API error: {e}")
            return jsonify({
                'success': False,
                'error': f'Email service error: {str(e)}'
            }), 500
    except Exception as e:
        logger.error(f"β Error sending plain text email: {e}")
        return jsonify({
            'success': False,
            'error': f'Failed to send email: {str(e)}'
        }), 500
def trigger_email_automation(customer_id):
    """Send the automated emails selected by the request for one customer.

    The JSON body must contain ``email`` (the recipient) and may contain
    ``email_type``: ``'all'`` (default), ``'new_properties'``,
    ``'recommendations'`` or ``'peak_time'``. Analysis data is taken from the
    cache when present, otherwise fetched from the backend and processed.

    Returns:
        A JSON response with one result per email sent; 400 when the
        recipient is missing, 404 when no customer data is available, 500
        when the AI engine is unavailable or an unexpected error occurs.
    """
    logger.info(f"π§ Triggering email automation for customer {customer_id}")
    try:
        payload = request.get_json() or {}
        email_type = payload.get('email_type', 'all')
        recipient_email = payload.get('email')
        if not recipient_email:
            return jsonify({'success': False, 'error': 'Email address is required'}), 400

        # Same cache key as the main analysis.
        analysis_data = self.get_cached_data(f'lead_analysis_{customer_id}')
        if not analysis_data:
            logger.info(f"π Fetching fresh data for email automation - customer {customer_id}")
            backend_response = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
            if not backend_response:
                return jsonify({'success': False, 'error': 'No customer data found'}), 404

            processed_data = self.process_lead_data(backend_response, customer_id)
            analytics = self.generate_analytics(backend_response, customer_id)
            analysis_data = {
                'success': True,
                'customer_id': customer_id,
                'data': {
                    'lead_qualification': analytics.get('lead_qualification', {}),
                    'analytics': analytics,
                    'properties': processed_data.get('properties', []),
                    'summary': processed_data.get('summary', {})
                }
            }

        engine = self.get_ai_engine()
        if not engine:
            return jsonify({'success': False, 'error': 'AI engine not available'}), 500

        ai_insights = engine.analyze_user_behavior_with_ai(analysis_data)

        # (requested type, sender), in sending order; 'all' selects every one.
        senders = (
            ('new_properties', self._send_new_properties_email),
            ('recommendations', self._send_ai_recommendations_email),
            ('peak_time', self._send_peak_time_engagement_email),
        )
        results = [
            send(customer_id, recipient_email, analysis_data, ai_insights)
            for label, send in senders
            if email_type in ('all', label)
        ]

        return jsonify({
            'success': True,
            'message': f'Email automation completed for customer {customer_id}',
            'customer_id': customer_id,
            'recipient': recipient_email,
            'email_type': email_type,
            'ai_insights': ai_insights,
            'results': results,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"β Error in email automation: {e}")
        return jsonify({
            'success': False,
            'error': f'Email automation failed: {str(e)}'
        }), 500
def _send_new_properties_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Email the customer about newly added properties matching their activity.

    Preferences are derived from ``analysis_data``, matching new properties
    are looked up, and a plain text email listing up to five of them is sent.

    Args:
        customer_id: Customer identifier (used for logging only).
        recipient_email: Address the email is sent to.
        analysis_data: Customer analysis payload.
        ai_insights: AI insights for the customer; summarised in the result.

    Returns:
        The send result extended with ``type``, ``properties_count`` and an
        ``ai_insights`` summary, or a failure dict when no properties match
        or an error occurs.
    """
    logger.info(f"π‘ Sending new properties email for customer {customer_id}")
    try:
        user_preferences = self._analyze_user_preferences(analysis_data)
        new_properties = self._get_new_properties_for_user(user_preferences)
        if not new_properties:
            return {
                'type': 'new_properties',
                'success': False,
                'message': 'No new properties matching user preferences'
            }

        # Summarise the real insights (same keys and defaults as the sibling
        # senders) instead of reporting fixed placeholder values. Computed
        # before sending so a malformed value cannot fail after delivery.
        insights = ai_insights if isinstance(ai_insights, dict) else {}
        insights_summary = {
            'personality_type': insights.get('ai_personality_type', 'Analytical'),
            'urgency_level': insights.get('urgency_level', 'Medium'),
            'peak_time': insights.get('peak_time', 'Evening')
        }

        subject = "π‘ New Properties Just Added - Perfect Matches for You!"
        body_lines = [
            "Dear Valued Customer,",
            "",
            "π Great news! We've just added new properties that match your preferences perfectly!",
            "",
            "π YOUR PREFERENCES (Based on Your Activity):",
            f"β’ Preferred Property Type: {user_preferences.get('property_type', 'Villa')}",
            f"β’ Budget Range: ${user_preferences.get('min_budget', 0):,} - ${user_preferences.get('max_budget', 0):,}",
            f"β’ Preferred Locations: {', '.join(user_preferences.get('locations', ['Dubai']))}",
            f"β’ Bedrooms: {user_preferences.get('bedrooms', '3+')}",
            "",
            "π NEWLY ADDED PROPERTIES:"
        ]

        # At most five properties are listed in the email body.
        for i, prop in enumerate(new_properties[:5], 1):
            body_lines.extend([
                f"{i}. {prop.get('title', 'New Property')}",
                f" π Location: {prop.get('location', 'N/A')}",
                f" π° Price: ${prop.get('price', 0):,}",
                f" π Type: {prop.get('propertyTypeName', 'Villa')}",
                f" ποΈ Bedrooms: {prop.get('bedrooms', 'N/A')}",
                f" β Match Score: {prop.get('match_score', 95):.0f}%",
                ""
            ])

        body_lines.extend([
            "β‘ ACT FAST!",
            "These properties are fresh on the market and likely to be in high demand.",
            "",
            "π‘ NEXT STEPS:",
            "β’ Schedule a viewing immediately",
            "β’ Contact our team for more details",
            "β’ Get pre-approval for faster processing",
            "",
            "Best regards,",
            "Your Property Match Team"
        ])
        email_body = "\n".join(body_lines)

        result = self._send_email_via_api(recipient_email, subject, email_body)
        result['type'] = 'new_properties'
        result['properties_count'] = len(new_properties)
        result['ai_insights'] = insights_summary
        return result
    except Exception as e:
        logger.error(f"β Error sending new properties email: {e}")
        return {
            'type': 'new_properties',
            'success': False,
            'error': str(e)
        }
def _send_ai_recommendations_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Email the customer a set of AI-generated property recommendations.

    Up to five recommendations are requested from the AI engine. The body
    also summarises the behaviour insights and adds an urgency notice when
    the urgency level is ``'High'``.

    Returns:
        The send result extended with ``type``, an ``ai_insights`` summary and
        ``recommendations_count``, or a failure dict when the AI engine is
        unavailable or an error occurs.
    """
    logger.info(f"π€ Sending AI recommendations email for customer {customer_id}")
    try:
        engine = self.get_ai_engine()
        if not engine:
            return {
                'type': 'ai_recommendations',
                'success': False,
                'error': 'AI engine not available'
            }

        recommendations = engine.get_enhanced_ai_recommendations(analysis_data, ai_insights, count=5)

        # Insight values shown in the email and echoed back in the result.
        personality_type = ai_insights.get('ai_personality_type', 'Analytical')
        urgency_level = ai_insights.get('urgency_level', 'Medium')
        decision_style = ai_insights.get('recommendation_strategy', 'Data-driven')
        peak_time = ai_insights.get('peak_time', 'Evening')

        subject = "π€ AI-Powered Recommendations - Tailored Just for You!"

        header = [
            "Dear Valued Customer,",
            "",
            "π§ Our AI has analyzed your browsing behavior and preferences to bring you these personalized recommendations:",
            "",
            "π YOUR BEHAVIOR ANALYSIS:",
            f"β’ Personality Type: {personality_type}",
            f"β’ Decision Making Style: {decision_style}",
            f"β’ Engagement Level: {ai_insights.get('engagement_level', 'High')}",
            f"β’ Urgency Level: {urgency_level}",
            f"β’ Peak Activity Time: {peak_time}",
            "",
            "π― AI RECOMMENDATIONS:"
        ]

        listing = []
        if recommendations and isinstance(recommendations, list):
            for position, prop in enumerate(recommendations[:5], 1):
                listing += [
                    f"{position}. {prop.get('property_name', prop.get('title', 'Recommended Property'))}",
                    f" π Location: {prop.get('location', 'N/A')}",
                    f" π° Price: ${prop.get('price', 0):,}",
                    f" π― AI Match Score: {prop.get('ai_score', prop.get('match_score', 0)):.1f}%",
                    f" π‘ Why Recommended: {prop.get('recommendation_reason', 'Matches your preferences')}",
                    ""
                ]

        # Only highly urgent customers get the extra call to action.
        urgent_notice = []
        if urgency_level == 'High':
            urgent_notice = [
                "π₯ URGENT RECOMMENDATION:",
                "Based on your high engagement, we recommend acting quickly on these properties!",
                ""
            ]

        footer = [
            "π‘ PERSONALIZED NEXT STEPS:",
            "β’ Review properties that match your personality type",
            "β’ Schedule viewings during your peak activity time",
            "β’ Contact our AI-powered assistant for more details",
            "",
            "Best regards,",
            "AI-Powered Property Recommendation System"
        ]

        email_body = "\n".join(header + listing + urgent_notice + footer)

        result = self._send_email_via_api(recipient_email, subject, email_body)
        result['type'] = 'ai_recommendations'
        result['ai_insights'] = {
            'personality_type': personality_type,
            'decision_making_style': decision_style,
            'urgency_level': urgency_level,
            'peak_time': peak_time
        }
        result['recommendations_count'] = len(recommendations) if recommendations else 0
        return result
    except Exception as e:
        logger.error(f"β Error sending AI recommendations email: {e}")
        return {
            'type': 'ai_recommendations',
            'success': False,
            'error': str(e)
        }
def _send_peak_time_engagement_email(self, customer_id: int, recipient_email: str, analysis_data: Dict, ai_insights: Dict) -> Dict:
    """Email the customer viewing suggestions built around their peak time.

    The peak time and engagement level are read from ``ai_insights``. The
    body lists the suggested viewing and action windows for that peak time
    and up to three time-sensitive properties.

    Returns:
        The send result extended with ``type``, ``peak_time``,
        ``engagement_level`` and an ``ai_insights`` summary, or a failure dict
        when an error occurs.
    """
    logger.info(f"β° Sending peak time engagement email for customer {customer_id}")
    try:
        peak_time = ai_insights.get('peak_time', 'Evening')
        engagement_level = ai_insights.get('engagement_level', 'High')
        personality_type = ai_insights.get('ai_personality_type', 'Analytical')
        urgency_level = ai_insights.get('urgency_level', 'Medium')

        subject = "β° Perfect Time to Explore Properties - Based on Your Activity Pattern!"

        intro = [
            "Dear Valued Customer,",
            "",
            f"β° We've noticed you're most active during {peak_time} hours!",
            f"Your engagement level: {engagement_level}",
            "",
            "π OPTIMAL VIEWING TIME:",
            f"β’ Your Peak Activity: {peak_time}",
            f"β’ Best Time for Property Tours: {self._get_optimal_viewing_time(peak_time)}",
            f"β’ Recommended Action Time: {self._get_recommended_action_time(peak_time)}",
            "",
            "π― TIME-SENSITIVE OPPORTUNITIES:"
        ]

        # Only the first three time-sensitive properties are listed.
        opportunities = []
        for position, prop in enumerate(self._get_time_sensitive_properties(analysis_data)[:3], 1):
            opportunities += [
                f"{position}. {prop.get('title', 'Time-Sensitive Property')}",
                f" π Location: {prop.get('location', 'N/A')}",
                f" π° Price: ${prop.get('price', 0):,}",
                f" β° Best Viewing Time: {prop.get('optimal_time', peak_time)}",
                f" π¨ Urgency: {prop.get('urgency', 'Medium')}",
                ""
            ]

        closing = [
            "π‘ PEAK TIME STRATEGY:",
            f"β’ Schedule viewings during {peak_time} for maximum engagement",
            "β’ Take advantage of your high focus period",
            "β’ Make decisions when you're most alert",
            "",
            "π IMMEDIATE ACTION:",
            "β’ Call now to schedule a viewing",
            "β’ Get priority booking during your peak time",
            "β’ Receive personalized assistance",
            "",
            "Best regards,",
            "Your Peak Time Property Team"
        ]

        email_body = "\n".join(intro + opportunities + closing)

        result = self._send_email_via_api(recipient_email, subject, email_body)
        result['type'] = 'peak_time_engagement'
        result['peak_time'] = peak_time
        result['engagement_level'] = engagement_level
        result['ai_insights'] = {
            'personality_type': personality_type,
            'urgency_level': urgency_level,
            'peak_time': peak_time
        }
        return result
    except Exception as e:
        logger.error(f"β Error sending peak time engagement email: {e}")
        return {
            'type': 'peak_time_engagement',
            'success': False,
            'error': str(e)
        }
def _send_email_via_api(self, recipient_email: str, subject: str, body: str) -> Dict:
    """POST an email to the backend's ``/api/Email/sendPlainText`` endpoint.

    The recipient, subject and body are sent as query parameters with a
    30-second timeout.

    Returns:
        A dict with ``success`` set to True plus message, recipient and
        subject when the endpoint answers 200; otherwise ``success`` False
        with an ``error`` (and ``details`` for non-200 answers). No exception
        raised in the body escapes.
    """
    try:
        endpoint = f"{self.config['backend_url']}/api/Email/sendPlainText"
        query = {
            'toEmail': recipient_email,
            'subject': subject,
            'body': body
        }
        logger.info(f"π Sending email via external API to {recipient_email}")
        response = requests.post(
            endpoint,
            params=query,
            headers=self.config['ngrok_headers'],
            timeout=30
        )

        # Any status other than 200 is reported as a failure.
        if response.status_code != 200:
            logger.error(f"β External email API failed: {response.status_code} - {response.text}")
            return {
                'success': False,
                'error': f'Email service failed: {response.status_code}',
                'details': response.text
            }

        logger.info(f"β Email sent successfully to {recipient_email}")
        return {
            'success': True,
            'message': 'Email sent successfully',
            'recipient': recipient_email,
            'subject': subject
        }
    except Exception as e:
        logger.error(f"β Email API error: {e}")
        return {
            'success': False,
            'error': f'Email service error: {str(e)}'
        }
| def _analyze_user_preferences(self, analysis_data: Dict) -> Dict: | |
| """Analyze user preferences from tracking data""" | |
| try: | |
| # Extract preferences from analysis data | |
| preferences = { | |
| 'property_type': 'Villa', # Default | |
| 'min_budget': 500000, | |
| 'max_budget': 2000000, | |
| 'locations': ['Dubai', 'Abu Dhabi'], | |
| 'bedrooms': '3+', | |
| 'price_range': '500K-2M' | |
| } | |
| # Analyze from tracking data if available | |
| if 'data' in analysis_data and 'analytics' in analysis_data['data']: | |
| analytics = analysis_data['data']['analytics'] | |
| # Extract property type preferences | |
| if 'property_types' in analytics: | |
| most_viewed = max(analytics['property_types'].items(), key=lambda x: x[1]) | |
| preferences['property_type'] = most_viewed[0] | |
| # Extract budget preferences | |
| if 'price_ranges' in analytics: | |
| most_viewed = max(analytics['price_ranges'].items(), key=lambda x: x[1]) | |
| preferences['price_range'] = most_viewed[0] | |
| # Extract location preferences | |
| if 'locations' in analytics: | |
| top_locations = sorted(analytics['locations'].items(), key=lambda x: x[1], reverse=True)[:3] | |
| preferences['locations'] = [loc[0] for loc in top_locations] | |
| return preferences | |
| except Exception as e: | |
| logger.error(f"β Error analyzing user preferences: {e}") | |
| return { | |
| 'property_type': 'Villa', | |
| 'min_budget': 500000, | |
| 'max_budget': 2000000, | |
| 'locations': ['Dubai'], | |
| 'bedrooms': '3+' | |
| } | |
| def _get_new_properties_for_user(self, user_preferences: Dict) -> List[Dict]: | |
| """Get newly added properties that match user preferences""" | |
| try: | |
| # This would typically fetch from a database of new properties | |
| # For now, we'll generate mock new properties based on preferences | |
| property_type = user_preferences.get('property_type', 'Villa') | |
| locations = user_preferences.get('locations', ['Dubai']) | |
| min_budget = user_preferences.get('min_budget', 500000) | |
| max_budget = user_preferences.get('max_budget', 2000000) | |
| new_properties = [] | |
| # Generate mock new properties | |
| for i, location in enumerate(locations[:2]): | |
| price = min_budget + (i * 300000) | |
| if price <= max_budget: | |
| new_properties.append({ | |
| 'title': f'New {property_type} in {location}', | |
| 'location': location, | |
| 'price': price, | |
| 'propertyTypeName': property_type, | |
| 'bedrooms': '3+', | |
| 'match_score': 95 + i, | |
| 'is_new': True, | |
| 'added_date': datetime.now().strftime('%Y-%m-%d') | |
| }) | |
| return new_properties | |
| except Exception as e: | |
| logger.error(f"β Error getting new properties: {e}") | |
| return [] | |
| def _get_optimal_viewing_time(self, peak_time: str) -> str: | |
| """Get optimal viewing time based on peak activity""" | |
| time_mapping = { | |
| 'Morning': '9:00 AM - 11:00 AM', | |
| 'Afternoon': '2:00 PM - 4:00 PM', | |
| 'Evening': '6:00 PM - 8:00 PM', | |
| 'Night': '7:00 PM - 9:00 PM' | |
| } | |
| return time_mapping.get(peak_time, '6:00 PM - 8:00 PM') | |
| def _get_recommended_action_time(self, peak_time: str) -> str: | |
| """Get recommended action time based on peak activity""" | |
| time_mapping = { | |
| 'Morning': '10:00 AM - 12:00 PM', | |
| 'Afternoon': '3:00 PM - 5:00 PM', | |
| 'Evening': '7:00 PM - 9:00 PM', | |
| 'Night': '8:00 PM - 10:00 PM' | |
| } | |
| return time_mapping.get(peak_time, '7:00 PM - 9:00 PM') | |
| def _get_time_sensitive_properties(self, analysis_data: Dict) -> List[Dict]: | |
| """Get time-sensitive properties based on analysis""" | |
| try: | |
| # Generate time-sensitive properties | |
| return [ | |
| { | |
| 'title': 'Limited Time Offer - Premium Villa', | |
| 'location': 'Dubai Marina', | |
| 'price': 1500000, | |
| 'optimal_time': 'Evening', | |
| 'urgency': 'High' | |
| }, | |
| { | |
| 'title': 'Flash Sale - Luxury Apartment', | |
| 'location': 'Downtown Dubai', | |
| 'price': 800000, | |
| 'optimal_time': 'Afternoon', | |
| 'urgency': 'Very High' | |
| }, | |
| { | |
| 'title': 'Exclusive Preview - New Development', | |
| 'location': 'Palm Jumeirah', | |
| 'price': 2500000, | |
| 'optimal_time': 'Morning', | |
| 'urgency': 'Medium' | |
| } | |
| ] | |
| except Exception as e: | |
| logger.error(f"β Error getting time-sensitive properties: {e}") | |
| return [] | |
| # Helper methods for multi-AI system | |
def _get_analysis_data_parallel(self, customer_id: int) -> Dict:
    """Return the analysis payload for a customer, using the cache when possible.

    Lookup order: cached entry, then the backend lead-qualification endpoint,
    then the mock data service (only when it is available and the backend
    produced nothing). A freshly built payload is cached before it is
    returned. The steps in this method run one after another.

    Returns:
        The analysis dict, or ``None`` when no data source produced anything
        or an error occurred.
    """
    try:
        cache_key = f'lead_analysis_{customer_id}'
        cached = self.get_cached_data(cache_key)
        if cached:
            logger.info(f"π Returning cached analysis data for customer {customer_id}")
            return cached

        # A backend failure is logged and treated as "no data".
        try:
            raw = self.make_api_request(f'/api/PropertyLeadQualification/customer/{customer_id}')
        except Exception as e:
            logger.warning(f"β οΈ Backend API failed: {e}")
            raw = None

        if not raw and MOCK_DATA_AVAILABLE:
            logger.info(f"π Using mock data for customer {customer_id}")
            raw = mock_data_service.get_customer_data(customer_id)

        if not raw:
            return None

        processed = self.process_lead_data(raw, customer_id)
        analytics = self.generate_analytics(raw, customer_id)
        result = {
            'success': True,
            'customer_id': customer_id,
            'timestamp': datetime.now().isoformat(),
            'data': {
                'lead_qualification': analytics.get('lead_qualification', {}),
                'analytics': analytics,
                'properties': processed.get('properties', []),
                'summary': processed.get('summary', {})
            }
        }

        self.cache_data(cache_key, result)
        return result
    except Exception as e:
        logger.error(f"β Error getting analysis data for customer {customer_id}: {e}")
        return None
| def _get_custom_subject(self, recommendation_type: str, customer_id: int, ai_insights: Dict) -> str: | |
| """Generate custom email subject based on recommendation type""" | |
| personality_type = ai_insights.get('personality_type', 'Analytical') | |
| urgency_level = ai_insights.get('urgency_level', 'Medium') | |
| subjects = { | |
| 'property_based': f"π‘ Perfect Properties Matched to Your Preferences - Customer {customer_id}", | |
| 'price_based': f"π° Properties in Your Budget Range - Exclusive Deals for Customer {customer_id}", | |
| 'location_based': f"π Prime Locations Just for You - Customer {customer_id}", | |
| 'similarity_based': f"π Properties Similar to Your Favorites - Customer {customer_id}", | |
| 'behavioral_based': f"π§ AI-Analyzed Recommendations Based on Your Behavior - Customer {customer_id}", | |
| 'premium_properties': f"β Premium Properties Collection - Customer {customer_id}", | |
| 'budget_friendly': f"π΅ Best Value Properties for Smart Investors - Customer {customer_id}", | |
| 'trending_properties': f"π Trending Properties in Hot Markets - Customer {customer_id}", | |
| 'family_oriented': f"π¨βπ©βπ§βπ¦ Family-Perfect Properties - Customer {customer_id}", | |
| 'investment_opportunities': f"πΌ Investment Opportunities with High ROI - Customer {customer_id}" | |
| } | |
| base_subject = subjects.get(recommendation_type, f"π€ AI Recommendations for Customer {customer_id}") | |
| # Add urgency indicators for high urgency | |
| if urgency_level == 'High': | |
| base_subject = f"β‘ URGENT: {base_subject}" | |
| elif urgency_level == 'Very High': | |
| base_subject = f"π₯ HOT DEALS: {base_subject}" | |
| return base_subject | |
| # Include all the original analysis methods from api_service.py | |
def clean_for_json(self, obj):
    """Return a copy of ``obj`` with values that strict JSON cannot represent replaced.

    Dicts and lists are walked recursively. Within them:
      * NaN becomes 0, +inf becomes 999999999, -inf becomes -999999999;
      * numpy float32/float64/int32/int64 scalars become Python floats;
      * numpy arrays become (nested) lists, and their elements are cleaned
        as well, so NaN/inf inside an array are replaced too.
    Any other value is returned unchanged.

    numpy is imported lazily and is optional: when it is not installed only
    the plain-Python rules apply.
    """
    try:
        import numpy as np
    except ImportError:
        np = None

    def clean(value):
        if isinstance(value, dict):
            return {key: clean(item) for key, item in value.items()}
        if isinstance(value, list):
            return [clean(item) for item in value]

        if np is not None:
            if isinstance(value, np.ndarray):
                # tolist() yields plain Python values that may still be NaN/inf,
                # so run the result through the same cleaning.
                return clean(value.tolist())
            if isinstance(value, (np.float32, np.float64, np.int32, np.int64)):
                value = float(value)

        if isinstance(value, float):
            if math.isnan(value):
                return 0
            if math.isinf(value):
                return 999999999 if value > 0 else -999999999
        return value

    return clean(obj)
def make_api_request(self, endpoint: str, method: str = 'GET', data: Dict = None, use_fallback: bool = True) -> Any:
    """Call the backend API with retries, optionally falling back to local data.

    Args:
        endpoint: Path appended to ``self.config['backend_url']``.
        method: HTTP verb; GET, POST and PUT are supported (case-insensitive).
        data: JSON body sent with POST/PUT requests.
        use_fallback: When True, ``self.get_fallback_data`` supplies the
            result after the request ultimately fails.

    Returns:
        The decoded JSON body of a 200 response; otherwise the fallback data
        (``use_fallback=True``) or None.

    Retry policy: up to ``config['max_retries']`` (default 3) retries on
    429/502/503/504 responses and on any exception raised while sending or
    decoding. The wait grows linearly: ``retry_delay * attempt_number``.
    Other non-200 responses are not retried. An unsupported HTTP method is a
    caller error that retrying cannot fix, so it skips the retry loop.
    """
    max_retries = self.config.get('max_retries', 3)
    retry_delay = self.config.get('retry_delay', 1)
    verb = str(method).upper()

    if verb in ('GET', 'POST', 'PUT'):
        attempts = range(max_retries + 1)
    else:
        logger.error(f"β API request error: Unsupported HTTP method: {method}")
        attempts = range(0)

    for attempt in attempts:
        is_last_attempt = attempt >= max_retries
        backoff = retry_delay * (attempt + 1)  # linear back-off
        try:
            url = f"{self.config['backend_url']}{endpoint}"
            logger.info(f"π Making {method} request to: {url} (attempt {attempt + 1}/{max_retries + 1})")
            headers = self.config['ngrok_headers']
            # The final attempt gets a longer timeout.
            timeout = 60 if is_last_attempt else 30

            if verb == 'GET':
                response = requests.get(url, headers=headers, timeout=timeout)
            elif verb == 'POST':
                response = requests.post(url, headers=headers, json=data, timeout=timeout)
            else:
                response = requests.put(url, headers=headers, json=data, timeout=timeout)

            if response.status_code == 200:
                logger.info(f"β API request successful on attempt {attempt + 1}")
                return response.json()

            if response.status_code in (429, 502, 503, 504) and not is_last_attempt:
                # Rate limiting / transient server errors are worth retrying.
                logger.warning(f"β οΈ API request failed with {response.status_code}, retrying in {backoff}s...")
                time.sleep(backoff)
                continue

            logger.error(f"β API request failed: {response.status_code} - {response.text}")
            if not use_fallback:
                return None
            break
        except requests.exceptions.Timeout as e:
            logger.warning(f"β° Request timeout on attempt {attempt + 1}: {e}")
            exhausted_reason = "timeout"
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"π Connection error on attempt {attempt + 1}: {e}")
            exhausted_reason = "connection error"
        except Exception as e:
            logger.error(f"β API request error on attempt {attempt + 1}: {e}")
            exhausted_reason = None

        # Only reached when the attempt raised: retry unless attempts are used up.
        if is_last_attempt:
            if exhausted_reason:
                logger.error(f"β All retry attempts failed due to {exhausted_reason}")
            break
        time.sleep(backoff)

    if use_fallback:
        logger.info(f"π All API attempts failed, using fallback data for endpoint: {endpoint}")
        return self.get_fallback_data(endpoint, method, data)
    return None
def get_fallback_data(self, endpoint: str, method: str, data: Dict = None) -> Any:
    """Produce substitute data for an endpoint whose API request failed.

    Args:
        endpoint: The endpoint that failed. A numeric id following
            '/customer/' is extracted; a trailing path segment or query
            string after the id is ignored.
        method: HTTP method of the failed request (not used).
        data: Request body of the failed request (not used).

    Returns:
        * mock customer data when a customer id was found and the mock data
          service is available;
        * ``[]`` for other 'PropertyLeadQualification' endpoints;
        * a ``{'success': False, ..., 'fallback': True}`` dict otherwise;
        * None if building the fallback raised.
    """
    try:
        customer_id = None
        if '/customer/' in endpoint:
            tail = endpoint.rsplit('/customer/', 1)[-1]
            # Keep only the id segment: drop any query string or sub-path.
            id_text = tail.split('?', 1)[0].split('/', 1)[0]
            try:
                customer_id = int(id_text)
            except ValueError:
                customer_id = None

        if customer_id is not None and MOCK_DATA_AVAILABLE:
            logger.info(f"π Using mock data fallback for customer {customer_id}")
            return mock_data_service.get_customer_data(customer_id)

        # Lead-qualification callers expect a list; everything else gets a
        # "temporarily unavailable" marker.
        if 'PropertyLeadQualification' in endpoint:
            return []
        return {'success': False, 'message': 'Service temporarily unavailable', 'fallback': True}
    except Exception as e:
        logger.error(f"β Fallback data generation failed: {e}")
        return None
| # All the original analysis methods from api_service.py are included below | |
def process_lead_data(self, data: List[Dict], customer_id: int) -> Dict:
    """Summarise a customer's viewed properties and enrich each one with metrics.

    Args:
        data: Property records; the keys read here are 'viewCount',
            'totalDuration', 'price', 'propertyTypeName' and 'lastViewedAt'.
        customer_id: Identifier echoed back in the result.

    Returns:
        ``{'customer_id', 'properties', 'summary'}``. ``properties`` holds a
        shallow copy of every record with 'engagement_score',
        'price_per_sqm', 'view_frequency' and 'last_viewed_days_ago' added.
        ``summary`` holds totals, the average price, a count per property
        type, the overall engagement score and 'last_activity' (the greatest
        'lastViewedAt' string, or None when no record has one).
    """
    if not data:
        return {
            'customer_id': customer_id,
            'properties': [],
            'summary': {
                'total_properties': 0,
                'total_views': 0,
                'total_duration': 0,
                'average_price': 0,
                'property_types': {},
                'engagement_score': 0,
                # Present for parity with the non-empty summary.
                'last_activity': None,
            },
        }

    total_views = sum(prop.get('viewCount', 0) for prop in data)
    total_duration = sum(prop.get('totalDuration', 0) for prop in data)
    total_price = sum(prop.get('price', 0) for prop in data)

    # How many viewed properties fall under each type.
    property_types = {}
    for prop in data:
        prop_type = prop.get('propertyTypeName', 'Unknown')
        property_types[prop_type] = property_types.get(prop_type, 0) + 1

    engagement_score = self.calculate_engagement_score(data)

    enhanced_properties = []
    for prop in data:
        enhanced_prop = prop.copy()
        enhanced_prop['engagement_score'] = self.calculate_property_engagement(prop)
        enhanced_prop['price_per_sqm'] = self.estimate_price_per_sqm(prop.get('price', 0))
        enhanced_prop['view_frequency'] = self.calculate_view_frequency(prop)
        enhanced_prop['last_viewed_days_ago'] = self.days_since_last_view(prop.get('lastViewedAt'))
        enhanced_properties.append(enhanced_prop)

    # Skip records without a timestamp: comparing None with a string would
    # raise TypeError inside max().
    view_stamps = (prop.get('lastViewedAt') for prop in data)
    last_activity = max((stamp for stamp in view_stamps if stamp), default=None)

    return {
        'customer_id': customer_id,
        'properties': enhanced_properties,
        'summary': {
            'total_properties': len(data),
            'total_views': total_views,
            'total_duration': total_duration,
            'average_price': total_price / len(data),
            'property_types': property_types,
            'engagement_score': engagement_score,
            'last_activity': last_activity,
        },
    }
def generate_analytics(self, lead_data: List[Dict], customer_id: int) -> Dict:
    """Assemble the full analytics report for one customer's lead data.

    Args:
        lead_data: Raw property records for the customer.
        customer_id: Identifier echoed back in the report.

    Returns:
        ``{'customer_id': ..., 'analytics': {}}`` when ``lead_data`` is
        empty; otherwise a dict of insights computed by the analysis methods
        on this class (qualification, engagement level, preferences,
        patterns, recommendations, risk, opportunity, timeline and
        conversion probability).
    """
    if not lead_data:
        return {'customer_id': customer_id, 'analytics': {}}

    processed = self.process_lead_data(lead_data, customer_id)
    summary = processed['summary']
    qualification = self.generate_detailed_lead_qualification(processed)

    report = {'customer_id': customer_id, 'lead_qualification': qualification}
    report['engagement_level'] = self.categorize_engagement(summary['engagement_score'])
    report['preferred_property_types'] = self.get_preferred_property_types(summary['property_types'])
    report['price_preferences'] = self.analyze_price_preferences(lead_data)
    report['viewing_patterns'] = self.analyze_viewing_patterns(lead_data)
    report['property_analysis'] = self.analyze_properties_detailed(lead_data)
    report['recommendations'] = self.generate_recommendations(processed)
    report['risk_assessment'] = self.assess_risk(processed)
    report['opportunity_score'] = self.calculate_opportunity_score(processed)
    report['lead_timeline'] = self.generate_lead_timeline(lead_data)
    report['conversion_probability'] = self.calculate_conversion_probability(processed)
    return report
| # Add placeholder methods for the analysis functions | |
def generate_detailed_lead_qualification(self, processed_data: Dict) -> Dict:
    """Score a lead out of 100 and classify it as HOT, WARM, LUKEWARM or COLD.

    Args:
        processed_data: Dict with a 'summary' (engagement score, totals,
            property types, optional 'last_activity' ISO timestamp) and a
            'properties' list, as produced by ``process_lead_data``.

    Returns:
        Dict with 'lead_score', 'lead_status', 'status_color',
        'status_description', 'max_possible_score', a per-factor breakdown
        under 'factors' and a condensed 'summary'.

    Scoring factors: engagement (max 30), property-type diversity (max 15),
    recency of last activity (max 20), price-range consistency (max 15, only
    when at least one positive price exists) and repeat views (max 20).
    """
    summary = processed_data['summary']
    properties = processed_data['properties']

    lead_score = 0
    factors = {}

    # 1. Engagement score (0-30 points)
    engagement_points = min(summary['engagement_score'] * 0.3, 30)
    lead_score += engagement_points
    factors['engagement'] = {
        'score': summary['engagement_score'],
        'points': engagement_points,
        'max_points': 30,
        'description': f"Based on {summary['total_views']} views and {formatDuration(summary['total_duration'])} total viewing time"
    }

    # 2. Property diversity (0-15 points)
    property_types = len(summary['property_types'])
    diversity_points = min(property_types * 5, 15)
    lead_score += diversity_points
    factors['property_diversity'] = {
        'score': property_types,
        'points': diversity_points,
        'max_points': 15,
        'description': f"Viewed {property_types} different property types"
    }

    # 3. Recent activity (0-20 points)
    days_since = None
    last_seen = summary.get('last_activity')
    if last_seen:
        try:
            last_activity = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
            # Take "now" in the timestamp's own timezone (or naive when it has
            # none): subtracting aware and naive datetimes raises TypeError.
            days_since = (datetime.now(last_activity.tzinfo) - last_activity).days
        except (ValueError, TypeError, AttributeError):
            days_since = None  # unparseable timestamp: no recency points

    recent_activity_points = 0
    if days_since is not None:
        for max_days, points in ((1, 20), (3, 15), (7, 10), (30, 5)):
            if days_since <= max_days:
                recent_activity_points = points
                break
    lead_score += recent_activity_points
    factors['recent_activity'] = {
        # 999 marks "unknown", matching days_since_last_view().
        'score': days_since if days_since is not None else 999,
        'points': recent_activity_points,
        'max_points': 20,
        'description': (
            f"Last activity was {days_since} days ago"
            if days_since is not None
            else "Last activity date is unknown"
        )
    }

    # 4. Price range consistency (0-15 points)
    prices = [price for price in ((p.get('price') or 0) for p in properties) if price > 0]
    if prices:
        price_range = max(prices) - min(prices)
        price_consistency = 1 - (price_range / max(prices))
        consistency_points = price_consistency * 15
        lead_score += consistency_points
        factors['price_consistency'] = {
            'score': round(price_consistency * 100, 1),
            'points': consistency_points,
            'max_points': 15,
            'description': f"Price range consistency: {round(price_consistency * 100, 1)}%"
        }

    # 5. Viewing depth (0-20 points): properties viewed more than once
    deep_views = sum(1 for p in properties if (p.get('viewCount') or 0) > 1)
    depth_points = min(deep_views * 5, 20)
    lead_score += depth_points
    factors['viewing_depth'] = {
        'score': deep_views,
        'points': depth_points,
        'max_points': 20,
        'description': f"{deep_views} properties viewed multiple times"
    }

    # Map the score to a status band.
    if lead_score >= 70:
        lead_status = "HOT"
        status_color = "#e74c3c"
        status_description = "High probability of conversion. Immediate follow-up recommended."
    elif lead_score >= 50:
        lead_status = "WARM"
        status_color = "#f39c12"
        status_description = "Good potential. Regular follow-up needed."
    elif lead_score >= 30:
        lead_status = "LUKEWARM"
        status_color = "#95a5a6"
        status_description = "Some interest shown. Nurturing required."
    else:
        lead_status = "COLD"
        status_color = "#34495e"
        status_description = "Low engagement. Re-engagement campaign needed."

    return {
        'lead_score': round(lead_score, 1),
        'lead_status': lead_status,
        'status_color': status_color,
        'status_description': status_description,
        'max_possible_score': 100,
        'factors': factors,
        'summary': {
            'total_properties_viewed': summary['total_properties'],
            'total_views': summary['total_views'],
            'total_duration': summary['total_duration'],
            'average_price': summary['average_price'],
            'engagement_score': summary['engagement_score']
        }
    }
def calculate_engagement_score(self, properties: List[Dict]) -> float:
    """Average the per-property engagement over all viewed properties.

    Each property contributes up to 50 points for views (10 per view) and up
    to 50 points for viewing duration (``totalDuration / 1000``).

    Returns:
        The mean score, or 0.0 when ``properties`` is empty.
    """
    if not properties:
        return 0.0

    def score_one(entry):
        view_part = min(entry.get('viewCount', 0) * 10, 50)
        time_part = min(entry.get('totalDuration', 0) / 1000, 50)
        return view_part + time_part

    return sum(map(score_one, properties)) / len(properties)
def calculate_property_engagement(self, property_data: Dict) -> float:
    """Score one property's engagement on a 0-100 scale.

    Views earn 10 points each and duration earns ``totalDuration / 1000``
    points; each component is capped at 50 before the two are added.
    """
    raw_components = (
        property_data.get('viewCount', 0) * 10,
        property_data.get('totalDuration', 0) / 1000,
    )
    view_part, time_part = (min(part, 50) for part in raw_components)
    return view_part + time_part
def estimate_price_per_sqm(self, price: float) -> float:
    """Roughly estimate price per square metre by dividing the price by 150.

    Returns 0 for a zero or negative price.
    """
    if price > 0:
        return price / 150
    return 0
def calculate_view_frequency(self, property_data: Dict) -> str:
    """Describe how often a property was viewed, based on its 'viewCount'.

    Returns one of "No views" (0), "Single view" (1), "Low frequency"
    (up to 3), "Medium frequency" (up to 7) or "High frequency".
    """
    views = property_data.get('viewCount', 0)
    if views == 0:
        return "No views"
    if views == 1:
        return "Single view"
    for ceiling, label in ((3, "Low frequency"), (7, "Medium frequency")):
        if views <= ceiling:
            return label
    return "High frequency"
def days_since_last_view(self, last_viewed: str) -> int:
    """Return the number of whole days since an ISO-8601 timestamp.

    Args:
        last_viewed: ISO-8601 string, optionally ending in 'Z' or carrying a
            UTC offset. May be empty or None.

    Returns:
        Whole days elapsed, never negative; 999 when the timestamp is
        missing or cannot be parsed.
    """
    if not last_viewed:
        return 999
    try:
        last_view_date = datetime.fromisoformat(last_viewed.replace('Z', '+00:00'))
        # Take "now" in the timestamp's own timezone (or naive when it has
        # none): subtracting aware and naive datetimes raises TypeError,
        # which previously turned every 'Z' timestamp into 999.
        now = datetime.now(last_view_date.tzinfo)
        return max(0, (now - last_view_date).days)
    except (ValueError, TypeError, AttributeError):
        return 999
def categorize_engagement(self, score: float) -> str:
    """Map an engagement score to a label.

    Bands: >= 80 "Very High", >= 60 "High", >= 40 "Medium", >= 20 "Low",
    anything lower "Very Low".
    """
    bands = ((80, "Very High"), (60, "High"), (40, "Medium"), (20, "Low"))
    for floor, label in bands:
        if score >= floor:
            return label
    return "Very Low"
def get_preferred_property_types(self, property_types: Dict) -> List[str]:
    """Return up to three property types, most frequently viewed first.

    Args:
        property_types: Mapping of property type name to view count.

    Returns:
        The names with the highest counts (ties keep mapping order), or an
        empty list for an empty mapping.
    """
    if not property_types:
        return []
    ranked = sorted(property_types, key=lambda name: property_types[name], reverse=True)
    return ranked[:3]
def analyze_price_preferences(self, properties: List[Dict]) -> Dict:
    """Summarise the positive prices among the viewed properties.

    Returns:
        ``{'min_price', 'max_price', 'avg_price', 'price_range'}``, or an
        empty dict when there are no properties or no positive prices.
    """
    if not properties:
        return {}

    positive = [
        amount
        for amount in (item.get('price', 0) for item in properties)
        if amount > 0
    ]
    if not positive:
        return {}

    lowest, highest = min(positive), max(positive)
    return {
        'min_price': lowest,
        'max_price': highest,
        'avg_price': sum(positive) / len(positive),
        'price_range': highest - lowest,
    }
def analyze_viewing_patterns(self, properties: List[Dict]) -> Dict:
    """Bucket view counts by the time of day of each property's last view.

    A property's whole 'viewCount' is attributed to the period of its
    'lastViewedAt' hour: 06-11 is morning, 12-17 is afternoon and every
    other hour is evening. Properties without a timestamp, or with one that
    cannot be parsed, are skipped.

    Returns:
        ``{'morning_views', 'afternoon_views', 'evening_views',
        'peak_viewing_time'}``, or an empty dict when ``properties`` is
        empty. Ties resolve towards the later period, so the peak is
        'evening' when there are no views at all.
    """
    if not properties:
        return {}

    views_by_period = {'morning': 0, 'afternoon': 0, 'evening': 0}
    for prop in properties:
        stamp = prop.get('lastViewedAt')
        if not stamp:
            continue
        try:
            hour = datetime.fromisoformat(stamp.replace('Z', '+00:00')).hour
            if 6 <= hour < 12:
                period = 'morning'
            elif 12 <= hour < 18:
                period = 'afternoon'
            else:
                period = 'evening'
            views_by_period[period] += prop.get('viewCount', 0)
        except (ValueError, TypeError, AttributeError):
            # Malformed timestamp or non-numeric view count: skip this record
            # only, without swallowing unrelated errors.
            continue

    morning_views = views_by_period['morning']
    afternoon_views = views_by_period['afternoon']
    evening_views = views_by_period['evening']

    if morning_views > max(afternoon_views, evening_views):
        peak = 'morning'
    elif afternoon_views > evening_views:
        peak = 'afternoon'
    else:
        peak = 'evening'

    return {
        'morning_views': morning_views,
        'afternoon_views': afternoon_views,
        'evening_views': evening_views,
        'peak_viewing_time': peak,
    }
def analyze_properties_detailed(self, properties: List[Dict]) -> Dict:
    """Report how many properties were analysed.

    The result currently carries only the count, under the
    'analyzed_properties' key.
    """
    analyzed_count = len(properties)
    return dict(analyzed_properties=analyzed_count)
def generate_recommendations(self, processed_data: Dict) -> List[str]:
    """List follow-up recommendations triggered by the summary figures.

    Rules, evaluated in order: engagement score below 30, fewer than 5
    total views, average price above 20,000,000, and more than 3 distinct
    property types.
    """
    summary = processed_data['summary']
    rules = (
        (summary['engagement_score'] < 30,
         "Low engagement detected. Consider follow-up communication."),
        (summary['total_views'] < 5,
         "Limited property exploration. Suggest more property options."),
        (summary['average_price'] > 20000000,  # 20M
         "High-value customer. Consider premium properties."),
        (len(summary['property_types']) > 3,
         "Diverse property interests. Offer variety in recommendations."),
    )
    return [message for triggered, message in rules if triggered]
def assess_risk(self, processed_data: Dict) -> Dict:
    """Score the risk of losing the lead from the summary figures.

    Adds 30 for an engagement score below 20, 20 for fewer than 3 total
    views and 15 for an average price above 50,000,000.

    Returns:
        ``{'risk_score', 'risk_level', 'risk_factors'}`` where the level is
        'High' above 50, 'Medium' above 25 and 'Low' otherwise.
    """
    summary = processed_data['summary']
    checks = (
        (summary['engagement_score'] < 20, "Very low engagement", 30),
        (summary['total_views'] < 3, "Limited property exploration", 20),
        (summary['average_price'] > 50000000, "Very high price range", 15),  # 50M
    )
    triggered = [(label, weight) for hit, label, weight in checks if hit]
    risk_score = sum(weight for _, weight in triggered)

    if risk_score > 50:
        risk_level = 'High'
    elif risk_score > 25:
        risk_level = 'Medium'
    else:
        risk_level = 'Low'

    return {
        'risk_score': min(risk_score, 100),
        'risk_level': risk_level,
        'risk_factors': [label for label, _ in triggered],
    }
def calculate_opportunity_score(self, processed_data: Dict) -> float:
    """Compute an opportunity score capped at 100.

    Starts from the summary's engagement score and adds 20 when the average
    price exceeds 20,000,000, 10 when more than two property types were
    viewed, and a recency bonus (15 for activity within 7 days, 10 within
    30 days) when 'last_activity' is a parseable ISO timestamp.
    """
    summary = processed_data['summary']

    score = summary['engagement_score']
    if summary['average_price'] > 20000000:
        score += 20
    if len(summary['property_types']) > 2:
        score += 10

    last_seen = summary.get('last_activity')
    if last_seen:
        try:
            last_activity = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
            # Take "now" in the timestamp's own timezone (or naive when it
            # has none): subtracting aware and naive datetimes raises
            # TypeError, which previously dropped the bonus for every 'Z'
            # timestamp.
            days_since = (datetime.now(last_activity.tzinfo) - last_activity).days
        except (ValueError, TypeError, AttributeError):
            days_since = None  # unparseable timestamp: no recency bonus
        if days_since is not None:
            if days_since <= 7:
                score += 15
            elif days_since <= 30:
                score += 10

    return min(score, 100)
def generate_lead_timeline(self, properties: List[Dict]) -> List[Dict]:
    """Build a newest-first timeline of the customer's property views.

    Each property with a parseable 'lastViewedAt' contributes one entry with
    'date', 'property_name', 'property_type', 'price', 'views', 'duration'
    and 'engagement_score' (0 when absent). Properties without a timestamp,
    with an unparseable one, or missing one of the required fields
    ('propertyName', 'propertyTypeName', 'price', 'viewCount',
    'totalDuration') are left out.

    Entries are ordered by their ISO date string, descending.
    """
    timeline = []
    for prop in properties:
        stamp = prop.get('lastViewedAt')
        if not stamp:
            continue
        try:
            view_date = datetime.fromisoformat(stamp.replace('Z', '+00:00'))
            entry = {
                'date': view_date.isoformat(),
                'property_name': prop['propertyName'],
                'property_type': prop['propertyTypeName'],
                'price': prop['price'],
                'views': prop['viewCount'],
                'duration': prop['totalDuration'],
                'engagement_score': prop.get('engagement_score', 0),
            }
        except (KeyError, ValueError, TypeError, AttributeError):
            # Incomplete record or bad timestamp: omit it from the timeline
            # without hiding unrelated errors.
            continue
        timeline.append(entry)

    timeline.sort(key=lambda item: item['date'], reverse=True)
    return timeline
def calculate_conversion_probability(self, processed_data: Dict) -> Dict:
    """Estimate the conversion probability (0-100) with its contributing adjustments.

    The base probability is the engagement score capped at 100. Adjustments:
      * 'price_consistency': up to 10, only when a positive price exists;
      * 'recent_activity': 15 / 10 / 5 for activity within 1 / 3 / 7 days,
        only when 'last_activity' parses and is that recent;
      * 'property_diversity': 3 per property type, capped at 15;
      * 'deep_engagement': 5 per property viewed more than once, capped at 20.

    Returns:
        ``{'base_probability', 'adjustments', 'total_adjustment',
        'final_probability', 'confidence_level'}``; confidence is 'High'
        with 5+ properties, 'Medium' with 3+, otherwise 'Low'.
    """
    summary = processed_data['summary']
    properties = processed_data['properties']

    base_probability = min(summary['engagement_score'], 100)
    adjustments = {}

    # Price range consistency; records without a usable price are ignored.
    prices = [price for price in ((p.get('price') or 0) for p in properties) if price > 0]
    if prices:
        price_range = max(prices) - min(prices)
        price_consistency = 1 - (price_range / max(prices))
        adjustments['price_consistency'] = price_consistency * 10

    # Recent activity
    last_seen = summary.get('last_activity')
    if last_seen:
        try:
            last_activity = datetime.fromisoformat(last_seen.replace('Z', '+00:00'))
            # Take "now" in the timestamp's own timezone (or naive when it
            # has none): subtracting aware and naive datetimes raises
            # TypeError, which previously dropped this adjustment for every
            # 'Z' timestamp.
            days_since = (datetime.now(last_activity.tzinfo) - last_activity).days
        except (ValueError, TypeError, AttributeError):
            days_since = None  # unparseable timestamp: no adjustment
        if days_since is not None:
            for max_days, bonus in ((1, 15), (3, 10), (7, 5)):
                if days_since <= max_days:
                    adjustments['recent_activity'] = bonus
                    break

    # Property diversity
    property_types = len(summary['property_types'])
    adjustments['property_diversity'] = min(property_types * 3, 15)

    # Deep engagement: properties viewed more than once
    deep_views = sum(1 for p in properties if (p.get('viewCount') or 0) > 1)
    adjustments['deep_engagement'] = min(deep_views * 5, 20)

    total_adjustment = sum(adjustments.values())
    final_probability = min(base_probability + total_adjustment, 100)

    if len(properties) >= 5:
        confidence_level = 'High'
    elif len(properties) >= 3:
        confidence_level = 'Medium'
    else:
        confidence_level = 'Low'

    return {
        'base_probability': base_probability,
        'adjustments': adjustments,
        'total_adjustment': total_adjustment,
        'final_probability': final_probability,
        'confidence_level': confidence_level,
    }
def get_cached_data(self, key: str) -> Optional[Any]:
    """Return the cached value for ``key`` if it is still fresh.

    An entry is fresh while its age is below ``config['cache_duration']``
    seconds. An expired entry is evicted and None is returned; None is also
    returned when the key is not cached.
    """
    stored_at = self.cache_timestamps.get(key)
    if stored_at is None or key not in self.cache:
        return None

    if time.time() - stored_at < self.config['cache_duration']:
        return self.cache.get(key)

    # Expired. pop() rather than del: the background cleanup thread may
    # evict the same key concurrently, and del would then raise KeyError.
    self.cache.pop(key, None)
    self.cache_timestamps.pop(key, None)
    return None
def cache_data(self, key: str, data: Any):
    """Store ``data`` under ``key`` and record the current time as its timestamp."""
    self.cache.update({key: data})
    self.cache_timestamps.update({key: time.time()})
def start_cache_cleanup(self):
    """Start a daemon thread that evicts expired cache entries once a minute.

    An entry is expired when it is older than ``config['cache_duration']``
    seconds. Errors during a sweep are logged and the loop carries on.
    """
    def cleanup_cache():
        while True:
            try:
                current_time = time.time()
                max_age = self.config['cache_duration']
                # Iterate over a snapshot: other threads add and remove
                # entries, and mutating a dict while iterating it raises
                # RuntimeError.
                for key, timestamp in list(self.cache_timestamps.items()):
                    if current_time - timestamp > max_age:
                        self.cache.pop(key, None)
                        self.cache_timestamps.pop(key, None)
            except Exception as e:
                logger.error(f"Cache cleanup error: {e}")
            time.sleep(60)  # sweep once a minute

    cleanup_thread = threading.Thread(target=cleanup_cache, daemon=True)
    cleanup_thread.start()
def run(self, host: str = '0.0.0.0', port: int = 5000, debug: bool = False):
    """Log a start-up banner listing the API endpoints, then run the Flask app.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        debug: Passed through to ``Flask.run``.
    """
    logger.info(f"π Starting Enhanced Lead Qualification API Service with AI on {host}:{port}")
    logger.info(f"π Backend URL: {self.config['backend_url']}")

    banner_lines = (
        "π€ AI Features: Enabled",
        "π API Endpoints:",
        " - GET /api/health",
        " - GET /api/lead-analysis/<customer_id>",
        " - GET /api/ai-analysis/<customer_id>",
        " - POST /api/ai-recommendations/<customer_id>",
        " - POST /api/multi-ai-recommendations/<customer_id>",
        " - GET /api/chromadb-recommendations/<customer_id>",
        " - GET /api/ai-status/<customer_id>",
        " - POST /api/batch-ai-analysis",
        " - GET /api/fetch-all-properties",
        " - POST /api/clear-cache",
        " - POST /api/test-email",
        " - POST /api/automated-email",
        " - GET /api/email-status",
        " π NEW EMAIL TESTING ENDPOINTS:",
        " - GET /api/test-sendgrid-connection",
        " - GET /api/email-analysis-basis/<customer_id>",
        " - GET /api/email-content-preview/<customer_id>",
        " - POST /api/test-all-10-emails/<customer_id>",
        " - GET /api/chromadb-status",
    )
    for line in banner_lines:
        logger.info(line)

    self.app.run(host=host, port=port, debug=debug)
if __name__ == '__main__':
    # run() binds to 0.0.0.0 by default, and Flask's debug mode enables the
    # Werkzeug interactive debugger, which lets anyone who can reach the port
    # execute code. Debug mode is therefore opt-in via FLASK_DEBUG rather
    # than always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    api_service = EnhancedLeadQualificationAPI()
    api_service.run(debug=debug_enabled)