# sksameermujahid's picture
# Upload 19 files
# c061318 verified
#!/usr/bin/env python3
"""
Unified AI Lead Analysis System
Single port (7860) solution with API, Frontend, and Email features
Enhanced with 24-hour background property fetching and peak time email automation
"""
import os
import sys
import logging
import threading
import schedule
import time
from datetime import datetime, timedelta
# Configure logging: INFO level for the whole process, module-level logger
# (standard `logging.getLogger(__name__)` convention, PEP 282).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class UnifiedAISystem:
    """Unified AI lead-analysis system coordinator.

    Owns two daemon scheduler threads: one refreshes the property cache every
    24 hours, the other sends peak-time (2 PM / 6 PM / 10 PM) and hourly test
    emails. Each thread drives its own private ``schedule.Scheduler`` so the
    two loops cannot run (or race on) each other's jobs — previously both
    threads registered jobs on, and polled, the shared module-level scheduler.
    """

    def __init__(self):
        """Initialize the unified AI system with background services."""
        self.background_thread = None  # property-fetch scheduler thread
        self.email_thread = None  # email-automation scheduler thread
        self.is_running = False  # polled by both scheduler loops
        # Load the AI engine; on failure keep running in degraded mode
        # (property fetching is simply skipped, see _fetch_properties_background).
        try:
            from ai_recommendation_engine import AIRecommendationEngine
            self.ai_engine = AIRecommendationEngine()
            logger.info("βœ… AI Recommendation Engine initialized")
        except Exception as e:
            logger.error(f"❌ Failed to initialize AI engine: {e}")
            self.ai_engine = None

    def start_background_services(self):
        """Start background services for property fetching and email automation.

        Sets ``is_running`` itself so the scheduler loops stay alive even when
        the caller does not flip the flag first (previously the loops exited
        immediately in that case).
        """
        logger.info("πŸš€ Starting background services...")
        self.is_running = True
        # Property-fetching thread (daemon: never blocks interpreter exit).
        self.background_thread = threading.Thread(target=self._run_background_services, daemon=True)
        self.background_thread.start()
        # Email-automation thread.
        self.email_thread = threading.Thread(target=self._run_email_automation, daemon=True)
        self.email_thread.start()
        logger.info("βœ… Background services started successfully")

    def _run_background_services(self):
        """Scheduler loop: fetch properties immediately, then every 24 hours."""
        logger.info("πŸ”„ Background services thread started")
        # Private scheduler: keeps this thread's jobs separate from the email
        # thread's jobs. With the shared module-level `schedule` object, both
        # threads' run_pending() calls could execute the same pending job.
        scheduler = schedule.Scheduler()
        scheduler.every(24).hours.do(self._fetch_properties_background)
        # Warm the cache at startup instead of waiting 24 hours.
        logger.info("πŸš€ Running initial property fetch...")
        self._fetch_properties_background()
        # Poll until stop_background_services() clears the flag; the short
        # sleep keeps shutdown responsive to the 5-second join timeout.
        while self.is_running:
            scheduler.run_pending()
            time.sleep(1)

    def _fetch_properties_background(self):
        """Kick off a property fetch on a worker thread (non-blocking)."""
        try:
            logger.info("πŸ”„ Starting background property fetch...")
            if self.ai_engine:
                # Run the fetch on its own thread so a slow fetch never
                # stalls the scheduler loop.
                fetch_thread = threading.Thread(target=self._execute_property_fetch, daemon=True)
                fetch_thread.start()
                logger.info("βœ… Background property fetch initiated")
            else:
                logger.warning("⚠️ AI engine not available for property fetching")
        except Exception as e:
            logger.error(f"❌ Error in background property fetch: {e}")

    def _execute_property_fetch(self):
        """Execute the actual parallel, small-batch property fetch."""
        try:
            logger.info("πŸ“₯ Executing parallel property fetch...")
            success = self.ai_engine.fetch_all_properties_parallel(
                max_workers=10,
                page_size=100,
                max_pages=10
            )
            if success:
                logger.info("βœ… Background property fetch completed successfully")
                # Record when the cache was last refreshed.
                self._update_cache_timestamp()
            else:
                logger.warning("⚠️ Background property fetch completed with issues")
        except Exception as e:
            logger.error(f"❌ Error executing property fetch: {e}")

    def _update_cache_timestamp(self):
        """Write the current time to cache/property_cache_timestamp.txt."""
        try:
            # `os` is already imported at module level; no local import needed.
            cache_dir = 'cache'
            os.makedirs(cache_dir, exist_ok=True)
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            with open(os.path.join(cache_dir, 'property_cache_timestamp.txt'), 'w') as f:
                f.write(timestamp)
            logger.info(f"πŸ“… Cache timestamp updated: {timestamp}")
        except Exception as e:
            logger.error(f"❌ Error updating cache timestamp: {e}")

    def _run_email_automation(self):
        """Scheduler loop for peak-time (2/6/10 PM) and hourly test emails."""
        logger.info("πŸ“§ Email automation thread started")
        # Private scheduler — see _run_background_services for the rationale.
        scheduler = schedule.Scheduler()
        # Peak-time slots chosen from user-engagement analysis.
        scheduler.every().day.at("14:00").do(self._send_peak_time_emails_2pm)
        scheduler.every().day.at("18:00").do(self._send_peak_time_emails_6pm)
        scheduler.every().day.at("22:00").do(self._send_peak_time_emails_10pm)
        # Hourly test email for debugging/monitoring purposes.
        scheduler.every().hour.do(self._send_test_emails)
        logger.info("πŸ“… Scheduled peak time emails: 2 PM, 6 PM, and 10 PM daily")
        logger.info("πŸ“§ Scheduled test emails: Every hour for testing")
        while self.is_running:
            scheduler.run_pending()
            time.sleep(1)

    def _send_peak_time_emails_2pm(self):
        """Send Multi-AI peak-time emails to every known customer (2 PM slot)."""
        try:
            logger.info("πŸ“§ Starting 2 PM peak time email automation based on user analysis...")
            from email_automation_service import EmailAutomationService
            email_service = EmailAutomationService()
            # One customer's failure must not abort the rest of the batch.
            for customer_id in self._get_all_customers():
                try:
                    analysis_data = self._get_customer_analysis(customer_id)
                    if analysis_data:
                        email_service.send_multi_ai_emails(customer_id, analysis_data)
                        logger.info(f"βœ… Multi-AI emails sent for customer {customer_id}")
                except Exception as e:
                    logger.error(f"❌ Error sending Multi-AI emails for customer {customer_id}: {e}")
            logger.info("βœ… 2 PM peak time email automation completed")
        except Exception as e:
            logger.error(f"❌ Error in 2 PM peak time email automation: {e}")

    def _run_peak_time_batch(self, slot_label):
        """Shared implementation for the 6 PM and 10 PM peak-time slots.

        Both slots delegate to the email service's enhanced batch sender;
        `slot_label` only affects the log messages (e.g. "6 PM", "10 PM").
        """
        try:
            logger.info(f"πŸ“§ Starting {slot_label} peak time email automation based on user analysis...")
            from email_automation_service import EmailAutomationService
            email_service = EmailAutomationService()
            email_service.send_peak_time_emails()
            logger.info(f"βœ… {slot_label} peak time email automation completed")
        except Exception as e:
            logger.error(f"❌ Error in {slot_label} peak time email automation: {e}")

    def _send_peak_time_emails_6pm(self):
        """Send peak time emails at 6 PM based on user analysis."""
        self._run_peak_time_batch("6 PM")

    def _send_peak_time_emails_10pm(self):
        """Send peak time emails at 10 PM based on user analysis."""
        self._run_peak_time_batch("10 PM")

    def _send_test_emails(self):
        """Send a test email for customer 144 (hourly debugging hook)."""
        try:
            logger.info("πŸ“§ Starting test email sending...")
            from email_automation_service import EmailAutomationService
            email_service = EmailAutomationService()
            # Fixed test fixture: customer 144, developer mailbox.
            test_customer_id = 144
            test_recipient = "sameermujahid7777@gmail.com"
            analysis_data = self._get_customer_analysis(test_customer_id)
            if analysis_data:
                email_result = email_service.generate_automated_email(
                    test_customer_id,
                    email_type="test_email"
                )
                if email_result.get('success'):
                    logger.info(f"βœ… Test email sent successfully to {test_recipient}")
                else:
                    logger.warning(f"⚠️ Test email failed: {email_result.get('error')}")
            else:
                logger.warning("⚠️ No analysis data available for test email")
        except Exception as e:
            logger.error(f"❌ Error in test email sending: {e}")

    def _get_all_customers(self):
        """Return the list of customer IDs to email.

        TODO(review): currently a hard-coded sample list; should be sourced
        from the customer database.
        """
        try:
            return [105, 106, 107, 108, 109]  # Sample customer IDs
        except Exception as e:
            logger.error(f"❌ Error getting customers: {e}")
            return []

    def _get_customer_analysis(self, customer_id):
        """Fetch the analysis payload for one customer, or None on failure."""
        try:
            from api_service_enhanced import EnhancedLeadQualificationAPI
            api_service = EnhancedLeadQualificationAPI()
            # NOTE(review): calls a private method of the API service; a
            # public accessor would be preferable if one exists.
            analysis_data = api_service._get_analysis_data_parallel(customer_id)
            return analysis_data
        except Exception as e:
            logger.error(f"❌ Error getting analysis for customer {customer_id}: {e}")
            return None

    def stop_background_services(self):
        """Signal both scheduler loops to exit and wait briefly for them."""
        logger.info("πŸ›‘ Stopping background services...")
        self.is_running = False
        # Threads are daemons; bounded joins so shutdown never hangs.
        if self.background_thread:
            self.background_thread.join(timeout=5)
        if self.email_thread:
            self.email_thread.join(timeout=5)
        logger.info("βœ… Background services stopped")
