mali / engine.py
babaTEEpe's picture
Upload 6 files
f64b6e2 verified
import time
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
import models
from models import SessionLocal
def get_gmail_service(user_token):
# Use from_authorized_user_info for better credential management
creds = Credentials.from_authorized_user_info(user_token)
return build('gmail', 'v1', credentials=creds)
def send_email(service, to, subject, body, user_email, sender_name=None, reply_to=None, thread_id=None):
from email.mime.text import MIMEText
from email.utils import formataddr
import base64
message = MIMEText(body, 'html')
message['to'] = to
message['subject'] = subject
if sender_name:
# Gmail requires the actual email in formataddr to show the "Friendly Name" correctly
message['from'] = formataddr((sender_name, user_email))
else:
message['from'] = user_email
if reply_to:
message['Reply-To'] = reply_to
raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode('utf-8')
body = {'raw': raw_message}
if thread_id:
body['threadId'] = thread_id
return service.users().messages().send(userId='me', body=body).execute()
def check_for_replies(service, thread_id, user_email):
"""Returns True if the lead has replied in this thread."""
if not thread_id:
return False
try:
thread = service.users().threads().get(userId='me', id=thread_id).execute()
messages = thread.get('messages', [])
for msg in messages:
headers = msg.get('payload', {}).get('headers', [])
sender = next((h['value'] for h in headers if h['name'].lower() == 'from'), "")
# If any message in the thread is NOT from us, it's a reply
if user_email.lower() not in sender.lower():
return True
except Exception as e:
print(f"Error checking replies for thread {thread_id}: {e}")
return False
def automation_engine(campaign_id: int = None):
"""The core loop that replicates n8n logic.
If campaign_id is provided, only processes that specific campaign.
"""
db = SessionLocal()
try:
# 1. Get campaigns (all active or a specific one)
query = db.query(models.Campaign)
if campaign_id:
query = query.filter(models.Campaign.id == campaign_id)
else:
query = query.filter(models.Campaign.status == "active")
campaigns = query.all()
for campaign in campaigns:
user = db.query(models.User).filter(models.User.id == campaign.user_id).first()
if not user or not user.google_token:
print(f"Skipping campaign {campaign.id}: User not found or not connected")
continue
service = get_gmail_service(user.google_token)
# 2. Get all pending leads for this campaign
leads = db.query(models.Lead).filter(
models.Lead.campaign_id == campaign.id,
models.Lead.status.in_(["pending", "emailed"])
).all()
print(f"Processing campaign '{campaign.name}' with {len(leads)} leads")
for lead in leads:
# 3. Check if lead has already replied
if lead.thread_id and check_for_replies(service, lead.thread_id, user.email):
lead.status = "replied"
db.commit()
print(f"Lead {lead.email} has replied. Stopping sequence.")
continue
# 4. Check if it's time for the next step
next_step_idx = lead.current_step
if next_step_idx >= len(campaign.steps):
lead.status = "finished"
db.commit()
continue
current_step_config = campaign.steps[next_step_idx]
days_to_wait = current_step_config.get('day', 0)
send_now = False
if lead.current_step == 0:
send_now = True
elif lead.last_contact_at:
if datetime.utcnow() >= lead.last_contact_at + timedelta(days=days_to_wait):
send_now = True
if send_now:
# 5. Prepare content and subject with placeholders
msg_body = current_step_config.get('html_form') or current_step_config.get('message', '')
msg_subject = campaign.subject or "Heads up!"
for key, val in lead.data.items():
placeholder = f"{{{key}}}"
msg_body = msg_body.replace(placeholder, str(val))
msg_subject = msg_subject.replace(placeholder, str(val))
# 6. Send
try:
print(f"Attempting to send step {lead.current_step + 1} to {lead.email}")
res = send_email(service, lead.email, msg_subject, msg_body, user.email, user.sender_name, user.reply_to, lead.thread_id)
# Update Lead State
lead.thread_id = res['threadId']
lead.last_contact_at = datetime.utcnow()
lead.current_step += 1
lead.status = "emailed"
db.commit()
print(f"SUCCESS: Sent to {lead.email} (Thread: {res['threadId']})")
except Exception as e:
print(f"ERROR: Failed to send to {lead.email}: {e}")
else:
wait_remaining = 0
if lead.last_contact_at:
next_send = lead.last_contact_at + timedelta(days=days_to_wait)
wait_remaining = (next_send - datetime.utcnow()).total_seconds() / 3600
print(f"WAIT: Skipping {lead.email} for now. Next send in approx {wait_remaining:.2f} hours (Wait: {days_to_wait} days)")
finally:
db.close()
if __name__ == "__main__":
# For testing: run once
automation_engine()