| | import os |
| | import requests |
| | import time |
| | from typing import Dict, List, Optional |
| | from datetime import datetime |
| |
|
| |
|
class SmartleadAPIError(Exception):
    """Error raised when a Smartlead API call fails.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working. ``status_code`` holds the HTTP status of the failing response,
    or ``None`` when the failure was not an HTTP response (for example a
    connection error or timeout).
    """

    def __init__(self, message: str, status_code: Optional[int] = None):
        super().__init__(message)
        self.status_code = status_code


class SmartleadClient:
    """Client for Smartlead API integration"""

    BASE_URL = "https://server.smartlead.ai/api/v1"
    # Seconds to wait on HTTP 429 when Retry-After is missing or unparsable.
    DEFAULT_RETRY_AFTER = 60
    # Per-request timeout in seconds.
    REQUEST_TIMEOUT = 30

    def __init__(self, api_key: Optional[str] = None, client_api_key: Optional[str] = None):
        """
        Initialize Smartlead client

        Args:
            api_key: Admin API key (required for campaign management operations).
                Falls back to the SMARTLEAD_API_KEY environment variable.
            client_api_key: Client API key (optional, may be required for certain
                operations). Falls back to SMARTLEAD_CLIENT_API_KEY.

        Raises:
            ValueError: If no admin API key is given and none is set in the
                environment.
        """
        self.api_key = api_key or os.getenv("SMARTLEAD_API_KEY")
        self.client_api_key = client_api_key or os.getenv("SMARTLEAD_CLIENT_API_KEY")

        if not self.api_key:
            raise ValueError("SMARTLEAD_API_KEY environment variable is required (Admin API key)")

        self.headers = {
            "Content-Type": "application/json"
        }

    def _redact(self, text: str) -> str:
        """Return *text* with any configured API key replaced by ``***``."""
        for secret in (self.api_key, self.client_api_key):
            if secret:
                text = text.replace(secret, "***")
        return text

    @classmethod
    def _parse_retry_after(cls, header_value: Optional[str]) -> int:
        """Convert a Retry-After header value to a non-negative number of seconds.

        Only the integer-seconds form is parsed. A missing header or any other
        form (such as an HTTP-date) yields ``DEFAULT_RETRY_AFTER``.
        """
        try:
            seconds = int(header_value)
        except (TypeError, ValueError):
            return cls.DEFAULT_RETRY_AFTER
        # time.sleep() rejects negative values.
        return max(seconds, 0)

    @staticmethod
    def _error_detail(response, keys, default: str, stringify_body: bool = False) -> str:
        """Pick a human-readable error detail out of an error response.

        Returns the first non-empty value found under *keys* when the body is a
        JSON object. Otherwise returns ``str(body)`` if *stringify_body* is set
        and the body is a JSON object, else the raw response text, else
        *default*.
        """
        try:
            body = response.json()
        except ValueError:
            body = None
        if isinstance(body, dict):
            for key in keys:
                if body.get(key):
                    return str(body[key])
            if stringify_body:
                return str(body)
        return response.text or default

    def _make_request(
        self,
        method: str,
        endpoint: str,
        data: Optional[Dict] = None,
        max_retries: int = 3,
        params: Optional[Dict] = None,
    ) -> Dict:
        """Make API request with retry logic for 429/5xx errors

        Args:
            method: HTTP method name.
            endpoint: Path appended to ``BASE_URL``. It may already contain a
                query string; credentials are merged in by ``requests``.
            data: Optional JSON body.
            max_retries: Maximum number of attempts.
            params: Optional extra query parameters, URL-encoded by ``requests``.

        Returns:
            The decoded JSON body (``{}`` when the body is empty). Some
            endpoints may return a JSON list rather than an object.

        Raises:
            SmartleadAPIError: On authentication failure, any other 4xx
                response, exhausted retries, or an undecodable success body.
        """
        # Credentials travel as query parameters, but are kept out of `url`
        # so they never end up in error messages or logs.
        url = f"{self.BASE_URL}{endpoint}"
        query = dict(params) if params else {}
        query["api_key"] = self.api_key
        if self.client_api_key:
            query["client_api_key"] = self.client_api_key

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1

            try:
                response = requests.request(
                    method=method,
                    url=url,
                    headers=self.headers,
                    params=query,
                    json=data,
                    timeout=self.REQUEST_TIMEOUT,
                )
            except requests.exceptions.RequestException as exc:
                # Network-level failure: back off exponentially and retry.
                if is_last_attempt:
                    raise SmartleadAPIError(
                        f"Smartlead API error after {max_retries} attempts: {self._redact(str(exc))}"
                    ) from exc
                time.sleep(2 ** attempt)
                continue

            status = response.status_code

            if status == 401:
                error_msg = self._error_detail(response, ("message", "error"), "Unauthorized")
                raise SmartleadAPIError(
                    f"Smartlead API authentication failed (401): {error_msg}. Please verify your API key is correctly set in SMARTLEAD_API_KEY environment variable. If using client API key, also set SMARTLEAD_CLIENT_API_KEY.",
                    status_code=status,
                )

            if status == 429 and not is_last_attempt:
                time.sleep(self._parse_retry_after(response.headers.get("Retry-After")))
                continue

            # Remaining 4xx responses (including a 429 on the final attempt)
            # are caller errors and are not retried.
            if 400 <= status < 500:
                error_msg = self._error_detail(
                    response,
                    ("message", "error", "detail", "msg"),
                    f"Client error {status}",
                    stringify_body=True,
                )
                raise SmartleadAPIError(
                    f"Smartlead API client error ({status}): {error_msg}. URL: {url}",
                    status_code=status,
                )

            if status >= 500:
                if not is_last_attempt:
                    time.sleep(2 ** attempt)
                    continue
                raise SmartleadAPIError(
                    f"Smartlead API error after {max_retries} attempts: "
                    f"{status} Server Error: {response.reason} for url: {url}",
                    status_code=status,
                )

            if not response.content:
                return {}
            try:
                return response.json()
            except ValueError as exc:
                # The request itself succeeded, so re-sending it could
                # duplicate side effects; fail instead of retrying.
                raise SmartleadAPIError(
                    f"Smartlead API returned a non-JSON response ({status}). URL: {url}",
                    status_code=status,
                ) from exc

        raise SmartleadAPIError("Failed to make request after retries")

    def get_campaigns(self) -> List[Dict]:
        """Get list of all campaigns from Smartlead

        Accepts a bare JSON list, an object wrapping the list under
        ``campaigns``/``data``/``results``, or a single campaign object.
        This is a best-effort call: any failure is logged and an empty list
        is returned.

        Returns list of campaigns with their IDs and names
        """
        try:
            response = self._make_request("GET", "/campaigns")
        except Exception as exc:
            import logging

            logging.getLogger(__name__).warning("Failed to fetch Smartlead campaigns: %s", exc)
            return []

        if isinstance(response, list):
            return response
        if not isinstance(response, dict):
            return []

        for key in ("campaigns", "data", "results"):
            value = response.get(key)
            if value and isinstance(value, list):
                return value

        # The response may itself be a single campaign object.
        if response.get('campaign_id') or response.get('id'):
            return [response]
        return []

    def save_campaign_sequence(self, campaign_id: str, sequences: List[Dict]) -> Dict:
        """Save campaign sequence steps"""
        data = {
            "sequences": sequences
        }
        return self._make_request("POST", f"/campaigns/{campaign_id}/sequences", data)

    def add_leads_to_campaign(self, campaign_id: str, leads: List[Dict]) -> Dict:
        """Add leads to a campaign

        Smartlead API expects "lead_list" parameter, not "leads"
        """
        data = {
            "lead_list": leads
        }
        return self._make_request("POST", f"/campaigns/{campaign_id}/leads", data)

    @staticmethod
    def _find_lead(response, email: str) -> Optional[Dict]:
        """Return the lead in *response* whose email matches *email*, if any.

        *response* may be a list of leads, a single lead object, or an object
        wrapping a list under ``leads`` or ``data``. Non-dict entries are
        ignored.
        """
        def matches(candidate) -> bool:
            return isinstance(candidate, dict) and (
                candidate.get('email') == email or candidate.get('email_address') == email
            )

        if isinstance(response, list):
            candidates = response
        elif isinstance(response, dict):
            if matches(response):
                return response
            candidates = response.get('leads') or response.get('data') or []
            if not isinstance(candidates, list):
                candidates = []
        else:
            candidates = []

        return next((lead for lead in candidates if matches(lead)), None)

    def get_lead_by_email(self, campaign_id: str, email: str) -> Optional[Dict]:
        """Get lead information by email address

        Tries, in order:
        - GET /campaigns/{campaign_id}/leads?email={email}
        - GET /campaigns/{campaign_id}/leads

        The first request that succeeds decides the result. The second is
        only a fallback for when the first one fails.

        Returns:
            The matching lead dict, or None when no lead matches or every
            request fails.
        """
        endpoint = f"/campaigns/{campaign_id}/leads"

        # Pass the email via `params` so it is URL-encoded correctly.
        for query in ({"email": email}, None):
            try:
                response = self._make_request("GET", endpoint, params=query)
            except Exception:
                # Best-effort lookup: fall back to the next variation.
                continue
            return self._find_lead(response, email)

        return None

    def update_campaign_settings(self, campaign_id: str, settings: Dict) -> Dict:
        """Update campaign settings"""
        return self._make_request("POST", f"/campaigns/{campaign_id}/settings", settings)

    def update_lead(self, campaign_id: str, lead_id: int, custom_variables: Dict) -> Dict:
        """Update a lead's custom variables after adding to campaign

        Smartlead API requires lead_id (number) to update leads.

        Several endpoint and payload shapes are tried in turn. A variation is
        skipped only when the API answers 400 or 404; any other failure is
        raised immediately.

        Raises:
            Exception: The last 400/404 error when every variation is
                rejected, or the first error of any other kind.
        """
        endpoints = [
            f"/campaigns/{campaign_id}/leads/{lead_id}",
            f"/campaigns/{campaign_id}/leads/{lead_id}/update",
            f"/campaigns/{campaign_id}/leads/update"
        ]

        payloads = [
            custom_variables,
            {"custom_variables": custom_variables},
            {"variables": custom_variables},
            {"customFields": custom_variables},
            {"lead_id": lead_id, **custom_variables},
        ]

        last_error = None
        for endpoint in endpoints:
            method = "POST" if endpoint.endswith("/update") else "PUT"
            for payload in payloads:
                try:
                    return self._make_request(method, endpoint, payload)
                except Exception as exc:
                    # Decide on the HTTP status, not on the message text: the
                    # message contains the URL, whose ids may include "400"
                    # or "404".
                    if getattr(exc, "status_code", None) not in (400, 404):
                        raise
                    last_error = exc

        if last_error is not None:
            raise last_error
        raise SmartleadAPIError(f"Failed to update lead {lead_id}: All endpoint variations failed")

    def build_sequences(self, steps_count: int, delay_days: int = 2) -> List[Dict]:
        """Build sequence templates for campaign

        Smartlead campaign steps use variables:
        - Step N subject template: {{subject_N}}
        - Step N body template: Hi {{first_name}},\\n\\n{{body_N}}\\n\\n
        The actual per-contact content is injected via custom_variables when adding leads.

        seq_delay_details is required by Smartlead API and specifies delay between emails.

        Args:
            steps_count: Number of steps to generate; values below 1 give an
                empty list.
            delay_days: Delay in days before every step after the first. The
                first step always has a delay of 0.
        """
        return [
            {
                "seq_number": step,
                "subject": f"{{{{subject_{step}}}}}",
                "email_body": f"Hi {{{{first_name}}}},\n\n{{{{body_{step}}}}}\n\n",
                "seq_delay_details": {
                    "delay_in_days": 0 if step == 1 else delay_days
                }
            }
            for step in range(1, steps_count + 1)
        ]
| |
|