#!/usr/bin/env python3 """ Unified AI Lead Analysis System Single port (7860) solution with API, Frontend, and Email features Enhanced with 24-hour background property fetching and peak time email automation """ import os import sys import logging import threading import schedule import time from datetime import datetime, timedelta # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class UnifiedAISystem: def __init__(self): """Initialize the unified AI system with background services""" self.background_thread = None self.email_thread = None self.is_running = False # Initialize AI engine try: from ai_recommendation_engine import AIRecommendationEngine self.ai_engine = AIRecommendationEngine() logger.info("โœ… AI Recommendation Engine initialized") except Exception as e: logger.error(f"โŒ Failed to initialize AI engine: {e}") self.ai_engine = None def start_background_services(self): """Start background services for property fetching and email automation""" logger.info("๐Ÿš€ Starting background services...") # Start background property fetching thread self.background_thread = threading.Thread(target=self._run_background_services, daemon=True) self.background_thread.start() # Start email automation thread self.email_thread = threading.Thread(target=self._run_email_automation, daemon=True) self.email_thread.start() logger.info("โœ… Background services started successfully") def _run_background_services(self): """Run background services including property fetching""" logger.info("๐Ÿ”„ Background services thread started") # Schedule property fetching every 24 hours schedule.every(24).hours.do(self._fetch_properties_background) # Run initial property fetch logger.info("๐Ÿš€ Running initial property fetch...") self._fetch_properties_background() # Keep the scheduler running while self.is_running: schedule.run_pending() time.sleep(60) # Check every minute def _fetch_properties_background(self): """Fetch properties in background without blocking the main application""" try: logger.info("๐Ÿ”„ Starting background property fetch...") if self.ai_engine: # Run property fetching in a separate thread to avoid blocking fetch_thread = threading.Thread(target=self._execute_property_fetch, daemon=True) fetch_thread.start() logger.info("โœ… Background property fetch initiated") else: logger.warning("โš ๏ธ AI engine not available for property fetching") except Exception as e: logger.error(f"โŒ Error in background property fetch: {e}") def _execute_property_fetch(self): """Execute the actual property fetching""" try: logger.info("๐Ÿ“ฅ Executing parallel property fetch...") success = self.ai_engine.fetch_all_properties_parallel( max_workers=10, page_size=100, max_pages=10 ) if success: logger.info("โœ… Background property fetch completed successfully") # Update cache timestamp self._update_cache_timestamp() else: logger.warning("โš ๏ธ Background property fetch completed with issues") except Exception as e: logger.error(f"โŒ Error executing property fetch: {e}") def _update_cache_timestamp(self): """Update the cache timestamp file""" try: import os cache_dir = 'cache' os.makedirs(cache_dir, exist_ok=True) timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') with open(f'{cache_dir}/property_cache_timestamp.txt', 'w') as f: f.write(timestamp) logger.info(f"๐Ÿ“… Cache timestamp updated: {timestamp}") except Exception as e: logger.error(f"โŒ Error updating cache timestamp: {e}") def _run_email_automation(self): """Run email automation services""" logger.info("๐Ÿ“ง Email automation thread started") # Schedule peak time emails at 2 PM, 6 PM, and 10 PM daily based on user analysis schedule.every().day.at("14:00").do(self._send_peak_time_emails_2pm) schedule.every().day.at("18:00").do(self._send_peak_time_emails_6pm) schedule.every().day.at("22:00").do(self._send_peak_time_emails_10pm) # Also schedule a test email every hour for testing purposes schedule.every().hour.do(self._send_test_emails) logger.info("๐Ÿ“… Scheduled peak time emails: 2 PM, 6 PM, and 10 PM daily") logger.info("๐Ÿ“ง Scheduled test emails: Every hour for testing") # Keep the scheduler running while self.is_running: schedule.run_pending() time.sleep(60) # Check every minute def _send_peak_time_emails_2pm(self): """Send peak time emails at 2 PM based on user analysis""" try: logger.info("๐Ÿ“ง Starting 2 PM peak time email automation based on user analysis...") # Import email service from email_automation_service import EmailAutomationService email_service = EmailAutomationService() # Send Multi-AI emails for all customers customers = self._get_all_customers() for customer_id in customers: try: analysis_data = self._get_customer_analysis(customer_id) if analysis_data: email_service.send_multi_ai_emails(customer_id, analysis_data) logger.info(f"โœ… Multi-AI emails sent for customer {customer_id}") except Exception as e: logger.error(f"โŒ Error sending Multi-AI emails for customer {customer_id}: {e}") logger.info("โœ… 2 PM peak time email automation completed") except Exception as e: logger.error(f"โŒ Error in 2 PM peak time email automation: {e}") def _send_peak_time_emails_6pm(self): """Send peak time emails at 6 PM based on user analysis""" try: logger.info("๐Ÿ“ง Starting 6 PM peak time email automation based on user analysis...") # Import email service from email_automation_service import EmailAutomationService email_service = EmailAutomationService() # Send peak time emails using the enhanced method email_service.send_peak_time_emails() logger.info("โœ… 6 PM peak time email automation completed") except Exception as e: logger.error(f"โŒ Error in 6 PM peak time email automation: {e}") def _send_peak_time_emails_10pm(self): """Send peak time emails at 10 PM based on user analysis""" try: logger.info("๐Ÿ“ง Starting 10 PM peak time email automation based on user analysis...") # Import email service from email_automation_service import EmailAutomationService email_service = EmailAutomationService() # Send peak time emails using the enhanced method email_service.send_peak_time_emails() logger.info("โœ… 10 PM peak time email automation completed") except Exception as e: logger.error(f"โŒ Error in 10 PM peak time email automation: {e}") def _send_test_emails(self): """Send test emails for debugging and testing""" try: logger.info("๐Ÿ“ง Starting test email sending...") # Import email service from email_automation_service import EmailAutomationService email_service = EmailAutomationService() # Test with customer 144 test_customer_id = 144 test_recipient = "sameermujahid7777@gmail.com" # Get customer analysis analysis_data = self._get_customer_analysis(test_customer_id) if analysis_data: # Generate and send test email email_result = email_service.generate_automated_email( test_customer_id, email_type="test_email" ) if email_result.get('success'): logger.info(f"โœ… Test email sent successfully to {test_recipient}") else: logger.warning(f"โš ๏ธ Test email failed: {email_result.get('error')}") else: logger.warning("โš ๏ธ No analysis data available for test email") except Exception as e: logger.error(f"โŒ Error in test email sending: {e}") def _get_all_customers(self): """Get list of all customers from the system""" try: # This would typically come from your database # For now, return a sample list of customer IDs return [105, 106, 107, 108, 109] # Sample customer IDs except Exception as e: logger.error(f"โŒ Error getting customers: {e}") return [] def _get_customer_analysis(self, customer_id): """Get customer analysis data""" try: # Import API service to get analysis from api_service_enhanced import EnhancedLeadQualificationAPI api_service = EnhancedLeadQualificationAPI() # Get analysis data analysis_data = api_service._get_analysis_data_parallel(customer_id) return analysis_data except Exception as e: logger.error(f"โŒ Error getting analysis for customer {customer_id}: {e}") return None def stop_background_services(self): """Stop background services""" logger.info("๐Ÿ›‘ Stopping background services...") self.is_running = False if self.background_thread: self.background_thread.join(timeout=5) if self.email_thread: self.email_thread.join(timeout=5) logger.info("โœ… Background services stopped") def main(): """Start the unified AI system""" logger.info("=" * 80) logger.info("๐Ÿค– Enhanced Unified AI Lead Analysis System Starting...") logger.info("=" * 80) logger.info(f"๐Ÿ“… Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logger.info("") logger.info("๐Ÿš€ Enhanced System Features:") logger.info(" โœ… Single Port Solution (Port 7860)") logger.info(" โœ… AI-Enhanced Lead Analysis") logger.info(" โœ… Interactive Frontend Dashboard") logger.info(" โœ… Email Automation & Testing") logger.info(" โœ… Mock Data Support (No External Backend Required)") logger.info(" โœ… Behavioral Analytics") logger.info(" โœ… AI-Powered Recommendations") logger.info(" โœ… 24-Hour Background Property Fetching") logger.info(" โœ… Peak Time Email Automation (2 PM & 6 PM Daily)") logger.info(" โœ… Small-Batch Parallel Processing") logger.info(" โœ… ChromaDB Integration for Recommendations") logger.info(" โœ… Email Testing & Automation Dashboard") logger.info("") logger.info("๐ŸŒ Access Points:") logger.info(" ๐Ÿ“Š Main Dashboard: http://localhost:7860") logger.info(" ๐Ÿ” Customer Analysis: http://localhost:7860/customer/105") logger.info(" ๐Ÿ“‹ API Health: http://localhost:7860/api/health") logger.info("") logger.info("๐Ÿ”„ Background Services:") logger.info(" ๐Ÿ“ฅ Property Fetching: Every 24 hours (small-batch parallel)") logger.info(" ๐Ÿ“ง Peak Time Emails: Daily at 2:00 PM & 6:00 PM") logger.info(" ๐Ÿ“ง Test Emails: Every hour for testing") logger.info(" ๐Ÿง  AI Recommendations: Real-time processing") logger.info("") logger.info("๐Ÿงช Test Instructions:") logger.info(" 1. Open: http://localhost:7860") logger.info(" 2. Enter Customer ID: 144") logger.info(" 3. Click 'Analyze Customer'") logger.info(" 4. Click 'AI Analysis' for enhanced insights") logger.info(" 5. Test email features in the Email Testing Section:") logger.info(" - Send Test Email") logger.info(" - Test Email Triggers") logger.info(" - Test Peak Time Emails") logger.info(" - View Email Schedule") logger.info(" - Check Email Logs") logger.info("") logger.info("๐Ÿ“ง Email Features:") logger.info(" - Test Email System") logger.info(" - Test Automated Email") logger.info(" - Peak Time Recommendations (2 PM & 6 PM Daily)") logger.info(" - Email Automation Testing Dashboard") logger.info(" - Email Triggers Analysis") logger.info(" - Email Schedule Management") logger.info(" - All emails sent to: shaiksameermujahid@gmail.com") logger.info(" - Mock data ensures everything works without external services") logger.info("") logger.info("โน๏ธ Press Ctrl+C to stop the system") logger.info("=" * 80) # Initialize unified system unified_system = UnifiedAISystem() unified_system.is_running = True try: # Start background services unified_system.start_background_services() # Import and run the unified API service from api_service_enhanced import EnhancedLeadQualificationAPI api_service = EnhancedLeadQualificationAPI() # Use port 7860 for Hugging Face Spaces port = int(os.environ.get('PORT', 7860)) api_service.run(host='0.0.0.0', port=port, debug=False) except KeyboardInterrupt: logger.info("") logger.info("๐Ÿ›‘ Enhanced Unified AI System stopped by user") unified_system.stop_background_services() logger.info("โœ… System shutdown complete") except Exception as e: logger.error(f"โŒ System error: {e}") unified_system.stop_background_services() sys.exit(1) if __name__ == "__main__": # Change to the lead directory script_dir = os.path.dirname(os.path.abspath(__file__)) os.chdir(script_dir) main()