| import time |
| from datetime import datetime, timedelta |
| from sqlalchemy.orm import Session |
| from google.oauth2.credentials import Credentials |
| from googleapiclient.discovery import build |
| import models |
| from models import SessionLocal |
|
|
def get_gmail_service(user_token):
    """Return a Gmail API (v1) client authorized with the given user token.

    Args:
        user_token: Stored authorized-user info, handed unchanged to
            ``Credentials.from_authorized_user_info`` (presumably the dict
            saved when the user connected their Google account -- confirm
            against the code that writes ``User.google_token``).

    Returns:
        The service object produced by ``googleapiclient.discovery.build``.
    """
    return build(
        'gmail',
        'v1',
        credentials=Credentials.from_authorized_user_info(user_token),
    )
|
|
def send_email(service, to, subject, body, user_email, sender_name=None, reply_to=None, thread_id=None):
    """Send one HTML email through the Gmail API and return the API response.

    Args:
        service: Gmail service object exposing ``users().messages().send``.
        to: Value for the ``to`` header.
        subject: Value for the ``subject`` header.
        body: HTML content of the message.
        user_email: Address used in the ``from`` header.
        sender_name: Optional display name; when truthy the ``from`` header
            is formatted as ``"Name <address>"``.
        reply_to: Optional value for the ``Reply-To`` header.
        thread_id: Optional Gmail thread id; when truthy it is included in
            the request as ``threadId``.

    Returns:
        Whatever ``execute()`` returns for the send request.
    """
    import base64
    from email.mime.text import MIMEText
    from email.utils import formataddr

    mime = MIMEText(body, 'html')
    mime['to'] = to
    mime['subject'] = subject
    mime['from'] = formataddr((sender_name, user_email)) if sender_name else user_email
    if reply_to:
        mime['Reply-To'] = reply_to

    # The API expects the full RFC 822 message, URL-safe base64 encoded.
    encoded = base64.urlsafe_b64encode(mime.as_bytes()).decode('utf-8')
    payload = {'raw': encoded, 'threadId': thread_id} if thread_id else {'raw': encoded}

    messages_api = service.users().messages()
    return messages_api.send(userId='me', body=payload).execute()
|
|
def check_for_replies(service, thread_id, user_email):
    """Return True if someone other than the user has written in this thread.

    The thread is fetched from the Gmail API and each message's ``From``
    header is parsed into a bare address. That address is compared exactly
    (case-insensitively) with ``user_email``. An exact comparison avoids the
    false matches a substring test produces, e.g. ``notbob@x.com`` being
    taken for ``bob@x.com``, or a display name that happens to contain the
    user's address.

    Args:
        service: Gmail service object exposing ``users().threads().get``.
        thread_id: Gmail thread id; a falsy value yields ``False``.
        user_email: The sending user's own address; a falsy value yields
            ``False`` because ownership cannot be determined.

    Returns:
        ``True`` as soon as a message with a parseable sender address
        different from ``user_email`` is found, otherwise ``False``.
        Any error while fetching or inspecting the thread is printed and
        treated as "no reply" (best-effort, never raises).
    """
    from email.utils import parseaddr

    if not thread_id or not user_email:
        return False

    try:
        # Accept either a bare address or a "Name <addr>" form for the user.
        own_address = (parseaddr(user_email)[1] or user_email).strip().lower()

        thread = service.users().threads().get(userId='me', id=thread_id).execute()
        for msg in thread.get('messages', []):
            headers = msg.get('payload', {}).get('headers', [])
            sender = next(
                (h.get('value', '') for h in headers if h.get('name', '').lower() == 'from'),
                "",
            )
            sender_address = parseaddr(sender)[1].strip().lower()

            # A message without a usable From address cannot be attributed
            # to the lead, so it is not counted as a reply.
            if sender_address and sender_address != own_address:
                return True
    except Exception as e:
        print(f"Error checking replies for thread {thread_id}: {e}")

    return False
|
|
def _render_template(text, data):
    """Replace each ``{key}`` placeholder in *text* with ``str(data[key])``."""
    for key, val in (data or {}).items():
        text = text.replace(f"{{{key}}}", str(val))
    return text


def _step_is_due(step_idx, last_contact_at, days_to_wait, now):
    """Tell whether the step at *step_idx* may be sent at time *now*."""
    if step_idx == 0:
        # The first step has no waiting period.
        return True
    if last_contact_at:
        return now >= last_contact_at + timedelta(days=days_to_wait)
    # A follow-up step without a recorded previous contact is never due.
    return False


