import base64
import time
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.utils import formataddr, parseaddr

from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from sqlalchemy.orm import Session

import models
from models import SessionLocal


def get_gmail_service(user_token):
    """Build a Gmail API (v1) client from a stored authorized-user token.

    Args:
        user_token: Authorized-user info passed straight to
            ``Credentials.from_authorized_user_info``.

    Returns:
        The service object returned by ``build('gmail', 'v1', ...)``.
    """
    # Use from_authorized_user_info for better credential management
    creds = Credentials.from_authorized_user_info(user_token)
    return build('gmail', 'v1', credentials=creds)


def send_email(service, to, subject, body, user_email, sender_name=None,
               reply_to=None, thread_id=None):
    """Send an HTML email through the Gmail API and return the API response.

    Args:
        service: Gmail service as returned by ``get_gmail_service``.
        to: Recipient address.
        subject: Subject line.
        body: HTML body of the message.
        user_email: Address used in the ``From`` header.
        sender_name: Optional display name for the ``From`` header.
        reply_to: Optional ``Reply-To`` address.
        thread_id: Optional Gmail thread id to attach the message to.

    Returns:
        The dict returned by ``users().messages().send(...).execute()``.
    """
    message = MIMEText(body, 'html')
    message['to'] = to
    message['subject'] = subject
    if sender_name:
        # Gmail requires the actual email in formataddr to show the "Friendly Name" correctly
        message['from'] = formataddr((sender_name, user_email))
    else:
        message['from'] = user_email
    if reply_to:
        message['Reply-To'] = reply_to

    raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
    # Use a separate name for the request payload so the ``body`` parameter
    # (the HTML content) is not shadowed.
    payload = {'raw': raw_message}
    if thread_id:
        # NOTE(review): the Gmail API docs also ask for matching Subject and
        # In-Reply-To/References headers for reliable threading - confirm
        # whether follow-ups need them.
        payload['threadId'] = thread_id
    return service.users().messages().send(userId='me', body=payload).execute()


def check_for_replies(service, thread_id, user_email):
    """Returns True if the lead has replied in this thread.

    A message counts as a reply when the address parsed from its ``From``
    header differs from ``user_email`` (case-insensitive, exact match).
    Messages without a usable ``From`` address are ignored. Any error while
    fetching or reading the thread is printed and treated as "no reply".
    """
    if not thread_id:
        return False
    own_address = (user_email or "").strip().lower()
    if not own_address:
        # Without our own address we cannot tell our messages from the lead's.
        return False
    try:
        thread = service.users().threads().get(userId='me', id=thread_id).execute()
        for msg in thread.get('messages', []):
            headers = msg.get('payload', {}).get('headers', [])
            sender = next(
                (h['value'] for h in headers if h['name'].lower() == 'from'),
                "",
            )
            # Compare the bare address rather than doing a substring test on
            # the raw header, so "notbob@x.com" is not mistaken for "bob@x.com".
            sender_address = parseaddr(sender)[1].strip().lower()
            if not sender_address:
                continue
            # If any message in the thread is NOT from us, it's a reply
            if sender_address != own_address:
                return True
    except Exception as e:
        print(f"Error checking replies for thread {thread_id}: {e}")
    return False


def _fill_placeholders(template, data):
    """Replace every ``{key}`` in ``template`` with ``str(data[key])``."""
    for key, val in (data or {}).items():
        template = template.replace(f"{{{key}}}", str(val))
    return template


def _process_lead(db, service, user, campaign, lead):
    """Advance a single lead by at most one step of the campaign sequence."""
    # 3. Check if lead has already replied
    if lead.thread_id and check_for_replies(service, lead.thread_id, user.email):
        lead.status = "replied"
        db.commit()
        print(f"Lead {lead.email} has replied. Stopping sequence.")
        return

    # 4. Check if it's time for the next step
    steps = campaign.steps or []
    step_idx = lead.current_step or 0
    if step_idx >= len(steps):
        lead.status = "finished"
        db.commit()
        return

    step_config = steps[step_idx]
    days_to_wait = step_config.get('day', 0) or 0

    now = datetime.utcnow()
    next_send = None
    if step_idx != 0 and lead.last_contact_at:
        next_send = lead.last_contact_at + timedelta(days=days_to_wait)

    # The first step goes out immediately; later steps wait for their delay.
    # A later step with no recorded last_contact_at is never due.
    send_now = step_idx == 0 or (next_send is not None and now >= next_send)

    if not send_now:
        wait_remaining = 0
        if next_send is not None:
            wait_remaining = (next_send - now).total_seconds() / 3600
        print(f"WAIT: Skipping {lead.email} for now. Next send in approx {wait_remaining:.2f} hours (Wait: {days_to_wait} days)")
        return

    # 5. Prepare content and subject with placeholders
    msg_body = step_config.get('html_form') or step_config.get('message') or ''
    msg_subject = campaign.subject or "Heads up!"
    msg_body = _fill_placeholders(msg_body, lead.data)
    msg_subject = _fill_placeholders(msg_subject, lead.data)

    # 6. Send
    # Capture the address first: after a rollback the ORM object is expired
    # and reading an attribute would trigger another query.
    lead_email = lead.email
    try:
        print(f"Attempting to send step {step_idx + 1} to {lead_email}")
        res = send_email(service, lead_email, msg_subject, msg_body,
                         user.email, user.sender_name, user.reply_to,
                         lead.thread_id)

        # Update Lead State
        lead.thread_id = res['threadId']
        lead.last_contact_at = datetime.utcnow()
        lead.current_step = step_idx + 1
        lead.status = "emailed"
        db.commit()
        print(f"SUCCESS: Sent to {lead_email} (Thread: {res['threadId']})")
    except Exception as e:
        # Reset the session so one failed send/commit does not poison the
        # processing of the remaining leads.
        db.rollback()
        print(f"ERROR: Failed to send to {lead_email}: {e}")


def _process_campaign(db, campaign):
    """Process every pending/emailed lead of one campaign."""
    user = db.query(models.User).filter(models.User.id == campaign.user_id).first()
    if not user or not user.google_token:
        print(f"Skipping campaign {campaign.id}: User not found or not connected")
        return

    try:
        service = get_gmail_service(user.google_token)
    except Exception as e:
        # A broken credential for one user must not stop other campaigns.
        print(f"Skipping campaign {campaign.id}: could not build Gmail service: {e}")
        return

    # 2. Get all pending leads for this campaign
    leads = db.query(models.Lead).filter(
        models.Lead.campaign_id == campaign.id,
        models.Lead.status.in_(["pending", "emailed"])
    ).all()

    print(f"Processing campaign '{campaign.name}' with {len(leads)} leads")

    for lead in leads:
        _process_lead(db, service, user, campaign, lead)


def automation_engine(campaign_id: int = None):
    """The core loop that replicates n8n logic.

    If campaign_id is provided, only processes that specific campaign
    (regardless of its status); otherwise all campaigns whose status is
    "active" are processed. The database session is always closed on exit.
    """
    db = SessionLocal()
    try:
        # 1. Get campaigns (all active or a specific one)
        query = db.query(models.Campaign)
        if campaign_id is not None:
            query = query.filter(models.Campaign.id == campaign_id)
        else:
            query = query.filter(models.Campaign.status == "active")

        for campaign in query.all():
            _process_campaign(db, campaign)
    finally:
        db.close()


if __name__ == "__main__":
    # For testing: run once
    automation_engine()