File size: 6,385 Bytes
f64b6e2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 | import time
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
import models
from models import SessionLocal
def get_gmail_service(user_token):
# Use from_authorized_user_info for better credential management
creds = Credentials.from_authorized_user_info(user_token)
return build('gmail', 'v1', credentials=creds)
def send_email(service, to, subject, body, user_email, sender_name=None, reply_to=None, thread_id=None):
from email.mime.text import MIMEText
from email.utils import formataddr
import base64
message = MIMEText(body, 'html')
message['to'] = to
message['subject'] = subject
if sender_name:
# Gmail requires the actual email in formataddr to show the "Friendly Name" correctly
message['from'] = formataddr((sender_name, user_email))
else:
message['from'] = user_email
if reply_to:
message['Reply-To'] = reply_to
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
body = {'raw': raw_message}
if thread_id:
body['threadId'] = thread_id
return service.users().messages().send(userId='me', body=body).execute()
def check_for_replies(service, thread_id, user_email):
"""Returns True if the lead has replied in this thread."""
if not thread_id:
return False
try:
thread = service.users().threads().get(userId='me', id=thread_id).execute()
messages = thread.get('messages', [])
for msg in messages:
headers = msg.get('payload', {}).get('headers', [])
sender = next((h['value'] for h in headers if h['name'].lower() == 'from'), "")
# If any message in the thread is NOT from us, it's a reply
if user_email.lower() not in sender.lower():
return True
except Exception as e:
print(f"Error checking replies for thread {thread_id}: {e}")
return False
def automation_engine(campaign_id: int = None):
"""The core loop that replicates n8n logic.
If campaign_id is provided, only processes that specific campaign.
"""
db = SessionLocal()
try:
# 1. Get campaigns (all active or a specific one)
query = db.query(models.Campaign)
if campaign_id:
query = query.filter(models.Campaign.id == campaign_id)
else:
query = query.filter(models.Campaign.status == "active")
campaigns = query.all()
for campaign in campaigns:
user = db.query(models.User).filter(models.User.id == campaign.user_id).first()
if not user or not user.google_token:
print(f"Skipping campaign {campaign.id}: User not found or not connected")
continue
service = get_gmail_service(user.google_token)
# 2. Get all pending leads for this campaign
leads = db.query(models.Lead).filter(
models.Lead.campaign_id == campaign.id,
models.Lead.status.in_(["pending", "emailed"])
).all()
print(f"Processing campaign '{campaign.name}' with {len(leads)} leads")
for lead in leads:
# 3. Check if lead has already replied
if lead.thread_id and check_for_replies(service, lead.thread_id, user.email):
lead.status = "replied"
db.commit()
print(f"Lead {lead.email} has replied. Stopping sequence.")
continue
# 4. Check if it's time for the next step
next_step_idx = lead.current_step
if next_step_idx >= len(campaign.steps):
lead.status = "finished"
db.commit()
continue
current_step_config = campaign.steps[next_step_idx]
days_to_wait = current_step_config.get('day', 0)
send_now = False
if lead.current_step == 0:
send_now = True
elif lead.last_contact_at:
if datetime.utcnow() >= lead.last_contact_at + timedelta(days=days_to_wait):
send_now = True
if send_now:
# 5. Prepare content and subject with placeholders
msg_body = current_step_config.get('html_form') or current_step_config.get('message', '')
msg_subject = campaign.subject or "Heads up!"
for key, val in lead.data.items():
placeholder = f"{{{key}}}"
msg_body = msg_body.replace(placeholder, str(val))
msg_subject = msg_subject.replace(placeholder, str(val))
# 6. Send
try:
print(f"Attempting to send step {lead.current_step + 1} to {lead.email}")
res = send_email(service, lead.email, msg_subject, msg_body, user.email, user.sender_name, user.reply_to, lead.thread_id)
# Update Lead State
lead.thread_id = res['threadId']
lead.last_contact_at = datetime.utcnow()
lead.current_step += 1
lead.status = "emailed"
db.commit()
print(f"SUCCESS: Sent to {lead.email} (Thread: {res['threadId']})")
except Exception as e:
print(f"ERROR: Failed to send to {lead.email}: {e}")
else:
wait_remaining = 0
if lead.last_contact_at:
next_send = lead.last_contact_at + timedelta(days=days_to_wait)
wait_remaining = (next_send - datetime.utcnow()).total_seconds() / 3600
print(f"WAIT: Skipping {lead.email} for now. Next send in approx {wait_remaining:.2f} hours (Wait: {days_to_wait} days)")
finally:
db.close()
if __name__ == "__main__":
# For testing: run once
automation_engine()
|