#!/usr/bin/env python3
"""
Unified AI Lead Analysis System
Single port (7860) solution with API, Frontend, and Email features
Enhanced with 24-hour background property fetching and peak time email automation
"""
# Standard library
import os
import sys
import logging
import threading
import time
from datetime import datetime, timedelta

# Third-party: lightweight in-process job scheduler (https://pypi.org/project/schedule/)
import schedule

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class UnifiedAISystem:
    """Coordinator for the unified AI lead-analysis system.

    Owns the AI recommendation engine plus two daemon threads: one that
    fetches properties every 24 hours and one that sends scheduled emails
    at the configured peak times. Start via start_background_services();
    stop via stop_background_services(). Each thread drives its own
    schedule.Scheduler so jobs are never executed (or raced) by the
    wrong thread.
    """

    def __init__(self):
        """Initialize the unified AI system with background services."""
        self.background_thread = None  # property-fetch scheduler thread
        self.email_thread = None       # email-automation scheduler thread
        self.is_running = False        # polled by both scheduler loops

        # Initialize the AI engine; the system degrades gracefully
        # (ai_engine is None) when the project module is unavailable.
        try:
            from ai_recommendation_engine import AIRecommendationEngine
            self.ai_engine = AIRecommendationEngine()
            logger.info("β AI Recommendation Engine initialized")
        except Exception as e:
            logger.error(f"β Failed to initialize AI engine: {e}")
            self.ai_engine = None

    def start_background_services(self):
        """Start background services for property fetching and email automation."""
        logger.info("π Starting background services...")
        # Set the run flag *before* spawning the threads: both scheduler
        # loops test `self.is_running` and would exit immediately if the
        # caller forgot to flip it first (previously main() had to do so).
        self.is_running = True

        # Start background property fetching thread
        self.background_thread = threading.Thread(
            target=self._run_background_services, daemon=True
        )
        self.background_thread.start()

        # Start email automation thread
        self.email_thread = threading.Thread(
            target=self._run_email_automation, daemon=True
        )
        self.email_thread.start()
        logger.info("β Background services started successfully")

    def _run_background_services(self):
        """Scheduler loop: run an initial property fetch, then one every 24 hours."""
        logger.info("π Background services thread started")
        # Private Scheduler rather than the module-level `schedule` default:
        # the default scheduler is shared process-wide, so two threads each
        # calling schedule.run_pending() on it would race and could fire
        # the other thread's jobs.
        scheduler = schedule.Scheduler()
        scheduler.every(24).hours.do(self._fetch_properties_background)

        # Run initial property fetch so data is warm before the first 24h tick
        logger.info("π Running initial property fetch...")
        self._fetch_properties_background()

        # Keep the scheduler running until stop_background_services()
        while self.is_running:
            scheduler.run_pending()
            time.sleep(60)  # Check every minute

    def _fetch_properties_background(self):
        """Kick off a property fetch in its own thread so the scheduler loop never blocks."""
        try:
            logger.info("π Starting background property fetch...")
            if self.ai_engine:
                # Run property fetching in a separate thread to avoid blocking
                fetch_thread = threading.Thread(
                    target=self._execute_property_fetch, daemon=True
                )
                fetch_thread.start()
                logger.info("β Background property fetch initiated")
            else:
                logger.warning("β οΈ AI engine not available for property fetching")
        except Exception as e:
            logger.error(f"β Error in background property fetch: {e}")

    def _execute_property_fetch(self):
        """Execute the actual parallel property fetch via the AI engine."""
        try:
            logger.info("π₯ Executing parallel property fetch...")
            success = self.ai_engine.fetch_all_properties_parallel(
                max_workers=10,
                page_size=100,
                max_pages=10
            )
            if success:
                logger.info("β Background property fetch completed successfully")
                # Record when the cache was last refreshed
                self._update_cache_timestamp()
            else:
                logger.warning("β οΈ Background property fetch completed with issues")
        except Exception as e:
            logger.error(f"β Error executing property fetch: {e}")

    def _update_cache_timestamp(self):
        """Write the current time to cache/property_cache_timestamp.txt."""
        try:
            # `os` is imported at module level; the redundant local import
            # that used to live here has been removed.
            cache_dir = 'cache'
            os.makedirs(cache_dir, exist_ok=True)
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            with open(f'{cache_dir}/property_cache_timestamp.txt', 'w') as f:
                f.write(timestamp)
            logger.info(f"π Cache timestamp updated: {timestamp}")
        except Exception as e:
            logger.error(f"β Error updating cache timestamp: {e}")

    def _run_email_automation(self):
        """Scheduler loop for the daily peak-time emails and the hourly test email."""
        logger.info("π§ Email automation thread started")
        # Private Scheduler for the same reason as _run_background_services.
        scheduler = schedule.Scheduler()
        # Peak time emails at 2 PM, 6 PM, and 10 PM daily, per user analysis
        scheduler.every().day.at("14:00").do(self._send_peak_time_emails_2pm)
        scheduler.every().day.at("18:00").do(self._send_peak_time_emails_6pm)
        scheduler.every().day.at("22:00").do(self._send_peak_time_emails_10pm)
        # Hourly test email, kept for debugging the pipeline
        scheduler.every().hour.do(self._send_test_emails)
        logger.info("π Scheduled peak time emails: 2 PM, 6 PM, and 10 PM daily")
        logger.info("π§ Scheduled test emails: Every hour for testing")
        # Keep the scheduler running until stop_background_services()
        while self.is_running:
            scheduler.run_pending()
            time.sleep(60)  # Check every minute

    def _send_peak_time_emails_2pm(self):
        """2 PM run: send Multi-AI emails individually to every known customer."""
        try:
            logger.info("π§ Starting 2 PM peak time email automation based on user analysis...")
            from email_automation_service import EmailAutomationService
            email_service = EmailAutomationService()
            # One customer failing must not abort the remaining customers.
            for customer_id in self._get_all_customers():
                try:
                    analysis_data = self._get_customer_analysis(customer_id)
                    if analysis_data:
                        email_service.send_multi_ai_emails(customer_id, analysis_data)
                        logger.info(f"β Multi-AI emails sent for customer {customer_id}")
                except Exception as e:
                    logger.error(f"β Error sending Multi-AI emails for customer {customer_id}: {e}")
            logger.info("β 2 PM peak time email automation completed")
        except Exception as e:
            logger.error(f"β Error in 2 PM peak time email automation: {e}")

    def _send_peak_time_emails_6pm(self):
        """6 PM run: delegate to the shared peak-time email routine."""
        self._send_scheduled_peak_emails("6 PM")

    def _send_peak_time_emails_10pm(self):
        """10 PM run: delegate to the shared peak-time email routine."""
        self._send_scheduled_peak_emails("10 PM")

    def _send_scheduled_peak_emails(self, label):
        """Shared body of the 6 PM / 10 PM runs (they were duplicated verbatim).

        `label` is interpolated into the log messages only; output is
        identical to the previous per-hour copies.
        """
        try:
            logger.info(f"π§ Starting {label} peak time email automation based on user analysis...")
            from email_automation_service import EmailAutomationService
            email_service = EmailAutomationService()
            # Send peak time emails using the enhanced method
            email_service.send_peak_time_emails()
            logger.info(f"β {label} peak time email automation completed")
        except Exception as e:
            logger.error(f"β Error in {label} peak time email automation: {e}")

    def _send_test_emails(self):
        """Hourly test email against a fixed customer, for debugging the pipeline."""
        try:
            logger.info("π§ Starting test email sending...")
            from email_automation_service import EmailAutomationService
            email_service = EmailAutomationService()
            test_customer_id = 144
            # NOTE(review): this address is only logged below; the actual
            # recipient is decided inside EmailAutomationService — confirm.
            test_recipient = "sameermujahid7777@gmail.com"
            analysis_data = self._get_customer_analysis(test_customer_id)
            if analysis_data:
                # Generate and send test email
                email_result = email_service.generate_automated_email(
                    test_customer_id,
                    email_type="test_email"
                )
                if email_result.get('success'):
                    logger.info(f"β Test email sent successfully to {test_recipient}")
                else:
                    logger.warning(f"β οΈ Test email failed: {email_result.get('error')}")
            else:
                logger.warning("β οΈ No analysis data available for test email")
        except Exception as e:
            logger.error(f"β Error in test email sending: {e}")

    def _get_all_customers(self):
        """Return the customer IDs to email.

        Placeholder: returns a fixed sample list pending a real DB query.
        """
        try:
            # TODO: replace with a real database query
            return [105, 106, 107, 108, 109]  # Sample customer IDs
        except Exception as e:
            logger.error(f"β Error getting customers: {e}")
            return []

    def _get_customer_analysis(self, customer_id):
        """Fetch analysis data for one customer via the API service, or None on failure."""
        try:
            from api_service_enhanced import EnhancedLeadQualificationAPI
            api_service = EnhancedLeadQualificationAPI()
            # NOTE(review): reaches into a private method of the API service —
            # confirm no public accessor exists.
            return api_service._get_analysis_data_parallel(customer_id)
        except Exception as e:
            logger.error(f"β Error getting analysis for customer {customer_id}: {e}")
            return None

    def stop_background_services(self):
        """Signal both scheduler loops to exit and wait briefly for the threads."""
        logger.info("π Stopping background services...")
        self.is_running = False
        # Loops poll every 60s, so a 5s join may time out; both threads are
        # daemons and will not block interpreter shutdown either way.
        if self.background_thread:
            self.background_thread.join(timeout=5)
        if self.email_thread:
            self.email_thread.join(timeout=5)
        logger.info("β Background services stopped")