def _process_lead(db, service, campaign, user, lead):
    """Advance one lead: detect a reply, finish the sequence, send, or wait."""
    if lead.thread_id and check_for_replies(service, lead.thread_id, user.email):
        lead.status = "replied"
        db.commit()
        print(f"Lead {lead.email} has replied. Stopping sequence.")
        return

    steps = campaign.steps or []
    step_idx = lead.current_step or 0
    if step_idx >= len(steps):
        lead.status = "finished"
        db.commit()
        return

    step = steps[step_idx]
    days_to_wait = step.get('day') or 0
    now = datetime.utcnow()

    if not _step_is_due(step_idx, lead.last_contact_at, days_to_wait, now):
        wait_remaining = 0
        if lead.last_contact_at:
            next_send = lead.last_contact_at + timedelta(days=days_to_wait)
            wait_remaining = (next_send - now).total_seconds() / 3600
        print(f"WAIT: Skipping {lead.email} for now. Next send in approx {wait_remaining:.2f} hours (Wait: {days_to_wait} days)")
        return

    template = step.get('html_form') or step.get('message') or ''
    msg_body = _render_template(template, lead.data)
    msg_subject = _render_template(campaign.subject or "Heads up!", lead.data)

    # Read before the try block: a rollback expires loaded attributes, and
    # the error message should not depend on a fresh database round trip.
    lead_email = lead.email

    try:
        print(f"Attempting to send step {step_idx + 1} to {lead_email}")
        res = send_email(service, lead_email, msg_subject, msg_body, user.email, user.sender_name, user.reply_to, lead.thread_id)

        lead.thread_id = res['threadId']
        lead.last_contact_at = datetime.utcnow()
        lead.current_step = step_idx + 1
        lead.status = "emailed"
        db.commit()
        print(f"SUCCESS: Sent to {lead_email} (Thread: {res['threadId']})")
    except Exception as e:
        # Without a rollback, a failed flush/commit leaves the session
        # unusable for every following lead.
        # NOTE(review): if the send succeeded but the commit failed, the
        # same step will be sent again on the next run.
        db.rollback()
        print(f"ERROR: Failed to send to {lead_email}: {e}")


def automation_engine(campaign_id: int = None):
    """The core loop that replicates n8n logic.

    If campaign_id is provided, only that campaign is processed, whatever
    its status; otherwise every campaign whose status is "active" is
    processed.

    For each campaign the owning user is loaded and a Gmail service is built
    from ``user.google_token``. Every lead of the campaign with status
    "pending" or "emailed" is then handled in turn:

    * a lead whose thread contains a reply is marked "replied";
    * a lead with no steps left is marked "finished";
    * a lead whose next step is due gets that step sent and is advanced;
    * any other lead is left untouched and a WAIT line is printed.

    A campaign whose user is missing, has no token, or whose Gmail service
    cannot be built is skipped. A failed send is printed and rolled back,
    and the remaining leads are still processed. The database session is
    always closed.

    Args:
        campaign_id: Optional id of a single campaign to process.
    """
    db = SessionLocal()
    try:
        query = db.query(models.Campaign)
        # "is not None" so that an explicit id of 0 still selects one campaign.
        if campaign_id is not None:
            query = query.filter(models.Campaign.id == campaign_id)
        else:
            query = query.filter(models.Campaign.status == "active")

        for campaign in query.all():
            user = db.query(models.User).filter(models.User.id == campaign.user_id).first()
            if not user or not user.google_token:
                print(f"Skipping campaign {campaign.id}: User not found or not connected")
                continue

            try:
                service = get_gmail_service(user.google_token)
            except Exception as e:
                # One unusable token must not stop the other campaigns.
                print(f"Skipping campaign {campaign.id}: could not build Gmail service: {e}")
                continue

            leads = db.query(models.Lead).filter(
                models.Lead.campaign_id == campaign.id,
                models.Lead.status.in_(["pending", "emailed"])
            ).all()

            print(f"Processing campaign '{campaign.name}' with {len(leads)} leads")

            for lead in leads:
                _process_lead(db, service, campaign, user, lead)
    finally:
        db.close()
|
|
# Script entry point: running this file directly processes every active
# campaign once (no campaign id is passed).
if __name__ == "__main__":
    automation_engine()
|
|