def main():
    """Entry point: log the startup banner, start background services, and
    run the unified API server on port 7860 (or $PORT) until interrupted.

    Fix: the banner previously advertised peak-time emails at "2 PM & 6 PM"
    while the email automation actually schedules 2 PM, 6 PM and 10 PM —
    the three affected messages now match the real schedule.
    """
    logger.info("=" * 80)
    logger.info("πŸ€– Enhanced Unified AI Lead Analysis System Starting...")
    logger.info("=" * 80)
    logger.info(f"πŸ“… Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info("")
    logger.info("πŸš€ Enhanced System Features:")
    logger.info("   βœ… Single Port Solution (Port 7860)")
    logger.info("   βœ… AI-Enhanced Lead Analysis")
    logger.info("   βœ… Interactive Frontend Dashboard")
    logger.info("   βœ… Email Automation & Testing")
    logger.info("   βœ… Mock Data Support (No External Backend Required)")
    logger.info("   βœ… Behavioral Analytics")
    logger.info("   βœ… AI-Powered Recommendations")
    logger.info("   βœ… 24-Hour Background Property Fetching")
    logger.info("   βœ… Peak Time Email Automation (2 PM, 6 PM & 10 PM Daily)")
    logger.info("   βœ… Small-Batch Parallel Processing")
    logger.info("   βœ… ChromaDB Integration for Recommendations")
    logger.info("   βœ… Email Testing & Automation Dashboard")
    logger.info("")
    logger.info("🌐 Access Points:")
    logger.info("   πŸ“Š Main Dashboard: http://localhost:7860")
    logger.info("   πŸ” Customer Analysis: http://localhost:7860/customer/105")
    logger.info("   πŸ“‹ API Health: http://localhost:7860/api/health")
    logger.info("")
    logger.info("πŸ”„ Background Services:")
    logger.info("   πŸ“₯ Property Fetching: Every 24 hours (small-batch parallel)")
    logger.info("   πŸ“§ Peak Time Emails: Daily at 2:00 PM, 6:00 PM & 10:00 PM")
    logger.info("   πŸ“§ Test Emails: Every hour for testing")
    logger.info("   🧠 AI Recommendations: Real-time processing")
    logger.info("")
    logger.info("πŸ§ͺ Test Instructions:")
    logger.info("   1. Open: http://localhost:7860")
    logger.info("   2. Enter Customer ID: 144")
    logger.info("   3. Click 'Analyze Customer'")
    logger.info("   4. Click 'AI Analysis' for enhanced insights")
    logger.info("   5. Test email features in the Email Testing Section:")
    logger.info("      - Send Test Email")
    logger.info("      - Test Email Triggers")
    logger.info("      - Test Peak Time Emails")
    logger.info("      - View Email Schedule")
    logger.info("      - Check Email Logs")
    logger.info("")
    logger.info("πŸ“§ Email Features:")
    logger.info("   - Test Email System")
    logger.info("   - Test Automated Email")
    logger.info("   - Peak Time Recommendations (2 PM, 6 PM & 10 PM Daily)")
    logger.info("   - Email Automation Testing Dashboard")
    logger.info("   - Email Triggers Analysis")
    logger.info("   - Email Schedule Management")
    # NOTE(review): this address differs from the test recipient used in
    # UnifiedAISystem._send_test_emails — confirm which mailbox is correct.
    logger.info("   - All emails sent to: shaiksameermujahid@gmail.com")
    logger.info("   - Mock data ensures everything works without external services")
    logger.info("")
    logger.info("⏹️ Press Ctrl+C to stop the system")
    logger.info("=" * 80)

    # Initialize the unified system. The flag is also set (redundantly but
    # harmlessly) here so the scheduler loops run regardless of
    # start_background_services' own behavior.
    unified_system = UnifiedAISystem()
    unified_system.is_running = True
    try:
        # Start the property-fetch and email scheduler threads.
        unified_system.start_background_services()
        # Import and run the unified API service (blocks until shutdown).
        from api_service_enhanced import EnhancedLeadQualificationAPI
        api_service = EnhancedLeadQualificationAPI()
        # Port 7860 is the Hugging Face Spaces default; $PORT overrides it.
        port = int(os.environ.get('PORT', 7860))
        api_service.run(host='0.0.0.0', port=port, debug=False)
    except KeyboardInterrupt:
        # Graceful Ctrl+C shutdown.
        logger.info("")
        logger.info("πŸ›‘ Enhanced Unified AI System stopped by user")
        unified_system.stop_background_services()
        logger.info("βœ… System shutdown complete")
    except Exception as e:
        # Any other startup/runtime failure: stop services and exit non-zero.
        logger.error(f"❌ System error: {e}")
        unified_system.stop_background_services()
        sys.exit(1)
if __name__ == "__main__":
    # Run from the script's own directory so relative paths (e.g. the
    # 'cache' folder) resolve consistently regardless of the invocation cwd.
    os.chdir(os.path.dirname(os.path.abspath(__file__)))
    main()