def main():
    """Start the unified AI system.

    Prints the startup banner, launches the background scheduler threads,
    then blocks in the API server on port 7860 until Ctrl+C (or exits
    with status 1 on an unexpected error).
    """
    logger.info("=" * 80)
    logger.info("π€ Enhanced Unified AI Lead Analysis System Starting...")
    logger.info("=" * 80)
    logger.info(f"π Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info("")
    logger.info("π Enhanced System Features:")
    logger.info(" β Single Port Solution (Port 7860)")
    logger.info(" β AI-Enhanced Lead Analysis")
    logger.info(" β Interactive Frontend Dashboard")
    logger.info(" β Email Automation & Testing")
    logger.info(" β Mock Data Support (No External Backend Required)")
    logger.info(" β Behavioral Analytics")
    logger.info(" β AI-Powered Recommendations")
    logger.info(" β 24-Hour Background Property Fetching")
    # Fixed: banner previously said "2 PM & 6 PM" although a 10 PM run is
    # also scheduled by the email automation thread.
    logger.info(" β Peak Time Email Automation (2 PM, 6 PM & 10 PM Daily)")
    logger.info(" β Small-Batch Parallel Processing")
    logger.info(" β ChromaDB Integration for Recommendations")
    logger.info(" β Email Testing & Automation Dashboard")
    logger.info("")
    logger.info("π Access Points:")
    logger.info(" π Main Dashboard: http://localhost:7860")
    logger.info(" π Customer Analysis: http://localhost:7860/customer/105")
    logger.info(" π API Health: http://localhost:7860/api/health")
    logger.info("")
    logger.info("π Background Services:")
    logger.info(" π₯ Property Fetching: Every 24 hours (small-batch parallel)")
    # Fixed: was "Daily at 2:00 PM & 6:00 PM" — the 10 PM job was missing.
    logger.info(" π§ Peak Time Emails: Daily at 2:00 PM, 6:00 PM & 10:00 PM")
    logger.info(" π§ Test Emails: Every hour for testing")
    logger.info(" π§ AI Recommendations: Real-time processing")
    logger.info("")
    logger.info("π§ͺ Test Instructions:")
    logger.info(" 1. Open: http://localhost:7860")
    logger.info(" 2. Enter Customer ID: 144")
    logger.info(" 3. Click 'Analyze Customer'")
    logger.info(" 4. Click 'AI Analysis' for enhanced insights")
    logger.info(" 5. Test email features in the Email Testing Section:")
    logger.info(" - Send Test Email")
    logger.info(" - Test Email Triggers")
    logger.info(" - Test Peak Time Emails")
    logger.info(" - View Email Schedule")
    logger.info(" - Check Email Logs")
    logger.info("")
    logger.info("π§ Email Features:")
    logger.info(" - Test Email System")
    logger.info(" - Test Automated Email")
    # Fixed: was "(2 PM & 6 PM Daily)" — see the schedule above.
    logger.info(" - Peak Time Recommendations (2 PM, 6 PM & 10 PM Daily)")
    logger.info(" - Email Automation Testing Dashboard")
    logger.info(" - Email Triggers Analysis")
    logger.info(" - Email Schedule Management")
    logger.info(" - All emails sent to: shaiksameermujahid@gmail.com")
    logger.info(" - Mock data ensures everything works without external services")
    logger.info("")
    logger.info("βΉοΈ Press Ctrl+C to stop the system")
    logger.info("=" * 80)

    # Initialize unified system and enable the scheduler loops
    unified_system = UnifiedAISystem()
    unified_system.is_running = True

    try:
        # Start background services
        unified_system.start_background_services()

        # Import and run the unified API service (blocks until shutdown)
        from api_service_enhanced import EnhancedLeadQualificationAPI
        api_service = EnhancedLeadQualificationAPI()

        # Port 7860 is the conventional port for Hugging Face Spaces
        port = int(os.environ.get('PORT', 7860))
        api_service.run(host='0.0.0.0', port=port, debug=False)
    except KeyboardInterrupt:
        logger.info("")
        logger.info("π Enhanced Unified AI System stopped by user")
        unified_system.stop_background_services()
        logger.info("β System shutdown complete")
    except Exception as e:
        logger.error(f"β System error: {e}")
        unified_system.stop_background_services()
        sys.exit(1)
if __name__ == "__main__":
    # Run from the directory containing this script so relative paths
    # (e.g. the cache/ directory) resolve consistently.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    main()