File size: 6,385 Bytes
f64b6e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import time
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
import models
from models import SessionLocal

def get_gmail_service(user_token):
    """Return a Gmail v1 API client authorised with the given stored token.

    ``user_token`` is handed unchanged to
    ``Credentials.from_authorized_user_info`` (presumably the authorized-user
    info mapping saved at OAuth time -- confirm against the caller).
    """
    credentials = Credentials.from_authorized_user_info(user_token)
    gmail_client = build('gmail', 'v1', credentials=credentials)
    return gmail_client

def send_email(service, to, subject, body, user_email, sender_name=None, reply_to=None, thread_id=None):
    """Send one HTML email through the Gmail API and return the API response.

    Args:
        service: Gmail client exposing ``users().messages().send(...)``.
        to: Recipient address, used as the ``to`` header.
        subject: Subject line.
        body: HTML content of the message.
        user_email: Address of the sending account, used in the ``from`` header.
        sender_name: Optional display name shown alongside ``user_email``.
        reply_to: Optional address for the ``Reply-To`` header.
        thread_id: Optional Gmail thread ID; when given it is included in the
            request as ``threadId``.

    Returns:
        Whatever ``execute()`` on the send request returns.
    """
    import base64
    from email.mime.text import MIMEText
    from email.utils import formataddr

    mime = MIMEText(body, 'html')
    mime['to'] = to
    mime['subject'] = subject
    # The display name only shows up if it is paired with the real address.
    mime['from'] = formataddr((sender_name, user_email)) if sender_name else user_email
    if reply_to:
        mime['Reply-To'] = reply_to

    # The API expects the whole RFC 822 message as URL-safe base64 text.
    encoded = base64.urlsafe_b64encode(mime.as_bytes()).decode('utf-8')
    payload = {'raw': encoded, 'threadId': thread_id} if thread_id else {'raw': encoded}

    request = service.users().messages().send(userId='me', body=payload)
    return request.execute()

def check_for_replies(service, thread_id, user_email):
    """Return True if someone other than us has posted in the Gmail thread.

    Args:
        service: Gmail client exposing ``users().threads().get(...)``.
        thread_id: Gmail thread ID to inspect; a falsy value yields False.
        user_email: Our own sending address; a falsy value yields False
            because we cannot tell our messages apart from replies.

    Returns:
        True as soon as one message's ``From`` address is not ``user_email``.
        False otherwise, including when the lookup raises (the error is
        printed rather than propagated, so the check stays best-effort).
    """
    from email.utils import parseaddr

    if not thread_id or not user_email:
        return False

    try:
        own_address = user_email.lower()
        thread = service.users().threads().get(userId='me', id=thread_id).execute()

        for msg in thread.get('messages', []):
            headers = msg.get('payload', {}).get('headers', [])
            sender = next((h['value'] for h in headers if h['name'].lower() == 'from'), "")

            # Compare the parsed address exactly: a substring test would treat
            # "notme@example.com" as ours when we are "me@example.com".
            sender_address = parseaddr(sender)[1].lower()
            if sender_address:
                from_us = sender_address == own_address
            else:
                # Unparseable header: fall back to the looser containment check.
                from_us = own_address in sender.lower()

            # Any message in the thread that is NOT from us is a reply.
            if not from_us:
                return True
    except Exception as e:
        print(f"Error checking replies for thread {thread_id}: {e}")

    return False

def _fill_placeholders(text, data):
    """Replace each ``{key}`` in *text* with ``str(value)`` taken from *data*."""
    for key, val in data.items():
        text = text.replace(f"{{{key}}}", str(val))
    return text


def _step_is_due(lead, days_to_wait):
    """Return True when the lead's next step should be sent now."""
    # The first step goes out immediately.
    if lead.current_step == 0:
        return True
    # A follow-up with no recorded contact time is never considered due.
    if not lead.last_contact_at:
        return False
    # NOTE(review): compares against naive utcnow(); last_contact_at is
    # written below with utcnow() as well, so both sides are presumably naive UTC.
    return datetime.utcnow() >= lead.last_contact_at + timedelta(days=days_to_wait)


def automation_engine(campaign_id: int = None):
    """The core loop that replicates n8n logic.

    If campaign_id is provided, only that campaign is processed (whatever its
    status); otherwise every campaign whose status is "active" is processed.

    For each campaign, every lead with status "pending" or "emailed" is
    visited once:
      * a lead whose thread contains a reply is marked "replied";
      * a lead that has gone through all steps is marked "finished";
      * a lead whose next step is due gets that step emailed and its
        thread ID, contact time, step counter and status updated;
      * any other lead is skipped with a WAIT message.

    A failure while sending or saving one lead is printed and rolled back so
    the remaining leads can still be processed. Progress is reported with
    ``print``. The database session is always closed on exit.
    """
    db = SessionLocal()
    try:
        # 1. Get campaigns (all active or a specific one)
        query = db.query(models.Campaign)
        if campaign_id:
            query = query.filter(models.Campaign.id == campaign_id)
        else:
            query = query.filter(models.Campaign.status == "active")

        for campaign in query.all():
            user = db.query(models.User).filter(models.User.id == campaign.user_id).first()
            if not user or not user.google_token:
                print(f"Skipping campaign {campaign.id}: User not found or not connected")
                continue

            service = get_gmail_service(user.google_token)
            # A campaign saved without steps is treated as having none.
            steps = campaign.steps or []

            # 2. Get all pending leads for this campaign
            leads = db.query(models.Lead).filter(
                models.Lead.campaign_id == campaign.id,
                models.Lead.status.in_(["pending", "emailed"])
            ).all()

            print(f"Processing campaign '{campaign.name}' with {len(leads)} leads")

            for lead in leads:
                # 3. Check if lead has already replied
                if lead.thread_id and check_for_replies(service, lead.thread_id, user.email):
                    lead.status = "replied"
                    db.commit()
                    print(f"Lead {lead.email} has replied. Stopping sequence.")
                    continue

                # 4. Check if it's time for the next step
                next_step_idx = lead.current_step
                if next_step_idx >= len(steps):
                    lead.status = "finished"
                    db.commit()
                    continue

                current_step_config = steps[next_step_idx]
                # A missing or null 'day' means "no delay".
                days_to_wait = current_step_config.get('day') or 0

                if not _step_is_due(lead, days_to_wait):
                    wait_remaining = 0
                    if lead.last_contact_at:
                        next_send = lead.last_contact_at + timedelta(days=days_to_wait)
                        wait_remaining = (next_send - datetime.utcnow()).total_seconds() / 3600
                    print(f"WAIT: Skipping {lead.email} for now. Next send in approx {wait_remaining:.2f} hours (Wait: {days_to_wait} days)")
                    continue

                # 5. Prepare content and subject with placeholders
                msg_body = current_step_config.get('html_form') or current_step_config.get('message') or ''
                msg_subject = campaign.subject or "Heads up!"

                # A lead without custom data simply has nothing to substitute.
                lead_data = lead.data or {}
                msg_body = _fill_placeholders(msg_body, lead_data)
                msg_subject = _fill_placeholders(msg_subject, lead_data)

                # 6. Send
                # Read these before the try: after a rollback the ORM object
                # is expired and reading it would hit the database again.
                lead_email = lead.email
                step_number = lead.current_step + 1
                try:
                    print(f"Attempting to send step {step_number} to {lead_email}")
                    res = send_email(service, lead_email, msg_subject, msg_body, user.email, user.sender_name, user.reply_to, lead.thread_id)

                    # Update Lead State
                    lead.thread_id = res['threadId']
                    lead.last_contact_at = datetime.utcnow()
                    lead.current_step += 1
                    lead.status = "emailed"
                    db.commit()
                    print(f"SUCCESS: Sent to {lead_email} (Thread: {res['threadId']})")
                except Exception as e:
                    # Discard the half-applied changes; a session whose flush
                    # failed rejects further work until it is rolled back.
                    db.rollback()
                    print(f"ERROR: Failed to send to {lead_email}: {e}")

    finally:
        db.close()

if __name__ == "__main__":
    # Manual test hook: a single pass with no campaign filter, then exit.
    automation_engine(campaign_id=None)